我有以下代码利用多处理迭代大型列表并找到匹配.一旦在任何一个进程中找到匹配,我如何才能停止所有进程?我见过一些例子,但我似乎都不适合我在这里做的事情.#!/usr/bin/env python3.5import sys, itertools, multip...

我有以下代码利用多处理迭代大型列表并找到匹配.一旦在任何一个进程中找到匹配,我如何才能停止所有进程?我见过一些例子,但我似乎都不适合我在这里做的事情.
#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
def do_job(first_bits):
for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
# CHECK FOR MATCH HERE
print(''.join(x))
# EXIT ALL PROCESSES IF MATCH FOUND
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
results = []
for i in range(num_parts):
if i == num_parts - 1:
first_bit = alphabet[part_size * i :]
else:
first_bit = alphabet[part_size * i : part_size * (i+1)]
pool.apply_async(do_job, (first_bit,))
pool.close()
pool.join()
谢谢你的时间.
更新1:
我已经实现了@ShadowRanger在伟大方法中建议的更改,它几乎按照我想要的方式工作.所以我添加了一些日志记录来指示进度并在其中放置一个“测试”键以匹配.
我希望能够独立于num_parts增加/减少iNumberOfProcessors.在这个阶段,当我将它们都放在4时,一切都按预期工作,4个进程旋转(一个额外的控制台).当我更改iNumberOfProcessors = 6时,6个进程会启动,但只有它们有任何CPU使用率.所以看来2是空闲的.在上面的解决方案中,我能够在不增加num_parts的情况下设置更高的内核数量,并且可以使用所有进程.
我不确定如何重构这种新方法来给我相同的功能.你能看看并给我一些方向,重构需要能够彼此独立地设置iNumberOfProcessors和num_parts并仍然使用所有进程吗?
这是更新的代码:
#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 6
def do_job(first_bits):
iAttemptNumber = 0
iLastProgressUpdate = 0
for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
sKey = ''.join(x)
iAttemptNumber = iAttemptNumber + 1
if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
print("Attempt#:", iAttemptNumber, "Key:", sKey)
if sKey == 'test':
print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
return True
def get_part(i):
if i == num_parts - 1:
first_bit = alphabet[part_size * i :]
else:
first_bit = alphabet[part_size * i : part_size * (i+1)]
return first_bit
if __name__ == '__main__':
# with statement with Py3 multiprocessing.Pool terminates when block exits
with multiprocessing.Pool(processes = iNumberOfProcessors) as pool:
# Don't need special case for final block; slices can
for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
if gotmatch:
break
else:
print("No matches found")
更新2:
好的,这是我试图尝试@noxdafox的建议.我根据他提出的建议链接汇总了以下内容.不幸的是,当我运行它时,我得到错误:
…行322,在apply_async中
引发ValueError(“池未运行”)
ValueError:池未运行
任何人都可以给我一些指导如何使这个工作.
基本上问题是我的第一次尝试进行了多处理,但是一旦找到匹配就不支持取消所有进程.
我的第二次尝试(基于@ShadowRanger建议)解决了这个问题,但打破了能够独立扩展进程数和num_parts大小的功能,这是我的第一次尝试可以做的事情.
我的第三次尝试(基于@noxdafox建议)抛出了上面列出的错误.
如果有人可以就如何维护我的第一次尝试的功能(能够独立地扩展进程数和num_parts大小)给我一些指导,并添加一旦找到匹配就取消所有进程的功能,将非常感谢.
感谢您的时间.
以下是基于@noxdafox建议的第三次尝试的代码:
#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 4
def find_match(first_bits):
iAttemptNumber = 0
iLastProgressUpdate = 0
for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
sKey = ''.join(x)
iAttemptNumber = iAttemptNumber + 1
if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
print("Attempt#:", iAttemptNumber, "Key:", sKey)
if sKey == 'test':
print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
return True
def get_part(i):
if i == num_parts - 1:
first_bit = alphabet[part_size * i :]
else:
first_bit = alphabet[part_size * i : part_size * (i+1)]
return first_bit
def grouper(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return itertools.zip_longest(*args, fillvalue=fillvalue)
class Worker():
def __init__(self, workers):
self.workers = workers
def callback(self, result):
if result:
self.pool.terminate()
def do_job(self):
print(self.workers)
pool = multiprocessing.Pool(processes=self.workers)
for part in grouper(alphabet, part_size):
pool.apply_async(do_job, (part,), callback=self.callback)
pool.close()
pool.join()
print("All Jobs Queued")
if __name__ == '__main__':
w = Worker(4)
w.do_job()
解决方法:
您可以查看this question以查看解决问题的实施示例.
这也适用于concurrent.futures池.
只需用apply_async替换map方法,然后从调用者遍历列表.
像这样的东西.
for part in grouper(alphabet, part_size):
pool.apply_async(do_job, part, callback=self.callback)
grouper recipe
本文标题为:如果任何一个进程在python中找到匹配项,如何让所有pool.apply_async进程停止


基础教程推荐
- Linux安装python3.6 2023-09-03
- Windows下Python环境搭建 2023-09-03
- Centos7下安装python环境 2023-09-04
- Pytorch关于Dataset 的数据处理 2023-08-04
- Python retrying 重试机制详解 2023-08-11
- python学习笔记 day37 进程池 2023-09-03
- python multiprocessing-在正在运行的进程上进行类似选择,以查看已完成的进程 2023-11-14
- 适用于Linux的Python可控制的命令行音频播放器 2023-11-14
- python 实现syslog 服务器的详细过程 2022-08-30
- python-从子进程实时打印标准输出 2023-11-13