Using a manager to update a Queue in Python multiprocessing
Question
I am designing Python multiprocessing code that works through a queue which may be updated during processing. The following code sometimes works, sometimes gets stuck, and sometimes raises an Empty error.
import multiprocessing as mp

def worker(working_queue, output_queue):
    while True:
        if working_queue.empty():
            break
        else:
            picked = working_queue.get_nowait()
            if picked % 2 == 0:
                # Even items are finished and go to the output queue.
                output_queue.put(picked)
            else:
                # Odd items are incremented and put back for another pass.
                working_queue.put(picked + 1)

if __name__ == '__main__':
    manager = mp.Manager()
    static_input = range(100)
    working_q = manager.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker, args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while not output_q.empty():
        results_bank.append(output_q.get_nowait())
    # The length of this list should equal len(static_input), the range used to
    # populate the input queue. In other words, it tells whether every item
    # placed for processing was actually processed.
    print(len(results_bank))
    results_bank.sort()
    print(results_bank)
Should I use a list as a global variable, protected by a lock, instead of a manager.Queue()?
Recommended answer
I just added a try: and except Exception: to handle the Empty error. The error can occur because empty() and get_nowait() are separate calls, so another worker may take the last item in between them. The results seem to be consistent now. Please let me know if you find problems I overlooked in this solution.
import multiprocessing as mp

def worker(working_queue, output_queue):
    while True:
        try:
            if working_queue.empty():
                break
            else:
                picked = working_queue.get_nowait()
                if picked % 2 == 0:
                    # Even items are finished and go to the output queue.
                    output_queue.put(picked)
                else:
                    # Odd items are incremented and put back for another pass.
                    working_queue.put(picked + 1)
        except Exception:
            # get_nowait() raises Empty if another worker took the last item
            # after the empty() check; go back and check the queue again.
            continue

if __name__ == '__main__':
    # A Manager seems to be unnecessary.
    # manager = mp.Manager()
    # working_q = manager.Queue()
    working_q = mp.Queue()
    output_q = mp.Queue()
    static_input = range(100)
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker, args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while not output_q.empty():
        results_bank.append(output_q.get_nowait())
    # The length of this list should equal len(static_input), the range used to
    # populate the input queue. In other words, it tells whether every item
    # placed for processing was actually processed.
    print(len(results_bank))
    results_bank.sort()
    print(results_bank)
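As a possible alternative to polling empty(), here is a minimal sketch that is not part of the answer above. It assumes Python 3 and swaps in multiprocessing.JoinableQueue plus a None sentinel, so workers block on get() and never see an Empty error. The helper name run_pipeline and the sentinel convention are illustrative assumptions. The sketch also relies on the fact that, for this particular even/odd rule, every input item produces exactly one output item.

```python
import multiprocessing as mp


def worker(working_queue, output_queue):
    """Process items until a None sentinel arrives (sentinel is an assumption)."""
    while True:
        picked = working_queue.get()  # blocks until an item is available
        if picked is None:
            working_queue.task_done()
            break
        if picked % 2 == 0:
            output_queue.put(picked)
        else:
            # Re-queue before task_done() so the unfinished-task count
            # cannot reach zero while follow-up work is still pending.
            working_queue.put(picked + 1)
        working_queue.task_done()


def run_pipeline(static_input, n_workers):
    """Hypothetical helper: run the workers and return the sorted results."""
    working_q = mp.JoinableQueue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker, args=(working_q, output_q))
                 for _ in range(n_workers)]
    for proc in processes:
        proc.start()
    working_q.join()  # returns once every item, re-queued ones included, is done
    for _ in processes:
        working_q.put(None)  # one sentinel per worker
    # Collect the outputs before joining the workers: each input yields
    # exactly one output under the even/odd rule used here.
    results = [output_q.get() for _ in range(len(static_input))]
    for proc in processes:
        proc.join()
    return sorted(results)


if __name__ == '__main__':
    results_bank = run_pipeline(range(100), mp.cpu_count())
    print(len(results_bank))
    print(results_bank)
```

Reading the output queue before calling proc.join() is a deliberate choice in this sketch: the multiprocessing documentation warns that a process which has put items on a queue may not terminate until those items have been flushed, so joining first can hang when the output is large.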