Initializing task module global in dask worker using --preload?(正在使用--preload初始化DaskWorker中的全局任务模块?)
问题描述
我试图实现类似于这些问题(Initializing state on dask-distributed workers,Setting up Dask worker with variable)的内容,其中我有一个(相对)大的模型,我希望在接受需要该模型的任务的工作线程子集上预初始化该模型。理想情况下,我甚至不希望客户端计算机具有该模型。
在发现这些问题之前,我最初的尝试是在共享模块worker_task.model中定义delayed任务,并在工作程序的--preload脚本中为该任务分配一个模块全局变量(例如worker_tasks.model.model)以供该任务使用;然而,由于某种原因,这并不起作用-该变量在预加载脚本中设置,但在调用该任务时仍为None。
init_Model_worker.py:
import logging
from uuid import uuid4
from worker_tasks import model
def dask_setup(worker):
    model.model = f'<mock model {uuid4()}>'
    logger = logging.getLogger('distributed')
    logger.warning(f'model = {model.model}')
worker_tasks/model.py:
import logging
import random
from time import sleep
from uuid import uuid4
import dask
model = None
@dask.delayed
def compute_clinical(inp):        
    if model is None:
        raise RuntimeError('Model not initialized.')
    sleep(random.uniform(3, 17))
    return {
        'result': random.choice((True, False)),
        'confidence': random.uniform(0, 1)
        }
这是我启动它并将某些内容提交给计划程序时的工作日志:
> dask-worker --preload init_model_worker.py tcp://scheduler:8786 --name model-worker
distributed.utils - INFO - Reload module init_model_worker from .py file                                  
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.28.0.4:41743'                         
distributed.diskutils - INFO - Found stale lock file and directory '/worker-epptq9sh', purging      
distributed.utils - INFO - Reload module init_model_worker from .py file                                  
distributed - WARNING - model = <mock model faa41af0-d925-46ef-91c9-086093d37c71>                   
distributed.worker - INFO -       Start worker at:     tcp://172.28.0.4:37973                       
distributed.worker - INFO -          Listening to:     tcp://172.28.0.4:37973                       
distributed.worker - INFO -              nanny at:           172.28.0.4:41743                       
distributed.worker - INFO -              bokeh at:           172.28.0.4:37766                       
distributed.worker - INFO - Waiting to connect to:       tcp://scheduler:8786                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.worker - INFO -               Threads:                          4                       
distributed.worker - INFO -                Memory:                    1.93 GB                       
distributed.worker - INFO -       Local Directory:           /worker-mhozo9ru                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.worker - INFO -         Registered to:       tcp://scheduler:8786                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.core - INFO - Starting established connection                                           
distributed.worker - WARNING -  Compute Failed                                                      
Function:  compute_clinical                                                                         
args:      ('mock')                                                                                 
kwargs:    {}                                                                                       
Exception: RuntimeError('Model not initialized.')                                                   
您可以看到,重新加载预加载脚本后,model是<mock model faa41af0-d925-46ef-91c9-086093d37c71>;但当我尝试从任务中调用它时,得到None。
我将尝试根据对其他问题的回答来实施解决方案,但我有几个与Worker预加载相关的问题:
- 为什么在预加载脚本中分配任务后,调用任务时模型
None会出现? - 是否一般建议避免在Worker
--preload脚本中执行此类操作?从客户端调用工作进程状态的初始化是否更好?如果是,为什么? 
推荐答案
我怀疑模型变量会立即绑定到您的函数中,但是它会序列化函数。您可以尝试执行以下操作:
@dask.delayed
def compute_clinical(inp):       
    from worker_tasks.model import model
    if model is None:
        raise RuntimeError('Model not initialized.')
或者,与其将变量分配给全局模块作用域(这在Python中可能很难理解),不如尝试将其分配给Worker本身。
from dask.distributed import get_worker
def dask_setup(worker):
    worker.model = f'<mock model {uuid4()}>'
@dask.delayed
def compute_clinical(inp):       
    if get_worker().model is None:
        raise RuntimeError('Model not initialized.')
                        这篇关于正在使用--preload初始化DaskWorker中的全局任务模块?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:正在使用--preload初始化DaskWorker中的全局任务模块?
				
        
 
            
        基础教程推荐
- PANDA VALUE_COUNTS包含GROUP BY之前的所有值 2022-01-01
 - 求两个直方图的卷积 2022-01-01
 - 修改列表中的数据帧不起作用 2022-01-01
 - 在Python中从Azure BLOB存储中读取文件 2022-01-01
 - Plotly:如何设置绘图图形的样式,使其不显示缺失日期的间隙? 2022-01-01
 - 无法导入 Pytorch [WinError 126] 找不到指定的模块 2022-01-01
 - 包装空间模型 2022-01-01
 - 在同一图形上绘制Bokeh的烛台和音量条 2022-01-01
 - PermissionError: pip 从 8.1.1 升级到 8.1.2 2022-01-01
 - 使用大型矩阵时禁止 Pycharm 输出中的自动换行符 2022-01-01
 
    	
    	
    	
    	
    	
    	
    	
    	
				
				
				
				