mt.aio

Useful subroutines dealing with asyn and asynch functions.

Asyn functions and asynch functions are functions that can operate either asynchronously or synchronously. Asynchronous means behaving like a coroutine function that can yield control back to the current event loop. Synchronous means not yielding control during execution. Both asyn and asynch functions accept a keyword argument ‘context_vars’, which is a dictionary of context variables. One of them must be a boolean variable ‘async’ telling whether the function is to be executed in asynchronous mode (context_vars[‘async’] is True) or in synchronous mode (context_vars[‘async’] is False). There can be other context variables; they are usually the enter-results of either asynchronous or normal ‘with’ statements.

An asyn function is declared with ‘async def’. It can be invoked without an event loop via mt.aio.srun(). Asyn functions are good for building library subroutines supporting both asynchronous and synchronous modes, but they break backward compatibility because they must be declared with ‘async def’.
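
The sketch below illustrates the asyn contract described above. `add_after_delay` is a hypothetical function, not part of mt.aio: it picks asyncio.sleep() or time.sleep() depending on context_vars[‘async’], and here it is driven by an event loop in asynchronous mode.

```python
import asyncio
import time


# Hypothetical asyn function: declared with 'async def' and taking a
# 'context_vars' keyword whose 'async' entry selects the execution mode.
async def add_after_delay(a: int, b: int, delay: float = 0.01, *, context_vars: dict) -> int:
    if context_vars["async"]:
        # Asynchronous mode: yield control to the event loop while waiting.
        await asyncio.sleep(delay)
    else:
        # Synchronous mode: block the thread; the coroutine never suspends.
        time.sleep(delay)
    return a + b


async def main() -> int:
    # Asynchronous mode: awaited from a coroutine run by an event loop.
    return await add_after_delay(1, 2, context_vars={"async": True})


result = asyncio.run(main())
print(result)  # prints 3
```

If mt.aio is installed, the same function could be run synchronously with mt.aio.srun(add_after_delay, 1, 2). According to the srun() documentation, that call passes context_vars={‘async’: False}.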

An asynch function is declared with ‘def’ like a normal function. In asynchronous mode, the function returns a coroutine that must be awaited with keyword ‘await’, as if it were a coroutine function. In synchronous mode, the function behaves like a normal function. Asynch functions are good for backward compatibility because they are normal functions that can pretend to be coroutine functions, but they are bad for developing library subroutines supporting both asynchronous and synchronous modes.
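
Here is a minimal sketch of the asynch pattern. The names `double` and `_double_async` are hypothetical. The caller has to know the mode in order to decide whether to await the return value.

```python
import asyncio


async def _double_async(x: int) -> int:
    await asyncio.sleep(0)  # yield control once, as a real async operation might
    return 2 * x


# Hypothetical asynch function: a plain 'def' whose return value depends on
# the mode. Asynchronous mode returns a coroutine that the caller must await;
# synchronous mode returns the result directly.
def double(x: int, *, context_vars: dict):
    if context_vars["async"]:
        return _double_async(x)
    return 2 * x


sync_result = double(4, context_vars={"async": False})  # a plain int


async def main() -> int:
    return await double(4, context_vars={"async": True})


async_result = asyncio.run(main())
print(sync_result, async_result)  # prints 8 8
```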

Implementing an asynch function is discouraged. Only do so if you have no other choice.

Functions

  • srun(): Invokes an asyn function synchronously, without using keyword ‘await’.

  • arun(): Invokes an asyn function from inside an asynch function.

  • arun2(): Invokes an asynch function from inside an asyn function.

  • sleep(): An asyn function that sleeps for a number of seconds.

  • yield_control(): Yields the control back to the current event loop.

  • safe_chmod(): Undocumented.

  • safe_rename(): Undocumented.

  • read_binary(): An asyn function that opens a binary file and reads the content.

  • write_binary(): An asyn function that creates a binary file and writes the content.

  • read_text(): An asyn function that opens a text file and reads the content.

  • write_text(): An asyn function that creates a text file and writes the content.

  • json_load(): An asyn function that loads the json-like object of a file.

  • json_save(): An asyn function that saves a json-like object to a file.

  • mkdtemp(): An asyn context manager that creates a temporary directory.

  • qput_aio(): Puts obj into the queue q.

  • qget_aio(): Removes and returns an item from the queue q.

mt.aio.srun(asyn_func, *args, extra_context_vars: dict = {}, **kwargs) → object

Invokes an asyn function synchronously, without using keyword ‘await’.

Parameters:
  • asyn_func (function) – an asyn function taking ‘context_vars’ as a keyword argument

  • args (list) – positional arguments to be passed to the function

  • extra_context_vars (dict) – additional context variables to be passed to the function. The ‘context_vars’ keyword passed to the function is {‘async’: False} updated with extra_context_vars, i.e. {‘async’: False, **extra_context_vars}.

  • kwargs (dict) – other keyword arguments to be passed to the function

Returns:

whatever the function returns

Return type:

object
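
The hypothetical `srun_sketch` below shows one way a driver like srun() could work. It is not the actual mt.aio implementation. In synchronous mode an asyn function should never suspend, so sending None into its coroutine once runs it to completion, and the return value arrives as StopIteration.value.

```python
from typing import Optional


def srun_sketch(asyn_func, *args, extra_context_vars: Optional[dict] = None, **kwargs):
    """Run an asyn function synchronously, without an event loop (sketch)."""
    context_vars = {"async": False}
    context_vars.update(extra_context_vars or {})
    coro = asyn_func(*args, context_vars=context_vars, **kwargs)
    try:
        # In synchronous mode the coroutine should never suspend, so one
        # send(None) runs it to the end and raises StopIteration(result).
        coro.send(None)
    except StopIteration as e:
        return e.value
    coro.close()
    raise RuntimeError("asyn function yielded control in synchronous mode")


# Hypothetical asyn function used for the demonstration.
async def greet(name: str, *, context_vars: dict) -> str:
    mode = "async" if context_vars["async"] else "sync"
    return f"hello {name} ({mode}, user={context_vars.get('user')})"


print(srun_sketch(greet, "world", extra_context_vars={"user": "bob"}))
# prints hello world (sync, user=bob)
```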

mt.aio.arun(asyn_func, *args, context_vars: dict = {}, **kwargs) → object

Invokes an asyn function from inside an asynch function.

Parameters:
  • asyn_func (function) – an asyn function (declared with ‘async def’)

  • args (list) – positional arguments of the asyn function

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.

  • kwargs (dict) – other keyword arguments of the asyn function

Returns:

whatever the asyn function returns

Return type:

object
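
The hypothetical `arun_sketch` below illustrates the idea behind arun(). In asynchronous mode it returns the coroutine so that the asynch function's caller can await it. In synchronous mode it runs the coroutine to completion immediately. `square` and `get_square` are illustrative names, and none of this is claimed to be mt.aio's code.

```python
import asyncio


def _run_to_completion(coro):
    # Synchronous mode: the coroutine is not expected to suspend, so a single
    # send(None) should run it to the end and raise StopIteration.
    try:
        coro.send(None)
    except StopIteration as e:
        return e.value
    coro.close()
    raise RuntimeError("coroutine suspended in synchronous mode")


def arun_sketch(asyn_func, *args, context_vars: dict, **kwargs):
    """Call an asyn function from inside an asynch function (sketch)."""
    coro = asyn_func(*args, context_vars=context_vars, **kwargs)
    if context_vars["async"]:
        return coro  # asynchronous mode: hand the coroutine back to be awaited
    return _run_to_completion(coro)


# Hypothetical asyn function.
async def square(x: int, *, context_vars: dict) -> int:
    if context_vars["async"]:
        await asyncio.sleep(0)  # yield to the event loop only in async mode
    return x * x


# Hypothetical asynch function (plain 'def') built on the asyn function above.
def get_square(x: int, *, context_vars: dict):
    return arun_sketch(square, x, context_vars=context_vars)


sync_value = get_square(3, context_vars={"async": False})
async_value = asyncio.run(get_square(3, context_vars={"async": True}))
```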

async mt.aio.arun2(asynch_func, *args, context_vars: dict = {}, **kwargs) → object

Invokes an asynch function from inside an asyn function.

Parameters:
  • asynch_func (function) – an asynch function (declared with ‘def’)

  • args (list) – positional arguments of the asynch function

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.

  • kwargs (dict) – other keyword arguments of the asynch function

Returns:

whatever the asynch function returns

Return type:

object
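
The reverse case, for arun2(), works as follows in this hypothetical sketch (`arun2_sketch`, `triple` and `triple_plus_one` are illustrative names). The asynch function returns either a value or a coroutine, so the asyn caller awaits the result only in asynchronous mode.

```python
import asyncio


async def _triple_async(x: int) -> int:
    await asyncio.sleep(0)
    return 3 * x


# Hypothetical asynch function: coroutine in async mode, plain value otherwise.
def triple(x: int, *, context_vars: dict):
    if context_vars["async"]:
        return _triple_async(x)
    return 3 * x


async def arun2_sketch(asynch_func, *args, context_vars: dict, **kwargs):
    """Call an asynch function from inside an asyn function (sketch)."""
    result = asynch_func(*args, context_vars=context_vars, **kwargs)
    if context_vars["async"]:
        result = await result  # async mode: the asynch function gave a coroutine
    return result


# Hypothetical asyn function delegating to the asynch function above.
async def triple_plus_one(x: int, *, context_vars: dict) -> int:
    return await arun2_sketch(triple, x, context_vars=context_vars) + 1


async def main() -> int:
    return await triple_plus_one(2, context_vars={"async": True})


print(asyncio.run(main()))  # prints 7
```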

async mt.aio.sleep(secs: float, context_vars: dict = {})

An asyn function that sleeps for a number of seconds.

In asynchronous mode, it invokes asyncio.sleep(). In synchronous mode, it invokes time.sleep().

Parameters:
  • secs (float) – number of seconds to sleep

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
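
Below is a sketch of the documented behaviour of sleep(), written under a hypothetical name. The asynchronous branch yields to the event loop, so two of these sleeps gathered together overlap and their durations do not add up.

```python
import asyncio
import time


async def sleep_sketch(secs: float, *, context_vars: dict) -> None:
    """Sleep in either mode, mirroring the documented behaviour of sleep()."""
    if context_vars["async"]:
        await asyncio.sleep(secs)  # yields control to the event loop
    else:
        time.sleep(secs)  # blocks the thread; nothing else runs meanwhile


async def two_async_sleeps() -> float:
    start = time.perf_counter()
    # Both sleeps yield to the event loop, so they overlap.
    await asyncio.gather(
        sleep_sketch(0.2, context_vars={"async": True}),
        sleep_sketch(0.2, context_vars={"async": True}),
    )
    return time.perf_counter() - start


elapsed = asyncio.run(two_async_sleeps())
print(f"{elapsed:.2f}s")  # roughly 0.2s rather than 0.4s
```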

async mt.aio.yield_control()

Yields the control back to the current event loop.
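
The mechanism behind yield_control() is not stated. A common asyncio idiom with the same effect is `await asyncio.sleep(0)`, which the asyncio documentation describes as letting other tasks run. In the hypothetical example below, the two workers interleave because each one yields after every step.

```python
import asyncio


async def yield_control_sketch() -> None:
    # Let other ready tasks run without actually waiting.
    await asyncio.sleep(0)


async def worker(name: str, log: list, steps: int = 3) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        await yield_control_sketch()


async def main() -> list:
    log: list = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


print(asyncio.run(main()))  # prints ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```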

async mt.aio.safe_chmod(filepath: str, file_mode: int = 436)

async mt.aio.safe_rename(filepath: str, new_filepath: str, context_vars: dict = {})

async mt.aio.read_binary(filepath, size: int | None = None, context_vars: dict = {}) → bytes

An asyn function that opens a binary file and reads the content.

Parameters:
  • filepath (str) – path to the file

  • size (int) – size to read from the beginning of the file, in bytes. If None is given, read the whole file.

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.

Returns:

the content read from file

Return type:

bytes
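
The following sketch reproduces the semantics of read_binary() under stated assumptions. The mechanism mt.aio uses for non-blocking file I/O is not stated, so in asynchronous mode this version simply offloads the blocking read to a worker thread with asyncio.to_thread() (Python 3.9+). `read_binary_sketch` is a hypothetical name.

```python
import asyncio
import os
import tempfile
from typing import Optional


def _read(filepath: str, size: Optional[int]) -> bytes:
    with open(filepath, "rb") as f:
        # None means "read the whole file".
        return f.read() if size is None else f.read(size)


async def read_binary_sketch(filepath: str, size: Optional[int] = None, *,
                             context_vars: dict) -> bytes:
    if context_vars["async"]:
        # Assumption: offload the blocking read to a worker thread so the
        # event loop stays responsive.
        return await asyncio.to_thread(_read, filepath, size)
    return _read(filepath, size)


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "data.bin")
    with open(path, "wb") as f:
        f.write(b"\x00\x01\x02\x03\x04")
    head = asyncio.run(read_binary_sketch(path, 3, context_vars={"async": True}))
    whole = asyncio.run(read_binary_sketch(path, context_vars={"async": False}))
```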

async mt.aio.write_binary(filepath, buf: bytes, file_mode: int = 436, context_vars: dict = {}, file_write_delayed: bool = False)

An asyn function that creates a binary file and writes the content.

Parameters:
  • filepath (str) – path to the file

  • buf (bytes) – data (in bytes) to be written to the file

  • file_mode (int) – file mode to set using os.chmod(). Only valid if filepath is a string. If None is given, no setting of file mode will happen.

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.

  • file_write_delayed (bool) – Only valid in asynchronous mode. If True, wraps the file write task into a future and returns the future. In all other cases, proceeds as usual.

Returns:

either a future or the number of bytes written, depending on whether the file write task is delayed or not

Return type:

asyncio.Future or int

Notes

The content is first written to a file with the ‘.mttmp’ extension, which is then renamed to the target filepath.
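
Here is a sketch of the write-then-rename approach. Two details are assumptions: the exact temporary filename (here `filepath + '.mttmp'`) and the order of chmod and rename. `write_binary_atomic` is hypothetical and synchronous only. The default file_mode of 436 is 0o664 in octal (rw-rw-r--).

```python
import os
import tempfile
from typing import Optional


def write_binary_atomic(filepath: str, buf: bytes, file_mode: Optional[int] = 0o664) -> int:
    """Write via a '.mttmp' sibling file, then rename it over the target (sketch)."""
    tmp_filepath = filepath + ".mttmp"  # assumed naming of the temporary file
    with open(tmp_filepath, "wb") as f:
        nbytes = f.write(buf)
    if file_mode is not None:
        os.chmod(tmp_filepath, file_mode)
    # A rename on the same filesystem is atomic on POSIX, so readers never
    # observe a half-written target file.
    os.replace(tmp_filepath, filepath)
    return nbytes


with tempfile.TemporaryDirectory() as d:
    target = os.path.join(d, "out.bin")
    written = write_binary_atomic(target, b"abc")
    with open(target, "rb") as f:
        content = f.read()
    leftover = os.path.exists(target + ".mttmp")
```

The sketch uses os.replace() instead of os.rename() because os.replace() also overwrites an existing target on Windows.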

async mt.aio.read_text(filepath, size: int | None = None, context_vars: dict = {}) → str

An asyn function that opens a text file and reads the content.

Parameters:
  • filepath (str) – path to the file

  • size (int) – size to read from the beginning of the file, in bytes. If None is given, read the whole file.

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.

Returns:

the content read from file

Return type:

str

async mt.aio.write_text(filepath, buf: str, file_mode: int = 436, context_vars: dict = {}, file_write_delayed: bool = False)

An asyn function that creates a text file and writes the content.

Parameters:
  • filepath (str) – path to the file

  • buf (str) – text to be written to the file

  • file_mode (int) – file mode to set using os.chmod(). Only valid if filepath is a string. If None is given, no setting of file mode will happen.

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.

  • file_write_delayed (bool) – Only valid in asynchronous mode. If True, wraps the file write task into a future and returns the future. In all other cases, proceeds as usual.

Returns:

either a future or the number of bytes written, depending on whether the file write task is delayed or not

Return type:

asyncio.Future or int

Notes

The content is first written to a file with the ‘.mttmp’ extension, which is then renamed to the target filepath.

async mt.aio.json_load(filepath, context_vars: dict = {}, **kwargs)

An asyn function that loads the json-like object of a file.

Parameters:
  • filepath (str) – path to the file

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.

  • kwargs (dict) – keyword arguments passed as-is to json.loads()

Returns:

the loaded json-like object

Return type:

object

async mt.aio.json_save(filepath, obj, file_mode: int = 436, context_vars: dict = {}, file_write_delayed: bool = False, **kwargs)

An asyn function that saves a json-like object to a file.

Parameters:
  • filepath (str) – path to the file

  • obj (object) – json-like object to be written to the file

  • file_mode (int) – file mode to set using os.chmod(). Only valid if filepath is a string. If None is given, no setting of file mode will happen.

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.

  • file_write_delayed (bool) – Only valid in asynchronous mode. If True, wraps the file write task into a future and returns the future. In all other cases, proceeds as usual.

  • kwargs (dict) – keyword arguments passed as-is to json.dumps()

Returns:

either a future or the number of bytes written, depending on whether the file write task is delayed or not

Return type:

asyncio.Future or int

mt.aio.mkdtemp(context_vars: dict = {})

An asyn context manager that creates a temporary directory.

Parameters:

context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.

Returns:

tmpdir – the context manager whose enter-value is a string containing the temporary dirpath.

Return type:

object
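
The hypothetical sketch below builds an asyn context manager in the spirit of mkdtemp(). In synchronous mode it returns tempfile.TemporaryDirectory, whose enter-value is the directory path. In asynchronous mode it returns a small wrapper that supports ‘async with’. The names and the use of asyncio.to_thread() for cleanup are assumptions.

```python
import asyncio
import os
import tempfile


class _AsyncTempDir:
    """Hypothetical 'async with' wrapper around tempfile.TemporaryDirectory."""

    def __init__(self) -> None:
        self._tmpdir = tempfile.TemporaryDirectory()  # directory created here

    async def __aenter__(self) -> str:
        return self._tmpdir.name

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Removing a directory tree blocks, so do it in a worker thread.
        await asyncio.to_thread(self._tmpdir.cleanup)


def mkdtemp_sketch(*, context_vars: dict):
    if context_vars["async"]:
        return _AsyncTempDir()
    # TemporaryDirectory's enter-value is the path string of the new directory.
    return tempfile.TemporaryDirectory()


with mkdtemp_sketch(context_vars={"async": False}) as sync_dir:
    sync_existed = os.path.isdir(sync_dir)
sync_removed = not os.path.exists(sync_dir)


async def main() -> tuple:
    async with mkdtemp_sketch(context_vars={"async": True}) as d:
        existed = os.path.isdir(d)
    return existed, not os.path.exists(d)


async_existed, async_removed = asyncio.run(main())
```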

async mt.aio.qput_aio(q: Queue, obj, block: bool = True, timeout: float | None = None, aio_interval: float = 0.001)

Puts obj into the queue q.

If the optional argument block is True (the default) and timeout is None (the default), block asynchronously if necessary until a free slot is available. If timeout is a positive number, it blocks asynchronously at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception (timeout is ignored in that case).
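
One way to approximate blocking asynchronously is to poll a thread-safe queue.Queue with put_nowait() and sleep aio_interval seconds between attempts. This reading of aio_interval is an assumption based on its name, and `qput_aio_sketch` is hypothetical.

```python
import asyncio
import queue
import time
from typing import Optional


async def qput_aio_sketch(q: queue.Queue, obj, block: bool = True,
                          timeout: Optional[float] = None, aio_interval: float = 0.001) -> None:
    """Put 'obj' into a thread-safe queue without blocking the event loop (sketch)."""
    if not block:
        q.put_nowait(obj)  # raises queue.Full immediately if no slot is free
        return
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            q.put_nowait(obj)
            return
        except queue.Full:
            if deadline is not None and time.monotonic() >= deadline:
                raise
            await asyncio.sleep(aio_interval)  # yield, then try again


async def main() -> str:
    q: queue.Queue = queue.Queue(maxsize=1)
    await qput_aio_sketch(q, "first")
    try:
        await qput_aio_sketch(q, "second", timeout=0.01)
    except queue.Full:
        return "full"
    return "unexpected"


outcome = asyncio.run(main())
```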

async mt.aio.qget_aio(q: Queue, block: bool = True, timeout: float | None = None, aio_interval: float = 0.001)

Removes and returns an item from the queue q.

If the optional argument block is True (the default) and timeout is None (the default), block asynchronously if necessary until an item is available. If timeout is a positive number, it blocks asynchronously at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).
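
The getter side works under the same polling assumption (`qget_aio_sketch` is hypothetical). In this example a background thread fills the queue instead of the event loop. That is the situation where polling a queue.Queue helps.

```python
import asyncio
import queue
import threading
import time
from typing import Optional


async def qget_aio_sketch(q: queue.Queue, block: bool = True,
                          timeout: Optional[float] = None, aio_interval: float = 0.001):
    """Get an item from a thread-safe queue without blocking the event loop (sketch)."""
    if not block:
        return q.get_nowait()  # raises queue.Empty immediately if nothing is there
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            return q.get_nowait()
        except queue.Empty:
            if deadline is not None and time.monotonic() >= deadline:
                raise
            await asyncio.sleep(aio_interval)  # yield, then poll again


async def main() -> tuple:
    q: queue.Queue = queue.Queue()
    # A background thread (not the event loop) puts an item a little later.
    threading.Timer(0.02, q.put, args=("hello",)).start()
    item = await qget_aio_sketch(q, timeout=1.0)
    try:
        await qget_aio_sketch(q, block=False)
        empty_raised = False
    except queue.Empty:
        empty_raised = True
    return item, empty_raised


item, empty_raised = asyncio.run(main())
```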

Classes

  • CreateFileH5: A context for creating an HDF5 file safely.

  • BgProcess: Launches a child process that communicates with the parent process via message passing.

  • AprocManager: Manages the completion of aprocs.

class mt.aio.CreateFileH5(filepath: str, file_mode: int = 436, context_vars: dict = {}, logger=None)

A context for creating an HDF5 file safely.

It creates a temporary HDF5 file for writing. Once the user exits the context, the file is chmodded and renamed to a given name. Any intermediate folder that does not exist is created automatically.

The context can be synchronous or asynchronous, depending on the ‘async’ key in argument ‘context_vars’.

Parameters:
  • filepath (str) – local file path to be written to

  • file_mode (int) – file mode to be set to using os.chmod(). If None is given, no setting of file mode will happen.

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. Only used for the asynchronous mode.

  • logger (logging.Logger, optional) – logger for debugging purposes

tmp_filepath

the local file path of the temporary HDF5 file

Type:

str

handle

the handle of the temporary HDF5 file

Type:

h5py.File

class mt.aio.BgProcess

Launches a child process that communicates with the parent process via message passing.

You should subclass this class and implement child_handle_message(). See the docstring of child_handle_message() below.

Notes

In interactive mode, remember to delete any instance of the class before exiting, or else the session will not exit.

child_handle_message(msg: object) → object

Handles a message obtained from the queue.

This function should only be called by the child process.

It takes as input a message from the parent-to-child queue and processes the message. Once done, it returns an object which will be wrapped into a message and placed into the child-to-parent queue.

An input message can be anything. Usually it is a tuple with the first component being a command string. The output message can also be anything. If the handle succeeds, the returning value is then wrapped into a (‘returned’, retval) output message. If an exception is raised, it is wrapped into a (‘raised_exception’, exc, callstack_lines) output message.

If the child process prints anything to stdout or stderr, it will be redirected as (‘write’, ‘stdout’ or ‘stderr’, text) in the output queue. Note that for now only Python-generated printouts can be redirected. Native printouts require more cumbersome solutions. See: https://exceptionshub.com/python-multiprocessing-how-can-i-reliably-redirect-stdout-from-a-child-process-2.html

The user should override this function. The default behaviour is to return whatever is sent to it.

If KeyboardInterrupt is raised in the child process but outside this function, you will get an (‘ignored_exception’, KeyboardInterrupt) message. If the child process dies, you will get an (‘exit’, None or Exception) message, depending on whether the child process dies normally or abruptly.
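
The success/failure wrapping described above can be shown without a child process. `handle_and_wrap` and the command names are hypothetical. Representing callstack_lines as a list of traceback strings is an assumption.

```python
import traceback


def handle_and_wrap(handler, msg) -> tuple:
    """Wrap a handler's outcome into the message tuples described above (sketch)."""
    try:
        return ("returned", handler(msg))
    except Exception as exc:
        callstack_lines = traceback.format_exc().splitlines()
        return ("raised_exception", exc, callstack_lines)


# A hypothetical handler following the usual (command, *args) convention.
def child_handle_message(msg: object) -> object:
    command, *args = msg
    if command == "add":
        return sum(args)
    if command == "echo":
        return args
    raise ValueError(f"unknown command {command!r}")


ok = handle_and_wrap(child_handle_message, ("add", 1, 2, 3))
bad = handle_and_wrap(child_handle_message, ("explode",))
print(ok)  # prints ('returned', 6)
```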

async send(msg, recv_timeout: float | None = None, recv_aio_interval=0.001)

Sends a message to the child process and awaits the returned message.

Parameters:
  • msg (object) – message to be sent to the child process

  • recv_timeout (float) – If specified, the number of seconds to wait asynchronously to receive the message, before raising a queue.Empty exception. If not, asynchronously blocks until the message from the child process is received.

  • recv_aio_interval (float) – time unit to simulate asynchronous blocking while waiting for message response. Default is 1ms.

Returns:

message received from the child process

Return type:

object

Raises:

RuntimeError – if the child process is not alive while processing the message

class mt.aio.AprocManager(max_concurrency: int = 1024, handle_exception: str = 'raise', logger=None)

Manages the completion of aprocs.

With this manager, the user can send an aproc to it and forget about it. To ensure all aprocs are completed, invoke cleanup(). Otherwise, some aprocs may never get awaited when the manager dies.

Parameters:
  • max_concurrency (int) – maximum number of concurrent aprocs that can be held pending

  • handle_exception ({'raise', 'silent', 'warn'}) – policy for handling an exception raised by an aproc. If ‘raise’, re-raise the caught exception. If ‘silent’, ignore the exception. If ‘warn’, use the provided logger to warn the user.

  • logger (logging.Logger or equivalent) – logger for warning purposes

async cleanup()

Awaits until all aprocs are done.

async send(aproc: Future)

Sends an aproc to the manager so the user can forget about it.

The function usually returns immediately. However, if the maximum number of concurrent aprocs has been exceeded, it will await.

Parameters:

aproc (asyncio.Future) – a future (returned via asyncio.create_task() or asyncio.ensure_future()) that is a procedure
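
The hypothetical `AprocManagerSketch` below shows one way to implement the documented policy with asyncio.wait(). It is not mt.aio's implementation. send() awaits only while more than max_concurrency aprocs are pending, and cleanup() awaits the rest.

```python
import asyncio
import logging
from typing import Optional, Set


class AprocManagerSketch:
    """Fire-and-forget holder for aprocs with a cap on pending ones (sketch)."""

    def __init__(self, max_concurrency: int = 1024, handle_exception: str = "raise",
                 logger: Optional[logging.Logger] = None) -> None:
        self.max_concurrency = max_concurrency
        self.handle_exception = handle_exception
        self.logger = logger
        self.pending: Set[asyncio.Future] = set()

    def _check(self, done) -> None:
        for task in done:
            exc = task.exception()  # also marks the exception as retrieved
            if exc is None:
                continue
            if self.handle_exception == "raise":
                raise exc
            if self.handle_exception == "warn" and self.logger is not None:
                self.logger.warning("aproc raised %r", exc)
            # 'silent' (or 'warn' without a logger): ignore the exception

    async def send(self, aproc: asyncio.Future) -> None:
        self.pending.add(aproc)
        # Await only while too many aprocs are pending.
        while len(self.pending) > self.max_concurrency:
            done, self.pending = await asyncio.wait(
                self.pending, return_when=asyncio.FIRST_COMPLETED)
            self._check(done)

    async def cleanup(self) -> None:
        if self.pending:
            done, _ = await asyncio.wait(self.pending)
            self.pending = set()
            self._check(done)


async def job(i: int, results: list) -> None:
    await asyncio.sleep(0.01)
    results.append(i)


async def main() -> list:
    results: list = []
    manager = AprocManagerSketch(max_concurrency=2)
    for i in range(5):
        await manager.send(asyncio.ensure_future(job(i, results)))
    await manager.cleanup()  # without this, some jobs might never be awaited
    return sorted(results)


async def failing() -> None:
    raise RuntimeError("boom")


async def run_silent() -> str:
    manager = AprocManagerSketch(handle_exception="silent")
    await manager.send(asyncio.ensure_future(failing()))
    await manager.cleanup()  # the RuntimeError is swallowed under 'silent'
    return "finished"


print(asyncio.run(main()))  # prints [0, 1, 2, 3, 4]
```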

Variables

mt.aio.ipython

'outside'