processmanager (public API)

The processmanager package provides a way to start a pool of worker processes and obtain results of work from them.

It allows offloading CPU-intensive work to other processor cores to improve overall performance.

This document is intended for those using the package in their own projects.

Note

Unless otherwise mentioned, items in this document should only be called from the main process.

Main Functions

Starting and Stopping Worker Processes

Note

The maximum number of worker processes is the available CPU count minus 1. Larger requests are capped automatically.
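The cap can be pictured with a short sketch. The helper name capped_worker_count is hypothetical and not part of processmanager, and the floor of one worker on single-core machines is an assumption. How the package itself treats process_count=0 is not covered by this sketch.

```python
import os


def capped_worker_count(requested, cpu_count=None):
    """Hypothetical helper: cap a requested worker count at CPU count minus 1."""
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    maximum = max(1, cpu_count - 1)  # assumption: never fewer than one worker
    return min(requested, maximum)


print(capped_worker_count(16, cpu_count=8))  # prints 7 (request above the cap)
print(capped_worker_count(2, cpu_count=8))   # prints 2 (request below the cap)
```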

processmanager.add_init_func(func, *args, **kwargs)

Add a function to run in each worker process as it starts.

This can be used to perform actions such as loading configuration from files.

Parameters
  • func – function to run in new worker process when it starts

  • args – args to pass to the function

  • kwargs – kwargs to pass to the function

Note

This should be called before processmanager.start_workers() if used.

See also

processmanager.globalvars.cross_process_lock for restricting actions to one process at a time.
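As a rough illustration of the ordering described in the note, the sketch below registers an init function and only then starts the workers. The names SETTINGS, load_settings and register_and_start are hypothetical. The parameter pm stands for the imported processmanager module; it is passed in so the sketch can be exercised with a stand-in object.

```python
import json

# Module-level state; each worker process fills in its own copy at startup.
SETTINGS = {}


def load_settings(path):
    """Hypothetical init function: load a JSON file into this process's SETTINGS."""
    with open(path, encoding="utf-8") as handle:
        SETTINGS.update(json.load(handle))


def register_and_start(pm, path):
    """Register the init function first, then start the workers."""
    pm.add_init_func(load_settings, path)
    pm.start_workers()
```

In real use this would be called as register_and_start(processmanager, "settings.json"), where the file name is only an example.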


processmanager.prepare_globals(process_count=0)

Initialize globals without starting any processes.

It is not required to call this, as processmanager.start_workers() will call it for you.

Parameters

process_count (int) – Number of worker processes to request.


processmanager.start_workers(process_count=0)

Start the worker processes.

This should be called once per program run.

The workers will run until processmanager.stop() is called or the program exits.

Parameters

process_count (int) – Number of worker processes to request.


processmanager.stop()

Stop worker processes.

This will send shutdown notifications to all worker processes, and they will exit their main loops.

It is recommended to call this before your program exits, for example by registering it with an atexit handler.
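One possible way to follow this recommendation is sketched below. The helper name start_with_cleanup is hypothetical, and pm stands for the imported processmanager module (passed as a parameter so the sketch can be tried with a stand-in object).

```python
import atexit


def start_with_cleanup(pm, process_count=0):
    """Start workers and arrange for pm.stop() to run at interpreter exit."""
    pm.start_workers(process_count)
    atexit.register(pm.stop)
```

In real use this would be called once from the main process as start_with_cleanup(processmanager).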

Requesting Work From Worker Processes

processmanager.submit(func, args=(), kwargs=None, handler_class=<class 'processmanager.workrequestclasses.ResultHandler'>, is_generator=False)

Submit a request to run a function in a worker process.

Parameters
  • func (callable) – function to run

  • args (tuple) – (optional) positional arguments to pass to func. A single argument needs a trailing comma, e.g. (5,)

  • kwargs (dict) – (optional) keyword arguments to pass to func

  • handler_class – the ResultHandler class to use, or a subclass of it. Pass the class itself, not an instance.

  • is_generator (bool) – When True, indicates that func is a generator and will yield results

Returns

a Future object that will get the result of the work

Return type

concurrent.futures.Future

Raises

ChildProcessError if there is an issue with workers and disable_fail_open() was used

Note

The Future may receive processmanager.CancelledError if no_go is True on processmanager.globalvars.current_state when the request is handled.

Example:

# Say we had a function:
def some_heavy_func(first, second, a=0, b=0):
    # some CPU-intensive operation done here
    ...  # placeholder body so the definition is valid Python

# The work request submission could look something like this:
new_future = processmanager.submit(some_heavy_func, args=(1, 2), kwargs={"a": 0, "b": 1})

# Then later access the result:
new_result = new_future.result()
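Because submit() returns a standard concurrent.futures.Future, the usual Future methods should apply. The sketch below uses a manually created Future as a stand-in for the one submit() would return, so it runs without worker processes. The callback name report and the list completed are hypothetical.

```python
from concurrent.futures import Future

completed = []


def report(future):
    """Hypothetical callback: record the outcome once the Future is done."""
    error = future.exception()
    completed.append(("error", error) if error else ("ok", future.result()))


# Stand-in for: new_future = processmanager.submit(some_heavy_func, args=(1, 2))
new_future = Future()
new_future.add_done_callback(report)

# In real use the package completes the Future; here it is done by hand.
new_future.set_result(3)
print(completed)  # prints [('ok', 3)]
```

A callback avoids blocking the main process on result() while the work is still running.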

Controlling Offload of Work

processmanager.disable_fail_open()

Disable handling work in the main process if child workers fail to initialize.

By default, if there is an issue with the worker processes, work is handled in the main process. Calling this changes that behavior so that submit() raises ChildProcessError instead.


processmanager.disable_offload(sticky=True)

Disable submissions to worker processes.

If disable_fail_open() was not called, new submissions will be handled in the main process.

Parameters

sticky (bool) – (optional) If True, then offload will not be automatically enabled by this package.


processmanager.enable_offload(force=True)

Enable submissions to worker processes.

Called automatically with force=False when all worker processes have been confirmed to be operational.

Parameters

force (bool) – (optional) When True, will override a previous disable_offload() call, even if sticky was True
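The two calls can be paired to keep a block of submissions in the main process. The sketch below is one way to do that. The name offload_disabled is hypothetical, and pm stands for the imported processmanager module (passed as a parameter so the sketch can be tried with a stand-in object).

```python
from contextlib import contextmanager


@contextmanager
def offload_disabled(pm):
    """Disable offload for the duration of a with-block, then force it back on."""
    pm.disable_offload(sticky=True)
    try:
        yield
    finally:
        # force=True overrides the sticky disable made above.
        pm.enable_offload(force=True)
```

Note that this sketch always re-enables offload on exit, even if offload had already been disabled before the block was entered.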

Custom Handling of Results in the Main Process

It is possible to customize the encapsulated handling of results that occurs in the main process. This can change the data returned from functions submitted to processmanager.submit() in useful ways.

For example, raw results can be sent to the main process and you can modify them after arrival. The modified results would then be returned via the Future object.

class processmanager.ResultHandler(work_request, result_pipe=None)

Class that handles results from work requests.

If the work request is a generator, the results are returned in a list by default.

This behavior can be changed in subclasses.

cancel()

Cancel work from the main process side

Can be called from within a handle_result() override if needed.

finalize_result()

Perform any modifications necessary to the current result just before it gets returned.

Note

This can be overridden in subclasses to handle data differently.

handle_result(result)

Handle a result of the function from the work request

When the work request is a generator, this will accumulate the results in self.result. In any case, it can be used to transform self.result before it gets returned.

Note

This can be overridden in subclasses to handle data differently.
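A subclass might, for instance, collapse the accumulated generator results into a single value. The sketch below is written as a mixin so it can run without the package installed. SummingMixin and SummingHandler are hypothetical names, and it is an assumption that finalize_result() is expected to leave the final value in self.result.

```python
class SummingMixin:
    """Hypothetical mixin: collapse accumulated generator results into a sum."""

    def finalize_result(self):
        # Assumption: self.result holds the list accumulated by handle_result()
        # and the finalized value is read back from self.result.
        self.result = sum(self.result)


# Intended use, with the real package installed (not executed here):
#     class SummingHandler(SummingMixin, processmanager.ResultHandler):
#         pass
#     future = processmanager.submit(gen_func, handler_class=SummingHandler,
#                                    is_generator=True)
```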

Attributes

globalvars.cross_process_lock

globalvars.current_state

Exceptions

exception processmanager.WorkError

An error occurred while handling a WorkRequest


exception processmanager.CancelledError

Work request was cancelled

This inherits from WorkError

Helper Functions

processmanager.current_process_count()

Get the number of worker processes that was chosen during initialization


processmanager.processes_started()

Check if the worker processes have been started successfully


processmanager.wait_for_complete_load(timeout=None)

Wait until submission queues are ready to handle work requests.

This is especially useful in scripts, as you can delay submissions until everything is ready to handle them.

Parameters

timeout (float) – (optional) when provided will wait up to this many seconds

Returns

bool – True when ready, False if the timeout was reached first
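In a script, the boolean return value can be turned into a hard failure. The sketch below shows one way to do so. The name wait_until_ready and the 10-second default are hypothetical, and pm stands for the imported processmanager module (passed as a parameter so the sketch can be tried with a stand-in object).

```python
def wait_until_ready(pm, timeout=10.0):
    """Block until submissions can be handled, or raise if the timeout passes."""
    if not pm.wait_for_complete_load(timeout=timeout):
        raise TimeoutError(f"workers not ready after {timeout} seconds")
```

In real use this would be called after processmanager.start_workers() and before the first submit().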


processmanager.wait_for_process_start(timeout=None)

Wait until all worker processes have started successfully.

Note that this may return slightly before the processes are ready for submissions.

If you need them ready for submissions, use wait_for_complete_load() instead.

Parameters

timeout (float) – (optional) when provided will wait up to twice this many seconds

Returns

bool – True when ready, False if the timeout was reached first

Notifications

Notifications are messages sent from the main process to the worker processes.

It is possible to add custom notifications and associate them with custom actions. These notifications are handled in dedicated threads in all processes, so they are acted on quickly.

processmanager.add_custom_notification(notification_class, func)

Register a custom notification type (an enum.IntEnum subclass) with an associated handler func.

The handler should accept a single argument.

When you call enqueue_notification() with a member of the enum, the worker processes will pass it to the handler.

Parameters
  • notification_class – enum.IntEnum subclass

  • func – Callable that accepts a single argument of an enum member

Note

This must be called before start_workers() is called. After that, new registrations are not allowed.

Example:

import enum
import logging

import processmanager

logger = logging.getLogger(__name__)

# Create your notification type and values
class UpdateNotification(enum.IntEnum):
    CONFIG = 1  # means configuration file was updated
    JSON = 2  # means json file for data was updated

# Define a handler for it that will accept a single member of the enum
def handle_update_notification(message):
    if message is UpdateNotification.CONFIG:
        reload_config()
    elif message is UpdateNotification.JSON:
        reload_json()
    else:
        logger.error(f"Unknown notification message: {message}")

# Register the handler and notification type
processmanager.add_custom_notification(UpdateNotification, handle_update_notification)

# At the relevant time in your program, you can send notifications from the main process:
processmanager.enqueue_notification(UpdateNotification.CONFIG)

# All worker processes will get the notification and pass the message to your handler.

processmanager.enqueue_notification(notification, timeout=None)

Add a notification to the queue to be sent to worker processes.

The NotificationSender thread is responsible for dispatching from the queue to worker processes.

The NotificationMonitor in each process will call the registered handler with the notification as the argument.

Parameters
  • notification (enum.IntEnum) – A member of any registered custom notification enum

  • timeout (float) – Number of seconds to wait in case the notification queue is full

Returns

True if added to queue, False if the worker processes were not ready yet

Return type

bool
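Since the call returns False when the worker processes are not ready yet, a caller may want to retry. The sketch below is one way to do that. The name send_when_ready and its attempts and delay parameters are hypothetical, and pm stands for the imported processmanager module (passed as a parameter so the sketch can be tried with a stand-in object).

```python
import time


def send_when_ready(pm, notification, attempts=5, delay=0.1):
    """Retry enqueue_notification() until it succeeds or attempts run out."""
    for _ in range(attempts):
        if pm.enqueue_notification(notification):
            return True
        time.sleep(delay)  # give the worker processes time to become ready
    return False
```

Waiting with wait_for_complete_load() before the first notification is an alternative to polling.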


processmanager.update_log_level(log_level)

Update the log level for root logger in worker processes.

Parameters

log_level (int) – one of the levels from logging module (logging.DEBUG, logging.INFO, …)

Raises

ValueError when input is not equivalent to one of the built-in levels from the logging module


processmanager.update_state_value(value)

Update the global go/no-go for handling work requests.

This is commonly used to keep in sync with threadmanager’s go/no-go, but can be used without threadmanager as well.

Parameters

value (bool) – When False, new and pending work requests are dropped, and work is aborted or cancelled

See also

processmanager.globalvars.current_state for more information and examples