オンプレ環境のテーブルをBigQueryにリアルタイムに連携するために、DataflowのDynamic Destinations機能を使いました。Dynamic Destinationsを使うことでテーブル名を考慮して出力先を変えることができます。カスタムテンプレートを作るにあたり、ベーステンプレートとしてGoogle提供のテンプレート「PubSubToBigQuery.java」を使いました。Dynamic Destinationsは現時点だとJavaのみのサポートになっています。
Google提供のテンプレートを使う
カスタムテンプレートを作るにあたりベースのテンプレートとして、Google提供のテンプレートを加工して新しくテンプレートを作ることにしました。
DataflowTemplates/PubSubToBigQuery.java at master · GoogleCloudPlatform/DataflowTemplates · GitHub
プロジェクトを取得
Dynamic Destinationsを使うにあたり、出力先のプロジェクトを選択する必要があります。テンプレート作成時のリクエストパラメータで渡したプロジェクトを取得できるようにOptionsで次のように定義します。
public interface Options extends PipelineOptions, JavascriptTextTransformerOptions { ・・・ @Description("project id read from Template Params") @Default.InstanceFactory(value=GcpOptions.DefaultProjectFactory.class) String getProject(); void setProject(String value); ・・・ }
GcpOptions
GcpOptions (Apache Beam 2.16.0-SNAPSHOT)
getProject()
java - Apache Beam Maven Dependency Error - Stack Overflow
DynamicDestinationsを使う
次にDynamicDestinationsを使い、インプットストリームに含まれるテーブル名によって出力先を動的に変えます。事前にインプットテーブルのスキーマを定義する必要がありますが、今回は異なるスキーマの共通キーから出力先を分けたいので、データタイプとして「TableRow」を使います。
public static PipelineResult run(Options options) { ・・・ String projectID = options.getProject(); ・・・ /* * Step #3: Write the successful records out to BigQuery */ String projectID = options.getProject(); WriteResult writeResult = convertedTableRows.get(TRANSFORM_OUT).apply( BigQueryIO.writeTableRows() .to( new DynamicDestinations<TableRow, String>() { @Override public String getDestination(ValueInSingleWindow<TableRow> elem) { return elem.getValue().get("<table_name_key>").toString(); } @Override public TableDestination getTable(String destination) { return new TableDestination( new TableReference() .setProjectId(projectID) .setDatasetId("<dataset_name>") .setTableId("<table_name>" + "_" + destination), "destination table" + destination); } @Override public TableSchema getSchema(String destination) { TableSchema schema = schemaCreater(new TableSchema(), destination); return schema; } }) .withoutValidation() .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(WriteDisposition.WRITE_APPEND) .withExtendedErrorInfo() .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())); ・・・ }
Fun with Serializable Functions and Dynamic Destinations in Cloud Dataflow – Shine Solutions Group
Google BigQuery I/O connector
DynamicDestinations
スキーマを定義
出力先のテーブルスキーマを定義します。
public static TableSchema schemaCreater(TableSchema schema, String destination) { if (destination == "1002") { schema.setFields( ImmutableList.of( new TableFieldSchema() .setName("<massage_unique_id>") .setType("STRING") .setMode("NULLABLE"), new TableFieldSchema() .setName("<table_name_key>") .setType("STRING") .setMode("NULLABLE"), new TableFieldSchema() .setName("<col_a>") .setType("STRING") .setMode("NULLABLE") )); } else if (destination == "1005") { schema.setFields( ImmutableList.of( new TableFieldSchema() .setName("<massage_unique_id>") .setType("STRING") .setMode("NULLABLE"), new TableFieldSchema() .setName("<table_name_key>") .setType("STRING") .setMode("NULLABLE"), new TableFieldSchema() .setName("<col_b>") .setType("STRING") .setMode("NULLABLE") )); } else { schema.setFields( ImmutableList.of( new TableFieldSchema() .setName("<massage_unique_id>") .setType("STRING") .setMode("NULLABLE"), new TableFieldSchema() .setName("<table_name_key>") .setType("STRING") .setMode("NULLABLE"), new TableFieldSchema() .setName("<col_c>") .setType("STRING") .setMode("NULLABLE") )); } return schema; }
ライブラリ
参考までにライブラリは次の通りです。Google提供のテンプレートに必要なライブラリを追記しています。
package com.google.cloud.teleport.templates; import static com.google.cloud.teleport.templates.TextToBigQueryStreaming.wrapBigQueryInsertError; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.teleport.coders.FailsafeElementCoder; import com.google.cloud.teleport.templates.common.BigQueryConverters.FailsafeJsonToTableRow; import com.google.cloud.teleport.templates.common.ErrorConverters; import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf; import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions; import com.google.cloud.teleport.util.DualInputNestedValueProvider; import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput; import com.google.cloud.teleport.util.ResourceUtils; import com.google.cloud.teleport.util.ValueProviderUtils; import com.google.cloud.teleport.values.FailsafeElement; import com.google.common.collect.ImmutableList; import java.nio.charset.StandardCharsets; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.schemas.transforms.Convert; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
カスタムテンプレートの作成
テンプレート用のコードがかけたらコンパイルして、GCSにカスタムテンプレートを出力します。
#!/bin/bash # bash PubSubToBigQueryDynamicDestinationsTemplate.sh <project id> project_id=$1 gcloud config set project ${project_id} source ~/.bash_profile mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQueryDynamicDestinations \ -Dexec.args="--project=${project_id} \ --tempLocation=gs://${project_id}/tmp \ --templateLocation=gs://${project_id}/templates/PubSubToBigQueryDynamicDestinationsTemplate \ --runner=DataflowRunner"