case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

BigQueryに書き込まれたSQL Serverの変更ログを用いて、変更のあったPKの最新データと変更前のデータを取得する方法

ZOZO Advent Calendar 2022 カレンダー25日目の記事です
qiita.com

BigQueryに書き込まれたSQL Serverの変更追跡ログを使って、変更のあったPKの変更ログと変更前のログを取得する方法をご紹介します。

ZOZOではSQL Serverの変更追跡機能を使い、変更のあったPKの最新のレコードをBigQueryに連携しています。SQL Serverの変更追跡では以下のようなクエリを実行して、変更のあったPKとPKに紐づく最新のデータを取得できます。

  SELECT
    a.SYS_CHANGE_OPERATION as changetrack_type,
    a.SYS_CHANGE_VERSION as changetrack_ver,
    #{columns}
  FROM
    CHANGETABLE(CHANGES #{@tablename},
      @前回更新したバージョン) AS a
  LEFT OUTER JOIN #{@tablename} ON a.#{@primary_key} = b.#{@primary_key}

SQL ServerなどDBの変更を追跡する機能はCDC(Change Data Capture)と呼ばれており、DBで変更のあったデータを全て連携するログベースのCDCから、クエリで変更データをポーリング するCDCなどがあります。SQL Serverの変更追跡は後者に該当します。

ログベースのCDCであれば変更前データも取れますが、SQL Serverの変更追跡機能を使う場合、変更のあったPKの変更前のデータは取得できません。本記事ではSQL Serverの変更追跡機能を使ってBigQueryへ連携した変更のあったPKの変更ログに加えて、変更前ログを取得する方法をご紹介します。

techblog.zozo.com
datacater.io

BigQueryで実現する方法

BigQuery上で変更のあったPKの変更ログと変更前のログを取得する方法を紹介します。

変更のあった差分データを取得

SQL Serverの変更追跡機能で連携された差分テーブルから直近2日分の変更データを取得します。直近2日にしているのは、BigQueryパーティションでコストとパフォーマンスを向上させるのと、後ほど紹介する変更前のデータが変更のあった差分データに含まれていない場合に対応するためです。

  streaming AS (
  SELECT
    changetrack_type,
    changetrack_ver ,
    bigquery_insert_time,
   <primary_key> AS primary_key,
    <columns>
  FROM
    <変更追跡で連携された差分テーブル>
  WHERE
    bigquery_insert_time >= TIMESTAMP_SUB(CAST(FORMAT_TIMESTAMP("%Y-%m-%d", TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 day), "Asia/Tokyo") AS timestamp), INTERVAL 9 HOUR)),

各カラムは以下の情報を含んでいます。

  • changetrack_type: 変更処理。どのようは変更があったか確認できます(I: insert, U: update, D: delete)
  • changetrack_ver:変更追跡のバージョン。SQL Serverトランザクションごとに発行されるバージョンです。数値の大きいデータが最新になります
  • primary_key: SQL ServerのPKをセットしています
  • bigquery_insert_time: BigQueryに書き込まれた時刻.BigQueryのパーティション機能や遅延計測に使っています
  • columns: 変更を追跡しているテーブルのカラム

最新の変更追跡バージョンを集計

先ほど抽出したデータを用いて、PKごとに最新の変更追跡バージョンを集計します。

  streaming_latest_version AS (
  SELECT
    primary_key,
    MAX(changetrack_ver) AS changetrack_ver_max
  FROM
    streaming
    -- set instead of tracking version
  WHERE
    bigquery_insert_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 60 second)
  GROUP BY
    primary_key ),

最新の変更追跡バージョンを用いて最新の変更ログを取得

PKごとに集計した最新の変更追跡バージョンと変更ログをJOINすることで、変更ログから変更のあったPKの最新のデータを取得できます。

  streaming_latest AS (
  SELECT
    changetrack_type,
    changetrack_ver,
    streaming.primary_key,
    <columns>
  FROM
    streaming
  INNER JOIN
    streaming_latest_version
  ON
    streaming.primary_key = streaming_latest_version.primary_key
    AND streaming.changetrack_ver = streaming_latest_version.changetrack_ver_max ),

変更前の変更追跡バージョンを取得

次に変更のあったPKの変更前のデータを取得します。変更のあった差分データと最新のバージョンをLEFT JOINし、突合できなかったデータ(changetrack_ver_max IS NULL)から変更前の変更追跡バージョンを取得します。変更のあった最新のバージョンは除外されるため変更前の変更追跡バージョンを取得できます。

  streaming_before_latest_version AS (
  SELECT
    primary_key,
    MAX(changetrack_ver) AS changetrack_ver_before_latest
  FROM (
    SELECT
      streaming.primary_key,
      streaming.changetrack_ver,
      streaming_latest_version.changetrack_ver_max
    FROM
      streaming
    LEFT OUTER JOIN
      streaming_latest_version
    ON
      streaming.primary_key = streaming_latest_version.primary_key
      AND streaming.changetrack_ver = streaming_latest_version.changetrack_ver_max)
  WHERE
    changetrack_ver_max IS NULL
  GROUP BY
    primary_key ),

変更前の変更追跡バージョンを用いて変更前のログを取得

