ObjectStoreLogger
- class composer.loggers.ObjectStoreLogger(object_store_cls, object_store_kwargs, should_log_artifact=None, object_name='{artifact_name}', num_concurrent_uploads=4, upload_staging_folder=None, use_procs=True, num_attempts=3)
Logger destination that uploads artifacts to an object store.
This logger destination handles calls to Logger.file_artifact() and uploads files to an ObjectStore, such as AWS S3 or Google Cloud Storage. To minimize the training loop performance hit, it supports background uploads.

    from composer import Trainer
    from composer.loggers import ObjectStoreLogger
    from composer.utils import LibcloudObjectStore

    object_store_logger = ObjectStoreLogger(
        object_store_cls=LibcloudObjectStore,
        object_store_kwargs={
            'provider': 's3',
            'container': 'my-bucket',
            'provider_kwargs': {
                'key': 'AKIA...',
                'secret': '*********',
                'region': 'ap-northeast-1',
            },
        },
    )

    # Construct the trainer using this logger
    trainer = Trainer(
        ...,
        loggers=[object_store_logger],
    )
Note
This callback blocks the training loop only to copy each artifact for which should_log_artifact returns True; the upload itself happens in the background. Here are some additional tips for minimizing the performance impact:
- Set should_log_artifact to filter which artifacts will be logged. By default, all artifacts are logged.
- Set use_procs=True (the default) to use background processes, instead of threads, to perform the file uploads. Processes are recommended to ensure that the GIL is not blocking the training loop when performing CPU operations on uploaded files (e.g. computing and comparing checksums). Network I/O always occurs in the background.
- Provide a RAM disk path for the upload_staging_folder parameter. Copying files to stage on RAM will be faster than writing to disk. However, there must be sufficient excess RAM, or MemoryErrors may be raised.
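Two of the tips above can be sketched as follows. This is a minimal illustration, not part of the library: the names should_log_checkpoints_only and pick_staging_folder are hypothetical, the '.pt' suffix is an assumed naming convention for checkpoints, and '/dev/shm' is assumed to be a RAM-backed directory (as it typically is on Linux).

```python
import os
from typing import Any, Optional


def should_log_checkpoints_only(state: Any, log_level: Any, artifact_name: str) -> bool:
    """Hypothetical filter: upload only artifacts whose name ends in '.pt'.

    The signature mirrors (State, LogLevel, str) -> bool; this sketch ignores
    the training state and log level and looks only at the artifact name.
    """
    return artifact_name.endswith('.pt')


def pick_staging_folder(candidate: str = '/dev/shm') -> Optional[str]:
    """Hypothetical helper: return `candidate` if it is an existing, writable
    directory, otherwise None so the default temporary directory is used."""
    if os.path.isdir(candidate) and os.access(candidate, os.W_OK):
        return candidate
    return None


# Assumed usage (other arguments elided):
# object_store_logger = ObjectStoreLogger(
#     ...,
#     should_log_artifact=should_log_checkpoints_only,
#     upload_staging_folder=pick_staging_folder(),
# )
```

Falling back to None keeps the documented default behavior on machines where no RAM disk is available.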
- Parameters
object_store_cls (Type[ObjectStore]) – The object store class.
As individual ObjectStore instances are not necessarily thread safe, each worker will construct its own ObjectStore instance from object_store_cls and object_store_kwargs.
object_store_kwargs (Dict[str, Any]) – The keyword arguments to construct object_store_cls.
As individual ObjectStore instances are not necessarily thread safe, each worker will construct its own ObjectStore instance from object_store_cls and object_store_kwargs.
should_log_artifact ((State, LogLevel, str) -> bool, optional) – A function to filter which artifacts are uploaded.
The function should take the (current training state, log level, artifact name) and return a boolean indicating whether this file should be uploaded.
By default, all artifacts will be uploaded.
object_name (str, optional) – A format string used to determine the object name.
The following format variables are available:
Variable              Description
{artifact_name}       The name of the artifact being logged.
{run_name}            The name of the training run. See State.run_name.
{rank}                The global rank, as returned by get_global_rank().
{local_rank}          The local rank of the process, as returned by get_local_rank().
{world_size}          The world size, as returned by get_world_size().
{local_world_size}    The local world size, as returned by get_local_world_size().
{node_rank}           The node rank, as returned by get_node_rank().
Leading slashes ('/') will be stripped.
Consider the following example, which subfolders the artifacts by their rank:
>>> object_store_logger = ObjectStoreLogger(..., object_name='rank_{rank}/{artifact_name}')
>>> trainer = Trainer(..., run_name='foo', loggers=[object_store_logger])
>>> trainer.logger.file_artifact(
...     log_level=LogLevel.EPOCH,
...     artifact_name='bar.txt',
...     file_path='path/to/file.txt',
... )
Assuming that the process's rank is 0, the object store would store the contents of 'path/to/file.txt' in an object named 'rank_0/bar.txt'.
Default: '{artifact_name}'
num_concurrent_uploads (int, optional) – Maximum number of concurrent uploads. Defaults to 4.
upload_staging_folder (str, optional) – A folder to use for staging uploads. If not specified, defaults to using a TemporaryDirectory().
use_procs (bool, optional) – Whether to perform file uploads in background processes (as opposed to threads). Defaults to True.
num_attempts (int, optional) – For operations that fail with a transient error, the number of attempts to make. Defaults to 3.
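The object_name format string described above can be approximated with plain str.format. The sketch below is only an illustration of the documented behavior (variable substitution followed by stripping leading slashes), not the library's implementation; format_object_name is a hypothetical helper, and the rank values are passed in by hand rather than read from the distributed environment.

```python
def format_object_name(object_name: str, artifact_name: str, **format_vars) -> str:
    """Hypothetical helper: fill in the format variables, then strip leading slashes."""
    name = object_name.format(artifact_name=artifact_name, **format_vars)
    return name.lstrip('/')


# Values a single-process run on rank 0 might supply (assumed for illustration).
example = format_object_name(
    'rank_{rank}/{artifact_name}',
    'bar.txt',
    run_name='foo',
    rank=0,
    local_rank=0,
    world_size=1,
    local_world_size=1,
    node_rank=0,
)
print(example)
```

Variables that do not appear in the format string, such as run_name here, are simply ignored by str.format.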
- get_uri_for_artifact(artifact_name)
Get the object store provider URI for an artifact.
- Parameters
artifact_name (str) – The name of an artifact.
- Returns
str – The URI corresponding to the uploaded location of the artifact.
- property object_store
The ObjectStore instance for the main thread.
- wait_for_workers()
Wait for all tasks to be completed.
This is called after fit/eval/predict. If we don't wait, then a worker might not schedule an upload before the interpreter shuts down and garbage collection begins. While post_close logic ensures existing uploads are completed, trying to schedule new uploads during this time will raise an error.
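The general idea of waiting for queued work before shutdown can be illustrated with the standard library alone. The sketch below is a generic analogy under stated assumptions, not how ObjectStoreLogger is implemented: run_uploads is a hypothetical function, a single thread stands in for the upload workers, and "uploading" is simulated by appending to a list.

```python
import queue
import threading
from typing import List


def run_uploads(file_names: List[str]) -> List[str]:
    """Hypothetical stand-in: queue tasks, then wait until all are processed."""
    tasks: queue.Queue = queue.Queue()
    uploaded: List[str] = []

    def worker() -> None:
        while True:
            name = tasks.get()
            try:
                if name is None:  # sentinel: no more work
                    return
                uploaded.append(name)  # pretend to upload the file
            finally:
                tasks.task_done()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    for name in file_names:
        tasks.put(name)

    tasks.join()     # block until every queued task has been processed
    tasks.put(None)  # then tell the worker to exit
    thread.join()
    return uploaded
```

Without the tasks.join() call, the function could return (and the interpreter could begin shutting down) while items are still sitting in the queue.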