Missing lines when writing file with multiprocessing Lock Python(使用多处理锁定Python写入文件时缺少行)
问题描述
这是我的代码:
from multiprocessing import Pool, Lock
from datetime import datetime as dt
console_out = "/STDOUT/Console.out"
chunksize = 50
lock = Lock()
def writer(message):
    lock.acquire()
    with open(console_out, 'a') as out:
        out.write(message)
        out.flush()
    lock.release()
def conf_wrapper(state):
    import ProcessingModule as procs
    import sqlalchemy as sal
    stcd, nrows = state
    engine = sal.create_engine('postgresql://foo:bar@localhost:5432/schema')
    writer("State {s} started  at: {n}"
           "
".format(s=str(stcd).zfill(2), n=dt.now()))
    with engine.connect() as conn, conn.begin():
        procs.processor(conn, stcd, nrows, chunksize)
    writer("	State {s} finished  at: {n}"
           "
".format(s=str(stcd).zfill(2), n=dt.now()))
def main():
    nprocesses = 12
    maxproc = 1
    state_list = [(2, 113), (10, 119), (15, 84), (50, 112), (44, 110), (11, 37), (33, 197)]
    with open(console_out, 'w') as out:
        out.write("Starting at {n}
".format(n=dt.now()))
        out.write("Using {p} processes..."
                  "
".format(p=nprocesses))
    with Pool(processes=int(nprocesses), maxtasksperchild=maxproc) as pool:
        pool.map(func=conf_wrapper, iterable=state_list, chunksize=1)
    with open(console_out, 'a') as out:
        out.write("
All done at {n}".format(n=dt.now()))
文件console_out从未包含所有7个状态。它总是缺少一个或多个状态。以下是最近一次运行的输出:
Starting at 2016-07-27 21:46:58.638587
Using 12 processes...
State 44 started  at: 2016-07-27 21:47:01.482322
State 02 started  at: 2016-07-27 21:47:01.497947
State 11 started  at: 2016-07-27 21:47:01.529198
State 10 started  at: 2016-07-27 21:47:01.497947
    State 11 finished  at: 2016-07-27 21:47:15.701207
    State 15 finished  at: 2016-07-27 21:47:24.123164
    State 44 finished  at: 2016-07-27 21:47:32.029489
    State 50 finished  at: 2016-07-27 21:47:51.203107
    State 10 finished  at: 2016-07-27 21:47:53.046876
    State 33 finished  at: 2016-07-27 21:47:58.156301
    State 02 finished  at: 2016-07-27 21:48:18.856979
All done at 2016-07-27 21:48:18.992277
为什么?
注意,操作系统为Windows Server 2012 R2。
推荐答案
由于您在Windows上运行,工作进程不会继承任何。每个进程"从头开始"运行整个主程序。
具体地说,对于编写的代码,每个进程都有自己的lock实例,而这些实例彼此无关。简而言之,lock根本没有提供任何进程间互斥。
要解决这个问题,可以将Pool构造函数更改为调用每个进程一次的初始化函数,您可以向该函数传递Lock()的实例。例如:
def init(L):
    global lock
    lock = L
,然后将这些参数添加到Pool()构造函数:
initializer=init, initargs=(Lock(),),
并且您不再需要:
lock = Lock()
行。
然后进程间互斥将按预期工作。
无锁定
如果您希望将所有输出委托给编写器进程,则可以跳过锁并使用队列为该进程提供数据[请参阅后面的不同版本]。
def writer_process(q):
    with open(console_out, 'w') as out:
        while True:
            message = q.get()
            if message is None:
                break
            out.write(message)
            out.flush() # can't guess whether you really want this
并将writer()更改为仅:
def writer(message):
    q.put(message)
您将再次需要在Pool构造函数中使用initializer=和initargs=,以便所有进程使用相同的队列。
只能运行一个进程writer_process(),该进程可以作为multiprocessing.Process的实例单独启动。
最后,为了让writer_process()知道是时候退出了,当是清空队列并返回的时间时,只需运行
q.put(None)
在主进程中。
以后
OP选择了这个版本,因为他们需要同时在其他代码中打开输出文件:
def writer_process(q):
    while True:
        message = q.get()
        if message == 'done':
            break
        else:
            with open(console_out, 'a') as out:
                out.write(message)
我不知道为什么终止哨兵更改为"done"。任何唯一值都适用于此;None是传统的。
这篇关于使用多处理锁定Python写入文件时缺少行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:使用多处理锁定Python写入文件时缺少行
				
        
 
            
        基础教程推荐
- 包装空间模型 2022-01-01
 - Plotly:如何设置绘图图形的样式,使其不显示缺失日期的间隙? 2022-01-01
 - 求两个直方图的卷积 2022-01-01
 - 在同一图形上绘制Bokeh的烛台和音量条 2022-01-01
 - PermissionError: pip 从 8.1.1 升级到 8.1.2 2022-01-01
 - 修改列表中的数据帧不起作用 2022-01-01
 - 在Python中从Azure BLOB存储中读取文件 2022-01-01
 - 使用大型矩阵时禁止 Pycharm 输出中的自动换行符 2022-01-01
 - 无法导入 Pytorch [WinError 126] 找不到指定的模块 2022-01-01
 - PANDA VALUE_COUNTS包含GROUP BY之前的所有值 2022-01-01
 
    	
    	
    	
    	
    	
    	
    	
    	
				
				
				
				