case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Dataflow

Dataflow JDBC テンプレート検証(Java)

Python版を調べてみたがクエリの上書きができなかったり、余計な通信が発生していたりと現時点で本番運用できる状態ではなかった。Java版が使えるか検証してみる。Java版はテンプレートが用意されていたので、PostgresとSQL Serverでそれぞれ検証してみた。…

Apache Beam Python JDBCを使いDataflowを動かすには、ジョブの実行環境からもコネクションを張れる必要があった

Apache BeamのPython jdbcコネクタを使いDataflowでジョブを実行してみました。Cloud SQLとDataflowを同一サブネット内に作りプライベートIPで接続を試みました。検証したところジョブ実行時に実行環境からPostgresにコネクションを張ろうとしていることがわ…

Apache Beam Python SQLServer To BigQuery検証

Apache Beam Python SQLServer To BigQueryを検証。クエリの上書きができないのは検証済みだがそれ以外で不足機能がないかみてみた。 www.case-k.jpSQL Serverの場合jarファイルの追加が必要。PostgresSQLは新たにjarを追加しなくてもドライバが含まれている…

Apache Beam Python PostgreSQL To BigQuery検証

Apache BeamのPythonでPostgreSQLからBigQueryに書き込めるか検証。検証したところJDBCをつかいPostgreSQLからBigQueryに書きこむことはできた。ただし、クエリの上書きはできなかった。 すべて「SELECT * FROM TABLE」で実行されてします。以下の条件に該当…

DataflowでKinesisを扱う際の注意点

この記事はZOZO Advent Calendar 21日目の記事です。qiita.com DataflowでCloud Pub/Sub からKinesisへ書き込む処理とKinesisからBigQueryへ書き込む処理を作りました。本記事ではDataflowでKinesisを扱う際の注意点をご紹介できたらと思います。github.com …

Apache Beam ノートブックを使った開発

Apache Beamノートブックからパイプラインを作ってみました。単純にGCSからファイルを取得し文字数を計算するパイプラインとなります。所感としてはとっても使いやすかったです。JavaだとEclipseを使うことになりますが、データの収集からデバッグまでノート…

Data CatalogにオンプレDBのスキーマを同期させて、バッチ・ストリーミングデータ基盤(BigQuery)のスキーマ反映を自動化する

本記事はZOZOテクノロジーズ #1 Advent Calendar 2020 - Qiita 24日目の記事です。バッチ方式の日次データ基盤とストリーミング方式のリアルタイムデータ基盤のスキーマ反映でData Catalogがどのように役立つのか概要も踏まえてご紹介できればと思います。後…

技術調査メモ:Dataflow_Pythonサポート状況について_20200917

社内でデータ基盤を作っており、データの加工のしやすさだったり、書きやすさだったりでPythonで書き直すことができるか調べてみました。 調べた限りでは使いたい機能は既にサポートされていそうです。 Dynamic Destinations テーブル名を参照して動的にBQに…

DataflowでBigQueryのインサート時刻を付与する

DataflowでBigQueryのインサート時刻を書き込みたい場合は以下のようにして付与します。 .withFormatFunction((TableRow elem) -> elem.set("bigquery_insert_time", Instant.now().toString())) Dataflowのdynamic destinationを使う場合 WriteResult write…

DataflowのDynamic Destinationsを使って動的に出力先のテーブルを変える

オンプレ環境のテーブルをBigQueryにリアルタイムに連携するために、DataflowのDynamic Destinations機能を使いました。Dynamic Destinationsを使うことでテーブル名を考慮して出力先を変えることができます。カスタムテンプレートを作るにあたり、ベーステ…

Dataflowテンプレート作成方法(Java)

DataflowのJava カスタムテンプレートの備忘録となります。pythonのテンプレート作成方法は次の通りです。 www.case-k.jp 環境構築 テンプレート作成 サンプルテンプレート作成 オプションを定義 テンプレート作成 所感 環境構築 まずはMavenをインストール…

Dataflowでしかできないことってなんだろう。BigQueryだけじゃだめなのかな。

最近ストリーミングデータの連携基盤を作っており、パイプラインを作る過程である疑問が浮かびました。 Dataflow必要なのかな。BigQueryでも良い気がする そこでDataflowがなかった場合、どうなるのか考えてみました。 データパイプライン 次のようなデータ…

