use ValueProvider to format a BigQuery in Dataflow(使用ValueProvider格式化数据流中的BigQuery)
本文介绍了使用ValueProvider格式化数据流中的BigQuery的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我当前正在使用Dataflow在Python中执行循环批处理。
基本上我从BigQuery读取数据并在其上执行操作。我的管道如下所示
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
| "doing stuff" >> beam.Map(do_some_stuff)
)
我希望使用数据流模板运行作业,以使其适应运行时。
多亏了文档https://cloud.google.com/dataflow/docs/guides/templates/creating-templates,即函数部分中的Using ValueProvider,我成功地使用Pardo在运行时为"do_ome_Stuff"提供了一个额外的参数。
class TemplateOption(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--template_do_stuff_param',
default=45,
type=int)
class MyDoStuffFn(beam.DoFn):
def __init__(self, template_do_stuff_param):
self.template_do_stuff_param = template_do_stuff_param
def process(self, *_):
yield do_some_stuff(self.template_do_stuff_param.get())
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY),
use_standard_sql=True))
| "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
)
但我还希望更改进程关注的用户数,因此我希望使查询适应运行时。
class TemplateOption(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--template_nb_users',
default=100,
type=int)
parser.add_value_provider_argument('--template_do_stuff_param',
default=45,
type=int)
class MyDoStuffFn(beam.DoFn):
def __init__(self, template_do_stuff_param):
self.template_do_stuff_param = template_do_stuff_param
def process(self, *_):
yield do_some_stuff(self.template_do_stuff_param.get())
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)
lines = (p
| 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
use_standard_sql=True))
| "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
)
.这不起作用,因为我在管道执行之前调用了get()。到目前为止,我还没有成功地将我对do_ome_Stuff函数所做的修改修改为"Read"行
任何关于如何进行的建议或解决方案都将不胜感激。谢谢!
推荐答案
遗憾的是,BigQuerySource
不支持值提供程序。这是因为它是在数据流运行器中本机实现的,因此所有信息都需要在管道构造时可用。
您可以尝试转换apache_beam.io.gcp.bigquery.ReadFromBigQuery
-这将允许您使用值提供程序。
这篇关于使用ValueProvider格式化数据流中的BigQuery的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:使用ValueProvider格式化数据流中的BigQuery


基础教程推荐
猜你喜欢
- PermissionError: pip 从 8.1.1 升级到 8.1.2 2022-01-01
- 无法导入 Pytorch [WinError 126] 找不到指定的模块 2022-01-01
- 包装空间模型 2022-01-01
- 修改列表中的数据帧不起作用 2022-01-01
- 在同一图形上绘制Bokeh的烛台和音量条 2022-01-01
- 求两个直方图的卷积 2022-01-01
- Plotly:如何设置绘图图形的样式,使其不显示缺失日期的间隙? 2022-01-01
- PANDA VALUE_COUNTS包含GROUP BY之前的所有值 2022-01-01
- 使用大型矩阵时禁止 Pycharm 输出中的自动换行符 2022-01-01
- 在Python中从Azure BLOB存储中读取文件 2022-01-01