本記事はZOZOテクノロジーズ #1 Advent Calendar 2020 - Qiita 24日目の記事です。バッチ方式の日次データ基盤とストリーミング方式のリアルタイムデータ基盤のスキーマ反映でData Catalogがどのように役立つのか概要も踏まえてご紹介できればと思います。後半ではPythonで実際に意図したことができるか検証してみました。
Data Catalogとは
Data CatalogとはGCPが提供するフルマネージドでスケーラビリティの高いデータ検出およびメタデータを管理するシステムです。カラムレベルのアクセス制御を行うポリシータグも Data Catalogで管理されています。自動収集されるテクニカルメタデータやビジネス用にスキーマ化されたメタデータを登録することができます。またDLPとの統合によりBigQuery内のデータで個人情報がある場合自動的にタグを付与することもできます。個人情報に該当するカラムは管理はしていると思いますが、管理から漏れてしまうこともあるかと思います。Data CatalogとDLPを使うことでPIIのタグを付与し監視の役割を担うこともできます。
cloud.google.com
テクニカルメタデータ
GCPで自動収集されるメタデータです。BigQueryやPub/Sub、GCSのデータを自動的に同期されます。Data Catalogのコンソール画面でテーブル名を検索するとBigQueryのテーブルのスキーマ情報等確認できます。テクニカルメタデータからはBigQueryのスキーマにポリシータグが付与されているかも確認することができるので便利です。
ビジネスメタデータ
ユーザが登録するメタデータです。ビジネスメタデータはオンプレ環境にあるDBのメタデータ収集などで使えます。ビジネスメタデータにはタグ(タグテンプレート)を付与することができ、スキーマがどのような役割なのか(個人情報やマスク済みカラムなど)識別することが可能です。私がData Catalogを使いたいモチベーションとしてはオンプレのSQL ServerにあるスキーマをData Catalogで一元管理したいからです。一元管理することで日次、リアルタイム双方のデータ基盤の依存関係を制御し、BigQueryのスキーマの自動追従を行いたいと考えています。
Cloud SQL からData Catalogにスキーマを同期するコネクタも用意されてます。
www.qwiklabs.com
データ基盤におけるスキーマ管理の課題
データ基盤におけるスキーマ管理の課題について、運用者と利用者の視点からご紹介できればと思います。
データ基盤運用者の課題
スキーマの追従が結構大変です。元々は追従しなくていいよう「SELECT * FROM TABLE」で行ってましたが、カラム追加などスキーマが変更されるとBigQueryのスキーマを事前に変更する必要があり、運用負荷が高かったためカラム指定してクエリを実行するようになりました。
次の記事でもカラム指定の経緯など書いてあります。
techblog.zozo.com
スキーマを反映する際は人の手でPRを作り反映しています。これまでは日次データ基盤のみでしたが、最近だとリアルタイムデータ基盤もリリースしました。リアルタイムデータ基盤の詳細は次の記事に書きました。
techblog.zozo.com
リアルタイムデータ基盤ではオンプレのSQL Serverの差分のみを取得してBigQueryへ連携しています。日次データ基盤のバッチ方式とは異なり、データが常時流れてくるストリーミング方式なので、データ基盤間の依存関係が多くデプロイタイミングを意識する必要があります。構成としてはfluentdで差分データを取得し、Dataflowを介してBigQueryへ差分データを書き込んでいます。
詳細は記事後半に書きましたが、カラム追加の場合はBigQueryから反映し、Dataflow、fluentdの順番で反映する必要があります。カラム削除やスキーマ変更があった場合Dataflowを停止する必要があります。
また基盤利用者の要望としては最新の全量データを使いたいといった要望があるため、リアルタイムデータ基盤の差分データと日次バッチで転送したデータをUNIONしたビューを用意しています。ビューでUNIONを行う際、リアルタイムデータ基盤にはデプロイ済みのカラムも、日次データ基盤には反映されていないカラムが多々あります。その場合、まず日次データ基盤のスキーマを反映させる必要が出てきます。
特に煩雑な連携をしてるテーブルはカラム追加のハードルが高く追加する場合大変です。なので全量データを参照してるビューを日次データ基盤で反映済みのカラムに絞るなどの対応もしています。カラム指定を行うことで「SELECT * FROM TABLE」より運用は楽になった一方で、データ基盤間のスキーマが一致しなかったり、手動反映の運用負荷は残りました。
Data Catalogが解決する課題
Data Catalogを使うことでオンプレにあるSQL Serverのスキーマを同期し、Data Catalogでデータ基盤のスキーマを一元管理しようとしてます。一元管理できることで、ワークフローでデータ基盤間で依存関係のあるデプロイを制御し、スキーマの自動追従ができるのではないかと考えてます。Data Catalogのビジネスメタデータを使うことでオンプレ環境のスキーマ追従に加え、どのカラムが個人情報、もしくはマスクしたカラムなのかも識別できるようにしたいと考えてます。個人情報のカラムにはカラムレベルのアクセス制御を行うポリシータグを付与しています。
cloud.google.com
ワークフローでやりたいこと
Data Catalogを使った日次データ基盤とリアルタイムデータ基盤で実現したいことをご紹介できればと思います。リアルタイムデータ基盤ならではのスキーマ反映の手順や課題も合わせてご紹介できたらと思います。
日次データ基盤
日次データ基盤は日に1回オンプレからBigQueryへ連携する転送バッチです。日次のデータ基盤については次のようなワークフローを考えてます。データ転送前にSQL Serverのスキーマ情報をData Catalogへ同期し、最新のスキーマを使ってカラム指定したクエリを生成します。
BigQueryへの転送速度を上げるため、gsutilでGCSにコピーしたのち、GCSを経由してBigQueryへロードします。カラムレベルのアクセス制御を担っているポリシータグはテーブルを上書いてしまうと外れてしまいます。例外はコピージョブで、テーブルコピー ジョブはデータ変換を適用しないためポリシータグはそのまま残ります。そのため、1度ユーザが参照できないBigQueryのデータセット内にロードして、ポリシータグを付与したのちユーザが参照できるデータセット内にコピーします。この方法によりBigQueryでスキーマの自動追従を行います。
リアルタイムデータ基盤
リアルタイムデータ基盤は常にオンプレ環境のSQL Serverから差分データを取得しBigQueryへ連携するストリーミング処理です。リアルタイムデータ基盤の方は日次データ基盤のような完全自動化は難しく、カラムの削除やスキーマ変更があった場合人の手で対応が必要になります。カラムの自動追従は日次データ基盤同様実現できると考えています。スキーマの削除、変更、追加それぞれのケースを想定し必要な対応をご紹介できればと思います。
カラム削除の反映
スキーマの変更や削除が行われる場合人の手で対応が必要になります。まず、削除の場合、差分データをSQL Serverから取得しているfluentdから対象カラムを取り除きます。取り除かれたカラムはDataflow、BigQueryにはnull値でインサートされます。次にDataflowをドレインで停止させ処理中のデータはBigQueryへ転送した後停止させます。これはDataflowとBigQueryでスキーマが異なるとインサートに失敗してしまうからです。
差分データはPub/Subで保持されジョブを更新したタイミングで再度連携されます。Dataflowはテンプレートから対象スキーマを削除し、BigQueryは既存のテーブルから対象のカラムを取り除いた状態で上書きします。Dataflowはテーブル名を参照して出力先のテーブルを振り分けるDynamic Destinations機能を採用しており、次のようにスキーマを定義してます。このスキーマがBigQueryと異なるとインサートに失敗します。
case "TABLE": schema.setFields( ImmutableList.of( new TableFieldSchema().setName("massage_unique_id").setType("STRING").setMode("NULLABLE"), new TableFieldSchema().setName("database_name").setType("STRING").setMode("NULLABLE"), new TableFieldSchema().setName("table_name").setType("STRING").setMode("NULLABLE"), new TableFieldSchema().setName("id").setType("STRING").setMode("NULLABLE"), new TableFieldSchema().setName("name").setType("STRING").setMode("NULLABLE"), )); break;
BigQueryは次のようにして対象カラムを取り除きテーブルを上書きします。
SELECT * EXCEPT(colunns) FROM DATASET.TABLE
BigQueryのテーブルを上書きするとカラムレベルのアクセス制御を担っているポリシータグが外れてしまうので、ユーザが参照できないデータセット内に上書き、ポリシータグを付与したあとコピージョブでユーザが参照するデータセット内に保存します
スキーマ変更の反映
基本的にはカラム削除同様の対応でDataflowを止めて作業します。削除との違いはSQL Serverのスキーマ変更前に事前に対象カラムを削除、もしくは文字列変換します。SQL Serverのスキーマ変更後はもう一度Dataflowを止めて、Dataflow、BigQueryの対象スキーマを修正する必要がある点です。対象のカラムが使われてないようであればDROPしてしまうのがデータ基盤運用者、利用者にとって良いと思います。その場合Dataflowの停止は一度だけでよくなります。
次のようなクエリで対象カラムを文字列変換します。
SELECT id, name, cast(birthday as string) as birthday FROM TABLE
カラム追加の反映
カラム追加については日次データ連携のタイミングで定義しても問題ないように思ってます。一方、スキーマ変更やカラム削除がある場合は問題が出てきます。SQL Serverのスキーマ追加、変更、削除がある場合、事前に連絡がくる運用になってます。連絡がくるとリアルタイムデータ基盤の運用者は先ほどご紹介したように、事前にスキーマを文字列変換しスキーマが変わっても問題が起こらないようにします。
このスキーマはオンプレのSQL Serverのスキーマを変更したタイミングで、新しいスキーマにする予定です。しかし、日次で走るワークフローで基幹側変更前に書き換えてしまうとせっかく前もって文字列変換等準備したのに元の状態に戻ってしまいます。
そのため、ワークフローでスキーマ変更やカラム削除対象のカラムはSQL Server反映まで自動追従を止めるような制御が必要なのではないかと思ってます。
Data Catalogの検証
やりたいことが実現出来そうかData Catalogを試してみました。
ライブラリとGCPのサービスアカウントを環境変数に出力します。
pip3 install google-cloud-datacatalog==1.0.0 export GOOGLE_APPLICATION_CREDENTIALS="/usr/src/app/gcp-credentials.json"
Classを定義します。
from google.api_core.exceptions import NotFound, PermissionDenied from google.cloud import datacatalog_v1 class DataCatalogTask(object): def __init__(self, project_id, location, entry_group_id, entry_id, tag_template_id): self.project_id = project_id self.location = location self.entry_group_id = entry_group_id self.entry_id = entry_id self.tag_template_id = tag_template_id self.datacatalog = datacatalog_v1.DataCatalogClient() def delete_entry(self): expected_entry_name = self.datacatalog.entry_path(self.project_id, self.location, self.entry_group_id, self.entry_id) try: self.datacatalog.delete_entry(name=expected_entry_name) except (NotFound, PermissionDenied): pass def delete_entry_group(self): expected_entry_group_name = self.datacatalog.entry_group_path(self.project_id, self.location, self.entry_group_id) try: self.datacatalog.delete_entry_group(name=expected_entry_group_name) except (NotFound, PermissionDenied): pass def delete_tag_template(self): expected_template_name = self.datacatalog.tag_template_path(self.project_id, self.location, self.tag_template_id) try: self.datacatalog.delete_tag_template(name=expected_template_name, force=True) except (NotFound, PermissionDenied): pass def delete_pre_existing_data(self): self.delete_entry() self.delete_entry_group() self.delete_tag_template() def create_entry_group(self): entry_group_obj = datacatalog_v1.types.EntryGroup() entry_group_obj.display_name = self.entry_group_id entry_group_obj.description = '{} database schemae'.format(self.entry_group_id) entry_group = self.datacatalog.create_entry_group( parent=self.datacatalog.location_path(self.project_id, self.location), entry_group_id=self.entry_group_id, entry_group=entry_group_obj) print('Created entry group: {}'.format(entry_group.name)) return entry_group def create_entry(self, entry_group): entry = datacatalog_v1.types.Entry() entry.user_specified_system = self.entry_group_id entry.user_specified_type = self.entry_group_id entry.display_name = self.entry_id entry.description = 'this table is managed by {} database'.format(self.entry_group_id) # entry.linked_resource = '//my-onprem-server.com/dataAssets/my-awesome-data-asset' columns = [] columns.append(datacatalog_v1.types.ColumnSchema( column='first_column', type='STRING', description='This columns consists of ....', mode=None)) columns.append(datacatalog_v1.types.ColumnSchema( column='second_column', type='DOUBLE', description='This columns consists of ....', mode=None)) entry.schema.columns.extend(columns) entry = self.datacatalog.create_entry( parent=entry_group.name, entry_id=self.entry_id, entry=entry) print('Created entry: {}'.format(entry.name)) return entry def create_tag_template(self, tag_template_name): # https://cloud.google.com/data-catalog/docs/quickstart-tagging#data-catalog-quickstart-python tag_template = datacatalog_v1.types.TagTemplate() tag_template.display_name = tag_template_fields_key tag_template.fields[tag_template_fields_key].display_name = tag_template_fields_key tag_template.fields[tag_template_fields_key].type.primitive_type = \ datacatalog_v1.enums.FieldType.PrimitiveType.STRING.value tag_template = self.datacatalog.create_tag_template( parent=self.datacatalog.location_path(project_id, location), tag_template_id=self.tag_template_id, tag_template=tag_template) print('Created template: {}'.format(tag_template.name)) return tag_template def attach_tag_to_entry_column(self, entry, tag_template, tag_template_fields_key, tag_template_fields_value, column): tag = datacatalog_v1.types.Tag() tag.template = tag_template.name tag.fields[tag_template_fields_key].string_value = tag_template_fields_value # A Tag can be attached to a specific table column, instead of the table itself, by setting tag.column = column_name. # https://medium.com/google-cloud/data-catalog-hands-on-guide-templates-tags-with-python-c45eb93372ef tag.column = column #tag = self.datacatalog.create_tag(parent=entry.name, tag=tag) tag = self.datacatalog.create_tag(parent=entry.name, tag=tag) print('Created tag: {}'.format(tag.name)) return tag def fetch_list_entries(self): # fetch schema information # https://googleapis.dev/python/datacatalog/latest/gapic/v1beta1/api.html#google.cloud.datacatalog_v1beta1.DataCatalogClient.list_entries entry_group = self.datacatalog.list_entries( parent = self.datacatalog.entry_group_path( self.project_id, self.location, self.entry_group_id)) for element in entry_group: print(element) pass def fetch_columns_tag_template(self): # policy tag columns # https://googleapis.dev/python/datacatalog/latest/gapic/v1beta1/api.html#google.cloud.datacatalog_v1beta1.DataCatalogClient.list_tags parent = self.datacatalog.entry_path( self.project_id, self.location, self.entry_group_id, self.entry_id) for element in self.datacatalog.list_tags(parent): print(element) pass
それでは実行してみます。オンプレのSQL Serverないのスキーマ同期が目的なので、エントリーグループにはDB名、エントリーにはテーブル名を入れます。スキーマに対してタグテンプレートを付与して個人情報か識別できるようにします。次の記事でも触れましたがカラムが個人情報なのか、もしくはマスク済みカラムなのか識別子データ基盤のスキマーを自動で追従できるようにします。
www.case-k.jp
Datacatalogに対してエントリーグループ、エントリー、タグテンプレートを作り、作成したエントリーとタグテンプレートを紐付けます。タグテンプレートはカラムに対して付与するようにしました。最後に作成したエントーリを取得し、スキーマ情報とカラムに対してカラムが付与されているか確認してみます。
# variable project_id = '<project id >' location = 'us-central1' entry_group_id = 'databasename' entry_id = 'tablename' tag_template_id = 'databasename' # datacatalog instance datacatalog_task = DataCatalogTask(project_id,location, entry_group_id, entry_id, tag_template_id) # delete datacatalog_task.delete_pre_existing_data() # create entry_group = datacatalog_task.create_entry_group() entry = datacatalog_task.create_entry(entry_group) tag_template_fields_key='policy_tag' tag_template = datacatalog_task.create_tag_template(tag_template_fields_key) # attach column='first_column' tag_template_fields_value='name' datacatalog_task.attach_tag_to_entry_column(entry, tag_template, tag_template_fields_key, tag_template_fields_value, column) # fetch datacatalog_task.fetch_list_entries() datacatalog_task.fetch_columns_tag_template()
実行結果
python datacatalog_task.py Created entry group: projects/<project-id>/locations/us-central1/entryGroups/databasename Created entry: projects/<project-id>/locations/us-central1/entryGroups/databasename/entries/tablename Created template: projects/<project-id>/locations/us-central1/tagTemplates/databasename Created tag: projects/<project-id>/locations/us-central1/entryGroups/databasename/entries/tablename/tags/CVVD0hrSAP05 name: "projects/<project-id>/locations/us-central1/entryGroups/databasename/entries/tablename" display_name: "tablename" description: "this table is managed by databasename database" schema { columns { type: "STRING" description: "This columns consists of ...." mode: "NULLABLE" column: "first_column" } columns { type: "DOUBLE" description: "This columns consists of ...." mode: "NULLABLE" column: "second_column" } } user_specified_type: "databasename" user_specified_system: "databasename" name: "projects/<project-id>/locations/us-central1/entryGroups/databasename/entries/tablename/tags/CVVD0hrSAP05" template: "projects/<project id>/locations/us-central1/tagTemplates/databasename" fields { key: "policy_tag" value { display_name: "policy_tag" string_value: "name" } } column: "first_column" template_display_name: "policy_tag"
コンソールからも見てみます。
一通り思ってた通りのことはできそうで安心しました。オンプレ環境のSQL ServerのスキーマをData Catalogに同期することや、カラムに対してタグを付与することもできるので個人情報周りでやりたいことも実現できそうです。ただ、タグテンプレートとポリシータグの紐付けはできなかったので作るときに意識してやる必要がありそうです。
まとめ
Data Catalogでスキーマ情報を一元管理できることで、データ連携の転送ファイルの自動化や日次、リアルタイムどちらもカラムの自動追従は実現できそうに思いました。ポリシータグの情報もGCP側が自動収集してるテクニカルメタデータには反映されてるので、ビジネスメタデータ側とも紐付けられたらもっといいなあと思いました。リアルタイムデータ基盤に関してはテーブル数に応じてDataflowが増えないようDataflowのDytnamic Destinations機能を採用してます。コストメリットがある一方で、スキーマ変更や削除があった場合、全テーブルの連携を止める必要があるので常にデータが流れて来る状態で要件によっては課題になってきそうに思いました。