Multiprocessing processes do not join when a complex dictionary is put in the return queue

2023-09-29, Python development questions


Problem description

Given a fairly standard read/write multiprocessing setup with an input Queue and an output Queue.

"worker done" is printed 8 times, but the join() calls never return. If I replace queue_out.put(r) with queue_out.put(1), it works.

This is melting my brain; it is probably something really stupid. Should I make a copy of my dictionary and put that in the return Queue? Did I make a stupid mistake somewhere?

Worker function

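def reader(queue_in, queue_out, funktion):
    # Read (index, dictionary) tuples from the input queue until the stop sentinel arrives
    while True:
        r = queue_in.get()
        if r[1] == 'DONE':  # the writer sends (-1, "DONE") as the sentinel
            break  # leave the loop so the line below is reached
        funktion(r[1])  # funktion adds additional keys to the dictionary
        queue_out.put(r)  # <---- replacing r by 1 does let me join()
    print("worker done")  # <----- this happens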

Filling the input queue

def writer(generator, queue):
    # Write (index, dictionary) tuples to the input queue
    for r in enumerate(generator):
        # r[1] is a complex dictionary
        queue.put(r)
    print("writer done")
    # One stop sentinel per worker
    for _ in range(WORKERS):
        queue.put((-1, "DONE"))

The rest

from multiprocessing import Process, Queue

WORKERS = 8

# generator and funktion come from the rest of the original program (not shown)

if __name__ == '__main__':  # required on Windows, where each child re-imports this module
    # init Queues
    queue_in = Queue()
    queue_out = Queue()

    # Start processes, with input and output queues
    readers = []
    for _ in range(WORKERS):
        p = Process(target=reader, args=(queue_in, queue_out, funktion))
        p.daemon = True
        p.start()
        readers.append(p)

    writer(generator, queue_in)

    for p in readers:
        p.join()

    print("joined")  # <---- this never happens

    queue_in.close()

    while not queue_out.empty():
        print(queue_out.get())
    queue_out.close()
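On the copying question: a multiprocessing.Queue pickles every item passed to put() and rebuilds it in get(), so the receiving side always gets a new object with equal contents, never the sender's own dictionary. A minimal sketch, using a hypothetical stand-in dictionary; it suggests that an explicit copy is unlikely to be what changes the behaviour.

```python
from multiprocessing import Queue

# Hypothetical stand-in for the question's "complex dictionary"
original = {"id": 1, "tags": ["a", "b"], "nested": {"score": 0.5}}

queue = Queue()
queue.put(original)     # the item is pickled before it travels through the pipe
received = queue.get()  # get() unpickles it into a new object

print(received == original)  # True: same contents
print(received is original)  # False: a different object
```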

Recommended answer

I think I have pieced this together from two sources, as I always run into the same problem. I think the important point is that this is on Windows.

A note from the documentation:

Since Windows lacks os.fork(), it has a few extra restrictions:
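These restrictions come from the "spawn" start method, the only one available on Windows: each child starts a fresh interpreter and imports the main module, so the code that starts processes has to sit under if __name__ == "__main__": and process targets have to be module-level functions. A minimal sketch with hypothetical names (double, run); requesting the "spawn" context explicitly should reproduce the Windows behaviour on Linux or macOS as well.

```python
import multiprocessing as mp

def double(queue_out, value):
    # Hypothetical worker: defined at module level so a spawned child,
    # which re-imports this module, can look it up by name
    queue_out.put(value * 2)

def run():
    # "spawn" is what Windows uses (no os.fork()); asking for it explicitly
    # gives the same start-up behaviour on other platforms
    ctx = mp.get_context("spawn")
    queue_out = ctx.Queue()
    procs = [ctx.Process(target=double, args=(queue_out, i)) for i in range(4)]
    for p in procs:
        p.start()
    # One result per worker; arrival order is not guaranteed, hence sorted()
    results = sorted(queue_out.get() for _ in procs)
    for p in procs:
        p.join()
    return results

if __name__ == "__main__":
    # Spawned children import this file under a different __name__,
    # so this block runs only in the parent
    results = run()
    print(results)  # [0, 2, 4, 6]
```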

Then read the answers here saying that join() is for forked processes.

I have always managed to run multiprocessing in a similar way to you without using join() and have not seen any errors; I would be more than happy to see a counterexample explaining why it is needed. Indeed, removing it fixed your issue.

This article goes into more depth about how child processes in multiprocessing differ between operating systems. I do think the issue with join(), specifically, should be made more explicit in the documentation.
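The multiprocessing programming guidelines do contain a related warning, under "Joining processes that use queues": a process that has put items on a queue waits before terminating until all buffered items have been fed to the underlying pipe, so joining it before the queue is drained can deadlock. That would be consistent with the symptom in the question, since a small item like 1 fits in the pipe buffer while larger dictionaries may not. A minimal sketch of draining before joining, not the original poster's code: the records, the 100 KB padding and n_items are hypothetical, and it reads exactly one result per input item before calling join().

```python
from multiprocessing import Process, Queue

WORKERS = 4
SENTINEL = "DONE"  # mirrors the question's stop marker

def reader(queue_in, queue_out):
    # Same shape as the question's reader: loop until the sentinel arrives
    while True:
        index, record = queue_in.get()
        if record == SENTINEL:
            break
        record["extra"] = "x" * 100_000  # hypothetical large addition
        queue_out.put((index, record))

def main(n_items=20):
    queue_in, queue_out = Queue(), Queue()
    readers = [Process(target=reader, args=(queue_in, queue_out))
               for _ in range(WORKERS)]
    for p in readers:
        p.start()
    for item in enumerate({"id": i} for i in range(n_items)):
        queue_in.put(item)
    for _ in range(WORKERS):
        queue_in.put((-1, SENTINEL))
    # Drain first: exactly one result per input item
    results = [queue_out.get() for _ in range(n_items)]
    # Every child's buffered data has now been consumed, so join() can return
    for p in readers:
        p.join()
    return sorted(index for index, _ in results)

if __name__ == "__main__":
    indices = main()
    print(indices == list(range(20)))  # True
```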


