case-kの備忘録

日々の備忘録です。最近はGCPやデータ分析系のことを呟きます

Dataflowテンプレート作成方法

Pythonベースで記述したDataflowのコードをテンプレート化し、実行してみます。テンプレートを作成するためには以下のコードを追記するだけです。コードを実行するとGCS内にテンプレートが作られるので、作成したテンプレートを実行してみます。

gcloud_options.template_location = 'gs://{}/template/custom_template'.format(PROJECTID)

実際に実行するコードとなります。

# -*- coding: utf-8 -*-
import apache_beam as beam

# プロジェクトID
PROJECTID = 'PROJECTID '

# オプション設定
options = beam.options.pipeline_options.PipelineOptions()

# GCP関連オプション
gcloud_options = options.view_as(
  beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.project = PROJECTID
gcloud_options.job_name = 'pipeline1'
gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID)
gcloud_options.temp_location = 'gs://{}/temp'.format(PROJECTID)

# テンプレート設定
gcloud_options.template_location = 'gs://{}/template/custom_template'.format(PROJECTID)

# 標準オプション(実行環境を設定)
std_options = options.view_as(
  beam.options.pipeline_options.StandardOptions)
std_options.runner = 'DataflowRunner'

p = beam.Pipeline(options=options)

query = "SELECT * " \
"FROM `bigquery-public-data.samples.shakespeare`" \
"LIMIT 10"

(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                   use_standard_sql=True,
                                                   query=query))
   | 'write' >> beam.io.WriteToText('gs://{}/p1.txt'.format(PROJECTID),
                                    num_shards=1)
)

p.run()

実行するとGCSにテンプレートが作られます。
f:id:casekblog:20191007194955p:plain:w300
GCPコンソールから作成したDataflowを選び、テンプレートを実行します。
カスタムテンプレートを選び、テンプレートをおいたGCSのパスを指定し実行します。
f:id:casekblog:20191007195320p:plain:w500

無事テンプレートを実行できました。
f:id:casekblog:20191007195433p:plain:w500

実行環境はこちらの記事をみてください。
www.case-k.jp

次回はCloud FunctionsでDataflowをキックする記事を書きたいと思います。