業務でDigdagを使う機会かあったので、DigDagを使ってBigQueryのジョブを実行してみました。
事前準備
JDK 8u72をインストール
https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html
JDKの最新版をインストールしたところ最適化オプションが効いてるようでinitできなかったため。
github.com
DigDagをインストール
$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest" $ chmod +x ~/bin/digdag $ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc $ source ~/.bashrc
サービスアカウントキーを発行
BigQueryの秘密情報を渡します。
サービスアカウントキーに次の権限を与えて発行します。
- BigQuery ジョブユーザー
- BigQuery データ編集者
$ cat [service account.json] |digdag secrets --local --set gcp.credential=@[service account.json]
登録されていることを確認
$ digdag secrets --local 2020-05-07 17:40:55 +0900: Digdag v0.9.41 gcp.credential
DigDagを使ってみる
Digdagでジョブと依存関係を定義します。
ジョブ
今回逐次処理を行いたいます。Step1を実行して実行結果をテーブルに書き込みます。Step2ではStep1で作られたテーブルを参照するジョブを実行します。
step1.sql
SELECT sex, age, COUNT(*) as cnt FROM `bigquery-public-data.ml_datasets.census_adult_income` GROUP BY sex, age
step2.sql
SELECT sex, AVG(cnt) as mean_cnt FROM digdag_tst.step1_tb GROUP BY sex
依存関係を定義
_export: bq: dataset: digdag_tst +step1: bq>: queries/step1.sql estination_table: step1_tb +step2: bq>: queries/step2.sql destination_table: step2_tb
実行
ワークフローを実行してみます。
digdag run job.dig 2020-05-07 18:12:10 +0900: Digdag v0.9.41 2020-05-07 18:12:21 +0900 [WARN] (main): Reusing the last session time 2020-05-07T00:00:00+00:00. 2020-05-07 18:12:21 +0900 [INFO] (main): Using session /Users/username/Desktop/workspace/digdag/wf_sample/.digdag/status/20200507T000000+0000. 2020-05-07 18:12:21 +0900 [INFO] (main): Starting a new session project id=1 workflow name=job session_time=2020-05-07T00:00:00+00:00 2020-05-07 18:12:22 +0900 [INFO] (0018@[0:default]+job+step1): bq>: queries/step1.sql 2020-05-07 18:12:22 +0900 [INFO] (0018@[0:default]+job+step1): bq>: queries/step1.sql 2020-05-07 18:12:23 +0900 [INFO] (0018@[0:default]+job+step1): Submitting BigQuery job: [project id]:digdag_s0_p_default_w_job_t_2_a_1_08843218-25bd-44e4-9fca-7b174c347328 2020-05-07 18:12:29 +0900 [INFO] (0018@[0:default]+job+step1): Checking BigQuery job status: [project id]:digdag_s0_p_default_w_job_t_2_a_1_08843218-25bd-44e4-9fca-7b174c347328 2020-05-07 18:12:29 +0900 [INFO] (0018@[0:default]+job+step1): BigQuery job still running: digdag_s0_p_default_w_job_t_2_a_1_08843218-25bd-44e4-9fca-7b174c347328: checking again in 1s 2020-05-07 18:12:31 +0900 [INFO] (0018@[0:default]+job+step1): bq>: queries/step1.sql 2020-05-07 18:12:31 +0900 [INFO] (0018@[0:default]+job+step1): Checking BigQuery job status: [project id]:digdag_s0_p_default_w_job_t_2_a_1_08843218-25bd-44e4-9fca-7b174c347328 2020-05-07 18:12:32 +0900 [INFO] (0018@[0:default]+job+step1): BigQuery job successfully done: [project id]:digdag_s0_p_default_w_job_t_2_a_1_08843218-25bd-44e4-9fca-7b174c347328 2020-05-07 18:12:32 +0900 [INFO] (0018@[0:default]+job+step2): bq>: queries/step2.sql 2020-05-07 18:12:32 +0900 [INFO] (0018@[0:default]+job+step2): bq>: queries/step2.sql 2020-05-07 18:12:32 +0900 [INFO] (0018@[0:default]+job+step2): Submitting BigQuery job: [project id]:digdag_s0_p_default_w_job_t_3_a_1_d1991b14-e869-41f2-a2d3-e41be527bcae 2020-05-07 18:12:33 +0900 [INFO] (0018@[0:default]+job+step2): Checking BigQuery job status: [project id]:digdag_s0_p_default_w_job_t_3_a_1_d1991b14-e869-41f2-a2d3-e41be527bcae 2020-05-07 18:12:34 +0900 [INFO] (0018@[0:default]+job+step2): BigQuery job still running: digdag_s0_p_default_w_job_t_3_a_1_d1991b14-e869-41f2-a2d3-e41be527bcae: checking again in 1s 2020-05-07 18:12:35 +0900 [INFO] (0018@[0:default]+job+step2): bq>: queries/step2.sql 2020-05-07 18:12:36 +0900 [INFO] (0018@[0:default]+job+step2): Checking BigQuery job status: [project id]:digdag_s0_p_default_w_job_t_3_a_1_d1991b14-e869-41f2-a2d3-e41be527bcae 2020-05-07 18:12:36 +0900 [INFO] (0018@[0:default]+job+step2): BigQuery job successfully done: [project id]:digdag_s0_p_default_w_job_t_3_a_1_d1991b14-e869-41f2-a2d3-e41be527bcae Success. Task state is saved at /Users/username/Desktop/workspace/digdag/wf_sample/.digdag/status/20200507T000000+0000 directory. * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time. * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.
実行できました。BigQueryのコンソールからも確認できます。