Dataflow
Python版を調べてみたがクエリの上書きができなかったり、余計な通信が発生していたりと現時点で本番運用できる状態ではなかった。Java版が使えるか検証してみる。Java版はテンプレートが用意されていたので、PostgresとSQL Serverでそれぞれ検証してみた。…
Apache BeamのPython jdbcコネクタを使いDataflowでジョブを実行してみました。Cloud SQLとDataflowを同一サブネット内に作りプライベートIPで接続を試みました。検証したところジョブ実行時に実行環境からPostgresにコネクションを張ろうとしていることがわ…
Apache Beam Python SQLServer To BigQueryを検証。クエリの上書きができないのは検証済みだがそれ以外で不足機能がないかみてみた。 www.case-k.jpSQL Serverの場合jarファイルの追加が必要。PostgresSQLは新たにjarを追加しなくてもドライバが含まれている…
Apache BeamのPythonでPostgreSQLからBigQueryに書き込めるか検証。検証したところJDBCをつかいPostgreSQLからBigQueryに書きこむことはできた。ただし、クエリの上書きはできなかった。 すべて「SELECT * FROM TABLE」で実行されてします。以下の条件に該当…
この記事はZOZO Advent Calendar 21日目の記事です。qiita.com DataflowでCloud Pub/Sub からKinesisへ書き込む処理とKinesisからBigQueryへ書き込む処理を作りました。本記事ではDataflowでKinesisを扱う際の注意点をご紹介できたらと思います。github.com …
Apache Beamノートブックからパイプラインを作ってみました。単純にGCSからファイルを取得し文字数を計算するパイプラインとなります。所感としてはとっても使いやすかったです。JavaだとEclipseを使うことになりますが、データの収集からデバッグまでノート…
本記事はZOZOテクノロジーズ #1 Advent Calendar 2020 - Qiita 24日目の記事です。バッチ方式の日次データ基盤とストリーミング方式のリアルタイムデータ基盤のスキーマ反映でData Catalogがどのように役立つのか概要も踏まえてご紹介できればと思います。後…
社内でデータ基盤を作っており、データの加工のしやすさだったり、書きやすさだったりでPythonで書き直すことができるか調べてみました。 調べた限りでは使いたい機能は既にサポートされていそうです。 Dynamic Destinations テーブル名を参照して動的にBQに…
DataflowでBigQueryのインサート時刻を書き込みたい場合は以下のようにして付与します。 .withFormatFunction((TableRow elem) -> elem.set("bigquery_insert_time", Instant.now().toString())) Dataflowのdynamic destinationを使う場合 WriteResult write…
オンプレ環境のテーブルをBigQueryにリアルタイムに連携するために、DataflowのDynamic Destinations機能を使いました。Dynamic Destinationsを使うことでテーブル名を考慮して出力先を変えることができます。カスタムテンプレートを作るにあたり、ベーステ…
DataflowのJava カスタムテンプレートの備忘録となります。pythonのテンプレート作成方法は次の通りです。 www.case-k.jp 環境構築 テンプレート作成 サンプルテンプレート作成 オプションを定義 テンプレート作成 所感 環境構築 まずはMavenをインストール…
最近ストリーミングデータの連携基盤を作っており、パイプラインを作る過程である疑問が浮かびました。 Dataflow必要なのかな。BigQueryでも良い気がする そこでDataflowがなかった場合、どうなるのか考えてみました。 データパイプライン 次のようなデータ…
Dataflow テンプレートを使用すると、Cloud Storage 上のパイプラインをステージングして、さまざまな環境で実行できます。テンプレートは、Google 提供のテンプレートを使用することも、自身で作成することもできます。 Google提供のDataflowテンプレートで…
Dataflowが解決するストリーミング処理の課題と基盤を作る上で考慮すべき点をいくつか資料を参考に備忘録もかねて整理してみました。 ストリーミング処理の概要 ストリーミング処理とは バッチ処理との違い ストリーミング処理の課題 データ量と変動性 遅延…
Dataflowが得意なこと、苦手なことを考えてみました。 得意なこと バッチ/ストリーミング処理(特にストリーミング処理) サイズの大きいデータを扱うこと サーバ費用を抑えること 苦手なこと 逐次処理 複雑なパイプライン制御(役割が異なる) 得意なこと バッ…
Dataflowを使ってBigQueryからBigQueryに書き込む処理とCloud StorageからBigQueryに書き込む処理をします。 Code options GBQ to GBQ GCS to GBQ beam.apache.org Code github.com options # -*- coding: utf-8 -*- import apache_beam as beam from apache…
Cloud ComposerでDataflowテンプレートを順次キックしていく逐次処理を行います。順番に処理が行われたことを確認するために前のDataflowのテンプレート実行し作られたファイルを参照する処理にしました。 code Composer環境を作る Dataflowのテンプレートを…
Dataflowで使うパイプライン処理の備忘録です。随時更新できればと思います。 options udf branch group by Filter beam.apache.org options # -*- coding: utf-8 -*- import apache_beam as beam # プロジェクトID PROJECTID = 'project id' # オプション設…
Dataflowテンプレートでカスタムパラメータを追加します。パラメータを静的に定義する方法とテンプレート実行時に動的にパラメータを指定する方法を紹介します。gclodコマンドで実行しますが、Cloud Functonsからテンプレートをキックする方法は以下の記事を…
GAEなどでDataflowのテンプレートの実行方法です。パラメータは以下のようにして渡します code Dataflowのテンプレートの実行 code github.com Dataflowのテンプレートの実行 "parameters": { "input": "gs://{}/sample2.csv".format(PROJECTID), "output": …
Cloud FunctionsでGCSのバケットに置かれたファイルを検知し、Dataflowのテンプレートをキックします。パラメータとしてCloud Functionsでファイル名を取得し、Dataflowのテンプレートに引数として渡します。GCSから加工しGBQに取り込むケースなどに使います…
Pythonベースで記述したDataflowのコードをテンプレート化し、実行してみます。テンプレートを作成するためには以下のコードを追記するだけです。コードを実行するとGCS内にテンプレートが作られるので、作成したテンプレートを実行してみます。 gcloud_opti…
DataflowでPython3系を使って良さそうか調べてみました。 Python3系を使いたい理由 DataflowのPython3系のサポート状況について Apach Beamのissueについて 動作確認 ストリーミング処理は? 結論(個人的な) Python3系を使いたい理由 DataflowはETLツールな…
Googleが事前に用意してくれてるDataFlowのテンプレートを使い、GCS上に配置したテキストファイルをDataflowで加工し、BigQueryのテーブルに書き込みます。 code Dataflowを実行するための選択肢 Googleの提供するテンプレートを使う code バケットの配下に…
GCPのサーバレスETLツールであるDataflowの検証をしてみました。前に使った時はPython2系しか使えなかったんですが3系のサポートも始めたらしいので期待です。 Dataflowとは Python 2系で実行 Python 3系で実行 Dataflowとは DataFlowはGCPのETLツールでデ…