Set up a Stream

Preview

This feature is in Public Preview. Workspace admins can control access to this feature from the Previews page. See Manage Databricks previews.

A Stream represents an external streaming data source, such as Apache Kafka. Streams store connection details, authentication, schemas, and ingestion configuration. After a stream is created, you can reference it using Feature View definitions to create real-time streaming features.

Streams have three-part names (catalog.schema.stream_name). Access to a Stream is governed by its associated ingestion table. See Ingestion and backfill for details.
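
As a rough illustration of the three-part naming convention, the sketch below splits a full name into its catalog, schema, and stream parts. The `parse_stream_name` helper and `StreamName` type are hypothetical and not part of the feature engineering client; the sketch also assumes plain identifiers and does not handle backtick-quoted names that contain dots.

```python
from typing import NamedTuple


class StreamName(NamedTuple):
    """The three parts of a fully qualified Stream name."""
    catalog: str
    schema: str
    stream: str


def parse_stream_name(full_name: str) -> StreamName:
    """Split 'catalog.schema.stream_name' into its three parts.

    Hypothetical helper for illustration only.
    """
    parts = full_name.split(".")
    # Require exactly three non-empty parts.
    if len(parts) != 3 or any(not part.strip() for part in parts):
        raise ValueError(
            f"Expected catalog.schema.stream_name, got {full_name!r}"
        )
    return StreamName(*parts)


name = parse_stream_name("my_catalog.my_schema.my_stream")
print(name.catalog, name.schema, name.stream)
```

Checking the name locally before calling the client can surface a missing catalog or schema early, although the service remains the authority on which names are valid.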

Requirements

  • For running notebook commands: serverless or a classic compute cluster running Databricks Runtime 17.0 ML or above.
  • The feature-engineering-client Python package version 0.16.0 or above must be installed.

Create a stream

Use create_stream() to create a new Stream. A Stream requires four configuration components:

  • Source config: Specifies the streaming platform (for example, Kafka) and source-specific details (such as topic subscription for Kafka).
  • Connection config: Specifies how to connect and authenticate to the streaming platform, including bootstrap servers and credentials.
  • Schema config: Defines the structure of message keys and values.
  • Ingestion config: Specifies where and how stream data is ingested. See Ingestion and backfill for details.

Python
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
    KafkaStreamConfig,
    KafkaSubscriptionMode,
    StreamConnectionConfig,
    DirectSchemas,
    SchemaConfig,
    IngestionConfig,
    IngestionDestination,
    StreamBackfillSource,
)

client = FeatureEngineeringClient()

stream = client.create_stream(
    name="my_catalog.my_schema.my_stream",
    source_config=KafkaStreamConfig(
        subscription_mode=KafkaSubscriptionMode(subscribe="events-topic"),
    ),
    connection_config=StreamConnectionConfig(
        uc_connection_name="my-kafka-connection"
    ),
    schema_config=DirectSchemas(
        payload_schema=SchemaConfig(
            json_schema=(
                '{'
                ' "type": "object",'
                ' "properties": {'
                ' "transaction_id": {"type": "string"},'
                ' "user_id": {"type": "string"},'
                ' "amount": {"type": "number"},'
                ' "event_time": {"type": "string", "format": "date-time"}'
                ' }'
                '}'
            )
        ),
    ),
    ingestion_config=IngestionConfig(
        ingestion_destination=IngestionDestination(
            delta_table_name="my_catalog.my_schema.events_ingestion"
        ),
    ),
)
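
Hand-concatenated JSON strings like the one above are easy to break with a missing comma or quote. One possible alternative, sketched here with only the Python standard library, is to describe the schema as a dictionary and serialize it with `json.dumps`. The variable names `payload_schema` and `payload_schema_json` are illustrative.

```python
import json

# The same payload schema as in the example, written as a Python dictionary.
payload_schema = {
    "type": "object",
    "properties": {
        "transaction_id": {"type": "string"},
        "user_id": {"type": "string"},
        "amount": {"type": "number"},
        "event_time": {"type": "string", "format": "date-time"},
    },
}

# Serialize to a JSON string; json.dumps handles quoting and commas.
payload_schema_json = json.dumps(payload_schema)

# Round-trip check: the string parses back to the original dictionary.
assert json.loads(payload_schema_json) == payload_schema
print(payload_schema_json)
```

The resulting string can then be passed as the `json_schema` argument of `SchemaConfig` in place of the inline literal.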

Connecting to stream sources

Before defining streaming features, set up and test a connection from a streaming Lakeflow Spark Declarative Pipelines pipeline to your Kafka broker. See Streaming on serverless compute and Connect to Apache Kafka.

For AWS managed streaming (Amazon MSK), see Serverless private connectivity to Amazon MSK. For details on Kafka authentication options, see Authentication.

Authentication

Use a Unity Catalog connection to authenticate to your Kafka cluster. This is the recommended approach for managed authentication. To create a connection, see Create a connection.

Python
from databricks.feature_engineering.entities import StreamConnectionConfig

connection_config = StreamConnectionConfig(
    uc_connection_name="my-kafka-connection"
)

Direct mTLS

For direct mTLS authentication, provide keystore and truststore files stored on a Unity Catalog volume, with passwords referenced through Databricks secret scopes. For more information on SSL authentication with Kafka, see Use SSL to connect Databricks to Kafka.

Python
from databricks.feature_engineering.entities import (
    DirectMtlsConfig,
    MtlsConfig,
    SecretScopeReference,
)

connection_config = DirectMtlsConfig(
    bootstrap_servers="broker1:9092,broker2:9092",
    mtls_config=MtlsConfig(
        keystore_location="/Volumes/my_catalog/my_schema/my_volume/keystore.jks",
        keystore_password_ref=SecretScopeReference(
            scope="my_scope", key="keystore_password"
        ),
        key_password_ref=SecretScopeReference(
            scope="my_scope", key="key_password"
        ),
        truststore_location="/Volumes/my_catalog/my_schema/my_volume/truststore.jks",
        truststore_password_ref=SecretScopeReference(
            scope="my_scope", key="truststore_password"
        ),
    ),
)

SASL

SASL authentication (both SASL/SCRAM and SASL/PLAIN) is not supported during the preview.

Subscription modes

The subscription mode specifies how the Stream selects Kafka topics to consume from. Three modes are supported:

| Mode | Description | Example |
| --- | --- | --- |
| subscribe | Comma-separated list of topic names | KafkaSubscriptionMode(subscribe="topic1,topic2") |
| subscribe_pattern | Java regex pattern matching topic names | KafkaSubscriptionMode(subscribe_pattern="events-.*") |
| assign | JSON specifying topic-partition assignments | KafkaSubscriptionMode(assign='{"my-topic": [0, 1, 2]}') |
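
The string values for each mode can be built programmatically rather than typed by hand. The sketch below uses only the Python standard library. The `matching_topics` helper is hypothetical, and it previews a pattern with Python's `re` module, which only approximates Java regex behavior for simple patterns; it also assumes the pattern must match the whole topic name.

```python
import json
import re

# subscribe: join topic names into a comma-separated string.
subscribe_value = ",".join(["topic1", "topic2"])

# assign: serialize a topic -> partitions mapping to JSON.
assign_value = json.dumps({"my-topic": [0, 1, 2]})


def matching_topics(pattern: str, candidates: list[str]) -> list[str]:
    """Preview which candidate topic names a pattern would select.

    Hypothetical helper; uses Python regex and whole-name matching,
    both of which are assumptions about the real matching behavior.
    """
    compiled = re.compile(pattern)
    return [topic for topic in candidates if compiled.fullmatch(topic)]


print(subscribe_value)  # topic1,topic2
print(assign_value)     # {"my-topic": [0, 1, 2]}
print(matching_topics("events-.*", ["events-clicks", "events-orders", "audit-events"]))
# ['events-clicks', 'events-orders']
```

Each value can then be passed to the corresponding `KafkaSubscriptionMode` argument shown in the table.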

Schema configuration

Define the structure of message keys and values using JSON Schema format. For Kafka sources, payload_schema corresponds to the Kafka message value (the value in Kafka's key-value model) and key_schema corresponds to the Kafka message key. At least one of payload_schema or key_schema must be provided.

Python
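from databricks.feature_engineering.entities import DirectSchemas, SchemaConfig

schema_config = DirectSchemas(
    # Schema of the Kafka message value
    payload_schema=SchemaConfig(
        json_schema=(
            '{'
            ' "type": "object",'
            ' "properties": {'
            ' "user_id": {"type": "string"},'
            ' "amount": {"type": "number"},'
            ' "event_time": {"type": "string"}'
            ' }'
            '}'
        )
    ),
    # Schema of the Kafka message key
    key_schema=SchemaConfig(
        json_schema='{"type": "string"}'
    ),
)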

If no schema is provided for a key or payload, it is treated as a simple string.
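
Before creating a stream, it can help to check a sample message against the schema you plan to register. The following is a minimal sketch using only the standard library. The `type_mismatches` helper is hypothetical, checks only the primitive types of top-level fields, and is not a full JSON Schema validator; it also assumes the payload is a JSON object.

```python
import json

# Maps JSON Schema primitive type names to Python types.
_JSON_TYPES = {
    "string": str,
    "number": (int, float),
    "integer": int,
    "boolean": bool,
    "object": dict,
    "array": list,
}


def type_mismatches(schema_json: str, message_json: str) -> list[str]:
    """Return top-level fields whose value type does not match the schema."""
    properties = json.loads(schema_json).get("properties", {})
    message = json.loads(message_json)
    mismatched = []
    for field, spec in properties.items():
        expected = _JSON_TYPES.get(spec.get("type"))
        if field not in message or expected is None:
            continue  # absent fields and unknown types are skipped in this sketch
        value = message[field]
        # bool is a subclass of int in Python, so reject it for non-boolean types.
        if isinstance(value, bool) and expected is not bool:
            mismatched.append(field)
        elif not isinstance(value, expected):
            mismatched.append(field)
    return mismatched


schema = '{"type": "object", "properties": {"user_id": {"type": "string"}, "amount": {"type": "number"}}}'
print(type_mismatches(schema, '{"user_id": "u1", "amount": 12.5}'))    # []
print(type_mismatches(schema, '{"user_id": "u1", "amount": "12.5"}'))  # ['amount']
```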

Ingestion and backfill

The ingestion_config parameter configures how stream data is captured and stored for training and serving.

Access to a Stream is governed by the ingestion table:

  • SELECT on the ingestion table grants read access to the Stream.
  • MANAGE on the ingestion table grants delete access.

For more information on table privileges, see Table and Unity Catalog privileges reference.

Ingestion pipeline

When a stream is created, Databricks starts a managed ingestion pipeline that continuously reads messages from the Kafka topic and writes them into a Delta table (the ingestion table). The pipeline starts from the latest Kafka offset and runs continuously, capturing only new messages that arrive after the stream is created. This ingestion table is used for training with streaming features. When a stream is deleted, its ingestion pipeline and ingestion table are also deleted.

Ingestion destination

The ingestion_destination specifies the three-part Delta table name where stream data is written.

Python
from databricks.feature_engineering.entities import IngestionConfig, IngestionDestination

ingestion_config = IngestionConfig(
    ingestion_destination=IngestionDestination(
        delta_table_name="my_catalog.my_schema.events_ingestion"
    ),
)

Ingestion table schema

The ingestion table contains the message data along with metadata columns:

| Column | Type | Description |
| --- | --- | --- |
| key | Varies (from key_schema) | The Kafka message key, structured according to the schema you provided. |
| value | Varies (from payload_schema) | The Kafka message value (payload), structured according to the schema you provided. |
| stream_record_timestamp | TIMESTAMP | The record timestamp. For forward-fill data, this is the Kafka broker ingest timestamp. For backfill data, this is customer-supplied. |
| kafka_topic | STRING | The Kafka topic the record was consumed from. |
| kafka_partition | INT | The Kafka partition the record was consumed from. |
| kafka_offset | LONG | The Kafka offset of the record within its partition. |
| record_source | STRING | Either "stream" (forward-fill from the live Kafka stream) or "backfill" (from the backfill source). |

Backfill source

Because the forward-fill pipeline starts from the latest Kafka offset, it does not capture messages that existed before the stream was created. To provide historical data coverage for training, configure an optional backfill source.

When a backfill source is configured, Databricks runs a one-time MERGE INTO job that copies backfill rows into the ingestion table with record_source="backfill". The MERGE runs only after the overlap checker confirms that the backfill source and the forward-fill stream have overlapping timestamps (see Overlap between backfill and live stream data). If the overlap condition is not met within 2 days, the MERGE runs anyway to avoid blocking indefinitely.

The backfill table must include a stream_record_timestamp column of type TIMESTAMP in UTC timezone. Other Kafka metadata columns (kafka_topic, kafka_partition, kafka_offset) are passed through if present on the backfill source, or set to NULL otherwise.

Python
from databricks.feature_engineering.entities import (
    IngestionConfig,
    IngestionDestination,
    StreamBackfillSource,
)

ingestion_config = IngestionConfig(
    ingestion_destination=IngestionDestination(
        delta_table_name="my_catalog.my_schema.events_ingestion"
    ),
    # Optional: historical data to merge into the ingestion table
    backfill_source=StreamBackfillSource(
        delta_table_name="my_catalog.my_schema.historical_events"
    ),
)
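
Because the backfill table's stream_record_timestamp column must be in UTC, historical timestamps recorded with a local offset need to be normalized before they are written. The sketch below shows one way to do that with the standard library. The `to_utc` helper is hypothetical, and how the converted values are written to the backfill Delta table is outside its scope.

```python
from datetime import datetime, timezone


def to_utc(timestamp: str) -> datetime:
    """Parse an ISO 8601 timestamp that carries a UTC offset and convert it to UTC."""
    parsed = datetime.fromisoformat(timestamp)
    if parsed.tzinfo is None:
        # Refuse naive timestamps rather than guessing their time zone.
        raise ValueError(f"Timestamp has no UTC offset: {timestamp!r}")
    return parsed.astimezone(timezone.utc)


print(to_utc("2025-01-15T15:00:00-08:00"))  # 2025-01-15 23:00:00+00:00
```

Rejecting timestamps without an offset is a deliberate choice in this sketch, since silently assuming a time zone could shift backfill rows relative to the live stream.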

Overlap between backfill and live stream data

Before running a MERGE between backfill and the ingestion table, an overlap check compares timestamps on the two tables:

  • Backfill max: The maximum stream_record_timestamp in the backfill source.
  • Ingestion min: The minimum stream_record_timestamp of rows (record_source="stream") in the ingestion table.

The MERGE proceeds when the backfill's latest timestamp exceeds the ingestion table's earliest timestamp by at least 1 hour. This overlap ensures there are no gaps in the ingestion table. If the overlap condition is not met within 2 days, the MERGE runs anyway to avoid blocking indefinitely.

Because the ingestion pipeline starts from the latest Kafka offset, it only captures messages arriving after the stream is created. Your backfill source must contain data that extends into the ingestion time range — not just up to the stream creation time.

For example, if you create a stream at 3:00 PM, the forward-fill pipeline begins reading messages from 3:00 PM onward. Your backfill source must include data with timestamps through at least 4:00 PM (1 hour past the forward-fill start) to satisfy the overlap check. In practice, this means updating your backfill table after 4:00 PM so that the ingestion table has no gaps.
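
The overlap rule can be expressed as simple timestamp arithmetic. The sketch below is an illustration of the rule as described in this section, not the actual checker. The function names are hypothetical, and the `waited` argument assumes the 2-day limit is measured from when the check first started, which this page does not specify.

```python
from datetime import datetime, timedelta, timezone

REQUIRED_OVERLAP = timedelta(hours=1)  # backfill max must exceed ingestion min by this much
MAX_WAIT = timedelta(days=2)           # after this long, the MERGE runs anyway


def overlap_satisfied(backfill_max: datetime, ingestion_min: datetime) -> bool:
    """True when the backfill's latest timestamp is at least 1 hour past the stream's earliest."""
    return backfill_max - ingestion_min >= REQUIRED_OVERLAP


def merge_can_run(backfill_max: datetime, ingestion_min: datetime, waited: timedelta) -> bool:
    """True when the overlap is satisfied or the wait limit has been reached."""
    return overlap_satisfied(backfill_max, ingestion_min) or waited >= MAX_WAIT


# Stream created at 3:00 PM, so the earliest forward-fill row is at 3:00 PM.
ingestion_min = datetime(2025, 1, 15, 15, 0, tzinfo=timezone.utc)
backfill_to_330 = datetime(2025, 1, 15, 15, 30, tzinfo=timezone.utc)
backfill_to_400 = datetime(2025, 1, 15, 16, 0, tzinfo=timezone.utc)

print(overlap_satisfied(backfill_to_330, ingestion_min))               # False
print(overlap_satisfied(backfill_to_400, ingestion_min))               # True
print(merge_can_run(backfill_to_330, ingestion_min, timedelta(days=2)))  # True
```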

Deduplication

Use deduplication_columns to specify column paths for identifying duplicate rows during ingestion between backfill and forward-fill stream data. Use dot notation for nested fields (for example, "value.user_id").

Choose deduplication columns based on your data:

  • If each record in your stream contains a unique identifier (for example, value.transaction_id), use that column for deduplication.
  • If your backfill source includes kafka_partition and kafka_offset columns, use those to uniquely identify each record.
  • If no deduplication columns are specified, the default deduplication key is the full combination of key, value, and stream_record_timestamp. This is not recommended, because such strict matching can easily lead to duplicates.
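
Python
from databricks.feature_engineering.entities import IngestionConfig, IngestionDestination

ingestion_config = IngestionConfig(
    ingestion_destination=IngestionDestination(
        delta_table_name="my_catalog.my_schema.events_ingestion"
    ),
    # Dot notation selects the nested transaction_id field inside value
    deduplication_columns=["value.transaction_id"],
)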
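
To illustrate how a dot-notation column path identifies duplicates, the sketch below resolves paths against rows represented as nested dictionaries and skips backfill rows whose key already exists among stream rows. It is a conceptual model only, not how the managed MERGE is implemented. The helper names are hypothetical, and keeping the stream row when keys collide is an assumption.

```python
from typing import Any


def resolve_path(row: dict, path: str) -> Any:
    """Follow a dot-notation path such as 'value.transaction_id' into a nested row."""
    current: Any = row
    for part in path.split("."):
        current = current[part]
    return current


def dedup_key(row: dict, columns: list[str]) -> tuple:
    """Build the deduplication key for a row from the configured column paths."""
    return tuple(resolve_path(row, column) for column in columns)


def merge_without_duplicates(stream_rows: list[dict], backfill_rows: list[dict], columns: list[str]) -> list[dict]:
    """Append backfill rows whose key is not already present (assumption: stream rows win)."""
    seen = {dedup_key(row, columns) for row in stream_rows}
    merged = list(stream_rows)
    for row in backfill_rows:
        key = dedup_key(row, columns)
        if key not in seen:
            seen.add(key)
            merged.append(row)
    return merged


stream_rows = [{"value": {"transaction_id": "t2"}, "record_source": "stream"}]
backfill_rows = [
    {"value": {"transaction_id": "t1"}, "record_source": "backfill"},
    {"value": {"transaction_id": "t2"}, "record_source": "backfill"},  # duplicate of the stream row
]
merged = merge_without_duplicates(stream_rows, backfill_rows, ["value.transaction_id"])
print([(row["value"]["transaction_id"], row["record_source"]) for row in merged])
# [('t2', 'stream'), ('t1', 'backfill')]
```

With the default key (key, value, and stream_record_timestamp together), the two "t2" rows would only match if every one of those fields were identical, which is why a dedicated identifier column is the safer choice.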

Manage streams

Get a stream

Python
stream = client.get_stream(name="my_catalog.my_schema.my_stream")

List streams

Python
streams = client.list_streams(
    catalog_name="my_catalog",
    schema_name="my_schema",
    max_results=50,
    include_schemas=False,
)

Set include_schemas=True to include full schema details. Schemas can be large, so this might result in a long-running operation. To retrieve schemas individually instead, use get_stream.

Delete a stream

Deleting a stream also deletes its ingestion pipeline and ingestion table.

Warning

Any models or features that reference the deleted stream will no longer have access to the underlying stream data. Create a copy of the ingestion table before deletion if you need this data but no longer need the stream.

Python
client.delete_stream(name="my_catalog.my_schema.my_stream")
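
As the warning above suggests, you may want to copy the ingestion table before running the delete call. One possible approach, sketched here, is to build a CREATE TABLE ... AS SELECT statement and run it before deletion. The `build_copy_statement` helper and the target table name are hypothetical, and whether a plain copy suits your retention needs depends on your workspace.

```python
def build_copy_statement(source_table: str, target_table: str) -> str:
    """Build a SQL statement that copies all rows of source_table into a new target_table.

    Hypothetical helper; both names must be three-part (catalog.schema.table).
    """
    for name in (source_table, target_table):
        if len(name.split(".")) != 3:
            raise ValueError(f"Expected a three-part table name, got {name!r}")
    return f"CREATE TABLE {target_table} AS SELECT * FROM {source_table}"


statement = build_copy_statement(
    "my_catalog.my_schema.events_ingestion",
    "my_catalog.my_schema.events_ingestion_copy",  # hypothetical target name
)
print(statement)
# In a Databricks notebook, where `spark` is predefined, you could then run:
# spark.sql(statement)
```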

Example notebook

For an end-to-end example that creates a Stream, defines streaming features, and deploys to a serving endpoint, see the following notebook:

Streaming Feature Views quickstart notebook
