case-kの備忘録

日々の備忘録です。最近はGCPやデータ分析系のことを呟きます

Cloud Pub/Subの概要とPythonでの実践

Cloud Pub/Subの概要とPythonでの使い方を記事にしました。簡単な概要と、Python Clientを使いトピック・サブスクリプションの作成からメッセージ送信・確認まで行います。

Code

Jupyter Notebookはこちらを参照ください。
github.com

Cloud Pub/Subの概要

Cloud Pub/Subとは

GCPのオートスケール可能なメッセージングキューイングシステムで、名前の通り、メッセージを送信(Publish)して購読(Subscribe)するメッセージングシステムです。Pub/Subはストリーミングデータなどイベントデータの格納先として利用されます。リアルタイムデータの分析や機械学習モデルを構築したい場合、キューに格納されたPub/Subからデータを取得し処理を行います。

メッセージ配信法式

Pub/Subのメッセージ方式にはPull型とPush型があります。Pullはサブスクライバーの任意のタイミングでデータを取得し、Pushはメッセージ到着次第サブスクライバー側にメッセージを送る方式です。Pub/Subは主にストリーミングデータに使われますが、ネットワークコストを抑えるためにバッチ処理も可能で、複数のメッセージをまとめて送ることも可能です。
f:id:casekblog:20191107164259p:plain:w500
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-gcp-2018913


Pub/SubとDataflowのパイプライン

Pub/Subは「at-least-once 配信」を採用しており、メッセージを「少なくとも 1 回」の配信が保証されます。これはデータ送信側の不備や故障等でメッセージが欠損してしまうのを防ぐために行われていますが、メッセージの重複が発生します。またメッセージの順序も保証されません。この重複や順序の並べ変えはDataflow側で処理するのがGoogleの推奨方法となります。
こちらはログ収集ツールであるfluenttdで収集したログをPub/Subに送り、Datafowで加工・重複排除を行いBigQueryに書き込むパイプラインとなります。
f:id:casekblog:20191008122921p:plain:w500
https://www.slideshare.net/GoogleCloudPlatformJP/cloud-onair-bigquery-201896-113180907

Cloud Pub/SubがApache Kafkaより優れているところ

KafkaもPub/Subと同じメッセージキューイングシステムです。細かい違いはいくつかありますが、一番大きな違いはオートスケールするか否かだと思います。Pub/Subは自動でオートスケールするのでログが指数関数的に増える場合にも対応することができます。ストリーミングデータは無制限に増えるデータなのでオートスケールできる必要があります。
Apache Kafkaについては以下を確認してみてください。
Apache Kafkaの概要とアーキテクチャ - Qiita

Pythonでの実践

Python Client を使いCloud Pub/Subを試します。

事前準備

ライブラリや変数をセットします。

!pip install google-cloud-pubsub
!pip install pandas
from google.cloud import pubsub
import pandas as pd
from google.cloud import pubsub_v1

# Set Options
topic_name = 'my-topic'
subscription_name = 'my-subscription'
input_path = './sensor_obs2008.csv.gz'
project_id = ''

# Read Data
!gsutil cp gs://cloud-training-demos/sandiego/sensor_obs2008.csv.gz .
data  =  pd.read_csv(input_path)
data.head()

f:id:casekblog:20191104121410p:plain

トピックの作成

publisher = pubsub_v1.PublisherClient()
topic_name = 'projects/{project_id}/topics/{topic}'.format(
    project_id=project_id,
    topic=topic_name,  # Set this to something appropriate.
)
publisher.create_topic(topic_name)

サブスクリプションの作成

subscriber = pubsub_v1.SubscriberClient()
subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
    project_id=project_id,
    sub=subscription_name ,  # Set this to something appropriate.
)
subscriber.create_subscription(
    name=subscription_name, topic=topic_name)

メッセージ送信

トピックとサブスクリプションが作れたらメッセージを送信します。

for i , v in data.head(10).iterrows():
    message = str(list(v.values)).encode('utf-8')
    future = publisher.publish(topic_name, data=message)
    print(message)

f:id:casekblog:20191104121602p:plain

メッセージの確認

サブスクライバーからメッセージを確認します。

def callback(message):
    print(message.data)
    message.ack()
future = subscriber.subscribe(subscription_name, callback)

f:id:casekblog:20191104121905p:plain

参考
googleapis.dev