composer.datasets.streaming.dataset#

The StreamingDataset class, used for building streaming iterable datasets.

Classes

StreamingDataset

A sharded, streaming, iterable dataset.

StreamingImageClassDataset

A streaming image classification dataset, for (img, class) pairs.

class composer.datasets.streaming.dataset.StreamingDataset(remote, local, shuffle, decoders, timeout=60, batch_size=None)[source]#

Bases: torch.utils.data.dataset.IterableDataset

A sharded, streaming, iterable dataset.

StreamingDataset reads samples from binary .mds files that were written out by StreamingDatasetWriter.

It currently supports downloading data from either S3 paths or local filepaths.

It supports multi-GPU and multi-node training, and uses smart local caching to minimize network bandwidth.

It also provides best-effort shuffling to preserve randomness when shuffle=True.

Parameters
  • remote (str) – Download shards from this remote S3 path or directory.

  • local (str) – Download shards to this local directory for caching.

  • shuffle (bool) – Whether to shuffle the samples. Note that if shuffle=False, the sample order is deterministic but dependent on the DataLoader's num_workers.

  • decoders (Dict[str, Callable[[bytes], Any]]) – For each sample field you wish to read, you must provide a decoder that converts the raw bytes to an object.

  • timeout (float) – How long to wait for a shard to download before raising an exception. Default: 60 seconds.

  • batch_size (Optional[int]) – Hint the batch_size that will be used on each device's DataLoader. Default: None. Worker sample ranges are constructed so that there is at most one incomplete batch at the end of each epoch. For example, if the DataLoader is reading over (samples=[0, 1, 2, 3, 4, 5, 6, 7], num_workers=3, batch_size=2, drop_last=True) and batch_size is not hinted to the StreamingDataset ahead of time, then by default the samples are assigned w0: [0, 1, 2], w1: [3, 4, 5], w2: [6, 7] and read as batches [0, 1], [3, 4], [6, 7] (with batches [2] and [5] dropped as incomplete). This is suboptimal, because no samples needed to be dropped. When batch_size is provided as a hint, the samples are instead assigned w0: [0, 1, 2, 3], w1: [4, 5], w2: [6, 7] and read as batches [0, 1], [4, 5], [6, 7], [2, 3].
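The batch-aware worker assignment described above can be sketched in plain Python. Note that partition_samples is a hypothetical helper written for illustration; it is not part of the composer API, though its outputs match the assignments in the example above:

```python
from typing import List, Optional


def partition_samples(num_samples: int, num_workers: int,
                      batch_size: Optional[int] = None) -> List[List[int]]:
    """Assign contiguous sample ranges to workers.

    With no batch_size hint, samples are split as evenly as possible.
    With a hint, each worker receives a multiple of batch_size where
    possible, so at most one batch per epoch is incomplete.
    """
    if batch_size is None:
        base, rem = divmod(num_samples, num_workers)
        sizes = [base + (1 if i < rem else 0) for i in range(num_workers)]
    else:
        full_batches, leftover = divmod(num_samples, batch_size)
        base, rem = divmod(full_batches, num_workers)
        sizes = [(base + (1 if i < rem else 0)) * batch_size
                 for i in range(num_workers)]
        sizes[-1] += leftover  # the single (possibly incomplete) tail batch
    ranges, start = [], 0
    for size in sizes:
        ranges.append(list(range(start, start + size)))
        start += size
    return ranges


# Without the hint: an incomplete batch per worker under drop_last=True.
print(partition_samples(8, 3))                # [[0, 1, 2], [3, 4, 5], [6, 7]]
# With the hint: every worker holds whole batches.
print(partition_samples(8, 3, batch_size=2))  # [[0, 1, 2, 3], [4, 5], [6, 7]]
```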

To write the dataset:
>>> from composer.datasets.streaming import StreamingDatasetWriter
>>> samples = [
...     {
...         "uid": f"{ix:06}".encode("utf-8"),
...         "data": (3 * ix).to_bytes(4, "big"),
...         "unused": "blah".encode("utf-8"),
...     }
...     for ix in range(100)
... ]
>>> dirname = "remote"
>>> fields = ["uid", "data"]
>>> with StreamingDatasetWriter(dirname=dirname, fields=fields) as writer:
...     writer.write_samples(samples=samples)

To read the dataset:
>>> from composer.datasets.streaming import StreamingDataset
>>> remote = "remote"
>>> local = "local"
>>> decoders = {
...     "uid": lambda uid_bytes: uid_bytes.decode("utf-8"),
...     "data": lambda data_bytes: int.from_bytes(data_bytes, "big"),
... }
>>> dataset = StreamingDataset(remote=remote, local=local, shuffle=False, decoders=decoders)
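The decoders in the example above can be exercised on their own; this sketch round-trips one sample's fields through the same encode/decode pairs, with no composer dependency:

```python
# Encode one sample the same way the writer example does.
ix = 7
sample = {
    "uid": f"{ix:06}".encode("utf-8"),
    "data": (3 * ix).to_bytes(4, "big"),
}

# The same decoders passed to StreamingDataset above.
decoders = {
    "uid": lambda uid_bytes: uid_bytes.decode("utf-8"),
    "data": lambda data_bytes: int.from_bytes(data_bytes, "big"),
}

# StreamingDataset applies each field's decoder to the raw bytes it reads.
decoded = {field: decoders[field](raw) for field, raw in sample.items()}
print(decoded)  # {'uid': '000007', 'data': 21}
```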
class composer.datasets.streaming.dataset.StreamingImageClassDataset(remote, local, shuffle, transform=None, timeout=60, batch_size=None)[source]#

Bases: composer.datasets.streaming.dataset.StreamingDataset

A streaming image classification dataset, for (img, class) pairs.

This is a subclass of StreamingDataset.

Parameters
  • remote (str) – Download shards from this remote S3 path or directory.

  • local (str) – Download shards to this local filesystem directory for reuse.

  • shuffle (bool) – Whether to shuffle the samples. Note that if shuffle=False, the sample order is deterministic but dependent on the DataLoader's num_workers.

  • transform (Optional[Callable]) – Optional input data transform for data augmentation, etc.

  • timeout (float) – How long to wait for a shard to download before raising an exception. Default: 60 seconds.

  • batch_size (Optional[int]) – Hint the batch_size that will be used on each device's DataLoader. Default: None. Worker indices will be constructed so that there is at most one incomplete batch at the end of each epoch.

decode_class(data)[source]#

Decode the sample class.

Parameters

data (bytes) – The raw bytes.

Returns

np.int64 – The class encoded by the bytes.

decode_image(data)[source]#

Decode the sample image.

Parameters

data (bytes) – The raw bytes.

Returns

Image – The PIL image encoded by the bytes.
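The two decode helpers above can be approximated in isolation. The byte layouts below are assumptions for illustration (the actual on-disk encoding is defined by the dataset writer, not shown here): an 8-byte little-endian integer for the class, and a standard compressed image stream (JPEG, PNG, ...) for the image.

```python
import io
import struct


def decode_class(data: bytes) -> int:
    """Assumed encoding: the class label as a little-endian 64-bit integer."""
    return struct.unpack("<q", data)[0]


def decode_image(data: bytes):
    """Assumed encoding: a compressed image stream (JPEG, PNG, ...).

    PIL can decode such a stream directly from memory; Pillow is an
    optional dependency here, so import it lazily.
    """
    from PIL import Image
    return Image.open(io.BytesIO(data))


# Round-trip a class label through the assumed encoding.
encoded = struct.pack("<q", 42)
print(decode_class(encoded))  # 42
```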