composer.datasets.streaming.dataset#
The StreamingDataset class, used for building streaming iterable datasets.
Classes

StreamingDataset: A sharded, streaming, iterable dataset.
StreamingImageClassDataset: A streaming image classification dataset, for (img, class) pairs.
- class composer.datasets.streaming.dataset.StreamingDataset(remote, local, shuffle, decoders, timeout=60, batch_size=None)[source]#
Bases: torch.utils.data.dataset.IterableDataset

A sharded, streaming, iterable dataset.

StreamingDataset reads samples from binary .mds files that were written out by StreamingDatasetWriter. It currently supports downloading data from either S3 paths or local filepaths. It supports multi-GPU and multi-node training, and has smart local caching to minimize network bandwidth. It also provides best-effort shuffling to preserve randomness when shuffle=True.

- Parameters
remote (str) – Download shards from this remote S3 path or directory.
local (str) – Download shards to this local directory for caching.
shuffle (bool) – Whether to shuffle the samples. Note that if shuffle=False, the sample order is deterministic but dependent on the DataLoader's num_workers.
decoders (Dict[str, Callable[[bytes], Any]]) – For each sample field you wish to read, you must provide a decoder to convert the raw bytes to an object.
timeout (float) – How long to wait for a shard to download before raising an exception. Default: 60 sec.
batch_size (Optional[int]) – Hint of the batch_size that will be used on each device's DataLoader. Default: None. Worker indices will be constructed so that there is at most 1 incomplete batch at the end of each epoch. E.g., if the DataLoader is reading over (samples=[0, 1, 2, 3, 4, 5, 6, 7], num_workers=3, batch_size=2, drop_last=True) but batch_size is not hinted to the StreamingDataset ahead of time, then the samples will by default be assigned like w0: [0, 1, 2], w1: [3, 4, 5], w2: [6, 7] and read as batches [0, 1], [3, 4], [6, 7] (with batches [2] and [5] dropped as incomplete). This is suboptimal because we could have dropped no samples. So when batch_size is provided as a hint, we assign samples like w0: [0, 1, 2, 3], w1: [4, 5], w2: [6, 7], which will be read as batches [0, 1], [4, 5], [6, 7], [2, 3].
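The batch_size hint can be illustrated with a small standalone sketch (this is not the library's actual partitioning code, just the idea): without a hint, samples are divided as evenly as possible among workers; with a hint, whole batches are divided as evenly as possible, so at most one incomplete batch remains per epoch.

```python
def partition(num_samples, num_workers, batch_size=None):
    """Sketch of assigning contiguous sample ranges to workers.

    Without a batch_size hint, samples are split as evenly as possible.
    With the hint, whole batches are split as evenly as possible, so that
    at most one incomplete batch remains across the whole epoch.
    """
    if batch_size is None:
        base, extra = divmod(num_samples, num_workers)
        sizes = [base + (1 if w < extra else 0) for w in range(num_workers)]
    else:
        num_batches = -(-num_samples // batch_size)  # ceiling division
        base, extra = divmod(num_batches, num_workers)
        # Give each worker a whole number of batches, front-loading extras.
        sizes = [(base + (1 if w < extra else 0)) * batch_size
                 for w in range(num_workers)]

    assignments, start = [], 0
    for size in sizes:
        end = min(start + size, num_samples)  # trim the last, partial share
        assignments.append(list(range(start, end)))
        start = end
    return assignments

# Reproduces the example from the parameter description above:
partition(8, 3)                # -> [[0, 1, 2], [3, 4, 5], [6, 7]]
partition(8, 3, batch_size=2)  # -> [[0, 1, 2, 3], [4, 5], [6, 7]]
```

With drop_last=True, the unhinted split loses samples 2 and 5, while the hinted split concentrates all leftover samples into a single final batch that survives.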
To write the dataset:

>>> from composer.datasets.streaming import StreamingDatasetWriter
>>> samples = [
...     {
...         "uid": f"{ix:06}".encode("utf-8"),
...         "data": (3 * ix).to_bytes(4, "big"),
...         "unused": "blah".encode("utf-8"),
...     }
...     for ix in range(100)
... ]
>>> dirname = "remote"
>>> fields = ["uid", "data"]
>>> with StreamingDatasetWriter(dirname=dirname, fields=fields) as writer:
...     writer.write_samples(samples=samples)

To read the dataset:

>>> from composer.datasets.streaming import StreamingDataset
>>> remote = "remote"
>>> local = "local"
>>> decoders = {
...     "uid": lambda uid_bytes: uid_bytes.decode("utf-8"),
...     "data": lambda data_bytes: int.from_bytes(data_bytes, "big"),
... }
>>> dataset = StreamingDataset(remote=remote, local=local, shuffle=False, decoders=decoders)
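To make the decoder contract concrete, here is how the raw bytes produced by the writer example round-trip through the reader's decoders. This is pure Python with no library calls, using the same encodings and decoders as the example above:

```python
# Raw bytes as the writer example encodes them, for sample ix=5:
uid_bytes = f"{5:06}".encode("utf-8")    # zero-padded id as UTF-8 bytes
data_bytes = (3 * 5).to_bytes(4, "big")  # 15 as 4 big-endian bytes

# The same per-field decoders passed to StreamingDataset above:
decoders = {
    "uid": lambda raw: raw.decode("utf-8"),
    "data": lambda raw: int.from_bytes(raw, "big"),
}

# Each field's decoder is applied to that field's raw bytes,
# yielding the decoded sample dict the dataset hands back:
raw_sample = {"uid": uid_bytes, "data": data_bytes}
sample = {field: decoders[field](raw) for field, raw in raw_sample.items()}
# sample == {"uid": "000005", "data": 15}
```

Note that the "unused" field written above is simply skipped, because no decoder is registered for it.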
- class composer.datasets.streaming.dataset.StreamingImageClassDataset(remote, local, shuffle, transform=None, timeout=60, batch_size=None)[source]#
Bases: composer.datasets.streaming.dataset.StreamingDataset

A streaming image classification dataset, for (img, class) pairs.

This is a subclass of StreamingDataset.

- Parameters
remote (str) – Download shards from this remote S3 path or directory.
local (str) – Download shards to this local filesystem directory for reuse.
shuffle (bool) – Whether to shuffle the samples. Note that if shuffle=False, the sample order is deterministic but dependent on the DataLoader's num_workers.
transform (Optional[Callable]) – Optional input data transform, for data augmentation, etc.
timeout (float) – How long to wait for a shard to download before raising an exception. Default: 60 sec.
batch_size (Optional[int]) – Hint of the batch_size that will be used on each device's DataLoader. Default: None. Worker indices will be constructed so that there is at most 1 incomplete batch at the end of each epoch.