我正在尝试并行化我编写的脚本.每个过程都需要进行计算,并将数据存储到数组的特定部分(列表列表).每个进程都在计算和存储其数据,但是我无法弄清楚如何从非根进程到根进程获取数据,以便它可以将数据打印到文件中.我创...

我正在尝试并行化我编写的脚本.每个过程都需要进行计算,并将数据存储到数组的特定部分(列表列表).每个进程都在计算和存储其数据,但是我无法弄清楚如何从非根进程到根进程获取数据,以便它可以将数据打印到文件中.我创建了一个脚本的最低限度的工作示例,该脚本仅出于简化目的而设计为在2个内核上运行:
from mpi4py import MPI
import pdb
import os
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
# Declare the array that will store all the temp results
temps = [[0 for x in xrange(5)] for x in xrange(4)]
# Loop over all directories
if rank==0:
counter = 0
for i in range(2):
for j in range(5):
temps[i][j] = counter
counter = counter + 1
else:
counter = 20
for i in range(2,4):
for j in range(5):
temps[i][j] = counter
counter = counter + 1
temps = comm.bcast(temps,root=0)
if rank==0:
print temps
我使用以下命令执行脚本:
mpiexec -n 2 python mne.py
完成案例后,输出为:
[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [0, 0, 0, 0, 0], [0, 0, 0, 0, 0]]
因此,您可以看到数据共享无法正常运行.有人可以告诉我将数据返回到根进程的正确方法吗?
解决方法:
该代码可以正常工作,只是没有做您想要的.
这条线
temps = comm.bcast(temps,root=0)
将处理器0的temps变量广播到所有处理器(包括等级0),这当然给出了上面的结果.您要使用收集(或如果希望所有处理器都具有答案的话,可以使用聚集).看起来更像这样:
from mpi4py import MPI
import pdb
import os
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
assert size == 2
# Declare the array that will store all the temp results
temps = [[0 for x in xrange(5)] for x in xrange(4)]
# declare the array that holds the local results
locals =[[0 for x in xrange(5)] for x in xrange(2)]
# Loop over all directories
if rank==0:
counter = 0
for i in range(2):
for j in range(5):
locals[i][j] = counter
counter = counter + 1
else:
counter = 20
for i in range(2):
for j in range(5):
locals[i][j] = counter
counter = counter + 1
temps = comm.gather(locals,temps,root=0)
if rank==0:
print temps
如果您确实想就地进行收集,并且知道(说)所有真实数据都将大于初始化数据所用的零,则可以使用归约操作,但这会更容易与numpy数组:
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
assert size == 2
# Declare the array that will store all the temp results
temps = numpy.zeros((4,5))
# Loop over all directories
if rank==0:
counter = 0
for i in range(2):
for j in range(5):
temps[i,j] = counter
counter = counter + 1
else:
counter = 20
for i in range(2,4):
for j in range(5):
temps[i,j] = counter
counter = counter + 1
comm.Allreduce(MPI.IN_PLACE,temps,op=MPI.MAX)
if rank==0:
print temps
本文标题为:如何使用MPI在Python中的进程之间共享数据?


基础教程推荐
- 各种相似度计算的python实现 2023-09-04
- 带有变量的Python子进程调用 2023-11-14
- Python可执行文件到Linux列出具有大小的文件 2023-11-11
- Python pygame 项目实战事件监听 2022-08-30
- Python Panda中索引和选择 series 的数据 2022-10-20
- Python + selenium 自动化测试框架详解 2023-08-04
- python读取文件夹中图片的图片名并写入excel表格 2023-08-11
- python数据分析近年比特币价格涨幅趋势分布 2023-08-04
- 在Windows上安装Fabric(Python库)时出现“错误:找不到vcvarsall.bat” 2023-11-12
- Python 使用openpyxl处理Excel文件详情 2022-09-02