case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Data CatalogにオンプレDBのスキーマを同期させて、バッチ・ストリーミングデータ基盤(BigQuery)のスキーマ反映を自動化する

本記事はZOZOテクノロジーズ #1 Advent Calendar 2020 - Qiita 24日目の記事です。バッチ方式の日次データ基盤とストリーミング方式のリアルタイムデータ基盤のスキーマ反映でData Catalogがどのように役立つのか概要も踏まえてご紹介できればと思います。後半ではPythonで実際に意図したことができるか検証してみました。

Data Catalogとは

Data CatalogとはGCPが提供するフルマネージドでスケーラビリティの高いデータ検出およびメタデータを管理するシステムです。カラムレベルのアクセス制御を行うポリシータグも Data Catalogで管理されています。自動収集されるテクニカルメタデータやビジネス用にスキーマ化されたメタデータを登録することができます。またDLPとの統合によりBigQuery内のデータで個人情報がある場合自動的にタグを付与することもできます。個人情報に該当するカラムは管理はしていると思いますが、管理から漏れてしまうこともあるかと思います。Data CatalogとDLPを使うことでPIIのタグを付与し監視の役割を担うこともできます。
cloud.google.com

towardsdatascience.com

テクニカルメタデータ

GCPで自動収集されるメタデータです。BigQueryやPub/Sub、GCSのデータを自動的に同期されます。Data Catalogのコンソール画面でテーブル名を検索するとBigQueryのテーブルのスキーマ情報等確認できます。テクニカルメタデータからはBigQueryのスキーマにポリシータグが付与されているかも確認することができるので便利です。

ビジネスメタデータ

ユーザが登録するメタデータです。ビジネスメタデータはオンプレ環境にあるDBのメタデータ収集などで使えます。ビジネスメタデータにはタグ(タグテンプレート)を付与することができ、スキーマがどのような役割なのか(個人情報やマスク済みカラムなど)識別することが可能です。私がData Catalogを使いたいモチベーションとしてはオンプレのSQL ServerにあるスキーマをData Catalogで一元管理したいからです。一元管理することで日次、リアルタイム双方のデータ基盤の依存関係を制御し、BigQueryのスキーマの自動追従を行いたいと考えています。
f:id:casekblog:20201222120559p:plain

Cloud SQL からData Catalogにスキーマを同期するコネクタも用意されてます。
www.qwiklabs.com

料金体系

料金体系はビジネスメタデータの量とリクエスト数によって異なります。

1 MB まで無料です。超過分は 1 GB あたり月額 $100かかります。許容範囲ではあるものの思ったよりかかる印象です。

  • Catalog API Calls pricing

1 か月あたり 100 万回まで無料です。超過分は 10 万回あたり月額 $10かかるようです。
次のようにリアルタイムデータ基盤でDataflowから呼び出すことも想定してましたが、リクエスト数のことを考えると事前にスキーマ情報をテンプレートに反映しといた方が良さそうだと思いました。
f:id:casekblog:20201223173956p:plain

cloud.google.com

qiita.com

データ基盤におけるスキーマ管理の課題

データ基盤におけるスキーマ管理の課題について、運用者と利用者の視点からご紹介できればと思います。

データ基盤運用者の課題

スキーマの追従が結構大変です。元々は追従しなくていいよう「SELECT * FROM TABLE」で行ってましたが、カラム追加などスキーマが変更されるとBigQueryのスキーマを事前に変更する必要があり、運用負荷が高かったためカラム指定してクエリを実行するようになりました。
f:id:casekblog:20201222171955p:plain

次の記事でもカラム指定の経緯など書いてあります。
techblog.zozo.com

スキーマを反映する際は人の手でPRを作り反映しています。これまでは日次データ基盤のみでしたが、最近だとリアルタイムデータ基盤もリリースしました。リアルタイムデータ基盤の詳細は次の記事に書きました。
techblog.zozo.com

リアルタイムデータ基盤ではオンプレのSQL Serverの差分のみを取得してBigQueryへ連携しています。日次データ基盤のバッチ方式とは異なり、データが常時流れてくるストリーミング方式なので、データ基盤間の依存関係が多くデプロイタイミングを意識する必要があります。構成としてはfluentdで差分データを取得し、Dataflowを介してBigQueryへ差分データを書き込んでいます。

