ZOZO Advent Calendar 2022 カレンダー25日目の記事です
qiita.com
BigQueryに書き込まれたSQL Serverの変更追跡ログを使って、変更のあったPKの変更ログと変更前のログを取得する方法をご紹介します。
ZOZOではSQL Serverの変更追跡機能を使い、変更のあったPKの最新のレコードをBigQueryに連携しています。SQL Serverの変更追跡では以下のようなクエリを実行して、変更のあったPKとPKに紐づく最新のデータを取得できます。
SELECT a.SYS_CHANGE_OPERATION as changetrack_type, a.SYS_CHANGE_VERSION as changetrack_ver, #{columns} FROM CHANGETABLE(CHANGES #{@tablename}, @前回更新したバージョン) AS a LEFT OUTER JOIN #{@tablename} ON a.#{@primary_key} = b.#{@primary_key}
SQL ServerなどDBの変更を追跡する機能はCDC(Change Data Capture)と呼ばれており、DBで変更のあったデータを全て連携するログベースのCDCから、クエリで変更データをポーリング するCDCなどがあります。SQL Serverの変更追跡は後者に該当します。
ログベースのCDCであれば変更前データも取れますが、SQL Serverの変更追跡機能を使う場合、変更のあったPKの変更前のデータは取得できません。本記事ではSQL Serverの変更追跡機能を使ってBigQueryへ連携した変更のあったPKの変更ログに加えて、変更前ログを取得する方法をご紹介します。
techblog.zozo.com
datacater.io
BigQueryで実現する方法
BigQuery上で変更のあったPKの変更ログと変更前のログを取得する方法を紹介します。
変更のあった差分データを取得
SQL Serverの変更追跡機能で連携された差分テーブルから直近2日分の変更データを取得します。直近2日にしているのは、BigQueryパーティションでコストとパフォーマンスを向上させるのと、後ほど紹介する変更前のデータが変更のあった差分データに含まれていない場合に対応するためです。
streaming AS ( SELECT changetrack_type, changetrack_ver , bigquery_insert_time, <primary_key> AS primary_key, <columns> FROM <変更追跡で連携された差分テーブル> WHERE bigquery_insert_time >= TIMESTAMP_SUB(CAST(FORMAT_TIMESTAMP("%Y-%m-%d", TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 day), "Asia/Tokyo") AS timestamp), INTERVAL 9 HOUR)),
各カラムは以下の情報を含んでいます。
- changetrack_type: 変更処理。どのようは変更があったか確認できます(I: insert, U: update, D: delete)
- changetrack_ver:変更追跡のバージョン。SQL Serverのトランザクションごとに発行されるバージョンです。数値の大きいデータが最新になります
- primary_key: SQL ServerのPKをセットしています
- bigquery_insert_time: BigQueryに書き込まれた時刻.BigQueryのパーティション機能や遅延計測に使っています
- columns: 変更を追跡しているテーブルのカラム
最新の変更追跡バージョンを集計
先ほど抽出したデータを用いて、PKごとに最新の変更追跡バージョンを集計します。
streaming_latest_version AS ( SELECT primary_key, MAX(changetrack_ver) AS changetrack_ver_max FROM streaming -- set instead of tracking version WHERE bigquery_insert_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 60 second) GROUP BY primary_key ),
最新の変更追跡バージョンを用いて最新の変更ログを取得
PKごとに集計した最新の変更追跡バージョンと変更ログをJOINすることで、変更ログから変更のあったPKの最新のデータを取得できます。
streaming_latest AS ( SELECT changetrack_type, changetrack_ver, streaming.primary_key, <columns> FROM streaming INNER JOIN streaming_latest_version ON streaming.primary_key = streaming_latest_version.primary_key AND streaming.changetrack_ver = streaming_latest_version.changetrack_ver_max ),
変更前の変更追跡バージョンを取得
次に変更のあったPKの変更前のデータを取得します。変更のあった差分データと最新のバージョンをLEFT JOINし、突合できなかったデータ(changetrack_ver_max IS NULL)から変更前の変更追跡バージョンを取得します。変更のあった最新のバージョンは除外されるため変更前の変更追跡バージョンを取得できます。
streaming_before_latest_version AS ( SELECT primary_key, MAX(changetrack_ver) AS changetrack_ver_before_latest FROM ( SELECT streaming.primary_key, streaming.changetrack_ver, streaming_latest_version.changetrack_ver_max FROM streaming LEFT OUTER JOIN streaming_latest_version ON streaming.primary_key = streaming_latest_version.primary_key AND streaming.changetrack_ver = streaming_latest_version.changetrack_ver_max) WHERE changetrack_ver_max IS NULL GROUP BY primary_key ),
変更前の変更追跡バージョンを用いて変更前のログを取得
変更前の変更追跡バージョンを使って、変更のあったPKの変更前のデータを取得できます。
streaming_before_latest AS ( SELECT changetrack_type, changetrack_ver, streaming.primary_key, FROM streaming INNER JOIN streaming_before_latest_version ON streaming.primary_key = streaming_before_latest_version.changetrack_ver_before_latest AND streaming.changetrack_ver = streaming_before_latest_version.changetrack_ver_before_latest ),
前日分の全量テーブルから、差分データに含まれていない変更前データを取得
BigQueryのパーティションで絞りこんでいるため、変更のあった差分データの中に変更前のデータが含まれているとは限りません。変更のあった差分データの中に変更前のデータが含まれていない場合は前日分の全量テーブルから変更前のデータを取得します。
daily_before_latest AS ( SELECT CAST(NULL AS string) AS changetrack_type, CAST(NULL AS int64) AS changetrack_ver, <columns> FROM ( SELECT <primary_key> AS primary_key, <columns> FROM <前日の全量日付サフィックステーブル> WHERE _TABLE_SUFFIX IN (SUBSTR(FORMAT_TIMESTAMP("%Y%m%d", TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 day), "Asia/Tokyo"), 3))) WHERE primary_key NOT IN ( SELECT primary_key FROM streaming_before_latest_version) AND primary_key IN ( SELECT primary_key FROM streaming_latest_version) )
変更のあったPKの変更ログと変更前のログを取得
最後に変更データと変更前データをUNIONすることで、変更のあったPKの最新データと変更前のデータを取得できます。
SELECT * FROM streaming_latest UNION ALL SELECT * FROM streaming_before_latest UNION ALL SELECT * FROM daily_before_latest
BigQueryクエリ完成形
完成形のクエリは以下のようになります。
WITH # 本日分の差分テーブル streaming AS ( SELECT changetrack_type, changetrack_ver, bigquery_insert_time, <primary_key> AS primary_key, <columns> FROM <変更追跡で連携された差分テーブル> WHERE bigquery_insert_time >= TIMESTAMP_SUB(CAST(FORMAT_TIMESTAMP("%Y-%m-%d", TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 day), "Asia/Tokyo") AS timestamp), INTERVAL 9 HOUR)), # 差分テーブルから最新のバージョン streaming_latest_version AS ( SELECT primary_key, MAX(changetrack_ver) AS changetrack_ver_max FROM streaming -- set instead of tracking version WHERE bigquery_insert_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 60 second) GROUP BY primary_key ), # 差分テーブルから最新のログ streaming_latest AS ( SELECT changetrack_type, changetrack_ver, streaming.primary_key, <columns> FROM streaming INNER JOIN streaming_latest_version ON streaming.primary_key = streaming_latest_version.primary_key AND streaming.changetrack_ver = streaming_latest_version.changetrack_ver_max ), # 差分テーブルにある変更前のバージョン(最新の一つ前のバージョン) streaming_before_latest_version AS ( SELECT primary_key, MAX(changetrack_ver) AS changetrack_ver_before_latest FROM ( SELECT streaming.primary_key, streaming.changetrack_ver, streaming_latest_version.changetrack_ver_max FROM streaming LEFT OUTER JOIN streaming_latest_version ON streaming.primary_key = streaming_latest_version.primary_key AND streaming.changetrack_ver = streaming_latest_version.changetrack_ver_max) WHERE changetrack_ver_max IS NULL GROUP BY primary_key ), # 差分テーブルにある変更前のデータ(最新の一つ前のデータ) streaming_before_latest AS ( SELECT changetrack_type, changetrack_ver, streaming.primary_key, <columns> FROM streaming INNER JOIN streaming_before_latest_version ON streaming.primary_key = streaming_before_latest_version.changetrack_ver_before_latest AND streaming.changetrack_ver = streaming_before_latest_version.changetrack_ver_before_latest ), # 差分テーブルにない変更前のデータ daily_before_latest AS ( SELECT CAST(NULL AS string) AS changetrack_type, CAST(NULL AS int64) AS changetrack_ver, primary_key, <columns> FROM ( SELECT <primary_key> AS primary_key, <columns> FROM <前日の全量日付サフィックステーブル> WHERE _TABLE_SUFFIX IN (SUBSTR(FORMAT_TIMESTAMP("%Y%m%d", TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 day), "Asia/Tokyo"), 3))) WHERE primary_key NOT IN ( SELECT primary_key FROM streaming_before_latest_version) AND primary_key IN ( SELECT primary_key FROM streaming_latest_version) ) SELECT * FROM streaming_latest UNION ALL SELECT * FROM streaming_before_latest UNION ALL SELECT * FROM daily_before_latest