Cloud ComposerでDataflowテンプレートを順次キックしていく逐次処理を行います。順番に処理が行われたことを確認するために前のDataflowのテンプレート実行し作られたファイルを参照する処理にしました。
code
Composer環境を作る
GCPのCloud ShellよりComposer環境を作ります。
$ gcloud composer environments create composer1 --location asia-northeast1
Dataflowのテンプレートを作る
テンプレートはこちらとほぼ同じものを使います。
www.case-k.jp
テンプレートを作る際に出力ファイル数を1つにし、拡張子もCSVにしたいので、以下の変更を加えテンプレートを作ります。デフォルトだと出力ファイルは複数となり、テンプレートとしてファイル末尾は00000-of-00001となります。
beam.apache.org
beam.io.WriteToText(myoptions.outputFile,num_shards=1,shard_name_template=''))
今回使うDataflowのテンプレートです。GCSからGCSに書き込みを行います。
# dataflow_gcs_to_gcs.py
# -*- coding: utf-8 -*- import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # プロジェクトID PROJECTID = '[project id]' # オプション設定 class MyOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): #parser.add_argument('--input', # help='Input for the pipeline', # default='gs://{}/sample2.csv'.format(PROJECTID)) # parser.add_argument('--output', # help='Output for the pipeline', # default='gs://{}/output/sample2.csv'.format(PROJECTID)) # 実行時に指定するパラメータ parser.add_value_provider_argument('--inputFile', help='InputFile for the pipeline', default='gs://{}/output/sample1016.csv'.format(PROJECTID)) parser.add_value_provider_argument('--outputFile', help='OutputFile for the pipeline', default='gs://{}/output/sample1006.csv'.format(PROJECTID)) # オプション設定 myoptions = MyOptions() options = beam.options.pipeline_options.PipelineOptions(options=myoptions) # GCP関連オプション gcloud_options = options.view_as( beam.options.pipeline_options.GoogleCloudOptions) gcloud_options.project = PROJECTID gcloud_options.job_name = 'job1016' gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID) gcloud_options.temp_location = 'gs://{}/tem'.format(PROJECTID) # テンプレート配置 #gcloud_options.template_location = 'gs://{}/template/{}/-twproc_tmp'.format(PROJECTID,PROJECTID) #gcloud_options.template_location = 'gs888/{}/template/-twproc_tmp2'.format(PROJECTID) gcloud_options.template_location = 'gs://{}/template/GCS_TO_GCS_5'.format(PROJECTID) # 標準オプション(実行環境を設定) std_options = options.view_as( beam.options.pipeline_options.StandardOptions) std_options.runner = 'DataflowRunner' p = beam.Pipeline(options=options) ( p | 'read' >> beam.io.ReadFromText(myoptions.inputFile) | 'write' >> beam.io.WriteToText(myoptions.outputFile,num_shards=1,shard_name_template='')) p.run()
DAGファイルを作る
テンプレートの配置が完了したら、ワークフローの実行順序を記載したDAGファイルを作り、Composerが参照するGCS上に配置します。テンプレート順次実行していき、参照するファイルは前のテンプレートで作られたファイルとします。
# dag_dataflow_gcs_to_gcs.py
from __future__ import print_function import datetime from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator from airflow import models #from airflow.operators import bash_operator #from airflow.operators import python_operator default_dag_args = { # The start_date describes when a DAG is valid / can be run. Set this to a # fixed point in time rather than dynamically, since it is evaluated every # time a DAG is parsed. See: # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date 'start_date': datetime.datetime(2018, 1, 1), 'retries': 1, 'dataflow_default_options': { 'project': '[project id]', #'region': 'europe-west1', #'zone': 'europe-west1-d', #'tempLocation': 'gs://[project id]/template/custom_template_1008', } } # Define a DAG (directed acyclic graph) of tasks. # Any task you create within the context manager is automatically added to the # DAG object. template_path = 'gs://[project id]/template/GCS_TO_GCS_5' with models.DAG( 'composer_dataflowtemplate1016_3', schedule_interval='@once', default_args=default_dag_args, concurrency=1, max_active_runs=1) as dag: execute_dataflow_1 = DataflowTemplateOperator( task_id='datapflow_example1', template=template_path, parameters={ 'inputFile': "gs://[project id]/sample.csv", 'outputFile': "gs://[project id]/composer_output/sample_1.csv", }, dag=dag) execute_dataflow_2 = DataflowTemplateOperator( task_id='datapflow_example2', template=template_path, parameters={ 'inputFile': "gs://[project id]/composer_output/sample_1.csv", 'outputFile': "gs://[project id]/composer_output/sample_2.csv", }, dag=dag) execute_dataflow_3 = DataflowTemplateOperator( task_id='datapflow_example3', template=template_path, parameters={ 'inputFile': "gs://[project id]/composer_output/sample_2.csv", 'outputFile': "gs://[project id]/composer_output/sample_3.csv", }, dag=dag) # Define the order in which the tasks complete by using the >> and << # operators. In this example, hello_python executes before goodbye_bash. execute_dataflow_1 >> execute_dataflow_2 >> execute_dataflow_3
DAGファイルをComposer環境に配置し逐次処理を行う
DAGファイルをComposerに配置しDataflowテンプレートを使った逐次処理を行います。
# 参照ファイルを配置 gsutil cp ./gcp/composer/sample.csv gs://[project id ]/sample.csv # DAGファイルを配置 gcloud composer environments storage dags import \ --environment composer1 --location asia-northeast1 \ --source gs://[project id ]/dag_dataflow_gcs_to_gcs.py
DAGファイルを配置すると逐次処理が実行されます。Composerの「composer_dataflowtemplate1016_3schedule」のGraph ViewよりDataflowテンプレートが指定した順番で実行されていることを確認することができます。
バケットにファイルが作られたのが確認できます。指定したフォーマットであるCSVで、ファイルサイズも問題ないことが確認できます。
確認できたのでComposer環境を削除します。
$ gcloud composer environments delete composer1 --location asia-northeast1 Deleting the following environments: - [composer1] in [asia-northeast1] Do you want to continue (Y/n)? y
以上となります。