詳細は記事後半に書きましたが、カラム追加の場合はBigQueryから反映し、Dataflow、fluentdの順番で反映する必要があります。カラム削除やスキーマ変更があった場合Dataflowを停止する必要があります。

また基盤利用者の要望としては最新の全量データを使いたいといった要望があるため、リアルタイムデータ基盤の差分データと日次バッチで転送したデータをUNIONしたビューを用意しています。ビューでUNIONを行う際、リアルタイムデータ基盤にはデプロイ済みのカラムも、日次データ基盤には反映されていないカラムが多々あります。その場合、まず日次データ基盤のスキーマを反映させる必要が出てきます。

特に煩雑な連携をしてるテーブルはカラム追加のハードルが高く追加する場合大変です。なので全量データを参照してるビューを日次データ基盤で反映済みのカラムに絞るなどの対応もしています。カラム指定を行うことで「SELECT * FROM TABLE」より運用は楽になった一方で、データ基盤間のスキーマが一致しなかったり、手動反映の運用負荷は残りました。

データ基盤利用者の課題

スキーマの反映を人の手で行うことで最新のスキーマをBigQueryで扱うことができません。サービスや分析で必要なカラムは運用チームに依頼し用意するところから始める必要があります。新機能のリリースや分析結果の反映などユーザ提供までのリードタイムが長くなってしまいます。

Data Catalogが解決する課題

Data Catalogを使うことでオンプレにあるSQL Serverスキーマを同期し、Data Catalogでデータ基盤のスキーマを一元管理しようとしてます。一元管理できることで、ワークフローでデータ基盤間で依存関係のあるデプロイを制御し、スキーマの自動追従ができるのではないかと考えてます。Data Catalogのビジネスメタデータを使うことでオンプレ環境のスキーマ追従に加え、どのカラムが個人情報、もしくはマスクしたカラムなのかも識別できるようにしたいと考えてます。個人情報のカラムにはカラムレベルのアクセス制御を行うポリシータグを付与しています。
cloud.google.com

ワークフローでやりたいこと

Data Catalogを使った日次データ基盤とリアルタイムデータ基盤で実現したいことをご紹介できればと思います。リアルタイムデータ基盤ならではのスキーマ反映の手順や課題も合わせてご紹介できたらと思います。

日次データ基盤

日次データ基盤は日に1回オンプレからBigQueryへ連携する転送バッチです。日次のデータ基盤については次のようなワークフローを考えてます。データ転送前にSQL Serverスキーマ情報をData Catalogへ同期し、最新のスキーマを使ってカラム指定したクエリを生成します。

BigQueryへの転送速度を上げるため、gsutilでGCSにコピーしたのち、GCSを経由してBigQueryへロードします。カラムレベルのアクセス制御を担っているポリシータグはテーブルを上書いてしまうと外れてしまいます。例外はコピージョブで、テーブルコピー ジョブはデータ変換を適用しないためポリシータグはそのまま残ります。そのため、1度ユーザが参照できないBigQueryのデータセット内にロードして、ポリシータグを付与したのちユーザが参照できるデータセット内にコピーします。この方法によりBigQueryでスキーマの自動追従を行います。
f:id:casekblog:20201222115813p:plain

cloud.google.com

リアルタイムデータ基盤

リアルタイムデータ基盤は常にオンプレ環境のSQL Serverから差分データを取得しBigQueryへ連携するストリーミング処理です。リアルタイムデータ基盤の方は日次データ基盤のような完全自動化は難しく、カラムの削除やスキーマ変更があった場合人の手で対応が必要になります。カラムの自動追従は日次データ基盤同様実現できると考えています。スキーマの削除、変更、追加それぞれのケースを想定し必要な対応をご紹介できればと思います。

カラム削除の反映

スキーマの変更や削除が行われる場合人の手で対応が必要になります。まず、削除の場合、差分データをSQL Serverから取得しているfluentdから対象カラムを取り除きます。取り除かれたカラムはDataflow、BigQueryにはnull値でインサートされます。次にDataflowをドレインで停止させ処理中のデータはBigQueryへ転送した後停止させます。これはDataflowとBigQueryでスキーマが異なるとインサートに失敗してしまうからです。
f:id:casekblog:20201223172649p:plain

