case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Digdagを使ってBigQueryのジョブを実行してみる

業務でDigdagを使う機会かあったので、DigDagを使ってBigQueryのジョブを実行してみました。

事前準備

JDK 8u72をインストール

https://www.oracle.com/java/technologies/javase/javase8-archive-downloads.html
JDKの最新版をインストールしたところ最適化オプションが効いてるようでinitできなかったため。
github.com

DigDagをインストール

$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

サービスアカウントキーを発行

BigQueryの秘密情報を渡します。
サービスアカウントキーに次の権限を与えて発行します。

  • BigQuery ジョブユーザー
  • BigQuery データ編集者
$ cat [service account.json] |digdag secrets --local --set gcp.credential=@[service account.json]

登録されていることを確認

$ digdag secrets --local
2020-05-07 17:40:55 +0900: Digdag v0.9.41
gcp.credential

Digdagを使ってBigQueryからテーブルをAvroフォーマットでExportする - Qiita

データセットをつくる

事前にデータセットを作っておきます。
データセット名:digdag_tst


DigDagを使ってみる

Digdagでジョブと依存関係を定義します。

ジョブ

今回逐次処理を行いたいます。Step1を実行して実行結果をテーブルに書き込みます。Step2ではStep1で作られたテーブルを参照するジョブを実行します。

step1.sql

SELECT
  sex,
  age,
  COUNT(*) as cnt
FROM
  `bigquery-public-data.ml_datasets.census_adult_income`
GROUP BY
  sex,
  age

step2.sql

SELECT
  sex,
  AVG(cnt) as mean_cnt
FROM
  digdag_tst.step1_tb
GROUP BY
  sex

依存関係を定義

_export:
  bq:
    dataset: digdag_tst
+step1:
  bq>: queries/step1.sql
  estination_table: step1_tb
+step2:
  bq>: queries/step2.sql
  destination_table: step2_tb

docs.digdag.io

ディレクトリ構成

ディレクトリ構成は次のようになっています。

└── wf_sample
    ├── job.dig
    └── queries
        ├── step1.sql
        └── step2.sql

実行

ワークフローを実行してみます。

digdag run job.dig
2020-05-07 18:12:10 +0900: Digdag v0.9.41
2020-05-07 18:12:21 +0900 [WARN] (main): Reusing the last session time 2020-05-07T00:00:00+00:00.
2020-05-07 18:12:21 +0900 [INFO] (main): Using session /Users/username/Desktop/workspace/digdag/wf_sample/.digdag/status/20200507T000000+0000.
2020-05-07 18:12:21 +0900 [INFO] (main): Starting a new session project id=1 workflow name=job session_time=2020-05-07T00:00:00+00:00
2020-05-07 18:12:22 +0900 [INFO] (0018@[0:default]+job+step1): bq>: queries/step1.sql
2020-05-07 18:12:22 +0900 [INFO] (0018@[0:default]+job+step1): bq>: queries/step1.sql
2020-05-07 18:12:23 +0900 [INFO] (0018@[0:default]+job+step1): Submitting BigQuery job: [project id]:digdag_s0_p_default_w_job_t_2_a_1_08843218-25bd-44e4-9fca-7b174c347328
2020-05-07 18:12:29 +0900 [INFO] (0018@[0:default]+job+step1): Checking BigQuery job status: [project id]:digdag_s0_p_default_w_job_t_2_a_1_08843218-25bd-44e4-9fca-7b174c347328
2020-05-07 18:12:29 +0900 [INFO] (0018@[0:default]+job+step1): BigQuery job still running: digdag_s0_p_default_w_job_t_2_a_1_08843218-25bd-44e4-9fca-7b174c347328: checking again in 1s
2020-05-07 18:12:31 +0900 [INFO] (0018@[0:default]+job+step1): bq>: queries/step1.sql
2020-05-07 18:12:31 +0900 [INFO] (0018@[0:default]+job+step1): Checking BigQuery job status: [project id]:digdag_s0_p_default_w_job_t_2_a_1_08843218-25bd-44e4-9fca-7b174c347328
2020-05-07 18:12:32 +0900 [INFO] (0018@[0:default]+job+step1): BigQuery job successfully done: [project id]:digdag_s0_p_default_w_job_t_2_a_1_08843218-25bd-44e4-9fca-7b174c347328
2020-05-07 18:12:32 +0900 [INFO] (0018@[0:default]+job+step2): bq>: queries/step2.sql
2020-05-07 18:12:32 +0900 [INFO] (0018@[0:default]+job+step2): bq>: queries/step2.sql
2020-05-07 18:12:32 +0900 [INFO] (0018@[0:default]+job+step2): Submitting BigQuery job: [project id]:digdag_s0_p_default_w_job_t_3_a_1_d1991b14-e869-41f2-a2d3-e41be527bcae
2020-05-07 18:12:33 +0900 [INFO] (0018@[0:default]+job+step2): Checking BigQuery job status: [project id]:digdag_s0_p_default_w_job_t_3_a_1_d1991b14-e869-41f2-a2d3-e41be527bcae
2020-05-07 18:12:34 +0900 [INFO] (0018@[0:default]+job+step2): BigQuery job still running: digdag_s0_p_default_w_job_t_3_a_1_d1991b14-e869-41f2-a2d3-e41be527bcae: checking again in 1s
2020-05-07 18:12:35 +0900 [INFO] (0018@[0:default]+job+step2): bq>: queries/step2.sql
2020-05-07 18:12:36 +0900 [INFO] (0018@[0:default]+job+step2): Checking BigQuery job status: [project id]:digdag_s0_p_default_w_job_t_3_a_1_d1991b14-e869-41f2-a2d3-e41be527bcae
2020-05-07 18:12:36 +0900 [INFO] (0018@[0:default]+job+step2): BigQuery job successfully done: [project id]:digdag_s0_p_default_w_job_t_3_a_1_d1991b14-e869-41f2-a2d3-e41be527bcae
Success. Task state is saved at /Users/username/Desktop/workspace/digdag/wf_sample/.digdag/status/20200507T000000+0000 directory.
  * Use --session <daily | hourly | "yyyy-MM-dd[ HH:mm:ss]"> to not reuse the last session time.
  * Use --rerun, --start +NAME, or --goal +NAME argument to rerun skipped tasks.

実行できました。BigQueryのコンソールからも確認できます。
f:id:casekblog:20200507181620p:plain:w300

参考
GUIから理解するDigdagチュートリアル | Developers.IO