バッチ処理またはストリーム処理によるデータのクリーニングと検証
データのクリーニングと検証は、レイクハウス内のデータ資産の品質を確保するために不可欠です。 この記事では、データ品質を促進するために設計された Databricks 製品の概要と、カスタムルールを実装するためのビジネスロジックを定義するための推奨事項について説明します。
Databricksにおけるスキーマ強制
Delta Lake には、書き込み時にスキーマと制約のチェックを適用するセマンティクスが用意されており、レイクハウス内のテーブルのデータ品質に関する保証が提供されます。
スキーマ強制は、テーブルに書き込まれるデータが定義済みのスキーマに準拠していることを保証します。 スキーマ検証ルールは操作によって異なります。 スキーマ強制を参照してください。
スキーマの進化に対応するために、 Delta には、スキーマの変更とテーブルの進化を行うためのメカニズムが用意されています。 スキーマ進化をいつ使用するかを慎重に検討して、フィールドのドロップやパイプラインの失敗を回避することが重要です。 スキーマの手動更新または自動更新の詳細については、Delta Lake テーブル スキーマの更新を参照してください。
テーブルの制約
制約は、情報プライマリ・キー制約と外部キー制約、または強制制約の形式をとることができます。 ADD CONSTRAINT 句を参照してください。
Databricks のテーブル制約は、適用されるか、情報として提供されます。
強制制約には、 NOT NULL
制約と CHECK
制約が含まれます。
情報制約には、プライマリ・キー制約と外部キー制約があります。
Databricks の制約を参照してください。
null 値または欠損値の処理
NOT NULL は Delta テーブルに適用できます。 既存のテーブルで有効にできるのは、列に既存のレコードが null でない場合のみで、null 値を持つ新しいレコードがテーブルに挿入されるのを防ぎます。
パターンの適用
正規表現 (regex) を使用して、データ フィールドで予期されるパターンを適用できます。 これは、特定の形式やパターンに従う必要があるテキストデータを処理する場合に特に便利です。
正規表現を使用してパターンを適用するには、SQL の REGEXP
関数または RLIKE
関数を使用できます。 これらの関数を使用すると、データフィールドを指定した正規表現パターンと照合できます。
SQL でパターンを適用するために正規表現で CHECK
制約を使用する方法の例を次に示します。
CREATE TABLE table_name (
column_name STRING CHECK (column_name REGEXP '^[A-Za-z0-9]+$')
);
値の強制
制約を使用して、テーブル内の列に値範囲を強制できます。 これにより、指定された範囲内の有効な値のみを挿入または更新できるようになります。
値範囲制約を適用するには、SQL で CHECK
制約を使用できます。 CHECK
制約を使用すると、テーブル内のすべてのローに対して true である必要がある条件を定義できます。
CHECK
制約を使用して列に値の範囲を適用する方法の例を次に示します。
CREATE TABLE table_name (
column_name INT CHECK (column_name >= 0 AND column_name <= 100)
);
DLT を使用して期待値を定義および構成します。
DLT では、マテリアライズドビューまたはストリーミングテーブルを宣言する際のエクスペクテーションを定義できます。違反について警告するエクスペクテーション、違反レコードの削除、または違反に基づくワークロードの失敗を設定することを選択できます。パイプラインのエクスペクテーションを使用してデータ品質を管理するを参照してください。
データのモニタリング
Databricks は、アカウント内のすべてのテーブルのデータの統計的プロパティと品質を監視できるデータ品質モニタリングサービスを提供しています。 Databricks レイクハウスモニタリングの概要を参照してください。
データ型のキャスト
テーブルにデータを挿入または更新する場合、Databricks は、情報を失うことなく安全に行える場合にデータ型をキャストします。
キャスティングの振る舞いについて詳しくは、次の記事を参照してください。
カスタムビジネスロジック
フィルタと WHERE
句を使用して、不良レコードを検疫し、ダウンストリーム テーブルに伝播しないようにするカスタム ロジックを定義できます。 CASE WHEN ... OTHERWISE
句を使用すると、条件付きロジックを定義して、予測可能な方法で期待に反するレコードにビジネス ロジックを適切に適用できます。
DECLARE current_time = now()
INSERT INTO silver_table
SELECT * FROM bronze_table
WHERE event_timestamp <= current_time AND quantity >= 0;
INSERT INTO quarantine_table
SELECT * FROM bronze_table
WHERE event_timestamp > current_time OR quantity < 0;
Databricks では、特に構造化ストリーミングを使用する場合は、フィルター処理されたデータを常に個別の書き込み操作として処理することをお勧めします。 .foreachBatch
を使用して複数のテーブルに書き込むと、結果に一貫性がなくなる可能性があります。
たとえば、 NULL
値をエンコードできないアップストリーム システムがあるため、プレースホルダー値 -1
を使用して欠落しているデータを表す場合があります。 Databricks のすべてのダウンストリーム クエリに対してカスタム ロジックを記述して -1
を含むレコードを無視するのではなく、case when ステートメントを使用して、これらのレコードを変換として動的に置き換えることができます。
INSERT INTO silver_table
SELECT
* EXCEPT weight,
CASE
WHEN weight = -1 THEN NULL
ELSE weight
END AS weight
FROM bronze_table;