DigdagとEmbulkを使って並列処理を行った際の備忘録です。
Embulkを使った並列処理
Embulkを使って並列処理を行いたい場合は「_parallel: true」として次のように書きます。次の処理はCSV3ファイルを並列にBigQueryにロードします。
+prepare: # +data1, +data2, and +data3 run in parallel. _parallel: true +data1: sh>: /root/bin/embulk run ./embulk/load.yml.liquid +data2: sh>: /root/bin/embulk run ./embulk/load2.yml.liquid +data3: sh>: /root/bin/embulk run ./embulk/load3.yml.liquid
Workflow definition — Digdag 0_10_before_merge_v0_9_38 documentation
環境や実装方法について
環境としては次のようにして、DigdagとEmbulkを使って並列処理を行うジョブを定義します。
Dockerfie
Dockerfieは次のようにしてDigdagとEmbulkをインストールして、コンテナ構築ごはコンソールから確認できるようサーバを起動します。
DigdagはJavaの最新版だと使えないのがちょっとあれです。JDK 8u72を使いました。
Java Archive Downloads - Java SE 8
GUIのところはこちらの記事を参考にさせて頂きました。
GUIから理解するDigdagチュートリアル | Developers.IO
FROM lwieske/java-8:jdk-8u72 COPY . /usr/src/app WORKDIR /usr/src/app/workflow RUN curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest" RUN curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar" #COPY ./modules/digdag /root/bin/digdag #COPY ./modules/embulk /root/bin/embulk RUN chmod +x /root//bin/digdag RUN chmod +x /root/bin/embulk RUN /root/bin/embulk gem install embulk-output-bigquery RUN echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc RUN source ~/.bashrc CMD /root/bin/digdag server -m --bind 0.0.0.0 -O /var/log/digdag -c /usr/src/app/config/digdag.conf
秘密情報
秘密情報は次のようにして管理してます。この設定がないとGUIに秘密情報を登録できないので必須です。
# digdag.conf digdag.secret-encryption-key=<secretをDigdag ServerがDBに保存する時に暗号化するキー> secrets.gcp.credential=<service account key>
DigdagのSecret機能を使う - Qiita
Digdag scheduler can't use secret. · Issue #587 · treasure-data/digdag · GitHub
docker-compose
docker-composeは次のように定義します。環境変数としてGCPのサービスアカウントキーを渡します。
# docker-compose.yml version: '3' services: digdag: build: context: ./ image: digdag tty: true container_name: digdag ports: - 65432:65432 environment: - GCP_SERVICE_ACCOUNT_PATH=/usr/src/app/config/gcp-service-account.json
プロジェクトと秘密情報の登録
コンテナを起動したら次の処理を実行し、プロジェクトと秘密情報を登録します。
これを行わないとコンソールからは確認できません。
#!/bin/bash /root/bin/digdag push workflow cat $GCP_SERVICE_ACCOUNT_PATH | /root/bin/digdag secrets --project workflow --set gcp.credential=@$GCP_SERVICE_ACCOUNT_PATH
Embulkの設定ファイル。
ジョブは次のように定義しています。CSVをBigQueryに登録します。
# ./embulk/load.yml.liquid in: type: file path_prefix: /usr/src/app/workflow/data/sample.csv parser: charset: UTF-8 newline: CRLF type: csv delimiter: ',' quote: '"' escape: '"' trim_if_not_quoted: false skip_hander_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: 'sex', type: string} - {name: 'age', type: long} - {name: 'cnt', type: long} out: type: bigquery mode: append auth_method: service_account json_keyfile: /usr/src/app/config/gcp-service-account.json project: <project id> dataset: digdag_tst table: sample
以上となります。