半構造化バリアントタイプとしてデータを取り込む
プレビュー
この機能はパブリックプレビュー段階です。
Databricks Runtime 15.3 以降では、 VARIANT
型を使用して半構造化データを取り込むことができます。 この記事では、Auto Loader とCOPY INTO
を使用してクラウド オブジェクト ストレージからデータを取り込む動作について説明し、Kafka からレコードをストリーミングする動作と、バリアント データを含む新しいテーブルを作成したり、バリアント型を使用して新しいレコードを挿入したりするための SQL コマンドについて説明します。
「 バリアント データのクエリ」を参照してください。
バリアント列を含むテーブルを作成する
VARIANT
は、Databricks Runtime 15.3 以降の標準 SQL 型であり、Delta Lake によってサポートされるテーブルでサポートされています。 Databricks 上のマネージド テーブルはデフォルトで Delta Lake を使用するため、次の構文を使用して単一のVARIANT
列を持つ空のテーブルを作成できます。
CREATE TABLE table_name (variant_column VARIANT)
あるいは、JSON 文字列でPARSE_JSON
関数を使用して、CTAS ステートメントでバリアント列を含むテーブルを作成することもできます。 次の例では、2 つの列を持つテーブルを作成します。
JSON 文字列から
STRING
型として抽出されたid
列。variant_column
列には、VARIANT
型としてエンコードされた JSON 文字列全体が含まれています。
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
注:
VARIANT
列はクラスタリング キー、パーティション、またはZ-Orderキーには使用できません。 VARIANT
型で格納されたデータは、比較や順序付けには使用できません。
Databricks では、クエリを高速化し、ストレージ レイアウトを最適化するために使用する予定のフィールドを非バリアント列として抽出して保存することをお勧めします。
parse_json
を使用したデータの挿入
ターゲット テーブルにVARIANT
としてエンコードされた列がすでに含まれている場合は、次の例のように、 parse_json
を使用して JSON 文字列レコードをVARIANT
として挿入できます。
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
クラウドオブジェクトストレージからバリアントとしてデータを取り込む
Databricks Runtime 15.3 以降では、 Auto Loader使用して、 JSONソースからのすべてのデータをターゲット テーブルの単一の VARIANT
列として読み込むことができます。 VARIANT
はスキーマと型の変更に対して柔軟であり、大文字と小文字の区別とデータソースに存在するNULL
値を維持するため、このパターンは、次の注意事項を除けば、ほとんどの取り込みシナリオに対して堅牢です。
不正な形式の JSON レコードは、
VARIANT
タイプを使用してエンコードできません。VARIANT
type は、最大 16 MB のレコードしか保持できません。
注:
バリアント型は、過度に大きいレコード レコードを破損したレコードと同様に扱います。 デフォルトのPERMISSIVE
処理モードでは、大きすぎるレコードは不正な形式の JSON レコードとともに_malformed_data
列にキャプチャされます。
JSON ソースのすべてのデータは 1 つのVARIANT
列として記録されるため、インジェスト中にスキーマの進化は発生せず、rescuedDataColumn
はサポートされていません。次の例では、ターゲット・テーブルに 1 つの VARIANT
カラムがすでに存在することを前提としています。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
スキーマを定義するとき、またはschemaHints
を渡すときにVARIANT
を指定することもできます。参照されるソース フィールドのデータには、有効な JSON 文字列が含まれている必要があります。 次の例は、この構文を示しています。
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
バリアントでCOPY INTO
を使用する
Databricks では、使用可能な場合は Auto Loader over COPY INTO
を使用することをお勧めします。
COPY INTO
JSONデータソースのコンテンツ全体を 1 つの列として取り込むことをサポートします。 次の例では、単一のVARIANT
列を持つ新しいテーブルを作成し、 COPY INTO
を使用して JSON ファイル ソースからレコードを取り込みます。
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
また、ターゲット表の任意のフィールドを VARIANT
として定義することもできます。 COPY INTO
を実行すると、次の例のように、データソース内の対応するフィールドが取り込まれ、 VARIANT
型にキャストされます。
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
Kafka データをバリアントとしてストリームする
多くの Kafka ストリームは、JSON を使用してペイロードをエンコードします。 VARIANT
を使用して Kafka ストリームを取り込むと、これらのワークロードはスキーマの変更に対して堅牢になります。
次の例は、Kafka ストリーミング ソースを読み取り、 key
をSTRING
として、 value
をVARIANT
としてキャストし、ターゲット テーブルに書き出す方法を示しています。
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)