Dataflowを使ってBigQueryからBigQueryに書き込む処理とCloud StorageからBigQueryに書き込む処理をします。
Code
options
# -*- coding: utf-8 -*- import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # プロジェクトID PROJECTID = '[project id]' # オプション設定 options = beam.options.pipeline_options.PipelineOptions() # GCP関連オプション gcloud_options = options.view_as( beam.options.pipeline_options.GoogleCloudOptions) gcloud_options.project = PROJECTID gcloud_options.job_name = 'jobgcstogbq' gcloud_options.staging_location = 'gs://{}/staging'.format(PROJECTID) gcloud_options.temp_location = 'gs://{}/tem'.format(PROJECTID) # テンプレート配置 #gcloud_options.template_location = 'gs://{}/template/GCS_TO_GBQ'.format(PROJECTID) # 標準オプション(実行環境を設定) std_options = options.view_as( beam.options.pipeline_options.StandardOptions) std_options.runner = 'DataflowRunner'
GBQ to GBQ
BigQueryのテーブルを参照し新しいテーブルを作ります。事前にデータセットはBigQuery上で作ってください。コードでは「dataset_tst」として事前に作っています。
table_spec = 'dataset_tst.table_gbq_to_gbq' table_schema= 'word:STRING,word_count:INTEGER' p = beam.Pipeline(options=options) # p | 'read from gcs' >> beam.io.ReadFromText(myoptions.inputFile) query = "SELECT word , word_count " \ "FROM `bigquery-public-data.samples.shakespeare`" \ "LIMIT 10" (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project=PROJECTID, use_standard_sql=True, query=query)) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{}:{}'.format(PROJECTID,table_spec), schema=table_schema) ) p.run()
Dataflowでジョブが完了したことを確認します。
BigQuery上にテーブルが作成されたことを確認します。
GCS to GBQ
バケット配下のCSVを参照し新しいテーブルを作ります。
headerをskipして読み込みます。データはこちらのクエリ結果を「shakespeare_word _cnt.csv」としてバケット配下に保存して実行してください。今回バケット名はプロジェクトIDと同じ名前で作ってます。
SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare` LIMIT 10
table_spec = 'dataset_tst.table_gcs_to_gbq' table_schema= 'word:STRING,word_count:INTEGER' class Split(beam.DoFn): def process(self, element): word, word_count = element.split(",") return [{ 'word': word, 'word_count': int(word_count), }] p = beam.Pipeline(options=options) # p | 'read from gcs' >> beam.io.ReadFromText(myoptions.inputFile) (p | 'read' >> beam.io.ReadFromText('gs://[project id]/shakespeare_word _cnt.csv', skip_header_lines=1) | 'ParseCSV' >> beam.ParDo(Split()) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{}:{}'.format(PROJECTID,table_spec), schema=table_schema) ) p.run()
Dataflowでジョブが完了したことを確認します。
BigQuery上にテーブルが作成されたことを確認します。
GCSからGBQの書き込みでは以下のエラーが出ました。パース処理を入れることで解決しました。
error :AttributeError: 'str' object has no attribute 'items' # parse class Split(beam.DoFn): def process(self, element): word, word_count = element.split(",") return [{ 'word': word, 'word_count': int(word_count), }]
Google BigQuery I/O connector
https://codeday.me/jp/qa/20190910/1582183.html