Lakeflow宣言型パイプラインの from_json
を使用したスキーマの推定、進化
プレビュー
この機能はパブリック プレビュー段階です。
この記事では、Lakeflow宣言型パイプラインのfrom_json
SQL関数を使用して JSON blob のスキーマを推定し、 進化させる方法について説明します。
概要
from_json
SQL 関数は、JSON 文字列列を解析し、構造体値を返します。Lakeflow宣言型パイプラインの外部で使用する場合は、schema
引数を使用して戻り値のスキーマを明示的に指定する必要があります。Lakeflow宣言型パイプラインと共に使用すると、スキーマの推論と進化を有効にして、戻り値のスキーマを自動的に管理できます。この機能により、初期設定 (特にスキーマが不明な場合) と、スキーマが頻繁に変更される場合の継続的な操作の両方が簡略化されます。これにより、Auto Loader、Kafka、Kinesisなどのストリーミング データソースからの任意のJSON BLOB をシームレスに処理できます。
具体的には、 Lakeflow 宣言型パイプラインで使用すると、 from_json
SQL 関数のスキーマ推論と進化により、次のことが可能になります。
- 受信したJSONレコード(ネストされたJSONオブジェクトを含む)内の新しいフィールドを検出する
- フィールドタイプを推測し、適切な Spark データ型にマッピングします。
- 新しいフィールドに対応するためにスキーマを自動的に進化させる
- 現在のスキーマに準拠していないデータを自動的に処理する
構文: スキーマを自動的に推論して進化させる
from_json
をLakeflow 宣言型パイプラインと共に使用することで、スキーマを自動的に推論して進化させることができます。これを有効にするには、スキーマを NULL に設定し、 schemaLocationKey
オプションを指定します。これにより、スキーマを推測して追跡できます。
- SQL
- Python
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
クエリには複数のfrom_json
式を含めることができますが、各式には一意のschemaLocationKey
が必要です。schemaLocationKey
もパイプラインごとに一意である必要があります。
- SQL
- Python
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
構文: 固定スキーマ
代わりに特定のスキーマを適用する場合は、次のfrom_json
構文を使用して、そのスキーマを使用して JSON 文字列を解析できます。
from_json(jsonStr, schema, [, options])
この構文は、Lakeflow宣言型パイプラインを含む任意の Databricks 環境で使用できます。 詳細については、 こちらをご覧ください。
スキーマ推論
from_json
JSON データ列の最初のバッチからスキーマを推測し、 schemaLocationKey
(必須) で内部的にインデックスを付けます。
JSON 文字列が単一のオブジェクト (たとえば、 {"id": 123, "name": "John"}
) の場合、 from_json
STRUCT タイプのスキーマを推測し、フィールドのリストにrescuedDataColumn
追加します。
STRUCT<id LONG, name STRING, _rescued_data STRING>
ただし、JSON 文字列に最上位レベルの配列 ( ["id": 123, "name": "John"]
など) がある場合、 from_json
は ARRAY を STRUCT でラップします。このアプローチにより、推論されたスキーマと互換性のないデータを救済できます。配列の値を下流の個別の行に分割するオプションがあります。
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
スキーマヒントを使用してスキーマ推論をオーバーライドする
オプションでschemaHints
指定して、 from_json
列の型を推測する方法に影響を与えることができます。これは、列が特定のデータ型であることがわかっている場合、またはより一般的なデータ型 (たとえば、整数ではなく倍精度) を選択する場合に役立ちます。SQL スキーマ仕様構文を使用して、列のデータ型に対して任意の数のヒントを提供できます。スキーマ ヒントのセマンティクスはAuto Loaderスキーマ ヒントの場合と同じです。 例えば:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
JSON 文字列に最上位レベルの ARRAY が含まれている場合、それは STRUCT にラップされます。このような場合、スキーマヒントは、ラップされた STRUCT ではなく ARRAY スキーマに適用されます。たとえば、次のような最上位配列を持つ JSON 文字列を考えます。
[{"id": 123, "name": "John"}]
推論された ARRAY スキーマは STRUCT にラップされます。
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
id
のデータ型を変更するには、スキーマ ヒントをelement.id
文字列として指定します。 DOUBLE 型の新しい列を追加するには、 element.new_col
DOUBLE を指定します。これらのヒントにより、最上位レベルの JSON 配列のスキーマは次のようになります。
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
スキーマを進化させる schemaEvolutionMode
from_json
データを処理する際に新しい列の追加を検出します。from_json
新しいフィールドを検出すると、新しい列をスキーマの末尾にマージして、推論されたスキーマを最新のスキーマに更新します。既存の列のデータ型は変更されません。スキーマの更新後、パイプラインは更新されたスキーマで自動的に再起動します。
from_json
オプションのschemaEvolutionMode
設定を使用して設定する、スキーマ進化の次のモードをサポートします。これらのモードはAuto Loaderと一致しています。
| 新しいコラムを読む際の行動 |
---|---|
| ストリームが失敗します。新しい列がスキーマに追加されます。既存の列ではデータ型が進化しません。 |
| スキーマは進化せず、スキーマの変更によってストリームが失敗することはありません。すべての新しい列は、 レスキューデータ列に記録されます。 |
| ストリームが失敗しました。 |
| スキーマは進化せず、新しい列は無視され、 |
例えば:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
救出されたデータ列
救出されたデータ列は、 _rescued_data
としてスキーマに自動的に追加されます。rescuedDataColumn
オプションを設定することで列の名前を変更できます。例えば:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
復元されたデータ列を使用することを選択した場合、推論されたスキーマと一致しない列は削除されるのではなく復元されます。これは、データ型の不一致、スキーマ内の列の欠落、または列名の大文字と小文字の違いが原因で発生する可能性があります。
破損したレコードを処理する
形式が不正で解析できないレコードを保存するには、次の例のように、スキーマヒントを設定して_corrupt_record
列を追加します。
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
破損したレコード列の名前を変更するには、 columnNameOfCorruptRecord
オプションを設定します。
JSON パーサーは、破損したレコードを処理するための 3 つのモードをサポートしています。
モード | 説明 |
---|---|
| 破損したレコードの場合、不正な形式の文字列を |
| 破損したレコードを無視します。
|
| パーサーが破損したレコードに遭遇すると例外をスローします。
|
from_json出力のフィールドを参照する
from_json
パイプライン実行中にスキーマを推論します。from_json
関数が少なくとも 1 回正常に実行される前に、ダウンストリーム クエリがfrom_json
フィールドを参照する場合、フィールドは解決されず、クエリはスキップされます。次の例では、ブロンズ クエリのfrom_json
関数が実行され、スキーマが推論されるまで、シルバー テーブル クエリの分析はスキップされます。
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
from_json
関数とそれが推論するフィールドが同じクエリで参照されている場合、次の例のように分析が失敗する可能性があります。
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
この問題は、 from_json
フィールドへの参照をダウンストリーム クエリ (上記のブロンズ/シルバーの例のように) に移動することで修正できます。あるいは、参照されるfrom_json
フィールドを含むschemaHints
を指定することもできます。例えば:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
例: スキーマを自動的に推論して進化させる
このセクションでは、Lakeflow宣言型パイプラインで from_json
を使用してスキーマの自動推論と進化を有効にする コード例を示します。
クラウドオブジェクトストレージからストリーミングテーブルを作成する
次の例では、 read_files
構文を使用して、クラウド オブジェクト ストレージからストリーミング テーブルを作成します。
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
@dp.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Kafkaからストリーミングテーブルを作成する
次の例では、 read_kafka
構文を使用してKafkaからストリーミング テーブルを作成します。
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
@dp.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
例: 固定スキーマ
固定スキーマでfrom_json
を使用するコード例については、 from_json
関数を参照してください。
よくある質問
このセクションでは、 from_json
関数におけるスキーマ推論と進化のサポートに関するよくある質問に回答します。
from_json
とparse_json
の違いは何ですか?
parse_json
関数は JSON 文字列からVARIANT
値を返します。
VARIANT は、半構造化データを保存するための柔軟かつ効率的な方法を提供します。これにより、厳密な型を完全に廃止することで、スキーマの推論と進化を回避します。ただし、書き込み時にスキーマを強制する場合 (たとえば、比較的厳密なスキーマがある場合) は、 from_json
方が適切なオプションである可能性があります。
次の表は、 from_json
とparse_json
の違いを示しています。
関数 | ユースケース | 可用性 |
---|---|---|
|
| スキーマ推論と進化で使用できるのは Lakeflow 宣言型パイプラインのみです。 |
| VARIANT は、スキーマ化する必要のないデータを保持するのに特に適しています。例えば:
| Lakeflow 宣言型パイプラインの有無にかかわらず利用可能 |
from_json
スキーマ推論と進化構文をLakeflow宣言型パイプラインの外部で使用できますか
いいえ、 from_json
スキーマ推論と進化構文をLakeflow 宣言型パイプラインの外部で使用することはできません。
from_json
によって推論されたスキーマにアクセスするにはどうすればよいでしょうか?
ターゲットのストリーミングテーブルのスキーマを表示します。
from_json
にスキーマを渡して進化させることはできますか?
いいえ、 from_json
にスキーマを渡して進化させることはできません。ただし、スキーマヒントを提供して、 from_json
によって推論されたフィールドの一部またはすべてをオーバーライドすることができます。
テーブルが完全に更新されると、スキーマはどうなりますか?
テーブルに関連付けられたスキーマの場所がクリアされ、スキーマが最初から再推論されます。