Dataflowテンプレートでカスタムパラメータを追加します。パラメータを静的に定義する方法とテンプレート実行時に動的にパラメータを指定する方法を紹介します。gclodコマンドで実行しますが、Cloud Functonsからテンプレートをキックする方法は以下の記事をみてみてください。
www.case-k.jp
code
静的なカスタムオプションの追加方法
class MyOptions(PipelineOptions): PROJECTID = 'project id ' @classmethod def _add_argparse_args(cls, parser): parser.add_argument('--input', help='Input for the pipeline', default='gs://{}/sample2.csv'.format(PROJECTID)) parser.add_argument('--output', help='Output for the pipeline', default='gs://{}/output/sample2.csv'.format(PROJECTID))
実際にテンプレートを作成します。
# -*- coding: utf-8 -*- import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # プロジェクトID PROJECTID = 'project id' # オプション設定 class MyOptions(PipelineOptions): PROJECTID = 'project id ' @classmethod def _add_argparse_args(cls, parser): parser.add_argument('--input', help='Input for the pipeline', default='gs://{}/sample2.csv'.format(PROJECTID)) parser.add_argument('--output', help='Output for the pipeline', default='gs://{}/output/sample2.csv'.format(PROJECTID)) # オプション設定 myoptions = MyOptions() options = beam.options.pipeline_options.PipelineOptions(options=myoptions) # GCP関連オプション gcloud_options = options.view_as( beam.options.pipeline_options.GoogleCloudOptions) gcloud_options.project = PROJECTID gcloud_options.job_name = 'p1008' gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID) gcloud_options.temp_location = 'gs://{}/tem'.format(PROJECTID) # テンプレート配置 gcloud_options.template_location = 'gs://{}/template/custom_template_1008'.format(PROJECTID) # 標準オプション(実行環境を設定) std_options = options.view_as( beam.options.pipeline_options.StandardOptions) std_options.runner = 'DataflowRunner' p = beam.Pipeline(options=options) p | 'read' >> beam.io.ReadFromText(myoptions.input) | 'write' >> beam.io.WriteToText(myoptions.output) p.run()
GCSでテンプレートが作成されたことを確認します。
実行コマンド
$ gcloud dataflow jobs run job1 \ --gcs-location gs://[project id]/template/custom_template_1008 \ --parameters input=gs://[project id ]/sample1015.csv,output=gs://[project id ]/output/sample1015.csv WARNING: `--region` not set; defaulting to 'us-central1'. In an upcoming release, users must specify a region explicitly. See https://cloud.google.com/dataflow/docs/concepts/regional -endpoints for additional details. createTime: '2019-10-15T09:51:52.729133Z' currentStateTime: '1970-01-01T00:00:00Z' id: 2019-10-15_02_51_51-7411337449312765451 location: us-central1 name: job2 projectId: project id startTime: '2019-10-15T09:51:52.729133Z' type: JOB_TYPE_BATCH
この状態で実行すると実行時に使用したパラメータが考慮されません。
動的なカスタムオプションの追加方法
先ほどsample1015.csvを指定しましたがdefault値が反映されました。実行時にパラメータを指定する場合は「add_value_provider_argument」を使います。
# オプション設定 class MyOptions(PipelineOptions): PROJECTID = 'project id' @classmethod def _add_argparse_args(cls, parser): #parser.add_argument('--input', # help='Input for the pipeline', # default='gs://{}/sample2.csv'.format(PROJECTID)) #parser.add_argument('--output', # help='Output for the pipeline', # default='gs://{}/output/sample2.csv'.format(PROJECTID)) # 実行時に指定するパラメータ parser.add_value_provider_argument('--inputFile', help='InputFile for the pipeline', default='gs://{}/output/sample2.csv'.format(PROJECTID)) parser.add_value_provider_argument('--outputFile', help='OutputFile for the pipeline', default='gs://{}/output/sample2.csv'.format(PROJECTID))
実行時にパラメータが反映されるようにテンプレートを作ります。
# -*- coding: utf-8 -*- import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # プロジェクトID PROJECTID = '[PROJECTID]' # オプション設定 class MyOptions(PipelineOptions): PROJECTID = 'project id' @classmethod def _add_argparse_args(cls, parser): #parser.add_argument('--input', # help='Input for the pipeline', # default='gs://{}/sample2.csv'.format(PROJECTID)) #parser.add_argument('--output', # help='Output for the pipeline', # default='gs://{}/output/sample2.csv'.format(PROJECTID)) # 実行時に指定するパラメータ parser.add_value_provider_argument('--inputFile', help='InputFile for the pipeline', default='gs://{}/output/sample2.csv'.format(PROJECTID)) parser.add_value_provider_argument('--outputFile', help='OutputFile for the pipeline', default='gs://{}/output/sample2.csv'.format(PROJECTID)) # オプション設定 myoptions = MyOptions() options = beam.options.pipeline_options.PipelineOptions(options=myoptions) # GCP関連オプション gcloud_options = options.view_as( beam.options.pipeline_options.GoogleCloudOptions) gcloud_options.project = PROJECTID gcloud_options.job_name = 'job10152' gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID) gcloud_options.temp_location = 'gs://{}/tem'.format(PROJECTID) # テンプレート配置 #gcloud_options.template_location = 'gs://{}/template/{}/-twproc_tmp'.format(PROJECTID,PROJECTID) #gcloud_options.template_location = 'gs888/{}/template/-twproc_tmp2'.format(PROJECTID) gcloud_options.template_location = 'gs://{}/template/GCS_TO_GCS_2'.format(PROJECTID) # 標準オプション(実行環境を設定) std_options = options.view_as( beam.options.pipeline_options.StandardOptions) std_options.runner = 'DataflowRunner' p = beam.Pipeline(options=options) p | 'read' >> beam.io.ReadFromText(myoptions.inputFile) | 'write' >> beam.io.WriteToText(myoptions.outputFile) p.run()
実行してみます。
gcloud dataflow jobs run job3 \ --gcs-location gs://[project id]//template/GCS_TO_GCS_2 \ --parameters inputFile=gs://[project id]/sample1015.csv,outputFile=gs://[project id]//output/sample1015.csv
バケットを確認すると実行時に指定したファイルが作られていることが確認できます。
以上となります。
参考
cloud.google.com
stackoverflow.com
stackoverflow.com
code-library.site