メインコンテンツまでスキップ

DLT の from_json を使用してスキーマを推論および進化させる

備考

プレビュー

この機能はパブリック プレビュー段階です。

この記事では、DLT の from_json SQL 関数を使用して JSON BLOB のスキーマを推論し、進化させる方法について説明します。

概要

from_json SQL 関数は、JSON 文字列列を解析し、構造体値を返します。DLT の外部で使用する場合は、 schema 引数を使用して戻り値のスキーマを明示的に指定する必要があります。DLT と併用すると、スキーマの推論と進化を有効にして、戻り値のスキーマを自動的に管理できます。この機能により、初期設定 (特にスキーマが不明な場合) と、スキーマが頻繁に変更される場合の継続的な操作の両方が簡略化されます。JSONAuto Loaderこれにより、 、Kafka 、 などのストリーミング データソースからの任意の BLOBKinesis をシームレスに処理できます。

具体的には、DLT で使用すると、 from_json SQL 関数のスキーマ推論と進化により、次のことが可能になります。

  • 受信 JSON レコード (ネストされた JSON オブジェクトを含む) の新しいフィールドを検出する
  • フィールド型を推測し、適切な Spark データ型にマップします
  • スキーマを自動的に進化させて新しいフィールドに対応
  • 現在のスキーマに準拠していないデータを自動的に処理します

構文: スキーマを自動的に推論して進化させる

DLT で from_json を使用すると、スキーマを自動的に推論して進化させることができます。これを有効にするには、スキーマを NULL に設定し、 schemaLocationKey オプションを指定します。これにより、スキーマを推測して追跡できます。

SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>[, otherOptions]))

クエリには複数の from_json 式を含めることができますが、各式には一意の schemaLocationKeyが必要です。また、 schemaLocationKey はパイプラインごとに一意である必要があります。

SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

構文: 固定スキーマ

代わりに特定のスキーマを適用する場合は、次の from_json 構文を使用して、そのスキーマを使用して JSON 文字列を解析できます。

from_json(jsonStr, schema, [, options])

この構文は、DLT を含む任意の Databricks 環境で使用できます。詳細については、 こちらをご覧ください

スキーマ推論

from_json JSON データ列の最初のバッチからスキーマを推論し、その schemaLocationKey (必須) によって内部的にインデックスを作成します。

JSON 文字列が 1 つのオブジェクト ( {"id": 123, "name": "John"}など) の場合、 from_json は STRUCT 型のスキーマを推論し、フィールドのリストに rescuedDataColumn を追加します。

STRUCT<id LONG, name STRING, _rescued_data STRING>

ただし、JSON 文字列に最上位の配列( ["id": 123, "name": "John"]など)がある場合、 from_json は ARRAY を STRUCT でラップします。このアプローチにより、推論されたスキーマと互換性のないデータをレスキューできます。配列の値をダウンストリームの別々の行に 分解 するオプションがあります。

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

スキーマヒントを使用してスキーマ推論をオーバーライドする

必要に応じて、列のタイプを推論する方法に影響を与える schemaHints を提供できます from_json 。これは、列が特定のデータ型であることがわかっている場合や、より一般的なデータ型 (たとえば、整数ではなく double など) を選択する場合に便利です。SQL スキーマ仕様構文を使用して、列データ型に任意の数のヒントを指定できます。スキーマ ヒントのセマンティクスは、 Auto Loader スキーマ ヒントのセマンティクスと同じです。 例えば:

SQL
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

JSON 文字列に最上位の ARRAY が含まれている場合、その文字列は STRUCT にラップされます。このような場合、スキーマ ヒントは、ラップされた STRUCT ではなく ARRAY スキーマに適用されます。たとえば、次のような最上位の配列を含む JSON 文字列について考えてみます。

[{"id": 123, "name": "John"}]

推論された ARRAY スキーマは、STRUCT にラップされます。

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

idのデータ型を変更するには、スキーマ ヒントを文字列として指定しますelement.id。DOUBLE 型の新しい列を追加するには、 element.new_col DOUBLE を指定します。これらのヒントにより、最上位の JSON 配列のスキーマは次のようになります。

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

スキーマを進化させるには、 schemaEvolutionMode

from_json は、データの処理中に新しい列の追加を検出します。from_json は新しいフィールドを検出すると、新しい列をスキーマの末尾にマージすることで、推論されたスキーマを最新のスキーマに更新します。既存の列のデータ型は変更されません。スキーマの更新後、パイプラインは更新されたスキーマで自動的に再起動します。

from_json は、オプションの schemaEvolutionMode 設定を使用して設定するスキーマ進化の次のモードをサポートします。 これらのモードは Auto Loaderと一致しています。

schemaEvolutionMode

新しい列の読み取り時の動作

addNewColumns (デフォルト)

ストリームが失敗します。新しい列がスキーマに追加されます。既存の列ではデータ型が進化しません。

rescue

スキーマは進化せず、スキーマの変更によってストリームが失敗することはありません。すべての新しい列は、 レスキューされたデータ列に記録されます

failOnNewColumns

ストリームが失敗します。ストリームは、 schemaHints が更新されるか、問題のあるデータが削除されない限り、再起動しません。

none

スキーマは進化せず、新しい列は無視され、rescuedDataColumnオプションが設定されていない限りデータはレスキューされません。スキーマの変更によってストリームが失敗することはありません。

例えば:

SQL
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

救出されたデータ列

レスキューされたデータ列は、スキーマに _rescued_dataとして自動的に追加されます。列の名前を変更するには、 rescuedDataColumn オプションを設定します。例えば:

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

