This feature is in Public Preview.
MySQL table replication to Delta Lake streams data from a MySQL table directly into Delta Lake for downstream consumption in Spark analytics or data science workflows. Using the same strategy that MySQL uses for replication to other instances, replication uses the binlog to identify updates, which are then processed and streamed to Databricks as follows:
- Reads change events from the database log.
- Streams them to Databricks.
- Writes in the same order to a Delta Lake table.
- Maintains state in case of disconnects from the source.
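The steps above can be illustrated with a toy model in plain Python. This is only a sketch of the general idea (apply change events in log order and remember the last applied position so that a reconnect does not re-apply them). The `ChangeEvent` and `Replica` names and the event shape are hypothetical and are not the format used by the preview source.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ChangeEvent:
    """One row-level change; a hypothetical shape, not the real event format."""
    offset: int                            # position in the log, increasing
    op: str                                # "insert", "update", or "delete"
    key: Any                               # primary key of the affected row
    row: Optional[Dict[str, Any]] = None   # new row image (None for delete)


@dataclass
class Replica:
    """Target table keyed by primary key, plus the last applied offset."""
    rows: Dict[Any, Dict[str, Any]] = field(default_factory=dict)
    last_offset: int = -1                  # stands in for checkpointed state

    def apply(self, events: List[ChangeEvent]) -> None:
        # Apply in log order; skip events already applied before a disconnect.
        for ev in sorted(events, key=lambda e: e.offset):
            if ev.offset <= self.last_offset:
                continue
            if ev.op == "delete":
                self.rows.pop(ev.key, None)
            elif ev.op in ("insert", "update"):
                self.rows[ev.key] = dict(ev.row or {})
            else:
                raise ValueError(f"unknown op: {ev.op}")
            self.last_offset = ev.offset
```

The model relies on each row having a primary key, since updates and deletes are matched to rows by key.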
Your database must meet a few configuration requirements to participate in replication. Configure these before starting a Structured Streaming job and beginning replication to Databricks.
- The table being replicated from must specify a primary key.
- The user responsible for replication within MySQL, named cdcFromMysql, must exist and have at least the following permissions on the source database:
- The source MySQL instance must have the following server configs set to generate binary logs in a format Spark can consume:
server_id = <value>
log_bin = <value>
binlog_format = row
binlog_row_image = full
- Depending on server defaults and table size, it may also be necessary to increase the binlog retention period. If Spark’s most recently committed binlog record falls outside the retention period, the stream fails and must be restarted with a new snapshot and checkpoint location.
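As a pre-flight aid, the server settings listed above can be checked before starting a stream. The following is a minimal sketch, assuming the settings have already been collected into a dictionary (for example, parsed from the server's configuration file). The function name `check_binlog_settings` is hypothetical and not part of any Databricks or MySQL API.

```python
from typing import Dict, List


def check_binlog_settings(settings: Dict[str, str]) -> List[str]:
    """Return a list of problems; an empty list means the settings look fine."""
    problems = []
    # server_id and log_bin only need to be set to some value.
    if not str(settings.get("server_id", "")).strip():
        problems.append("server_id is not set")
    if not str(settings.get("log_bin", "")).strip():
        problems.append("log_bin is not set")
    # The binlog must be row-based and carry full row images.
    if str(settings.get("binlog_format", "")).lower() != "row":
        problems.append("binlog_format must be row")
    if str(settings.get("binlog_row_image", "")).lower() != "full":
        problems.append("binlog_row_image must be full")
    return problems
```

The check only covers the four server settings. It does not verify the primary key, the user permissions, or the binlog retention period.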
In Databricks you initiate replication by writing a Structured Streaming job:
(spark.readStream
  .format("preview-cdcFromMysql")
  .option("host", <host>)
  .option("username", <username>)
  .option("password", dbutils.secrets.get(<scope>, <mysql_password>))
  .option("database", <database>)
  .option("table", <table>)
  .load()
  .writeStream
  .format("preview-cdcToDelta")
  .option("checkpointLocation", <checkpoint location>)
  .start(<path to Delta Lake table target>))
When the stream starts, it performs a full table scan to take an initial snapshot of the MySQL table, overwriting the Delta Lake table location and creating the table if it doesn’t exist. After the snapshot, the stream reads ongoing changes from the MySQL binlog replication mechanism and streams these updates to Databricks.
The initial snapshot may take a very long time for a large MySQL table, during which the stream status remains “Stream initializing…”.
We have not published official benchmarks, but a reasonable expectation for throughput is around 25 megabytes per second.
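To get a rough sense of how long the initial snapshot might take, the throughput figure can be turned into a back-of-the-envelope estimate. This sketch assumes the 25 MB/s figure applies to the snapshot phase and that throughput is constant, neither of which is guaranteed. The function name is hypothetical.

```python
def estimate_snapshot_seconds(table_size_mb: float,
                              throughput_mb_per_s: float = 25.0) -> float:
    """Estimate snapshot duration in seconds, assuming constant throughput."""
    if throughput_mb_per_s <= 0:
        raise ValueError("throughput must be positive")
    return table_size_mb / throughput_mb_per_s


# A 500,000 MB (about 500 GB) table at 25 MB/s:
seconds = estimate_snapshot_seconds(500_000)   # 20000.0 seconds
hours = seconds / 3600                         # roughly 5.6 hours
```

An estimate like this can also help when deciding how long binlog records need to be retained.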
Most applications do not require any special configuration. For very large data sets, it may be necessary to set up advanced configurations in the execution engine. These parameters are:
- rowqueue.size: the size of Spark’s initial buffer for change events, in number of rows. Default is 10000.
- rowqueue.timeoutMs: the time, in milliseconds, that Spark waits for records to enter the row queue before failing. Default is 60000.
- offsetFetch.timeoutMs: the time, in milliseconds, that Spark waits to get offsets from the offset reader. Default is 5000.
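The three parameters and their defaults can be kept together in one place and overridden selectively. The sketch below only merges overrides with the documented defaults and rejects unknown names or non-positive values. How the resulting values are passed to the stream is not covered here, and `resolve_settings` is a hypothetical helper name.

```python
from typing import Dict, Optional

# Defaults as documented above; the timeouts are in milliseconds.
DEFAULTS: Dict[str, int] = {
    "rowqueue.size": 10000,
    "rowqueue.timeoutMs": 60000,
    "offsetFetch.timeoutMs": 5000,
}


def resolve_settings(overrides: Optional[Dict[str, int]] = None) -> Dict[str, int]:
    """Merge overrides into the defaults, validating names and values."""
    settings = dict(DEFAULTS)
    for name, value in (overrides or {}).items():
        if name not in DEFAULTS:
            raise KeyError(f"unknown parameter: {name}")
        if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
            raise ValueError(f"{name} must be a positive integer")
        settings[name] = value
    return settings


# Example: a larger row buffer for a very large table, other values unchanged.
large_table_settings = resolve_settings({"rowqueue.size": 50000})
```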
- Replication from a specific point in time forward is not supported.
- All timestamps are stored in the UTC time zone.
- Geometric types are not supported.
- Schema changes are supported.
- Configuration errors may not produce clean error messages.
- Only the latin1, utf8, and utf8mb4 character sets are supported.
- A MySQL YEAR value is replicated into Delta Lake as an integer.
- A MySQL TIME value is replicated into Delta Lake as the number of microseconds after