case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

DigdagとEmbulkを使って並列処理をしてみる

DigdagとEmbulkを使って並列処理を行った際の備忘録です。

Embulkを使った並列処理

Embulkを使って並列処理を行いたい場合は「_parallel: true」として次のように書きます。次の処理はCSV3ファイルを並列にBigQueryにロードします。

+prepare:
  # +data1, +data2, and +data3 run in parallel.
  _parallel: true
  +data1:
    sh>: /root/bin/embulk run ./embulk/load.yml.liquid

  +data2:
    sh>: /root/bin/embulk run ./embulk/load2.yml.liquid

  +data3:
    sh>: /root/bin/embulk run ./embulk/load3.yml.liquid

Workflow definition — Digdag 0.9.41 documentation

環境や実装方法について

環境としては次のようにして、DigdagとEmbulkを使って並列処理を行うジョブを定義します。

Dockerfie

Dockerfieは次のようにしてDigdagとEmbulkをインストールして、コンテナ構築ごはコンソールから確認できるようサーバを起動します。
DigdagはJavaの最新版だと使えないのがちょっとあれです。JDK 8u72を使いました。
Java Archive Downloads - Java SE 8
GUIのところはこちらの記事を参考にさせて頂きました。
GUIから理解するDigdagチュートリアル | Developers.IO

FROM lwieske/java-8:jdk-8u72
COPY . /usr/src/app
WORKDIR /usr/src/app/workflow
RUN curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
RUN curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
#COPY ./modules/digdag /root/bin/digdag
#COPY ./modules/embulk /root/bin/embulk
RUN chmod +x /root//bin/digdag
RUN chmod +x /root/bin/embulk
RUN /root/bin/embulk gem install embulk-output-bigquery
RUN echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
RUN source ~/.bashrc
CMD /root/bin/digdag server -m --bind 0.0.0.0 -O /var/log/digdag -c /usr/src/app/config/digdag.conf

秘密情報

秘密情報は次のようにして管理してます。この設定がないとGUIに秘密情報を登録できないので必須です。

# digdag.conf
digdag.secret-encryption-key=<secretをDigdag ServerがDBに保存する時に暗号化するキー>
secrets.gcp.credential=<service account key>

DigdagのSecret機能を使う - Qiita

docker-compose

docker-composeは次のように定義します。環境変数としてGCPのサービスアカウントキーを渡します。

# docker-compose.yml
version: '3'
services:
  digdag:
    build:
      context: ./
    image: digdag
    tty: true
    container_name: digdag
    ports:
      - 65432:65432
    environment:
      - GCP_SERVICE_ACCOUNT_PATH=/usr/src/app/config/gcp-service-account.json

プロジェクトと秘密情報の登録

コンテナを起動したら次の処理を実行し、プロジェクトと秘密情報を登録します。
これを行わないとコンソールからは確認できません。

#!/bin/bash
/root/bin/digdag push workflow
cat $GCP_SERVICE_ACCOUNT_PATH | /root/bin/digdag secrets --project workflow --set gcp.credential=@$GCP_SERVICE_ACCOUNT_PATH

Embulkの設定ファイル。

ジョブは次のように定義しています。CSVをBigQueryに登録します。

#  ./embulk/load.yml.liquid
in:
  type: file
  path_prefix: /usr/src/app/workflow/data/sample.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_hander_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: 'sex', type: string}
    - {name: 'age', type: long}
    - {name: 'cnt', type: long}
out:
  type: bigquery
  mode: append
  auth_method: service_account
  json_keyfile: /usr/src/app/config/gcp-service-account.json
  project: <project  id>
  dataset: digdag_tst
  table: sample

以上となります。