差分データはPub/Subで保持されジョブを更新したタイミングで再度連携されます。Dataflowはテンプレートから対象スキーマを削除し、BigQueryは既存のテーブルから対象のカラムを取り除いた状態で上書きします。Dataflowはテーブル名を参照して出力先のテーブルを振り分けるDynamic Destinations機能を採用しており、次のようにスキーマを定義してます。このスキーマがBigQueryと異なるとインサートに失敗します。

case "TABLE":
    schema.setFields(
    ImmutableList.of(
    new TableFieldSchema().setName("massage_unique_id").setType("STRING").setMode("NULLABLE"),
    new TableFieldSchema().setName("database_name").setType("STRING").setMode("NULLABLE"),
    new TableFieldSchema().setName("table_name").setType("STRING").setMode("NULLABLE"),
    new TableFieldSchema().setName("id").setType("STRING").setMode("NULLABLE"),
    new TableFieldSchema().setName("name").setType("STRING").setMode("NULLABLE"),
    ));
    break;

www.case-k.jp

BigQueryは次のようにして対象カラムを取り除きテーブルを上書きします。

SELECT
  *
   EXCEPT(colunns) 
FROM
  DATASET.TABLE

BigQueryのテーブルを上書きするとカラムレベルのアクセス制御を担っているポリシータグが外れてしまうので、ユーザが参照できないデータセット内に上書き、ポリシータグを付与したあとコピージョブでユーザが参照するデータセット内に保存します
f:id:casekblog:20201223175503p:plain

スキーマ変更の反映

基本的にはカラム削除同様の対応でDataflowを止めて作業します。削除との違いはSQL Serverスキーマ変更前に事前に対象カラムを削除、もしくは文字列変換します。SQL Serverスキーマ変更後はもう一度Dataflowを止めて、Dataflow、BigQueryの対象スキーマを修正する必要がある点です。対象のカラムが使われてないようであればDROPしてしまうのがデータ基盤運用者、利用者にとって良いと思います。その場合Dataflowの停止は一度だけでよくなります。

f:id:casekblog:20201223172759p:plain

次のようなクエリで対象カラムを文字列変換します。

SELECT
  id,
  name,
  cast(birthday as string)  as birthday
FROM
  TABLE
カラム追加の反映

カラム追加については日次データ連携のタイミングで定義しても問題ないように思ってます。一方、スキーマ変更やカラム削除がある場合は問題が出てきます。SQL Serverスキーマ追加、変更、削除がある場合、事前に連絡がくる運用になってます。連絡がくるとリアルタイムデータ基盤の運用者は先ほどご紹介したように、事前にスキーマを文字列変換しスキーマが変わっても問題が起こらないようにします。

このスキーマはオンプレのSQL Serverスキーマを変更したタイミングで、新しいスキーマにする予定です。しかし、日次で走るワークフローで基幹側変更前に書き換えてしまうとせっかく前もって文字列変換等準備したのに元の状態に戻ってしまいます。

そのため、ワークフローでスキーマ変更やカラム削除対象のカラムはSQL Server反映まで自動追従を止めるような制御が必要なのではないかと思ってます。

f:id:casekblog:20201223172917p:plain

Data Catalogの検証

やりたいことが実現出来そうかData Catalogを試してみました。

ライブラリとGCPのサービスアカウントを環境変数に出力します。

pip3 install google-cloud-datacatalog==1.0.0
export GOOGLE_APPLICATION_CREDENTIALS="/usr/src/app/gcp-credentials.json"

Classを定義します。

from google.api_core.exceptions import NotFound, PermissionDenied
from google.cloud import datacatalog_v1


