case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Dataflowテンプレート作成方法(Java)

DataflowのJava カスタムテンプレートの備忘録となります。pythonのテンプレート作成方法は次の通りです。
www.case-k.jp

環境構築

まずはMavenをインストールしてJavaプログラムのビルド環境を作ります。

tar xzvf apache-maven-3.6.3-bin.tar
  • Dockerfile
FROM google/cloud-sdk

RUN apt-get update
RUN apt-get install vim -y
RUN echo export M3_HOME=/tmp/working/apache-maven-3.6.3 >> ~/.bash_profile
RUN echo M3=$M3_HOME/bin >> ~/.bash_profile
RUN echo export PATH=$M3:$PATH >> ~/.bash_profile
RUN source ~/.bash_profile
RUN gcloud config set project <project ID>
  • docker-compose.yml
version: "3"
services:
  cloudsdk:
    build: .
    tty: true
    container_name: cloudsdk
    volumes:
      - $PWD:/tmp/working
    working_dir: /tmp/working

Maven – Download Apache Maven
Maven – Installing Apache Maven
【超初心者向け】Maven超入門 - Qiita
よく使うMavenコマンド集 - Qiita
Java と Apache Maven を使用したクイックスタート  |  Cloud Dataflow  |  Google Cloud

テンプレート作成

ビルド環境ができたのでテンプレートを作ります。

サンプルテンプレート作成

テンプレート機能はApache Beam ではなくDataflow独自のものです。パッケージには「-Dpackage=com.google.cloud.teleport.templates 」を適用する必要があります。
次のディレクトリにある共通モジュール「common」の方がコマンドから実行するモジュールより充実していますが、mvnでパッケージを選んで実行します。
github.com


Apache Beam 2.22.0-SNAPSHOT
Cloud Dataflow Runner
DataflowTemplates/WordCount.java at master · GoogleCloudPlatform/DataflowTemplates · GitHub

mvn archetype:generate \
      -DarchetypeGroupId=org.apache.beam \
      -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
      -DarchetypeVersion=2.20.0 \
      -DgroupId=org.example \
      -DartifactId=word-count-beam \
      -Dversion="0.1" \
      -Dpackage=com.google.cloud.teleport.templates \
      -DinteractiveMode=false

オプションを定義

オプションにテンプレと作成に必要なパラメータを定義します。
Beam Programming Guide

  public interface WordCountOptions extends PipelineOptions {

    /**
     * By default, this example reads from a public dataset containing the text of King Lear. Set
     * this option to choose a different input file or glob.
     */
    @Description("Path of the file to read from")
    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    String getInputFile();

    void setInputFile(String value);

    /** Set this required option to specify where to write the output. */
    @Description("Path of the file to write to")
    @Required
    String getOutput();

    void setOutput(String value);
    /**

    @Description("Staging for the pipeline")
    @Default.String("gs://streaming-datatransfer-dev/staging/")
    String setStagingLocation();
    void setStagingLocation(String value);
    */
    //@Description("Template for the pipeline")
    //@Default.String("gs://streaming-datatransfer-dev/templates/WordCountTmp.java")
    String getTemplateLocation();
    void setTemplateLocation(String value);
  }

どうやら「.waitUntilFinish();」を使うとテンプレートは作れますがエラーが出ます。Apache Beam のISSUEにも定義してありました。
「p.run()」でテンプレートを作ります。
Exception in thread "main" java.lang.UnsupportedOperationException: The result of template creation should not be used.
https://issues.apache.org/jira/browse/BEAM-2400

 // p.run().waitUntilFinish();
 p.run();

テンプレート作成

mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=com.google.cloud.teleport.templates.WordCount \
    -Dexec.args="--project=<project id> \
                --tempLocation=gs://<project id> /tmp \
                --templateLocation=gs://<project id> /templates/WordCountTmp \
                --output=gs://<project id> /output \
                --runner=DataflowRunner"

テンプレートの作成  |  Cloud Dataflow  |  Google Cloud
Java と Apache Maven を使用したクイックスタート  |  Cloud Dataflow  |  Google Cloud



GCS上にテンプレートが作られたことを確認します。

所感

Googleドキュメントないで見つけられなかっただけかもしれませんが、ドキュメントに記載のある方法で実行してもうまく実行できなかったので調べるのに時間がかかりました。