Set up a Stream
This feature is in Public Preview. Workspace admins can control access to this feature from the Previews page. See Manage Databricks previews.
A Stream represents an external streaming data source, such as Apache Kafka. Streams store connection details, authentication, schemas, and ingestion configuration. After a stream is created, you can reference it using Feature View definitions to create real-time streaming features.
Streams have three-part names (catalog.schema.stream_name). Access to a Stream is governed by its associated ingestion table. See Ingestion and backfill for details.
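As a rough illustration of the three-part naming scheme, the following sketch splits a fully qualified name into its catalog, schema, and stream parts. The `StreamName` type and the `split_stream_name` helper are hypothetical and are not part of the feature engineering client. They only check that the name has exactly three non-empty, dot-separated parts.

```python
from typing import NamedTuple


class StreamName(NamedTuple):
    catalog: str
    schema: str
    stream: str


def split_stream_name(full_name: str) -> StreamName:
    """Split 'catalog.schema.stream_name' into its parts (hypothetical helper)."""
    parts = full_name.split(".")
    if len(parts) != 3 or any(not part.strip() for part in parts):
        raise ValueError(f"expected catalog.schema.stream_name, got {full_name!r}")
    return StreamName(*parts)


name = split_stream_name("my_catalog.my_schema.my_stream")
print(name.catalog, name.schema, name.stream)
```

A check like this can catch a missing catalog or schema prefix before the name is passed to a client call.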
Requirements
- For running notebook commands: serverless or a classic compute cluster running Databricks Runtime 17.0 ML or above.
- The feature-engineering-client Python package version 0.16.0 or above must be installed.
Create a stream
Use create_stream() to create a new Stream. A Stream requires four configuration components:
- Source config: Specifies the streaming platform (for example, Kafka) and source-specific details (such as topic subscription for Kafka).
- Connection config: Specifies how to connect and authenticate to the streaming platform, including bootstrap servers and credentials.
- Schema config: Defines the structure of message keys and values.
- Ingestion config: Specifies where and how stream data is ingested. See Ingestion and backfill for details.
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
    KafkaStreamConfig,
    KafkaSubscriptionMode,
    StreamConnectionConfig,
    DirectSchemas,
    SchemaConfig,
    IngestionConfig,
    IngestionDestination,
    StreamBackfillSource,
)

client = FeatureEngineeringClient()

stream = client.create_stream(
    name="my_catalog.my_schema.my_stream",
    # Source config: which Kafka topic(s) to read.
    source_config=KafkaStreamConfig(
        subscription_mode=KafkaSubscriptionMode(subscribe="events-topic"),
    ),
    # Connection config: how to reach and authenticate to the broker.
    connection_config=StreamConnectionConfig(
        uc_connection_name="my-kafka-connection"
    ),
    # Schema config: JSON Schema for the Kafka message value.
    schema_config=DirectSchemas(
        payload_schema=SchemaConfig(
            json_schema=(
                '{'
                '  "type": "object",'
                '  "properties": {'
                '    "transaction_id": {"type": "string"},'
                '    "user_id": {"type": "string"},'
                '    "amount": {"type": "number"},'
                '    "event_time": {"type": "string", "format": "date-time"}'
                '  }'
                '}'
            )
        ),
    ),
    # Ingestion config: the Delta table that receives stream data.
    ingestion_config=IngestionConfig(
        ingestion_destination=IngestionDestination(
            delta_table_name="my_catalog.my_schema.events_ingestion"
        ),
    ),
)
Connecting to stream sources
Before defining streaming features, use a streaming Lakeflow Spark Declarative Pipelines pipeline to connect to your Kafka broker and test the connection. See Streaming on serverless compute and Connect to Apache Kafka.
For AWS managed streaming (Amazon MSK), see Serverless private connectivity to Amazon MSK. For details on Kafka authentication options, see Authentication.
Authentication
Unity Catalog connection (recommended)
Use a Unity Catalog connection to authenticate to your Kafka cluster. This is the recommended approach for managed authentication. To create a connection, see Create a connection.
connection_config = StreamConnectionConfig(
    uc_connection_name="my-kafka-connection"
)
Direct mTLS
For direct mTLS authentication, provide keystore and truststore files stored on a Unity Catalog volume, with passwords referenced through Databricks secret scopes. For more information on SSL authentication with Kafka, see Use SSL to connect Databricks to Kafka.
from databricks.feature_engineering.entities import (
    DirectMtlsConfig,
    MtlsConfig,
    SecretScopeReference,
)

connection_config = DirectMtlsConfig(
    bootstrap_servers="broker1:9092,broker2:9092",
    mtls_config=MtlsConfig(
        # Keystore and truststore files live on a Unity Catalog volume.
        keystore_location="/Volumes/my_catalog/my_schema/my_volume/keystore.jks",
        # Passwords are referenced through Databricks secret scopes.
        keystore_password_ref=SecretScopeReference(
            scope="my_scope", key="keystore_password"
        ),
        key_password_ref=SecretScopeReference(
            scope="my_scope", key="key_password"
        ),
        truststore_location="/Volumes/my_catalog/my_schema/my_volume/truststore.jks",
        truststore_password_ref=SecretScopeReference(
            scope="my_scope", key="truststore_password"
        ),
    ),
)
SASL
SASL authentication (both SASL/SCRAM and SASL/PLAIN) is not supported during the preview.
Subscription modes
The subscription mode specifies how the Stream selects Kafka topics to consume from. Three modes are supported:
| Mode | Description | Example |
|---|---|---|
| subscribe | Comma-separated list of topic names | subscribe="events-topic" |
|  | Java regex pattern matching topic names |  |
|  | JSON specifying topic-partition assignments |  |
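To make the three selection styles concrete, here is a minimal sketch in plain Python that models what each mode selects. It does not use the feature engineering client, and all function names are hypothetical. Two assumptions to note: Python's `re` module stands in for Java regex (the two differ in some constructs), and the `{"topic": [partitions]}` JSON layout is assumed for illustration.

```python
import json
import re


def select_by_list(available, topics_csv):
    """Comma-separated list: keep the named topics that exist."""
    wanted = [t.strip() for t in topics_csv.split(",") if t.strip()]
    return [t for t in wanted if t in available]


def select_by_pattern(available, pattern):
    """Regex pattern: keep every topic whose whole name matches."""
    compiled = re.compile(pattern)
    return [t for t in available if compiled.fullmatch(t)]


def select_by_assignment(assignment_json):
    """JSON assignment: return explicit (topic, partition) pairs.

    The {"topic": [partitions]} layout is an assumption for illustration.
    """
    assignment = json.loads(assignment_json)
    return [(topic, p) for topic, parts in assignment.items() for p in parts]


available = ["events-topic", "events-audit", "orders"]
print(select_by_list(available, "events-topic, orders"))
print(select_by_pattern(available, r"events-.*"))
print(select_by_assignment('{"events-topic": [0, 1]}'))
```

The list and pattern styles select whole topics, while the assignment style pins individual partitions.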
Schema configuration
Define the structure of message keys and values using JSON Schema format. For Kafka sources, payload_schema corresponds to the Kafka message value (the value in Kafka's key-value model) and key_schema corresponds to the Kafka message key. At least one of payload_schema or key_schema must be provided.
schema_config = DirectSchemas(
    # Schema for the Kafka message value.
    payload_schema=SchemaConfig(
        json_schema=(
            '{'
            '  "type": "object",'
            '  "properties": {'
            '    "user_id": {"type": "string"},'
            '    "amount": {"type": "number"},'
            '    "event_time": {"type": "string"}'
            '  }'
            '}'
        )
    ),
    # Schema for the Kafka message key.
    key_schema=SchemaConfig(
        json_schema='{"type": "string"}'
    ),
)
If no schema is provided for a key or payload, it is treated as a simple string.
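Hand-concatenated JSON strings are easy to get wrong (a missing comma or quote only surfaces later). One alternative, sketched here with the standard library only, is to write the schema as a Python dictionary and serialize it with `json.dumps`. The variable names are illustrative. The resulting string is the kind of value the `json_schema` argument takes in the example above.

```python
import json

# The same payload schema as above, written as a Python dictionary.
payload_schema = {
    "type": "object",
    "properties": {
        "user_id": {"type": "string"},
        "amount": {"type": "number"},
        "event_time": {"type": "string"},
    },
}

# Serialize once to obtain the JSON Schema string.
payload_schema_json = json.dumps(payload_schema)

# Round-trip to confirm the string is valid JSON before using it.
assert json.loads(payload_schema_json) == payload_schema
print(payload_schema_json)
```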
Ingestion and backfill
The ingestion_config parameter configures how stream data is captured and stored for training and serving.
Access to a Stream is governed by the ingestion table:
- SELECT on the ingestion table grants read access to the Stream.
- MANAGE on the ingestion table grants delete access.
For more information on table privileges, see Table and Unity Catalog privileges reference.
Ingestion pipeline
When a stream is created, Databricks starts a managed ingestion pipeline that continuously reads messages from the Kafka topic and writes them into a Delta table (the ingestion table). The pipeline starts from the latest Kafka offset and runs continuously, capturing only new messages that arrive after the stream is created. This ingestion table is used for training with streaming features. When a stream is deleted, its ingestion pipeline and ingestion table are also deleted.
Ingestion destination
The ingestion_destination specifies the three-part Delta table name where stream data is written.
ingestion_config = IngestionConfig(
    ingestion_destination=IngestionDestination(
        delta_table_name="my_catalog.my_schema.events_ingestion"
    ),
)
Ingestion table schema
The ingestion table contains the message data along with metadata columns:
| Column | Type | Description |
|---|---|---|
| key | Varies (from key_schema) | The Kafka message key, structured according to the schema you provided. |
| value | Varies (from payload_schema) | The Kafka message value (payload), structured according to the schema you provided. |
| stream_record_timestamp | TIMESTAMP | The record timestamp. For forward-fill data, this is the Kafka broker ingest timestamp. For backfill data, this is customer-supplied. |
| kafka_topic |  | The Kafka topic the record was consumed from. |
| kafka_partition |  | The Kafka partition the record was consumed from. |
| kafka_offset |  | The Kafka offset of the record within its partition. |
| record_source |  | Either "stream" or "backfill". |
Backfill source
Because the forward-fill pipeline starts from the latest Kafka offset, it does not capture messages that existed before the stream was created. To provide historical data coverage for training, configure an optional backfill source.
When a backfill source is configured, Databricks runs a one-time MERGE INTO job that copies backfill rows into the ingestion table with record_source="backfill". The MERGE runs only after the overlap checker confirms that the backfill source and the forward-fill stream have overlapping timestamps (see Overlap between backfill and live stream data). If the overlap condition is not met within 2 days, the MERGE runs anyway to avoid blocking indefinitely.
The backfill table must include a stream_record_timestamp column of type TIMESTAMP in UTC timezone. Other Kafka metadata columns (kafka_topic, kafka_partition, kafka_offset) are passed through if present on the backfill source, or set to NULL otherwise.
from databricks.feature_engineering.entities import StreamBackfillSource
ingestion_config = IngestionConfig(
    ingestion_destination=IngestionDestination(
        delta_table_name="my_catalog.my_schema.events_ingestion"
    ),
    # Optional historical data copied into the ingestion table once.
    backfill_source=StreamBackfillSource(
        delta_table_name="my_catalog.my_schema.historical_events"
    ),
)
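The column rules for backfill rows can be modeled in a few lines of plain Python. This is only a sketch of the behavior described above, not the managed job itself. The helper `prepare_backfill_row` is hypothetical, and rows are represented as dictionaries for illustration.

```python
from datetime import datetime, timezone

KAFKA_METADATA_COLUMNS = ("kafka_topic", "kafka_partition", "kafka_offset")


def prepare_backfill_row(row: dict) -> dict:
    """Model how a backfill row might look once copied (illustrative only)."""
    ts = row.get("stream_record_timestamp")
    if not isinstance(ts, datetime) or ts.tzinfo is None:
        raise ValueError("stream_record_timestamp must be a timezone-aware datetime")
    out = dict(row)
    # Normalize the timestamp to UTC.
    out["stream_record_timestamp"] = ts.astimezone(timezone.utc)
    # Pass Kafka metadata through if present; otherwise set it to None (NULL).
    for column in KAFKA_METADATA_COLUMNS:
        out.setdefault(column, None)
    out["record_source"] = "backfill"
    return out


row = {
    "value": {"user_id": "u1"},
    "stream_record_timestamp": datetime(2024, 1, 1, 15, 0, tzinfo=timezone.utc),
    "kafka_topic": "events-topic",
}
prepared = prepare_backfill_row(row)
print(prepared["kafka_topic"], prepared["kafka_offset"], prepared["record_source"])
```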
Overlap between backfill and live stream data
Before running a MERGE between backfill and the ingestion table, an overlap check compares timestamps on the two tables:
- Backfill max: The maximum stream_record_timestamp in the backfill source.
- Ingestion min: The minimum stream_record_timestamp of rows (record_source="stream") in the ingestion table.
The MERGE proceeds when the backfill's latest timestamp exceeds the ingestion table's earliest timestamp by at least 1 hour. This overlap ensures there are no gaps in the ingestion table. If the overlap condition is not met within 2 days, the MERGE runs anyway to avoid blocking indefinitely.
Because the ingestion pipeline starts from the latest Kafka offset, it only captures messages arriving after the stream is created. Your backfill source must contain data that extends into the ingestion time range, not just up to the stream creation time.
For example, if you create a stream at 3:00 PM, the forward-fill pipeline begins reading messages from 3:00 PM onward. Your backfill source must include data with timestamps through at least 4:00 PM (1 hour past the forward-fill start) to satisfy the overlap check. This means you should update your backfill table after 4:00 PM to ensure that the ingestion table has no gaps.
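The overlap rule and the 3:00 PM example can be worked through in a short sketch. The function names are hypothetical, and one detail is an assumption: the source does not say when the 2-day limit starts counting, so the `waited` argument simply represents the time spent waiting for the overlap condition.

```python
from datetime import datetime, timedelta, timezone

REQUIRED_OVERLAP = timedelta(hours=1)
MAX_WAIT = timedelta(days=2)


def overlap_satisfied(backfill_max, ingestion_min):
    """True when backfill data extends at least 1 hour past the first stream row."""
    return backfill_max - ingestion_min >= REQUIRED_OVERLAP


def merge_may_run(backfill_max, ingestion_min, waited):
    """The MERGE proceeds on sufficient overlap, or after waiting 2 days."""
    return overlap_satisfied(backfill_max, ingestion_min) or waited >= MAX_WAIT


# Stream created at 3:00 PM, so the earliest stream row is at 3:00 PM.
ingestion_min = datetime(2024, 1, 1, 15, 0, tzinfo=timezone.utc)

# Backfill that stops at 3:00 PM has no overlap yet.
print(merge_may_run(ingestion_min, ingestion_min, timedelta(0)))

# Backfill that reaches 4:00 PM satisfies the 1-hour overlap.
print(merge_may_run(ingestion_min + timedelta(hours=1), ingestion_min, timedelta(0)))

# Without overlap, the MERGE still runs once 2 days have passed.
print(merge_may_run(ingestion_min, ingestion_min, timedelta(days=2)))
```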
Deduplication
Use deduplication_columns to specify column paths for identifying duplicate rows during ingestion between backfill and forward-fill stream data. Use dot notation for nested fields (for example, "value.user_id").
Choose deduplication columns based on your data:
- If each record in your stream contains a unique identifier (for example, value.transaction_id), use that column for deduplication.
- If your backfill source includes kafka_partition and kafka_offset columns, use those to uniquely identify each record.
- If no deduplication columns are specified, the default deduplication key is the full combination of key, value, and stream_record_timestamp. This is not recommended, because such strict matching can easily lead to duplicates.
ingestion_config = IngestionConfig(
    ingestion_destination=IngestionDestination(
        delta_table_name="my_catalog.my_schema.events_ingestion"
    ),
    # Dot notation selects a nested field of the message value.
    deduplication_columns=["value.transaction_id"],
)
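To show how a dot-notation column path can act as a deduplication key, the following sketch merges backfill rows into stream rows and skips any backfill row whose key already exists. It is a simplified model in plain Python, not the managed MERGE logic. The helper names are hypothetical, rows are dictionaries, and key values are assumed to be hashable.

```python
def resolve_path(row: dict, path: str):
    """Follow a dot-notation path such as 'value.transaction_id'."""
    value = row
    for part in path.split("."):
        value = value[part]
    return value


def dedup_key(row: dict, columns):
    """Build the deduplication key from the configured column paths."""
    return tuple(resolve_path(row, column) for column in columns)


def merge_without_duplicates(stream_rows, backfill_rows, columns):
    """Append only those backfill rows whose key is not already present."""
    seen = {dedup_key(row, columns) for row in stream_rows}
    merged = list(stream_rows)
    for row in backfill_rows:
        key = dedup_key(row, columns)
        if key not in seen:
            seen.add(key)
            merged.append(row)
    return merged


stream_rows = [{"value": {"transaction_id": "t1"}}, {"value": {"transaction_id": "t2"}}]
backfill_rows = [{"value": {"transaction_id": "t2"}}, {"value": {"transaction_id": "t3"}}]
merged = merge_without_duplicates(stream_rows, backfill_rows, ["value.transaction_id"])
print([row["value"]["transaction_id"] for row in merged])
```

With a unique identifier as the key, the row that appears in both sources (t2) is kept only once.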
Manage streams
Get a stream
stream = client.get_stream(name="my_catalog.my_schema.my_stream")
List streams
streams = client.list_streams(
    catalog_name="my_catalog",
    schema_name="my_schema",
    max_results=50,
    include_schemas=False,
)
Set include_schemas=True to include full schema details. Schemas can be large and this might result in a long-running operation. To retrieve schemas individually instead, use get_stream.
Delete a stream
Deleting a stream also deletes its ingestion pipeline and ingestion table.
Any models or features that reference the deleted stream will no longer have access to the underlying stream data. Create a copy of the ingestion table before deletion if you need this data but no longer need the stream.
client.delete_stream(name="my_catalog.my_schema.my_stream")
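If you want to keep the ingested data, one possible way to copy the ingestion table before calling delete_stream is a Delta deep clone. The sketch below only builds the SQL statement. The helper names and the target table name are hypothetical, and whether cloning is permitted on a managed ingestion table in your workspace is an assumption to verify.

```python
import re

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def quote_table_name(full_name: str) -> str:
    """Validate a three-part table name and wrap each part in backticks."""
    parts = full_name.split(".")
    if len(parts) != 3 or not all(_IDENTIFIER.match(part) for part in parts):
        raise ValueError(f"expected catalog.schema.table, got {full_name!r}")
    return ".".join(f"`{part}`" for part in parts)


def build_clone_statement(source: str, target: str) -> str:
    """Build a DEEP CLONE statement that copies source into target."""
    return (
        f"CREATE TABLE IF NOT EXISTS {quote_table_name(target)} "
        f"DEEP CLONE {quote_table_name(source)}"
    )


statement = build_clone_statement(
    "my_catalog.my_schema.events_ingestion",
    "my_catalog.my_schema.events_ingestion_copy",  # hypothetical target name
)
print(statement)
# In a Databricks notebook, the statement could then be run with spark.sql(statement).
```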
Example notebook
For an end-to-end example that creates a Stream, defines streaming features, and deploys to a serving endpoint, see the following notebook: