Azure Event Hubs is a hyper-scale telemetry ingestion service that collects, transforms, and stores millions of events. As a distributed streaming platform, it gives you low latency and configurable time retention, which enables you to ingress massive amounts of telemetry into the cloud and read the data from multiple applications using publish-subscribe semantics.
This topic explains how to use Structured Streaming with Azure Event Hubs and Databricks clusters.
The Azure Event Hubs Spark Connector, developed by Microsoft, requires Databricks Runtime 3.5-LTS or above.
For current release support, see “Latest Releases” in the Azure Event Hubs Spark Connector project readme file.
Create a library in your Databricks workspace using the Maven coordinate
This connector is updated regularly, and a more recent version may be available. We recommend that you pull the latest connector from the Maven repository.
Install the created library into your cluster.
The schema of the records is:
body is always provided as a byte array. Use cast("string") to explicitly deserialize the body column.
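The cast can be tried without an event hub. The following is a minimal sketch, assuming Spark is on the classpath (as it is in a Databricks notebook). The local SparkSession and the static two-row DataFrame are illustrative stand-ins for a real stream and are not part of the original example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session for illustration. In a notebook, getOrCreate
// returns the session that already exists.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("body-cast-sketch")
  .getOrCreate()
import spark.implicits._

// Stand-in for Event Hubs records: a single binary column named body.
val records = Seq(
  "hello".getBytes("UTF-8"),
  "world".getBytes("UTF-8")
).toDF("body")

// cast("string") decodes the bytes as UTF-8 text.
val decoded = records.select(col("body").cast("string").as("body"))
```

On a real stream the same select applies to the DataFrame returned by the Event Hubs source.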
Let’s start with a quick example: WordCount. The following notebook is all that it takes to run WordCount using Structured Streaming with Azure Event Hubs.
This section discusses the configuration settings you need to work with Event Hubs.
For detailed guidance on configuring Structured Streaming with Azure Event Hubs, see the Structured Streaming and Azure Event Hubs Integration Guide developed by Microsoft.
For detailed guidance on using Structured Streaming, see Structured Streaming Guide.
An Event Hubs connection string is required to connect to the Event Hubs service. You can get the connection string for your Event Hubs instance from the Azure portal or by using the ConnectionStringBuilder in the library.
When you get the connection string from the Azure portal, it may or may not have the
EntityPath key. Consider:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path (named withEntityPath because `with` is a reserved word in Scala)
val withEntityPath = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
To connect to your event hub, an EntityPath must be present. If your connection string doesn’t have one, you can add it with ConnectionStringBuilder:
import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build
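To decide whether a given connection string needs setEventHubName at all, you can check for the EntityPath key first. The following plain-Scala sketch assumes the semicolon-separated key=value layout shown above. The helpers connectionStringKeys and hasEntityPath are hypothetical names introduced here and are not part of the connector.

```scala
// Hypothetical helper (not part of the connector): parses the
// semicolon-separated key=value pairs of a connection string.
def connectionStringKeys(cs: String): Map[String, String] =
  cs.split(";")
    .map(_.trim)
    .filter(_.nonEmpty)
    // Split on the first '=' only, because SAS keys are base64 and may end in '='.
    .map(_.split("=", 2))
    .collect { case Array(key, value) => key.trim -> value }
    .toMap

// True when the string already names an event hub through EntityPath.
def hasEntityPath(cs: String): Boolean =
  connectionStringKeys(cs).get("EntityPath").exists(_.nonEmpty)
```

A caller could then apply ConnectionStringBuilder(...).setEventHubName(...) only when hasEntityPath returns false.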
Alternatively, you can use the
ConnectionStringBuilder to make your connection string.
import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build
All configuration relating to Event Hubs happens in your
EventHubsConf. To create an
EventHubsConf, you must
pass a connection string:
import org.apache.spark.eventhubs._

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
See Connection String for more information about obtaining a valid connection string.
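Once an EventHubsConf exists, it can be handed to a streaming read. The following is a minimal sketch, assuming the connector library is installed on the cluster, that spark is the SparkSession a Databricks notebook provides, and that the placeholder is replaced with a real connection string containing an EntityPath. The value names events and messages are illustrative.

```scala
import org.apache.spark.eventhubs._
import org.apache.spark.sql.functions.col

// Placeholder: substitute a real connection string that includes EntityPath.
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

// Assumption: `spark` is the SparkSession provided by the notebook.
val events = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

// Decode the binary body column to text for downstream processing.
val messages = events.select(col("body").cast("string").as("body"))
```

Nothing is read until a query is started on messages with writeStream.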
For a complete list of configurations, see EventHubsConf. Here is a subset of configurations to get you started:
|Option||Value||Default||Query type||Description|
|consumerGroup||String||“$Default”||Streaming and batch||A consumer group is a view of an entire event hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. More information is available in the Microsoft documentation.|
|startingPosition||EventPosition||Start of stream||Streaming and batch||The starting position for your Structured Streaming job. See startingPositions for information about the order in which options are read.|
|maxEventsPerTrigger||long||partitionCount * 1000||Streaming query||Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume.|
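To make the phrase "proportionally split" concrete, here is a small arithmetic sketch. It is an illustration only and not the connector's implementation: the function splitProportionally, the backlog figures, and the rounding-down behavior are all assumptions chosen for the example.

```scala
// Hypothetical illustration, not the connector's code: divide a per-trigger
// budget across partitions in proportion to how many events each has waiting.
def splitProportionally(backlog: Seq[Long], maxEvents: Long): Seq[Long] = {
  val total = backlog.sum
  if (total <= maxEvents) backlog               // everything fits in one trigger
  else backlog.map(b => b * maxEvents / total)  // integer share, rounded down
}

val partitionCount = 3
val defaultLimit = partitionCount * 1000L       // default limit: partitionCount * 1000

// Assumed backlog per partition, in events.
val shares = splitProportionally(Seq(1000L, 3000L, 6000L), defaultLimit)
```

With these assumed numbers, the partition holding the most waiting events receives the largest share of the 3000-event budget.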
For each option, there exists a corresponding setting in
EventHubsConf. For example:
import org.apache.spark.eventhubs._

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)
EventHubsConf allows users to specify starting (and ending) positions with the EventPosition class. EventPosition defines the position of an event in an Event Hub partition. The position can be an enqueued time, offset, sequence number, the start of the stream, or the end of the stream.
import java.time.Instant
import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Specifies any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream
If you would like to start (or end) at a specific position, simply create the correct EventPosition and set it in your EventHubsConf:
import org.apache.spark.eventhubs._

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
When you run streaming queries in production, you probably want more robustness and uptime guarantees than you would have when you simply attach a notebook to a cluster and run your streaming queries interactively. Import and run the following notebook for a demonstration of how to configure and run Structured Streaming in production with Azure Event Hubs and Databricks.
For more information, see Structured Streaming in Production.
For an end-to-end example of streaming data into a cluster using Event Hubs, see Tutorial: Stream data into Azure Databricks using Event Hubs.