Strange Queue.PriorityQueue behaviour with multiprocessing in Python 2.7.6
Problem description
As the title says, I'm trying to use PriorityQueue with multiprocessing. More precisely, I wanted to make a shared PriorityQueue. I wrote some code, but it doesn't behave as I expected.
Here is the code:
import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue

def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)
    print "worker", queue.qsize()

pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5) # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
It produces the following output:
worker 100
main 0
What is happening, and what is the right way to do what I want? Thank you.
Recommended answer
The problem here isn't that the queue can't be pickled: if you're using a Unix-like platform, the queue can be passed to the child without pickling. (On Windows, I think you would get a pickling error here, though.) The root problem is that you're not using a process-safe queue. The child ends up with its own copy of the PriorityQueue, so the items it puts never reach the copy held by the main process. The only queues that can be used between processes are the Queue objects that live inside the multiprocessing module. Unfortunately, there is no PriorityQueue implementation available there. However, you can easily create one by registering a PriorityQueue with a multiprocessing.Manager class, like this:
import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue

class MyManager(SyncManager):
    pass

MyManager.register("PriorityQueue", PriorityQueue) # Register a shared PriorityQueue

def Manager():
    m = MyManager()
    m.start()
    return m

def worker(queue):
    print(queue)
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()

m = Manager()
pr_queue = m.PriorityQueue() # This is process-safe
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5) # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
Output:
worker 100
main 100
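For readers on Python 3, here is a minimal sketch of the same idea, not a drop-in replacement for the code above. It assumes Python 3 (where the Queue module is named queue) and a Unix-like platform, because it explicitly asks for the "fork" start method. The names PriorityQueueManager, fill, drain_after_child and compare are made up for this illustration. It runs the same child against a plain PriorityQueue and a Manager-backed one, and waits for the child with join() rather than sleeping.

```python
import multiprocessing as mp
from multiprocessing.managers import SyncManager
from queue import PriorityQueue  # Python 3 name for Python 2's Queue module


class PriorityQueueManager(SyncManager):
    """Hypothetical subclass that only carries the registration below."""


# Make PriorityQueue available as manager.PriorityQueue() via a proxy.
PriorityQueueManager.register("PriorityQueue", PriorityQueue)


def fill(target_queue, items):
    """Runs in the child process: put every item on the queue it was given."""
    for item in items:
        target_queue.put(item)


def drain_after_child(target_queue, items, ctx):
    """Let a child fill target_queue, then return what the parent can read."""
    child = ctx.Process(target=fill, args=(target_queue, items))
    child.start()
    child.join()  # wait for the child to finish instead of sleeping
    seen = []
    while not target_queue.empty():
        seen.append(target_queue.get())
    return seen


def compare(items):
    """Return (items seen via a plain queue, items seen via a managed queue)."""
    ctx = mp.get_context("fork")  # assumption: Unix-like platform
    # Plain queue: the forked child fills its own copy, not the parent's.
    plain = drain_after_child(PriorityQueue(), items, ctx)
    # Managed queue: parent and child both talk to the manager's server process.
    with PriorityQueueManager(ctx=ctx) as manager:
        managed = drain_after_child(manager.PriorityQueue(), items, ctx)
    return plain, managed


if __name__ == "__main__":
    print(compare([5, 1, 3]))
```

Calling compare([5, 1, 3]) should give an empty list for the plain queue and the items in priority order for the managed one, which mirrors the "main 0" versus "main 100" difference above.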
Note that this probably won't perform quite as well as it would if it were a standard multiprocessing.Queue subclass. The Manager-based PriorityQueue is implemented by creating a Manager server process that actually contains a regular PriorityQueue, and then providing your main and worker processes with Proxy objects that use IPC to read from and write to the queue in the server process. A regular multiprocessing.Queue just writes data to and reads data from a Pipe. If that's a concern, you could try implementing your own multiprocessing.PriorityQueue by subclassing or delegating to multiprocessing.Queue. It may not be worth the effort, though.
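If the Manager round trips do turn out to be a concern, one loose reading of "delegating" is to move items over a regular multiprocessing.Queue and do the ordering in the consumer with a local heap. The sketch below is only that, under stated assumptions: Python 3, a Unix-like platform ("fork" start method), a single producer, and None used as a made-up end-of-stream sentinel (so None cannot be a real item). The names produce and consume_in_priority_order are hypothetical. It is not a true shared priority queue, since it only orders the items the consumer has already received.

```python
import heapq
import multiprocessing as mp

_DONE = None  # hypothetical sentinel marking the end of the producer's stream


def produce(transport, items):
    """Runs in the child process: send every item, then the sentinel."""
    for item in items:
        transport.put(item)
    transport.put(_DONE)


def consume_in_priority_order(items):
    """Receive items over a pipe-backed queue and return them smallest first."""
    ctx = mp.get_context("fork")  # assumption: Unix-like platform
    transport = ctx.Queue()  # regular multiprocessing queue, backed by a pipe
    child = ctx.Process(target=produce, args=(transport, items))
    child.start()

    heap = []
    while True:
        item = transport.get()  # blocks until the child sends something
        if item is _DONE:
            break
        heapq.heappush(heap, item)  # ordering happens locally in the consumer

    child.join()  # join only after the queue has been drained
    return [heapq.heappop(heap) for _ in range(len(heap))]
```

The trade-off is that the ordering state lives in one process only. If several processes need to pop from the same prioritized collection, the Manager-based queue shown earlier is still the simpler option.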
