case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Cloud Composerを使いDataflowのパイプライン制御をする方法

Cloud ComposerでDataflowテンプレートを順次キックしていく逐次処理を行います。順番に処理が行われたことを確認するために前のDataflowのテンプレート実行し作られたファイルを参照する処理にしました。

code

github.com

Composer環境を作る

GCPのCloud ShellよりComposer環境を作ります。

$ gcloud composer environments create composer1  --location asia-northeast1 

Dataflowのテンプレートを作る

テンプレートはこちらとほぼ同じものを使います。
www.case-k.jp
テンプレートを作る際に出力ファイル数を1つにし、拡張子もCSVにしたいので、以下の変更を加えテンプレートを作ります。デフォルトだと出力ファイルは複数となり、テンプレートとしてファイル末尾は00000-of-00001となります。
beam.apache.org

 beam.io.WriteToText(myoptions.outputFile,num_shards=1,shard_name_template=''))

今回使うDataflowのテンプレートです。GCSからGCSに書き込みを行います。
# dataflow_gcs_to_gcs.py

# -*- coding: utf-8 -*-
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# プロジェクトID
PROJECTID = '[project id]'

# オプション設定
class MyOptions(PipelineOptions):
    @classmethod  
    def _add_argparse_args(cls, parser):
        #parser.add_argument('--input',
        #                    help='Input for the pipeline',
        #                    default='gs://{}/sample2.csv'.format(PROJECTID))
        # parser.add_argument('--output',
        #                    help='Output for the pipeline',
        #                    default='gs://{}/output/sample2.csv'.format(PROJECTID))
        
        # 実行時に指定するパラメータ
        parser.add_value_provider_argument('--inputFile',
                            help='InputFile for the pipeline',
                            default='gs://{}/output/sample1016.csv'.format(PROJECTID))
        parser.add_value_provider_argument('--outputFile',
                            help='OutputFile for the pipeline',
                            default='gs://{}/output/sample1006.csv'.format(PROJECTID))
            
# オプション設定
myoptions = MyOptions()
options = beam.options.pipeline_options.PipelineOptions(options=myoptions)

# GCP関連オプション
gcloud_options = options.view_as(
  beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.project = PROJECTID
gcloud_options.job_name = 'job1016'
gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID)
gcloud_options.temp_location = 'gs://{}/tem'.format(PROJECTID)

# テンプレート配置
#gcloud_options.template_location = 'gs://{}/template/{}/-twproc_tmp'.format(PROJECTID,PROJECTID)
#gcloud_options.template_location = 'gs888/{}/template/-twproc_tmp2'.format(PROJECTID)
gcloud_options.template_location = 'gs://{}/template/GCS_TO_GCS_5'.format(PROJECTID)

# 標準オプション(実行環境を設定)
std_options = options.view_as(
  beam.options.pipeline_options.StandardOptions)
std_options.runner = 'DataflowRunner'

p = beam.Pipeline(options=options)
( p | 'read' >> beam.io.ReadFromText(myoptions.inputFile) 
   | 'write' >> beam.io.WriteToText(myoptions.outputFile,num_shards=1,shard_name_template=''))
p.run()

DAGファイルを作る

テンプレートの配置が完了したら、ワークフローの実行順序を記載したDAGファイルを作り、Composerが参照するGCS上に配置します。テンプレート順次実行していき、参照するファイルは前のテンプレートで作られたファイルとします。
# dag_dataflow_gcs_to_gcs.py

from __future__ import print_function

import datetime
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow import models
#from airflow.operators import bash_operator
#from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
    'retries': 1,
    'dataflow_default_options': {
        'project': '[project id]',
        #'region': 'europe-west1',
        #'zone': 'europe-west1-d',
        #'tempLocation': 'gs://[project id]/template/custom_template_1008',
    }
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.

template_path = 'gs://[project id]/template/GCS_TO_GCS_5'
with models.DAG(
        'composer_dataflowtemplate1016_3',
        schedule_interval='@once',
        default_args=default_dag_args,
        concurrency=1,
        max_active_runs=1) as dag:

    execute_dataflow_1 = DataflowTemplateOperator(
        task_id='datapflow_example1',
        template=template_path,
        parameters={
            'inputFile': "gs://[project id]/sample.csv",
            'outputFile': "gs://[project id]/composer_output/sample_1.csv",
        },
        dag=dag)
    execute_dataflow_2 = DataflowTemplateOperator(
        task_id='datapflow_example2',
        template=template_path,
        parameters={
            'inputFile': "gs://[project id]/composer_output/sample_1.csv",
            'outputFile': "gs://[project id]/composer_output/sample_2.csv",
        },
        dag=dag)
    execute_dataflow_3 = DataflowTemplateOperator(
        task_id='datapflow_example3',
        template=template_path,
        parameters={
            'inputFile': "gs://[project id]/composer_output/sample_2.csv",
            'outputFile': "gs://[project id]/composer_output/sample_3.csv",
        },
        dag=dag)
    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    execute_dataflow_1 >> execute_dataflow_2 >> execute_dataflow_3

DAGファイルをComposer環境に配置し逐次処理を行う

DAGファイルをComposerに配置しDataflowテンプレートを使った逐次処理を行います。

# 参照ファイルを配置
gsutil cp ./gcp/composer/sample.csv gs://[project id ]/sample.csv

# DAGファイルを配置
gcloud composer environments storage dags import \
  --environment composer1  --location asia-northeast1  \
  --source gs://[project id ]/dag_dataflow_gcs_to_gcs.py

DAGファイルを配置すると逐次処理が実行されます。Composerの「composer_dataflowtemplate1016_3schedule」のGraph ViewよりDataflowテンプレートが指定した順番で実行されていることを確認することができます。
f:id:casekblog:20191018183222p:plain
バケットにファイルが作られたのが確認できます。指定したフォーマットであるCSVで、ファイルサイズも問題ないことが確認できます。
f:id:casekblog:20191016225422p:plain

確認できたのでComposer環境を削除します。

$ gcloud composer environments delete composer1  --location asia-northeast1 
Deleting the following environments:
 - [composer1] in [asia-northeast1]
Do you want to continue (Y/n)?  y


以上となります。

参考
airflow.apache.org
cloud.google.com
medium.com