Python multiprocessing with returned values to dict or list - lmmx/devnotes GitHub Wiki

Async multiprocessing with dict update upon return

Here's an example of how to return asynchronous results to update a Python dictionary:

Code example

import multiprocessing as mp
from multiprocessing import Process, Pool
from more_itertools import chunked
from functools import partial
from tqdm import tqdm


def batch_multiprocess_with_dict_return(
    function_list, pool_results_dict=None, n_cores=mp.cpu_count(), show_progress=True
):
    """
    Run a list of dict-returning functions on `n_cores` (default: all CPU cores)
    in batches, merging each returned dict into one results dictionary,
    with the option to show a progress bar using tqdm (default: shown).

    Each function in `function_list` must be picklable and return a dict.
    Returns `pool_results_dict`: the supplied dict (updated in place) or a
    newly created one if none was supplied.
    """
    iterator = [*chunked(function_list, n_cores)]
    # Identity check, not truthiness: a caller-supplied empty dict should
    # still be updated in place rather than silently replaced by a new one.
    if pool_results_dict is None:
        pool_results_dict = {}
    pool = Pool(processes=n_cores)
    if show_progress:
        iterator = tqdm(iterator)
    for func_batch in iterator:
        for f in func_batch:
            # Merge each worker's returned dict into the results as it arrives
            pool.apply_async(func=f, callback=pool_results_dict.update)
    pool.close()
    pool.join()
    return pool_results_dict


def store_dict_entry(dict_entry, result_dict):
    """Copy every key/value pair of `dict_entry` into `result_dict`, then return it."""
    for key, value in dict_entry.items():
        result_dict[key] = value
    return result_dict


def make_some_functions(start_dict, n=30):
    """Build partials that each insert one zero-padded numbered entry into `start_dict`."""
    return [
        partial(store_dict_entry, {str(i).zfill(3): i}, result_dict=start_dict)
        for i in range(1, n)
    ]


def main():
    """Run the demo: build the entry-storing functions and collect their results."""
    seed = {"BEGIN": 0}
    funcs = make_some_functions(seed)
    return batch_multiprocess_with_dict_return(funcs, {"INIT": -1})


if __name__ == "__main__":
    # Only run the demo when executed as a script, not on import
    results = main()
    from pprint import pprint
    pprint(results)

{'INIT': -1,
 'BEGIN': 0,
 '001': 1,
 '002': 2,
 '003': 3,
 '005': 5,
 '006': 6,
 '004': 4,
 '008': 8,
 '009': 9,
 '007': 7,
 '010': 10,
 '011': 11,
 '013': 13,
 '014': 14,
 '015': 15,
 '012': 12,
 '017': 17,
 '018': 18,
 '016': 16,
 '020': 20,
 '021': 21,
 '022': 22,
 '019': 19,
 '023': 23,
 '024': 24,
 '026': 26,
 '025': 25,
 '028': 28,
 '029': 29,
 '027': 27}
  • Note how the functions constructed in make_some_functions will return a dict: this is then passed to the update method of the start_dict dictionary
    • Note that the update is wrapped: if you pass start_dict.update directly, rather than a partial of the wrapping store_dict_entry function, the program freezes, because the bound method gets locked when multiple async processes try to use it at once

Here's another version, which can either return a dictionary (if none is supplied as pool_results_dict) or else update the pre-existing dictionary (if one is supplied).

Code example

def batch_multiprocess_to_dict(
    function_list, pool_results_dict=None, n_cores=mp.cpu_count(), show_progress=True
):
    """
    Run a list of dict-returning functions on `n_cores` (default: all CPU cores),
    with the option to show a progress bar using tqdm (default: shown).

    If `pool_results_dict` is supplied it is updated in place and nothing is
    returned; otherwise a new dict is created, populated, and returned.
    """
    iterator = [*chunked(function_list, n_cores)]
    no_preexisting_dict = pool_results_dict is None
    if no_preexisting_dict:
        pool_results_dict = {}
    pool = Pool(processes=n_cores)
    if show_progress:
        iterator = tqdm(iterator)
    for func_batch in iterator:
        for f in func_batch:
            # Merge each worker's returned dict into the results as it arrives
            pool.apply_async(func=f, callback=pool_results_dict.update)
    pool.close()
    pool.join()
    # if pool_results_dict was supplied, it's been updated, otherwise return new dict
    if no_preexisting_dict:
        return pool_results_dict
  • Note: if you're using Python 3.8+ already, you could use the walrus operator to assign no_preexisting_dict more concisely

Another nice feature to include is an argument that switches the function to sequential (rather than asynchronous) execution. This is useful when you need to verify that concurrency is not the source of an error you're seeing in the output of the functions passed through this mechanism.

