Cloud FunctionsでバケットにアップしたファイルをBigQueryに書き込む処理をします。
code
Cloud Functionsとは
イベントを検知し処理を実行するトリガーの役割があります。サーバレスでイベントをトリガーにバッチ処理やストリーミング処理、別ファイルをキックするために使われます。
cloud.google.com
制限事項
Cloud Funcionには以下の制限があります。実行時間や割り当てられるメモリの制限があるのでメモリを多く消費する処理や時間のかかる処理には向かないようです。なのでDataflowと組み合わせて使われることが多いようです。
Cloud Functionsを試す
GCSのバケットにCSVヲアップロードし、該当ファイルのテーブルをBigQueryに作ります。
事前準備
・バケット:Cloud Functionsがファイルアップロードを検知するバケット
・データセット:Big Queryの対象のデータセット[my_dataset]にテーブルを作ります。
・サンプルファイル:こちらのサンプルをGCPにアップロードしトリガーとさせます。
・関数の定義:実行する関数を定義します。GCPコンソールよりCloud Functionsを選び関数を作りFunctionsの参照用バケットを選びます。言語はPython3.7で実行します。
実行する関数とライブラリをmain.py、requirements.txtに記載します。
main.py # functions_load_to_gbq.py
def if_tbl_exists(client, table_ref): from google.cloud.exceptions import NotFound try: client.get_table(table_ref) return True except NotFound: return False def load_to_gbq(data, context): from google.cloud import bigquery import pandas as pd from google.cloud.exceptions import NotFound # read from gcs bucket_name = data['bucket'] file_name = data['name'] project_id = "[project id ]" # create data set dataset_name = 'my_dataset' bq_client = bigquery.Client(project=project_id) dataset = bigquery.Dataset(bq_client.dataset(dataset_name )) # table name table_ref = dataset.table(file_name[0:-4]) # table check tbl_exists = if_tbl_exists(bq_client, table_ref) print(tbl_exists) if tbl_exists == False: """ SCHEMA = [ bigquery.SchemaField('col1', 'STRING', mode='required'), bigquery.SchemaField('col2', 'INTEGER', mode='required'), ] table = bigquery.Table(table_ref, schema=SCHEMA) table = bq_client.create_table(table) # API request """ GS_URL = 'gs://{}/{}'.format(bucket_name, file_name) job_id_prefix = "my_job" job_config = bigquery.LoadJobConfig() #job_config.skip_leading_rows = 1 job_config.source_format = 'CSV' job_config.autodetect = True load_job = bq_client.load_table_from_uri( GS_URL, table_ref, job_config=job_config, job_id_prefix=job_id_prefix) # API request load_job.result() # Waits for table load to complete. print('load finish')
requirements.txt
# Function dependencies, for example: # package>=version pandas google-cloud-bigquery
動作確認
準備ができたらGCPのCloud Shellより、CSVをGCSにアップロードし、実行してみます。
$ git clone https://github.com/case-k-git/gcp.git $ gsutil cp ./gcp/sample2.csv gs://cloudfnctst/ Copying file://./gcp/sample2.csv [Content-Type=text/csv]... - [1 files][ 788.0 B/ 788.0 B]
Stackdriverで実行を確認
BigQueryでバケットにアップロードしたファイルのテーブルが作られていることが確認できます。
以上となります。次はCloud FunctionsでDataflowのカスタムテンプレートをキックしてみたいと思います。
参考
stackoverflow.com
googleapis.dev
google-cloud-python.readthedocs.io