mt.concurrency.beehive
BeeHive concurrency using asyncio and multiprocessing.
This is a concurrency model devised by Minh-Tri. In this model, there is a main process called the queen bee and multiple subprocesses called worker bees. The queen bee is responsible for spawning and destroying worker bees herself. She is also responsible for organisation-level tasks. She does not do individual-level tasks but instead assigns those tasks to worker bees.
The BeeHive model can be useful for applications like making ML model predictions from a dataframe. In this case, the queen bee owns access to the ML model and the dataframe, delegates the preprocessing and postprocessing of every row, which are potentially IO-bound, to worker bees, and handles making batch predictions from the model herself.
Functions
logger_debug_msg()
: Logs debugging statements extracted from a bee message. For internal use only.
subprocess_workerbee()
: Creates a daemon subprocess that holds a worker bee and runs the bee in the subprocess.
beehive_run()
: An async function that runs a task in a BeeHive concurrency model.
- mt.concurrency.beehive.logger_debug_msg(msg, logger=None)
Logs debugging statements extracted from a bee message. For internal use only.
- mt.concurrency.beehive.subprocess_workerbee(workerbee_class, workerbee_id: int, init_args: tuple = (), init_kwargs: dict = {}, s3_profile: str | None = None, max_concurrency: int = 1024, logger: IndentedLoggerAdapter | None = None)
Creates a daemon subprocess that holds a worker bee and runs the bee in the subprocess.
- Parameters:
workerbee_class (class) – subclass of WorkerBee whose constructor accepts all positional and keyword arguments of the constructor of the super class
workerbee_id (int) – the id that the queen bee has assigned to the worker bee
init_args (tuple) – additional positional arguments to be passed as-is to the new bee’s constructor
init_kwargs (dict) – additional keyword arguments to be passed as-is to the new bee’s constructor
s3_profile (str, optional) – the S3 profile from which the context vars are created. See mt.base.s3.create_context_vars().
max_concurrency (int) – maximum number of concurrent tasks that the bee handles at a time
logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes
- Returns:
process (multiprocessing.Process) – the created subprocess
p_m2p (multiprocessing.Queue) – the me-to-parent connection for private communication with the worker bee
p_p2m (multiprocessing.Queue) – the parent-to-me connection for private communication with the worker bee
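To make the returned triple concrete, here is a minimal sketch of the spawning pattern using only the standard library: a daemon multiprocessing.Process plus two multiprocessing.Queue objects, one per direction. It is not the module's implementation; echo_worker_loop and spawn_echo_worker are hypothetical names, and the child simply echoes messages instead of running a WorkerBee's async run().

```python
import multiprocessing as mp


def echo_worker_loop(p_m2p, p_p2m) -> None:
    """Hypothetical child loop: echo each message until told to 'die'."""
    while True:
        msg = p_p2m.get()  # block until the parent sends a message
        if msg.get("msg_type") == "die":
            # Announce a normal death just before the loop (and process) ends.
            p_m2p.put({"msg_type": "dead", "death_type": "normal", "exit_code": None})
            return
        p_m2p.put({"msg_type": "echo", "payload": msg})


def spawn_echo_worker():
    """Start the loop in a daemon subprocess; return (process, p_m2p, p_p2m)."""
    p_m2p = mp.Queue()  # me-to-parent: child -> parent
    p_p2m = mp.Queue()  # parent-to-me: parent -> child
    process = mp.Process(target=echo_worker_loop, args=(p_m2p, p_p2m), daemon=True)
    process.start()
    return process, p_m2p, p_p2m
```

From the parent, call spawn_echo_worker() under an if __name__ == "__main__": guard (required with the spawn start method), send {"msg_type": "die"} through p_p2m, read the 'dead' message from p_m2p, then join the process. Marking the process as a daemon means it is terminated automatically if the parent exits first.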
- async mt.concurrency.beehive.beehive_run(queenbee_class, workerbee_class, task_name: str, task_args: tuple = (), task_kwargs: dict = {}, queenbee_init_args: tuple = (), queenbee_init_kwargs: dict = {}, workerbee_init_args: tuple = (), workerbee_init_kwargs: dict = {}, s3_profile: str | None = None, max_concurrency: int = 1024, queenbee_max_concurrency: int | None = None, context_vars: dict = {}, logger: IndentedLoggerAdapter | None = None)
An async function that runs a task in a BeeHive concurrency model.
- Parameters:
queenbee_class (class) – a subclass of QueenBee
workerbee_class (class) – a subclass of WorkerBee
task_name (str) – name of a task assigned to the queen bee, and equivalently the name of the queen bee’s async member function handling the task
task_args (tuple) – positional arguments to be passed to the member function as-is
task_kwargs (dict) – keyword arguments to be passed to the member function as-is
queenbee_init_args (tuple) – additional positional arguments to be passed as-is to the new queen bee’s constructor
queenbee_init_kwargs (dict) – additional keyword arguments to be passed as-is to the new queen bee’s constructor
workerbee_init_args (tuple) – additional positional arguments to be passed as-is to each new worker bee’s constructor
workerbee_init_kwargs (dict) – additional keyword arguments to be passed as-is to each new worker bee’s constructor
s3_profile (str, optional) – the S3 profile from which the context vars are created. See mt.base.s3.create_context_vars().
max_concurrency (int) – maximum number of concurrent tasks that each worker bee handles at a time
queenbee_max_concurrency (int, optional) – maximum number of concurrent tasks that the queen bee handles at a time. Default is no limit.
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. In addition, variable ‘s3_client’ must exist and hold an enter-result of an async with statement invoking mt.base.s3.create_s3_client().
logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes
- Returns:
anything returned by the task assigned to the queen bee
- Return type:
object
- Raises:
RuntimeError – when the queen bee has cancelled her task
Exception – anything raised by the task assigned to the queen bee
Classes
Bee
: The base class of a bee.
WorkerBee
: A worker bee in the BeeHive concurrency model.
QueenBee
: The queen bee in the BeeHive concurrency model.
- class mt.concurrency.beehive.Bee(my_id: int, p_m2p: Queue, p_p2m: Queue, max_concurrency: int | None = 1024, logger: IndentedLoggerAdapter | None = None)
The base class of a bee.
A bee is an asynchronous multi-tasking worker that can communicate with its parent and zero or more child bees via message passing. Communication means executing a task for the parent and delegating child bees to do subtasks. All tasks are asynchronous.
Each bee maintains private communication with its parent via a full-duplex pair of queues (p_p2m, p_m2p), where p2m and m2p stand for parent-to-me and me-to-parent. For every new task to be delegated to a child, the bee broadcasts a zero-based task id by sending a message {‘msg_type’: ‘new_task’, ‘task_id’: int} to every child. If a child wishes to process the task, it places a message {‘msg_type’: ‘task_accepted’, ‘task_id’: int} back to the parent via its p_m2p. If it does not wish to process the task, it should instead place the message {‘msg_type’: ‘task_denied’, ‘task_id’: int}. The parent chooses one of the accepting responders to do the task and sends {‘msg_type’: ‘task_taken’, ‘task_id’: int} to all late accepters of that task. Communication with the chosen child is then conducted via 2 further messages, each of which is a key-value dictionary.
The first message is sent from the parent bee and describes the task to be delegated to the child: {‘msg_type’: ‘task_info’, ‘task_id’: int, ‘name’: str, ‘args’: tuple, ‘kwargs’: dict}. Variable name is the task name. The child bee class is expected to implement an async member function with the same name as the task to handle it. Variables args and kwargs are passed as positional and keyword arguments, respectively, to that function. The parent assumes that the child will process the task asynchronously at some point in time, and awaits the response.
Upon finishing the task, whether successfully or not, the child bee must send {‘msg_type’: ‘task_done’, ‘task_id’: int, ‘status’: str, …} back to the parent bee. status is one of 3 values: ‘cancelled’, ‘raised’ and ‘succeeded’, depending on whether the task was cancelled, raised an exception, or succeeded without any exception. If status is ‘cancelled’, key reason holds the reason for the cancellation, which may be None. If status is ‘raised’, key exception contains an Exception instance, key traceback contains a list of text lines describing the call stack, and key other_details contains other supporting information. Finally, if status is ‘succeeded’, key returning_value holds the return value.
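The delegation round trip described above can be sketched with plain dictionaries: the parent picks one accepter and prepares ‘task_taken’ messages for the late ones, and the child turns a ‘task_info’ message into a ‘task_done’ message by looking up the async member function named by the task. This is a sketch under stated assumptions, not the module's code: choose_child and handle_task_info are hypothetical, "first accepter wins" is an assumed selection policy (the text only says the parent chooses one), and other_details and reason are simply set to None.

```python
import asyncio
import traceback
from typing import Dict, List, Optional, Tuple


def choose_child(task_id: int, replies: List[Tuple[int, dict]]) -> Tuple[Optional[int], Dict[int, dict]]:
    """Parent side: pick a child from (child_id, reply) pairs in arrival order.

    Returns the chosen child id (None if nobody accepted) and the
    'task_taken' messages to send to every late accepter.
    """
    chosen: Optional[int] = None
    taken: Dict[int, dict] = {}
    for child_id, reply in replies:
        if reply.get("task_id") != task_id or reply.get("msg_type") != "task_accepted":
            continue  # replies about other tasks and 'task_denied' need no follow-up
        if chosen is None:
            chosen = child_id  # assumed policy: the first accepter wins
        else:
            taken[child_id] = {"msg_type": "task_taken", "task_id": task_id}
    return chosen, taken


async def handle_task_info(bee: object, msg: dict) -> dict:
    """Child side: run a 'task_info' request and build the 'task_done' reply."""
    done = {"msg_type": "task_done", "task_id": msg["task_id"]}
    try:
        # The task name doubles as the name of an async member function.
        func = getattr(bee, msg["name"])
        value = await func(*msg.get("args", ()), **msg.get("kwargs", {}))
    except asyncio.CancelledError:
        # Reported rather than re-raised, so the parent always gets an answer.
        done.update(status="cancelled", reason=None)
    except Exception as e:  # the task (or the name lookup) raised
        done.update(status="raised", exception=e,
                    traceback=traceback.format_exc().splitlines(),
                    other_details=None)
    else:
        done.update(status="succeeded", returning_value=value)
    return done
```

Note that an unknown task name surfaces as a ‘raised’ reply carrying an AttributeError, so the parent never waits forever on a misspelt task.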
Child bees run in daemon subprocesses of the parent bee process. Apart from the messages controlling task delegation above, there are some special messages. The first one is {‘msg_type’: ‘dead’, ‘death_type’: str, …}, sent from a child bee to its parent, where death_type is either ‘normal’ or ‘killed’. If it is ‘normal’, key exit_code is either None or a value representing an exit code or a last word. If it is ‘killed’, key exception holds the Exception that caused the bee to be killed, and key traceback holds a list of text lines describing the call stack. This message is sent only when the bee has finished its tasks and its worker process is about to terminate. The second one is {‘msg_type’: ‘die’}, sent from a parent bee to one of its children, telling the child bee to end its life gracefully. Bees are very sensitive: if the parent says ‘die’, they become very sad, stop accepting new tasks, and die as soon as all existing tasks have been done.
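The ‘die’/‘dead’ handshake can be illustrated with a toy asyncio loop: accepted tasks keep running after ‘die’ arrives, nothing new is accepted, and ‘dead’ is emitted only once everything pending has finished. This is a minimal sketch, assuming an asyncio.Queue as a stand-in for p_p2m and a plain list as a stand-in for p_m2p; toy_life_cycle and its simplified ‘task_info’ messages (only task_id and a delay in args) are hypothetical and do not reproduce the module's run().

```python
import asyncio
from typing import List, Set


async def toy_life_cycle(inbox: "asyncio.Queue[dict]", outbox: List[dict]) -> None:
    """Hypothetical child bee loop illustrating the 'die' / 'dead' handshake."""
    pending: Set["asyncio.Task[None]"] = set()

    async def work(task_id: int, delay: float) -> None:
        await asyncio.sleep(delay)  # stand-in for real work
        outbox.append({"msg_type": "task_done", "task_id": task_id,
                       "status": "succeeded", "returning_value": None})

    while True:
        msg = await inbox.get()
        if msg["msg_type"] == "die":
            break  # stop accepting new tasks; anything still queued is ignored
        pending.add(asyncio.ensure_future(work(msg["task_id"], *msg["args"])))

    if pending:
        await asyncio.wait(pending)  # let already-accepted tasks finish first
    outbox.append({"msg_type": "dead", "death_type": "normal", "exit_code": None})
```

Because the ‘dead’ message is appended after asyncio.wait returns, the parent can treat it as a guarantee that no further ‘task_done’ messages will arrive from that child.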
Each bee comes with its own life cycle, implemented in the async method run(). The user should not have to override any private member function. They just have to create new async member functions representing different tasks.
- Parameters:
my_id (int) – the id that the creator has assigned to the bee
p_m2p (queue.Queue) – the me-to-parent queue for private communication
p_p2m (queue.Queue) – the parent-to-me queue for private communication
max_concurrency (int, optional) – maximum number of concurrent tasks that the bee handles at a time. If None is provided, there is no constraint in maximum number of concurrent tasks.
logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes
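The max_concurrency semantics above (an integer cap, or no constraint when None) can be sketched with an asyncio.Semaphore that is simply skipped when the limit is None. This is an assumption about how such a cap might be enforced, not a description of the module's internals; ConcurrencyLimiter is a hypothetical helper, and it records the peak number of simultaneously running tasks so the effect is observable.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class ConcurrencyLimiter:
    """Hypothetical helper: cap concurrent tasks, or no cap when the limit is None."""

    def __init__(self, max_concurrency: Optional[int]) -> None:
        # Create inside a running event loop (older Pythons bind primitives to a loop).
        self._sem = None if max_concurrency is None else asyncio.Semaphore(max_concurrency)
        self.running = 0
        self.peak = 0  # highest number of tasks observed running at once

    async def run(self, coro_fn: Callable[..., Awaitable[Any]], *args: Any) -> Any:
        if self._sem is not None:
            await self._sem.acquire()  # wait for a free slot
        self.running += 1
        self.peak = max(self.peak, self.running)
        try:
            return await coro_fn(*args)
        finally:
            self.running -= 1
            if self._sem is not None:
                self._sem.release()  # free the slot even if the task raised
```

Usage: inside a coroutine, build ConcurrencyLimiter(2) and await asyncio.gather(*(limiter.run(job, i) for i in range(6))); at most two jobs run at once, whereas ConcurrencyLimiter(None) lets all six start immediately.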
Inheritance: Bee
- async delegate(name: str, *args, **kwargs)
Delegates a task to one of child bees and awaits the result.
- Parameters:
name (str) – the name of the task/async member function of the child bee class responsible for handling the task
args (tuple) – positional arguments to be passed as-is to the task/async member function
kwargs (dict) – keyword arguments to be passed as-is to the task/async member function
- Returns:
task_result (dict) – the result message in the form {‘status’: str, …}, where status is one of ‘cancelled’, ‘raised’ and ‘succeeded’. The remaining keys are as described in the class docstring.
child_id (int) – the id of the child that has executed the task
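A caller usually wants the task's value or its exception rather than the raw task_result dict. The helper below is a hypothetical sketch that mirrors the behaviour documented for beehive_run (RuntimeError when the task was cancelled, the task's own exception re-raised otherwise). It assumes delegate returns the two values as a tuple, so a member function could write task_result, child_id = await self.delegate("preprocess", row) and then call unwrap_task_result(task_result); "preprocess" is an illustrative task name.

```python
from typing import Any


def unwrap_task_result(task_result: dict) -> Any:
    """Hypothetical helper: turn a task_result dict into a value or an exception."""
    status = task_result["status"]
    if status == "succeeded":
        return task_result["returning_value"]
    if status == "raised":
        raise task_result["exception"]  # re-raise the child's exception as-is
    if status == "cancelled":
        raise RuntimeError(f"Task was cancelled: {task_result.get('reason')}")
    raise ValueError(f"Unknown task status: {status!r}")
```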
- async run()
Implements the life-cycle of the bee. Invoke this function only once.
- class mt.concurrency.beehive.WorkerBee(my_id: int, p_m2p: Queue, p_p2m: Queue, max_concurrency: int = 1024, context_vars: dict = {}, logger: IndentedLoggerAdapter | None = None)
A worker bee in the BeeHive concurrency model.
See parent Bee for more details.
- Parameters:
my_id (int) – the id that the queen bee has assigned to the worker bee
p_m2p (multiprocessing.Queue) – the me-to-parent connection for private communication
p_p2m (multiprocessing.Queue) – the parent-to-me connection for private communication
max_concurrency (int) – maximum number of concurrent tasks that the bee handles at a time
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. In addition, variable ‘s3_client’ must exist and hold an enter-result of an async with statement invoking mt.base.s3.create_s3_client().
logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes
Inheritance: Bee -> WorkerBee
- async busy_status(context_vars: dict = {})
A task to check how busy the worker bee is.
- Parameters:
context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. In addition, variable ‘s3_client’ must exist and hold an enter-result of an async with statement invoking mt.base.s3.create_s3_client().
- Returns:
a scalar between 0 and 1 where 0 means completely free and 1 means completely busy
- Return type:
float
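The documentation fixes only the range of busy_status (0 means completely free, 1 means completely busy), not the formula. One plausible measure, offered purely as an assumption, is the number of running tasks divided by max_concurrency, clamped to [0, 1]; a parent could then prefer the least busy child. busy_ratio and least_busy are hypothetical helpers, not part of the module.

```python
from typing import Dict


def busy_ratio(num_running: int, max_concurrency: int) -> float:
    """Hypothetical busyness score in [0, 1]: running tasks over capacity."""
    if max_concurrency <= 0:
        raise ValueError("max_concurrency must be a positive integer")
    return min(1.0, max(0.0, num_running / max_concurrency))


def least_busy(statuses: Dict[int, float]) -> int:
    """Return the child id with the lowest busy score."""
    return min(statuses, key=statuses.__getitem__)
```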
- class mt.concurrency.beehive.QueenBee(my_id: int, p_m2p: Queue, p_p2m: Queue, workerbee_class, worker_init_args: tuple = (), worker_init_kwargs: dict = {}, s3_profile: str | None = None, max_concurrency: int | None = None, workerbee_max_concurrency: int | None = 1024, context_vars: dict = {}, logger: IndentedLoggerAdapter | None = None)
The queen bee in the BeeHive concurrency model.
The queen bee and her worker bees work in asynchronous mode. Each worker bee operates within a context returned from mt.base.s3.create_context_vars() with a given profile, and is asked to cap the number of concurrent asyncio tasks at a given threshold.
The queen bee herself is a worker bee. However, when initialising a queen bee, the user must provide a subclass of WorkerBee to represent the worker bee class, from which the queen can spawn worker bees. As with other bees, the user is expected to write asynchronous member functions to handle tasks.
- Parameters:
my_id (int) – the id that the creator has assigned to the queen bee
p_m2p (queue.Queue) – the me-to-parent connection for private communication between the user (parent) and the queen
p_p2m (queue.Queue) – the parent-to-me connection for private communication between the user (parent) and the queen
workerbee_class (class) – subclass of WorkerBee whose constructor accepts all positional and keyword arguments of the constructor of the super class
worker_init_args (tuple) – additional positional arguments to be passed as-is to each new worker bee’s constructor
worker_init_kwargs (dict) – additional keyword arguments to be passed as-is to each new worker bee’s constructor
s3_profile (str, optional) – the S3 profile from which the context vars are created. See mt.base.s3.create_context_vars().
max_concurrency (int, optional) – the maximum number of concurrent tasks at any time for the bee, good for managing memory allocations. It should be left at the default None so that the queen bee can deal with all requests.
workerbee_max_concurrency (int) – the maximum number of concurrent tasks at any time for a worker bee, good for managing memory allocations. Non-integer values are not accepted.
context_vars (dict) – a dictionary of context variables within which the queen bee’s functions run. It must include context_vars[‘async’] to tell whether to invoke the function asynchronously or not. In addition, variable ‘s3_client’ must exist and hold an enter-result of an async with statement invoking mt.base.s3.create_s3_client().
logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes
Inheritance: Bee -> WorkerBee -> QueenBee