Googleが事前に用意してくれてるDataFlowのテンプレートを使い、GCS上に配置したテキストファイルをDataflowで加工し、BigQueryのテーブルに書き込みます。
code
バケットの配下に以下配置して実行してください。JavaScriptで加工処理を行うファイルとデータソースをGCSに置くことで実行できます。
github.com
Dataflowを実行するための選択肢
Dataflowを実行するためには以下の方法があります。本記事ではGoogleの提供するテンプレートを使い、Dataflowを実行します。
Googleの提供するテンプレートを使う
GCPコンソールより「Dataflow」を選択し「テンプレートからジョブを作成」を選びます。今回はGCSのファイルをDataflowで加工しBigQueryに書き込むバッチ処理なので、「Cloud Dataflow テンプレート」から「Text Files on Cloud Storage To BigQuery」を選びます。
パラメータを選択する前に読み込むデータセットと実行する関数を準備しGCSに配置する必要があります。
以下をGCSに配置することで実行できます。
- dataset (sample.txt)
- table column (sample_bq.json)
- udf (sample_udf.js)
sample.txt
a1 a2 a3 a4 a5 a6 a7 cv 0 1 0 1 0 0 0 No 0 0 1 1 0 0 1 No ・・・
sample_bq.json
{ "BigQuery Schema": [ { "name": "a1", "type": "STRING" }, { "name": "a2", "type": "STRING" },{ "name": "a3", "type": "STRING" },{ "name": "a4", "type": "STRING" },{ "name": "a5", "type": "STRING" },{ "name": "a6", "type": "STRING" },{ "name": "a7", "type": "STRING" },{ "name": "cv", "type": "STRING" }] }
sample_udf.js
function transform(line) { var values = line.split(' '); var obj = new Object(); obj.a1= values[0]; obj.a2 = values[1]; obj.a3 = values[2]; obj.a4 = values[3]; obj.a5 = values[4]; obj.a6 = values[5]; obj.a7 = values[7]; obj.cv = values[8]; var jsonString = JSON.stringify(obj); return jsonString; }
GCS location of your JavaScript UDF The full URL of your .js file. Example: gs://my_bucket/my_function.js => gs://[bucket name]/191001/sample_udf.js GCS location of your BigQuery schema file, described as a JSON Example: { "BigQuery Schema": [ { "name": "location", "type": "STRING" }, { "name": "name", "type": "STRING" }, { "name": "age", "type": "STRING" }, { "name": "color", "type": "STRING" }, { "name": "coffee", "type": "STRING" } ] } => gs://[bucket name]/191001/sample_bq.json The name of the JavaScript function you wish to call as your UDF The function name should only contain letters, digits and underscores. Example: 'transform' or 'transform_udf1'. => transform ※ UDFで定義した関数名を選びます。 The fully qualified BigQuery table Example: my-project:dataset.table => [bucket name]:[data_set].[table_name] ※ データセットは事前に作成しておく必要があります。 The GCS location of the text you'd like to process => gs://[bucket name]/191001/sample.txt Temporary directory for BigQuery loading process Example: gs://my-bucket/my-files/temp_dir => gs://[bucket name]/191001/tmp_dir ※ tmp_dirは事前にディレクトリを作らなくても大丈夫です。 一時的なロケーション 一時ファイルを書き込むためのパスとファイル名の接頭辞(gs://MyBucket/tmp など) => gs://[bucket name]/191001/tmp ※ tmpは事前にディレクトリを作らなくても大丈夫です。
実行してみます。
無事完了しました。BigQueryにテーブルが作成されたか確認してみます。
問題なさそうです。経過時間は13 分 46 秒。 結構時間がかりました。。Dataflowの処理高速化はもう少し調べてみたいと思います。
CLIやAPIも用意されてるので、Pythonのライブラリを使わない場合はCloud SDKやAPIでDataflowを実行できそうです。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/latest/GCS_Text_to_BigQuery \ --parameters \ javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\ JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ inputFilePattern=PATH_TO_YOUR_TEXT_DATA,\ outputTable=BIGQUERY_TABLE,\ bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
所感
DataflowはJavaとPythonをサポートしてますが、Googleの提供するDataflowのテンプレートは加工処理をJavascriptでかけるのがいいと思いました。
DataflowはPoolやPySparkと比較するとメモリやデータサイズはあまり気にしなくていいですが、実効速度は遅いように思います。
用途に応じて使い分けるのが良いかもしれません。
cloud.google.com
cloud.google.com