case-kの備忘録

日々の備忘録です。最近はGCPやデータ分析系のことを呟きます

Dataflowを検証「Python2系と3系」

GCPのサーバレスETLツールであるDataflowの検証をしてみました。前に使った時はPython2系しか使えなかったんですが3系のサポートも始めたらしいので期待です。

Dataflowとは

DataFlowはGCPのETLツールでデータを加工したりします。サーバレスで並列分散処理できるので大規模データに適しています。
特にストリーミングとバッチを同じコードで扱えるのが推しポイントとされてるようです。
リアルタイムデータはセンサー側の不備やイベントデータのサイズでも遅延します。DataflowはGCPのPub/Subと連携することで、時系列のレンジ(Window)を指定して集計可能で、リアルタイムデータの集計や推定に適したツールです。
言語としてはJavaPython、Goをサポートしてます。

似てるものでGCPのDataprocがあります。これはクラスタが前提でサーバーが必要です。
PySpark等のジョブを実行できるため並列処理やアドホックな分析に適してるものだと思います。


以下のコードを使います。GBQからデータを取得してGCSに書き込む処理です。
sample.py

# -*- coding: utf-8 -*-
import apache_beam as beam

# プロジェクトID
PROJECTID = 'PROJECTID '

# オプション設定
options = beam.options.pipeline_options.PipelineOptions()

# GCP関連オプション
gcloud_options = options.view_as(
  beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.project = PROJECTID
gcloud_options.job_name = 'pipeline1'
gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID)
gcloud_options.temp_location = 'gs://{}/temp'.format(PROJECTID)

# 標準オプション(実行環境を設定)
std_options = options.view_as(
  beam.options.pipeline_options.StandardOptions)
std_options.runner = 'DataflowRunner'

p = beam.Pipeline(options=options)

query = "SELECT * " \
"FROM `bigquery-public-data.samples.shakespeare`" \
"LIMIT 10"

(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                   use_standard_sql=True,
                                                   query=query))
   | 'write' >> beam.io.WriteToText('gs://{}/p1.txt'.format(PROJECTID),
                                    num_shards=1)
)

p.run()

Python 2系で実行

DataflowはPythonJavaをサポートしています。現行がPythonなのでまずはPythonで検証していきたいと思います。
(Dataflowで残念なのはPython 3系がまだ完全にサポートされてないことです)

今回はGCEのCentos7で行いました。
www.case-k.jp

# pipのインストール
$ curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
$ sudo python get-pip.py

独立した環境を準備します。環境を分けないとgoogle-cloud-dataflowインストール時に以下のエラーが出ます。

 Found existing installation: requests 2.6.0
ERROR: Cannot uninstall 'requests'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.
$ sudo pip install virtualenv
$ mkdir env
$ virtualenv env
$ cd env
$ source bin/activate

qiita.com

# バージョンによって実行できないのでバージョン指定しときます。

$ pip install apache-beam[gcp]
$ gcloud auth application-default login

実行後のURLをクリックし、ブラウザで認証させてください。
認証が完了したら実行します。

$ pyton sample.py

問題なくできたようなのでGCSを確認してみます。
|

f:id:casekblog:20190906161832p:plain
問題なくファイルを出力できたようです。
同じ処理を行う場合、サンプルのジョブの名前を変更する必要があります。

gcloud_options.job_name = 'pipeline1'
 => gcloud_options.job_name = 'pipeline2'

Python 3系で実行

Python3系の環境を作ってみます。現在はアルファ版からベータ版になったようです。
f:id:casekblog:20191007165858p:plain
issues.apache.org
cloud.google.com

# バージョン 
Python 3.6.3 |Anaconda, Inc.| (default, Oct 13 2017, 12:02:49) 
[GCC 7.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.

それでは3系で実行してみたいと思います。

$ sudo pip install virtualenv
$ mkdir env
$ virtualenv env
$ cd env
$ source bin/activate
$ pip install apache-beam[gcp]
$ python sample.py

f:id:casekblog:20190911174449p:plain

Python3系でも実行が確認できました。