case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Dataflowが解決するストリーミング処理の課題と基盤構築で考慮すること

Dataflowが解決するストリーミング処理の課題と基盤を作る上で考慮すべき点をいくつか資料を参考に備忘録もかねて整理してみました。

ストリーミング処理の概要

ストリーミング処理とは

ストリーミング処理は「無制限に増え続けるデータ」を処理するためのプログラミングモデルです。ストリーミング処理は推論モデルでイベントや気象情報など動的に変わる変数をリアルタイムに捉えたい場合などに使われます。
f:id:casekblog:20191108195048p:plain:w500
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-gcp-2018913

バッチ処理との違い

ストリーミング処理と比較されるバッチ処理は「データの量・範囲が明確なデータ」を処理するためのモデルです。いま現在ではなく、過去の傾向を捉えたい場合に利用されています。

ストリーミング処理の課題

ストリーミング処理にはバッチ処理では考慮する必要がなかった課題が大きく3つ考えられます。

データ量と変動性

ストリーミングデータは無制限に増え続けるデータです。データ量が多く、イベントなどで変動が大きくなる場合もあります。過去に基づいた想定値からインフラを構成し備えることはできますが、データ量が少ない時間帯も存在します。このような時間帯では必要以上のインフラを構成しているために無駄なコストが発生します。また、データサイズが過去に基づいた想定値を超えてしまった場合は処理しきれずに、レイテンシーの低下やサーバがダウンしてしまうことも考えられます。

遅延データの扱い

ストリーミングデータを扱う上で「遅延データ」の扱いは大きな課題となります。「イベント発生時刻」と「サーバ受信時刻」は同じではありません。ここで問題となってくるのが、「どのタイミング」で処理を行うかです。「速度」と「正確性」はトレードオフな関係です。遅延データを待てば、正確性は向上しますが、出力するタイミングが遅くなり、リアルタイム性が失われます。一方、遅延データの到着を待たないとリアルタイム性は担保されますが、正確性は失われます。このように、「速度」と「正確性」はトレードオフな関係で遅延データの扱いはストリーミングデータを扱う上でもっとも大きな課題と言えます。
f:id:casekblog:20191110142034p:plain:w500
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-gcp-2018913

異なるプログラミングモデル

従来、ストリーミング処理とバッチ処理はプログラミングモデルが異なるため、別々のプログラミングモデルを用いてモジュールを作る必要性がありました。現在と過去のデータを比較したり、推論モデルで過去の傾向も取り入れたい場合など、重複する処理をそれぞれ作る必要がありました。

Dataflowでどのように解決すのか

Datafllowにはストリーミング処理で想定される課題を解決するための機能が用意されています。前項であげた3つの課題に対して、Dataflowは「オートスケール」、「遅延データの制御」、「プログラミングモデルの統一」を行うことで解決することが可能です。

オートスケール

先ほど記載した通り「データ量と変動性」はストリーミング処理を行う上で課題です。Dataflowは受信したデータを処理するために、必要な分だけサーバを用意する「オートスケール」機能があります。サーバレスで必要な分だけサーバ立てるのでインフラ費用を抑えることが可能です。
f:id:casekblog:20191108195429j:plain:w500
Dataflowをオートスケールさせてみる - Qiita
※ 現時点ではPythonはストリーミング処理ののオートスケールをサポートできていません。
www.case-k.jp

遅延データの制御

Dataflowにはイベント時間のどの区間のデータをどのタイミングで出力するのか制御するための機能が用意されています。重要となるのが、「ウィンドウ」、「ウォーターマーク」、「トリガー」、「アキュミュレーション」といった概念です。少し長くなるので詳細については次項で記載しています。

プログラミングモデルの統一

DataflowはオープンソースであるApach Beamを採用しています。Apach Beamは「バッチ処理」と「ストリーミング処理」を同じプログラミングモデルで扱うことができます。通常ストリーミングデータとバッチ処理は異なるプログラミングモデルが必要となるため、大きな差別化要素となっています。
f:id:casekblog:20191107211458p:plain:w500
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-gcp-2018913
Dataflow はフルマネージドな「オートスケーリング」環境をApach Beam パイプラインに提供する役割を果たしています。互いに機能を補完することでストリーミング処理とバッチ処理のプログラミングモデルを統一を実現し(Apach Beam)、オートスケール 環境(Dataflow)で実行できるようになっています。Dataflowを使うことで他のGCPプロダクトとのシームレスな連携や並列分散処理が可能となりました。バッチ処理はGCSやGBQから、ストリーミング処理はPub/Subから入力先を変更させるだけで簡単に対応することが可能です。

ストリーミング処理

p = beam.Pipeline(options=options)
(p | 'ReadFmPubSub' >> beam.io.ReadFromPubSub(topic=input_topic)
  | 'ParseCSV' >> beam.ParDo(Split())
  | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{}:{}'.format(PROJECTID,table_spec), schema=table_schema)
 )
p.run()

バッチ処理

p = beam.Pipeline(options=options)
(p | 'ReadFmGCS' >> beam.io.ReadFromText('gs://[project id]/shakespeare_word _cnt.csv', skip_header_lines=1)
  | 'ParseCSV' >> beam.ParDo(Split())
  | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{}:{}'.format(PROJECTID,table_spec), schema=table_schema)
 )
p.run()

バッチ処理とストリーミング処理で、プログラミングモデルが同じなので以下のようなパイプライン制御を実現することができます。
f:id:casekblog:20191107211830p:plain:w500
https://www.amazon.co.jp/Data-Science-Google-Cloud-Platform/dp/1491974567

遅延データの制御 (機能詳細)

ウィンドウとは

ウィンドウとは「イベント発生時刻」に基づいた処理を実現します。先ほど記載した通り、「イベントの発生時刻」と「サーバの処理時刻」は異なります。確認したいのはイベント発生時刻に基づいた出力値です。ウィンドウ処理を使うことで「イベント時間のどの部分を集計するか?」を明らかにすることができます。
f:id:casekblog:20191107200703p:plain:w500
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-gcp-2018913
代表的なウィンドウ処理である「固定ウィンドウ」、「スライディングウィンドウ」、「セッションウィンドウ」を紹介できればと思います。

固定ウィンドウ

固定ウィンドウは決められたイベント発生時刻に基づいて集計します。例えばWEBサイトのイベント発生時刻に基づいた1時間ごとの人数を集計したい場合などに活用できます。
f:id:casekblog:20191107171249p:plain:w500
https://cloud.google.com/dataflow/docs/guides/sql/streaming-pipeline-basics?hl=ja
固定ウィンドウ(集計単位:60分(3600秒))

from apache_beam import window
fixed_windowed_items = (
    items | 'window' >> beam.WindowInto(window.FixedWindows(60 * 60)))
スライディングウィンドウ

スライディングウィンドウは指定したウィンドウ処理の開始時期を変更できる処理です。ウィンドウの開始タイミングをピリオドと呼びます。例えばウィンドウサイズを1時間、ピリオドを10分で設定した場合、10分間隔で1時間を集計区間とした新しいウィンドウ処理が開始されます。活用用途としては局所的なスパイクを避けたい場合など移動平均を求めたい時に利用することができます。例えば、WEBサイトで10分ごとに1時間ごとの平均人数を集計し、集計結果から移動平均人数を計算したい場合などに活用できます。
f:id:casekblog:20191107171318p:plain:w500
https://cloud.google.com/dataflow/docs/guides/sql/streaming-pipeline-basics?hl=ja
スライディングウィンドウ(集計単位:60分(3600秒)、ピリオド:5分(300秒))

from apache_beam import window
sliding_windowed_items = (
    items | 'window' >> beam.WindowInto(window.SlidingWindows(60 * 60, 5 * 60)))
セッションウィンドウ

セッションウィンドウは時間だけでなく、KEY単位でトラッキングしたい場合です。配信間隔が不規則なデータに対して役立ちます。セッションウィンドウは最小ギャップ時間を設定することができ、最小ギャップ時間以降に到着したウィンドウは別ウィンドウとして処理されます。
f:id:casekblog:20191107171335p:plain:w500
https://cloud.google.com/dataflow/docs/guides/sql/streaming-pipeline-basics?hl=ja
セッションウィンドウ(最小ギャップ:10分(600秒))

from apache_beam import window
session_windowed_items = (
    items | 'window' >> beam.WindowInto(window.Sessions(10 * 60)))

これまでは「時間」単位で処理していたのに対し、「KEY」単位で処理を行うことができます。活用用途としてはユーザの行動に基づくアクションを集計したい場合などです。少しややこしいので想定されるケースを2例ほど記載します。

例1 :WEBページごとのアクセス数を各ユーザーごと集計したい場合
※ 前提条件としてユーザーセッションは30分ごとに切れる

