case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Googleが提供するDataflowテンプレートを使う方法

Googleが事前に用意してくれてるDataFlowのテンプレートを使い、GCS上に配置したテキストファイルをDataflowで加工し、BigQueryのテーブルに書き込みます。

code

バケットの配下に以下配置して実行してください。JavaScriptで加工処理を行うファイルとデータソースをGCSに置くことで実行できます。
github.com

Dataflowを実行するための選択肢

Dataflowを実行するためには以下の方法があります。本記事ではGoogleの提供するテンプレートを使い、Dataflowを実行します。

  1. Dataflowの関数を定義し実行する方法: JavaPython、GOのいずれかで実装
  2. Dataflowの関数を定義しテンプレート化し実行する方法: JavaPython、GOのいずれかで実装
  3. Googleの提供するテンプレートを使う方法: Googleのテンプレートを使いJavaScriptでUDFを定義
  4. Dataprepを使う方法: GUIベースでコードを書かなくても実行可能

Googleの提供するテンプレートを使う

GCPコンソールより「Dataflow」を選択し「テンプレートからジョブを作成」を選びます。今回はGCSのファイルをDataflowで加工しBigQueryに書き込むバッチ処理なので、「Cloud Dataflow テンプレート」から「Text Files on Cloud Storage To BigQuery」を選びます。
f:id:casekblog:20191001170433p:plain:w300

パラメータを選択する前に読み込むデータセットと実行する関数を準備しGCSに配置する必要があります。
以下をGCSに配置することで実行できます。

  • dataset (sample.txt)
  • table column (sample_bq.json)
  • udf (sample_udf.js)

sample.txt

a1 a2 a3 a4 a5 a6 a7 cv
0 1 0 1 0 0 0 No
0 0 1 1 0 0 1 No
・・・

sample_bq.json

{ "BigQuery Schema": [ { "name": "a1", "type": "STRING" }, { "name": "a2", "type": "STRING" },{ "name": "a3", "type": "STRING" },{ "name": "a4", "type": "STRING" },{ "name": "a5", "type": "STRING" },{ "name": "a6", "type": "STRING" },{ "name": "a7", "type": "STRING" },{ "name": "cv", "type": "STRING" }] }

sample_udf.js

function transform(line) {
  var values = line.split(' ');
  var obj = new Object();
  obj.a1= values[0];
  obj.a2 = values[1];
  obj.a3 = values[2];
  obj.a4 = values[3];
  obj.a5 = values[4];
  obj.a6 = values[5];
  obj.a7 = values[7];
  obj.cv = values[8];
  var jsonString = JSON.stringify(obj);
  return jsonString;
}
GCS location of your JavaScript UDF
The full URL of your .js file. Example: gs://my_bucket/my_function.js
=> gs://[bucket name]/191001/sample_udf.js

GCS location of your BigQuery schema file, described as a JSON
Example: { "BigQuery Schema": [ { "name": "location", "type": "STRING" }, { "name": "name", "type": "STRING" }, { "name": "age", "type": "STRING" }, { "name": "color", "type": "STRING" }, { "name": "coffee", "type": "STRING" } ] }
=> gs://[bucket name]/191001/sample_bq.json

The name of the JavaScript function you wish to call as your UDF
The function name should only contain letters, digits and underscores. Example: 'transform' or 'transform_udf1'.
=> transform
※ UDFで定義した関数名を選びます。

The fully qualified BigQuery table
Example: my-project:dataset.table
=> [bucket name]:[data_set].[table_name]
※ データセットは事前に作成しておく必要があります。

The GCS location of the text you'd like to process
=> gs://[bucket name]/191001/sample.txt

Temporary directory for BigQuery loading process
Example: gs://my-bucket/my-files/temp_dir
=> gs://[bucket name]/191001/tmp_dir
※ tmp_dirは事前にディレクトリを作らなくても大丈夫です。
一時的なロケーション
一時ファイルを書き込むためのパスとファイル名の接頭辞(gs://MyBucket/tmp など)
=> gs://[bucket name]/191001/tmp
※ tmpは事前にディレクトリを作らなくても大丈夫です。

実行してみます。
f:id:casekblog:20191001163655p:plain
無事完了しました。BigQueryにテーブルが作成されたか確認してみます。
f:id:casekblog:20191001173057p:plain:w300

問題なさそうです。経過時間は13 分 46 秒。 結構時間がかりました。。Dataflowの処理高速化はもう少し調べてみたいと思います。
CLIAPIも用意されてるので、Pythonのライブラリを使わない場合はCloud SDKAPIでDataflowを実行できそうです。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/GCS_Text_to_BigQuery \
    --parameters \
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
inputFilePattern=PATH_TO_YOUR_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS


所感
DataflowはJavaPythonをサポートしてますが、Googleの提供するDataflowのテンプレートは加工処理をJavascriptでかけるのがいいと思いました。
DataflowはPoolやPySparkと比較するとメモリやデータサイズはあまり気にしなくていいですが、実効速度は遅いように思います。
用途に応じて使い分けるのが良いかもしれません。
cloud.google.com
cloud.google.com