case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Cloud FunctionsでDataflowテンプレートをキックさせる方法:Python

Cloud FunctionsでGCSのバケットに置かれたファイルを検知し、Dataflowのテンプレートをキックします。パラメータとしてCloud Functionsでファイル名を取得し、Dataflowのテンプレートに引数として渡します。GCSから加工しGBQに取り込むケースなどに使います。

code

github.com


# functions_dataflow_job_start.py

def _dataflow_job_start(data, context):
    # read from gcs
    from googleapiclient.discovery import build
    PROJECTID = data['bucket']
    file_name = data['name']

    job = 'job from cloud functions_3'
    #template = "gs://{}/template/custom_template_1008".format(PROJECTID)
    template = "gs://{}/template/GCS_TO_GCS_2".format(PROJECTID)
    parameters = {
         #'input': "gs://{}/{}".format(PROJECTID,file_name),
         #'output': "gs://{}/output/{}".format(PROJECTID,file_name),
         'inputFile': "gs://{}/{}".format(PROJECTID,file_name),
         'outputFile': "gs://{}/output/{}".format(PROJECTID,file_name),
     }

    
    service  = build("dataflow","v1b3",cache_discovery=False)
    #templates = service.projects().templates()
        
    request = service.projects().templates().launch(
        projectId=PROJECTID,
        gcsPath=template,
        body={
            'jobName': job,
            'parameters': parameters,
        }
    )
    return request.execute()

f:id:casekblog:20191008185306p:plain

テンプレートをキックできたことを確認します。
f:id:casekblog:20191008190522p:plain

バケットにファイルが作られたことも確認できました。
f:id:casekblog:20191015193229p:plain


参考
cloud.google.com
github.com