class DataCatalogTask(object):

    def __init__(self, project_id, location, entry_group_id, entry_id, tag_template_id):
        self.project_id = project_id
        self.location = location
        self.entry_group_id = entry_group_id
        self.entry_id = entry_id
        self.tag_template_id = tag_template_id
        self.datacatalog = datacatalog_v1.DataCatalogClient()

    def delete_entry(self):
        expected_entry_name = self.datacatalog.entry_path(self.project_id, self.location, self.entry_group_id, self.entry_id)
        try:
            self.datacatalog.delete_entry(name=expected_entry_name)
        except (NotFound, PermissionDenied):
            pass

    def delete_entry_group(self):
        expected_entry_group_name = self.datacatalog.entry_group_path(self.project_id, self.location, self.entry_group_id)
        try:
            self.datacatalog.delete_entry_group(name=expected_entry_group_name)
        except (NotFound, PermissionDenied):
            pass

    def delete_tag_template(self):
        expected_template_name = self.datacatalog.tag_template_path(self.project_id, self.location, self.tag_template_id)
        try:
            self.datacatalog.delete_tag_template(name=expected_template_name, force=True)
        except (NotFound, PermissionDenied):
            pass

    def delete_pre_existing_data(self):
        self.delete_entry()
        self.delete_entry_group()
        self.delete_tag_template()

    def create_entry_group(self):
        entry_group_obj = datacatalog_v1.types.EntryGroup()
        entry_group_obj.display_name = self.entry_group_id
        entry_group_obj.description = '{} database schemae'.format(self.entry_group_id)
        entry_group = self.datacatalog.create_entry_group(
            parent=self.datacatalog.location_path(self.project_id, self.location),
            entry_group_id=self.entry_group_id,
            entry_group=entry_group_obj)
        print('Created entry group: {}'.format(entry_group.name))
        return entry_group

    def create_entry(self, entry_group):
        entry = datacatalog_v1.types.Entry()
        entry.user_specified_system = self.entry_group_id
        entry.user_specified_type = self.entry_group_id
        entry.display_name = self.entry_id
        entry.description = 'this table is managed by {} database'.format(self.entry_group_id)
        # entry.linked_resource = '//my-onprem-server.com/dataAssets/my-awesome-data-asset'
        columns = []
        columns.append(datacatalog_v1.types.ColumnSchema(
            column='first_column',
            type='STRING',
            description='This columns consists of ....',
            mode=None))

        columns.append(datacatalog_v1.types.ColumnSchema(
            column='second_column',
            type='DOUBLE',
            description='This columns consists of ....',
            mode=None))

        entry.schema.columns.extend(columns)

        entry = self.datacatalog.create_entry(
            parent=entry_group.name,
            entry_id=self.entry_id,
            entry=entry)
        print('Created entry: {}'.format(entry.name))
        return entry

    def create_tag_template(self, tag_template_name):
        # https://cloud.google.com/data-catalog/docs/quickstart-tagging#data-catalog-quickstart-python
        tag_template = datacatalog_v1.types.TagTemplate()
        tag_template.display_name = tag_template_fields_key
        tag_template.fields[tag_template_fields_key].display_name = tag_template_fields_key
        tag_template.fields[tag_template_fields_key].type.primitive_type = \
            datacatalog_v1.enums.FieldType.PrimitiveType.STRING.value
        tag_template = self.datacatalog.create_tag_template(
            parent=self.datacatalog.location_path(project_id, location),
            tag_template_id=self.tag_template_id,
            tag_template=tag_template)
        print('Created template: {}'.format(tag_template.name))
        return tag_template

    def attach_tag_to_entry_column(self, entry, tag_template, tag_template_fields_key, tag_template_fields_value, column):
        tag = datacatalog_v1.types.Tag()
        tag.template = tag_template.name
        tag.fields[tag_template_fields_key].string_value = tag_template_fields_value
        # A Tag can be attached to a specific table column, instead of the table itself, by setting tag.column = column_name.
        # https://medium.com/google-cloud/data-catalog-hands-on-guide-templates-tags-with-python-c45eb93372ef
        tag.column = column
        #tag = self.datacatalog.create_tag(parent=entry.name, tag=tag)
        tag = self.datacatalog.create_tag(parent=entry.name, tag=tag)
        print('Created tag: {}'.format(tag.name))
        return tag

    def fetch_list_entries(self):
        # fetch schema information
        # https://googleapis.dev/python/datacatalog/latest/gapic/v1beta1/api.html#google.cloud.datacatalog_v1beta1.DataCatalogClient.list_entries
        entry_group = self.datacatalog.list_entries(
        parent = self.datacatalog.entry_group_path(
            self.project_id,
            self.location,
            self.entry_group_id))
        for element in entry_group:
            print(element)
        pass

    def fetch_columns_tag_template(self):
        # policy tag columns
        # https://googleapis.dev/python/datacatalog/latest/gapic/v1beta1/api.html#google.cloud.datacatalog_v1beta1.DataCatalogClient.list_tags
        parent = self.datacatalog.entry_path(
            self.project_id,
            self.location,
            self.entry_group_id,
            self.entry_id)
        for element in self.datacatalog.list_tags(parent):
            print(element)
        pass

