Source code for komanawa.simple_farm_model.run_multiprocess

"""
created matt_dumont 
on: 8/24/24
"""
"""
created matt_dumont 
on: 25/11/22
"""
import warnings

import psutil
import multiprocessing
import logging
import os
import sys
from functools import partial
from concurrent.futures import ProcessPoolExecutor


def _start_process():
    """
    function to run at the start of each multiprocess sets the priority lower

    :return:
    """
    logger = multiprocessing.get_logger()
    if logger.level <= logging.INFO:
        print('Starting', multiprocessing.current_process().name)
    p = psutil.Process(os.getpid())
    # set to lowest priority, this is windows only, on Unix use ps.nice(19)
    if sys.platform == "linux":
        p.nice(19)
        # linux
    elif sys.platform == "darwin":
        # OS X
        p.nice(19)
        raise NotImplementedError
    elif sys.platform == "win32":
        # Windows...
        p.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
    else:
        raise ValueError(f'unexpected platform: {sys.platform}')


[docs] def run_multiprocess(func, runs, logical=True, num_cores=None, logging_level=logging.INFO, constant_kwargs=None, subprocess_cores=1): """ count the number of processors and then instiute the runs of a function to :param func: function with one argument kwargs. :param runs: a list of runs to pass to the function the function is called via func(kwargs) :param num_cores: int or None, if None then use all cores (+-logical) if int, set pool size to number of cores :param logical: bool if True then add the logical processors to the count :param logging_level: logging level to use one of: logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL more info https://docs.python.org/3/howto/logging.html default is logging.INFO :param constant_kwargs: dict of kwargs to pass to func each time it is called, these kwargs are constant across all runs. :param subprocess_cores: int a number of cores needed for a subprocess (e.g. if the subprocess needs 5 cores to run then only create ncores//5 processes :return: """ assert isinstance(num_cores, int) or num_cores is None multiprocessing.log_to_stderr(logging_level) if num_cores is None: pool_size = psutil.cpu_count(logical=logical) else: pool_size = num_cores pool_size = int(pool_size // subprocess_cores) if subprocess_cores > 1: warnings.warn('allowing pool processes to spawn other children... use with care') with ProcessPoolExecutor(pool_size) as pool: if constant_kwargs is not None: func = partial(func, **constant_kwargs) results = pool.map(func, runs) pool_outputs = results else: with multiprocessing.Pool(processes=pool_size, initializer=_start_process, ) as pool: if constant_kwargs is not None: func = partial(func, **constant_kwargs) results = pool.map_async(func, runs) pool_outputs = results.get() return pool_outputs