MySQL Table Replication to Databricks Delta

Preview

This feature is in Public Preview.

MySQL table replication to Databricks Delta streams data from a MySQL table directly into Delta, for downstream consumption in Spark analytics or data science workflows. It leverages the same strategy that MySQL uses for replication to other instances: updates are identified from the binlog, then processed and streamed to Databricks. The replication stream:

  1. Reads change events from the database log.
  2. Streams them to Databricks.
  3. Writes in the same order to a Delta table.
  4. Maintains state in case of disconnects from the source.

MySQL configuration

A few configuration options are required before your database can participate in replication. Set these before initiating the Structured Streaming job that begins replication to Databricks.

  • The table being replicated from must specify a primary key.
  • The MySQL user responsible for replication (passed as the username option below) must exist and have at least the following permissions on the source database:
    • SELECT
    • RELOAD
    • SHOW DATABASES
    • REPLICATION SLAVE
    • REPLICATION CLIENT
  • The source MySQL instance must have the following server configs set so that it generates binary logs in a format Spark can consume (see the example configuration after this list):
    • server_id = <value>
    • log_bin = <value>
    • binlog_format = row
    • binlog_row_image = full
  • Depending on server defaults and table size, you may also need to increase the binlog retention period. If Spark’s most recently committed binlog record falls outside the retention period, the stream fails and must be restarted with a new snapshot and a new checkpoint location.
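
As a sketch, the permissions and server settings above can be applied as follows. The user name, password, and server ID are illustrative placeholders, not requirements. Note that in MySQL, RELOAD, SHOW DATABASES, and the replication privileges are global and must be granted ON *.*:

-- Create a dedicated replication user (name and password are placeholders)
CREATE USER 'repl_user'@'%' IDENTIFIED BY '<password>';
-- SELECT can be scoped to the source database
GRANT SELECT ON <database>.* TO 'repl_user'@'%';
-- The remaining privileges are global in MySQL
GRANT RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'repl_user'@'%';

And in the server configuration file (my.cnf):

[mysqld]
server_id        = 1          # any unique, nonzero ID
log_bin          = mysql-bin  # base name for the binary log files
binlog_format    = row
binlog_row_image = full
# Optionally extend binlog retention, for example with
# expire_logs_days (MySQL 5.7) or binlog_expire_logs_seconds (MySQL 8.0)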

Set up a Structured Streaming job in Databricks

In Databricks you initiate replication by writing a Structured Streaming job:

spark.readStream
  .format("preview-cdcFromMysql")
  .option("host", <host>)
  .option("username", <username>)
  .option("password", dbutils.secrets.get(<scope>, <mysql_password>)
  .option("database",  <database>)
  .option("table", <table>)
  .load()
  .writeStream
  .format("preview-cdcToDelta")
  .option("checkpointLocation", <checkpoint location>)
  .start(<path to Delta table target>)

When the stream starts, it performs a full table scan of the MySQL table as an initial snapshot, overwriting the Delta table at the target location and creating the table if it doesn’t exist. After the snapshot, the stream reads ongoing changes from the MySQL binlog replication mechanism and streams these updates to Databricks.
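
Once the snapshot has been written, the target behaves like any other Delta table and can be read with a standard batch query (shown here in Python; the path is the same target passed to start() above):

# Read the replicated Delta table as a batch DataFrame
df = spark.read.format("delta").load("<path to Delta table target>")
display(df)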

Note

  • The initial snapshot may take a very long time for a large MySQL table, which will cause the stream status to read “Stream initializing…” for a long time.
  • We have not published official benchmarks, but a reasonable expectation for throughput is around 25 megabytes per second.

Advanced configuration

Most applications do not require any special configuration. For very large data sets, however, you may need to tune the following parameters in the execution engine (a sketch of setting them follows this list):

  • rowqueue.size: the size of Spark’s initial buffer for change events, in number of rows. Default is 10000.
  • rowqueue.timeoutMs: how long Spark waits, in milliseconds, for records to enter the row queue before failing. Default is 60000 (60 seconds).
  • offsetFetch.timeoutMs: how long Spark waits, in milliseconds, to get offsets from the offset reader. Default is 5000 (5 seconds).
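
This section does not state exactly where these parameters are supplied; a minimal sketch, assuming they are passed as options on the streaming source alongside the connection options, might look like this (Python):

# Assumption: the tuning parameters are plain source options,
# set next to the connection options shown earlier.
stream = (spark.readStream
  .format("preview-cdcFromMysql")
  .option("host", "<host>")
  .option("username", "<username>")
  .option("password", dbutils.secrets.get("<scope>", "<mysql_password>"))
  .option("database", "<database>")
  .option("table", "<table>")
  .option("rowqueue.size", "50000")          # larger buffer for high-churn tables
  .option("rowqueue.timeoutMs", "120000")    # wait up to 2 minutes for rows
  .option("offsetFetch.timeoutMs", "10000")  # wait up to 10 seconds for offsets
  .load())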

Limitations

  • Point-in-time forward is not supported.
  • All timestamps are stored in the UTC time zone.
  • Geometric types are not supported.
  • Schema changes are not supported.
  • Configuration errors may not produce clean error messages.
  • Only the latin1, utf8, and utf8mb4 character sets are supported.
  • A MySQL YEAR value is replicated into Delta as an integer.
  • A MySQL TIME value is replicated into Delta as the number of microseconds after 00:00:00.000000 (see the worked example after this list).
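
For example, applying the TIME rule above (simple arithmetic, shown in Python):

# A MySQL TIME of 01:30:00 lands in Delta as microseconds after midnight
hours, minutes, seconds = 1, 30, 0
micros = (hours * 3600 + minutes * 60 + seconds) * 1_000_000
print(micros)  # 5400000000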