技術調査メモ:Google提供のDataflowテンプレートでできること、できないこと_20200604

Dataflow テンプレートを使用すると、Cloud Storage 上のパイプラインをステージングして、さまざまな環境で実行できます。テンプレートは、Google 提供のテンプレートを使用することも、自身で作成することもできます。 Google提供のDataflowテンプレートで…

Dataflowが解決するストリーミング処理の課題と基盤構築で考慮すること

Dataflowが解決するストリーミング処理の課題と基盤を作る上で考慮すべき点をいくつか資料を参考に備忘録もかねて整理してみました。 ストリーミング処理の概要 ストリーミング処理とは バッチ処理との違い ストリーミング処理の課題 データ量と変動性 遅延…

Dataflowが得意なこと、苦手なこと

Dataflowが得意なこと、苦手なことを考えてみました。 得意なこと バッチ/ストリーミング処理(特にストリーミング処理) サイズの大きいデータを扱うこと サーバ費用を抑えること 苦手なこと 逐次処理 複雑なパイプライン制御(役割が異なる) 得意なこと バッ…

Dataflow Google BigQuery I/O connector:Python

Dataflowを使ってBigQueryからBigQueryに書き込む処理とCloud StorageからBigQueryに書き込む処理をします。 Code options GBQ to GBQ GCS to GBQ beam.apache.org Code github.com options # -*- coding: utf-8 -*- import apache_beam as beam from apache…

Cloud Composerを使いDataflowのパイプライン制御をする方法

Cloud ComposerでDataflowテンプレートを順次キックしていく逐次処理を行います。順番に処理が行われたことを確認するために前のDataflowのテンプレート実行し作られたファイルを参照する処理にしました。 code Composer環境を作る Dataflowのテンプレートを…

Dataflowパイプライン処理の備忘録:Python

Dataflowで使うパイプライン処理の備忘録です。随時更新できればと思います。 options udf branch group by Filter beam.apache.org options # -*- coding: utf-8 -*- import apache_beam as beam # プロジェクトID PROJECTID = 'project id' # オプション設…

Dataflowカスタムパラメータの追加方法:Python

Dataflowテンプレートでカスタムパラメータを追加します。パラメータを静的に定義する方法とテンプレート実行時に動的にパラメータを指定する方法を紹介します。gclodコマンドで実行しますが、Cloud Functonsからテンプレートをキックする方法は以下の記事を…

Dataflowテンプレート実行方法:Python

GAEなどでDataflowのテンプレートの実行方法です。パラメータは以下のようにして渡します code Dataflowのテンプレートの実行 code github.com Dataflowのテンプレートの実行 "parameters": { "input": "gs://{}/sample2.csv".format(PROJECTID), "output": …

Cloud FunctionsでDataflowテンプレートをキックさせる方法:Python

Cloud FunctionsでGCSのバケットに置かれたファイルを検知し、Dataflowのテンプレートをキックします。パラメータとしてCloud Functionsでファイル名を取得し、Dataflowのテンプレートに引数として渡します。GCSから加工しGBQに取り込むケースなどに使います…

Dataflowテンプレート作成方法(Python)

Pythonベースで記述したDataflowのコードをテンプレート化し、実行してみます。テンプレートを作成するためには以下のコードを追記するだけです。コードを実行するとGCS内にテンプレートが作られるので、作成したテンプレートを実行してみます。 gcloud_opti…

DataflowでPython3系を使って良いのか検討してみた

DataflowでPython3系を使って良さそうか調べてみました。 Python3系を使いたい理由 DataflowのPython3系のサポート状況について Apach Beamのissueについて 動作確認 ストリーミング処理は? 結論(個人的な) Python3系を使いたい理由 DataflowはETLツールな…

Googleが提供するDataflowテンプレートを使う方法

Googleが事前に用意してくれてるDataFlowのテンプレートを使い、GCS上に配置したテキストファイルをDataflowで加工し、BigQueryのテーブルに書き込みます。 code Dataflowを実行するための選択肢 Googleの提供するテンプレートを使う code バケットの配下に…

Dataflowを検証「Python2系と3系」

GCPのサーバレスETLツールであるDataflowの検証をしてみました。前に使った時はPython2系しか使えなかったんですが3系のサポートも始めたらしいので期待です。 Dataflowとは Python 2系で実行 Python 3系で実行 Dataflowとは DataFlowはGCPのETLツールでデ…