mt.concurrency

Concurrency using multiprocessing and asyncio.

Submodules

Functions

mt.concurrency.used_memory_too_much()

Checks whether memory usage exceeds 90%.

mt.concurrency.used_cpu_too_much()

Checks whether CPU usage exceeds 90%.

mt.concurrency.split_works(num_works: int, num_buckets: int | None = None)

Splits a number of works randomly into a few buckets, returning the work ids assigned to each bucket.

Parameters:
  • num_works (int) – number of works

  • num_buckets (int, optional) – number of buckets. If not specified, the number of CPUs is used.

Returns:

ll_workIds – a nested list of work id lists. The work ids, in the interval [0, num_works), are split approximately evenly and randomly into the buckets.

Return type:

list
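A minimal sketch of the documented behaviour, assuming a shuffle-then-deal strategy: the ids are shuffled once and dealt round-robin, which makes the split both random and approximately even. The name split_works_sketch is hypothetical and the library may split the ids differently.

```python
import os
import random
from typing import List, Optional


def split_works_sketch(num_works: int, num_buckets: Optional[int] = None) -> List[List[int]]:
    """Randomly split work ids 0..num_works-1 into num_buckets near-equal buckets."""
    if num_buckets is None:
        # Mirror the documented default: one bucket per CPU.
        num_buckets = os.cpu_count() or 1
    work_ids = list(range(num_works))
    random.shuffle(work_ids)  # random assignment of ids to buckets
    # Deal the shuffled ids round-robin, so bucket sizes differ by at most one.
    return [work_ids[i::num_buckets] for i in range(num_buckets)]
```

For example, split_works_sketch(10, 3) always yields three buckets of sizes 3, 3 and 4 that together contain every id from 0 to 9 exactly once.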

mt.concurrency.serial_work_generator(func, num_work_ids: int)

A generator that serially does some works and yields the work results.

This function complements the WorkIterator class, handling cases where the number of works is small.

Parameters:
  • func (function) – a function representing the work process. The function takes as input a non-negative integer ‘work_id’ and returns some result in the form of (work_id, …) if successful else None.

  • num_work_ids (int) – number of works to iterate over without using multiprocessing or multithreading.

Returns:

a Python generator

Return type:

object

async mt.concurrency.aio_work_generator(func, num_work_ids, skip_null: bool = True, func_kwargs: dict = {}, max_concurrency: int = 1024)

An asynchronous generator that does some works and yields the work results.

This function uses asyncio to do works concurrently. The number of concurrent works is optionally upper-bounded by max_concurrency. It is well suited to IO-bound works; otherwise, WorkIterator or serial_work_generator() are more suitable options.

Parameters:
  • func (function) – a coroutine function (defined with ‘async def’) representing the work process. The function takes as input a non-negative integer ‘work_id’ and optionally some keyword arguments. It returns some result in the form of (work_id, …) if successful else None.

  • num_work_ids (int) – number of works to iterate over without using multiprocessing or multithreading.

  • skip_null (bool, optional) – whether or not to skip the iteration that contains None as the work result.

  • func_kwargs (dict) – additional keyword arguments to be passed to the function as-is

  • max_concurrency (int) – the maximum number of concurrent works at any time, good for managing memory allocations. If None is given, all works will be scheduled to run at once.

Returns:

an asynchronous generator yielding each result in the form (work_id, …)

Return type:

object
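The concurrency bound described above can be sketched with an asyncio.Semaphore. This is an illustration under assumptions, not the library's implementation: aio_work_generator_sketch, demo_work and collect are hypothetical names, results are assumed to be yielded in completion order, and func_kwargs defaults to None rather than a shared mutable {}.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Optional


async def aio_work_generator_sketch(
    func: Callable[..., Awaitable[Any]],
    num_work_ids: int,
    skip_null: bool = True,
    func_kwargs: Optional[dict] = None,
    max_concurrency: Optional[int] = 1024,
) -> AsyncIterator[Any]:
    func_kwargs = func_kwargs or {}
    # A semaphore caps how many works run at once; None means no cap.
    sem = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None

    async def run_one(work_id: int) -> Any:
        if sem is None:
            return await func(work_id, **func_kwargs)
        async with sem:
            return await func(work_id, **func_kwargs)

    tasks = [asyncio.ensure_future(run_one(i)) for i in range(num_work_ids)]
    # Yield results in completion order, not in work-id order.
    for fut in asyncio.as_completed(tasks):
        result = await fut
        if result is None and skip_null:
            continue
        yield result


async def demo_work(work_id: int, scale: int = 1):
    """Example work: odd ids 'fail' by returning None."""
    await asyncio.sleep(0)
    return None if work_id % 2 else (work_id, work_id * scale)


async def collect(agen):
    """Drain an async generator into a list."""
    return [item async for item in agen]
```

Running asyncio.run(collect(aio_work_generator_sketch(demo_work, 6, func_kwargs={"scale": 10}, max_concurrency=2))) collects (0, 0), (2, 20) and (4, 40) in some order; the None results for odd ids are skipped.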

async mt.concurrency.run_asyn_works_in_context(progress_queue: Queue, func, func_args: tuple = (), func_kwargs: dict = {}, context_id=None, work_id_list: list = [], max_concurrency: int = 1024, context_vars: dict = {})

Invokes the same asyn function with different work ids concurrently and asynchronously, in a given context.

Parameters:
  • progress_queue (multiprocessing.Queue) – a shared queue so that the main process can observe the progress inside the context. See notes below.

  • func (function) – an asyn function that may return something and may raise an Exception. Its first argument must be the work id. The context variables provided to the function are automatically created by invoking mt.base.s3.create_context_vars().

  • func_args (tuple) – additional positional arguments to be passed to the function as-is

  • func_kwargs (dict) – additional keyword arguments to be passed to the function as-is

  • context_id (int, optional) – the context id to be assigned to the new context. Default is None if we do not care.

  • work_id_list (list) – list of work ids to be passed to the function

  • max_concurrency (int) – the maximum number of concurrent works in the context at any time, good for managing memory allocations. If None is given, all works in the context will be scheduled to run at once.

  • asyn (bool) – whether the asyn function is to be invoked asynchronously or synchronously

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. In addition, variable ‘s3_client’ must exist and hold an enter-result of an async with statement invoking mt.base.s3.create_s3_client(). In asynchronous mode, variable ‘http_session’ must exist and hold an enter-result of an async with statement invoking mt.base.http.create_http_session(). You can use mt.base.s3.create_context_vars() to create a dictionary like this.

Notes

This function returns nothing, but while it is running, the progress queue regularly receives messages. Each message is a tuple (context_id, message_code, work_id, …). The caller is responsible for consuming the progress queue.

If KeyboardInterrupt is raised during the invocation, all remaining unscheduled tasks are cancelled. Consequently, when max_concurrency is None, all tasks are scheduled at once and KeyboardInterrupt has no effect.
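The progress-queue protocol in the notes can be illustrated with a reduced sketch. It assumes hypothetical message codes "task_returned" and "task_raised" (the real codes are not documented here), omits func_args, func_kwargs and context_vars, and accepts any object with a put() method, such as queue.Queue or multiprocessing.Queue. run_works_in_context_sketch and demo_work are hypothetical names.

```python
import asyncio
import queue
from typing import Any, Awaitable, Callable, Iterable, Optional

# Hypothetical message codes; the real codes used by mt.concurrency may differ.
TASK_RETURNED = "task_returned"
TASK_RAISED = "task_raised"


async def run_works_in_context_sketch(
    progress_queue,  # anything with .put(), e.g. queue.Queue or multiprocessing.Queue
    func: Callable[..., Awaitable[Any]],
    work_id_list: Iterable[int],
    context_id: Optional[int] = None,
    max_concurrency: Optional[int] = 1024,
) -> None:
    sem = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None

    async def call(work_id: int) -> Any:
        if sem is None:
            return await func(work_id)
        async with sem:
            return await func(work_id)

    async def run_one(work_id: int) -> None:
        try:
            result = await call(work_id)
        except Exception as exc:  # report the failure instead of propagating it
            progress_queue.put((context_id, TASK_RAISED, work_id, repr(exc)))
        else:
            progress_queue.put((context_id, TASK_RETURNED, work_id, result))

    # Returns nothing; all observable output goes through the progress queue.
    await asyncio.gather(*(run_one(w) for w in work_id_list))


async def demo_work(work_id: int) -> int:
    await asyncio.sleep(0)
    if work_id == 3:
        raise ValueError("work 3 failed")
    return work_id * 2
```

Every message starts with (context_id, message_code, work_id), matching the documented tuple layout, so the caller can tell which context and which work each message refers to.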

async mt.concurrency.asyn_work_generator(func, func_args: tuple = (), func_kwargs: dict = {}, num_processes=None, num_works: int = 0, max_concurrency: int = 1024, profile=None, debug_logger=None, progress_queue=None, timeout: int = 300)

An asyn generator that does a large number of works concurrently and yields the work results.

Internally, it splits the list of work ids into blocks and invokes run_asyn_works_in_context() to run each block in a new context in a separate process. It uses a multiprocessing queue for communication between the processes. Every time a task is done, either returning a result or raising an exception, it yields that information.

Parameters:
  • func (function) – an asyn function that may return something and may raise an Exception. Its first argument must be the work id. The context variables provided to the function are automatically created by invoking mt.base.s3.create_context_vars().

  • func_args (tuple) – additional positional arguments to be passed to the function as-is

  • func_kwargs (dict) – additional keyword arguments to be passed to the function as-is

  • num_processes (int) – the number of processes to be created. If not specified, it is equal to the number of CPUs.

  • num_works (int) – number of works

  • max_concurrency (int) – the maximum number of concurrent works in each context at any time, good for managing memory allocations. If None is given, all works in each context will be scheduled to run at once.

  • profile (str, optional) – one of the profiles defined in the AWS configuration. The default profile is used if None is given.

  • debug_logger (logging.Logger or equivalent) – logger for debugging purposes, if needed

  • progress_queue (multiprocessing.Queue) – a shared queue so that the main process can observe the progress inside the context. If not provided, one will be created internally.

  • timeout (int, optional) – timeout in units of 0.1 seconds, so the default of 300 corresponds to 30 seconds

Notes

The context ids are zero-based. The number of contexts is equal to the number of processes. The work ids are task ids and are zero-based. Messages signalling that a task is done are yielded.

Asyncio and KeyboardInterrupt do not interact well; see https://bugs.python.org/issue42683.
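The main-process side of this design, reading the shared queue until every work has reported back, might look like the sketch below. It assumes the same hypothetical "task_returned"/"task_raised" codes, assumes iteration ends once num_works completion messages have arrived, and applies the documented 0.1-second timeout unit. drain_progress_queue_sketch is a hypothetical name.

```python
import queue
from typing import Any, Iterator, Tuple

# Hypothetical codes marking that a task has finished (returned or raised).
DONE_CODES = {"task_returned", "task_raised"}


def drain_progress_queue_sketch(progress_queue, num_works: int, timeout: int = 300) -> Iterator[Tuple[Any, ...]]:
    """Yield 'task done' messages until num_works of them have arrived."""
    remaining = num_works
    while remaining > 0:
        try:
            # timeout is in units of 0.1 s, as documented, so 300 means 30 s.
            msg = progress_queue.get(timeout=timeout * 0.1)
        except queue.Empty:
            raise TimeoutError(f"no progress message within {timeout * 0.1:.1f} s") from None
        _context_id, code, _work_id = msg[:3]
        if code in DONE_CODES:
            remaining -= 1
            yield msg
```

Both queue.Queue.get and multiprocessing.Queue.get raise queue.Empty on timeout, so the same loop works for either queue type.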

mt.concurrency.worker_process(func, heartbeat_pipe: Connection, queue_in: Queue, queue_out: Queue, logger: IndentedLoggerAdapter | None = None)

The worker process.

The worker process operates as follows. A pipe named heartbeat_pipe carries heartbeats between the worker and its parent process. Each side of the pipe sends a heartbeat every 0.5 seconds. If one side hears no heartbeat within 5*240 seconds (20 minutes), it can assume that the other side has died for some unexpected reason. After detecting this fateful event, the parent process can abruptly kill the worker process, whereas the worker can only terminate itself.

A heartbeat from the parent process is a byte. If the heartbeat is 0, the parent is alive. If it is 1, the worker process must commit seppuku, i.e. terminate itself.

A heartbeat from the worker process is also a byte. If it is 0, the worker is alive. If it is 1, the worker signals that it has received a keyboard interrupt and is about to die in peace.

There is a global pair of queues, queue_in and queue_out, to distribute workloads. Workloads are divided into smaller pieces identified by zero-based work ids. The parent process sends work ids to queue_in. Any worker process can take a work id from the queue, do the work, and return the outcome to queue_out. The data in queue_in are pure integers. The data in queue_out are tuples of the form (work_id, …).

The parent process owns all the pipes and queues.

Parameters:
  • func (function) – a function taking work_id as the only argument and returning something. The function is run in a background thread of the worker process

  • heartbeat_pipe (multiprocessing.connection.Connection) – the child connection of a parent-child pipe to communicate with the parent about their heartbeats

  • queue_in (multiprocessing.Queue) – queue containing work ids

  • queue_out (multiprocessing.Queue) – queue containing results (work_id, …)

  • logger (IndentedLoggerAdapter, optional) – logger for debugging purposes
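The heartbeat protocol above can be sketched as a single worker loop. This is a simplified illustration, not the library's code: worker_loop_sketch, start_demo_worker, ALIVE and STOP are hypothetical names, func runs inline rather than in a background thread, and the demo runs the loop in a thread instead of a process so that it can be tried locally.

```python
import queue
import threading
import time
from multiprocessing import Pipe

ALIVE = b"\x00"  # heartbeat byte: "I am alive"
STOP = b"\x01"   # parent -> worker: "terminate"; worker -> parent: "interrupted, exiting"


def worker_loop_sketch(func, heartbeat_pipe, queue_in, queue_out,
                       beat_interval: float = 0.5, max_silence: float = 5 * 240) -> None:
    """Simplified worker: func runs inline here, not in a background thread."""
    last_parent_beat = time.monotonic()
    last_sent = float("-inf")
    try:
        while True:
            now = time.monotonic()
            if now - last_sent >= beat_interval:
                heartbeat_pipe.send_bytes(ALIVE)
                last_sent = now
            # Drain every heartbeat the parent has sent so far.
            while heartbeat_pipe.poll():
                if heartbeat_pipe.recv_bytes() == STOP:
                    return  # the parent ordered this worker to terminate
                last_parent_beat = time.monotonic()
            if time.monotonic() - last_parent_beat > max_silence:
                return  # no heartbeat for too long: presume the parent is dead
            try:
                work_id = queue_in.get(timeout=beat_interval)  # work ids are plain ints
            except queue.Empty:
                continue
            queue_out.put((work_id, func(work_id)))  # results are (work_id, ...) tuples
    except KeyboardInterrupt:
        heartbeat_pipe.send_bytes(STOP)  # tell the parent we are exiting in peace


def start_demo_worker(func, beat_interval: float = 0.05):
    """Run the loop in a daemon thread (instead of a process) for a quick local demo."""
    parent_conn, child_conn = Pipe()
    q_in, q_out = queue.Queue(), queue.Queue()
    t = threading.Thread(target=worker_loop_sketch,
                         args=(func, child_conn, q_in, q_out, beat_interval), daemon=True)
    t.start()
    return t, parent_conn, q_in, q_out
```

As the notes state, the parent owns the pipe and both queues; sending STOP on the parent's end of the pipe is enough to make the worker exit its loop.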

mt.concurrency.asyn_pmap(asyn_func: Callable, input_iterable: Iterable, batch_size: int = 1024, asyn_func_args: tuple = (), asyn_func_kwargs: dict = {}, pool_processes: int | None = None, pool_maxtasksperchild: int | None = None, pool_ordered: bool = True, init_func: Callable | None = None, init_func_args: tuple = (), init_func_kwargs: dict = {}, cvc_func: Callable | None = None, cvc_func_args: tuple = (), cvc_func_kwargs: dict = {}) → Iterable

Parallel map (pmap) over an iterator with an asyn function.

Internally, the iterator is batched into a batch iterator. Each batch is sent to one worker for processing. The worker goes through every item of a batch and invokes the asyn function asyn_func, providing the context_vars dictionary. Each batch of results is then unbatched. The output iterator yields the resulting items, which may or may not be in order depending on the pool_ordered argument.

Parameters:
  • asyn_func (function) – an asyn function that returns something. The first positional argument of the function is the item to be processed. The keyword argument ‘context_vars’ is provided to the function.

  • input_iterable (iterable) – any iterable object to act as the input iterator

  • batch_size (int) – number of items in each batch. It should be chosen to balance the benefit of iterating over the items of a batch in async mode against the memory allocated to store the batch's pending transformed items

  • asyn_func_args (tuple, optional) – additional positional arguments to be passed to the asyn function as-is

  • asyn_func_kwargs (dict, optional) – additional keyword arguments to be passed to the asyn function as-is

  • pool_processes (int, optional) – the number of processes to be created. If not specified, it is equal to the number of CPUs. Passed as-is to multiprocessing.Pool.

  • pool_maxtasksperchild (int, optional) – the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool. Passed as-is to multiprocessing.Pool.

  • pool_ordered (bool) – whether the output iterator provides the same order as the input iterator or not

  • init_func (function, optional) – a function returning None that is invoked after a worker process is created

  • init_func_args (tuple, optional) – additional positional arguments to be passed to the init function as-is

  • init_func_kwargs (dict, optional) – additional keyword arguments to be passed to the init function as-is

  • cvc_func (function, optional) – a function returning the context_vars dictionary to be provided to the asyn function as a keyword argument. The context is consistent throughout the lifecycle of one block. If not provided, context_vars={“async”: True}.

  • cvc_func_args (tuple, optional) – additional positional arguments to be passed to the cvc function as-is

  • cvc_func_kwargs (dict, optional) – additional keyword arguments to be passed to the cvc function as-is

Returns:

output_iterable – the output iterator

Return type:

iterable
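The batch, map, unbatch pipeline described above can be sketched as follows, assuming the default context_vars={"async": True} and leaving out the init and cvc hooks. The names are hypothetical, and a pluggable mapper stands in for the process pool: the built-in map runs every batch in the current process.

```python
import asyncio
import functools
import itertools
from typing import Any, Callable, Iterable, Iterator, List


def batched_sketch(iterable: Iterable[Any], batch_size: int) -> Iterator[List[Any]]:
    """Group items into lists of at most batch_size items."""
    it = iter(iterable)
    while True:
        batch = list(itertools.islice(it, batch_size))
        if not batch:
            return
        yield batch


def process_batch_sketch(asyn_func: Callable, context_vars: dict, batch: List[Any]) -> List[Any]:
    """Worker side: run asyn_func over every item of one batch concurrently."""
    async def run_batch():
        return await asyncio.gather(*(asyn_func(item, context_vars=context_vars) for item in batch))
    return asyncio.run(run_batch())


def asyn_pmap_sketch(asyn_func: Callable, input_iterable: Iterable[Any],
                     batch_size: int = 1024, mapper: Callable = map) -> Iterator[Any]:
    context_vars = {"async": True}  # the documented default when no cvc_func is given
    worker = functools.partial(process_batch_sketch, asyn_func, context_vars)
    # Batch the input, map each batch (possibly in another process), then unbatch.
    batch_results = mapper(worker, batched_sketch(input_iterable, batch_size))
    return itertools.chain.from_iterable(batch_results)


async def demo_double(item: int, context_vars: dict) -> int:
    await asyncio.sleep(0)
    return 2 * item
```

To spread batches over processes, one could pass mapper=pool.imap (ordered output) or mapper=pool.imap_unordered from a multiprocessing.Pool, which is roughly the choice pool_ordered controls. The pool must stay open while the output is consumed, and asyn_func must be a module-level function so that it can be pickled.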

Classes

  • Counter: Counter class without the race-condition bug

  • ProcessParalleliser: Run a function with different inputs in parallel using multiprocessing.

  • WorkIterator: Iterates work from id 0 to infinity, returning the work result in each iteration.

class mt.concurrency.Counter

Counter class without the race-condition bug
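A sketch of a counter free of the classic race, assuming the bug in question is the non-atomic `value += n` on a shared multiprocessing.Value. CounterSketch, increment and hammer are hypothetical names, not the class's actual API.

```python
import multiprocessing as mp
import threading


class CounterSketch:
    """Shared integer counter whose updates are atomic across threads and processes."""

    def __init__(self, initval: int = 0):
        # mp.Value("i", ...) is a synchronized shared C int that carries its own lock.
        self._val = mp.Value("i", initval)

    def increment(self, n: int = 1) -> None:
        # `self._val.value += n` on its own is a read-modify-write and can lose
        # updates under contention; holding the lock makes the whole update atomic.
        with self._val.get_lock():
            self._val.value += n

    @property
    def value(self) -> int:
        with self._val.get_lock():
            return self._val.value


def hammer(counter: CounterSketch, n_threads: int = 4, n_increments: int = 1000) -> None:
    """Demo: increment the counter concurrently from several threads."""
    def bump():
        for _ in range(n_increments):
            counter.increment()

    threads = [threading.Thread(target=bump) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

With the lock held for the whole read-modify-write, hammer(counter) always leaves the counter at exactly n_threads * n_increments.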

class mt.concurrency.ProcessParalleliser(func, max_num_workers=None, logger=None)

Run a function with different inputs in parallel using multiprocessing.

Parameters:
  • func (function) – a function to be run in parallel. The function takes as input a non-negative integer ‘work_id’ and returns some result.

  • max_num_workers (int, optional) – maximum number of concurrent workers or equivalently processes to be allocated

  • logger (IndentedLoggerAdapter, optional) – for logging messages

close()

Closes the instance properly.

empty()

Returns whether the output queue is empty.

pop(timeout=3600)

Returns a pair (work_id, result) when at least one such pair is available.

Parameters:

timeout (float) – number of seconds to wait to pop a work result from the queue before bailing out

Returns:

  • int – non-negative integer representing the work id

  • object – work result

Raises:

queue.Empty – if there is no work result after the timeout

push(work_id, timeout=30)

Pushes a work id to the background to run the provided function in parallel.

Parameters:
  • work_id (int) – non-negative integer to be provided to the function

  • timeout (float) – number of seconds to wait to push the id to the queue before bailing out

Returns:

whether or not the work id has been pushed

Return type:

bool

Notes

You should use pop() or empty() to check for the output of each work.

class mt.concurrency.WorkIterator(func, buffer_size=None, skip_null: bool = True, push_timeout=30, pop_timeout=3600, max_num_workers=None, serial_mode=False, logger=None)

Iterates work from id 0 to infinity, returning the work result in each iteration.

By default, the class invokes ProcessParalleliser to do a few works ahead of time. It switches to serial mode if requested.

Parameters:
  • func (function) – a function representing the work process. The function takes as input a non-negative integer ‘work_id’ and returns some result in the form of (work_id, …) if successful else None.

  • buffer_size (int, optional) – maximum number of work results to buffer ahead of time. If not specified, it defaults to twice the number of processes.

  • push_timeout (float, optional) – timeout in seconds for each push to the input queue. See ProcessParalleliser.push().

  • pop_timeout (float, optional) – timeout in seconds for each pop from the output queue. See ProcessParalleliser.pop().

  • skip_null (bool, optional) – whether or not to skip the iteration that contains None as the work result.

  • max_num_workers (int, optional) – maximum number of concurrent workers or equivalently processes to be allocated

  • serial_mode (bool) – whether or not to activate the serial mode. Useful when the number of works is small.

  • logger (logging.Logger, optional) – logger for debugging purposes

Notes

Instances of the class qualify as a thread-safe Python iterator. Each iteration returns a (work_id, result) pair. To avoid a possible deadlock during garbage collection, it is recommended to explicitly invoke close() to clean up background processes.

As of 2021/02/17, instances of WorkIterator can be used in a with statement. Upon exiting, close() is invoked.

As of 2021/04/30, you can switch the version of the paralleliser.

As of 2021/08/13, the class has been extended to do work serially if the number of works is provided and is less than or equal to a provided threshold.
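A serial-mode analogue of the iterator contract in these notes can be sketched as follows: work ids 0, 1, 2, … are run in the calling thread, null results are skipped, a lock makes iteration thread-safe, and a with statement calls close() on exit. SerialWorkIteratorSketch is a hypothetical class that leaves out ProcessParalleliser entirely. Like the real iterator, it never ends on its own, so it loops forever if func always returns None.

```python
import itertools
import threading
from typing import Any, Callable, Optional


class SerialWorkIteratorSketch:
    """Serial-mode analogue of WorkIterator: work ids 0, 1, 2, ... run in the calling thread."""

    def __init__(self, func: Callable[[int], Optional[Any]], skip_null: bool = True):
        self.func = func
        self.skip_null = skip_null
        self._work_ids = itertools.count()
        self._lock = threading.Lock()  # makes __next__ safe to call from several threads
        self._closed = False

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:
            if self._closed:
                raise StopIteration
            while True:
                result = self.func(next(self._work_ids))
                if result is None and self.skip_null:
                    continue  # skip null results, as skip_null=True does
                return result  # expected to be a (work_id, ...) tuple

    def close(self) -> None:
        with self._lock:
            self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()  # mirror WorkIterator: leaving the with block calls close()
        return False
```

For example, with func = lambda w: (w, w * w) if w % 3 else None, the first three iterations return (1, 1), (2, 4) and (4, 16), because ids 0 and 3 produce None and are skipped.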

close()

Closes the iterator, preventing further use.