processmanager (public API)
The processmanager package lets you start a pool of worker processes and obtain the results of the work they do.
It allows offloading CPU-intensive work to other processor cores to improve overall performance.
This document is intended for those using the package in their own projects.
Note
Unless otherwise mentioned, items in this document should only be called from the main process.
Main Functions
Starting and Stopping Worker Processes
Note
The number of worker processes is automatically capped at the available CPU count minus 1.
processmanager.add_init_func(func, *args, **kwargs)
Add a function to run in each worker process as it starts.
This can be used to perform actions such as loading configuration from files.
Parameters
func – function to run in each new worker process when it starts
args – positional args to pass to the function
kwargs – keyword args to pass to the function
Note
If used, this should be called before processmanager.start_workers().
See also
processmanager.globalvars.cross_process_lock for restricting actions to one process at a time.
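As an illustration, an init function suited to add_init_func() might load settings into module-level state, so that every worker process ends up with its own copy. The function name load_settings, the SETTINGS dictionary and the JSON file are hypothetical; only the idea of forwarding extra positional and keyword arguments through add_init_func() comes from this document.

```python
import json
from pathlib import Path

# Hypothetical module-level state; each worker process fills in its own copy.
SETTINGS: dict = {}


def load_settings(path, **defaults):
    """Hypothetical init function: load JSON settings for this process.

    Values in ``defaults`` are applied first, then overridden by the file's
    contents if the file exists. The dict is returned only for convenience
    (e.g. in tests); add_init_func() is not documented to use return values.
    """
    SETTINGS.clear()
    SETTINGS.update(defaults)
    config_file = Path(path)
    if config_file.is_file():
        SETTINGS.update(json.loads(config_file.read_text(encoding="utf-8")))
    return SETTINGS
```

Registering it before the workers start could then look like processmanager.add_init_func(load_settings, "settings.json", retries=3), with "settings.json" and retries=3 forwarded to load_settings as its positional and keyword arguments.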
processmanager.prepare_globals(process_count=0)
Initialize globals without starting any processes.
Calling this is optional, as processmanager.start_workers() will call it for you.
Parameters
process_count (int) – Number of worker processes to request.
processmanager.start_workers(process_count=0)
Start the worker processes.
This should be called once per program run.
The workers will run until processmanager.stop() is called or the program exits.
Parameters
process_count (int) – Number of worker processes to request.
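To make the documented cap concrete, the arithmetic below computes how many workers a request would yield. It is a sketch, not the package's code: the helper name is hypothetical, and treating a request of 0 (the default) as "use the maximum" is an assumption this document does not confirm.

```python
import os
from typing import Optional


def capped_process_count(requested: int = 0, cpu_count: Optional[int] = None) -> int:
    """Hypothetical helper applying the documented 'CPU count minus 1' cap.

    Assumption: requested <= 0 means "as many workers as the cap allows".
    """
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1  # os.cpu_count() can return None
    maximum = max(cpu_count - 1, 0)      # documented cap: one fewer than the CPU count
    if requested <= 0:
        return maximum
    return min(requested, maximum)
```

On an 8-core machine, for example, a request for 16 workers would be capped at 7. To see the count that was actually chosen, call processmanager.current_process_count().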
Requesting Work From Worker Processes
processmanager.submit(func, args=(), kwargs=None, handler_class=<class 'processmanager.workrequestclasses.ResultHandler'>, is_generator=False)
Submit a request to run a function in a worker process.
Parameters
func (callable) – function to run
args (tuple) – (optional) positional args to pass to func. A single argument needs a trailing comma, e.g. (5,)
kwargs (dict) – (optional) keyword args to pass to func
handler_class – (optional) type of ResultHandler to use, which may be a subclass of ResultHandler (pass the class, not an instance)
is_generator (bool) – (optional) when True, indicates that func is a generator and will yield results
Returns
a Future object that will receive the result of the work
Raises
ChildProcessError – if there is an issue with the workers and disable_fail_open() was used
Note
The Future may end with processmanager.CancelledError if processmanager.globalvars.current_state has no_go == True when the request is handled.
Example:
    import processmanager

    # Say we had a function:
    def some_heavy_func(first, second, a=0, b=0):
        # some CPU-intensive operation done here
        return first * second + a + b

    # Start the workers once per program run
    processmanager.start_workers()

    # The work request submission could look something like this:
    new_future = processmanager.submit(some_heavy_func, args=(1, 2), kwargs={"a": 0, "b": 1})

    # Then later access the result:
    new_result = new_future.result()
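A frequent slip with args is writing (5) instead of (5,): the parentheses alone do not make a tuple. The sketch below runs a function in the current process the way a request is assumed to be invoked in a worker, func(*args, **kwargs). run_inline is a hypothetical helper, not part of processmanager, and its TypeError check is added here only to make the mistake visible; the package's own handling of a non-tuple args value is not described in this document.

```python
def run_inline(func, args=(), kwargs=None):
    """Hypothetical helper: call func as a worker is assumed to, func(*args, **kwargs)."""
    if not isinstance(args, tuple):
        # (5) is just the int 5; a one-element tuple needs a trailing comma: (5,)
        raise TypeError(f"args must be a tuple, not {type(args).__name__}")
    return func(*args, **(kwargs or {}))


def square_plus(x, offset=0):
    return x * x + offset


print(run_inline(square_plus, args=(5,)))                        # 25
print(run_inline(square_plus, args=(5,), kwargs={"offset": 1}))  # 26
```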
Controlling Offload of Work
processmanager.disable_fail_open()
Disable handling work in the main process if child workers fail to initialize.
By default, if there is an issue with the worker processes, work is handled in the main process. This changes that behavior to raise an exception instead (ChildProcessError from processmanager.submit()).
processmanager.disable_offload(sticky=True)
Disable submissions to worker processes.
If disable_fail_open() was not called, new submissions will be handled in the main process.
Parameters
sticky (bool) – (optional) If True, offload will not be automatically re-enabled by this package.
processmanager.enable_offload(force=True)
Enable submissions to worker processes.
This is called automatically with force=False once all worker processes have been confirmed to be operational.
Parameters
force (bool) – (optional) When True, overrides a previous disable_offload() call, even if sticky was True.
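The interaction between sticky and force is easiest to see as a small state model. The class below only mirrors the rules described above; it is not how processmanager stores its state, and two details are assumptions: offload starts disabled until the automatic enable_offload(force=False) call, and a successful enable clears the sticky flag.

```python
from dataclasses import dataclass


@dataclass
class OffloadModel:
    """Toy model of the documented offload switches (not the package's code)."""

    enabled: bool = False  # assumption: off until workers are confirmed operational
    sticky: bool = False

    def disable_offload(self, sticky: bool = True) -> None:
        self.enabled = False
        self.sticky = sticky

    def enable_offload(self, force: bool = True) -> None:
        # A sticky disable blocks non-forced (automatic) enables.
        if self.sticky and not force:
            return
        self.enabled = True
        self.sticky = False  # assumption: a successful enable clears stickiness
```

In the model, the automatic enable_offload(force=False) call has no effect after disable_offload() with the default sticky=True; only an explicit enable_offload() restores offloading.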
Custom Handling of Results in the Main Process
You can customize how results are handled in the main process by passing a ResultHandler subclass as handler_class to processmanager.submit().
This can change the data returned from submitted functions in useful ways.
For example, workers can send raw results to the main process, where your handler modifies them after arrival. The modified results are then returned via the Future object.
class processmanager.ResultHandler(work_request, result_pipe=None)
Class that handles results from work requests.
If the work request is a generator, the results are returned in a list by default.
This behavior can be changed in subclasses.
cancel()
Cancel work from the main process side.
Can be called from within a handle_result() override if needed.
finalize_result()
Perform any modifications necessary to the current result just before it gets returned.
Note
This can be overridden in subclasses to handle data differently.
handle_result(result)
Handle a result of the function from the work request.
When the work request is a generator, this accumulates the results in self.result. In any case, it can be used to transform self.result before it gets returned.
Note
This can be overridden in subclasses to handle data differently.
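The accumulate-then-finalize flow can be sketched without the package. The class below is a standalone stand-in: in real code it would subclass processmanager.ResultHandler and be passed as handler_class to processmanager.submit() with is_generator=True. Its name, the summary fields, and initializing self.result directly are illustrative assumptions; whether an override must call the base implementation is not covered by this document.

```python
import statistics


class StatsHandlerSketch:
    """Standalone illustration of handle_result()/finalize_result() overrides.

    A real version would subclass processmanager.ResultHandler; here self.result
    is initialized directly so the flow can run on its own.
    """

    def __init__(self):
        self.result = []  # mirrors the default: generator results collected in a list

    def handle_result(self, result):
        # Called for each value the generator yields: keep the raw value.
        self.result.append(result)

    def finalize_result(self):
        # Called just before the result is returned: swap the raw list
        # for a compact summary.
        values = self.result
        self.result = {
            "count": len(values),
            "mean": statistics.fmean(values) if values else None,
            "max": max(values, default=None),
        }
```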
Helper Functions
processmanager.current_process_count()
Get the number of worker processes that was chosen during initialization.
processmanager.processes_started()
Check whether the worker processes have been started successfully.
processmanager.wait_for_complete_load(timeout=None)
Wait until the submission queues are ready to handle work requests.
This is especially useful in scripts, as you can delay submissions until everything is ready to handle them.
Parameters
timeout (float) – (optional) when provided, wait up to this many seconds
Returns
bool – True when ready, False if the timeout was reached before becoming ready
processmanager.wait_for_process_start(timeout=None)
Wait until all worker processes have started successfully.
Note that this may return slightly before the processes are ready for submissions. If you need them ready for submissions, use wait_for_complete_load() instead.
Parameters
timeout (float) – (optional) when provided, wait up to twice this many seconds
Returns
bool – True when ready, False if the timeout was reached before becoming ready
Notifications
Notifications are messages sent from the main process to the worker processes.
It is possible to add custom notifications and associate them with custom actions. These notifications are handled in dedicated threads in all processes, so they are acted on quickly.
processmanager.add_custom_notification(notification_class, func)
Register a custom notification type (an enum.IntEnum subclass) with an associated handler func.
The handler should accept a single argument.
When you call enqueue_notification() with a member of the enum, the worker processes will pass it to the handler.
Parameters
notification_class – enum.IntEnum subclass
func – callable that accepts a single argument, an enum member
Note
This must be called before start_workers() is called. After that, new registrations are not allowed.
Example:
    import enum
    import logging

    import processmanager

    logger = logging.getLogger(__name__)

    # Create your notification type and values
    class UpdateNotification(enum.IntEnum):
        CONFIG = 1  # means configuration file was updated
        JSON = 2  # means json file for data was updated

    # Define a handler for it that will accept a single member of the enum.
    # reload_config() and reload_json() stand for your own reload functions.
    def handle_update_notification(message):
        if message is UpdateNotification.CONFIG:
            reload_config()
        elif message is UpdateNotification.JSON:
            reload_json()
        else:
            logger.error(f"Unknown notification message: {message}")

    # Register the handler and notification type (before start_workers() is called)
    processmanager.add_custom_notification(UpdateNotification, handle_update_notification)
    processmanager.start_workers()

    # At the relevant time in your program, you can send notifications from the main process:
    processmanager.enqueue_notification(UpdateNotification.CONFIG)
    # All worker processes will get the notification and pass the message to your handler.
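As the number of notification members grows, a dictionary that maps each member to an action can replace the if/elif chain in the handler. This is a design alternative, not something processmanager requires; the reload functions and the reloaded list are hypothetical placeholders, and registration with add_custom_notification() stays the same.

```python
import enum
import logging

logger = logging.getLogger(__name__)


class UpdateNotification(enum.IntEnum):
    CONFIG = 1  # configuration file was updated
    JSON = 2    # JSON data file was updated


reloaded = []  # hypothetical record of actions taken, for demonstration


def reload_config():
    reloaded.append("config")  # placeholder for re-reading the config file


def reload_json():
    reloaded.append("json")  # placeholder for re-reading the JSON data


# One entry per enum member; a missing entry is easy to spot in review or tests.
ACTIONS = {
    UpdateNotification.CONFIG: reload_config,
    UpdateNotification.JSON: reload_json,
}


def handle_update_notification(message):
    action = ACTIONS.get(message)
    if action is None:
        logger.error("Unknown notification message: %r", message)
        return
    action()
```

A check such as set(ACTIONS) == set(UpdateNotification) in a unit test catches members that were added to the enum without a handler.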
processmanager.enqueue_notification(notification, timeout=None)
Add a notification to the queue to be sent to worker processes.
The NotificationSender thread dispatches notifications from the queue to the worker processes.
The NotificationMonitor in each process calls the registered handler with the notification as the argument.
Parameters
notification (enum.IntEnum) – a member of any registered custom notification type
timeout (float) – number of seconds to wait in case the notification queue is full
Returns
True if added to the queue, False if the worker processes were not ready yet
Return type
bool
processmanager.update_log_level(log_level)
Update the log level of the root logger in worker processes.
Parameters
log_level (int) – one of the levels from the logging module (logging.DEBUG, logging.INFO, …)
Raises
ValueError – when the input is not equivalent to one of the built-in levels from the logging module
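Log levels often arrive as text, for example from a command-line option or an environment variable. A small hypothetical helper can translate a name into the integer constant that update_log_level() expects, rejecting anything other than the standard names before the call is made. The helper name and the set of accepted names are assumptions of this sketch.

```python
import logging

# Standard level names mapped to their logging constants.
_LEVELS = {
    "CRITICAL": logging.CRITICAL,
    "ERROR": logging.ERROR,
    "WARNING": logging.WARNING,
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
}


def level_from_name(name: str) -> int:
    """Hypothetical helper: turn 'debug', ' INFO ', etc. into a logging constant."""
    try:
        return _LEVELS[name.strip().upper()]
    except KeyError:
        raise ValueError(f"not a standard logging level name: {name!r}") from None
```

The result could then be passed on, e.g. processmanager.update_log_level(level_from_name(os.environ.get("LOG_LEVEL", "INFO"))), where the LOG_LEVEL variable is likewise hypothetical.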
processmanager.update_state_value(value)
Update the global go/no-go for handling work requests.
This is commonly used to stay in sync with threadmanager’s go/no-go, but it can also be used without threadmanager.
Parameters
value (bool) – when False, new and pending work requests will be dropped and work will be aborted/cancelled
See also
processmanager.globalvars.current_state for more information and examples