case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

技術調査メモ:Google提供のDataflowテンプレートでできること、できないこと_20200604

Dataflow テンプレートを使用すると、Cloud Storage 上のパイプラインをステージングして、さまざまな環境で実行できます。テンプレートは、Google 提供のテンプレートを使用することも、自身で作成することもできます。
Google提供のDataflowテンプレートでできること、できないことを調べてみました。

Googleが提供しているテンプレートを使う場合

1. メッセージ重複

Publisherが複数回Publishした場合

メッセージの重複は発生するようです。メッセージの重複を排除したい場合は、Pub/Subで付与されるメッセージIDだけではなく、Publish時にPublisherで定義して渡す必要がある。Pub/Subで付与されたIDではPublisherが複数回Publishした場合に対応できないため。
例えばサーバーがクラッシュしてから回復して、メッセージを PubSub に再びパブリッシュしたケースなどが考えられます。
https://cloud.google.com/community/tutorials/pubsub-spring-dedup-messages
https://cloud.google.com/blog/products/gcp/after-lambda-exactly-once-processing-in-cloud-dataflow-part-3-sources-and-sinks

Apache Beamドキュメントにもユニークを担保するにはwithIdAttributeを使う必要があると書いてありました。
https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withIdAttribute-java.lang.String-

しかし、withIdAttributeを使っても、Googleドキュメントに10分を超える場合は重複を排除できません。Apache Beamのドキュメント(withIdAttribute)に記載はありませんが計測したところ10分間のみ有効でした。
https://cloud.google.com/dataflow/model/pubsub-io#using-record-ids

Pub/Subで自動付与されるmessageId重複排除

Pub/Subで自動付与されるmessageIdはDataflowで自動的に重複排除をしています。
https://qiita.com/malt/items/26571eeccf850815c6ac
Pub/Sub を使用したストリーミング  |  Cloud Dataflow  |  Google Cloud

3. ウィンドウ処理やウォーターマーク対応可否について

ウィンドウ処理やウォーターマークには対応していないようです。ソースコードをみると必要な処理がありませんでした。
https://beam.apache.org/documentation/programming-guide/#windowing

Dataflowのカスタムテンプレートを作る場合

カスタムテンプレートを作る場合は、Java、もしくはPythonが良さそう。

1. Python3系のサポート状況

サポート済み
https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-1251

2. オートスケール対応

対応済み
https://cloud.google.com/dataflow/docs/resources/faq?hl=ja#python_6

4. Publisherが複数回Publishした場合

id_labelを使ってメッセージの重複を排除することができます。
https://beam.apache.org/releases/pydoc/2.8.0/_modules/apache_beam/io/gcp/pubsub.html

その他

BigQueryのInsert APIを使った重複排除もある。
>データの整合性を維持するために、挿入された各行に対し insertId を指定できます。
>BigQuery は、この ID を少なくとも 1 分間記憶します。この時間内に同じ行セットのストリーミングを試行した場合、insertId プロパティが設定されていれば、BigQuery は insertId プロパティを使用してデータの重複をベスト エフォートで排除します。
https://cloud.google.com/bigquery/streaming-data-into-bigquery?hl=ja#dataconsistency

所感

Googleのテンプレートを使う場合はwithIdAttributeを使ってないので重複は発生する。
目的が重複の排除であれば、Publisher側でメッセージのユニークIDを割り当てる必要があり、DataflowではなくBigQueryのViewでも対応できそう。(リアルタイムで集計値や推論モデルの特徴量を作りたい場合はDataflowが有効)
また、Googleのドキュメントに記載されているwithIdAttributeの10分制限が事実であればwithIdAttributeを使っても重複は発生する。
Dataflowを使う場合、ウィンドウ処理やウォーターマークを使い集計時に重複を排除してあげるのが良さそう。
ウィンドウ処理やウォーターマークを使ったストリーミング処理を行いたい場合は、Googleのテンプレートではなくカスタムテンプレートを使う必要がある。
ストリーミングは正確性と速さがトレードオフなので、後々正値をみたい場合はそれ用にバッチでテーブルを作るのが良さそう。