Code example

def batch_multiprocess_with_dict_updates(
    function_list, pool_results_dict=None, n_cores=mp.cpu_count(), show_progress=True,
    tqdm_desc=None, sequential=True,
):
    """
    Run a list of dict-returning functions on `n_cores` (default: all CPU cores),
    with the option to show a progress bar using tqdm (default: shown).

    Parameters:
    - tqdm_desc: optional label shown on the progress bar.
    - sequential: if True (default), call the functions in-process one by one —
      useful to rule out concurrency as the source of an error; if False,
      dispatch them asynchronously on a multiprocessing pool.

    If `pool_results_dict` is supplied it is updated in place and nothing is
    returned; otherwise a new dict is created, populated, and returned.

    NOTE(review): relies on `update_dict_and_pbar` being defined elsewhere in
    the module; it must accept the result dict plus the `callback_dict` and
    `callback_pbar` keyword arguments — confirm against its definition.
    """
    iterator = [*chunked(function_list, n_cores)]
    no_preexisting_dict = pool_results_dict is None
    if no_preexisting_dict:
        pool_results_dict = {}
    if not sequential:
        pool = Pool(processes=n_cores)
    if show_progress:
        pbar = tqdm(total=len(function_list), desc=tqdm_desc)
    else:
        pbar = None
    update_pool_results = partial(
        update_dict_and_pbar, callback_dict=pool_results_dict, callback_pbar=pbar
    )
    for func_batch in iterator:
        for f in func_batch:
            if sequential:
                update_pool_results(f())
            else:
                pool.apply_async(func=f, callback=update_pool_results)
    if not sequential:
        pool.close()
        pool.join()
    if pbar is not None:
        pbar.close()  # release the bar so subsequent terminal output isn't garbled
    # if pool_results_dict was supplied, it's been updated, otherwise return new dict
    if no_preexisting_dict:
        return pool_results_dict
  • In addition this version has a tqdm_desc setting, to set a message on the progress bar

Async multiprocessing with list append upon return

A simpler example with lists is:

Code example

import multiprocessing as mp
from multiprocessing import Process, Pool
from more_itertools import chunked
from functools import partial
from tqdm import tqdm


def batch_multiprocess_with_return(
    function_list, pool_results=None, n_cores=mp.cpu_count(), show_progress=True
):
    """
    Run a list of functions on `n_cores` (default: all CPU cores),
    appending each function's return value to `pool_results`,
    with the option to show a progress bar using tqdm (default: shown).

    Each function in `function_list` must be picklable. Returns `pool_results`:
    the supplied list (appended to in place) or a newly created one.
    """
    iterator = [*chunked(function_list, n_cores)]
    # Identity check, not truthiness: a caller-supplied empty list should
    # still be appended to in place rather than silently replaced by a new one.
    if pool_results is None:
        pool_results = []
    pool = Pool(processes=n_cores)
    if show_progress:
        iterator = tqdm(iterator)
    for func_batch in iterator:
        for f in func_batch:
            # Each worker's return value is appended as it arrives
            pool.apply_async(func=f, callback=pool_results.append)
    pool.close()
    pool.join()
    return pool_results


def store_string(string, result_list):
    """Add `string` onto the end of `result_list`, then return the list."""
    # swap the += below for result_list.extend(string) for a flat result list
    result_list += [string]
    return result_list


def make_some_functions(n=30):
    """Create `n` partials, each storing one greeting into its own fresh list."""
    return [
        partial(store_string, f"Hi {str(i).zfill(3)}", result_list=[])
        for i in range(n)
    ]


def main():
    """Build the demo functions and run them through the multiprocess batcher."""
    return batch_multiprocess_with_return(make_some_functions())


if __name__ == "__main__":
    # Only run the demo when executed as a script, not on import
    results = main()
    from pprint import pprint
    pprint(results)

[['Hi 000'],
 ['Hi 001'],
 ['Hi 002'],
 ['Hi 004'],
 ['Hi 003'],
 ['Hi 006'],
 ['Hi 005'],
 ['Hi 008'],
 ['Hi 007'],
 ['Hi 010'],
 ['Hi 011'],
 ['Hi 009'],
 ['Hi 012'],
 ['Hi 013'],
 ['Hi 014'],
 ['Hi 015'],
 ['Hi 017'],
 ['Hi 016'],
 ['Hi 018'],
 ['Hi 019'],
 ['Hi 020'],
 ['Hi 021'],
 ['Hi 022'],
 ['Hi 024'],
 ['Hi 025'],
 ['Hi 023'],
 ['Hi 026'],
 ['Hi 027'],
 ['Hi 028'],
 ['Hi 029']]
⚠️ **GitHub.com Fallback** ⚠️