Tutorial: Crie um pipeline geoespacial com tipos espaciais nativos
Aprenda como criar e implantar um pipeline que ingere dados de GPS, converte coordenadas em tipos espaciais nativos e realiza a junção com geofences de data warehouse para rastrear chegadas usando o pipeline declarativo Spark (SDP) LakeFlow para orquestração de dados e Auto Loader. Este tutorial usa tipos espaciais nativos Databricks (GEOMETRY, GEOGRAPHY) e funções espaciais integradas, como ST_Point, ST_GeomFromWKT e ST_Contains, para que você possa executar fluxo de trabalho geoespacial em escala sem biblioteca externa.
Neste tutorial, você irá:
- Crie um pipeline e gere dados de amostra de GPS e geofencing em um volume Unity Catalog .
- Ingerir incrementalmente sinais GPS brutos com Auto Loader em uma tabela de transmissão bronze.
- Construa uma tabela de transmissão prateada que converta latitude e longitude em um ponto nativo
GEOMETRY. - Criar uma view materializada de geocercas de armazém a partir de polígonos WKT.
- Execução de uma join espacial para produzir uma tabela de chegadas ao armazém (qual dispositivo entrou em qual cerca geográfica).
O resultado é um pipeline em formato de medalhão: bronze (GPS bruto), prata (pontos como geometria) e ouro (geocercas e eventos de chegada). Veja O que é a arquitetura lakehouse com medalhão? Para mais informações.
Requisitos
Para completar este tutorial, você deve atender aos seguintes requisitos:
- Faça login em um workspace Databricks .
- Ative Unity Catalog para seu workspace.
- Para usar o pipeline declarativo LakeFlow Spark sem servidor, habilite computeserverless em sua account . Se compute serverless não estiver habilitado, os passos do sistema operacional funcionarão com a compute default do seu workspace.
- Ter permissão para criar um recurso compute ou acesso a um recurso compute .
- Possuir permissões para criar um novo esquema em um catálogo. As permissões necessárias são
USE CATALOGeCREATE SCHEMA. - Possuir permissões para criar um novo volume em um esquema existente. As permissões necessárias são
USE SCHEMAeCREATE VOLUME. - Utilize um ambiente de execução que suporte tipos espaciais nativos e funções espaciais.
o passo 1: Criar um pipeline
Crie um novo pipeline ETL e defina o catálogo e o esquema default para suas tabelas.
-
Na sua workspace, clique em
Novidade no canto superior esquerdo.
-
Clique em PipelineETL .
-
Altere o título do pipeline para
Spatial pipeline tutorialou para um nome de sua preferência. -
Em "Título", selecione um catálogo e um esquema para os quais você tenha permissões de escrita.
Este catálogo e esquema são usados por default quando você não especifica um catálogo ou esquema em seu código. Substitua
<catalog>e<schema>nos seguintes passos pelos valores que você escolher aqui. -
Em Opções avançadas , selecione Começar com um arquivo vazio .
-
Escolha uma pasta para o seu código. Você pode selecionar "Procurar" para escolher uma pasta; você pode usar uma pasta Git para controle de versão.
-
Escolha Python ou SQL como linguagem para o seu primeiro arquivo. Você poderá adicionar arquivos em outro idioma posteriormente.
-
Clique em Selecionar para criar o pipeline e abrir o Editor LakeFlow Pipelines .
Agora você tem um pipeline em branco com um catálogo e esquema default . Em seguida, crie os dados de exemplo de GPS e geocerca.
Passo 2: Crie os dados de exemplo de GPS e geocerca.
Este passo gera dados de amostra em um volume: pings GPS brutos (JSON) e geofences de armazenamento (JSON com polígonos WKT). Os pontos de GPS são gerados em uma caixa delimitadora que se sobrepõe aos dois polígonos do armazém, portanto, a join espacial em uma etapa posterior retornará linhas de chegada. Você pode pular esta etapa se já tiver seus próprios dados em um volume ou tabela.
-
No Editor LakeFlow Pipelines , no navegador ativo, clique em
Adicione , depois Exploração .
-
Defina o nome como
Setup spatial data, escolha Python e deixe a pasta de destino default . -
Clique em Criar .
-
No novo Notebook, cole o seguinte código. Substitua
<catalog>e<schema>pelo catálogo e esquema default que você definiu na etapa 1.Utilize o seguinte código no Notebook para gerar dados de GPS e geocercas.
Pythonfrom pyspark.sql import functions as F
catalog = "<catalog>" # for example, "main"
schema = "<schema>" # for example, "default"
spark.sql(f"USE CATALOG `{catalog}`")
spark.sql(f"USE SCHEMA `{schema}`")
spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
# GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
gps_path = f"{volume_base}/gps"
df_gps = (
spark.range(0, 5000)
.repartition(10)
.select(
F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
F.current_timestamp().alias("timestamp"),
(-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1
(34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2
)
)
df_gps.write.format("json").mode("overwrite").save(gps_path)
print(f"Wrote 5000 GPS rows to {gps_path}")
# Geofences: two warehouse polygons (WKT) in the same region
geofences_path = f"{volume_base}/geofences"
geofences_data = [
("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
]
df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
df_geo.write.format("json").mode("overwrite").save(geofences_path)
print(f"Wrote {len(geofences_data)} geofences to {geofences_path}") -
execução da célula do Notebook (Shift + Enter).
Após a conclusão da execução, o volume contém gps (pings brutos) e geofences (polígonos em WKT). Na próxima etapa, você insere os dados do GPS em uma tabela de bronze.
o passo 3: Ingerir dados GPS em uma tabela de transmissão de bronze
Ingerir o JSON bruto do GPS do volume de forma incremental usando Auto Loader e gravar em uma tabela de transmissão bronze.
-
No navegador ativo, clique
Adicione , depois transformações .
-
Defina o nome como
gps_bronze, escolha SQL ou Python e clique em Criar . -
Substitua o conteúdo do arquivo pelo seguinte (use a tab que corresponde ao seu idioma). Substitua
<catalog>e<schema>pelo seu catálogo e esquema default .
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE gps_bronze
COMMENT "Raw GPS pings ingested from volume using Auto Loader";
CREATE FLOW gps_bronze_ingest_flow AS
INSERT INTO gps_bronze BY NAME
SELECT *
FROM STREAM read_files(
"/Volumes/<catalog>/<schema>/raw_data/gps",
format => "json",
inferColumnTypes => "true"
)
from pyspark import pipelines as dp
path = "/Volumes/<catalog>/<schema>/raw_data/gps"
dp.create_streaming_table(
name="gps_bronze",
comment="Raw GPS pings ingested from volume using Auto Loader",
)
@dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
def gps_bronze_ingest_flow():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load(path)
)
- Clique
arquivo de execução ou pipelinede execução para executar uma atualização.
Quando a atualização for concluída, o gráfico do pipeline mostrará a tabela gps_bronze . Em seguida, adicione uma tabela prateada que converta coordenadas em um ponto de geometria nativo.
o passo 4: Adicione uma mesa de transmissão prateada com pontos geométricos
Crie uma tabela de transmissão que leia da tabela bronze e adicione uma coluna GEOMETRY usando ST_Point(longitude, latitude).
-
No navegador ativo, clique
Adicione , depois transformações .
-
Defina o nome como
raw_gps_silver, escolha SQL ou Python e clique em Criar . -
Cole o seguinte código no novo arquivo.
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE raw_gps_silver
COMMENT "GPS pings with native geometry point for spatial joins";
CREATE FLOW raw_gps_silver_flow AS
INSERT INTO raw_gps_silver BY NAME
SELECT
device_id,
timestamp,
longitude,
latitude,
ST_Point(longitude, latitude) AS point_geom
FROM STREAM(gps_bronze)
from pyspark import pipelines as dp
from pyspark.sql import functions as F
dp.create_streaming_table(
name="raw_gps_silver",
comment="GPS pings with native geometry point for spatial joins",
)
@dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
def raw_gps_silver_flow():
return (
spark.readStream.table("gps_bronze")
.select(
"device_id",
"timestamp",
"longitude",
"latitude",
F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
)
)
- Clique
arquivo de execução ou pipelinede execução .
O gráfico do pipeline agora mostra gps_bronze e raw_gps_silver. Em seguida, adicione as geocercas do armazém como uma view materializada.
o passo 5: Crie a tabela de geofences do armazém
Crie uma view materializada que leia as geofences do volume e converta a coluna WKT em uma coluna GEOMETRY usando ST_GeomFromWKT.
-
No navegador ativo, clique
Adicione , depois transformações .
-
Defina o nome como
warehouse_geofences_gold, escolha SQL ou Python e clique em Criar . -
Cole o seguinte código. Substitua
<catalog>e<schema>pelo seu catálogo e esquema default .
- SQL
- Python
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
SELECT
warehouse_name,
ST_GeomFromWKT(boundary_wkt) AS boundary_geom
FROM read_files(
"/Volumes/<catalog>/<schema>/raw_data/geofences",
format => "json"
)
from pyspark import pipelines as dp
from pyspark.sql import functions as F
path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
@dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
def warehouse_geofences_gold():
return (
spark.read.format("json").load(path).select(
"warehouse_name",
F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
)
)
- Clique
arquivo de execução ou pipelinede execução .
O pipeline agora inclui a tabela de geofences. Em seguida, adicione a join espacial para compute as chegadas ao armazém.
Passo 6: Crie a tabela de chegadas ao armazém com uma joinespacial.
Adicione uma view materializada que una os pontos GPS prateados às geocercas usando ST_Contains(boundary_geom, point_geom) para determinar quando um dispositivo está dentro de um polígono de armazém.
-
No navegador ativo, clique
Adicione , depois transformações .
-
Defina o nome como
warehouse_arrivals, escolha SQL ou Python e clique em Criar . -
Cole o seguinte código.
- SQL
- Python
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
SELECT
g.device_id,
g.timestamp,
w.warehouse_name
FROM raw_gps_silver g
JOIN warehouse_geofences_gold w
ON ST_Contains(w.boundary_geom, g.point_geom)
from pyspark import pipelines as dp
from pyspark.sql import functions as F
@dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
def warehouse_arrivals():
g = spark.read.table("raw_gps_silver")
w = spark.read.table("warehouse_geofences_gold")
return (
g.alias("g")
.join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
.select(
F.col("g.device_id").alias("device_id"),
F.col("g.timestamp").alias("timestamp"),
F.col("w.warehouse_name").alias("warehouse_name"),
)
)
- Clique
arquivo de execução ou pipelinede execução .
Quando a atualização for concluída, o gráfico pipeline mostrará todos os quatro conjuntos de dados: gps_bronze, raw_gps_silver, warehouse_geofences_gold e warehouse_arrivals.
Verifique a joinespacial
Confirme se a join espacial produziu linhas: pontos da tabela prateada que caem dentro de uma cerca geográfica aparecem em warehouse_arrivals. Execute um dos seguintes procedimentos em um Notebook ou editor SQL (use o mesmo catálogo e esquema do seu destino pipeline ).
Contagem de chegadas por armazém (SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Você deverá ver contagens diferentes de zero para Warehouse_A e Warehouse_B (os dados de GPS de amostra se sobrepõem a ambos os polígonos). Para inspecionar linhas de amostra:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
As mesmas verificações em Python (Notebook):
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
Se você vir linhas em warehouse_arrivals, a join ST_Contains(boundary_geom, point_geom) está funcionando corretamente.
o passo 7: programar o pipeline (opcional)
Para manter o pipeline atualizado à medida que novos dados de GPS chegam ao volume, crie um Job para executar o pipeline em um programador.
- Na parte superior do editor, selecione o botão "Programar" .
- Se a caixa de diálogo do programador aparecer, escolha Adicionar programador .
- Opcionalmente, dê um nome ao trabalho.
- Por default, a execução do programa é feita uma vez por dia. Você pode aceitar esta opção ou definir a sua própria. Selecionar "Avançado" permite definir um horário específico; "Mais opções" permite adicionar notificações de execução.
- Selecione Criar para aplicar o programa.
Consulte monitoramento e observabilidade para trabalhos LakeFlow para obter mais informações sobre execução de trabalhos.