case-kの備忘録

日々の備忘録です。最近はGCPやデータ分析系のことを呟きます

Dataflowパイプライン処理の備忘録:Python

Dataflowで使うパイプライン処理の備忘録です。随時更新できればと思います。

beam.apache.org

options

# -*- coding: utf-8 -*-
import apache_beam as beam

# プロジェクトID
PROJECTID = 'project id'

# オプション設定
options = beam.options.pipeline_options.PipelineOptions()

# GCP関連オプション
gcloud_options = options.view_as(
  beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.project = PROJECTID
gcloud_options.job_name = 'job1013'
gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID)
gcloud_options.temp_location = 'gs://{}/tem'.format(PROJECTID)

# 標準オプション(実行環境を設定)
std_options = options.view_as(
  beam.options.pipeline_options.StandardOptions)
std_options.runner = 'DataflowRunner'

udf

p = beam.Pipeline(options=options)

query = "SELECT word " \
"FROM `bigquery-public-data.samples.shakespeare`" \
"LIMIT 10"

def upper_element(element):
    element['word'] = element['word'].upper()
    return element
    

(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                   use_standard_sql=True,
                                                   query=query))
   | 'upper' >> beam.Map(upper_element)
   | 'write' >> beam.io.WriteToText('gs://{}/dataflow_upper_word.txt'.format(PROJECTID),
                                    num_shards=1)
)
p.run()

f:id:casekblog:20191013222202p:plain:w300
出力結果

{'word': 'LVII'}
{'word': 'QUALITY'}
{'word': "DIMM'D"}
{'word': 'HEED'}
{'word': 'UNTHRIFTY'}
{'word': 'PLAGUES'}
{'word': 'WHEREVER'}
{'word': 'SURMISE'}
{'word': 'TREASON'}
{'word': 'AUGURS'}

大文字に変換されてます。

branch

p = beam.Pipeline(options=options)

query = "SELECT word " \
"FROM `bigquery-public-data.samples.shakespeare`" \
"LIMIT 10"

def upper_element(element):
    element['upper_word'] = element['word'].upper()
    return element

def lower_element(element):
    element['lower_word'] = element['word'].lower()
    return element
    

query_results = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                   use_standard_sql=True,
                                                   query=query))
branch1 = query_results  | 'upper' >> beam.Map(upper_element)
branch2 = query_results  | 'lower' >> beam.Map(lower_element)
(branch1, branch2) | beam.Flatten() |  beam.io.WriteToText(
    'gs://{}/dataflow_upper_lower_word.txt'.format(PROJECTID),
    num_shards=1)
p.run()

f:id:casekblog:20191013224001p:plain:w400

{'word': 'plagues', 'upper_word': 'PLAGUES'}
{'word': 'plagues', 'upper_word': 'PLAGUES', 'lower_word': 'plagues'}
{'word': 'augurs', 'upper_word': 'AUGURS'}
{'word': 'augurs', 'upper_word': 'AUGURS', 'lower_word': 'augurs'}
{'word': "dimm'd", 'upper_word': "DIMM'D"}
{'word': "dimm'd", 'upper_word': "DIMM'D", 'lower_word': "dimm'd"}
{'word': 'heed', 'upper_word': 'HEED'}
{'word': 'heed', 'upper_word': 'HEED', 'lower_word': 'heed'}
{'word': 'LVII', 'upper_word': 'LVII'}
{'word': 'LVII', 'upper_word': 'LVII', 'lower_word': 'lvii'}
{'word': 'surmise', 'upper_word': 'SURMISE'}
{'word': 'surmise', 'upper_word': 'SURMISE', 'lower_word': 'surmise'}
{'word': 'wherever', 'upper_word': 'WHEREVER'}
{'word': 'wherever', 'upper_word': 'WHEREVER', 'lower_word': 'wherever'}
{'word': 'Unthrifty', 'upper_word': 'UNTHRIFTY'}
{'word': 'Unthrifty', 'upper_word': 'UNTHRIFTY', 'lower_word': 'unthrifty'}
{'word': 'quality', 'upper_word': 'QUALITY'}
{'word': 'quality', 'upper_word': 'QUALITY', 'lower_word': 'quality'}
{'word': 'treason', 'upper_word': 'TREASON'}
{'word': 'treason', 'upper_word': 'TREASON', 'lower_word': 'treason'}

group by

p = beam.Pipeline(options=options)

query = "SELECT * " \
"FROM `bigquery-public-data.samples.shakespeare`" 

def create_key_value_pair(element):
    key = element['corpus'].upper()
    value = element['word_count']
    return (key, value)

def sum_element(element):
    return (element[0], sum(element[1]))
    

(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                   use_standard_sql=True,
                                                   query=query))
   | 'create_key_value_pair' >> beam.Map(create_key_value_pair)
   | 'groupby' >> beam.GroupByKey()
   | 'sum' >> beam.Map(sum_element)
   | 'write' >> beam.io.WriteToText('gs://{}/dataflow_groupby_2.txt'.format(PROJECTID),
                                    num_shards=1)
)
p.run()

f:id:casekblog:20191013233204p:plain:w300

('LOVERSCOMPLAINT', 2586)
('CORIOLANUS', 29535)
('CYMBELINE', 29231)
('JULIUSCAESAR', 21052)
('KINGHENRYVIII', 26265)
('TROILUSANDCRESSIDA', 27837)
('LOVESLABOURSLOST', 23189)
('ALLSWELLTHATENDSWELL', 24622)
('KINGJOHN', 21983)
('TWELFTHNIGHT', 21633)
・・・・

Filter

p = beam.Pipeline(options=options)

query = "SELECT * " \
"FROM `bigquery-public-data.samples.shakespeare`" 

def create_key_value_pair(element):
    key = element['corpus'].upper()
    value = element['word_count']
    return (key, value)

def sum_element(element):
    return (element[0], sum(element[1]))


def is_perennial(plant):
    return plant[0] == 'LOVERSCOMPLAINT'

(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                   use_standard_sql=True,
                                                   query=query))
   | 'create_key_value_pair' >> beam.Map(create_key_value_pair)
   | 'groupby' >> beam.GroupByKey()
   | 'sum' >> beam.Map(sum_element)
   | 'filter' >> beam.Filter(is_perennial)
   | 'write' >> beam.io.WriteToText('gs://{}/dataflow_groupby_filter.txt'.format(PROJECTID),
                                    num_shards=1)
)
p.run()

f:id:casekblog:20191014150258p:plain:w300

('LOVERSCOMPLAINT', 2586)