ObjectStoreLogger

class composer.loggers.ObjectStoreLogger(object_store_cls, object_store_kwargs, should_log_artifact=None, object_name='{artifact_name}', num_concurrent_uploads=4, upload_staging_folder=None, use_procs=True, num_attempts=3)

Logger destination that uploads artifacts to an object store.

This logger destination handles calls to Logger.file_artifact() and uploads the files to an object store, such as AWS S3 or Google Cloud Storage, through an ObjectStore instance. To minimize the performance hit on the training loop, it supports background uploads.

from composer import Trainer
from composer.loggers import ObjectStoreLogger
from composer.utils import LibcloudObjectStore

object_store_logger = ObjectStoreLogger(
    object_store_cls=LibcloudObjectStore,
    object_store_kwargs={
        'provider': 's3',
        'container': 'my-bucket',
        'provider_kwargs': {
            'key': 'AKIA...',
            'secret': '*********',
            'region': 'ap-northeast-1',
        },
    },
)

# Construct the trainer using this logger
trainer = Trainer(
    ...,
    loggers=[object_store_logger],
)

Note

This logger blocks the training loop while it copies each artifact for which should_log_artifact returns True to the staging folder, because the upload itself happens in the background. Here are some additional tips for minimizing the performance impact:

  • Set should_log_artifact to filter which artifacts will be logged. By default, all artifacts are logged.

  • Set use_procs=True (the default) to use background processes, instead of threads, to perform the file uploads. Processes are recommended to ensure that the GIL does not block the training loop during CPU operations on uploaded files (e.g. computing and comparing checksums). Network I/O always occurs in the background.

  • Provide a RAM disk path for the upload_staging_folder parameter. Copying files to a staging folder in RAM is faster than writing them to disk. However, there must be sufficient excess RAM, or MemoryErrors may be raised.
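The first and third tips above can be sketched as follows. This is a minimal illustration, not part of Composer: upload_checkpoints_only and pick_staging_folder are hypothetical helper names, the '.pt' suffix is an arbitrary choice, and '/dev/shm' is assumed to be a RAM-backed filesystem, which holds on many Linux hosts but not everywhere.

```python
import os


def upload_checkpoints_only(state, log_level, artifact_name):
    """Hypothetical filter with the documented (state, log_level, artifact_name) signature.

    Only artifacts whose name ends in '.pt' are uploaded; state and log_level
    are accepted but not inspected in this sketch.
    """
    return artifact_name.endswith('.pt')


def pick_staging_folder(candidates=('/dev/shm',)):
    """Return the first existing, writable directory in candidates, else None.

    Assumption: '/dev/shm' is RAM-backed on the host. Returning None leaves the
    logger on its default staging location (a TemporaryDirectory()).
    """
    for path in candidates:
        if os.path.isdir(path) and os.access(path, os.W_OK):
            return path
    return None


# Usage (requires Composer, so it is shown as comments to keep the sketch standalone):
# object_store_logger = ObjectStoreLogger(
#     object_store_cls=...,
#     object_store_kwargs=...,
#     should_log_artifact=upload_checkpoints_only,
#     upload_staging_folder=pick_staging_folder(),
# )
```

You may prefer to pass a dedicated subdirectory of the RAM disk rather than its root, so that staged files do not mix with other data.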

Parameters
  • object_store_cls (Type[ObjectStore]) –

    The object store class.

    As individual ObjectStore instances are not necessarily thread safe, each worker will construct its own ObjectStore instance from object_store_cls and object_store_kwargs.

  • object_store_kwargs (Dict[str, Any]) –

    The keyword arguments to construct object_store_cls.

    As individual ObjectStore instances are not necessarily thread safe, each worker will construct its own ObjectStore instance from object_store_cls and object_store_kwargs.

  • should_log_artifact ((State, LogLevel, str) -> bool, optional) –

    A function to filter which artifacts are uploaded.

    The function should take the (current training state, log level, artifact name) and return a boolean indicating whether this file should be uploaded.

    By default, all artifacts will be uploaded.

  • object_name (str, optional) –

    A format string used to determine the object name.

    The following format variables are available:

    Variable              Description
    {artifact_name}       The name of the artifact being logged.
    {run_name}            The name of the training run. See State.run_name.
    {rank}                The global rank, as returned by get_global_rank().
    {local_rank}          The local rank of the process, as returned by get_local_rank().
    {world_size}          The world size, as returned by get_world_size().
    {local_world_size}    The local world size, as returned by get_local_world_size().
    {node_rank}           The node rank, as returned by get_node_rank().

    Leading slashes ('/') will be stripped.

    Consider the following example, which subfolders the artifacts by their rank:

    >>> object_store_logger = ObjectStoreLogger(..., object_name='rank_{rank}/{artifact_name}')
    >>> trainer = Trainer(..., run_name='foo', loggers=[object_store_logger])
    >>> trainer.logger.file_artifact(
    ...     log_level=LogLevel.EPOCH,
    ...     artifact_name='bar.txt',
    ...     file_path='path/to/file.txt',
    ... )
    

    Assuming that the process's rank is 0, the object store would store the contents of 'path/to/file.txt' in an object named 'rank_0/bar.txt'.

    Default: '{artifact_name}'

  • num_concurrent_uploads (int, optional) – Maximum number of concurrent uploads. Defaults to 4.

  • upload_staging_folder (str, optional) – A folder to use for staging uploads. If not specified, defaults to using a TemporaryDirectory().

  • use_procs (bool, optional) – Whether to perform file uploads in background processes (as opposed to threads). Defaults to True.

  • num_attempts (int, optional) – For operations that fail with a transient error, the number of attempts to make. Defaults to 3.
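To make the object_name format string described above concrete, here is a small standalone sketch of how such a template expands. It assumes plain str.format semantics plus the documented stripping of leading slashes. format_object_name is a hypothetical helper for illustration, not the library's implementation.

```python
def format_object_name(object_name, **format_vars):
    """Hypothetical helper: expand the format string, then strip leading slashes."""
    return object_name.format(**format_vars).lstrip('/')


# Variables not referenced by the template (run_name here) are simply ignored.
name = format_object_name(
    'rank_{rank}/{artifact_name}',
    artifact_name='bar.txt',
    run_name='foo',
    rank=0,
)
print(name)  # rank_0/bar.txt

# A leading slash in the template does not survive in the object name.
print(format_object_name('/{run_name}/{artifact_name}', run_name='foo', artifact_name='bar.txt'))  # foo/bar.txt
```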

can_log_file_artifacts()

Whether the logger supports logging file artifacts.

get_uri_for_artifact(artifact_name)

Get the object store provider URI for an artifact.

Parameters

artifact_name (str) – The name of an artifact.

Returns

str – The URI corresponding to the uploaded location of the artifact.

property object_store

The ObjectStore instance for the main thread.

wait_for_workers()

Wait for all tasks to be completed.

This is called after fit/eval/predict. If we don't wait, a worker might not schedule an upload before the interpreter shuts down and garbage collection begins. While the post_close logic ensures that existing uploads are completed, trying to schedule new uploads during this time will raise an error.
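The failure mode described above can be illustrated with the standard library alone. The sketch below is an analogy built on concurrent.futures, not Composer's internal worker implementation, and fake_upload is a hypothetical stand-in for a real upload: tasks scheduled before shutdown complete, while scheduling a new one afterwards raises an error.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_upload(name):
    """Hypothetical stand-in for an upload task."""
    return f'uploaded {name}'


executor = ThreadPoolExecutor(max_workers=4)
pending = [executor.submit(fake_upload, n) for n in ('a.txt', 'b.txt')]

# Waiting first guarantees that every task scheduled so far has finished.
results = [future.result() for future in pending]

# After shutdown, tasks that already ran are safe, but new ones are rejected.
executor.shutdown(wait=True)
try:
    executor.submit(fake_upload, 'late.txt')
    late_error = None
except RuntimeError as exc:
    late_error = exc

print(results)
print(type(late_error).__name__)
```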