case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Dataflow Google BigQuery I/O connector:Python

Dataflowを使ってBigQueryからBigQueryに書き込む処理とCloud StorageからBigQueryに書き込む処理をします。

beam.apache.org

Code

github.com

options

# -*- coding: utf-8 -*-
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# プロジェクトID
PROJECTID = '[project id]'
            
# オプション設定
options = beam.options.pipeline_options.PipelineOptions()

# GCP関連オプション
gcloud_options = options.view_as(
  beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.project = PROJECTID
gcloud_options.job_name = 'jobgcstogbq'
gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID)
gcloud_options.temp_location = 'gs://{}/tem'.format(PROJECTID)

# テンプレート配置
#gcloud_options.template_location = 'gs://{}/template/GCS_TO_GBQ'.format(PROJECTID)

# 標準オプション(実行環境を設定)
std_options = options.view_as(
  beam.options.pipeline_options.StandardOptions)
std_options.runner = 'DataflowRunner'

GBQ to GBQ

BigQueryのテーブルを参照し新しいテーブルを作ります。事前にデータセットはBigQuery上で作ってください。コードでは「dataset_tst」として事前に作っています。

table_spec = 'dataset_tst.table_gbq_to_gbq'
table_schema= 'word:STRING,word_count:INTEGER'

p = beam.Pipeline(options=options)
# p | 'read from gcs' >> beam.io.ReadFromText(myoptions.inputFile) 
query = "SELECT word , word_count " \
"FROM `bigquery-public-data.samples.shakespeare`" \
"LIMIT 10"

(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID,
                                                   use_standard_sql=True,
                                                   query=query))
  | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{}:{}'.format(PROJECTID,table_spec), schema=table_schema)
 )
p.run()

Dataflowでジョブが完了したことを確認します。
f:id:casekblog:20191017172630p:plain:w200

BigQuery上にテーブルが作成されたことを確認します。
f:id:casekblog:20191017172642p:plain:w400

GCS to GBQ

バケット配下のCSVを参照し新しいテーブルを作ります。
headerをskipして読み込みます。データはこちらのクエリ結果を「shakespeare_word _cnt.csv」としてバケット配下に保存して実行してください。今回バケット名はプロジェクトIDと同じ名前で作ってます。

SELECT
  word,
  word_count 
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT
  10

beam.apache.org

table_spec = 'dataset_tst.table_gcs_to_gbq'
table_schema= 'word:STRING,word_count:INTEGER'

class Split(beam.DoFn):

    def process(self, element):
        word, word_count = element.split(",")

        return [{
            'word': word,
            'word_count': int(word_count),
        }]

p = beam.Pipeline(options=options)
# p | 'read from gcs' >> beam.io.ReadFromText(myoptions.inputFile) 
(p | 'read' >> beam.io.ReadFromText('gs://[project id]/shakespeare_word _cnt.csv', skip_header_lines=1)
  | 'ParseCSV' >> beam.ParDo(Split())
  | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{}:{}'.format(PROJECTID,table_spec), schema=table_schema)
 )
p.run()

Dataflowでジョブが完了したことを確認します。
f:id:casekblog:20191017170348p:plain:w200

BigQuery上にテーブルが作成されたことを確認します。
f:id:casekblog:20191017170953p:plain:w400

GCSからGBQの書き込みでは以下のエラーが出ました。パース処理を入れることで解決しました。

error :AttributeError: 'str' object has no attribute 'items'

# parse
class Split(beam.DoFn):

    def process(self, element):
        word, word_count = element.split(",")

        return [{
            'word': word,
            'word_count': int(word_count),
        }]

codeday.me

Google BigQuery I/O connector
https://codeday.me/jp/qa/20190910/1582183.html