Tutorial: Continuously ingest data into Delta Lake with Auto Loader

Continuous, incremental data ingestion is a common need. For example, sources ranging from mobile games to e-commerce websites to IoT sensors generate continuous streams of data. Analysts want access to the freshest data, but continuous ingestion can be challenging to implement for several reasons:

  • You may need to transform and ingest data as it arrives, while processing files exactly once.

  • You may want to enforce schemas before writing to tables. This logic can be complex to write and maintain.

  • It is challenging to handle data whose schemas change over time. For example, you must decide how to deal with incoming rows that have data quality problems and how to reprocess those rows after you have solved issues with the raw data.

  • A scalable solution, one that processes thousands or millions of files per minute, requires integrating cloud services such as event notifications, message queues, and triggers, which adds development complexity and long-term maintenance overhead.

Building a continuous, cost-effective, maintainable, and scalable data transformation and ingestion system is not trivial. Databricks provides Auto Loader as a built-in, optimized solution that addresses the preceding issues and gives data teams a way to load raw data from cloud object stores at lower cost and latency. Auto Loader automatically configures and listens to a notification service for new files and can scale up to millions of files per second. It also handles common concerns such as schema inference and schema evolution. To learn more, see Auto Loader.

In this tutorial, you use Auto Loader to incrementally ingest (load) data into a Delta table.

Requirements

  1. A Databricks account, and a Databricks workspace within your account. To create these, see Sign up for a free trial.

  2. An all-purpose cluster within your workspace. To create one, see Create a cluster.

  3. Familiarity with the Databricks workspace user interface. See Navigate the workspace.

Step 1: Create sample data

In this step, you create a notebook in your workspace. In this notebook, you run code that generates a random comma-separated file in your workspace every 30 seconds. Each of these files contains a random set of data.

Note

Auto Loader also works with data in the following formats: Avro, binary, CSV, JSON, ORC, Parquet, and text.

  1. In your workspace, in the sidebar, click Create > Notebook.

  2. In the Create Notebook dialog, enter a name for the notebook, for example Fake Data Generator.

  3. For Default Language, select Python.

  4. For Cluster, select the cluster that you created in the Requirements section, or select another available cluster that you want to use.

  5. Click Create.

  6. In the notebook’s menu bar, if the circle next to the name of the cluster does not contain a green check mark, click the drop-down arrow next to the cluster’s name, and then click Start Cluster. Click Confirm, and then wait until the circle contains a green check mark.

  7. In the notebook’s first cell, paste the following code:

    import csv
    import uuid
    import random
    import time
    from pathlib import Path
    
    count = 0
    path = "/tmp/generated_raw_csv_data"
    
    # Create the staging directory on the driver's local disk if it does not already exist.
    Path(path).mkdir(parents=True, exist_ok=True)
    
    while True:
      # Build a small random dataset: a header row plus three data rows.
      row_list = [ ["id", "x_axis", "y_axis"],
                   [uuid.uuid4(), random.randint(-100, 100), random.randint(-100, 100)],
                   [uuid.uuid4(), random.randint(-100, 100), random.randint(-100, 100)],
                   [uuid.uuid4(), random.randint(-100, 100), random.randint(-100, 100)]
                 ]
      file_location = f'{path}/file_{count}.csv'
    
      # Write the rows to a local CSV file. The with block closes the file automatically.
      with open(file_location, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerows(row_list)
    
      count += 1
    
      # Move the file from the driver's local disk into DBFS so that Auto Loader can pick it up.
      dbutils.fs.mv(f'file:{file_location}', f'dbfs:{file_location}')
      time.sleep(30)
      print(f'New CSV file created at dbfs:{file_location}. Contents:')
    
      # Read the file back through the DBFS FUSE mount and print its contents.
      with open(f'/dbfs{file_location}', 'r') as file:
        reader = csv.reader(file)
        for row in reader:
          print(', '.join(row))
    

    The preceding code does the following:

    1. Creates a directory at /tmp/generated_raw_csv_data, if one does not already exist.

      Tip

      If this path already exists in your workspace because someone else ran this tutorial, you may want to clear out any existing files in this path first.

    2. Creates a random set of data, for example:

      id,x_axis,y_axis
      d033faf3-b6bd-4bbc-83a4-43a37ce7e994,88,-13
      fde2bdb6-b0a1-41c2-9650-35af717549ca,-96,19
      297a2dfe-99de-4c52-8310-b24bc2f83874,-23,43
      
    3. Every 30 seconds, creates a file named file_<number>.csv, writes the random set of data to the file, moves the file to dbfs:/tmp/generated_raw_csv_data, and prints the file’s path and contents. <number> starts at 0 and increases by 1 each time a file is created (for example, file_0.csv, file_1.csv, and so on).

  8. In the notebook’s menu bar, click Run All. Leave this notebook running.

    Note

    To view the list of generated files, in the sidebar, click Data. Click DBFS, select a cluster if prompted, and then click tmp > generated_raw_csv_data.
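
    You can also list the generated files from a notebook cell instead of browsing DBFS in the UI. The following is a minimal sketch; it assumes the generator notebook has been running long enough to create at least one file:

      # List the CSV files that the generator has written to DBFS so far.
      display(dbutils.fs.ls("dbfs:/tmp/generated_raw_csv_data"))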

Step 2: Run Auto Loader

In this step, you use Auto Loader to continuously read raw data from one location in your workspace and stream that data into a Delta table in another location in the same workspace.

  1. In the sidebar, click Create > Notebook.

  2. In the Create Notebook dialog, enter a name for the notebook, for example Auto Loader Demo.

  3. For Default Language, select Python.

  4. For Cluster, select the cluster that you created in the Requirements section, or select another available cluster that you want to use.

  5. Click Create.

  6. In the notebook’s menu bar, if the circle next to the name of the cluster does not contain a green check mark, click the drop-down arrow next to the cluster’s name, and then click Start Cluster. Click Confirm, and then wait until the circle contains a green check mark.

  7. In the notebook’s first cell, paste the following code:

    raw_data_location = "dbfs:/tmp/generated_raw_csv_data"
    target_delta_table_location = "dbfs:/tmp/table/coordinates"
    schema_location = "dbfs:/tmp/auto_loader/schema"
    checkpoint_location = "dbfs:/tmp/auto_loader/checkpoint"
    

    This code defines paths in your workspace: the location of the raw data, the location of the target Delta table, the location where Auto Loader stores the inferred schema, and the location where the stream writes checkpoint information. Checkpoints enable Auto Loader to process only new incoming data and to skip data that has already been processed.

    Tip

    If any of these paths already exist in your workspace because someone else ran this tutorial, you may want to clear out any existing files in these paths first.

  8. With your cursor still in the first cell, run the cell. (To run the cell, press Shift+Enter.) Running the cell assigns these paths to variables that the following cells use.

  9. Add a cell below the first cell, if it is not already there. (To add a cell, rest your mouse pointer along the bottom edge of the cell, and then click the + icon.) In this second cell, paste the following code (note that cloudFiles represents Auto Loader):

    stream = spark.readStream \
      .format("cloudFiles") \
      .option("cloudFiles.format", "csv") \
      .option("header", "true") \
      .option("cloudFiles.schemaLocation", schema_location) \
      .load(raw_data_location)
    
  10. Run this cell.

  11. In the notebook’s third cell, paste the following code:

    display(stream)
    
  12. Run this cell. Auto Loader begins processing the existing CSV files in raw_data_location as well as any incoming CSV files as they arrive in that location. Auto Loader processes each CSV file by using the first line in the file for the field names and the remaining lines as field data. Databricks displays the data as Auto Loader processes it.

  13. In the notebook’s fourth cell, paste the following code:

    stream.writeStream \
      .option("checkpointLocation", checkpoint_location) \
      .start(target_delta_table_location)
    
  14. Run this cell. Auto Loader writes the data to the Delta table at target_delta_table_location and writes checkpoint information to checkpoint_location. To confirm that the table is being populated, see the sketch that follows this list.
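
    Once the stream has written its first batch, you can read the Delta table back in a separate cell while the stream continues to run. The following is a minimal sketch that reuses the target_delta_table_location variable defined earlier:

    # Read the Delta table that the stream is writing and show a sample of rows.
    saved_df = spark.read.format("delta").load(target_delta_table_location)
    display(saved_df.limit(10))
    print(f"Rows written so far: {saved_df.count()}")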

Step 3: Evolve and enforce data schema

What happens if your data’s schema changes over time? For example, what if you want to evolve field data types so that you can better enforce data quality and more easily perform calculations on your data? In this step, you evolve the allowed data types of your data, and then you enforce this schema on incoming data.

Keep the notebook from step 1 running so that it continues to generate new sample files for the data stream.

  1. Stop the notebook from step 2. (To stop the notebook, click Stop Execution in the notebook’s menu bar.)

  2. In the notebook from step 2, replace the contents of the fourth cell (the one that starts with stream.writeStream) with the following code:

    stream.printSchema()
    
  3. Run all of the notebook’s cells. (To run all of the cells, click Run All in the notebook’s menu bar.) Databricks prints the data’s schema, which shows all fields as strings. Let’s evolve the x_axis and y_axis fields to integers.

  4. Stop the notebook.

  5. Replace the contents of the second cell (the one that starts with stream = spark.readStream) with the following code:

    stream = spark.readStream \
      .format("cloudFiles") \
      .option("cloudFiles.format", "csv") \
      .option("header", "true") \
      .option("cloudFiles.schemaLocation", schema_location) \
      .option("cloudFiles.schemaHints", """x_axis integer, y_axis integer""") \
      .load(raw_data_location)
    
  6. Run all of the notebook’s cells. Databricks prints the data’s new schema, which shows the x_axis and y_axis columns as integers. Let’s now enforce data quality by using this new schema.

  7. Stop the notebook.

  8. Replace the contents of the second cell with the following code:

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    schema = StructType([
               StructField('id', StringType(), True),
               StructField('x_axis', IntegerType(), True),
               StructField('y_axis', IntegerType(), True)
             ])
    
    stream = spark.readStream \
      .format("cloudFiles") \
      .option("cloudFiles.format", "csv") \
      .option("header", "true") \
      .option("cloudFiles.schemaLocation", schema_location) \
      .schema(schema) \
      .load(raw_data_location)
    
  9. Run all of the notebook’s cells. Instead of inferring the schema, Auto Loader now applies the schema that you defined and enforces it on incoming data. The sketch after this list shows one way to inspect rows that do not match the schema.
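
    With the explicit schema applied, incoming CSV values that cannot be parsed as the declared types are typically read as null, assuming Spark’s default PERMISSIVE CSV parsing. The following minimal sketch shows one way to surface such rows in a separate cell, using the stream DataFrame defined above:

    # Show rows where type enforcement produced nulls in the integer columns.
    # Assumes the default PERMISSIVE CSV parsing behavior.
    suspect_rows = stream.filter("x_axis IS NULL OR y_axis IS NULL")
    display(suspect_rows)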

Step 4: Clean up

When you are done with this tutorial, you can clean up the associated Databricks resources in your workspace, if you no longer want to keep them.

Delete the data

  1. Stop both notebooks. (To open a notebook, in the sidebar, click Workspace > Users > your user name, and then click the notebook.)

  2. In the notebook from step 1, add a cell after the first one, and paste the following code into this second cell.

    dbutils.fs.rm("dbfs:/tmp/generated_raw_csv_data", True)
    dbutils.fs.rm("dbfs:/tmp/table", True)
    dbutils.fs.rm("dbfs:/tmp/auto_loader", True)
    

    Warning

    If you have any other information in these locations, this information will also be deleted!

  3. Run the cell. Databricks deletes the directories that contain the raw data, the Delta table, the table’s schema, and the Auto Loader checkpoint information.
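
    If you want to confirm the cleanup, one quick check (a minimal sketch) is to list each path and catch the exception that dbutils.fs.ls raises for a path that no longer exists:

    # Verify that each cleanup target was removed.
    for p in ["dbfs:/tmp/generated_raw_csv_data", "dbfs:/tmp/table", "dbfs:/tmp/auto_loader"]:
      try:
        dbutils.fs.ls(p)
        print(f"{p} still exists")
      except Exception:
        print(f"{p} was removed")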

Delete the notebooks

  1. In the sidebar, click Workspace > Users > your user name.

  2. Click the drop-down arrow next to the first notebook, and click Move to Trash.

  3. Click Confirm and move to Trash.

  4. Repeat steps 1 through 3 for the second notebook.

Stop the cluster

If you are not using the cluster for any other tasks, you should stop it to avoid additional costs.

  1. In the sidebar, click Compute.

  2. Click the cluster’s name.

  3. Click Terminate.

  4. Click Confirm.

Additional resources