Cloud Pub/Subの概要とPythonでの使い方を記事にしました。簡単な概要と、Python Clientを使いトピック・サブスクリプションの作成からメッセージ送信・確認まで行います。
Code
Jupyter Notebookはこちらを参照ください。
github.com
Cloud Pub/Subの概要
Cloud Pub/Subとは
GCPのオートスケール可能なメッセージングキューイングシステムで、名前の通り、メッセージを送信(Publish)して購読(Subscribe)するメッセージングシステムです。Pub/Subはストリーミングデータなどイベントデータの格納先として利用されます。リアルタイムデータの分析や機械学習モデルを構築したい場合、キューに格納されたPub/Subからデータを取得し処理を行います。
メッセージ配信法式
Pub/Subのメッセージ方式にはPull型とPush型があります。Pullはサブスクライバーの任意のタイミングでデータを取得し、Pushはメッセージ到着次第サブスクライバー側にメッセージを送る方式です。Pub/Subは主にストリーミングデータに使われますが、ネットワークコストを抑えるためにバッチ処理も可能で、複数のメッセージをまとめて送ることも可能です。
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-gcp-2018913
Pub/SubとDataflowのパイプライン
Pub/Subは「at-least-once 配信」を採用しており、メッセージを「少なくとも 1 回」の配信が保証されます。これはデータ送信側の不備や故障等でメッセージが欠損してしまうのを防ぐために行われていますが、メッセージの重複が発生します。またメッセージの順序も保証されません。この重複や順序の並べ変えはDataflow側で処理するのがGoogleの推奨方法となります。
こちらはログ収集ツールであるfluenttdで収集したログをPub/Subに送り、Datafowで加工・重複排除を行いBigQueryに書き込むパイプラインとなります。
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-bigquery-201896-113180907
Cloud Pub/SubがApache Kafkaより優れているところ
KafkaもPub/Subと同じメッセージキューイングシステムです。細かい違いはいくつかありますが、一番大きな違いはオートスケールするか否かだと思います。Pub/Subは自動でオートスケールするのでログが指数関数的に増える場合にも対応することができます。ストリーミングデータは無制限に増えるデータなのでオートスケールできる必要があります。
Apache Kafkaについては以下を確認してみてください。
Apache Kafkaの概要とアーキテクチャ - Qiita
Pythonでの実践
Python Client を使いCloud Pub/Subを試します。
事前準備
ライブラリや変数をセットします。
!pip install google-cloud-pubsub !pip install pandas from google.cloud import pubsub import pandas as pd from google.cloud import pubsub_v1 # Set Options topic_name = 'my-topic' subscription_name = 'my-subscription' input_path = './sensor_obs2008.csv.gz' project_id = '' # Read Data !gsutil cp gs://cloud-training-demos/sandiego/sensor_obs2008.csv.gz . data = pd.read_csv(input_path) data.head()
トピックの作成
publisher = pubsub_v1.PublisherClient() topic_name = 'projects/{project_id}/topics/{topic}'.format( project_id=project_id, topic=topic_name, # Set this to something appropriate. ) publisher.create_topic(topic_name)
サブスクリプションの作成
subscriber = pubsub_v1.SubscriberClient() subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format( project_id=project_id, sub=subscription_name , # Set this to something appropriate. ) subscriber.create_subscription( name=subscription_name, topic=topic_name)
メッセージ送信
トピックとサブスクリプションが作れたらメッセージを送信します。
for i , v in data.head(10).iterrows(): message = str(list(v.values)).encode('utf-8') future = publisher.publish(topic_name, data=message) print(message)
メッセージの確認
サブスクライバーからメッセージを確認します。
def callback(message): print(message.data) message.ack() future = subscriber.subscribe(subscription_name, callback)