救出されたデータ列を使用することを選択すると、推論されたスキーマと一致しない列は、ドロップされるのではなく救出されます。これは、データ型の不一致、スキーマ内の列の欠落、または列名の大文字と小文字の違いが原因で発生する可能性があります。

破損したレコードの処理

形式が正しくなく、解析できないレコードを格納するには、次の例のようにスキーマ ヒントを設定して _corrupt_record 列を追加します。

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

破損したレコード列の名前を変更するには、 columnNameOfCorruptRecord オプションを設定します。

JSON パーサーは、破損したレコードを処理するために 3 つのモードをサポートしています。

Mode

説明

PERMISSIVE

破損したレコードの場合、不正な形式の文字列を columnNameOfCorruptRecord で設定されたフィールドに挿入し、不正な形式のフィールドを nullに設定します。破損したレコードを保持するには、ユーザー定義スキーマで columnNameOfCorruptRecord という名前の文字列型フィールドを設定します。スキーマにフィールドがない場合、破損したレコードは解析中に削除されます。スキーマを推論するとき、パーサーは出力スキーマに columnNameOfCorruptRecord フィールドを暗黙的に追加します。

DROPMALFORMED

破損したレコードを無視します。

rescuedDataColumnDROPMALFORMED モードを使用する場合、データ型の不一致によってレコードが削除されることはありません。破損したレコード (不完全または不正な形式の JSON など) のみが削除されます。

FAILFAST

パーサーが破損したレコードに遭遇したときに例外をスローします。

FAILFAST モードを with rescuedDataColumnとともに使用する場合、データ型の不一致によってエラーはスローされません。破損したレコードのみが、不完全または不正な形式の JSON などのエラーをスローします。

from_json出力のフィールドを参照する

from_json パイプラインの実行中にスキーマを推論します。from_json 関数が少なくとも 1 回正常に実行される前にダウンストリーム クエリが from_json フィールドを参照している場合、フィールドは解決されず、クエリはスキップされます。次の例では、シルバー テーブル クエリの分析は、ブロンズ クエリの from_json 関数が実行され、スキーマが推論されるまでスキップされます。

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze

from_json 関数とそれが推論するフィールドが同じクエリで参照されている場合、次の例のように分析が失敗する可能性があります。

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0

これを修正するには、 from_json フィールドへの参照をダウンストリーム クエリ (上記の bronze/Silver の例など) に移動します。または、参照された from_json フィールドを含む schemaHints を指定することもできます。例えば:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0

例: スキーマを自動的に推論して進化させる

このセクションでは、DLT で from_json を使用してスキーマの自動推論と進化を有効にするためのコード例を示します。

クラウドオブジェクトストレージからのストリーミングテーブルの作成

次の例では、 read_files 構文を使用して、クラウドオブジェクトストレージからストリーミングテーブルを作成します。

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Kafka からストリーミングテーブルを作成する

次の例では read_kafka 構文を使用して Kafkaからストリーミングテーブルを作成します。

SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)

例: 固定スキーマ

固定スキーマで from_json を使用するコードの例については、 from_json 関数を参照してください。

よくあるご質問(FAQ)

このセクションでは、 from_json 関数でのスキーマ推論と進化のサポートに関してよく寄せられる質問に回答します。

from_jsonparse_jsonの違いは何ですか?

parse_json 関数は、JSON 文字列から VARIANT 値を返します。

VARIANT は、半構造化データを格納するための柔軟で効率的な方法を提供します。これにより、厳密な型を完全に廃止することで、スキーマの推論と進化が回避されます。ただし、書き込み時にスキーマを適用する場合 (たとえば、スキーマが比較的厳密であるため) は、 from_json の方が適している可能性があります。

次の表では、 from_jsonparse_jsonの違いについて説明します。

関数

ユースケース

可用性

from_json

from_jsonによるスキーマの進化は、スキーマを維持します。これは、次の場合に役立ちます。

  • データ スキーマを適用する場合 (たとえば、スキーマの変更をすべて保持する前に確認するなど)。
  • ストレージを最適化し、クエリの待機時間とコストを低く抑える必要があります。
  • 型が一致しないデータで失敗させたい。
  • 破損した JSON レコードから部分的な結果を抽出し、不正な形式のレコードを _corrupt_record 列に格納します。これに対し、VARIANT インジェストは無効な JSON に対してエラーを返します。

スキーマ推論と進化はDLTでのみ利用可能

parse_json

VARIANT は、スキーマ化する必要のないデータを保持するのに特に適しています。例えば:

  • データを半構造化にしておくのは、柔軟性があるためです。
  • スキーマの変更が速すぎるため、ストリームの頻繁な失敗や再起動なしにスキーマにキャストできます。
  • 型が一致しないデータで失敗することは望ましくありません。(VARIANT インジェストは、型の不一致がある場合でも、有効な JSON レコードに対して常に成功します。
  • ユーザーは、スキーマに準拠していないフィールドを含む救出されたデータ列を処理したくありません。

DLTの有無にかかわらず利用可能

DLTの外部 from_json スキーマ推論および進化構文を使用できますか?

いいえ、DLT の外部で from_json スキーマ推論および進化構文を使用することはできません。

from_jsonによって推論されたスキーマにアクセスするにはどうすればよいですか?

ターゲット ストリーミングテーブルのスキーマを表示します。

スキーマ from_json 渡しながら、エボリューションも実行できますか?

いいえ、スキーマ from_json 渡して進化を行うことはできません。ただし、スキーマ ヒントを指定して、 from_jsonによって推論されるフィールドの一部またはすべてをオーバーライドできます。

テーブルが完全に更新された場合、スキーマはどうなりますか?

テーブルに関連付けられているスキーマの場所がクリアされ、スキーマが最初から再推論されます。