半構造化バリアントタイプとしてデータを取り込みます
プレビュー
この機能は パブリック プレビュー段階です。
Databricks Runtime 15.3 以降では、 VARIANT 型を使用して半構造化データを取り込むことができます。この記事では、 Auto Loader と COPY INTOを使用してクラウド オブジェクト ストレージからデータを取り込む、 Kafkaからレコードをストリーミングする、バリアント データを使用して新しいテーブルを作成する、またはバリアント タイプを使用して新しいレコードを挿入するための SQL コマンドの動作とパターンの例について説明します。 次の表は、サポートされているファイル形式と Databricks Runtime バージョンのサポートをまとめたものです。
ファイル形式 | サポートされている Databricks Runtime のバージョン |
|---|---|
JSON | 15.3 以上 |
XML | 16.4 以上 |
CSV | 16.4 以上 |
バリアント データのクエリを参照してください。
バリアント列を持つテーブルを作成する
VARIANT は、Databricks Runtime 15.3 以降の標準 SQL 型であり、Delta Lake によってサポートされるテーブルでサポートされています。Databricksのマネージドテーブルは、当然Delta Lakeを使用するため、次の構文を使用して単一のVARIANT列を持つ空のテーブルを作成できます。
CREATE TABLE table_name (variant_column VARIANT)
あるいは、CTAS ステートメントを使用してバリアント列を持つテーブルを作成することもできます。JSON 文字列を解析するにはPARSE_JSON関数を使用し、XML 文字列を解析するにはFROM_XML関数を使用します。次の例では、 2 つの列を持つテーブルを作成します。
id列は JSON 文字列からSTRING型として抽出されます。variant_columnVARIANT型としてエンコードされた JSON 文字列全体が含まれます。
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Databricks では、クエリを高速化し、ストレージ レイアウトを最適化するために、頻繁にクエリされるフィールドを抽出し、非バリアント列として保存することを推奨しています。
VARIANT 列はクラスタリング キー、パーティション、またはZ-Orderキーには使用できません。 VARIANTデータ型は、比較、グループ化、順序付け、および設定操作には使用できません。詳細については、 「制限事項」を参照してください。
データの挿入 parse_json
ターゲット テーブルにVARIANTとしてエンコードされた列がすでに含まれている場合は、 parse_jsonを使用して JSON 文字列レコードをVARIANTとして挿入できます。たとえば、 json_string列から JSON 文字列を解析し、 table_nameに挿入します。
- SQL
- Python
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
データの挿入 from_xml
ターゲット テーブルにVARIANTとしてエンコードされた列がすでに含まれている場合は、 from_xmlを使用して XML 文字列レコードをVARIANTとして挿入できます。たとえば、 xml_string列から XML 文字列を解析し、 table_nameに挿入します。
- SQL
- Python
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant"))
.write
.mode("append")
.saveAsTable("table_name")
)
データを挿入する方法 from_csv
ターゲット テーブルにVARIANTとしてエンコードされた列がすでに含まれている場合は、 from_csvを使用して CSV 文字列レコードをVARIANTとして挿入できます。たとえば、 csv_string列から CSV レコードを解析し、 table_nameに挿入します。
- SQL
- Python
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v)
.write
.mode("append")
.saveAsTable("table_name")
)
クラウドオブジェクトストレージからバリアントとしてデータを取り込む
Auto Loader を使用して、サポートされているファイル ソース からすべてのデータをターゲット テーブルの 1 つの VARIANT 列として読み込むことができます。 VARIANT はスキーマと型の変更に柔軟に対応でき、大文字と小文字の区別とデータソースに存在するNULL値を維持するため、このパターンはほとんどのインジェスト シナリオに対して堅牢ですが、次の点に注意してください。
- 形式が正しくないレコードは、
VARIANTタイプを使用してエンコードできません。 VARIANTタイプ は、最大 16MB のサイズのレコードのみを保持できます。
バリアントは、大きすぎるレコードを破損したレコードと同様に扱います。デフォルトの PERMISSIVE 処理モードでは、大きすぎるレコードは corruptRecordColumn.
レコード全体が 1 つの VARIANT 列として記録されるため、インジェスト中にスキーマの進化は発生せず、 rescuedDataColumn はサポートされていません。 次の例では、ターゲット・テーブルに 1 つの VARIANT カラムがすでに存在することを前提としています。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
スキーマを定義するとき、またはschemaHintsを渡すときにVARIANT指定することもできます。参照されるソース フィールドのデータには有効なレコードが含まれている必要があります。次の例はこの構文を示しています。
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
バリアントでCOPY INTOを使用する
Databricks では、使用可能な場合は Auto Loader と COPY INTO を使用することをお勧めします。
COPY INTO は、サポートされているデータソースの全コンテンツを 1 つの列として取り込むことをサポートします。 次の例では、1 つの VARIANT 列を持つ新しいテーブルを作成し、 COPY INTO を使用して JSON ファイル ソースからレコードを取り込みます。
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FILES = ('file-name')
FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')
Kafka データをバリアントとしてストリームする
多くの Kafka ストリームは、JSON を使用してペイロードをエンコードします。 VARIANT を使用して Kafka ストリームを取り込むと、これらのワークロードはスキーマの変更に対して堅牢になります。
次の例は、Kafka ストリーミングソースを読み取り、 key を STRING としてキャストし、 value を VARIANTとしてキャストし、ターゲットテーブルに書き出す方法を示しています。
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
次のステップ
- バリアントデータをクエリします。
- バリアント型のサポートを構成します。
- Auto Loader について詳しくご覧ください。「Auto Loader とは何ですか?」を参照してください。