mt.aio
Useful subroutines dealing with asyn and asynch functions.
Asyn functions and asynch functions are functions that can operate either asynchronously or synchronously. Asynchronous mode means the function behaves like a coroutine function and can yield control back to the current event loop. Synchronous mode means it does not yield control during its execution. Both asyn and asynch functions accept a keyword argument ‘context_vars’, which is a dictionary of context variables. One of them must be a boolean variable ‘async’ telling whether the function is to be executed in asynchronous mode (context_vars[‘async’] is True) or in synchronous mode (context_vars[‘async’] is False). There can be other context variables; they are usually the enter-results of either asynchronous or normal with statements.
An asyn function is declared with ‘async def’. It can be invoked without an event loop by calling mt.aio.srun(). Asyn functions are good for building library subroutines that support both asynchronous and synchronous modes, but they break backward compatibility because they must be declared with ‘async’.
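A minimal sketch of the asyn convention, based only on the description above: the function is declared with ‘async def’, receives ‘context_vars’, and only awaits when context_vars[‘async’] is True. The function name double_after_pause is hypothetical, and asyncio.run() is used here simply to provide an event loop for the asynchronous call.

```python
import asyncio
import time


async def double_after_pause(x: int, *, context_vars: dict) -> int:
    """Hypothetical asyn function: pauses briefly, then returns 2 * x."""
    if context_vars["async"]:
        # Asynchronous mode: yield control to the running event loop.
        await asyncio.sleep(0.01)
    else:
        # Synchronous mode: block without touching any event loop.
        time.sleep(0.01)
    return 2 * x


# Asynchronous mode needs an event loop, e.g. one provided by asyncio.run().
result = asyncio.run(double_after_pause(21, context_vars={"async": True}))
print(result)  # 42
```

In synchronous mode the same function would instead be passed to mt.aio.srun(), which supplies context_vars with ‘async’ set to False.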
An asynch function is declared with ‘def’ like a normal function. In asynchronous mode, the function returns a coroutine that must be awaited with the keyword ‘await’, as if it were a coroutine function. In synchronous mode, the function behaves like a normal function. Asynch functions are good for backward compatibility, because they are normal functions that can pretend to be coroutine functions, but they are bad for developing library subroutines supporting both asynchronous and synchronous modes.
Implementing an asynch function is discouraged. Only do so if you have no other choice.
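For comparison, a hypothetical asynch function (fetch_answer is an illustrative name, not part of mt.aio). It is a plain ‘def’ that returns a coroutine in asynchronous mode and a value in synchronous mode, which is why the caller must know the mode before deciding whether to await.

```python
import asyncio
import time


def fetch_answer(context_vars: dict):
    """Hypothetical asynch function: a plain ``def`` that pretends to be a
    coroutine function when context_vars['async'] is True."""
    if context_vars["async"]:
        async def _impl():
            await asyncio.sleep(0.01)  # yields control to the event loop
            return 42
        return _impl()  # a coroutine; the caller must await it
    time.sleep(0.01)  # synchronous mode: an ordinary blocking call
    return 42


# Synchronous mode: behaves like a normal function.
sync_value = fetch_answer({"async": False})


async def main():
    # Asynchronous mode: the returned coroutine must be awaited.
    return await fetch_answer({"async": True})


async_value = asyncio.run(main())
```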
Functions
srun(): Invokes an asyn function synchronously, without using keyword ‘await’.
arun(): Invokes an asyn function from inside an asynch function.
arun2(): Invokes an asynch function from inside an asyn function.
sleep(): An asyn function that sleeps for a number of seconds.
yield_control(): Yields the control back to the current event loop.
safe_chmod(): Undocumented.
safe_rename(): Undocumented.
read_binary(): An asyn function that opens a binary file and reads the content.
write_binary(): An asyn function that creates a binary file and writes the content.
read_text(): An asyn function that opens a text file and reads the content.
write_text(): An asyn function that creates a text file and writes the content.
json_load(): An asyn function that loads the json-like object of a file.
json_save(): An asyn function that saves a json-like object to a file.
mkdtemp(): An asyn context manager that opens and creates a temporary directory.
qput_aio(): Puts obj into the queue q.
qget_aio(): Removes and returns an item from the queue q.
- mt.aio.srun(asyn_func, *args, extra_context_vars: dict = {}, **kwargs) -> object
Invokes an asyn function synchronously, without using keyword ‘await’.
- Parameters:
asyn_func (function) – an asyn function taking ‘context_vars’ as a keyword argument
args (list) – positional arguments to be passed to the function
extra_context_vars (dict) – additional context variables to be passed to the function. The ‘context_vars’ keyword passed to the function will be {‘async’: False} updated with extra_context_vars, i.e. {‘async’: False, **extra_context_vars}.
kwargs (dict) – other keyword arguments to be passed to the function
- Returns:
whatever the function returns
- Return type:
object
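The documentation does not say how srun() runs a coroutine without an event loop. One way to do it, shown here as a sketch and not as the library's actual implementation, relies on the fact that an asyn function in synchronous mode never suspends: a single coroutine.send(None) runs it to completion, and the return value arrives as StopIteration.value. The names srun_sketch and add are hypothetical.

```python
def srun_sketch(asyn_func, *args, extra_context_vars: dict = None, **kwargs):
    """Hypothetical stand-in for mt.aio.srun()."""
    # Equivalent to {'async': False} updated with extra_context_vars.
    context_vars = {"async": False, **(extra_context_vars or {})}
    coro = asyn_func(*args, context_vars=context_vars, **kwargs)
    try:
        coro.send(None)  # runs the body; it should finish without suspending
    except StopIteration as exc:
        return exc.value  # the coroutine's return value
    # Reaching this point means the coroutine tried to suspend.
    coro.close()
    raise RuntimeError("asyn function suspended while in synchronous mode")


async def add(a, b, context_vars: dict):
    # A trivial asyn function: no await is reached in synchronous mode.
    return a + b + context_vars.get("offset", 0)


print(srun_sketch(add, 1, 2))                                     # 3
print(srun_sketch(add, 1, 2, extra_context_vars={"offset": 10}))  # 13
```

Using None instead of {} as the default for extra_context_vars is a deliberate choice in the sketch; it avoids Python's shared mutable default.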
- mt.aio.arun(asyn_func, *args, context_vars: dict = {}, **kwargs) -> object
Invokes an asyn function from inside an asynch function.
- Parameters:
asyn_func (function) – an asyn function (declared with ‘async def’)
args (list) – positional arguments of the asyn function
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
kwargs (dict) – other keyword arguments of the asyn function
- Returns:
whatever the asyn function returns
- Return type:
object
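A sketch of the dispatch arun() is documented to perform, assuming it returns the asyn function's coroutine in asynchronous mode (so the enclosing asynch function can hand it to its caller to await) and runs it to completion in synchronous mode. arun_sketch, _run_sync, square and get_square are hypothetical names; this is not the library's code.

```python
import asyncio


def _run_sync(coro):
    # Drive a coroutine that never suspends: one send() runs it to the end and
    # its return value arrives as StopIteration.value.
    try:
        coro.send(None)
    except StopIteration as exc:
        return exc.value
    coro.close()
    raise RuntimeError("coroutine suspended in synchronous mode")


def arun_sketch(asyn_func, *args, context_vars: dict, **kwargs):
    """Hypothetical stand-in for mt.aio.arun()."""
    coro = asyn_func(*args, context_vars=context_vars, **kwargs)
    if context_vars["async"]:
        return coro  # the caller awaits it
    return _run_sync(coro)


async def square(x, context_vars: dict):
    """Hypothetical asyn function."""
    if context_vars["async"]:
        await asyncio.sleep(0)  # yield once to the event loop
    return x * x


def get_square(x, context_vars: dict):
    """Hypothetical asynch function that delegates to the asyn function square()."""
    # A coroutine in asynchronous mode, a plain value in synchronous mode.
    return arun_sketch(square, x, context_vars=context_vars)


sync_result = get_square(3, {"async": False})  # 9


async def main():
    return await get_square(4, {"async": True})


async_result = asyncio.run(main())  # 16
```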
- async mt.aio.arun2(asynch_func, *args, context_vars: dict = {}, **kwargs) -> object
Invokes an asynch function from inside an asyn function.
- Parameters:
asynch_func (function) – an asynch function (declared with ‘def’)
args (list) – positional arguments of the asynch function
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
kwargs (dict) – other keyword arguments of the asynch function
- Returns:
whatever the asynch function returns
- Return type:
object
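The reverse direction, sketched under the same caveats: arun2_sketch, legacy_add and total are hypothetical. Because an asynch function returns a coroutine only in asynchronous mode, the wrapper awaits the result only in that mode.

```python
import asyncio


async def arun2_sketch(asynch_func, *args, context_vars: dict, **kwargs):
    """Hypothetical stand-in for mt.aio.arun2()."""
    result = asynch_func(*args, context_vars=context_vars, **kwargs)
    if context_vars["async"]:
        # In asynchronous mode an asynch function returns a coroutine.
        return await result
    return result  # in synchronous mode it already returned the value


def legacy_add(a, b, context_vars: dict):
    """Hypothetical asynch function (plain def)."""
    if context_vars["async"]:
        async def _impl():
            await asyncio.sleep(0)
            return a + b
        return _impl()
    return a + b


async def total(context_vars: dict):
    """Hypothetical asyn function calling the asynch function via arun2_sketch."""
    return await arun2_sketch(legacy_add, 2, 3, context_vars=context_vars)


async_total = asyncio.run(total({"async": True}))

# Synchronous mode: nothing suspends, so a single send() finishes the coroutine.
sync_total = None
coro = total({"async": False})
try:
    coro.send(None)
except StopIteration as exc:
    sync_total = exc.value
```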
- async mt.aio.sleep(secs: float, context_vars: dict = {})
An asyn function that sleeps for a number of seconds.
In asynchronous mode, it invokes asyncio.sleep(). In synchronous mode, it invokes time.sleep().
- Parameters:
secs (float) – number of seconds to sleep
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
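A sketch of the documented dispatch between asyncio.sleep() and time.sleep(); sleep_sketch is a hypothetical stand-in. The example shows the practical difference: two asynchronous sleeps overlap on the event loop instead of running back to back.

```python
import asyncio
import time


async def sleep_sketch(secs: float, context_vars: dict):
    """Hypothetical re-creation of the dispatch described for mt.aio.sleep()."""
    if context_vars["async"]:
        await asyncio.sleep(secs)  # non-blocking: other tasks run meanwhile
    else:
        time.sleep(secs)  # blocking: no event loop is involved


async def main():
    start = time.perf_counter()
    # Two asynchronous sleeps run concurrently: about 0.1 s in total, not 0.2 s.
    await asyncio.gather(
        sleep_sketch(0.1, {"async": True}),
        sleep_sketch(0.1, {"async": True}),
    )
    return time.perf_counter() - start


elapsed = asyncio.run(main())
```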
- async mt.aio.yield_control()
Yields the control back to the current event loop.
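The documentation does not say how yield_control() is implemented. A common asyncio idiom for yielding once to the event loop is await asyncio.sleep(0), which this sketch assumes; yield_control_sketch and worker are hypothetical.

```python
import asyncio


async def yield_control_sketch():
    # asyncio.sleep(0) suspends the current task for one event-loop iteration.
    await asyncio.sleep(0)


async def worker(name: str, log: list):
    for i in range(2):
        log.append(f"{name}{i}")
        await yield_control_sketch()  # let the other task run


async def main():
    log = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


order = asyncio.run(main())
print(order)  # ['a0', 'b0', 'a1', 'b1']
```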
- async mt.aio.safe_chmod(filepath: str, file_mode: int = 436)
- async mt.aio.safe_rename(filepath: str, new_filepath: str, context_vars: dict = {})
- async mt.aio.read_binary(filepath, size: int | None = None, context_vars: dict = {}) -> bytes
An asyn function that opens a binary file and reads the content.
- Parameters:
filepath (str) – path to the file
size (int) – size to read from the beginning of the file, in bytes. If None is given, read the whole file.
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
- Returns:
the content read from file
- Return type:
bytes
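A sketch of the read_binary() contract (read size bytes from the start, or the whole file when size is None). How mt.aio performs the read in asynchronous mode is not documented; offloading the blocking read with asyncio.to_thread() (Python 3.9+) is an assumption, and read_binary_sketch is hypothetical.

```python
import asyncio
import os
import tempfile
from typing import Optional


def _read(filepath: str, size: Optional[int]) -> bytes:
    with open(filepath, "rb") as f:
        # f.read(-1) reads everything; f.read(n) reads at most n bytes.
        return f.read(-1 if size is None else size)


async def read_binary_sketch(filepath: str, size: Optional[int] = None, *,
                             context_vars: dict) -> bytes:
    """Hypothetical re-creation of the behaviour described for mt.aio.read_binary()."""
    if context_vars["async"]:
        # Assumption: offload the blocking read to a worker thread.
        return await asyncio.to_thread(_read, filepath, size)
    return _read(filepath, size)


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "data.bin")
    with open(path, "wb") as f:
        f.write(b"hello world")
    head = asyncio.run(read_binary_sketch(path, 5, context_vars={"async": True}))
print(head)  # b'hello'
```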
- async mt.aio.write_binary(filepath, buf: bytes, file_mode: int = 436, context_vars: dict = {}, file_write_delayed: bool = False)
An asyn function that creates a binary file and writes the content.
- Parameters:
filepath (str) – path to the file
buf (bytes) – data (in bytes) to be written to the file
file_mode (int) – file mode to be set using os.chmod(). Only valid if filepath is a string. If None is given, no setting of file mode will happen.
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
file_write_delayed (bool) – Only valid in asynchronous mode. If True, wraps the file write task into a future and returns the future. In all other cases, proceeds as usual.
- Returns:
either a future or the number of bytes written, depending on whether the file write task is delayed or not
- Return type:
asyncio.Future or int
Notes
The content is first written to a file with the ‘.mttmp’ extension, which is then renamed to the target file path.
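A synchronous sketch of the write-then-rename pattern described in the note, assuming the temporary file sits next to the target so that os.replace() stays on one filesystem, where the rename is atomic on POSIX and readers never see a half-written file. write_binary_sync_sketch is hypothetical and ignores asynchronous mode. The default file_mode 436 is 0o664 (rw-rw-r--).

```python
import os
import stat
import tempfile
from typing import Optional


def write_binary_sync_sketch(filepath: str, buf: bytes,
                             file_mode: Optional[int] = 0o664) -> int:
    """Hypothetical synchronous version of the '.mttmp' write-then-rename scheme."""
    tmp_filepath = filepath + ".mttmp"
    with open(tmp_filepath, "wb") as f:
        nbytes = f.write(buf)
    if file_mode is not None:
        os.chmod(tmp_filepath, file_mode)  # set permissions before publishing
    # os.replace() overwrites any existing target in a single rename.
    os.replace(tmp_filepath, filepath)
    return nbytes


with tempfile.TemporaryDirectory() as d:
    target = os.path.join(d, "out.bin")
    written = write_binary_sync_sketch(target, b"\x00\x01\x02")
    mode = stat.S_IMODE(os.stat(target).st_mode)
    leftover = os.path.exists(target + ".mttmp")
    with open(target, "rb") as f:
        content = f.read()
```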
- async mt.aio.read_text(filepath, size: int | None = None, context_vars: dict = {}) -> str
An asyn function that opens a text file and reads the content.
- Parameters:
filepath (str) – path to the file
size (int) – size to read from the beginning of the file, in bytes. If None is given, read the whole file.
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
- Returns:
the content read from file
- Return type:
str
- async mt.aio.write_text(filepath, buf: str, file_mode: int = 436, context_vars: dict = {}, file_write_delayed: bool = False)
An asyn function that creates a text file and writes the content.
- Parameters:
filepath (str) – path to the file
buf (str) – text to be written to the file
file_mode (int) – file mode to be set using os.chmod(). Only valid if filepath is a string. If None is given, no setting of file mode will happen.
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
file_write_delayed (bool) – Only valid in asynchronous mode. If True, wraps the file write task into a future and returns the future. In all other cases, proceeds as usual.
- Returns:
either a future or the number of bytes written, depending on whether the file write task is delayed or not
- Return type:
asyncio.Future or int
Notes
The content is first written to a file with the ‘.mttmp’ extension, which is then renamed to the target file path.
- async mt.aio.json_load(filepath, context_vars: dict = {}, **kwargs)
An asyn function that loads the json-like object of a file.
- Parameters:
filepath (str) – path to the file
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
kwargs (dict) – keyword arguments passed as-is to json.loads()
- Returns:
the loaded json-like object
- Return type:
object
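A sketch showing how the extra keyword arguments of json_load() would reach json.loads(), here parse_float=Decimal so that floats load as exact decimals. json_load_sketch is hypothetical, and the threaded read in asynchronous mode is an assumption.

```python
import asyncio
import json
import os
import tempfile
from decimal import Decimal


async def json_load_sketch(filepath: str, *, context_vars: dict, **kwargs):
    """Hypothetical re-creation of mt.aio.json_load(): read text, then json.loads()."""
    def _read_text() -> str:
        with open(filepath, "r", encoding="utf-8") as f:
            return f.read()

    if context_vars["async"]:
        text = await asyncio.to_thread(_read_text)  # assumption: threaded read
    else:
        text = _read_text()
    # Extra keyword arguments go to json.loads() unchanged.
    return json.loads(text, **kwargs)


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "cfg.json")
    with open(path, "w", encoding="utf-8") as f:
        f.write('{"rate": 0.1}')
    cfg = asyncio.run(
        json_load_sketch(path, context_vars={"async": True}, parse_float=Decimal))
print(cfg)  # {'rate': Decimal('0.1')}
```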
- async mt.aio.json_save(filepath, obj, file_mode: int = 436, context_vars: dict = {}, file_write_delayed: bool = False, **kwargs)
An asyn function that saves a json-like object to a file.
- Parameters:
filepath (str) – path to the file
obj (object) – json-like object to be written to the file
file_mode (int) – file mode to be set using os.chmod(). Only valid if filepath is a string. If None is given, no setting of file mode will happen.
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
file_write_delayed (bool) – Only valid in asynchronous mode. If True, wraps the file write task into a future and returns the future. In all other cases, proceeds as usual.
kwargs (dict) – keyword arguments passed as-is to json.dumps()
- Returns:
either a future or the number of bytes written, depending on whether the file write task is delayed or not
- Return type:
asyncio.Future or int
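A sketch of what file_write_delayed=True is documented to do in asynchronous mode: the write is wrapped into a future that is returned at once, so the caller can carry on and await the result later. json_save_sketch is hypothetical; building the future with asyncio.create_task() around asyncio.to_thread() is an assumption, and the temporary-file rename is omitted for brevity.

```python
import asyncio
import json
import os
import tempfile


def _write_text(filepath: str, text: str) -> int:
    with open(filepath, "w", encoding="utf-8") as f:
        return f.write(text)  # number of characters written


async def json_save_sketch(filepath: str, obj, *, context_vars: dict,
                           file_write_delayed: bool = False, **kwargs):
    """Hypothetical re-creation of the return contract of mt.aio.json_save()."""
    text = json.dumps(obj, **kwargs)  # kwargs go to json.dumps() as-is
    if not context_vars["async"]:
        return _write_text(filepath, text)
    task = asyncio.create_task(asyncio.to_thread(_write_text, filepath, text))
    if file_write_delayed:
        return task  # an asyncio.Task, which is a Future; await it later
    return await task


async def main(path: str):
    fut = await json_save_sketch(path, {"a": 1}, context_vars={"async": True},
                                 file_write_delayed=True)
    # ... other work could happen here while the file is being written ...
    nchars = await fut
    return isinstance(fut, asyncio.Future), nchars


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "obj.json")
    is_future, nchars = asyncio.run(main(path))
    with open(path, encoding="utf-8") as f:
        saved = json.load(f)
```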
- mt.aio.mkdtemp(context_vars: dict = {})
An asyn context manager that opens and creates a temporary directory.
- Parameters:
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not.
- Returns:
tmpdir – the context manager whose enter-value is a string containing the temporary dirpath.
- Return type:
object
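A sketch of a context manager that works with both ‘with’ (synchronous mode) and ‘async with’ (asynchronous mode). MkdtempSketch is hypothetical, and deleting the directory on exit mirrors tempfile.TemporaryDirectory; the documentation above does not say whether mt.aio.mkdtemp() does the same.

```python
import asyncio
import os
import shutil
import tempfile


class MkdtempSketch:
    """Hypothetical dual-mode temporary-directory context manager."""

    def __init__(self):
        self.dirpath = None

    # Synchronous protocol, for `with MkdtempSketch() as d:`
    def __enter__(self) -> str:
        self.dirpath = tempfile.mkdtemp()
        return self.dirpath  # the enter-value is the directory path

    def __exit__(self, exc_type, exc, tb):
        shutil.rmtree(self.dirpath, ignore_errors=True)  # assumption: cleanup on exit
        return False  # do not swallow exceptions

    # Asynchronous protocol, for `async with MkdtempSketch() as d:`
    async def __aenter__(self) -> str:
        return await asyncio.to_thread(self.__enter__)

    async def __aexit__(self, exc_type, exc, tb):
        return await asyncio.to_thread(self.__exit__, exc_type, exc, tb)


with MkdtempSketch() as d:
    existed_sync = os.path.isdir(d)
removed_sync = not os.path.exists(d)


async def main():
    async with MkdtempSketch() as d:
        inside = os.path.isdir(d)
    return inside, not os.path.exists(d)


existed_async, removed_async = asyncio.run(main())
```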
- async mt.aio.qput_aio(q: Queue, obj, block: bool = True, timeout: float | None = None, aio_interval: float = 0.001)
Puts obj into the queue q.
If the optional argument block is True (the default) and timeout is None (the default), block asynchronously if necessary until a free slot is available. If timeout is a positive number, it blocks asynchronously at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception (timeout is ignored in that case).
- async mt.aio.qget_aio(q: Queue, block: bool = True, timeout: float | None = None, aio_interval: float = 0.001)
Removes and returns an item from the queue q.
If the optional argument block is True (the default) and timeout is None (the default), block asynchronously if necessary until an item is available. If timeout is a positive number, it blocks asynchronously at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).
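The documentation does not define aio_interval. One plausible reading, used here as an assumption, is that it is a polling period: the coroutine retries the non-blocking put_nowait()/get_nowait() of a thread-safe queue.Queue and sleeps aio_interval seconds between attempts, so the event loop is never blocked. qput_sketch and qget_sketch are hypothetical.

```python
import asyncio
import queue
import time
from typing import Optional


async def qput_sketch(q: queue.Queue, obj, block: bool = True,
                      timeout: Optional[float] = None, aio_interval: float = 0.001):
    if not block:
        return q.put_nowait(obj)  # raises queue.Full at once if no free slot
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            return q.put_nowait(obj)
        except queue.Full:
            if deadline is not None and time.monotonic() >= deadline:
                raise  # re-raise queue.Full after the timeout
            await asyncio.sleep(aio_interval)  # let other tasks run, then retry


async def qget_sketch(q: queue.Queue, block: bool = True,
                      timeout: Optional[float] = None, aio_interval: float = 0.001):
    if not block:
        return q.get_nowait()  # raises queue.Empty at once if nothing is queued
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            return q.get_nowait()
        except queue.Empty:
            if deadline is not None and time.monotonic() >= deadline:
                raise  # re-raise queue.Empty after the timeout
            await asyncio.sleep(aio_interval)


async def main():
    q = queue.Queue(maxsize=1)
    await qput_sketch(q, "first")
    try:
        await qput_sketch(q, "second", timeout=0.05)  # queue stays full
        full_raised = False
    except queue.Full:
        full_raised = True
    item = await qget_sketch(q)
    try:
        await qget_sketch(q, timeout=0.05)  # queue is now empty
        empty_raised = False
    except queue.Empty:
        empty_raised = True
    return item, full_raised, empty_raised


item, full_raised, empty_raised = asyncio.run(main())
```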
Classes
CreateFileH5: A context for creating an HDF5 file safely.
BgProcess: Launches a child process that communicates with the parent process via message passing.
AprocManager: Manages the completion of aprocs.
- class mt.aio.CreateFileH5(filepath: str, file_mode: int = 436, context_vars: dict = {}, logger=None)
A context for creating an HDF5 file safely.
It creates a temporary HDF5 file for writing. Once the user exits the context, the file is chmodded and renamed to a given name. Any intermediate folder that does not exist is created automatically.
The context can be synchronous or asynchronous, depending on the ‘async’ key of argument ‘context_vars’.
- Parameters:
filepath (str) – local file path to be written to
file_mode (int) – file mode to be set using os.chmod(). If None is given, no setting of file mode will happen.
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. Only used for the asynchronous mode.
logger (logging.Logger, optional) – logger for debugging purposes
- tmp_filepath
the local file path of the temporary HDF5 file
- Type:
str
- handle
the handle of the temporary HDF5 file
- Type:
h5py.File
- class mt.aio.BgProcess
Launches a child process that communicates with the parent process via message passing.
You should subclass this class and implement child_handle_message(). See the docstring of that method below.
Notes
In interactive mode, remember to delete any instance of the class before you exit, or else the interpreter will not exit.
- child_handle_message(msg: object) -> object
Handles a message obtained from the queue.
This function should only be called by the child process.
It takes as input a message from the parent-to-child queue and processes the message. Once done, it returns an object which will be wrapped into a message and placed into the child-to-parent queue.
An input message can be anything. Usually it is a tuple whose first component is a command string. The output message can also be anything. If handling succeeds, the return value is wrapped into a (‘returned’, retval) output message. If an exception is raised, it is wrapped into a (‘raised_exception’, exc, callstack_lines) output message.
If the child process prints anything to stdout or stderr, it is redirected as (‘write’, ‘stdout’ or ‘stderr’, text) in the output queue. Note that for now only Python-generated printouts can be redirected. Native printouts require more cumbersome solutions. See: https://exceptionshub.com/python-multiprocessing-how-can-i-reliably-redirect-stdout-from-a-child-process-2.html
The user should override this function. The default behaviour is to return whatever is sent to it.
If KeyboardInterrupt is raised in the child process but outside this function, you will get an (‘ignored_exception’, KeyboardInterrupt) message. If the child process dies, you will get an (‘exit’, None or Exception) message, depending on whether the child process dies normally or abruptly.
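A sketch of the reply wrapping described above, seen from the child side: each handler result becomes (‘returned’, retval), and an exception becomes (‘raised_exception’, exc, callstack_lines). wrap_reply, echo_handler and divide_handler are hypothetical, and producing callstack_lines with traceback.format_exception() is an assumption about what those lines contain. No process or queue is involved here.

```python
import traceback


def echo_handler(msg):
    """Default behaviour described for child_handle_message(): return msg unchanged."""
    return msg


def divide_handler(msg):
    # Hypothetical handler: msg is a ('divide', a, b) command tuple.
    command, a, b = msg
    if command != "divide":
        raise ValueError(f"unknown command: {command!r}")
    return a / b


def wrap_reply(handler, msg) -> tuple:
    """Hypothetical helper: turn one handler call into an output message."""
    try:
        return ("returned", handler(msg))
    except Exception as exc:
        callstack_lines = traceback.format_exception(type(exc), exc, exc.__traceback__)
        return ("raised_exception", exc, callstack_lines)


ok = wrap_reply(divide_handler, ("divide", 6, 3))
err = wrap_reply(divide_handler, ("divide", 1, 0))
echo = wrap_reply(echo_handler, ("ping",))
print(ok)  # ('returned', 2.0)
```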
- async send(msg, recv_timeout: float | None = None, recv_aio_interval=0.001)
Sends a message to the child process and awaits the returned message.
- Parameters:
msg (object) – message to be sent to the child process
recv_timeout (float) – If specified, the number of seconds to wait asynchronously to receive the message before raising a queue.Empty exception. If not, asynchronously blocks until the message from the child process is received.
recv_aio_interval (float) – time unit to simulate asynchronous blocking while waiting for the response message. Default is 1 ms.
- Returns:
message received from the child process
- Return type:
object
- Raises:
RuntimeError – if the child process is not alive while processing the message
- class mt.aio.AprocManager(max_concurrency: int = 1024, handle_exception: str = 'raise', logger=None)
Manages the completion of aprocs.
With this manager, the user can just send an aproc to it and forget. To ensure all aprocs are completed, please invoke the cleanup function. Otherwise, some aprocs may never get awaited when the manager dies.
- Parameters:
max_concurrency (int) – maximum number of concurrent aprocs that can be held pending
handle_exception ({'raise', 'silent', 'warn'}) – policy for handling an exception raised by an aproc. If ‘raise’, re-raise the caught exception. If ‘silent’, ignore the exception. If ‘warn’, use the provided logger to warn the user.
logger (logging.Logger or equivalent) – logger for warning purposes
- async cleanup()
Awaits until all aprocs are done.
- async send(aproc: Future)
Sends an aproc to the manager so the user can forget about it.
The function usually returns immediately. However, if the maximum number of concurrent aprocs has been exceeded, it will await.
- Parameters:
aproc (asyncio.Future) – a future (returned via asyncio.create_task() or asyncio.ensure_future()) that is a procedure
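A minimal sketch of the "send and forget" bookkeeping described for AprocManager, assuming that asyncio.wait() with FIRST_COMPLETED is used to make room once more than max_concurrency aprocs are pending, and that the exception policy is applied as aprocs finish. AprocManagerSketch, job and failing are hypothetical; this is not the library's implementation.

```python
import asyncio
import logging


class AprocManagerSketch:
    """Hypothetical re-creation of the documented AprocManager behaviour."""

    def __init__(self, max_concurrency: int = 1024, handle_exception: str = "raise",
                 logger=None):
        self.max_concurrency = max_concurrency
        self.handle_exception = handle_exception
        self.logger = logger or logging.getLogger(__name__)
        self.pending = set()

    def _check(self, done):
        # Apply the exception policy to every finished aproc.
        for fut in done:
            if fut.cancelled():
                continue
            exc = fut.exception()
            if exc is None:
                continue
            if self.handle_exception == "raise":
                raise exc
            if self.handle_exception == "warn":
                self.logger.warning("aproc raised %r", exc)
            # 'silent': the exception is ignored

    async def send(self, aproc: asyncio.Future):
        self.pending.add(aproc)
        # Usually returns at once; awaits only while too many aprocs are pending.
        while len(self.pending) > self.max_concurrency:
            done, self.pending = await asyncio.wait(
                self.pending, return_when=asyncio.FIRST_COMPLETED)
            self._check(done)

    async def cleanup(self):
        # Await every remaining aproc.
        if self.pending:
            done, self.pending = await asyncio.wait(self.pending)
            self._check(done)


async def job(i: int, results: list):
    await asyncio.sleep(0.01)
    results.append(i)


async def main():
    results, peak = [], 0
    mgr = AprocManagerSketch(max_concurrency=2)
    for i in range(5):
        await mgr.send(asyncio.create_task(job(i, results)))
        peak = max(peak, len(mgr.pending))  # track the most aprocs left pending
    await mgr.cleanup()
    return sorted(results), peak


async def failing():
    raise ValueError("boom")


async def main_silent():
    mgr = AprocManagerSketch(handle_exception="silent")
    await mgr.send(asyncio.create_task(failing()))
    await mgr.cleanup()  # the ValueError is swallowed
    return len(mgr.pending)


finished, peak = asyncio.run(main())
remaining = asyncio.run(main_silent())
```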
Variables
- mt.aio.ipython
'outside'