case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Cloud Functionsで検知したファイルをBigQueryに書き込む

Cloud FunctionsでバケットにアップしたファイルをBigQueryに書き込む処理をします。

code

github.com

Cloud Functionsとは

イベントを検知し処理を実行するトリガーの役割があります。サーバレスでイベントをトリガーにバッチ処理やストリーミング処理、別ファイルをキックするために使われます。
cloud.google.com

制限事項

Cloud Funcionには以下の制限があります。実行時間や割り当てられるメモリの制限があるのでメモリを多く消費する処理や時間のかかる処理には向かないようです。なのでDataflowと組み合わせて使われることが多いようです。

cloud.google.com

Cloud Functionsを試す

GCSのバケットCSVヲアップロードし、該当ファイルのテーブルをBigQueryに作ります。

事前準備

バケット:Cloud Functionsがファイルアップロードを検知するバケット
・データセット:Big Queryの対象のデータセット[my_dataset]にテーブルを作ります。
・サンプルファイル:こちらのサンプルをGCPにアップロードしトリガーとさせます。
・関数の定義:実行する関数を定義します。GCPコンソールよりCloud Functionsを選び関数を作りFunctionsの参照用バケットを選びます。言語はPython3.7で実行します。
f:id:casekblog:20191007113848p:plain:w300

実行する関数とライブラリをmain.py、requirements.txtに記載します。

main.py # functions_load_to_gbq.py

def if_tbl_exists(client, table_ref):
    from google.cloud.exceptions import NotFound
    try:
        client.get_table(table_ref)
        return True
    except NotFound:
        return False

def load_to_gbq(data, context):
    from google.cloud import bigquery
    import pandas as pd
    from google.cloud.exceptions import NotFound
    
    # read from gcs
    bucket_name = data['bucket']
    file_name = data['name']
    project_id = "[project id ]"
    
    # create data set
    dataset_name = 'my_dataset'
    bq_client = bigquery.Client(project=project_id)
    dataset = bigquery.Dataset(bq_client.dataset(dataset_name ))
    
    # table name
    table_ref = dataset.table(file_name[0:-4])    
    # table check 
    tbl_exists = if_tbl_exists(bq_client, table_ref)
    print(tbl_exists)
    if tbl_exists == False:
        """
        SCHEMA = [
            bigquery.SchemaField('col1', 'STRING', mode='required'),
            bigquery.SchemaField('col2', 'INTEGER', mode='required'),
        ]
        table = bigquery.Table(table_ref, schema=SCHEMA)
        table = bq_client.create_table(table)      # API request
        """
        GS_URL = 'gs://{}/{}'.format(bucket_name, file_name)
        job_id_prefix = "my_job"
        job_config = bigquery.LoadJobConfig()
        #job_config.skip_leading_rows = 1
        job_config.source_format = 'CSV'
        job_config.autodetect = True
        load_job = bq_client.load_table_from_uri(
            GS_URL, table_ref, job_config=job_config,
            job_id_prefix=job_id_prefix)  # API request
        load_job.result()  # Waits for table load to complete.
        print('load finish')

requirements.txt

# Function dependencies, for example:
# package>=version
pandas 
google-cloud-bigquery
動作確認

準備ができたらGCPのCloud Shellより、CSVをGCSにアップロードし、実行してみます。

$ git clone https://github.com/case-k-git/gcp.git
$ gsutil cp ./gcp/sample2.csv gs://cloudfnctst/
Copying file://./gcp/sample2.csv [Content-Type=text/csv]...
- [1 files][  788.0 B/  788.0 B]

Stackdriverで実行を確認
f:id:casekblog:20191007115521p:plain
BigQueryでバケットにアップロードしたファイルのテーブルが作られていることが確認できます。
f:id:casekblog:20191007114457p:plain:w300


以上となります。次はCloud FunctionsでDataflowのカスタムテンプレートをキックしてみたいと思います。

参考
stackoverflow.com
googleapis.dev
google-cloud-python.readthedocs.io