このような場合はギャップ期間が30分のセッションウィンドウを使う必要があります。集計方法としては、まずユーザ・ページごとの訪問数を集計し、その後ページ単位でアクセス数を集計する必要がある。固定ウィンドウでは対応できません。固定ウィンドウではイベント時刻に基づいてウィンドウをきるため、30分の固定ウィンドウを設定してもユーザー単位の集計結果を計算することができません。

例2 :WEBサイトで1時間に渡ってユーザーのインタラクションがない際に対象のユーザにPUSH通知を出したい場合

このような場合はギャップ期間が60分のセッションウィンドウを使う必要があります。IDとイベント発生時刻を考慮した判定条件をDataflowで作り、訪問してから60分間インタラクションがないユーザにPUSH通知を行います。

ウォーターマークとは

ウォーターマークとは確定したウィンドウのデータがすべてパイプラインに到着したとシステムが見なすタイミングのことです。
ウォーターマークを超えて到着したデータは遅延データとして扱われます。Dataflowは最適なウォータマークを学習します。左側の図は遅延データがくるまで待機し、右の図はDataflowの予測に基づいたウォーターマークです。この場合、遅延データは計算に考慮されません。
f:id:casekblog:20191107192439p:plain:w500
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-gcp-2018913

トリガーとは

トリガーはウォーターマークに基づいて出力するタイミングのことです。

アキュミュレーションとは

アキュミュレーションとは集計結果出力時の計算法式のことです。計算方式としては以下が想定されます。
・破棄モード
・累積モード
・累積 & 後退モード

破棄モード

遅延データを破棄するか、累積するかです。デフォルトではDataflowはウィンドウに基づいて適切なタイミングを学習しウォータマークを決めます。ウォータマーク以降のデータは集計結果に反映されません。右の図では「正確性を犠牲にして速度を向上」させています。
f:id:casekblog:20191107192439p:plain:w500
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-gcp-2018913

累積モード

迅速に出力結果を取得したい一方で、遅延データが到着したら再計算したい場合もあると思います。左の図では「速度を犠牲にして正確性を向上」させています。
f:id:casekblog:20191107192439p:plain:w500
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-gcp-2018913

累積 & 後退モード

こちらは累積と破棄モードのバランスをとった形となります。こちらはDataflowのヒューリスティックウォーターマークに基づいて一度出力を行います。出力後、ウォーターマークを超えて後から遅延データがウィンドウに届いた場合に新しく届いたデータを使い再度トリガーを実行し計算処理を行い出力します。間に合うようであればストリーミング処理に更新した正確性の高いデータを使うことができます。
f:id:casekblog:20191107192848p:plain:w500
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-gcp-2018913

Dataflowでストリーミング処理の基盤作成で考慮すること

Dataflowでストリーミング処理を扱う場合は、イベント時間のどの部分に対し、何をどのように計算して、いつ出力するのか決める必要があります。

何を計算するか?(ETL)

DataflowはETLツールであり、入力から出力までの過程でデータの前処理を行います。前処理のためにどこからデータを受け取り、何を行い、どこに出力するのか考える必要があります。

イベント時間のどこを対象にするか?(ウィンドウ)

イベント発生時刻のどの区間を対象に処理を行うのか検討する必要があります。前項で紹介したようにDataflowには様々なウィンドウ関数が用意されています。要件にあったウィンドウ処理を行い、イベント発生時刻のどの区間を対象に計算するのか決める必要があります。

処理時間のどの時点を対象にするか?(ウォーターマーク & トリガー)

遅延データをどこまで待ち、どのタイミングで出力するか検討する必要があります。遅延データの待機時間を決め、どのタイミングで出力するか決める必要があります。

更新をどのように関連付けるか?(アキュミュレーション)

遅延データを「破棄」するのか、「累積」するのか、遅れて新しいデータが届いたら再度トリガーを実行して更新するのかなど、決める必要があります。ストリーミング処理にはCloud Pub/Subを使います。Cloud Pub/Subの概要はこちらの記事を確認してみてください。
www.case-k.jp

まとめ

Dataflowを使うことでストリーミングデータの課題解決が可能です。記載したとおり、オートスケール等開発言語の制限もあるので注意が必要です。Dataflowでストリーミング処理基盤を構築する際はどのような処理(ETL)をどの区間(ウィンドウ)でいつ(ウォーターマーク & トリガー)どうやって(アキュミュレーション)出力するのか考える必要があります。

参考
[Cloud OnAir] GCP 上でストリーミングデータ処理基盤を構築してみよう! 2018年9月13日 放送
ストリーム処理とは何か?+2016年の出来事 - Qiita
Beam Programming Guide