mt.concurrency.beehive

BeeHive concurrency using asyncio and multiprocessing.

This is a concurrency model devised by Minh-Tri. In this model, there is a main process called the queen bee and multiple subprocesses called worker bees. The queen bee is responsible for spawning and destroying worker bees herself. She is also responsible for organisation-level tasks. She does not do individual-level tasks but instead assigns those tasks to worker bees.

The BeeHive model can be useful for applications like making ML model predictions from a dataframe. In this case, the queen bee owns access to the ML model and the dataframe, delegates the potentially IO-bound preprocessing and postprocessing of every row of the dataframe to worker bees, and makes the batch predictions from the model herself.

Functions

  • logger_debug_msg(): Logs debugging statements extracted from a bee message. For internal use only.

  • subprocess_workerbee(): Creates a daemon subprocess that holds a worker bee and runs the bee in the subprocess.

  • beehive_run(): An async function that runs a task in a BeeHive concurrency model.

mt.concurrency.beehive.logger_debug_msg(msg, logger=None)

Logs debugging statements extracted from a bee message. For internal use only.

mt.concurrency.beehive.subprocess_workerbee(workerbee_class, workerbee_id: int, init_args: tuple = (), init_kwargs: dict = {}, s3_profile: str | None = None, max_concurrency: int = 1024, logger: IndentedLoggerAdapter | None = None)

Creates a daemon subprocess that holds a worker bee and runs the bee in the subprocess.

Parameters:
  • workerbee_class (class) – subclass of WorkerBee whose constructor accepts all positional and keyword arguments of the constructor of the super class

  • workerbee_id (int) – the id that the queen bee has assigned to the worker bee

  • init_args (tuple) – additional positional arguments to be passed as-is to the new bee’s constructor

  • init_kwargs (dict) – additional keyword arguments to be passed as-is to the new bee’s constructor

  • s3_profile (str, optional) – the S3 profile from which the context vars are created. See mt.base.s3.create_context_vars().

  • max_concurrency (int) – maximum number of concurrent tasks that the bee handles at a time

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes

Returns:

  • process (multiprocessing.Process) – the created subprocess

  • p_m2p (multiprocessing.Queue) – the me-to-parent connection for private communication with the worker bee

  • p_p2m (multiprocessing.Queue) – the parent-to-me connection for private communication with the worker bee

async mt.concurrency.beehive.beehive_run(queenbee_class, workerbee_class, task_name: str, task_args: tuple = (), task_kwargs: dict = {}, queenbee_init_args: tuple = (), queenbee_init_kwargs: dict = {}, workerbee_init_args: tuple = (), workerbee_init_kwargs: dict = {}, s3_profile: str | None = None, max_concurrency: int = 1024, queenbee_max_concurrency: int | None = None, context_vars: dict = {}, logger: IndentedLoggerAdapter | None = None)

An async function that runs a task in a BeeHive concurrency model.

Parameters:
  • queenbee_class (class) – a subclass of QueenBee

  • workerbee_class (class) – a subclass of WorkerBee

  • task_name (str) – name of a task assigned to the queen bee, and equivalently the queen bee’s async member function handling the task

  • task_args (tuple) – positional arguments to be passed to the member function as-is

  • task_kwargs (dict) – keyword arguments to be passed to the member function as-is

  • queenbee_init_args (tuple) – additional positional arguments to be passed as-is to the new queen bee’s constructor

  • queenbee_init_kwargs (dict) – additional keyword arguments to be passed as-is to the new queen bee’s constructor

  • workerbee_init_args (tuple) – additional positional arguments to be passed as-is to each new worker bee’s constructor

  • workerbee_init_kwargs (dict) – additional keyword arguments to be passed as-is to each new worker bee’s constructor

  • s3_profile (str, optional) – the S3 profile from which the context vars are created. See mt.base.s3.create_context_vars().

  • max_concurrency (int) – maximum number of concurrent tasks that each worker bee handles at a time

  • queenbee_max_concurrency (int, optional) – maximum number of concurrent tasks that the queen bee handles at a time. Default is no limit.

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. In addition, variable ‘s3_client’ must exist and hold an enter-result of an async with statement invoking mt.base.s3.create_s3_client().

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes

Returns:

anything returned by the task assigned to the queen bee

Return type:

object

Raises:
  • RuntimeError – when the queen bee has cancelled her task

  • Exception – anything raised by the task assigned to the queen bee
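
The Returns and Raises entries above describe how the outcome of the queen bee’s task becomes either a return value or an exception. The sketch below models that mapping, assuming the outcome is a dict shaped like the ‘task_done’ message described under the Bee class (keys status, returning_value, exception, reason). The helper name unpack_task_result is hypothetical and is not part of mt.concurrency.beehive.

```python
# Hypothetical helper, NOT part of mt.concurrency.beehive: it only mirrors the
# documented Returns/Raises behaviour of beehive_run() for a result dict.
def unpack_task_result(task_result: dict):
    status = task_result["status"]
    if status == "succeeded":
        # Return whatever the queen bee's task returned.
        return task_result["returning_value"]
    if status == "raised":
        # Re-raise whatever the queen bee's task raised.
        raise task_result["exception"]
    if status == "cancelled":
        reason = task_result.get("reason")
        raise RuntimeError(f"The queen bee has cancelled her task. Reason: {reason}")
    raise ValueError(f"Unknown task status: {status!r}")


print(unpack_task_result({"status": "succeeded", "returning_value": 42}))  # prints 42
```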

Classes

  • Bee: The base class of a bee.

  • WorkerBee: A worker bee in the BeeHive concurrency model.

  • QueenBee: The queen bee in the BeeHive concurrency model.

class mt.concurrency.beehive.Bee(my_id: int, p_m2p: Queue, p_p2m: Queue, max_concurrency: int | None = 1024, logger: IndentedLoggerAdapter | None = None)

The base class of a bee.

A bee is an asynchronous multi-tasking worker that can communicate with its parent and zero or more child bees via message passing. Communication here means executing tasks for the parent and delegating subtasks to child bees. All tasks are asynchronous.

Each bee maintains a private communication channel with its parent via a full-duplex pair of queues (p_p2m, p_m2p), where p2m and m2p stand for parent-to-me and me-to-parent. For every new task to be delegated to a child, the bee broadcasts a zero-based task id by sending a message {‘msg_type’: ‘new_task’, ‘task_id’: int} to every child. If a child wishes to process the task, it places a message {‘msg_type’: ‘task_accepted’, ‘task_id’: int} back to the parent via its p_m2p. If it does not wish to process the task, it should place a different message, {‘msg_type’: ‘task_denied’, ‘task_id’: int}. The parent chooses one of the accepting children to do the task and sends {‘msg_type’: ‘task_taken’, ‘task_id’: int} to all late accepters regarding the task. Communication with the chosen child is then conducted via 2 further messages, each of which is a key-value dictionary.
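
The offer/accept handshake above can be modelled as a pure function over the responses a parent receives for one task id, in arrival order. This sketch assumes the parent simply picks the first accepter, which the phrase "late accepters" suggests but does not state; choose_child is a hypothetical name, not the library’s internals.

```python
from typing import List, Optional, Tuple


def choose_child(
    task_id: int, responses: List[Tuple[int, dict]]
) -> Tuple[Optional[int], List[Tuple[int, dict]]]:
    """Return (chosen child id or None, [(child_id, reply_message), ...]).

    Assumption: the first child to answer 'task_accepted' gets the task and
    every later accepter is told 'task_taken'.
    """
    chosen = None
    replies = []
    for child_id, msg in responses:
        # Ignore denials and messages about other task ids.
        if msg["task_id"] != task_id or msg["msg_type"] != "task_accepted":
            continue
        if chosen is None:
            chosen = child_id
        else:
            replies.append((child_id, {"msg_type": "task_taken", "task_id": task_id}))
    return chosen, replies


# The parent broadcasts {'msg_type': 'new_task', 'task_id': 0} to every child,
# then collects these responses (in arrival order):
responses = [
    (2, {"msg_type": "task_denied", "task_id": 0}),
    (1, {"msg_type": "task_accepted", "task_id": 0}),
    (3, {"msg_type": "task_accepted", "task_id": 0}),
]
chosen, replies = choose_child(0, responses)
print(chosen, replies)  # prints 1 [(3, {'msg_type': 'task_taken', 'task_id': 0})]
```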

The first message is sent from the parent bee and describes the task to be delegated to the child: {‘msg_type’: ‘task_info’, ‘task_id’: int, ‘name’: str, ‘args’: tuple, ‘kwargs’: dict}. Key name is the task name. The child bee class is expected to implement an async member function with the same name as the task to handle it. Keys args and kwargs are passed as positional and keyword arguments respectively to that function. The parent assumes that the child will process the task asynchronously at some point in time, and awaits the response.
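
On the child side, a ‘task_info’ message is resolved to a member coroutine by name. The sketch below assumes a plain class with async methods stands in for a bee subclass; EchoBee and handle_task_info are hypothetical names, not the library’s internals.

```python
import asyncio


class EchoBee:
    # Hypothetical stand-in for a bee subclass: each task is an async method.
    async def add(self, a, b, scale=1):
        await asyncio.sleep(0)  # pretend to do some IO-bound work
        return (a + b) * scale


async def handle_task_info(bee, msg: dict):
    # Look up the async member function named after the task ...
    handler = getattr(bee, msg["name"])
    # ... and pass 'args' and 'kwargs' through as-is.
    return await handler(*msg["args"], **msg["kwargs"])


msg = {
    "msg_type": "task_info",
    "task_id": 0,
    "name": "add",
    "args": (2, 3),
    "kwargs": {"scale": 10},
}
result = asyncio.run(handle_task_info(EchoBee(), msg))
print(result)  # prints 50
```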

When it finishes processing the task, either successfully or not, the child bee must send {‘msg_type’: ‘task_done’, ‘task_id’: int, ‘status’: str, …} back to the parent bee. status can be one of 3 values: ‘cancelled’, ‘raised’ and ‘succeeded’, depending on whether the task was cancelled, raised an exception, or succeeded without any exception. If status is ‘cancelled’, key reason holds the reason for the cancellation, or None if no reason was given. If status is ‘raised’, key exception contains an Exception instance, key traceback contains a list of text lines describing the call stack, and key other_details contains other supporting information. Finally, if status is ‘succeeded’, key returning_value holds the value returned by the task.
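
The three ‘task_done’ variants can be produced by wrapping the task coroutine. The sketch below is an assumption about how such a message might be assembled, not the library’s code; run_and_report is a hypothetical name and other_details is simply set to None.

```python
import asyncio
import traceback


async def run_and_report(task_id: int, coro) -> dict:
    """Await `coro` and describe its outcome as a 'task_done' message."""
    msg = {"msg_type": "task_done", "task_id": task_id}
    try:
        value = await coro
    except asyncio.CancelledError as e:
        # Simplification: this also swallows cancellation of run_and_report
        # itself; a real implementation would need to tell the two apart.
        msg.update(status="cancelled", reason=str(e) or None)
    except Exception as e:
        msg.update(
            status="raised",
            exception=e,
            # List of strings describing the call stack.
            traceback=traceback.format_exception(type(e), e, e.__traceback__),
            other_details=None,  # placeholder for extra supporting information
        )
    else:
        msg.update(status="succeeded", returning_value=value)
    return msg


async def ok():
    return 7


async def bad():
    raise ValueError("boom")


async def stop():
    raise asyncio.CancelledError("shutting down")


async def main():
    return [
        await run_and_report(0, ok()),
        await run_and_report(1, bad()),
        await run_and_report(2, stop()),
    ]


done_ok, done_bad, done_cancelled = asyncio.run(main())
print(done_ok)  # prints {'msg_type': 'task_done', 'task_id': 0, 'status': 'succeeded', 'returning_value': 7}
```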

Child bees run in daemon subprocesses of the parent bee’s process. Apart from the task-delegation messages above, there are two special messages. The first is {‘msg_type’: ‘dead’, ‘death_type’: str, …}, sent from a child bee to its parent, where death_type is either ‘normal’ or ‘killed’. If it is ‘normal’, key exit_code is either None or a value representing an exit code or a last word. If it is ‘killed’, key exception holds the Exception that caused the bee to be killed, and key traceback holds a list of text lines describing the call stack. This message is sent only when the bee has finished all its tasks and its worker process is about to terminate. The second is {‘msg_type’: ‘die’}, sent from a parent bee to one of its children, telling the child bee to end its life gracefully. Bees are very sensitive: if the parent says ‘die’, they become very sad, stop accepting new tasks, and die as soon as all existing tasks have been done.
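
The ‘die’/‘dead’ exchange can be modelled in a single process with asyncio.Queue objects standing in for the multiprocessing queues. This is a simplified sketch under that assumption: child_loop and work are hypothetical, only the ‘normal’ death type is covered, and every ‘task_info’ runs the same dummy task.

```python
import asyncio


async def work(task_id: int, finished: list) -> None:
    # Hypothetical dummy task: a short sleep, then record completion.
    await asyncio.sleep(0.01)
    finished.append(task_id)


async def child_loop(p_p2m: asyncio.Queue, p_m2p: asyncio.Queue, finished: list) -> None:
    pending = set()
    while True:
        msg = await p_p2m.get()
        if msg["msg_type"] == "die":
            break  # stop accepting new tasks
        if msg["msg_type"] == "task_info":
            pending.add(asyncio.create_task(work(msg["task_id"], finished)))
    # Finish every task already in progress before dying.
    await asyncio.gather(*pending)
    await p_m2p.put({"msg_type": "dead", "death_type": "normal", "exit_code": None})


async def main():
    p_p2m, p_m2p = asyncio.Queue(), asyncio.Queue()
    finished = []
    child = asyncio.create_task(child_loop(p_p2m, p_m2p, finished))
    for i in range(3):
        await p_p2m.put({"msg_type": "task_info", "task_id": i, "name": "work", "args": (), "kwargs": {}})
    await p_p2m.put({"msg_type": "die"})
    last_word = await p_m2p.get()  # arrives only after all 3 tasks are done
    await child
    return last_word, finished


last_msg, finished = asyncio.run(main())
print(last_msg)  # prints {'msg_type': 'dead', 'death_type': 'normal', 'exit_code': None}
```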

Each bee comes with its own life cycle, implemented in the async run(). The user should not have to override any private member function; they just have to create new async member functions representing different tasks.

Parameters:
  • my_id (int) – the id that the creator has assigned to the bee

  • p_m2p (queue.Queue) – the me-to-parent queue for private communication

  • p_p2m (queue.Queue) – the parent-to-me queue for private communication

  • max_concurrency (int, optional) – maximum number of concurrent tasks that the bee handles at a time. If None is provided, there is no constraint in maximum number of concurrent tasks.
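
A common way to enforce a cap like max_concurrency in asyncio is a semaphore, with None meaning no semaphore at all. The sketch below assumes that approach purely for illustration; it is not taken from the library source, and run_limited is a hypothetical name.

```python
import asyncio
from typing import Optional


async def run_limited(n_jobs: int, max_concurrency: Optional[int]) -> int:
    """Run n_jobs dummy tasks and return the peak number running at once."""
    sem = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None
    running = 0
    peak = 0

    async def job():
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulated IO-bound work
        running -= 1

    async def guarded():
        if sem is None:  # None: no constraint on concurrent tasks
            await job()
        else:
            async with sem:  # at most max_concurrency jobs inside at once
                await job()

    await asyncio.gather(*(guarded() for _ in range(n_jobs)))
    return peak


print(asyncio.run(run_limited(10, 3)))     # prints 3
print(asyncio.run(run_limited(10, None)))  # prints 10
```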

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes

Inheritance

Bee

async delegate(name: str, *args, **kwargs)

Delegates a task to one of the child bees and awaits the result.

Parameters:
  • name (str) – the name of the task/async member function of the child bee class responsible for handling the task

  • args (tuple) – positional arguments to be passed as-is to the task/async member function

  • kwargs (dict) – keyword arguments to be passed as-is to the task/async member function

Returns:

  • task_result (dict) – returning message in the form {‘status’: str, …} where status is one of ‘cancelled’, ‘raised’ and ‘succeeded’. The remaining keys are described in the class docstring.

  • child_id (int) – the id of the child that has executed the task
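
In the dataframe use case, a queen bee’s task would typically call delegate() once per row and await the calls together. The sketch below substitutes a hypothetical fake_delegate coroutine for Bee.delegate so that it runs on its own; only the (task_result, child_id) return shape is taken from the documentation above, and the task name ‘preprocess’ is also hypothetical.

```python
import asyncio
import itertools

_child_ids = itertools.cycle([0, 1, 2])  # pretend there are 3 worker bees


async def fake_delegate(name: str, *args, **kwargs):
    # Hypothetical stand-in for Bee.delegate(): returns (task_result, child_id).
    # A real child would run its async member function called `name`.
    await asyncio.sleep(0)
    value = sum(args)  # hypothetical 'preprocess' result
    return {"status": "succeeded", "returning_value": value}, next(_child_ids)


async def preprocess_all(rows):
    # One delegation per row, all awaited concurrently; gather keeps row order.
    pairs = await asyncio.gather(*(fake_delegate("preprocess", *row) for row in rows))
    outputs = []
    for task_result, child_id in pairs:
        if task_result["status"] != "succeeded":
            raise RuntimeError(f"Child {child_id} reported status {task_result['status']!r}")
        outputs.append(task_result["returning_value"])
    return outputs


print(asyncio.run(preprocess_all([(1, 2), (3, 4), (5, 6)])))  # prints [3, 7, 11]
```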

async run()

Implements the life-cycle of the bee. Invoke this function only once.

class mt.concurrency.beehive.WorkerBee(my_id: int, p_m2p: Queue, p_p2m: Queue, max_concurrency: int = 1024, context_vars: dict = {}, logger: IndentedLoggerAdapter | None = None)

A worker bee in the BeeHive concurrency model.

See parent Bee for more details.

Parameters:
  • my_id (int) – the id that the queen bee has assigned to the worker bee

  • p_m2p (multiprocessing.Queue) – the me-to-parent connection for private communication

  • p_p2m (multiprocessing.Queue) – the parent-to-me connection for private communication

  • max_concurrency (int) – maximum number of concurrent tasks that the bee handles at a time

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. In addition, variable ‘s3_client’ must exist and hold an enter-result of an async with statement invoking mt.base.s3.create_s3_client().

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes

Inheritance

Bee → WorkerBee

async busy_status(context_vars: dict = {})

A task to check how busy the worker bee is.

Parameters:

context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. In addition, variable ‘s3_client’ must exist and hold an enter-result of an async with statement invoking mt.base.s3.create_s3_client().

Returns:

a scalar between 0 and 1 where 0 means completely free and 1 means completely busy

Return type:

float
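
The documentation does not say how busy_status() computes its value. One plausible definition, given here only as an assumption, is the fraction of the max_concurrency slots currently occupied, clipped to [0, 1]; busy_ratio is a hypothetical helper.

```python
def busy_ratio(num_active_tasks: int, max_concurrency: int) -> float:
    # Assumption: busyness = occupied task slots / total task slots.
    if max_concurrency <= 0:
        raise ValueError("max_concurrency must be a positive integer")
    ratio = num_active_tasks / max_concurrency
    # Clip to [0, 1]: 0 means completely free, 1 means completely busy.
    return min(max(ratio, 0.0), 1.0)


print(busy_ratio(256, 1024))  # prints 0.25
```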

class mt.concurrency.beehive.QueenBee(my_id: int, p_m2p: Queue, p_p2m: Queue, workerbee_class, worker_init_args: tuple = (), worker_init_kwargs: dict = {}, s3_profile: str | None = None, max_concurrency: int | None = None, workerbee_max_concurrency: int | None = 1024, context_vars: dict = {}, logger: IndentedLoggerAdapter | None = None)

The queen bee in the BeeHive concurrency model.

The queen bee and her worker bees work in asynchronous mode. Each worker bee operates within a context returned from mt.base.s3.create_context_vars() with a given profile, and is asked to cap the number of concurrent asyncio tasks at a given threshold.

The queen bee herself is a worker bee. However, when initialising a queen bee, the user must provide a subclass of WorkerBee representing the worker bee class, from which the queen can spawn worker bees. As with other bees, the user is expected to write asynchronous member functions to handle tasks.

Parameters:
  • my_id (int) – the id that the creator has assigned to the queen bee

  • p_m2p (queue.Queue) – the me-to-parent connection for private communication between the user (parent) and the queen

  • p_p2m (queue.Queue) – the parent-to-me connection for private communication between the user (parent) and the queen

  • workerbee_class (class) – subclass of WorkerBee whose constructor accepts all positional and keyword arguments of the constructor of the super class

  • worker_init_args (tuple) – additional positional arguments to be passed as-is to each new worker bee’s constructor

  • worker_init_kwargs (dict) – additional keyword arguments to be passed as-is to each new worker bee’s constructor

  • s3_profile (str, optional) – the S3 profile from which the context vars are created. See mt.base.s3.create_context_vars().

  • max_concurrency (int, optional) – the maximum number of concurrent tasks at any time for the bee, good for managing memory allocations. It should normally be left at its default, None, so that the queen bee can deal with all requests.

  • workerbee_max_concurrency (int) – the maximum number of concurrent tasks at any time for a worker bee, good for managing memory allocations. Non-integer values are not accepted.

  • context_vars (dict) – a dictionary of context variables within which the queen bee’s functions run. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. In addition, variable ‘s3_client’ must exist and hold an enter-result of an async with statement invoking mt.base.s3.create_s3_client().

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes

Inheritance

Bee → WorkerBee → QueenBee