mt.pandas.dataframe_processing

Batch processing of a dataframe.

Functions

async mt.pandas.dataframe_processing.default_preprocess(s: Series, iter_id: int, rng_seed: int, *args, context_vars: dict = {}, **kwargs)

Preprocesses a row in the dataframe.

The return value should be the result of preprocessing the row or, if some caching is used, even the result of postprocessing the row, bypassing the batch-processing step.

The user should override this function to customise the behaviour. The default implementation raises a NotImplementedError exception.

Parameters:
  • s (pandas.Series) – the row that has been selected for preprocessing

  • iter_id (int) – iteration id

  • rng_seed (int) – a number that can be used to seed an RNG for preprocessing s

  • args (tuple) – additional positional arguments

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously or not. Variable ‘s3_client’ must exist and hold the enter-result of an async with statement invoking mt.base.s3.create_s3_client(). In asynchronous mode, variable ‘http_session’ must also exist and hold the enter-result of an async with statement invoking mt.base.http.create_http_session().

  • kwargs (dict) – additional keyword arguments

Returns:

The output can be a dict, signalling the result of the preprocessing step, or a pandas.Series, signalling that the row has already been postprocessed, bypassing the batch-processing step. If it is a dict, each key is a tensor name and its value is a numpy array representing the tensor value. If it is a pandas.Series, it is the output series corresponding to the input series s.

Return type:

dict or pandas.Series
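
A minimal sketch of a custom preprocess function, assuming the input row has hypothetical fields ‘x’ and ‘y’ holding lists of floats:

  import numpy as np
  import pandas as pd

  async def my_preprocess(
      s: pd.Series, iter_id: int, rng_seed: int, *args, context_vars: dict = {}, **kwargs
  ):
      # Convert the hypothetical 'x' and 'y' fields into named tensors. Returning
      # a dict sends the tensors on to the batch-processing step; returning a
      # pandas.Series instead would bypass it.
      return {
          "x": np.asarray(s["x"], dtype=np.float32),
          "y": np.asarray(s["y"], dtype=np.float32),
      }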

async mt.pandas.dataframe_processing.default_batchprocess(batch_tensor_dict: dict, *args, context_vars: dict = {}, **kwargs)

Processes batch tensors.

The return value should be the result of batch-processing a collection of batch tensors or, if some caching is used, even the result of postprocessing the batch of rows, bypassing the postprocessing step.

The user should override this function to customise the behaviour. The default implementation raises a NotImplementedError exception.

Parameters:
  • batch_tensor_dict (dict) – a dictionary of batch tensors stacked from the preprocessing outputs of the rows in the batch. Each key is a tensor name. Each value is either a numpy array whose first dimension is the item id, or a list of objects, representing the content of the tensor. In either case, its size equals the number of items in the batch.

  • args (tuple) – additional positional arguments

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously or not. Variable ‘s3_client’ must exist and hold the enter-result of an async with statement invoking mt.base.s3.create_s3_client(). In asynchronous mode, variable ‘http_session’ must also exist and hold the enter-result of an async with statement invoking mt.base.http.create_http_session().

  • kwargs (dict) – additional keyword arguments

Returns:

If a dictionary is returned, it is a dictionary of output batch tensors. Each key is a tensor name. Each value is either a numpy array whose first dimension is the item id, or a list of objects, representing the content of the tensor. In either case, it has the same number of items as batch_tensor_dict. If a list is returned, it is a list of pandas.Series instances, one per input item/row, with the postprocessing step already applied.

Return type:

dict or list
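
A minimal sketch of a custom batchprocess function, assuming a hypothetical model object, passed in via batchprocess_args of process_dataframe(), whose predict() method maps an (N, ...) input batch to an (N, ...) output batch:

  import numpy as np

  async def my_batchprocess(
      batch_tensor_dict: dict, model, *args, context_vars: dict = {}, **kwargs
  ):
      # 'model' arrives as the first extra positional argument, i.e. via
      # batchprocess_args=(model,). Its predict() method is a hypothetical API.
      preds = model.predict(batch_tensor_dict["x"])
      # Returning a dict of batch tensors sends them on to the postprocessing step.
      return {"pred": np.asarray(preds)}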

async mt.pandas.dataframe_processing.default_postprocess(tensor_dict: dict, *args, context_vars: dict = {}, **kwargs)

Post-processes output tensors obtained from a row.

The user should override this function to customise the behaviour. The default implementation turns the input tensor dict into a pandas.Series instance.

Parameters:
  • tensor_dict (dict) – a dictionary of output tensors for one row, taken from the result of the batch-processing step. Each key is a tensor name. Each value is a numpy array representing the content of the tensor. The first dimension of a tensor no longer represents the items of the batch.

  • args (tuple) – additional positional arguments

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously or not. Variable ‘s3_client’ must exist and hold the enter-result of an async with statement invoking mt.base.s3.create_s3_client(). In asynchronous mode, variable ‘http_session’ must also exist and hold the enter-result of an async with statement invoking mt.base.http.create_http_session().

  • kwargs (dict) – additional keyword arguments

Returns:

A series with a fixed number of fields representing the output fields of the item.

Return type:

pandas.Series
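
A minimal sketch of a custom postprocess function, turning the hypothetical output tensor ‘pred’ of one row into two output fields:

  import pandas as pd

  async def my_postprocess(tensor_dict: dict, *args, context_vars: dict = {}, **kwargs):
      pred = tensor_dict["pred"]  # batch dimension already removed
      return pd.Series({"label": int(pred.argmax()), "confidence": float(pred.max())})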

mt.pandas.dataframe_processing.sample_rows(df: DataFrame, rng_seed: int = 0, num_iters: int | None = None, iter_policy: str = 'sequential', resampling_col: str | None = None)

Sample the rows of a dataframe, returning (row_id, rng_seed) pairs.

Parameters:
  • df (pandas.DataFrame) – an input unindexed dataframe

  • rng_seed (int, optional) – seed for making RNGs

  • num_iters (int, optional) – number of iterations or, equivalently, the number of rows selected during the call. If not provided, it is set to the number of rows of df.

  • iter_policy ({'sequential', 'resampling'}) – policy for iterating the rows. If ‘sequential’ is given, the rows are iterated sequentially from the first row to the last row, and then back to the first row if required. If ‘resampling’ is given, rows are resampled randomly, using the weights in the dataframe column named by the resampling_col argument as the resampling distribution.

  • resampling_col (str, optional) – name of the column/field containing the resampling weights. Only used when iter_policy is ‘resampling’.

Returns:

list of (row_id, rng_seed) pairs sampled from the dataframe

Return type:

list
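
A usage sketch based on the behaviour documented above; the column names are hypothetical:

  import pandas as pd
  from mt.pandas.dataframe_processing import sample_rows

  df = pd.DataFrame({"value": [10, 20, 30], "weight": [0.1, 0.3, 0.6]})

  # Sequential policy: rows are visited in order, wrapping around as needed,
  # e.g. row ids 0, 1, 2, 0, 1 for num_iters=5.
  pairs = sample_rows(df, rng_seed=0, num_iters=5)

  # Resampling policy: rows are drawn at random, using the 'weight' column
  # as the resampling distribution.
  pairs = sample_rows(
      df, rng_seed=0, num_iters=5, iter_policy="resampling", resampling_col="weight"
  )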

async mt.pandas.dataframe_processing.process_dataframe(df: DataFrame, preprocess_func, batchprocess_func=None, postprocess_func=None, rng_seed: int = 0, num_iters: int | None = None, preprocess_args: tuple = (), preprocess_kwargs: dict = {}, batchprocess_args: tuple = (), batchprocess_kwargs: dict = {}, postprocess_args: tuple = (), postprocess_kwargs: dict = {}, skip_null: bool = False, iter_policy: str = 'sequential', resampling_col: str | None = None, batch_size: int = 32, s3_profile: str | None = None, max_concurrency: int = 16, context_vars: dict = {}, logger=None)

An asyn function that batch-processes a dataframe.

The functionality provided here addresses the following situation. The user has a dataframe in which each row represents an event or an image. There is a need to take some fields of each row and to convert them into tensors to feed one or more models for training, prediction, validation or evaluation purposes. Upon applying the tensors to a model and getting some results, there is a need to transform output tensors back to some fields. In addition, the model(s) can only operate in batches, rather than on individual items.

To address this situation, the user needs to provide 3 asyn functions: preprocess, batchprocess and postprocess. preprocess is applied to each row, converting some fields of the row into a dictionary of tensors. Tensors of the same name are stacked up to form a dictionary of batch tensors, which is then fed to batchprocess for batch processing using the model(s). The output batch tensors are unstacked into individual tensors and fed to postprocess, which converts them back into pandas.Series instances representing the output fields of each row. Finally, these new fields are concatenated to form an output dataframe.

These three functions have default implementations, namely default_preprocess(), default_batchprocess() and default_postprocess(), respectively. The user must make sure the APIs of their functions match those of the default ones; see the Examples section below. Note that, while preprocessing or batch-processing a row, a function may skip the batch-processing and postprocessing steps altogether and return an output series corresponding to each input row.

Internally, we use the BeeHive concurrency model to address the problem. The queen bee is responsible for forming batches, batch processing, and forming the output dataframe; the worker bees that she spawns are responsible for the preprocessing and postprocessing work of each row.

Parameters:
  • df (pandas.DataFrame) – an input unindexed dataframe

  • preprocess_func (function) – the preprocessing function

  • batchprocess_func (function, optional) – the function for batch-processing. If not provided, the preprocess function must return postprocessed pandas.Series instances.

  • postprocess_func (function, optional) – the postprocessing function. If not provided, the preprocess and batchprocess functions must make sure that every row is processed and a pandas.Series is returned.

  • rng_seed (int, optional) – seed for making RNGs

  • num_iters (int, optional) – number of iterations or, equivalently, the number of rows selected during the call. If not provided, it is set to the number of rows of df.

  • preprocess_args (tuple, optional) – positional arguments to be passed as-is to preprocess()

  • preprocess_kwargs (dict, optional) – keyword arguments to be passed as-is to preprocess()

  • batchprocess_args (tuple, optional) – positional arguments to be passed as-is to batchprocess()

  • batchprocess_kwargs (dict, optional) – keyword arguments to be passed as-is to batchprocess()

  • postprocess_args (tuple, optional) – positional arguments to be passed as-is to postprocess()

  • postprocess_kwargs (dict, optional) – keyword arguments to be passed as-is to postprocess()

  • skip_null (bool) – If True, any None value returned from the provided functions is treated as a signal to skip the row. Otherwise, an exception is raised as usual.

  • iter_policy ({'sequential', 'resampling'}) – policy for iterating the rows. If ‘sequential’ is given, the rows are iterated sequentially from the first row to the last row, and then back to the first row if required. If ‘resampling’ is given, rows are resampled randomly, using the weights in the dataframe column named by the resampling_col argument as the resampling distribution.

  • resampling_col (str, optional) – name of the column/field containing the resampling weights. Only used when iter_policy is ‘resampling’.

  • batch_size (int) – maximum batch size for each batch that is formed internally

  • s3_profile (str, optional) – The AWS S3 profile to be used so that we can spawn an S3 client for each newly created subprocess. If not provided, the default profile will be used.

  • max_concurrency (int) – the maximum number of concurrent tasks each worker bee handles at a time

  • context_vars (dict) – a dictionary of context variables within which the function runs. It must include context_vars[‘async’], which tells whether the function is invoked asynchronously or not. Variable ‘s3_client’ must exist and hold the enter-result of an async with statement invoking mt.base.s3.create_s3_client(). In asynchronous mode, variable ‘http_session’ must also exist and hold the enter-result of an async with statement invoking mt.base.http.create_http_session().

  • logger (mt.logg.IndentedLoggerAdapter, optional) – logger for debugging purposes

Returns:

df – an output unindexed dataframe

Return type:

pandas.DataFrame

Notes

The function only works in asynchronous mode. That means context_vars[‘async’] must be True.
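
Examples

An end-to-end sketch wiring the three functions together. The model object and its predict() method, as well as the row field ‘x’, are hypothetical; the exact call signatures of mt.base.s3.create_s3_client() and mt.base.http.create_http_session() should be checked against mt.base.s3 and mt.base.http:

  import numpy as np
  import pandas as pd

  from mt.base.s3 import create_s3_client
  from mt.base.http import create_http_session
  from mt.pandas.dataframe_processing import process_dataframe

  async def my_preprocess(s, iter_id, rng_seed, *args, context_vars={}, **kwargs):
      return {"x": np.asarray(s["x"], dtype=np.float32)}

  async def my_batchprocess(batch_tensor_dict, model, *args, context_vars={}, **kwargs):
      return {"pred": model.predict(batch_tensor_dict["x"])}  # hypothetical model API

  async def my_postprocess(tensor_dict, *args, context_vars={}, **kwargs):
      return pd.Series({"pred": float(tensor_dict["pred"])})

  async def run(df: pd.DataFrame, model) -> pd.DataFrame:
      # The function only works in asynchronous mode, so context_vars['async']
      # must be True and the S3 client and HTTP session must be prepared.
      context_vars = {"async": True}
      async with create_s3_client() as s3_client:
          context_vars["s3_client"] = s3_client
          async with create_http_session() as http_session:
              context_vars["http_session"] = http_session
              return await process_dataframe(
                  df,
                  my_preprocess,
                  batchprocess_func=my_batchprocess,
                  postprocess_func=my_postprocess,
                  batchprocess_args=(model,),
                  batch_size=64,
                  context_vars=context_vars,
              )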