GCPのサーバレスETLツールであるDataflowの検証をしてみました。前に使った時はPython2系しか使えなかったんですが3系のサポートも始めたらしいので期待です。
Dataflowとは
DataFlowはGCPのETLツールでデータを加工したりします。サーバレスで並列分散処理できるので大規模データに適しています。
特にストリーミングとバッチを同じコードで扱えるのが推しポイントとされてるようです。
リアルタイムデータはセンサー側の不備やイベントデータのサイズでも遅延します。DataflowはGCPのPub/Subと連携することで、時系列のレンジ(Window)を指定して集計可能で、リアルタイムデータの集計や推定に適したツールです。
言語としてはJavaとPython、Goをサポートしてます。
似てるものでGCPのDataprocがあります。これはクラスタが前提でサーバーが必要です。
PySpark等のジョブを実行できるため並列処理やアドホックな分析に適してるものだと思います。
以下のコードを使います。GBQからデータを取得してGCSに書き込む処理です。
sample.py
# -*- coding: utf-8 -*- import apache_beam as beam # プロジェクトID PROJECTID = 'PROJECTID ' # オプション設定 options = beam.options.pipeline_options.PipelineOptions() # GCP関連オプション gcloud_options = options.view_as( beam.options.pipeline_options.GoogleCloudOptions) gcloud_options.project = PROJECTID gcloud_options.job_name = 'pipeline1' gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID) gcloud_options.temp_location = 'gs://{}/temp'.format(PROJECTID) # 標準オプション(実行環境を設定) std_options = options.view_as( beam.options.pipeline_options.StandardOptions) std_options.runner = 'DataflowRunner' p = beam.Pipeline(options=options) query = "SELECT * " \ "FROM `bigquery-public-data.samples.shakespeare`" \ "LIMIT 10" (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID, use_standard_sql=True, query=query)) | 'write' >> beam.io.WriteToText('gs://{}/p1.txt'.format(PROJECTID), num_shards=1) ) p.run()
Python 2系で実行
DataflowはPythonとJavaをサポートしています。現行がPythonなのでまずはPythonで検証していきたいと思います。
(Dataflowで残念なのはPython 3系がまだ完全にサポートされてないことです)
今回はGCEのCentos7で行いました。
www.case-k.jp
# pipのインストール
$ curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
$ sudo python get-pip.py
独立した環境を準備します。環境を分けないとgoogle-cloud-dataflowインストール時に以下のエラーが出ます。
Found existing installation: requests 2.6.0 ERROR: Cannot uninstall 'requests'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.
$ sudo pip install virtualenv $ mkdir env $ virtualenv env $ cd env $ source bin/activate
# バージョンによって実行できないのでバージョン指定しときます。
$ pip install apache-beam[gcp] $ gcloud auth application-default login
実行後のURLをクリックし、ブラウザで認証させてください。
認証が完了したら実行します。
$ pyton sample.py
問題なくできたようなのでGCSを確認してみます。
|
問題なくファイルを出力できたようです。
同じ処理を行う場合、サンプルのジョブの名前を変更する必要があります。
gcloud_options.job_name = 'pipeline1' => gcloud_options.job_name = 'pipeline2'
Python 3系で実行
Python3系の環境を作ってみます。現在はアルファ版からベータ版になったようです。
issues.apache.org
cloud.google.com
# バージョン Python 3.6.3 |Anaconda, Inc.| (default, Oct 13 2017, 12:02:49) [GCC 7.2.0] on linux Type "help", "copyright", "credits" or "license" for more information.
それでは3系で実行してみたいと思います。
$ sudo pip install virtualenv
$ mkdir env
$ virtualenv env
$ cd env
$ source bin/activate
$ pip install apache-beam[gcp]
$ python sample.py
Python3系でも実行が確認できました。