変更前の変更追跡バージョンを使って、変更のあったPKの変更前のデータを取得できます。

  streaming_before_latest AS (
  SELECT
    changetrack_type,
    changetrack_ver,
    streaming.primary_key,
  FROM
    streaming
  INNER JOIN
    streaming_before_latest_version
  ON
    streaming.primary_key = streaming_before_latest_version.changetrack_ver_before_latest
    AND streaming.changetrack_ver = streaming_before_latest_version.changetrack_ver_before_latest ),

前日分の全量テーブルから、差分データに含まれていない変更前データを取得

BigQueryのパーティションで絞りこんでいるため、変更のあった差分データの中に変更前のデータが含まれているとは限りません。変更のあった差分データの中に変更前のデータが含まれていない場合は前日分の全量テーブルから変更前のデータを取得します。

  daily_before_latest AS (
  SELECT
    CAST(NULL AS string) AS changetrack_type,
    CAST(NULL AS int64) AS changetrack_ver,
 <columns>
  FROM (
    SELECT
     <primary_key> AS primary_key,
      <columns>
    FROM
      <前日の全量日付サフィックステーブル>
    WHERE
      _TABLE_SUFFIX IN (SUBSTR(FORMAT_TIMESTAMP("%Y%m%d", TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 day), "Asia/Tokyo"), 3)))
  WHERE
    primary_key NOT IN (
    SELECT
      primary_key
    FROM
      streaming_before_latest_version)
    AND primary_key IN (
    SELECT
      primary_key
    FROM
      streaming_latest_version) )

変更のあったPKの変更ログと変更前のログを取得

最後に変更データと変更前データをUNIONすることで、変更のあったPKの最新データと変更前のデータを取得できます。

SELECT
  *
FROM
  streaming_latest
UNION ALL
SELECT
  *
FROM
  streaming_before_latest
UNION ALL
SELECT
  *
FROM
  daily_before_latest

BigQueryクエリ完成形

完成形のクエリは以下のようになります。

WITH
  # 本日分の差分テーブル
  streaming AS (
  SELECT
    changetrack_type,
    changetrack_ver,
    bigquery_insert_time,
    <primary_key> AS primary_key,
    <columns>
  FROM
    <変更追跡で連携された差分テーブル>
  WHERE
    bigquery_insert_time >= TIMESTAMP_SUB(CAST(FORMAT_TIMESTAMP("%Y-%m-%d", TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 day), "Asia/Tokyo") AS timestamp), INTERVAL 9 HOUR)),
  # 差分テーブルから最新のバージョン
  streaming_latest_version AS (
  SELECT
    primary_key,
    MAX(changetrack_ver) AS changetrack_ver_max
  FROM
    streaming
    -- set instead of tracking version
  WHERE
    bigquery_insert_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 60 second)
  GROUP BY
    primary_key ),
  # 差分テーブルから最新のログ
  streaming_latest AS (
  SELECT
    changetrack_type,
    changetrack_ver,
    streaming.primary_key,
    <columns>
  FROM
    streaming
  INNER JOIN
    streaming_latest_version
  ON
    streaming.primary_key = streaming_latest_version.primary_key
    AND streaming.changetrack_ver = streaming_latest_version.changetrack_ver_max ),
  # 差分テーブルにある変更前のバージョン(最新の一つ前のバージョン)
  streaming_before_latest_version AS (
  SELECT
    primary_key,
    MAX(changetrack_ver) AS changetrack_ver_before_latest
  FROM (
    SELECT
      streaming.primary_key,
      streaming.changetrack_ver,
      streaming_latest_version.changetrack_ver_max
    FROM
      streaming
    LEFT OUTER JOIN
      streaming_latest_version
    ON
      streaming.primary_key = streaming_latest_version.primary_key
      AND streaming.changetrack_ver = streaming_latest_version.changetrack_ver_max)
  WHERE
    changetrack_ver_max IS NULL
  GROUP BY
    primary_key ),
  # 差分テーブルにある変更前のデータ(最新の一つ前のデータ)
  streaming_before_latest AS (
  SELECT
    changetrack_type,
    changetrack_ver,
    streaming.primary_key,
    <columns>
  FROM
    streaming
  INNER JOIN
    streaming_before_latest_version
  ON
    streaming.primary_key = streaming_before_latest_version.changetrack_ver_before_latest
    AND streaming.changetrack_ver = streaming_before_latest_version.changetrack_ver_before_latest ),
  # 差分テーブルにない変更前のデータ
  daily_before_latest AS (
  SELECT
    CAST(NULL AS string) AS changetrack_type,
    CAST(NULL AS int64) AS changetrack_ver,
    primary_key,
    <columns>
  FROM (
    SELECT
     <primary_key> AS primary_key,
     <columns>
    FROM
       <前日の全量日付サフィックステーブル>
    WHERE
      _TABLE_SUFFIX IN (SUBSTR(FORMAT_TIMESTAMP("%Y%m%d", TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 day), "Asia/Tokyo"), 3)))
  WHERE
    primary_key NOT IN (
    SELECT
      primary_key
    FROM
      streaming_before_latest_version)
    AND primary_key IN (
    SELECT
      primary_key
    FROM
      streaming_latest_version) )
SELECT
  *
FROM
  streaming_latest
UNION ALL
SELECT
  *
FROM
  streaming_before_latest
UNION ALL
SELECT
  *
FROM
  daily_before_latest