case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

DataflowのDynamic Destinationsを使って動的に出力先のテーブルを変える

オンプレ環境のテーブルをBigQueryにリアルタイムに連携するために、DataflowのDynamic Destinations機能を使いました。Dynamic Destinationsを使うことでテーブル名を考慮して出力先を変えることができます。カスタムテンプレートを作るにあたり、ベーステンプレートとしてGoogle提供のテンプレート「PubSubToBigQuery.java」を使いました。Dynamic Destinationsは現時点だとJavaのみのサポートになっています。

Google提供のテンプレートを使う

カスタムテンプレートを作るにあたりベースのテンプレートとして、Google提供のテンプレートを加工して新しくテンプレートを作ることにしました。
DataflowTemplates/PubSubToBigQuery.java at master · GoogleCloudPlatform/DataflowTemplates · GitHub

プロジェクトを取得

Dynamic Destinationsを使うにあたり、出力先のプロジェクトを選択する必要があります。テンプレート作成時のリクエストパラメータで渡したプロジェクトを取得できるようにOptionsで次のように定義します。

public interface Options extends PipelineOptions, JavascriptTextTransformerOptions {
 ・・・
    @Description("project id read from Template Params")
    @Default.InstanceFactory(value=GcpOptions.DefaultProjectFactory.class)
    String getProject();
    void setProject(String value);
  ・・・
 }

GcpOptions
GcpOptions (Apache Beam 2.16.0-SNAPSHOT)
getProject()
java - Apache Beam Maven Dependency Error - Stack Overflow

DynamicDestinationsを使う

次にDynamicDestinationsを使い、インプットストリームに含まれるテーブル名によって出力先を動的に変えます。事前にインプットテーブルのスキーマを定義する必要がありますが、今回は異なるスキーマの共通キーから出力先を分けたいので、データタイプとして「TableRow」を使います。

public static PipelineResult run(Options options) {
    
    ・・・
    String projectID = options.getProject();
    ・・・

    /*
     * Step #3: Write the successful records out to BigQuery
     */
    String projectID = options.getProject();
    WriteResult writeResult = convertedTableRows.get(TRANSFORM_OUT).apply(
        BigQueryIO.writeTableRows()
            .to(
                new DynamicDestinations<TableRow, String>() {
                @Override
                public String getDestination(ValueInSingleWindow<TableRow> elem) {
                    return elem.getValue().get("<table_name_key>").toString();
                }

                @Override
                public TableDestination getTable(String destination) {
                    return new TableDestination(
                        new TableReference()
                            .setProjectId(projectID)
                            .setDatasetId("<dataset_name>")
                            .setTableId("<table_name>" + "_" + destination),
                        "destination table" + destination);
                }

                @Override
                public TableSchema getSchema(String destination) {
                    TableSchema schema = schemaCreater(new TableSchema(), destination);
                    return schema;
                }
            })
            .withoutValidation()
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withExtendedErrorInfo()
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
   ・・・
 }

Fun with Serializable Functions and Dynamic Destinations in Cloud Dataflow – Shine Solutions Group
Google BigQuery I/O connector
DynamicDestinations

スキーマを定義

出力先のテーブルスキーマを定義します。

  public static TableSchema schemaCreater(TableSchema schema, String destination) {
    if (destination == "1002") {
        schema.setFields(
        ImmutableList.of(
            new TableFieldSchema()
                .setName("<massage_unique_id>")
                .setType("STRING")
                .setMode("NULLABLE"),
            new TableFieldSchema()
                .setName("<table_name_key>")
                .setType("STRING")
                .setMode("NULLABLE"),
            new TableFieldSchema()
                .setName("<col_a>")
                .setType("STRING")
                .setMode("NULLABLE")
            ));
    } else if (destination == "1005") {
        schema.setFields(
        ImmutableList.of(
            new TableFieldSchema()
                .setName("<massage_unique_id>")
                .setType("STRING")
                .setMode("NULLABLE"),
            new TableFieldSchema()
                .setName("<table_name_key>")
                .setType("STRING")
                .setMode("NULLABLE"),
            new TableFieldSchema()
                .setName("<col_b>")
                .setType("STRING")
                .setMode("NULLABLE")
            ));
    } else {
        schema.setFields(
        ImmutableList.of(
            new TableFieldSchema()
                .setName("<massage_unique_id>")
                .setType("STRING")
                .setMode("NULLABLE"),
            new TableFieldSchema()
                .setName("<table_name_key>")
                .setType("STRING")
                .setMode("NULLABLE"),
            new TableFieldSchema()
                .setName("<col_c>")
                .setType("STRING")
                .setMode("NULLABLE")
            ));
    }
    return schema;
  }

ライブラリ

参考までにライブラリは次の通りです。Google提供のテンプレートに必要なライブラリを追記しています。

package com.google.cloud.teleport.templates;

import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.ResourceUtils;
import com.google.cloud.teleport.util.ValueProviderUtils;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

カスタムテンプレートの作成

テンプレート用のコードがかけたらコンパイルして、GCSにカスタムテンプレートを出力します。

#!/bin/bash
# bash PubSubToBigQueryDynamicDestinationsTemplate.sh <project id>
project_id=$1
gcloud config set project ${project_id}
source ~/.bash_profile
mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQueryDynamicDestinations \
  -Dexec.args="--project=${project_id} \
              --tempLocation=gs://${project_id}/tmp \
              --templateLocation=gs://${project_id}/templates/PubSubToBigQueryDynamicDestinationsTemplate \
              --runner=DataflowRunner"

所感

Dynamic Destinationsを使うことで、サーバ費用も抑えることができるので非常に便利でした。
事前にインプットのスキーマを定義する必要があると思いましたが、データタイプとして「TableRow」を使えば異なるスキーマにも対応できたので安心しました。