GAEなどでDataflowのテンプレートの実行方法です。パラメータは以下のようにして渡します
code
Dataflowのテンプレートの実行
"parameters": { "input": "gs://{}/sample2.csv".format(PROJECTID), "output": "gs://{}/output/sample2_2.csv".format(PROJECTID), },
!pip install google-api-python-client
# dataflow_job_start.py
def _dataflow_job_start(): # 必要なライブラリ from oauth2client.client import GoogleCredentials from oauth2client.service_account import ServiceAccountCredentials #from apiclient.discovery import build from googleapiclient.discovery import build # クライアントライブラリを初期化 credentials = GoogleCredentials.get_application_default() service = build("dataflow","v1b3",credentials=credentials) templates = service.projects().templates() # プロジェクトID PROJECTID = 'project id ' # JOBの設定項目 BODY = { "jobName": "job", "gcsPath": "gs://{}/template/custom_template_1008".format(PROJECTID), "environment":{ "tempLocation": "gs://{}/temp".format(PROJECTID) }, "parameters": { "input": "gs://{}/sample2.csv".format(PROJECTID), "output": "gs://{}/output/sample2_2.csv".format(PROJECTID), }, } # JOBの実行 dfrequest = service.projects().templates().create(projectId=PROJECTID, body=BODY) return dfrequest.execute()
Out {'id': '2019-10-07_20_09_12-5166112950796065349', 'projectId': 'project id ', 'name': 'job name', 'type': 'JOB_TYPE_BATCH', 'currentStateTime': '1970-01-01T00:00:00Z', 'createTime': '2019-10-08T03:09:13.572502Z', 'location': 'us-central1', 'startTime': '2019-10-08T03:09:13.572502Z'}
実行結果を確認します。
Cloud Functionsでテンプレートを実行する場合はこちらを確認ください。
www.case-k.jp