
Tutorial: Build a geospatial pipeline with native spatial types

Learn how to create and deploy a pipeline that ingests GPS data, converts coordinates to native spatial types, and joins them against warehouse geofences to track arrivals. The pipeline uses Lakeflow Spark Declarative Pipelines (SDP) for orchestration and Auto Loader for incremental ingestion. It relies on Databricks native spatial types (GEOMETRY, GEOGRAPHY) and built-in spatial functions such as ST_Point, ST_GeomFromWKT, and ST_Contains, so you can run geospatial workloads at scale without external libraries.

In this tutorial, you will:

  • Create a pipeline and generate sample GPS and geofence data in a Unity Catalog volume.
  • Ingest raw GPS pings incrementally with Auto Loader into a bronze streaming table.
  • Build a silver streaming table that converts latitude and longitude to a native GEOMETRY point.
  • Create a materialized view of warehouse geofences from WKT polygons.
  • Run a spatial join to produce a table of warehouse arrivals (which device entered which geofence).

The result is a medallion-style pipeline: bronze (raw GPS), silver (points as geometry), and gold (geofences and arrival events). See What is the medallion lakehouse architecture? for more information.

Requirements

To complete this tutorial, you must meet the following requirements:

Step 1: Create a pipeline

Create a new ETL pipeline and set the default catalog and schema for your tables.

  1. In your workspace, click New in the upper-left corner.

  2. Click ETL Pipeline.

  3. Change the title of the pipeline to Spatial pipeline tutorial or a name you prefer.

  4. Under the title, choose a catalog and schema for which you have write permissions.

    This catalog and schema are used by default when you do not specify a catalog or schema in your code. Replace <catalog> and <schema> in the following steps with the values you choose here.

  5. From Advanced options, select Start with an empty file.

  6. Choose a folder for your code. Select Browse to pick one; you can use a Git folder for version control.

  7. Choose Python or SQL for the language of your first file. You can add files in the other language later.

  8. Click Select to create the pipeline and open the Lakeflow Pipelines Editor.

You now have a blank pipeline with a default catalog and schema. Next, create the sample GPS and geofence data.

Step 2: Create the sample GPS and geofence data

This step generates sample data in a volume: raw GPS pings (JSON) and warehouse geofences (JSON with WKT polygons). The GPS points are generated in a bounding box that overlaps the two warehouse polygons, so the spatial join in a later step will return arrival rows. You can skip this step if you already have your own data in a volume or table.
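As a quick sanity check on that claim, rectangle overlap can be tested with plain interval arithmetic. The following is an illustration in ordinary Python, not pipeline code; the boxes copy the coordinates used in this step:

```python
# Axis-aligned boxes as (min_lon, min_lat, max_lon, max_lat).
GPS_BOX = (-118.3, 34.0, -118.1, 34.2)          # sample GPS bounding box
WAREHOUSE_A = (-118.35, 34.02, -118.25, 34.08)  # bounding box of Warehouse_A
WAREHOUSE_B = (-118.20, 34.05, -118.12, 34.12)  # bounding box of Warehouse_B

def boxes_overlap(a, b):
    """Return True if two axis-aligned boxes intersect."""
    return a[0] < b[2] and b[0] < a[2] and a[1] < b[3] and b[1] < a[3]

print(boxes_overlap(GPS_BOX, WAREHOUSE_A))  # True
print(boxes_overlap(GPS_BOX, WAREHOUSE_B))  # True
```

Because both checks pass, some of the random pings must land inside each warehouse polygon, so the spatial join in Step 6 returns rows.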

  1. In the Lakeflow Pipelines Editor, in the asset browser, click Add, then Exploration.

  2. Set Name to Setup spatial data, choose Python, and leave the default destination folder.

  3. Click Create.

  4. In the new notebook, paste the following code to generate the GPS and geofence data. Replace <catalog> and <schema> with the default catalog and schema you set in Step 1.

    Python
    from pyspark.sql import functions as F

    catalog = "<catalog>"  # for example, "main"
    schema = "<schema>"  # for example, "default"

    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"

    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),  # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),  # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")

    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
  5. Run the notebook cell (Shift + Enter).

After the run completes, the volume contains gps (raw pings) and geofences (polygons in WKT). In the next step, you ingest the GPS data into a bronze table.
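Each file under gps is newline-delimited JSON, one ping per line. A record looks roughly like the following (hypothetical values; the real timestamps and coordinates come from the generator above):

```python
import json

# Hypothetical example of one GPS ping as written to the volume; actual
# values are produced by the Spark generator in the previous step.
record = {
    "device_id": "device_42",
    "timestamp": "2024-01-15T12:00:00.000Z",
    "longitude": -118.27,  # inside the -118.3 to -118.1 sample range
    "latitude": 34.05,     # inside the 34.0 to 34.2 sample range
}
print(json.dumps(record))
```

Auto Loader infers this schema in the next step; the inferColumnTypes option lets longitude and latitude arrive as doubles rather than strings.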

Step 3: Ingest GPS data into a bronze streaming table

Ingest the raw GPS JSON from the volume incrementally using Auto Loader and write into a bronze streaming table.

  1. In the asset browser, click Add, then Transformation.

  2. Set Name to gps_bronze, choose SQL, and click Create.

  3. Replace the file contents with the following code. Replace <catalog> and <schema> with your default catalog and schema.

    SQL
    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";

    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    );
  4. Click Run file or Run pipeline to run an update.

When the update completes, the pipeline graph shows the gps_bronze table. Next, add a silver table that converts coordinates to a native geometry point.

Step 4: Add a silver streaming table with geometry points

Create a streaming table that reads from the bronze table and adds a GEOMETRY column using ST_Point(longitude, latitude).

  1. In the asset browser, click Add, then Transformation.

  2. Set Name to raw_gps_silver, choose SQL, and click Create.

  3. Paste the following code into the new file.

    SQL
    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";

    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze);
  4. Click Run file or Run pipeline.

The pipeline graph now shows gps_bronze and raw_gps_silver. Next, add the warehouse geofences as a materialized view.

Step 5: Create the warehouse geofences gold table

Create a materialized view that reads the geofences from the volume and converts the WKT column to a GEOMETRY column using ST_GeomFromWKT.

  1. In the asset browser, click Add, then Transformation.

  2. Set Name to warehouse_geofences_gold, choose SQL, and click Create.

  3. Paste the following code. Replace <catalog> and <schema> with your default catalog and schema.

    SQL
    CREATE OR REFRESH MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    );
  4. Click Run file or Run pipeline.

The pipeline now includes the geofences table. Next, add the spatial join to compute warehouse arrivals.
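ST_GeomFromWKT parses well-known text such as POLYGON ((lon lat, ...)) into a geometry value. To make the input format concrete, here is a minimal pure-Python parse of one of this tutorial's single-ring polygons (illustration only; the built-in function handles the full WKT grammar):

```python
import re

def parse_wkt_polygon(wkt):
    """Parse a single-ring WKT POLYGON into a list of (lon, lat) tuples.
    Sketch of the text format only, not a general WKT parser."""
    m = re.match(r"POLYGON\s*\(\((.+)\)\)", wkt.strip())
    if m is None:
        raise ValueError(f"not a simple WKT polygon: {wkt!r}")
    return [tuple(float(v) for v in pair.split()) for pair in m.group(1).split(",")]

ring = parse_wkt_polygon(
    "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"
)
print(len(ring))            # 5 vertices
print(ring[0] == ring[-1])  # True: the first vertex repeats to close the ring
```

Note the WKT conventions the sample data follows: coordinates are ordered longitude then latitude, and a valid ring repeats its first vertex as its last.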

Step 6: Create the warehouse arrivals table with a spatial join

Add a materialized view that joins the silver GPS points to the geofences using ST_Contains(boundary_geom, point_geom) to determine when a device is inside a warehouse polygon.

  1. In the asset browser, click Add, then Transformation.

  2. Set Name to warehouse_arrivals, choose SQL, and click Create.

  3. Paste the following code.

    SQL
    CREATE OR REFRESH MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom);
  4. Click Run file or Run pipeline.

When the update completes, the pipeline graph shows all four datasets: gps_bronze, raw_gps_silver, warehouse_geofences_gold, and warehouse_arrivals.
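Conceptually, ST_Contains(boundary_geom, point_geom) answers a point-in-polygon question for every pair the join considers. The classic ray-casting test sketches the idea in plain Python (illustration only; the engine evaluates the real predicate natively on geometry values):

```python
def point_in_polygon(lon, lat, ring):
    """Ray-casting test: count how many polygon edges a horizontal ray from
    (lon, lat) crosses; an odd count means the point is inside."""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        if (y1 > lat) != (y2 > lat):  # edge spans the point's latitude
            # Longitude where this edge crosses the point's latitude.
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside

# Warehouse_A's ring from the sample data (closing vertex omitted; the
# modular edge walk closes the ring implicitly).
warehouse_a = [(-118.35, 34.02), (-118.25, 34.02), (-118.25, 34.08), (-118.35, 34.08)]
print(point_in_polygon(-118.30, 34.05, warehouse_a))  # True: inside
print(point_in_polygon(-118.10, 34.15, warehouse_a))  # False: outside
```

Pings for which this test succeeds against a geofence become rows in warehouse_arrivals; all other device/warehouse pairs are dropped by the join.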

Verify the spatial join

Confirm that the spatial join produced rows: points from the silver table that fall inside a geofence appear in warehouse_arrivals. Run one of the following in a notebook or SQL editor (use the same catalog and schema as your pipeline target).

Count arrivals by warehouse (SQL):

SQL
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

You should see non-zero counts for Warehouse_A and Warehouse_B (the sample GPS data overlaps both polygons). To inspect sample rows:

SQL
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Same checks in Python (notebook):

Python
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

If you see rows in warehouse_arrivals, the ST_Contains(boundary_geom, point_geom) join is working correctly.
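Because the sample pings are uniform over the GPS bounding box, you can also estimate what the counts should be from rectangle areas. This is a back-of-the-envelope sketch; actual counts vary per run because the sample data is random:

```python
def overlap_area(a, b):
    """Area of intersection of two (min_lon, min_lat, max_lon, max_lat) boxes."""
    w = min(a[2], b[2]) - max(a[0], b[0])
    h = min(a[3], b[3]) - max(a[1], b[1])
    return max(w, 0.0) * max(h, 0.0)

gps_box = (-118.3, 34.0, -118.1, 34.2)
warehouse_a = (-118.35, 34.02, -118.25, 34.08)
warehouse_b = (-118.20, 34.05, -118.12, 34.12)

# Expected hits = total pings * (fraction of the GPS box each polygon covers).
box_area = (gps_box[2] - gps_box[0]) * (gps_box[3] - gps_box[1])  # 0.04 deg^2
for name, box in [("Warehouse_A", warehouse_a), ("Warehouse_B", warehouse_b)]:
    expected = 5000 * overlap_area(gps_box, box) / box_area
    print(f"{name}: ~{expected:.0f} of 5000 pings expected")
```

This estimate puts Warehouse_A around 375 arrivals and Warehouse_B around 700, so counts in that neighborhood are a good sign the join behaves as intended.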

Step 7: Schedule the pipeline (optional)

To keep the pipeline up to date as new GPS data lands in the volume, create a job to run the pipeline on a schedule.

  1. At the top of the editor, click Schedule.
  2. If the Schedules dialog appears, choose Add schedule.
  3. Optionally, give the job a name.
  4. By default, the schedule runs once per day. You can accept this or set your own. Choosing Advanced lets you set a specific time; More options lets you add run notifications.
  5. Select Create to apply the schedule.

See Monitoring and observability for Lakeflow Jobs for more information about job runs.

Additional resources