case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Dataflowテンプレート実行方法:Python

GAEなどでDataflowのテンプレートの実行方法です。パラメータは以下のようにして渡します

code

github.com

Dataflowのテンプレートの実行

"parameters": {
   "input": "gs://{}/sample2.csv".format(PROJECTID),
   "output": "gs://{}/output/sample2_2.csv".format(PROJECTID),
   },
!pip install google-api-python-client

# dataflow_job_start.py

def _dataflow_job_start():
    # 必要なライブラリ
    from oauth2client.client import GoogleCredentials
    from oauth2client.service_account import ServiceAccountCredentials
    #from apiclient.discovery import build
    from googleapiclient.discovery import build
    
    # クライアントライブラリを初期化
    credentials = GoogleCredentials.get_application_default()
    service  = build("dataflow","v1b3",credentials=credentials)
    templates = service.projects().templates()
    
    # プロジェクトID
    PROJECTID = 'project id '
    
    # JOBの設定項目
    BODY = {
        "jobName": "job",
        "gcsPath": "gs://{}/template/custom_template_1008".format(PROJECTID),
        "environment":{
            "tempLocation": "gs://{}/temp".format(PROJECTID)
        },
        "parameters": {
            "input": "gs://{}/sample2.csv".format(PROJECTID),
            "output": "gs://{}/output/sample2_2.csv".format(PROJECTID),
        },
    }
    
    # JOBの実行
    dfrequest = service.projects().templates().create(projectId=PROJECTID, body=BODY)
    return dfrequest.execute()
Out 

{'id': '2019-10-07_20_09_12-5166112950796065349',
 'projectId': 'project id ',
 'name': 'job name',
 'type': 'JOB_TYPE_BATCH',
 'currentStateTime': '1970-01-01T00:00:00Z',
 'createTime': '2019-10-08T03:09:13.572502Z',
 'location': 'us-central1',
 'startTime': '2019-10-08T03:09:13.572502Z'}

実行結果を確認します。
f:id:casekblog:20191008121821p:plain

Cloud Functionsでテンプレートを実行する場合はこちらを確認ください。
www.case-k.jp


参考
qiita.com
stackoverflow.com