それでは実行してみます。オンプレのSQL Serverないのスキーマ同期が目的なので、エントリーグループにはDB名、エントリーにはテーブル名を入れます。スキーマに対してタグテンプレートを付与して個人情報か識別できるようにします。次の記事でも触れましたがカラムが個人情報なのか、もしくはマスク済みカラムなのか識別子データ基盤のスキマーを自動で追従できるようにします。
www.case-k.jp

Datacatalogに対してエントリーグループ、エントリー、タグテンプレートを作り、作成したエントリーとタグテンプレートを紐付けます。タグテンプレートはカラムに対して付与するようにしました。最後に作成したエントーリを取得し、スキーマ情報とカラムに対してカラムが付与されているか確認してみます。

# variable
project_id = '<project id >'
location = 'us-central1'
entry_group_id = 'databasename'
entry_id = 'tablename'
tag_template_id = 'databasename'

# datacatalog instance
datacatalog_task = DataCatalogTask(project_id,location, entry_group_id, entry_id, tag_template_id)

# delete 
datacatalog_task.delete_pre_existing_data()

# create
entry_group = datacatalog_task.create_entry_group()
entry = datacatalog_task.create_entry(entry_group)
tag_template_fields_key='policy_tag'
tag_template = datacatalog_task.create_tag_template(tag_template_fields_key)

# attach
column='first_column'
tag_template_fields_value='name'
datacatalog_task.attach_tag_to_entry_column(entry, tag_template, tag_template_fields_key, tag_template_fields_value, column)

# fetch
datacatalog_task.fetch_list_entries()
datacatalog_task.fetch_columns_tag_template()

実行結果

python datacatalog_task.py

Created entry group: projects/<project-id>/locations/us-central1/entryGroups/databasename
Created entry: projects/<project-id>/locations/us-central1/entryGroups/databasename/entries/tablename
Created template: projects/<project-id>/locations/us-central1/tagTemplates/databasename
Created tag: projects/<project-id>/locations/us-central1/entryGroups/databasename/entries/tablename/tags/CVVD0hrSAP05
name: "projects/<project-id>/locations/us-central1/entryGroups/databasename/entries/tablename"
display_name: "tablename"
description: "this table is managed by databasename database"
schema {
  columns {
    type: "STRING"
    description: "This columns consists of ...."
    mode: "NULLABLE"
    column: "first_column"
  }
  columns {
    type: "DOUBLE"
    description: "This columns consists of ...."
    mode: "NULLABLE"
    column: "second_column"
  }
}
user_specified_type: "databasename"
user_specified_system: "databasename"

name: "projects/<project-id>/locations/us-central1/entryGroups/databasename/entries/tablename/tags/CVVD0hrSAP05"
template: "projects/<project id>/locations/us-central1/tagTemplates/databasename"
fields {
  key: "policy_tag"
  value {
    display_name: "policy_tag"
    string_value: "name"
  }
}
column: "first_column"
template_display_name: "policy_tag"

コンソールからも見てみます。
f:id:casekblog:20201226191723p:plain

一通り思ってた通りのことはできそうで安心しました。オンプレ環境のSQL ServerスキーマをData Catalogに同期することや、カラムに対してタグを付与することもできるので個人情報周りでやりたいことも実現できそうです。ただ、タグテンプレートとポリシータグの紐付けはできなかったので作るときに意識してやる必要がありそうです。

まとめ

Data Catalogでスキーマ情報を一元管理できることで、データ連携の転送ファイルの自動化や日次、リアルタイムどちらもカラムの自動追従は実現できそうに思いました。ポリシータグの情報もGCP側が自動収集してるテクニカルメタデータには反映されてるので、ビジネスメタデータ側とも紐付けられたらもっといいなあと思いました。リアルタイムデータ基盤に関してはテーブル数に応じてDataflowが増えないようDataflowのDytnamic Destinations機能を採用してます。コストメリットがある一方で、スキーマ変更や削除があった場合、全テーブルの連携を止める必要があるので常にデータが流れて来る状態で要件によっては課題になってきそうに思いました。