メインコンテンツまでスキップ

半構造化バリアントタイプとしてデータを取り込みます

備考

プレビュー

この機能は パブリック プレビュー段階です。

Databricks Runtime 15.3 以降では、 VARIANT 型を使用して半構造化データを取り込むことができます。この記事では、 Auto Loader と COPY INTOを使用してクラウド オブジェクト ストレージからデータを取り込む、 Kafkaからレコードをストリーミングする、バリアント データを使用して新しいテーブルを作成する、またはバリアント タイプを使用して新しいレコードを挿入するための SQL コマンドの動作とパターンの例について説明します。 次の表は、サポートされているファイル形式と Databricks Runtime バージョンのサポートをまとめたものです。

ファイル形式

サポートされている Databricks Runtime のバージョン

JSON

15.3 以上

XML

16.4 以上

CSV

16.4 以上

バリアント データのクエリを参照してください。

バリアント列を持つテーブルを作成する

VARIANT は、Databricks Runtime 15.3 以降の標準 SQL 型であり、Delta Lake によってサポートされるテーブルでサポートされています。Databricksのマネージドテーブルは、当然Delta Lakeを使用するため、次の構文を使用して単一のVARIANT列を持つ空のテーブルを作成できます。

SQL
CREATE TABLE table_name (variant_column VARIANT)

あるいは、CTAS ステートメントを使用してバリアント列を持つテーブルを作成することもできます。JSON 文字列を解析するにはPARSE_JSON関数を使用し、XML 文字列を解析するにはFROM_XML関数を使用します。次の例では、 2 つの列を持つテーブルを作成します。

  • id列は JSON 文字列からSTRING型として抽出されます。
  • variant_column VARIANT型としてエンコードされた JSON 文字列全体が含まれます。
SQL
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
注記

Databricks では、クエリを高速化し、ストレージ レイアウトを最適化するために、頻繁にクエリされるフィールドを抽出し、非バリアント列として保存することを推奨しています。

VARIANT 列はクラスタリング キー、パーティション、またはZ-Orderキーには使用できません。 VARIANTデータ型は、比較、グループ化、順序付け、および設定操作には使用できません。詳細については、 「制限事項」を参照してください。

データの挿入 parse_json

ターゲット テーブルにVARIANTとしてエンコードされた列がすでに含まれている場合は、 parse_jsonを使用して JSON 文字列レコードをVARIANTとして挿入できます。たとえば、 json_string列から JSON 文字列を解析し、 table_nameに挿入します。

SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data

データの挿入 from_xml

ターゲット テーブルにVARIANTとしてエンコードされた列がすでに含まれている場合は、 from_xmlを使用して XML 文字列レコードをVARIANTとして挿入できます。たとえば、 xml_string列から XML 文字列を解析し、 table_nameに挿入します。

SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data

データを挿入する方法 from_csv

ターゲット テーブルにVARIANTとしてエンコードされた列がすでに含まれている場合は、 from_csvを使用して CSV 文字列レコードをVARIANTとして挿入できます。たとえば、 csv_string列から CSV レコードを解析し、 table_nameに挿入します。

SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data

クラウドオブジェクトストレージからバリアントとしてデータを取り込む

Auto Loader を使用して、サポートされているファイル ソース からすべてのデータをターゲット テーブルの 1 つの VARIANT 列として読み込むことができます。 VARIANT はスキーマと型の変更に柔軟に対応でき、大文字と小文字の区別とデータソースに存在するNULL値を維持するため、このパターンはほとんどのインジェスト シナリオに対して堅牢ですが、次の点に注意してください。

  • 形式が正しくないレコードは、 VARIANT タイプを使用してエンコードできません。
  • VARIANT タイプ は、最大 16MB のサイズのレコードのみを保持できます。
注記

バリアントは、大きすぎるレコードを破損したレコードと同様に扱います。デフォルトの PERMISSIVE 処理モードでは、大きすぎるレコードは corruptRecordColumn.

レコード全体が 1 つの VARIANT 列として記録されるため、インジェスト中にスキーマの進化は発生せず、 rescuedDataColumn はサポートされていません。 次の例では、ターゲット・テーブルに 1 つの VARIANT カラムがすでに存在することを前提としています。

Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

スキーマを定義するとき、またはschemaHintsを渡すときにVARIANT指定することもできます。参照されるソース フィールドのデータには有効なレコードが含まれている必要があります。次の例はこの構文を示しています。

Python
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

バリアントでCOPY INTOを使用する

Databricks では、使用可能な場合は Auto Loader と COPY INTO を使用することをお勧めします。

COPY INTO は、サポートされているデータソースの全コンテンツを 1 つの列として取り込むことをサポートします。 次の例では、1 つの VARIANT 列を持つ新しいテーブルを作成し、 COPY INTO を使用して JSON ファイル ソースからレコードを取り込みます。

SQL
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FILES = ('file-name')
FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')

Kafka データをバリアントとしてストリームする

多くの Kafka ストリームは、JSON を使用してペイロードをエンコードします。 VARIANT を使用して Kafka ストリームを取り込むと、これらのワークロードはスキーマの変更に対して堅牢になります。

次の例は、Kafka ストリーミングソースを読み取り、 keySTRING としてキャストし、 valueVARIANTとしてキャストし、ターゲットテーブルに書き出す方法を示しています。

Python
from pyspark.sql.functions import col, parse_json

(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

次のステップ