mt.base.s3

Useful subroutines dealing with S3 files via botocore and aioboto3.

Functions

  • split(): Splits an s3cmd url into bucket and prefix.

  • join(): Joins a bucket and a prefix into an s3cmd url.

  • get_session(): Gets a boto3 session, for either asynchronous mode or synchronous mode.

  • create_s3_client(): An asyn context manager that creates an s3 client for a given profile.

  • create_context_vars(): Creates a dictionary of context variables for running functions in this package.

  • list_objects(): An asyn function that lists all objects prefixed with a given s3cmd url.

  • list_object_info(): An asyn function that returns basic information about the object at a given s3cmd url.

  • get_object(): An asyn function that gets the content of the object at a given s3cmd url.

  • get_object_acl(): An asyn function that gets the access control list (ACL) of the object at a given s3cmd url.

  • put_object(): An asyn function that uploads content to a given s3cmd url.

  • delete_object(): An asyn function that deletes the object at a given s3cmd url.

  • put_files(): An asyn function that uploads many files to the same S3 bucket.

  • put_files_boto3(): Uploads many files to the same S3 bucket using boto3.

mt.base.s3.split(s3cmd_url: str)

Splits an s3cmd url into bucket and prefix.

Parameters:

s3cmd_url (str) – an s3cmd url in the form ‘s3://bucket_name[/prefix]’

Returns:

  • bucket (str) – bucket name, always exists

  • prefix (str or None) – the prefix, or None if the url has no prefix
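A minimal sketch of the splitting rule, assuming the prefix is whatever follows the first ‘/’ after the bucket name and that a missing or empty prefix is reported as None. split_s3cmd_url() is a hypothetical name; this is an illustration, not the package's own code.

```python
from typing import Optional, Tuple


def split_s3cmd_url(s3cmd_url: str) -> Tuple[str, Optional[str]]:
    """Hypothetical re-implementation of the documented split() behaviour."""
    scheme = "s3://"
    if not s3cmd_url.startswith(scheme):
        raise ValueError(f"not an s3cmd url: {s3cmd_url!r}")
    # Everything up to the first '/' is the bucket; the rest is the prefix.
    bucket, _, prefix = s3cmd_url[len(scheme):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket name: {s3cmd_url!r}")
    # Assumption: an absent or empty prefix is reported as None.
    return bucket, prefix or None


print(split_s3cmd_url("s3://my-bucket/data/part-0.csv"))  # ('my-bucket', 'data/part-0.csv')
print(split_s3cmd_url("s3://my-bucket"))  # ('my-bucket', None)
```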

mt.base.s3.join(bucket: str, prefix: str | None = None)

Joins a bucket and a prefix into an s3cmd url.

Parameters:
  • bucket (str) – bucket name

  • prefix (str, optional) – prefix string

Returns:

an s3cmd url in the form ‘s3://bucket_name[/prefix]’

Return type:

str
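The inverse operation can be sketched in a few lines, assuming that a missing or empty prefix yields just ‘s3://bucket_name’. join_s3cmd_url() is a hypothetical name used only for illustration.

```python
from typing import Optional


def join_s3cmd_url(bucket: str, prefix: Optional[str] = None) -> str:
    """Hypothetical re-implementation of the documented join() behaviour."""
    # Assumption: no prefix (None or "") means the url names the bucket only.
    if not prefix:
        return f"s3://{bucket}"
    return f"s3://{bucket}/{prefix}"


print(join_s3cmd_url("my-bucket", "data/part-0.csv"))  # s3://my-bucket/data/part-0.csv
print(join_s3cmd_url("my-bucket"))  # s3://my-bucket
```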

mt.base.s3.get_session(profile=None, asyn: bool = True) → aioboto3.session.Session | boto3.session.Session

Gets a boto3 session, for either asynchronous mode or synchronous mode.

Parameters:
  • profile (str, optional) – the profile from which the session is created

  • asyn (bool) – whether the session is to be used asynchronously or synchronously

Returns:

session – In asynchronous mode, an aioboto3.session.Session instance is returned. In synchronous mode, a boto3.session.Session instance is returned.

Return type:

aioboto3.session.Session or boto3.session.Session

Notes

This function is used as part of create_s3_client() to create an s3 client.

mt.base.s3.create_s3_client(profile=None, asyn: bool = True) → aiobotocore.client.AioBaseClient | botocore.client.BaseClient

An asyn context manager that creates an s3 client for a given profile.

Parameters:
  • profile (str, optional) – the profile from which the s3 client is created

  • asyn (bool) – whether the function is to be invoked asynchronously or synchronously

Returns:

s3_client – the s3 client that matches the ‘asyn’ keyword argument

Return type:

aiobotocore.client.AioBaseClient or botocore.client.BaseClient

mt.base.s3.create_context_vars(profile=None, asyn: bool = False, logger: IndentedLoggerAdapter | None = None)

Creates a dictionary of context variables for running functions in this package.

Parameters:
  • profile (str, optional) – one of the profiles specified in the AWS configuration. The default profile is used if None is given.

  • asyn (bool) – whether the functions are to be invoked asynchronously or synchronously

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes

Returns:

context_vars – dictionary of context variables to run the functions in this package. These include ‘s3_client’ and ‘http_session’.

Return type:

dict
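The functions below read context_vars[‘async’] to decide whether to await the s3 client or to call it directly. The sketch shows that dual-mode ("asyn") pattern using two hypothetical stand-in clients (SyncClient, AsyncClient) and a hypothetical function object_size(). run_sync() drives a coroutine that never suspends without starting an event loop; this is one way such a function could be executed in synchronous mode, not necessarily how mt.base does it.

```python
import asyncio


class SyncClient:
    """Hypothetical stand-in for a blocking botocore S3 client."""

    def head_object(self, Bucket: str, Key: str) -> dict:
        return {"ContentLength": 3}


class AsyncClient:
    """Hypothetical stand-in for an aiobotocore S3 client (calls are awaitable)."""

    async def head_object(self, Bucket: str, Key: str) -> dict:
        return {"ContentLength": 3}


async def object_size(bucket: str, key: str, context_vars: dict) -> int:
    """A dual-mode function: it awaits the client only in asynchronous mode."""
    client = context_vars["s3_client"]
    if context_vars["async"]:
        response = await client.head_object(Bucket=bucket, Key=key)
    else:
        response = client.head_object(Bucket=bucket, Key=key)
    return response["ContentLength"]


def run_sync(coro):
    """Run a coroutine that never suspends, without starting an event loop."""
    try:
        coro.send(None)
    except StopIteration as stop:
        return stop.value  # the coroutine finished in a single step
    coro.close()
    raise RuntimeError("the coroutine suspended, so it cannot run synchronously")


# Asynchronous mode: an event loop drives the awaited client call.
print(asyncio.run(object_size("bucket", "key", {"async": True, "s3_client": AsyncClient()})))  # 3

# Synchronous mode: nothing is awaited, so no event loop is needed.
print(run_sync(object_size("bucket", "key", {"async": False, "s3_client": SyncClient()})))  # 3
```

Writing each function once as a coroutine and branching on a flag keeps a single code path for both modes, at the cost of an if/else around every client call.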

async mt.base.s3.list_objects(s3cmd_url: str, show_progress=False, context_vars: dict = {})

An asyn function that lists all objects prefixed with a given s3cmd url.

Parameters:
  • s3cmd_url (str) – an s3cmd url in the form ‘s3://bucket[/prefix]’

  • show_progress (bool) – show a progress spinner in the terminal

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously, and context_vars[‘s3_client’], which must hold the client obtained from an async with statement invoking mt.base.s3.create_s3_client().

Returns:

a list of records, one for each object prefixed with the given s3cmd url. Each record holds several attributes of that object.

Return type:

list
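S3's ListObjectsV2 operation returns at most 1,000 keys per call, so listing every object under a prefix means following continuation tokens until ‘IsTruncated’ is false. The sketch below assumes list_objects() works this way internally, which the documentation does not state. FakePagedClient is a hypothetical in-memory stand-in that mimics the response fields of the real list_objects_v2() call, with a tiny page size to force several round trips.

```python
import asyncio


class FakePagedClient:
    """Hypothetical in-memory stand-in for an aiobotocore S3 client.

    Mimics the ListObjectsV2 response fields 'Contents', 'IsTruncated' and
    'NextContinuationToken', returning at most `page_size` keys per call.
    """

    def __init__(self, keys, page_size=2):
        self.keys = sorted(keys)
        self.page_size = page_size

    async def list_objects_v2(self, Bucket, Prefix="", ContinuationToken=None):
        matching = [k for k in self.keys if k.startswith(Prefix)]
        start = int(ContinuationToken or 0)
        page = matching[start:start + self.page_size]
        end = start + len(page)
        response = {"IsTruncated": end < len(matching)}
        if page:
            response["Contents"] = [{"Key": k, "Size": 0} for k in page]
        if response["IsTruncated"]:
            response["NextContinuationToken"] = str(end)
        return response


async def list_all_objects(s3_client, bucket: str, prefix: str) -> list:
    """Collect every record under `prefix` by following continuation tokens."""
    records = []
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = await s3_client.list_objects_v2(**kwargs)
        # 'Contents' is absent when a page holds no objects.
        records.extend(response.get("Contents", []))
        if not response.get("IsTruncated"):
            return records
        kwargs["ContinuationToken"] = response["NextContinuationToken"]


client = FakePagedClient(["data/a", "data/b", "data/c", "logs/x"])
records = asyncio.run(list_all_objects(client, "my-bucket", "data/"))
print([r["Key"] for r in records])  # ['data/a', 'data/b', 'data/c']
```

Real botocore and aiobotocore clients also provide get_paginator("list_objects_v2"), which hides this loop behind an iterator.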

async mt.base.s3.list_object_info(s3cmd_url: str, context_vars: dict = {})

An asyn function that returns basic information about the object at a given s3cmd url.

Parameters:
  • s3cmd_url (str) – an s3cmd url in the form ‘s3://bucket[/prefix]’

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously, and context_vars[‘s3_client’], which must hold the client obtained from an async with statement invoking mt.base.s3.create_s3_client().

Returns:

A dictionary of attributes related to the object, like ‘Key’, ‘LastModified’, ‘ETag’, ‘Size’, ‘StorageClass’, etc. If the object does not exist, None is returned.

Return type:

dict or None
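The attribute names listed above (‘Key’, ‘LastModified’, ‘ETag’, ‘Size’, ‘StorageClass’) match the per-object records of an S3 ListObjectsV2 response, so one plausible approach, assumed here rather than documented, is to list with the object's key as the prefix and keep only the exact match. The exact comparison matters because the prefix ‘data/a’ also matches ‘data/a.bak’. pick_object_info() is a hypothetical helper.

```python
from typing import Optional


def pick_object_info(records: list, key: str) -> Optional[dict]:
    """Return the listing record whose 'Key' equals `key` exactly, else None."""
    for record in records:
        if record["Key"] == key:
            return record
    return None


# Made-up records, as a prefix listing for "data/a" might return them.
records = [{"Key": "data/a", "Size": 1}, {"Key": "data/a.bak", "Size": 2}]
print(pick_object_info(records, "data/a"))  # {'Key': 'data/a', 'Size': 1}
print(pick_object_info(records, "data/b"))  # None
```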

async mt.base.s3.get_object(s3cmd_url: str, show_progress: bool = False, context_vars: dict = {})

An asyn function that gets the content of the object at a given s3cmd url.

Parameters:
  • s3cmd_url (str) – an s3cmd url in the form ‘s3://bucket[/prefix]’

  • show_progress (bool) – show a progress spinner in the terminal

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously, and context_vars[‘s3_client’], which must hold the client obtained from an async with statement invoking mt.base.s3.create_s3_client().

Returns:

the content of the given s3cmd url

Return type:

bytes
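In an S3 GetObject response the payload arrives as a stream under ‘Body’. With aiobotocore the stream's read() must be awaited, while with botocore it is an ordinary blocking call. The sketch below shows how a dual-mode reader might branch on that; read_body() and the two fake body classes are hypothetical stand-ins, not part of mt.base.s3.

```python
import asyncio


async def read_body(response: dict, asyn: bool) -> bytes:
    """Read the full payload of a GetObject response in either mode."""
    body = response["Body"]
    if asyn:
        return await body.read()  # aiobotocore: read() is a coroutine
    return body.read()  # botocore: read() blocks and returns bytes


class FakeSyncBody:
    """Hypothetical stand-in for botocore's StreamingBody."""

    def __init__(self, data: bytes):
        self._data = data

    def read(self) -> bytes:
        return self._data


class FakeAsyncBody:
    """Hypothetical stand-in for aiobotocore's streaming body."""

    def __init__(self, data: bytes):
        self._data = data

    async def read(self) -> bytes:
        return self._data


print(asyncio.run(read_body({"Body": FakeAsyncBody(b"hello")}, asyn=True)))  # b'hello'
print(asyncio.run(read_body({"Body": FakeSyncBody(b"hello")}, asyn=False)))  # b'hello'
```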

async mt.base.s3.get_object_acl(s3cmd_url: str, context_vars: dict = {})

An asyn function that gets the access control list (ACL) of the object at a given s3cmd url.

Parameters:
  • s3cmd_url (str) – an s3cmd url in the form ‘s3://bucket[/prefix]’

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously, and context_vars[‘s3_client’], which must hold the client obtained from an async with statement invoking mt.base.s3.create_s3_client().

Returns:

the ACL of the object as returned by S3, describing the object's owner and the permissions granted on it

Return type:

dict
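S3's GetObjectAcl response is a dictionary with an ‘Owner’ entry and a ‘Grants’ list, where each grant pairs a ‘Grantee’ with a ‘Permission’. Assuming get_object_acl() hands that response back, the hypothetical helper below checks whether an object is publicly readable, i.e. whether the AllUsers group holds a READ or FULL_CONTROL grant. The sample response is made up for illustration.

```python
ALL_USERS_URI = "http://acs.amazonaws.com/groups/global/AllUsers"


def is_public_read(acl_response: dict) -> bool:
    """Return True if the AllUsers group may read the object."""
    for grant in acl_response.get("Grants", []):
        grantee = grant.get("Grantee", {})
        if grantee.get("Type") == "Group" and grantee.get("URI") == ALL_USERS_URI:
            if grant.get("Permission") in ("READ", "FULL_CONTROL"):
                return True
    return False


# Made-up response in the shape returned by GetObjectAcl.
sample = {
    "Owner": {"ID": "owner-id"},
    "Grants": [
        {"Grantee": {"Type": "CanonicalUser", "ID": "owner-id"}, "Permission": "FULL_CONTROL"},
        {"Grantee": {"Type": "Group", "URI": ALL_USERS_URI}, "Permission": "READ"},
    ],
}
print(is_public_read(sample))  # True
```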

async mt.base.s3.put_object(s3cmd_url: str, data: bytes, show_progress: bool = False, context_vars: dict = {})

An asyn function that uploads content to a given s3cmd url.

Parameters:
  • s3cmd_url (str) – an s3cmd url in the form ‘s3://bucket[/prefix]’

  • data (bytes) – the content to be uploaded

  • show_progress (bool) – show a progress spinner in the terminal

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously, and context_vars[‘s3_client’], which must hold the client obtained from an async with statement invoking mt.base.s3.create_s3_client().

Returns:

the response from S3 to the upload operation

Return type:

dict
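Uploading comes down to turning the s3cmd url into the Bucket and Key arguments of the S3 client's put_object() call, with the payload passed as Body. The sketch below shows that mapping; put_object_kwargs() is a hypothetical helper, and it assumes a url without an object key cannot be uploaded to.

```python
def put_object_kwargs(s3cmd_url: str, data: bytes) -> dict:
    """Hypothetical helper: map an s3cmd url and a payload to put_object() arguments."""
    scheme = "s3://"
    if not s3cmd_url.startswith(scheme):
        raise ValueError(f"not an s3cmd url: {s3cmd_url!r}")
    bucket, _, key = s3cmd_url[len(scheme):].partition("/")
    if not bucket or not key:
        raise ValueError(f"need both a bucket and an object key: {s3cmd_url!r}")
    return {"Bucket": bucket, "Key": key, "Body": data}


kwargs = put_object_kwargs("s3://my-bucket/reports/2024.json", b'{"ok": true}')
print(kwargs["Bucket"], kwargs["Key"])  # my-bucket reports/2024.json
# With a real client: `await s3_client.put_object(**kwargs)` in asynchronous mode,
# or `s3_client.put_object(**kwargs)` in synchronous mode.
```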

async mt.base.s3.delete_object(s3cmd_url: str, context_vars: dict = {})

An asyn function that deletes the object at a given s3cmd url.

Parameters:
  • s3cmd_url (str) – an s3cmd url in the form ‘s3://bucket[/prefix]’

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously, and context_vars[‘s3_client’], which must hold the client obtained from an async with statement invoking mt.base.s3.create_s3_client().

Returns:

the response from S3 to the deletion operation, which carries many attributes

Return type:

dict

async mt.base.s3.put_files(bucket: str, filepath2key_map: dict, show_progress: bool = False, context_vars: dict = {})

An asyn function that uploads many files to the same S3 bucket.

In asynchronous mode, the files are uploaded concurrently. In synchronous mode, the files are uploaded sequentially.

Despite our best effort, this function may sometimes be slower than calling ‘aws s3 sync’. Please see the following thread for more details:

https://stackoverflow.com/questions/56639630/how-can-i-increase-my-aws-s3-upload-speed-when-using-boto3

It is recommended to use put_files_boto3() in those cases.

Parameters:
  • bucket (str) – bucket name

  • filepath2key_map (dict) – mapping from local filepath to bucket key, defining which file to upload and where to upload it in the S3 bucket

  • show_progress (bool) – show a progress bar in the terminal

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously, and context_vars[‘s3_client’], which must hold the client obtained from an async with statement invoking mt.base.s3.create_s3_client().
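In asynchronous mode the files are uploaded concurrently, while in synchronous mode they go one after another. The sketch below contrasts the two with asyncio.gather(). upload_many(), upload_one, fake_upload() and the max_concurrency cap are all hypothetical and not part of mt.base.s3.

```python
import asyncio
from typing import Awaitable, Callable


async def upload_many(
    filepath2key_map: dict,
    upload_one: Callable[[str, str], Awaitable[None]],
    concurrent: bool = True,
    max_concurrency: int = 16,
) -> None:
    """Upload each (filepath, key) pair, either concurrently or one at a time."""
    if not concurrent:
        # Sequential: each upload finishes before the next one starts.
        for filepath, key in filepath2key_map.items():
            await upload_one(filepath, key)
        return

    # Concurrent: all uploads are scheduled at once; the semaphore caps how
    # many are in flight (an illustrative limit, not a documented parameter).
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(filepath: str, key: str) -> None:
        async with semaphore:
            await upload_one(filepath, key)

    await asyncio.gather(*(bounded(fp, k) for fp, k in filepath2key_map.items()))


uploaded = []


async def fake_upload(filepath: str, key: str) -> None:
    """Hypothetical uploader that only records what it was asked to send."""
    await asyncio.sleep(0)
    uploaded.append((filepath, key))


asyncio.run(upload_many({"a.txt": "docs/a.txt", "b.txt": "docs/b.txt"}, fake_upload))
print(sorted(uploaded))  # [('a.txt', 'docs/a.txt'), ('b.txt', 'docs/b.txt')]
```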

mt.base.s3.put_files_boto3(bucket: str, filepath2key_map: dict, show_progress: bool = False, total_filesize: int | None = None, set_acl_public_read: bool = False, context_vars: dict = {})

Uploads many files to the same S3 bucket using boto3.

This function implements the approach from the url below. It does not use asyncio; instead, it uses multi-threading.

https://stackoverflow.com/questions/56639630/how-can-i-increase-my-aws-s3-upload-speed-when-using-boto3

Parameters:
  • bucket (str) – bucket name

  • filepath2key_map (dict) – mapping from local filepath to bucket key, defining which file to upload and where to upload it in the S3 bucket

  • show_progress (bool) – show a progress bar in the terminal

  • total_filesize (int, optional) – total size of all files in bytes, if known. Useful for drawing a progress bar.

  • set_acl_public_read (bool) – whether to set the public-read ACL on the uploaded objects

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously, and context_vars[‘s3_client’], which must hold the client obtained from an async with statement invoking mt.base.s3.create_s3_client().
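put_files_boto3() uploads with multiple threads rather than asyncio. The sketch below shows only that generic thread-pool idea using concurrent.futures; upload_many_threaded(), upload_one and fake_upload() are hypothetical, max_workers=20 is an arbitrary illustrative value, and this is neither the package's implementation nor the code from the linked thread. The shared byte counter marks where a progress bar sized by total_filesize could be advanced.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def upload_many_threaded(
    filepath2key_map: dict,
    upload_one: Callable[[str, str], int],
    max_workers: int = 20,
) -> int:
    """Upload files on a thread pool and return the total bytes reported."""
    total = 0
    lock = threading.Lock()

    def task(filepath: str, key: str) -> None:
        nonlocal total
        sent = upload_one(filepath, key)
        with lock:  # several worker threads update the counter
            total += sent

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(task, fp, k) for fp, k in filepath2key_map.items()]
        for future in futures:
            future.result()  # re-raise any exception from a worker
    return total


def fake_upload(filepath: str, key: str) -> int:
    """Hypothetical blocking uploader; pretends each file is len(filepath) bytes."""
    return len(filepath)


print(upload_many_threaded({"a.bin": "x/a.bin", "bb.bin": "x/bb.bin"}, fake_upload))  # 11
```

Threads suit this case because each upload spends most of its time waiting on the network, during which Python releases the GIL.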