This is a memo of pipeline-processing patterns I use with Dataflow. I plan to keep updating it.
options
# -*- coding: utf-8 -*-
import apache_beam as beam

# Project ID
PROJECTID = 'project id'

# Pipeline options
options = beam.options.pipeline_options.PipelineOptions()

# GCP-related options
gcloud_options = options.view_as(
    beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.project = PROJECTID
gcloud_options.job_name = 'job1013'
gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID)
gcloud_options.temp_location = 'gs://{}/tem'.format(PROJECTID)

# Standard options (set the execution environment)
std_options = options.view_as(
    beam.options.pipeline_options.StandardOptions)
std_options.runner = 'DataflowRunner'
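The same configuration can also be passed to PipelineOptions as argv-style flags instead of setting each option view one by one. A minimal sketch, assuming the same placeholder project and bucket as above; swap the runner for DirectRunner to test locally.

# -*- coding: utf-8 -*-
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

PROJECTID = 'project id'  # placeholder, same as above

# PipelineOptions parses argv-style flags, so the whole configuration
# can live in one place.
options = PipelineOptions([
    '--project={}'.format(PROJECTID),
    '--job_name=job1013',
    '--staging_location=gs://{}/staging'.format(PROJECTID),
    '--temp_location=gs://{}/tem'.format(PROJECTID),
    '--runner=DataflowRunner',  # use 'DirectRunner' for a local test run
])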
udf
p = beam.Pipeline(options=options)

query = "SELECT word " \
        "FROM `bigquery-public-data.samples.shakespeare` " \
        "LIMIT 10"

# Uppercase the 'word' field of each row
def upper_element(element):
    element['word'] = element['word'].upper()
    return element

(p
 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                 use_standard_sql=True,
                                                 query=query))
 | 'upper' >> beam.Map(upper_element)
 | 'write' >> beam.io.WriteToText('gs://{}/dataflow_upper_word.txt'.format(PROJECTID),
                                  num_shards=1)
 )

p.run()
Output
{'word': 'LVII'}
{'word': 'QUALITY'}
{'word': "DIMM'D"}
{'word': 'HEED'}
{'word': 'UNTHRIFTY'}
{'word': 'PLAGUES'}
{'word': 'WHEREVER'}
{'word': 'SURMISE'}
{'word': 'TREASON'}
{'word': 'AUGURS'}
The words have been converted to uppercase.
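The same per-element transform can also be written as a DoFn and applied with beam.ParDo, which becomes useful once the logic needs setup/teardown or multiple outputs. A minimal sketch of an equivalent 'upper' step; only that step changes, the rest of the pipeline stays the same.

import apache_beam as beam

class UpperElementFn(beam.DoFn):
    """Uppercases the 'word' field of each element."""
    def process(self, element):
        # Build a new dict rather than mutating the input element.
        yield dict(element, word=element['word'].upper())

# Drop-in replacement for the 'upper' >> beam.Map(upper_element) step:
# | 'upper' >> beam.ParDo(UpperElementFn())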
branch
p = beam.Pipeline(options=options)

query = "SELECT word " \
        "FROM `bigquery-public-data.samples.shakespeare` " \
        "LIMIT 10"

# Add an uppercase copy of 'word'
def upper_element(element):
    element['upper_word'] = element['word'].upper()
    return element

# Add a lowercase copy of 'word'
def lower_element(element):
    element['lower_word'] = element['word'].lower()
    return element

query_results = p | 'read' >> beam.io.Read(
    beam.io.BigQuerySource(project=PROJECTID,
                           use_standard_sql=True,
                           query=query))

# Fan the query results out into two branches
branch1 = query_results | 'upper' >> beam.Map(upper_element)
branch2 = query_results | 'lower' >> beam.Map(lower_element)

# Merge the branches again and write them out
((branch1, branch2)
 | beam.Flatten()
 | beam.io.WriteToText(
     'gs://{}/dataflow_upper_lower_word.txt'.format(PROJECTID),
     num_shards=1))

p.run()
{'word': 'plagues', 'upper_word': 'PLAGUES'}
{'word': 'plagues', 'upper_word': 'PLAGUES', 'lower_word': 'plagues'}
{'word': 'augurs', 'upper_word': 'AUGURS'}
{'word': 'augurs', 'upper_word': 'AUGURS', 'lower_word': 'augurs'}
{'word': "dimm'd", 'upper_word': "DIMM'D"}
{'word': "dimm'd", 'upper_word': "DIMM'D", 'lower_word': "dimm'd"}
{'word': 'heed', 'upper_word': 'HEED'}
{'word': 'heed', 'upper_word': 'HEED', 'lower_word': 'heed'}
{'word': 'LVII', 'upper_word': 'LVII'}
{'word': 'LVII', 'upper_word': 'LVII', 'lower_word': 'lvii'}
{'word': 'surmise', 'upper_word': 'SURMISE'}
{'word': 'surmise', 'upper_word': 'SURMISE', 'lower_word': 'surmise'}
{'word': 'wherever', 'upper_word': 'WHEREVER'}
{'word': 'wherever', 'upper_word': 'WHEREVER', 'lower_word': 'wherever'}
{'word': 'Unthrifty', 'upper_word': 'UNTHRIFTY'}
{'word': 'Unthrifty', 'upper_word': 'UNTHRIFTY', 'lower_word': 'unthrifty'}
{'word': 'quality', 'upper_word': 'QUALITY'}
{'word': 'quality', 'upper_word': 'QUALITY', 'lower_word': 'quality'}
{'word': 'treason', 'upper_word': 'TREASON'}
{'word': 'treason', 'upper_word': 'TREASON', 'lower_word': 'treason'}
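Note that the rows coming out of the 'lower' branch also carry 'upper_word'. This is most likely because upper_element mutates its input dict in place and both branches can end up processing the same object; Beam generally expects transforms not to modify their inputs. A sketch that works on a copy instead, using the hypothetical names upper_element_copy / lower_element_copy; with these, each branch's output should contain only the key it added.

import copy

def upper_element_copy(element):
    # Work on a copy so the other branch never sees this change.
    out = copy.copy(element)
    out['upper_word'] = out['word'].upper()
    return out

def lower_element_copy(element):
    out = copy.copy(element)
    out['lower_word'] = out['word'].lower()
    return out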
group by
p = beam.Pipeline(options=options)

query = "SELECT * " \
        "FROM `bigquery-public-data.samples.shakespeare`"

# Key each row by its corpus name, keep the word count as the value
def create_key_value_pair(element):
    key = element['corpus'].upper()
    value = element['word_count']
    return (key, value)

# After GroupByKey the value is an iterable of counts; sum it per key
def sum_element(element):
    return (element[0], sum(element[1]))

(p
 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                 use_standard_sql=True,
                                                 query=query))
 | 'create_key_value_pair' >> beam.Map(create_key_value_pair)
 | 'groupby' >> beam.GroupByKey()
 | 'sum' >> beam.Map(sum_element)
 | 'write' >> beam.io.WriteToText('gs://{}/dataflow_groupby_2.txt'.format(PROJECTID),
                                  num_shards=1)
 )

p.run()
('LOVERSCOMPLAINT', 2586)
('CORIOLANUS', 29535)
('CYMBELINE', 29231)
('JULIUSCAESAR', 21052)
('KINGHENRYVIII', 26265)
('TROILUSANDCRESSIDA', 27837)
('LOVESLABOURSLOST', 23189)
('ALLSWELLTHATENDSWELL', 24622)
('KINGJOHN', 21983)
('TWELFTHNIGHT', 21633)
・・・・
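A GroupByKey followed immediately by a Map that sums the grouped values can usually be collapsed into beam.CombinePerKey(sum), which lets the runner pre-combine values before the shuffle. A minimal sketch, assuming the same query and create_key_value_pair helper as above and a made-up output path:

(p
 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                 use_standard_sql=True,
                                                 query=query))
 | 'create_key_value_pair' >> beam.Map(create_key_value_pair)
 # Replaces 'groupby' >> beam.GroupByKey() and 'sum' >> beam.Map(sum_element)
 | 'sum_per_key' >> beam.CombinePerKey(sum)
 | 'write' >> beam.io.WriteToText('gs://{}/dataflow_groupby_combine.txt'.format(PROJECTID),
                                  num_shards=1)
 )

p.run()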
Filter
p = beam.Pipeline(options=options)

query = "SELECT * " \
        "FROM `bigquery-public-data.samples.shakespeare`"

def create_key_value_pair(element):
    key = element['corpus'].upper()
    value = element['word_count']
    return (key, value)

def sum_element(element):
    return (element[0], sum(element[1]))

# Keep only the pair whose key is 'LOVERSCOMPLAINT'
def is_perennial(plant):
    return plant[0] == 'LOVERSCOMPLAINT'

(p
 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                 use_standard_sql=True,
                                                 query=query))
 | 'create_key_value_pair' >> beam.Map(create_key_value_pair)
 | 'groupby' >> beam.GroupByKey()
 | 'sum' >> beam.Map(sum_element)
 | 'filter' >> beam.Filter(is_perennial)
 | 'write' >> beam.io.WriteToText('gs://{}/dataflow_groupby_filter.txt'.format(PROJECTID),
                                  num_shards=1)
 )

p.run()
('LOVERSCOMPLAINT', 2586)
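The predicate can also be written inline as a lambda, and pushing the filter before the GroupByKey means only the matching corpus gets shuffled. A hedged sketch; the step labels and output path are made up for illustration, and the result for 'LOVERSCOMPLAINT' should come out the same.

(p
 | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                 use_standard_sql=True,
                                                 query=query))
 # Filter as early as possible so less data reaches the shuffle
 | 'filter_corpus' >> beam.Filter(
     lambda element: element['corpus'].upper() == 'LOVERSCOMPLAINT')
 | 'create_key_value_pair' >> beam.Map(create_key_value_pair)
 | 'groupby' >> beam.GroupByKey()
 | 'sum' >> beam.Map(sum_element)
 | 'write' >> beam.io.WriteToText('gs://{}/dataflow_filter_early.txt'.format(PROJECTID),
                                  num_shards=1)
 )

p.run()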