DLT の from_json
を使用してスキーマを推論および進化させる
プレビュー
この機能はパブリック プレビュー段階です。
この記事では、DLT の from_json
SQL 関数を使用して JSON BLOB のスキーマを推論し、進化させる方法について説明します。
概要
from_json
SQL 関数は、JSON 文字列列を解析し、構造体値を返します。DLT の外部で使用する場合は、 schema
引数を使用して戻り値のスキーマを明示的に指定する必要があります。DLT と併用すると、スキーマの推論と進化を有効にして、戻り値のスキーマを自動的に管理できます。この機能により、初期設定 (特にスキーマが不明な場合) と、スキーマが頻繁に変更される場合の継続的な操作の両方が簡略化されます。JSONAuto Loaderこれにより、 、Kafka 、 などのストリーミング データソースからの任意の BLOBKinesis をシームレスに処理できます。
具体的には、DLT で使用すると、 from_json
SQL 関数のスキーマ推論と進化により、次のことが可能になります。
- 受信 JSON レコード (ネストされた JSON オブジェクトを含む) の新しいフィールドを検出する
- フィールド型を推測し、適切な Spark データ型にマップします
- スキーマを自動的に進化させて新しいフィールドに対応
- 現在のスキーマに準拠していないデータを自動的に処理します
構文: スキーマを自動的に推論して進化させる
DLT で from_json
を使用すると、スキーマを自動的に推論して進化させることができます。これを有効にするには、スキーマを NULL に設定し、 schemaLocationKey
オプションを指定します。これにより、スキーマを推測して追跡できます。
- SQL
- Python
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
クエリには複数の from_json
式を含めることができますが、各式には一意の schemaLocationKey
が必要です。また、 schemaLocationKey
はパイプラインごとに一意である必要があります。
- SQL
- Python
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
構文: 固定スキーマ
代わりに特定のスキーマを適用する場合は、次の from_json
構文を使用して、そのスキーマを使用して JSON 文字列を解析できます。
from_json(jsonStr, schema, [, options])
この構文は、DLT を含む任意の Databricks 環境で使用できます。詳細については、 こちらをご覧ください。
スキーマ推論
from_json
JSON データ列の最初のバッチからスキーマを推論し、その schemaLocationKey
(必須) によって内部的にインデックスを作成します。
JSON 文字列が 1 つのオブジェクト ( {"id": 123, "name": "John"}
など) の場合、 from_json
は STRUCT 型のスキーマを推論し、フィールドのリストに rescuedDataColumn
を追加します。
STRUCT<id LONG, name STRING, _rescued_data STRING>
ただし、JSON 文字列に最上位の配列( ["id": 123, "name": "John"]
など)がある場合、 from_json
は ARRAY を STRUCT でラップします。このアプローチにより、推論されたスキーマと互換性のないデータをレスキューできます。配列の値をダウンストリームの別々の行に 分解 するオプションがあります。
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
スキーマヒントを使用してスキーマ推論をオーバーライドする
必要に応じて、列のタイプを推論する方法に影響を与える schemaHints
を提供できます from_json
。これは、列が特定のデータ型であることがわかっている場合や、より一般的なデータ型 (たとえば、整数ではなく double など) を選択する場合に便利です。SQL スキーマ仕様構文を使用して、列データ型に任意の数のヒントを指定できます。スキーマ ヒントのセマンティクスは、 Auto Loader スキーマ ヒントのセマンティクスと同じです。 例えば:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
JSON 文字列に最上位の ARRAY が含まれている場合、その文字列は STRUCT にラップされます。このような場合、スキーマ ヒントは、ラップされた STRUCT ではなく ARRAY スキーマに適用されます。たとえば、次のような最上位の配列を含む JSON 文字列について考えてみます。
[{"id": 123, "name": "John"}]
推論された ARRAY スキーマは、STRUCT にラップされます。
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
id
のデータ型を変更するには、スキーマ ヒントを文字列として指定しますelement.id
。DOUBLE 型の新しい列を追加するには、 element.new_col
DOUBLE を指定します。これらのヒントにより、最上位の JSON 配列のスキーマは次のようになります。
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
スキーマを進化させるには、 schemaEvolutionMode
from_json
は、データの処理中に新しい列の追加を検出します。from_json
は新しいフィールドを検出すると、新しい列をスキーマの末尾にマージすることで、推論されたスキーマを最新のスキーマに更新します。既存の列のデータ型は変更されません。スキーマの更新後、パイプラインは更新されたスキーマで自動的に再起動します。
from_json
は、オプションの schemaEvolutionMode
設定を使用して設定するスキーマ進化の次のモードをサポートします。 これらのモードは Auto Loaderと一致しています。
| 新しい列の読み取り時の動作 |
---|---|
| ストリームが失敗します。新しい列がスキーマに追加されます。既存の列ではデータ型が進化しません。 |
| スキーマは進化せず、スキーマの変更によってストリームが失敗することはありません。すべての新しい列は、 レスキューされたデータ列に記録されます。 |
| ストリームが失敗します。ストリームは、 |
| スキーマは進化せず、新しい列は無視され、 |
例えば:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
救出されたデータ列
レスキューされたデータ列は、スキーマに _rescued_data
として自動的に追加されます。列の名前を変更するには、 rescuedDataColumn
オプションを設定します。例えば:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
救出されたデータ列を使用することを選択すると、推論されたスキーマと一致しない列は、ドロップされるのではなく救出されます。これは、データ型の不一致、スキーマ内の列の欠落、または列名の大文字と小文字の違いが原因で発生する可能性があります。
破損したレコードの処理
形式が正しくなく、解析できないレコードを格納するには、次の例のようにスキーマ ヒントを設定して _corrupt_record
列を追加します。
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
破損したレコード列の名前を変更するには、 columnNameOfCorruptRecord
オプションを設定します。
JSON パーサーは、破損したレコードを処理するために 3 つのモードをサポートしています。
Mode | 説明 |
---|---|
| 破損したレコードの場合、不正な形式の文字列を |
| 破損したレコードを無視します。
|
| パーサーが破損したレコードに遭遇したときに例外をスローします。
|
from_json出力のフィールドを参照する
from_json
パイプラインの実行中にスキーマを推論します。from_json
関数が少なくとも 1 回正常に実行される前にダウンストリーム クエリが from_json
フィールドを参照している場合、フィールドは解決されず、クエリはスキップされます。次の例では、シルバー テーブル クエリの分析は、ブロンズ クエリの from_json
関数が実行され、スキーマが推論されるまでスキップされます。
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
from_json
関数とそれが推論するフィールドが同じクエリで参照されている場合、次の例のように分析が失敗する可能性があります。
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
これを修正するには、 from_json
フィールドへの参照をダウンストリーム クエリ (上記の bronze/Silver の例など) に移動します。または、参照された from_json
フィールドを含む schemaHints
を指定することもできます。例えば:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
例: スキーマを自動的に推論して進化させる
このセクションでは、DLT で from_json
を使用してスキーマの自動推論と進化を有効にするためのコード例を示します。
クラウドオブジェクトストレージからのストリーミングテーブルの作成
次の例では、 read_files
構文を使用して、クラウドオブジェクトストレージからストリーミングテーブルを作成します。
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
@dlt.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Kafka からストリーミングテーブルを作成する
次の例では read_kafka
構文を使用して Kafkaからストリーミングテーブルを作成します。
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
@dlt.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
例: 固定スキーマ
固定スキーマで from_json
を使用するコードの例については、 from_json
関数を参照してください。
よくあるご質問(FAQ)
このセクションでは、 from_json
関数でのスキーマ推論と進化のサポートに関してよく寄せられる質問に回答します。
from_json
とparse_json
の違いは何ですか?
parse_json
関数は、JSON 文字列から VARIANT
値を返します。
VARIANT は、半構造化データを格納するための柔軟で効率的な方法を提供します。これにより、厳密な型を完全に廃止することで、スキーマの推論と進化が回避されます。ただし、書き込み時にスキーマを適用する場合 (たとえば、スキーマが比較的厳密であるため) は、 from_json
の方が適している可能性があります。
次の表では、 from_json
と parse_json
の違いについて説明します。
関数 | ユースケース | 可用性 |
---|---|---|
|
| スキーマ推論と進化はDLTでのみ利用可能 |
| VARIANT は、スキーマ化する必要のないデータを保持するのに特に適しています。例えば:
| DLTの有無にかかわらず利用可能 |
DLTの外部 from_json
スキーマ推論および進化構文を使用できますか?
いいえ、DLT の外部で from_json
スキーマ推論および進化構文を使用することはできません。
from_json
によって推論されたスキーマにアクセスするにはどうすればよいですか?
ターゲット ストリーミングテーブルのスキーマを表示します。
スキーマ from_json
渡しながら、エボリューションも実行できますか?
いいえ、スキーマ from_json
渡して進化を行うことはできません。ただし、スキーマ ヒントを指定して、 from_json
によって推論されるフィールドの一部またはすべてをオーバーライドできます。
テーブルが完全に更新された場合、スキーマはどうなりますか?
テーブルに関連付けられているスキーマの場所がクリアされ、スキーマが最初から再推論されます。