Tutorial: Build a geospatial pipeline with native spatial types
Learn how to create and deploy a pipeline that ingests GPS data, converts coordinates to native spatial types, and joins against warehouse geofences to track arrivals, using Lakeflow Spark Declarative Pipelines (SDP) for orchestration and Auto Loader for incremental ingestion. This tutorial uses Databricks native spatial types (GEOMETRY, GEOGRAPHY) and built-in spatial functions such as ST_Point, ST_GeomFromWKT, and ST_Contains, so you can run geospatial workflows at scale without external libraries.
In this tutorial, you will:
- Create a pipeline and generate sample GPS and geofence data in a Unity Catalog volume.
- Ingest raw GPS pings incrementally with Auto Loader into a bronze streaming table.
- Build a silver streaming table that converts latitude and longitude to a native GEOMETRY point.
- Create a materialized view of warehouse geofences from WKT polygons.
- Run a spatial join to produce a table of warehouse arrivals (which device entered which geofence).
The result is a medallion-style pipeline: bronze (raw GPS), silver (points as geometry), and gold (geofences and arrival events). See What is the medallion lakehouse architecture? for more information.
Requirements
To complete this tutorial, you must meet the following requirements:
- Be logged in to a Databricks workspace.
- Have Unity Catalog enabled for your workspace.
- Have serverless compute enabled for your account if you want to use Serverless Lakeflow Spark Declarative Pipelines. If serverless compute is not enabled, the steps work with the default compute for your workspace.
- Have permission to create a compute resource or access to a compute resource.
- Have permissions to create a new schema in a catalog. The required permissions are USE CATALOG and CREATE SCHEMA.
- Have permissions to create a new volume in an existing schema. The required permissions are USE SCHEMA and CREATE VOLUME.
- Use a runtime that supports native spatial types and spatial functions.
Step 1: Create a pipeline
Create a new ETL pipeline and set the default catalog and schema for your tables.
- In your workspace, click New in the upper-left corner.
- Click ETL Pipeline.
- Change the title of the pipeline to Spatial pipeline tutorial, or a name you prefer.
- Under the title, choose a catalog and schema for which you have write permissions. This catalog and schema are used by default when you do not specify a catalog or schema in your code. Replace <catalog> and <schema> in the following steps with the values you choose here.
- From Advanced options, select Start with an empty file.
- Choose a folder for your code. You can select Browse to choose a folder, and you can use a Git folder for version control.
- Choose Python or SQL for the language of your first file. You can add files in the other language later.
- Click Select to create the pipeline and open the Lakeflow Pipelines Editor.
You now have a blank pipeline with a default catalog and schema. Next, create the sample GPS and geofence data.
Step 2: Create the sample GPS and geofence data
This step generates sample data in a volume: raw GPS pings (JSON) and warehouse geofences (JSON with WKT polygons). The GPS points are generated in a bounding box that overlaps the two warehouse polygons, so the spatial join in a later step will return arrival rows. You can skip this step if you already have your own data in a volume or table.
- In the Lakeflow Pipelines Editor, in the asset browser, click Add, then Exploration.
- Set Name to Setup spatial data, choose Python, and leave the default destination folder.
- Click Create.
- In the new notebook, paste the following code to generate the GPS and geofence data. Replace <catalog> and <schema> with the default catalog and schema you set in Step 1.
```python
from pyspark.sql import functions as F

catalog = "<catalog>"  # for example, "main"
schema = "<schema>"  # for example, "default"

spark.sql(f"USE CATALOG `{catalog}`")
spark.sql(f"USE SCHEMA `{schema}`")
spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")

volume_base = f"/Volumes/{catalog}/{schema}/raw_data"

# GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
gps_path = f"{volume_base}/gps"
df_gps = (
    spark.range(0, 5000)
    .repartition(10)
    .select(
        F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
        F.current_timestamp().alias("timestamp"),
        (-118.3 + F.rand() * 0.2).alias("longitude"),  # -118.3 to -118.1
        (34.0 + F.rand() * 0.2).alias("latitude"),  # 34.0 to 34.2
    )
)
df_gps.write.format("json").mode("overwrite").save(gps_path)
print(f"Wrote 5000 GPS rows to {gps_path}")

# Geofences: two warehouse polygons (WKT) in the same region
geofences_path = f"{volume_base}/geofences"
geofences_data = [
    ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
    ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
]
df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
df_geo.write.format("json").mode("overwrite").save(geofences_path)
print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
```

- Run the notebook cell (Shift + Enter).
After the run completes, the volume contains gps (raw pings) and geofences (polygons in WKT). In the next step, you ingest the GPS data into a bronze table.
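The claim that the sample GPS bounding box overlaps both warehouse polygons can be sanity-checked locally with plain interval arithmetic. The following is a pure-Python sketch using hypothetical helper names, not part of the pipeline; it compares the generator's coordinate ranges against each polygon's bounding box.

```python
# Axis-aligned boxes as (min_lon, min_lat, max_lon, max_lat)
gps_box = (-118.3, 34.0, -118.1, 34.2)  # ranges used by the sample generator
warehouses = {
    "Warehouse_A": (-118.35, 34.02, -118.25, 34.08),
    "Warehouse_B": (-118.20, 34.05, -118.12, 34.12),
}

def boxes_overlap(a, b):
    """True if two axis-aligned boxes share any area."""
    return a[0] < b[2] and b[0] < a[2] and a[1] < b[3] and b[1] < a[3]

for name, box in warehouses.items():
    print(name, boxes_overlap(gps_box, box))  # True for both warehouses
```

Because both checks return True, the spatial join in Step 6 is guaranteed to have candidate matches.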
Step 3: Ingest GPS data into a bronze streaming table
Ingest the raw GPS JSON from the volume incrementally using Auto Loader and write into a bronze streaming table.
- In the asset browser, click Add, then Transformation.
- Set Name to gps_bronze, choose SQL or Python, and click Create.
- Replace the file contents with the following, using the tab that matches your language. Replace <catalog> and <schema> with your default catalog and schema.
SQL:

```sql
CREATE OR REFRESH STREAMING TABLE gps_bronze
COMMENT "Raw GPS pings ingested from volume using Auto Loader";

CREATE FLOW gps_bronze_ingest_flow AS
INSERT INTO gps_bronze BY NAME
SELECT *
FROM STREAM read_files(
  "/Volumes/<catalog>/<schema>/raw_data/gps",
  format => "json",
  inferColumnTypes => "true"
)
```

Python:

```python
from pyspark import pipelines as dp

path = "/Volumes/<catalog>/<schema>/raw_data/gps"

dp.create_streaming_table(
    name="gps_bronze",
    comment="Raw GPS pings ingested from volume using Auto Loader",
)

@dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
def gps_bronze_ingest_flow():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(path)
    )
```

- Click Run file or Run pipeline to run an update.
When the update completes, the pipeline graph shows the gps_bronze table. Next, add a silver table that converts coordinates to a native geometry point.
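Conceptually, Auto Loader's incremental ingestion works like keeping a checkpoint of files already processed, so each update picks up only files that arrived since the last run. The following is a simplified pure-Python sketch of that idea (hypothetical helper, not how Auto Loader is actually implemented):

```python
def incremental_ingest(all_files, already_seen):
    """Return files not yet processed, plus the updated checkpoint set."""
    new_files = sorted(set(all_files) - already_seen)
    return new_files, already_seen | set(new_files)

seen = set()
# First update: every file in the volume is new
batch1, seen = incremental_ingest(["pings-000.json", "pings-001.json"], seen)
# Second update: only the file that landed since the last run is processed
batch2, seen = incremental_ingest(
    ["pings-000.json", "pings-001.json", "pings-002.json"], seen
)
print(batch1)  # ['pings-000.json', 'pings-001.json']
print(batch2)  # ['pings-002.json']
```

This is why re-running the pipeline does not re-ingest old GPS files into gps_bronze.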
Step 4: Add a silver streaming table with geometry points
Create a streaming table that reads from the bronze table and adds a GEOMETRY column using ST_Point(longitude, latitude).
- In the asset browser, click Add, then Transformation.
- Set Name to raw_gps_silver, choose SQL or Python, and click Create.
- Paste the following code into the new file.
SQL:

```sql
CREATE OR REFRESH STREAMING TABLE raw_gps_silver
COMMENT "GPS pings with native geometry point for spatial joins";

CREATE FLOW raw_gps_silver_flow AS
INSERT INTO raw_gps_silver BY NAME
SELECT
  device_id,
  timestamp,
  longitude,
  latitude,
  ST_Point(longitude, latitude) AS point_geom
FROM STREAM(gps_bronze)
```

Python:

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

dp.create_streaming_table(
    name="raw_gps_silver",
    comment="GPS pings with native geometry point for spatial joins",
)

@dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
def raw_gps_silver_flow():
    return (
        spark.readStream.table("gps_bronze")
        .select(
            "device_id",
            "timestamp",
            "longitude",
            "latitude",
            F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
        )
    )
```

- Click Run file or Run pipeline.
The pipeline graph now shows gps_bronze and raw_gps_silver. Next, add the warehouse geofences as a materialized view.
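Note the argument order in ST_Point(longitude, latitude): spatial types treat longitude as x and latitude as y, which is the reverse of the everyday "lat, lon" convention. A tiny pure-Python sketch (hypothetical helper) shows the equivalent WKT representation of one of the sample points:

```python
def point_wkt(longitude, latitude):
    """WKT for a 2D point; x (longitude) comes before y (latitude)."""
    return f"POINT ({longitude} {latitude})"

print(point_wkt(-118.28, 34.05))  # POINT (-118.28 34.05)
```

Swapping the arguments would place every point in a different part of the world and silently break the geofence join.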
Step 5: Create the warehouse geofences gold table
Create a materialized view that reads the geofences from the volume and converts the WKT column to a GEOMETRY column using ST_GeomFromWKT.
- In the asset browser, click Add, then Transformation.
- Set Name to warehouse_geofences_gold, choose SQL or Python, and click Create.
- Paste the following code. Replace <catalog> and <schema> with your default catalog and schema.
SQL:

```sql
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
SELECT
  warehouse_name,
  ST_GeomFromWKT(boundary_wkt) AS boundary_geom
FROM read_files(
  "/Volumes/<catalog>/<schema>/raw_data/geofences",
  format => "json"
)
```

Python:

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

path = "/Volumes/<catalog>/<schema>/raw_data/geofences"

@dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
def warehouse_geofences_gold():
    return (
        spark.read.format("json").load(path).select(
            "warehouse_name",
            F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
        )
    )
```

- Click Run file or Run pipeline.
The pipeline now includes the geofences table. Next, add the spatial join to compute warehouse arrivals.
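To see what ST_GeomFromWKT consumes, it can help to pull a WKT polygon apart by hand. This pure-Python sketch (a hypothetical parser, far less capable than the built-in function) extracts the vertex ring from one of the sample geofences:

```python
import re

def parse_polygon_wkt(wkt):
    """Parse a simple single-ring WKT POLYGON into (lon, lat) tuples."""
    match = re.match(r"POLYGON\s*\(\((.+)\)\)", wkt.strip())
    if not match:
        raise ValueError(f"not a simple polygon: {wkt}")
    return [
        tuple(float(v) for v in pair.split())
        for pair in match.group(1).split(",")
    ]

ring = parse_polygon_wkt(
    "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"
)
print(len(ring))            # 5 vertices
print(ring[0] == ring[-1])  # True: the first vertex repeats to close the ring
```

The closing vertex is part of the WKT format: a valid polygon ring must end where it starts.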
Step 6: Create the warehouse arrivals table with a spatial join
Add a materialized view that joins the silver GPS points to the geofences using ST_Contains(boundary_geom, point_geom) to determine when a device is inside a warehouse polygon.
- In the asset browser, click Add, then Transformation.
- Set Name to warehouse_arrivals, choose SQL or Python, and click Create.
- Paste the following code.
SQL:

```sql
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
SELECT
  g.device_id,
  g.timestamp,
  w.warehouse_name
FROM raw_gps_silver g
JOIN warehouse_geofences_gold w
  ON ST_Contains(w.boundary_geom, g.point_geom)
```

Python:

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

@dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
def warehouse_arrivals():
    g = spark.read.table("raw_gps_silver")
    w = spark.read.table("warehouse_geofences_gold")
    return (
        g.alias("g")
        .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
        .select(
            F.col("g.device_id").alias("device_id"),
            F.col("g.timestamp").alias("timestamp"),
            F.col("w.warehouse_name").alias("warehouse_name"),
        )
    )
```

- Click Run file or Run pipeline.
When the update completes, the pipeline graph shows all four datasets: gps_bronze, raw_gps_silver, warehouse_geofences_gold, and warehouse_arrivals.
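The predicate behind this join, point-in-polygon containment, is commonly implemented with a ray-casting test: cast a horizontal ray from the point and count how many polygon edges it crosses; an odd count means the point is inside. The following pure-Python sketch illustrates the idea against the Warehouse_A geofence (a simplified stand-in, not the engine's ST_Contains implementation):

```python
def point_in_polygon(point, ring):
    """Ray-casting test: is (lon, lat) inside a closed polygon ring?"""
    x, y = point
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # Count crossings of a horizontal ray extending right from the point
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

warehouse_a = [(-118.35, 34.02), (-118.25, 34.02), (-118.25, 34.08), (-118.35, 34.08)]
print(point_in_polygon((-118.30, 34.05), warehouse_a))  # True: inside the geofence
print(point_in_polygon((-118.15, 34.05), warehouse_a))  # False: east of the geofence
```

The native ST_Contains applies the same containment semantics, with proper handling of geometry validity and edge cases, and can be optimized by the engine during the join.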
Verify the spatial join
Confirm that the spatial join produced rows: points from the silver table that fall inside a geofence appear in warehouse_arrivals. Run one of the following in a notebook or SQL editor (use the same catalog and schema as your pipeline target).
Count arrivals by warehouse (SQL):
```sql
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
```
You should see non-zero counts for Warehouse_A and Warehouse_B (the sample GPS data overlaps both polygons). To inspect sample rows:
```sql
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
```
Same checks in Python (notebook):
```python
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
```
If you see rows in warehouse_arrivals, the ST_Contains(boundary_geom, point_geom) join is working correctly.
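Because the 5,000 sample points are uniform over the 0.2 × 0.2 degree box, you can estimate the expected counts with back-of-the-envelope area arithmetic. This is a local sanity check with hypothetical helper names, not part of the pipeline; actual counts vary run to run because the points are random:

```python
def overlap_area(a, b):
    """Intersection area of two (min_lon, min_lat, max_lon, max_lat) boxes."""
    w = min(a[2], b[2]) - max(a[0], b[0])
    h = min(a[3], b[3]) - max(a[1], b[1])
    return max(w, 0.0) * max(h, 0.0)

gps_box = (-118.3, 34.0, -118.1, 34.2)
box_area = (gps_box[2] - gps_box[0]) * (gps_box[3] - gps_box[1])  # 0.04 sq. degrees
for name, box in {
    "Warehouse_A": (-118.35, 34.02, -118.25, 34.08),
    "Warehouse_B": (-118.20, 34.05, -118.12, 34.12),
}.items():
    expected = 5000 * overlap_area(gps_box, box) / box_area
    print(name, round(expected))  # roughly 375 for A, roughly 700 for B
```

If your counts are wildly different from these estimates, check the coordinate order in ST_Point and the argument order in ST_Contains.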
Step 7: Schedule the pipeline (optional)
To keep the pipeline up to date as new GPS data lands in the volume, create a job to run the pipeline on a schedule.
- At the top of the editor, choose the Schedule button.
- If the Schedules dialog appears, choose Add schedule.
- Optionally, give the job a name.
- By default, the schedule runs once per day. You can accept this or set your own. Choosing Advanced lets you set a specific time; More options lets you add run notifications.
- Select Create to apply the schedule.
See Monitoring and observability for Lakeflow Jobs for more information about job runs.