ストリームを設定する
プレビュー
この機能は パブリック プレビュー段階です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。
ストリームは、Apache Kafkaなどの外部ストリーミングデータソースを表します。ストリームは、接続の詳細、認証、スキーマ、および取り込み設定を保存します。ストリームが作成された後、特徴量ビューの定義を使用して参照し、リアルタイムストリーミング特徴量を作成できます。
ストリームには3部構成の名前があります(catalog.schema.stream_name)。ストリームへのアクセスは、関連する取り込みテーブルによって管理されます。詳細については、取り込みとバックフィルを参照してください。
要件
- ノートブックコマンドの実行には:サーバレス、またはDatabricks Runtime 17.0 機械学習以上を実行しているクラシックなコンピュートクラスターが必要です。
feature-engineering-clientPythonパッケージのバージョン0.16.0以降がインストールされている必要があります。
ストリームを作成
新しいストリームを作成するには、create_stream()を使用します。ストリームには4つの構成コンポーネントが必要です:
- ソース設定 :ストリーミングプラットフォーム(例:Kafka)およびソース固有の詳細(Kafkaのトピックサブスクリプションなど)を指定します。
- 接続構成 :ブートストラップサーバと資格情報を含め、ストリーミングプラットフォームに接続および認証する方法を指定します。
- スキーマ設定:メッセージのキーと値の構造を定義します。
- 取り込み設定 : ストリームデータがどこでどのように取り込まれるかを指定します。詳細については、取り込みとバックフィルを参照してください。
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
KafkaStreamConfig,
KafkaSubscriptionMode,
StreamConnectionConfig,
DirectSchemas,
SchemaConfig,
IngestionConfig,
IngestionDestination,
StreamBackfillSource,
)
client = FeatureEngineeringClient()
stream = client.create_stream(
name="my_catalog.my_schema.my_stream",
source_config=KafkaStreamConfig(
subscription_mode=KafkaSubscriptionMode(subscribe="events-topic"),
),
connection_config=StreamConnectionConfig(
uc_connection_name="my-kafka-connection"
),
schema_config=DirectSchemas(
payload_schema=SchemaConfig(
json_schema=(
'{'
' "type": "object",'
' "properties": {'
' "transaction_id": {"type": "string"},'
' "user_id": {"type": "string"},'
' "amount": {"type": "number"},'
' "event_time": {"type": "string", "format": "date-time"}'
' }'
'}'
)
),
),
ingestion_config=IngestionConfig(
ingestion_destination=IngestionDestination(
delta_table_name="my_catalog.my_schema.events_ingestion"
),
),
)
ストリームソースへの接続
ストリーミング機能を定義する前に、ストリーミング Lakeflow Spark宣言型パイプライン接続を Kafka ブローカーに接続してテストしてください。サーバレス コンピュートでのストリーミングおよびApache Kafka への接続を参照してください。
AWS マネージド ストリーミング (Amazon MSK) の詳細については、Amazon MSK へのサーバレス プライベート接続」を参照してください。Kafka 認証オプションの詳細については、認証」を参照してください。
認証
Unity Catalog接続 (推奨)
Unity Catalog 接続を使用して、Kafka クラスターに対して認証を行います。これは、マネージド認証に推奨されるアプローチです。接続を作成するには、接続を作成を参照してください。
connection_config = StreamConnectionConfig(
uc_connection_name="my-kafka-connection"
)
直接mTLS
直接mTLS認証の場合、Unity Catalogボリュームに保存されているキーストアファイルとトラストストアファイルを提供し、パスワードはDatabricksシークレットスコープを通じて参照されます。KafkaとのSSL認証の詳細については、SSLを使用してDatabricksをKafkaに接続するを参照してください。
from databricks.feature_engineering.entities import (
DirectMtlsConfig,
MtlsConfig,
SecretScopeReference,
)
connection_config = DirectMtlsConfig(
bootstrap_servers="broker1:9092,broker2:9092",
mtls_config=MtlsConfig(
keystore_location="/Volumes/my_catalog/my_schema/my_volume/keystore.jks",
keystore_password_ref=SecretScopeReference(
scope="my_scope", key="keystore_password"
),
key_password_ref=SecretScopeReference(
scope="my_scope", key="key_password"
),
truststore_location="/Volumes/my_catalog/my_schema/my_volume/truststore.jks",
truststore_password_ref=SecretScopeReference(
scope="my_scope", key="truststore_password"
),
),
)
SASL
SASL認証 (SASL/SCRAMとSASL/PLAINの両方) はプレビュー期間中はサポートされていません。
サブスクリプションモード
サブスクリプションモードは、ストリームが消費するKafkaトピックをどのように選択するかを指定します。3つのモードがサポートされています:
モード | 説明 | 例 |
|---|---|---|
| トピック名のコンマ区切りリスト |
|
| Java 正規表現パターンによるトピック名の一致 |
|
| トピックパーティションの割り当てを指定する JSON |
|
スキーマ構成
メッセージキーと値の構造をJSON Schema形式で定義します。Kafka ソースの場合、payload_schema は Kafka メッセージ値 (Kafka のキーと値のモデルにおける value) に相当し、key_schema は Kafka メッセージキーに相当します。payload_schema または key_schema のいずれか少なくとも 1 つを指定する必要があります。
schema_config = DirectSchemas(
payload_schema=SchemaConfig(
json_schema=(
'{'
' "type": "object",'
' "properties": {'
' "user_id": {"type": "string"},'
' "amount": {"type": "number"},'
' "event_time": {"type": "string"}'
' }'
'}'
)
),
key_schema=SchemaConfig(
json_schema='{"type": "string"}'
),
)
キーまたはペイロードのスキーマが提供されていない場合、それは単純な文字列として扱われます。
取り込みとバックフィル
ingestion_configパラメーターは、ストリームデータがトレーニングおよびサービング用にキャプチャおよび保存される方法を構成します。
ストリームへのアクセスは、インジェスチョンテーブルによって管理されます:
SELECT取り込みテーブルに対する読み取りアクセス権をストリームに付与します。MANAGE取り込みテーブルに対する削除アクセスを付与します。
テーブル情報の詳細については、テーブルおよびUnity Catalog権限リファレンスを参照してください。
取り込みパイプライン
ストリームが作成されると、Databricksはマネージド取り込みパイプラインを開始し、Kafkaトピックからメッセージを継続的に読み取り、Deltaテーブル (取り込みテーブル) に書き込みます。パイプラインは最新のKafkaオフセットから開始し、ストリーム作成後に到着する新しいメッセージのみをキャプチャして継続的に実行されます。このインジェストテーブルは、ストリーミング特徴量を使用したトレーニングに使用されます。ストリームが削除されると、その取り込みパイプラインと取り込みテーブルも削除されます。
取り込み先
ingestion_destinationは、ストリームデータが書き込まれる3部構成のDeltaテーブル名を指定します。
ingestion_config = IngestionConfig(
ingestion_destination=IngestionDestination(
delta_table_name="my_catalog.my_schema.events_ingestion"
),
)
取り込みテーブルスキーマ
取り込みテーブルには、メッセージデータとメタデータ列が含まれています:
列 | Type | 説明 |
|---|---|---|
|
| 提供されたスキーマに従って構造化されたKafkaメッセージキー。 |
|
| 提供されたスキーマに従って構造化されたKafkaメッセージ値 (ペイロード)。 |
|
| レコードのタイムスタンプ。フォワードフィル データの場合、これは Kafka ブローカーの取り込みタイムスタンプです。バックフィル データの場合、これは顧客が提供したものです。 |
|
| レコードが消費されたKafkaトピック。 |
|
| レコードが消費されたKafkaパーティション。 |
|
| パーティション内のレコードのKafkaオフセット。 |
|
|
|
バックフィルソース
フォワードフィルパイプラインは最新のKafkaオフセットから開始するため、ストリームが作成される前に存在したメッセージはキャプチャしません。トレーニング用のヒストリカルデータカバー範囲を提供するには、オプションのバックフィルソースを設定します。
バックフィルソースが構成されている場合、Databricksは、バックフィル行をrecord_source="backfill"を使用して取り込みテーブルにコピーする1回限りのMERGE INTOジョブを実行します。MERGE は、重複チェッカーがバックフィルソースとフォワードフィルストリームに重複するタイムスタンプがあることを確認した後にのみ実行されます (バックフィルとライブストリームデータの重複を参照してください)。2日以内に重複条件が満たされない場合、無期限のブロックを回避するために、MERGEはとにかく実行されます。
バックフィルテーブルには、UTCタイムゾーンでTIMESTAMP型のstream_record_timestamp列を含める必要があります。他のKafkaメタデータ列(kafka_topic、kafka_partition、kafka_offset)は、バックフィルソースに存在する場合は渡され、それ以外の場合はNULLに設定されます。
from databricks.feature_engineering.entities import StreamBackfillSource
ingestion_config = IngestionConfig(
ingestion_destination=IngestionDestination(
delta_table_name="my_catalog.my_schema.events_ingestion"
),
backfill_source=StreamBackfillSource(
delta_table_name="my_catalog.my_schema.historical_events"
),
)
バックフィルとライブストリームデータの重複
バックフィルと取り込みテーブル間でMERGEを実行する前に、重複チェックが2つのテーブルのタイムスタンプを比較します:
- バックフィル最大値 :バックフィルソースにおける最大
stream_record_timestampです。 - **取り込みの最小値**:
stream_record_timestamp取り込みテーブル内の行の最小record_source="stream"() です。
MERGEは、バックフィルの最新のタイムスタンプが取り込みテーブルの最も早いタイムスタンプを少なくとも1時間超えている場合に進行します。この重複により、取り込みテーブルにギャップが生じないようにします。重複条件が 2 日以内に満たされない場合でも、MERGE は無期限のブロックを回避するために実行されます。
取り込みパイプラインは最新の Kafka オフセットから開始されるため、ストリームの作成後に到着したメッセージのみをキャプチャします。バックフィル ソースには、取り込み時間範囲に及ぶデータを含める必要があります。つまり、ストリーム作成時刻までだけではなく、取り込み時間範囲に及ぶデータを含める必要があります。
たとえば、午後3時にストリームを作成した場合、フォワードフィルパイプラインは午後3時以降のメッセージの読み取りを開始します。オーバーラップチェックを満たすには、バックフィルソースに少なくとも午後4時(フォワードフィル開始から1時間後)までのタイムスタンプ付きデータを含める必要があります。これは、取り込みテーブルにギャップがないことを確実にするため、午後4時以降にバックフィルテーブルを更新する必要があることを意味します。
重複排除
deduplication_columnsを使用して、バックフィルとフォワードフィルストリームデータの間の取り込み中に、重複する行を特定するための列パスを指定します。ネストされたフィールドにはドット表記を使用します(例:"value.user_id")。
データに基づいて重複排除カラムを選択します:
- ストリーム内の各レコードに一意の識別子 (例:
value.transaction_id) が含まれている場合は、その列を重複排除に使用します。 - バックフィルソースに
kafka_partitionとkafka_offsetカラムが含まれている場合は、それらを使用して各レコードを一意に識別します。 - 重複排除列が指定されていない場合、デフォルトの重複排除キーは
key、value、およびstream_record_timestampの完全な組み合わせです。これは推奨されません。この厳密な条件の一致は、簡単に重複を招く可能性があるためです。
ingestion_config = IngestionConfig(
ingestion_destination=IngestionDestination(
delta_table_name="my_catalog.my_schema.events_ingestion"
),
deduplication_columns=["value.transaction_id"],
)
ストリームの管理
ストリームの取得
stream = client.get_stream(name="my_catalog.my_schema.my_stream")
ストリームを一覧表示
streams = client.list_streams(
catalog_name="my_catalog",
schema_name="my_schema",
max_results=50,
include_schemas=False,
)
include_schemas=Trueを設定して、完全なスキーマ詳細を含めます。スキーマは大規模になる可能性があり、これにより長時間実行される操作になることがあります。代わりにスキーマを個別に取得するには、get_stream を使用します。
ストリームの削除
ストリームを削除すると、その取り込みパイプラインおよび取り込みテーブルも削除されます。
削除されたストリームを参照するモデルや特徴は、基になるストリームデータにアクセスできなくなります。このデータが必要だがストリームが不要になった場合は、削除する前に取り込みテーブルのコピーを作成してください。
client.delete_stream(name="my_catalog.my_schema.my_stream")
ノートブックの例
ストリームを作成し、ストリーミング特徴量を定義し、サービングエンドポイントにデプロイするエンドツーエンドの例については、以下のノートブックを参照してください。