メインコンテンツまでスキップ

半構造化バリアントタイプとしてデータを取り込みます

備考

プレビュー

この機能は パブリック プレビュー段階です。

Databricks Runtime 15.3 以降では、 VARIANT 型を使用して半構造化データを取り込むことができます。この記事では、 Auto Loader と COPY INTOを使用してクラウド オブジェクト ストレージからデータを取り込む、 Kafkaからレコードをストリーミングする、バリアント データを使用して新しいテーブルを作成する、またはバリアント タイプを使用して新しいレコードを挿入するための SQL コマンドの動作とパターンの例について説明します。 次の表は、サポートされているファイル形式と Databricks Runtime バージョンのサポートをまとめたものです。

ファイル形式

サポートされている Databricks Runtime のバージョン

JSON

15.3 以上

XML

16.4 以上

バリアント データのクエリを参照してください。

バリアント列を持つテーブルを作成する

VARIANT は、Databricks Runtime 15.3 以降の標準 SQL 型であり、Delta Lake によってサポートされるテーブルでサポートされています。 マネージドテーブル on Databricks Delta Lake by デフォルトを使用するため、次の構文を使用して 1 つの VARIANT 列を持つ空のテーブルを作成できます。

SQL
CREATE TABLE table_name (variant_column VARIANT)

または、JSON 文字列に対して PARSE_JSON 関数を使用するか、XML 文字列に対して FROM_XML 関数を使用して CTAS ステートメントを使用し、バリアント列を持つテーブルを作成することもできます。次の例では、2 つのカラムを持つテーブルを作成します。

  • JSON 文字列からSTRING型として抽出された id 列。
  • variant_column列には、VARIANT型としてエンコードされた JSON 文字列全体が含まれます。
SQL
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
注記

Databricks では、クエリを高速化し、ストレージ レイアウトを最適化するために使用する予定の非バリアント列としてフィールドを抽出して格納することをお勧めします。

VARIANT 列は、クラスタリング キー、パーティション、または Z-Order キーには使用できません。 VARIANT データ型は、比較、グループ化、順序付け、およびセット操作には使用できません。制限事項の完全なリストについては、「 制限事項」を参照してください。

データの挿入: parse_json

ターゲットテーブルに VARIANTとしてエンコードされた列がすでに含まれている場合は、次の例のように、 parse_json を使用して JSON 文字列レコードを VARIANTとして挿入できます。

SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data

データの挿入: from_xml

ターゲット・テーブルに VARIANTとしてエンコードされたカラムがすでに含まれている場合は、 from_xml を使用して XML 文字列レコードを VARIANTとして挿入できます。例えば:

SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data

クラウドオブジェクトストレージからデータをバリアントとして取り込む

Auto Loader を使用して、サポートされているファイル ソース からすべてのデータをターゲット テーブルの 1 つの VARIANT 列として読み込むことができます。 VARIANT はスキーマと型の変更に柔軟に対応でき、大文字と小文字の区別とデータソースに存在するNULL値を維持するため、このパターンはほとんどのインジェスト シナリオに対して堅牢ですが、次の点に注意してください。

  • 形式が正しくないレコードは、 VARIANT タイプを使用してエンコードできません。
  • VARIANT タイプ は、最大 16MB のサイズのレコードのみを保持できます。
注記

バリアントは、大きすぎるレコードを破損したレコードと同様に扱います。デフォルトの PERMISSIVE 処理モードでは、大きすぎるレコードは corruptRecordColumn.

レコード全体が 1 つの VARIANT 列として記録されるため、インジェスト中にスキーマの進化は発生せず、 rescuedDataColumn はサポートされていません。 次の例では、ターゲット・テーブルに 1 つの VARIANT カラムがすでに存在することを前提としています。

Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

スキーマを定義するとき、またはschemaHintsを渡すときにVARIANTを指定することもできます。参照されるソース フィールドのデータには、有効なレコードが含まれている必要があります。次の例は、この構文を示しています。

Python
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

バリアントで COPY INTO を使用する

Databricks では、使用可能な場合は Auto Loader と COPY INTO を使用することをお勧めします。

COPY INTO は、サポートされているデータソースの全コンテンツを 1 つの列として取り込むことをサポートします。 次の例では、1 つの VARIANT 列を持つ新しいテーブルを作成し、 COPY INTO を使用して JSON ファイル ソースからレコードを取り込みます。

SQL
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')

ストリーム Kafka データをバリアントとして

多くの Kafka ストリームは、JSON を使用してペイロードをエンコードします。 VARIANT を使用して Kafka ストリームを取り込むと、これらのワークロードはスキーマの変更に対して堅牢になります。

次の例は、Kafka ストリーミングソースを読み取り、 keySTRING としてキャストし、 valueVARIANTとしてキャストし、ターゲットテーブルに書き出す方法を示しています。

Python
from pyspark.sql.functions import col, parse_json

(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)