メインコンテンツまでスキップ

半構造化バリアントタイプとしてデータを取り込みます

備考

プレビュー

この機能は パブリック プレビュー段階です。

Databricks Runtime 15.3 以降では、 VARIANT 型を使用して半構造化データを取り込むことができます。 この記事では、 Auto Loader と COPY INTO を使用してクラウド オブジェクト ストレージからデータを取り込むための動作とパターンの例、 Kafkaからのストリーミング レコード、およびバリアント データを使用して新しいテーブルを作成する SQL またはバリアント タイプを使用して新しいレコードを挿入するためのコマンドについて説明します。

「バリアント データのクエリ」を参照してください。

バリアント列を持つテーブルを作成する

VARIANT は、Databricks Runtime 15.3 以降の標準 SQL 型であり、Delta Lake によってサポートされるテーブルでサポートされています。 マネージドテーブル on Databricks Delta Lake by デフォルトを使用するため、次の構文を使用して 1 つの VARIANT 列を持つ空のテーブルを作成できます。

SQL
CREATE TABLE table_name (variant_column VARIANT)

または、JSON 文字列で PARSE_JSON 関数を使用して CTAS ステートメントを使用し、バリアント列を持つテーブルを作成することもできます。 次の例では、2 つのカラムを持つテーブルを作成します。

  • JSON 文字列からSTRING型として抽出された id 列。
  • variant_column列には、VARIANT型としてエンコードされた JSON 文字列全体が含まれます。
SQL
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
注記

Databricks では、クエリを高速化し、ストレージ レイアウトを最適化するために使用する予定の非バリアント列としてフィールドを抽出して格納することをお勧めします。

VARIANT 列は、クラスタリング キー、パーティション、または Z-Order キーには使用できません。 VARIANT データ型は、比較、グループ化、順序付け、およびセット操作には使用できません。制限事項の完全なリストについては、「 制限事項」を参照してください。

を使用してデータを挿入します parse_json

ターゲットテーブルに VARIANTとしてエンコードされた列がすでに含まれている場合は、次の例のように、 parse_json を使用して JSON 文字列レコードを VARIANTとして挿入できます。

SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data

クラウドオブジェクトストレージからデータをバリアントとして取り込む

Databricks Runtime 15.3 以降では、Auto Loader を使用して、JSON ソース からすべてのデータをターゲット テーブルの 1 つのVARIANT列としてロードできます。VARIANT はスキーマと型の変更に柔軟に対応でき、大文字と小文字の区別とデータソースに存在するNULL値を維持するため、このパターンはほとんどのインジェスト シナリオに対して堅牢ですが、次の点に注意してください。

  • 形式が正しくない JSON レコードは、 VARIANT タイプを使用してエンコードできません。
  • VARIANT type は、最大 16MB のサイズのレコードのみを保持できます。
注記

バリアントは、大きすぎるレコード レコードを破損したレコードと同様に扱います。 デフォルトの PERMISSIVE 処理モードでは、大きすぎるレコードは、不正な形式の JSON レコードと共に _malformed_data 列にキャプチャされます。

JSON ソースのすべてのデータは 1 つのVARIANT列として記録されるため、インジェスト中にスキーマの進化は発生せず、rescuedDataColumnはサポートされていません。次の例では、ターゲット・テーブルに 1 つの VARIANT カラムがすでに存在することを前提としています。

Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

スキーマを定義するとき、またはschemaHintsを渡すときにVARIANTを指定することもできます。参照されるソース フィールドのデータには、有効な JSON 文字列が含まれている必要があります。 次の例は、この構文を示しています。

Python
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)

バリアントで COPY INTO を使用する

Databricks では、使用可能な場合は Auto Loader over COPY INTO を使用することをお勧めします。

COPY INTO JSON データソースの全コンテンツを 1 つの列として取り込むことをサポートします。 次の例では、1 つの VARIANT 列を持つ新しいテーブルを作成し、 COPY INTO を使用して JSON ファイル ソースからレコードを取り込みます。

SQL
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')

また、ターゲット テーブル内の任意のフィールドを VARIANTとして定義することもできます。 COPY INTOを実行すると、次の例のように、データソース内の対応するフィールドが取り込まれ、VARIANTタイプにキャストされます。

SQL
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON

ストリーム Kafka データをバリアントとして

多くの Kafka ストリームは、JSON を使用してペイロードをエンコードします。 VARIANT を使用して Kafka ストリームを取り込むと、これらのワークロードはスキーマの変更に対して堅牢になります。

次の例は、Kafka ストリーミングソースを読み取り、 keySTRING としてキャストし、 valueVARIANTとしてキャストし、ターゲットテーブルに書き出す方法を示しています。

Python
from pyspark.sql.functions import col, parse_json

(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)