半構造化バリアントタイプとしてデータを取り込む

プレビュー

この機能はパブリックプレビュー段階です。

Databricks Runtime 15.3 以降では、 VARIANT型を使用して半構造化データを取り込むことができます。 この記事では、Auto Loader とCOPY INTOを使用してクラウド オブジェクト ストレージからデータを取り込む動作について説明し、Kafka からレコードをストリーミングする動作と、バリアント データを含む新しいテーブルを作成したり、バリアント型を使用して新しいレコードを挿入したりするための SQL コマンドについて説明します。

バリアント データのクエリ」を参照してください。

バリアント列を含むテーブルを作成する

VARIANT は、Databricks Runtime 15.3 以降の標準 SQL 型であり、Delta Lake によってサポートされるテーブルでサポートされています。 Databricks 上のマネージド テーブルはデフォルトで Delta Lake を使用するため、次の構文を使用して単一のVARIANT列を持つ空のテーブルを作成できます。

CREATE TABLE table_name (variant_column VARIANT)

あるいは、JSON 文字列でPARSE_JSON関数を使用して、CTAS ステートメントでバリアント列を含むテーブルを作成することもできます。 次の例では、2 つの列を持つテーブルを作成します。

  • JSON 文字列からSTRING型として抽出されたid列。

  • variant_column列には、 VARIANT型としてエンコードされた JSON 文字列全体が含まれています。

CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

注:

Databricks では、クエリを高速化し、ストレージ レイアウトを最適化するために使用する予定のフィールドを非バリアント列として抽出して保存することをお勧めします。

VARIANT 列は、クラスタリング キー、パーティション、または Z-Order キーには使用できません。 VARIANT データ型は、比較、グループ化、順序付け、およびセット操作には使用できません。制限事項の完全なリストについては、「 制限事項」を参照してください。

parse_jsonを使用したデータの挿入

ターゲット テーブルにVARIANTとしてエンコードされた列がすでに含まれている場合は、次の例のように、 parse_jsonを使用して JSON 文字列レコードをVARIANTとして挿入できます。

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data
from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

クラウドオブジェクトストレージからバリアントとしてデータを取り込む

Databricks Runtime 15.3 以降では、 Auto Loader使用して、 JSONソースからのすべてのデータをターゲット テーブルの単一の VARIANT 列として読み込むことができます。 VARIANTはスキーマと型の変更に対して柔軟であり、大文字と小文字の区別とデータソースに存在するNULL値を維持するため、このパターンは、次の注意事項を除けば、ほとんどの取り込みシナリオに対して堅牢です。

  • 不正な形式の JSON レコードは、 VARIANTタイプを使用してエンコードできません。

  • VARIANT type は、最大 16 MB のレコードしか保持できません。

注:

バリアント型は、過度に大きいレコード レコードを破損したレコードと同様に扱います。 デフォルトのPERMISSIVE処理モードでは、大きすぎるレコードは不正な形式の JSON レコードとともに_malformed_data列にキャプチャされます。

JSON ソースのすべてのデータは 1 つのVARIANT列として記録されるため、インジェスト中にスキーマの進化は発生せず、rescuedDataColumnはサポートされていません。次の例では、ターゲット・テーブルに 1 つの VARIANT カラムがすでに存在することを前提としています。

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

スキーマを定義するとき、またはschemaHintsを渡すときにVARIANTを指定することもできます。参照されるソース フィールドのデータには、有効な JSON 文字列が含まれている必要があります。 次の例は、この構文を示しています。

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

バリアントでCOPY INTOを使用する

Databricks では、使用可能な場合は Auto Loader over COPY INTO を使用することをお勧めします。

COPY INTO JSONデータソースのコンテンツ全体を 1 つの列として取り込むことをサポートします。 次の例では、単一のVARIANT列を持つ新しいテーブルを作成し、 COPY INTOを使用して JSON ファイル ソースからレコードを取り込みます。

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

また、ターゲット表の任意のフィールドを VARIANTとして定義することもできます。 COPY INTOを実行すると、次の例のように、データソース内の対応するフィールドが取り込まれ、 VARIANT型にキャストされます。

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Kafka データをバリアントとしてストリームする

多くの Kafka ストリームは、JSON を使用してペイロードをエンコードします。 VARIANTを使用して Kafka ストリームを取り込むと、これらのワークロードはスキーマの変更に対して堅牢になります。

次の例は、Kafka ストリーミング ソースを読み取り、 keySTRINGとして、 valueVARIANTとしてキャストし、ターゲット テーブルに書き出す方法を示しています。

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)