Launch concurrent.futures.ProcessPoolExecutor with initialization?
Problem description
I'm planning to use concurrent.futures.ProcessPoolExecutor to parallelize execution of functions. According to the documentation, its executor's map method only accepts a simple function. My actual situation involves an initialization step (loading data) before the 'to-be-parallelized' function runs. How do I arrange that?
The 'to-be-parallelized' function is called many times in a loop. I don't want it to be re-initialized each time.
In other words, there's an init function that produces some output used by this tbp ('to-be-parallelized') function. Each child process should have its own copy of that output, because the function depends on it.
Recommended answer
It sounds like you're looking for an equivalent to the initializer/initargs options that multiprocessing.Pool takes. At the time of writing, that behavior doesn't exist for concurrent.futures.ProcessPoolExecutor, though there is a patch waiting for review that adds it.
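For comparison, here is a minimal sketch of the multiprocessing.Pool route: the pool calls initializer(*initargs) once in each worker process as that worker starts, so every task the worker later runs sees the state it loaded. The names load_data, work and run, and the dictionary standing in for the loaded data, are hypothetical placeholders for the real data-loading step.

```python
import multiprocessing
import os

# Hypothetical per-process state; each worker process holds its own copy.
_data = None


def load_data(a, b):
    """Initializer: runs once in each worker process when it starts."""
    global _data
    # Stand-in for an expensive load (e.g. reading files); an assumption only.
    _data = {"a": a, "b": b, "pid": os.getpid()}


def work(x):
    # Every task handled by this worker reuses the same _data.
    return x + _data["a"] + _data["b"]


def run(values):
    with multiprocessing.Pool(processes=4, initializer=load_data,
                              initargs=(5, 6)) as pool:
        return pool.map(work, values)


if __name__ == "__main__":
    print(run(range(10)))  # [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
```

Because the pool reuses its worker processes, load_data runs once per worker rather than once per item.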
So you can either use multiprocessing.Pool (which might be fine for your use case), wait for that patch to be merged and released (you might be waiting a while :)), or roll your own solution. It turns out it's not too hard to write a wrapper function for map that takes an initializer but only calls it once per process:
import os
from concurrent.futures import ProcessPoolExecutor
from functools import partial

# Per-process state: each worker process gets its own copy of these globals.
inited = False
initresult = None

def initwrapper(initfunc, initargs, f, x):
    # This runs in the worker process. inited is False the first time
    # it is called, then stays True for every later call in that same
    # worker, so initfunc runs only once per process.
    global inited, initresult
    if not inited:
        inited = True
        initresult = initfunc(*initargs)
    return f(x)

def do_init(a, b):
    print('ran init {} {}'.format(a, b))
    return os.getpid()  # Just to demonstrate it will be unique per process

def f(x):
    print("Hey there {}".format(x))
    print('initresult is {}'.format(initresult))
    return x + 1

def initmap(executor, initializer, initargs, f, it):
    # partial() binds the initializer, its args and f, leaving x for map().
    # All of them must be picklable, i.e. defined at module level.
    return executor.map(partial(initwrapper, initializer, initargs, f), it)

if __name__ == "__main__":
    with ProcessPoolExecutor(4) as executor:
        out = initmap(executor, do_init, (5, 6), f, range(10))
        print(list(out))
Output:
ran init 5 6
Hey there 0
initresult is 4568
ran init 5 6
Hey there 1
initresult is 4569
ran init 5 6
Hey there 2
initresult is 4570
Hey there 3
initresult is 4569
Hey there 4
initresult is 4568
ran init 5 6
Hey there 5
initresult is 4571
Hey there 6
initresult is 4570
Hey there 7
initresult is 4569
Hey there 8
initresult is 4568
Hey there 9
initresult is 4570
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
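On Python 3.7 and later the wrapper is no longer needed: ProcessPoolExecutor itself accepts initializer and initargs, and calls initializer(*initargs) once at the start of each worker process. Below is a minimal sketch of the same demonstration using those parameters, assuming Python 3.7+. do_init and f mirror the functions above; having f return the worker's PID, and the run helper, are additions for illustration only.

```python
import os
from concurrent.futures import ProcessPoolExecutor

# Per-process state set by the initializer; each worker has its own copy.
initresult = None


def do_init(a, b):
    """Runs once in each worker process as it starts (Python 3.7+)."""
    global initresult
    initresult = (a, b, os.getpid())


def f(x):
    # initresult was set by do_init in this worker before any task ran,
    # so its PID field identifies the worker that handled x.
    _a, _b, worker_pid = initresult
    return x + 1, worker_pid


def run(values):
    with ProcessPoolExecutor(max_workers=4, initializer=do_init,
                             initargs=(5, 6)) as executor:
        return list(executor.map(f, values))


if __name__ == "__main__":
    results = run(range(10))
    print([value for value, _pid in results])  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    print(len({pid for _value, pid in results}), "distinct worker process(es)")
```

executor.map returns results in input order, so the values match the output shown above even though the items are spread across up to four workers.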
