case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Databricks × dbt 運用Tips:失敗したモデルだけを効率的にリトライする方法

本記事は、 Databricks - Qiita Advent Calendar 2024 - Qiitaシリーズ 3 の 24 日目の記事です。

データ基盤を運用する際、依存関係を考慮しながら、失敗したモデルやその依存関係のあるモデルのみを再実行したい場合があります。本記事では、Databricksとdbtを活用したリトライ方法についてご紹介します。

Case 1:失敗したモデルと依存関係のある後続モデルのリトライ

dbtの機能である「dbt retry」を活用し、失敗したモデルとその依存関係にある後続処理を効率的に再実行する方法をご紹介します。
dbtの実行結果は「run_results.json」で確認可能です。「dbt retry」では、この「run_results.json」に記録されたログを基に、失敗したモデルのみを再実行することができます。
dbt Cloudでは、失敗したモデルのみを再実行する機能が提供されています。一方、OSS版のdbt Coreを利用している場合、ログが欠損しないようストレージに永続化する仕組みが必要です。しかし、Databricksが提供する標準的なdbt処理ではこの要件に対応できないため、専用のdbt用共通モジュールを作成し、Notebook内でdbtを実行する運用を採用しています。

docs.getdbt.com

具体的には、以下のコードを使用して運用を行っています。Notebook Jobで渡されたパラメータに基づき、dbt runまたはdbt retryを制御します。また、永続化先のストレージで障害が発生した場合に実行ログを失う可能性を考慮し、ログは標準出力にも記録しています。

# Databricks notebook source

# COMMAND ----------
import os
import time
from datetime import datetime
import pytz
import json

env = dbutils.widgets.get("env")
dbt_tag = dbutils.widgets.get("dbt_tag")
threads = dbutils.widgets.get("threads")
dbt_dbfs_state_dir = dbutils.widgets.get("dbt_dbfs_state_dir")
dbt_tmp_dir = f'/tmp-{time.time()}'
dbt_project_dir = f'{dbt_tmp_dir}/dbt/config/{env}'
dbt_profile_dir = f'{dbt_tmp_dir}/dbt/config'
dbt_state_dir = f'{dbt_project_dir}/target'

os.environ['env'] = env
os.environ['dbt_tag'] = dbt_tag
os.environ['threads'] = threads
os.environ['DBT_TEMP_DIR'] = dbt_tmp_dir
os.environ['DBT_PROFILES_DIR'] = dbt_profile_dir
os.environ['DBT_PROJECT_DIR'] = dbt_project_dir
os.environ['DBT_STATE_DIR'] = dbt_state_dir
os.environ['DBT_ENV_SECRET_TOKEN'] = dbutils.secrets.get(f'<secret>', f'<secret-token>')

# COMMAND ----------
# MAGIC %sh
# MAGIC set -eu
# MAGIC ls -ltr ../
# MAGIC mkdir -p ${DBT_STATE_DIR}
# MAGIC cp -r ../dbt ${DBT_TEMP_DIR}
    
# COMMAND ----------
# This params are used to run dbt command with the custome date

dbt_retry = dbutils.widgets.get('dbt_retry')
dbt_command = f"dbt run   --select {dbt_tag} --target={env} --threads {threads}"
if dbt_retry != 'false':
    dbutils.fs.cp(f'{dbt_dbfs_state_dir}/run_results.json', f'file:{dbt_state_dir}/run_results.json')
    dbt_command = f"dbt retry --target={env} --threads {threads}"
print(f'dbt command: {dbt_command}')
os.environ['DBT_COMMAND'] = dbt_command

# COMMAND ----------
# MAGIC %sh
# MAGIC set -eu
# MAGIC cd ${DBT_TEMP_DIR}
# MAGIC dbt deps
# MAGIC ${DBT_COMMAND}

# COMMAND ----------

file_path = f'{dbt_state_dir}/run_results.json'
try:
    with open(file_path, 'r') as file:
        data = json.load(file)
        print(json.dumps(data))
    if dbt_retry != 'false':
        # Remove crc file to avoid error when copying to dbfs
        dbutils.fs.rm( f'file:{dbt_state_dir}/.run_results.json.crc')
    dbutils.fs.cp(f'file:{file_path}', f'{dbt_dbfs_state_dir}/run_results.json')
    for result in data['results']:
        if result['status'] == 'error':
            raise ValueError("Error detected in results")
except Exception as e:
    raise e

Case 2:データ不整合が判明したモデルのリトライ

次に想定されるシナリオとして、モデルの実行結果に不整合が発生し、再度成功したモデルを実行したい場合があります。このようなユースケースでは、dbtのタグ機能を活用します。以下に、モデルmodel_aおよびmodel_bを実行するケースを示します。

dbt run --select tag:model_a,model_b

```

まとめ

dbtで失敗したモデルを再実行する際には、dbt retryを利用しています。ログの損失を防ぐために、ログを永続化する仕組みを整えた上で、Notebookを通じてdbtを実行しています。また、既に成功しているモデルをリトライしたい場合には、dbtのタグ機能を活用して対応しています。