case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Dataflowカスタムパラメータの追加方法:Python

Dataflowテンプレートでカスタムパラメータを追加します。パラメータを静的に定義する方法とテンプレート実行時に動的にパラメータを指定する方法を紹介します。gclodコマンドで実行しますが、Cloud Functonsからテンプレートをキックする方法は以下の記事をみてみてください。
www.case-k.jp

code

github.com

静的なカスタムオプションの追加方法

class MyOptions(PipelineOptions):
    PROJECTID = 'project id '
    @classmethod  
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input',
                            help='Input for the pipeline',
                            default='gs://{}/sample2.csv'.format(PROJECTID))
        parser.add_argument('--output',
                            help='Output for the pipeline',
                            default='gs://{}/output/sample2.csv'.format(PROJECTID))

実際にテンプレートを作成します。

# -*- coding: utf-8 -*-
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# プロジェクトID
PROJECTID = 'project id'

# オプション設定
class MyOptions(PipelineOptions):
    PROJECTID = 'project id '
    @classmethod  
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input',
                            help='Input for the pipeline',
                            default='gs://{}/sample2.csv'.format(PROJECTID))
        parser.add_argument('--output',
                            help='Output for the pipeline',
                            default='gs://{}/output/sample2.csv'.format(PROJECTID))

# オプション設定
myoptions = MyOptions()
options = beam.options.pipeline_options.PipelineOptions(options=myoptions)

# GCP関連オプション
gcloud_options = options.view_as(
  beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.project = PROJECTID
gcloud_options.job_name = 'p1008'
gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID)
gcloud_options.temp_location = 'gs://{}/tem'.format(PROJECTID)

# テンプレート配置
gcloud_options.template_location = 'gs://{}/template/custom_template_1008'.format(PROJECTID)

# 標準オプション(実行環境を設定)
std_options = options.view_as(
  beam.options.pipeline_options.StandardOptions)
std_options.runner = 'DataflowRunner'

p = beam.Pipeline(options=options)
p | 'read' >> beam.io.ReadFromText(myoptions.input)
 | 'write' >> beam.io.WriteToText(myoptions.output)

p.run()


GCSでテンプレートが作成されたことを確認します。
f:id:casekblog:20191008192709p:plain

実行コマンド

$ gcloud dataflow jobs run job1 \
        --gcs-location gs://[project id]/template/custom_template_1008 \
        --parameters input=gs://[project id ]/sample1015.csv,output=gs://[project id ]/output/sample1015.csv
WARNING: `--region` not set; defaulting to 'us-central1'. In an upcoming release, users must specify a region explicitly. See https://cloud.google.com/dataflow/docs/concepts/regional
-endpoints for additional details.
createTime: '2019-10-15T09:51:52.729133Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: 2019-10-15_02_51_51-7411337449312765451
location: us-central1
name: job2
projectId: project id
startTime: '2019-10-15T09:51:52.729133Z'
type: JOB_TYPE_BATCH

f:id:casekblog:20191015190150p:plain
この状態で実行すると実行時に使用したパラメータが考慮されません。

動的なカスタムオプションの追加方法

先ほどsample1015.csvを指定しましたがdefault値が反映されました。実行時にパラメータを指定する場合は「add_value_provider_argument」を使います。

# オプション設定
class MyOptions(PipelineOptions):
    PROJECTID = 'project id'
    @classmethod  
    def _add_argparse_args(cls, parser):
        #parser.add_argument('--input',
        #                    help='Input for the pipeline',
        #                    default='gs://{}/sample2.csv'.format(PROJECTID))
        #parser.add_argument('--output',
        #                   help='Output for the pipeline',
        #                    default='gs://{}/output/sample2.csv'.format(PROJECTID))
        
        # 実行時に指定するパラメータ
        parser.add_value_provider_argument('--inputFile',
                            help='InputFile for the pipeline',
                            default='gs://{}/output/sample2.csv'.format(PROJECTID))
        parser.add_value_provider_argument('--outputFile',
                            help='OutputFile for the pipeline',
                            default='gs://{}/output/sample2.csv'.format(PROJECTID))
            

実行時にパラメータが反映されるようにテンプレートを作ります。

# -*- coding: utf-8 -*-
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# プロジェクトID
PROJECTID = '[PROJECTID]'

# オプション設定
class MyOptions(PipelineOptions):
    PROJECTID = 'project id'
    @classmethod  
    def _add_argparse_args(cls, parser):
        #parser.add_argument('--input',
        #                    help='Input for the pipeline',
        #                    default='gs://{}/sample2.csv'.format(PROJECTID))
        #parser.add_argument('--output',
        #                    help='Output for the pipeline',
        #                    default='gs://{}/output/sample2.csv'.format(PROJECTID))
        
        # 実行時に指定するパラメータ
        parser.add_value_provider_argument('--inputFile',
                            help='InputFile for the pipeline',
                            default='gs://{}/output/sample2.csv'.format(PROJECTID))
        parser.add_value_provider_argument('--outputFile',
                            help='OutputFile for the pipeline',
                            default='gs://{}/output/sample2.csv'.format(PROJECTID))
            
# オプション設定
myoptions = MyOptions()
options = beam.options.pipeline_options.PipelineOptions(options=myoptions)

# GCP関連オプション
gcloud_options = options.view_as(
  beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.project = PROJECTID
gcloud_options.job_name = 'job10152'
gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID)
gcloud_options.temp_location = 'gs://{}/tem'.format(PROJECTID)

# テンプレート配置
#gcloud_options.template_location = 'gs://{}/template/{}/-twproc_tmp'.format(PROJECTID,PROJECTID)
#gcloud_options.template_location = 'gs888/{}/template/-twproc_tmp2'.format(PROJECTID)
gcloud_options.template_location = 'gs://{}/template/GCS_TO_GCS_2'.format(PROJECTID)

# 標準オプション(実行環境を設定)
std_options = options.view_as(
  beam.options.pipeline_options.StandardOptions)
std_options.runner = 'DataflowRunner'

p = beam.Pipeline(options=options)
p | 'read' >> beam.io.ReadFromText(myoptions.inputFile) | 'write' >> beam.io.WriteToText(myoptions.outputFile)
p.run()

実行してみます。

 gcloud dataflow jobs run job3 \
        --gcs-location gs://[project id]//template/GCS_TO_GCS_2 \
        --parameters inputFile=gs://[project id]/sample1015.csv,outputFile=gs://[project id]//output/sample1015.csv

バケットを確認すると実行時に指定したファイルが作られていることが確認できます。

f:id:casekblog:20191015193229p:plain

以上となります。

参考
cloud.google.com
stackoverflow.com
stackoverflow.com
code-library.site