Pular para o conteúdo principal

Tutorial: Crie um pipeline geoespacial com tipos espaciais nativos

Aprenda como criar e implantar um pipeline que ingere dados de GPS, converte coordenadas em tipos espaciais nativos e realiza a junção com geofences de data warehouse para rastrear chegadas usando o pipeline declarativo Spark (SDP) LakeFlow para orquestração de dados e Auto Loader. Este tutorial usa tipos espaciais nativos Databricks (GEOMETRY, GEOGRAPHY) e funções espaciais integradas, como ST_Point, ST_GeomFromWKT e ST_Contains, para que você possa executar fluxo de trabalho geoespacial em escala sem biblioteca externa.

Neste tutorial, você irá:

  • Crie um pipeline e gere dados de amostra de GPS e geofencing em um volume Unity Catalog .
  • Ingerir incrementalmente sinais GPS brutos com Auto Loader em uma tabela de transmissão bronze.
  • Construa uma tabela de transmissão prateada que converta latitude e longitude em um ponto nativo GEOMETRY .
  • Criar uma view materializada de geocercas de armazém a partir de polígonos WKT.
  • Execução de uma join espacial para produzir uma tabela de chegadas ao armazém (qual dispositivo entrou em qual cerca geográfica).

O resultado é um pipeline em formato de medalhão: bronze (GPS bruto), prata (pontos como geometria) e ouro (geocercas e eventos de chegada). Veja O que é a arquitetura lakehouse com medalhão? Para mais informações.

Requisitos

Para completar este tutorial, você deve atender aos seguintes requisitos:

o passo 1: Criar um pipeline

Crie um novo pipeline ETL e defina o catálogo e o esquema default para suas tabelas.

  1. Na sua workspace, clique em Ícone de mais (+). Novidade no canto superior esquerdo.

  2. Clique em PipelineETL .

  3. Altere o título do pipeline para Spatial pipeline tutorial ou para um nome de sua preferência.

  4. Em "Título", selecione um catálogo e um esquema para os quais você tenha permissões de escrita.

    Este catálogo e esquema são usados por default quando você não especifica um catálogo ou esquema em seu código. Substitua <catalog> e <schema> nos seguintes passos pelos valores que você escolher aqui.

  5. Em Opções avançadas , selecione Começar com um arquivo vazio .

  6. Escolha uma pasta para o seu código. Você pode selecionar "Procurar" para escolher uma pasta; você pode usar uma pasta Git para controle de versão.

  7. Escolha Python ou SQL como linguagem para o seu primeiro arquivo. Você poderá adicionar arquivos em outro idioma posteriormente.

  8. Clique em Selecionar para criar o pipeline e abrir o Editor LakeFlow Pipelines .

Agora você tem um pipeline em branco com um catálogo e esquema default . Em seguida, crie os dados de exemplo de GPS e geocerca.

Passo 2: Crie os dados de exemplo de GPS e geocerca.

Este passo gera dados de amostra em um volume: pings GPS brutos (JSON) e geofences de armazenamento (JSON com polígonos WKT). Os pontos de GPS são gerados em uma caixa delimitadora que se sobrepõe aos dois polígonos do armazém, portanto, a join espacial em uma etapa posterior retornará linhas de chegada. Você pode pular esta etapa se já tiver seus próprios dados em um volume ou tabela.

  1. No Editor LakeFlow Pipelines , no navegador ativo, clique em Ícone de mais (+). Adicione , depois Exploração .

  2. Defina o nome como Setup spatial data, escolha Python e deixe a pasta de destino default .

  3. Clique em Criar .

  4. No novo Notebook, cole o seguinte código. Substitua <catalog> e <schema> pelo catálogo e esquema default que você definiu na etapa 1.

    Utilize o seguinte código no Notebook para gerar dados de GPS e geocercas.

    Python
    from pyspark.sql import functions as F

    catalog = "<catalog>" # for example, "main"
    schema = "<schema>" # for example, "default"

    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"

    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
    spark.range(0, 5000)
    .repartition(10)
    .select(
    F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
    F.current_timestamp().alias("timestamp"),
    (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1
    (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2
    )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")

    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
    ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
    ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
  5. execução da célula do Notebook (Shift + Enter).

Após a conclusão da execução, o volume contém gps (pings brutos) e geofences (polígonos em WKT). Na próxima etapa, você insere os dados do GPS em uma tabela de bronze.

o passo 3: Ingerir dados GPS em uma tabela de transmissão de bronze

Ingerir o JSON bruto do GPS do volume de forma incremental usando Auto Loader e gravar em uma tabela de transmissão bronze.

  1. No navegador ativo, clique Ícone de mais (+). Adicione , depois transformações .

  2. Defina o nome como gps_bronze, escolha SQL ou Python e clique em Criar .

  3. Substitua o conteúdo do arquivo pelo seguinte (use a tab que corresponde ao seu idioma). Substitua <catalog> e <schema> pelo seu catálogo e esquema default .

SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze
COMMENT "Raw GPS pings ingested from volume using Auto Loader";

CREATE FLOW gps_bronze_ingest_flow AS
INSERT INTO gps_bronze BY NAME
SELECT *
FROM STREAM read_files(
"/Volumes/<catalog>/<schema>/raw_data/gps",
format => "json",
inferColumnTypes => "true"
)
  1. Clique Ícone de reprodução. arquivo de execução ou pipelinede execução para executar uma atualização.

Quando a atualização for concluída, o gráfico do pipeline mostrará a tabela gps_bronze . Em seguida, adicione uma tabela prateada que converta coordenadas em um ponto de geometria nativo.

o passo 4: Adicione uma mesa de transmissão prateada com pontos geométricos

Crie uma tabela de transmissão que leia da tabela bronze e adicione uma coluna GEOMETRY usando ST_Point(longitude, latitude).

  1. No navegador ativo, clique Ícone de mais (+). Adicione , depois transformações .

  2. Defina o nome como raw_gps_silver, escolha SQL ou Python e clique em Criar .

  3. Cole o seguinte código no novo arquivo.

SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver
COMMENT "GPS pings with native geometry point for spatial joins";

CREATE FLOW raw_gps_silver_flow AS
INSERT INTO raw_gps_silver BY NAME
SELECT
device_id,
timestamp,
longitude,
latitude,
ST_Point(longitude, latitude) AS point_geom
FROM STREAM(gps_bronze)
  1. Clique Ícone de reprodução. arquivo de execução ou pipelinede execução .

O gráfico do pipeline agora mostra gps_bronze e raw_gps_silver. Em seguida, adicione as geocercas do armazém como uma view materializada.

o passo 5: Crie a tabela de geofences do armazém

Crie uma view materializada que leia as geofences do volume e converta a coluna WKT em uma coluna GEOMETRY usando ST_GeomFromWKT.

  1. No navegador ativo, clique Ícone de mais (+). Adicione , depois transformações .

  2. Defina o nome como warehouse_geofences_gold, escolha SQL ou Python e clique em Criar .

  3. Cole o seguinte código. Substitua <catalog> e <schema> pelo seu catálogo e esquema default .

SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
SELECT
warehouse_name,
ST_GeomFromWKT(boundary_wkt) AS boundary_geom
FROM read_files(
"/Volumes/<catalog>/<schema>/raw_data/geofences",
format => "json"
)
  1. Clique Ícone de reprodução. arquivo de execução ou pipelinede execução .

O pipeline agora inclui a tabela de geofences. Em seguida, adicione a join espacial para compute as chegadas ao armazém.

Passo 6: Crie a tabela de chegadas ao armazém com uma joinespacial.

Adicione uma view materializada que una os pontos GPS prateados às geocercas usando ST_Contains(boundary_geom, point_geom) para determinar quando um dispositivo está dentro de um polígono de armazém.

  1. No navegador ativo, clique Ícone de mais (+). Adicione , depois transformações .

  2. Defina o nome como warehouse_arrivals, escolha SQL ou Python e clique em Criar .

  3. Cole o seguinte código.

SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
SELECT
g.device_id,
g.timestamp,
w.warehouse_name
FROM raw_gps_silver g
JOIN warehouse_geofences_gold w
ON ST_Contains(w.boundary_geom, g.point_geom)
  1. Clique Ícone de reprodução. arquivo de execução ou pipelinede execução .

Quando a atualização for concluída, o gráfico pipeline mostrará todos os quatro conjuntos de dados: gps_bronze, raw_gps_silver, warehouse_geofences_gold e warehouse_arrivals.

Verifique a joinespacial

Confirme se a join espacial produziu linhas: pontos da tabela prateada que caem dentro de uma cerca geográfica aparecem em warehouse_arrivals. Execute um dos seguintes procedimentos em um Notebook ou editor SQL (use o mesmo catálogo e esquema do seu destino pipeline ).

Contagem de chegadas por armazém (SQL):

SQL
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Você deverá ver contagens diferentes de zero para Warehouse_A e Warehouse_B (os dados de GPS de amostra se sobrepõem a ambos os polígonos). Para inspecionar linhas de amostra:

SQL
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

As mesmas verificações em Python (Notebook):

Python
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

Se você vir linhas em warehouse_arrivals, a join ST_Contains(boundary_geom, point_geom) está funcionando corretamente.

o passo 7: programar o pipeline (opcional)

Para manter o pipeline atualizado à medida que novos dados de GPS chegam ao volume, crie um Job para executar o pipeline em um programador.

  1. Na parte superior do editor, selecione o botão "Programar" .
  2. Se a caixa de diálogo do programador aparecer, escolha Adicionar programador .
  3. Opcionalmente, dê um nome ao trabalho.
  4. Por default, a execução do programa é feita uma vez por dia. Você pode aceitar esta opção ou definir a sua própria. Selecionar "Avançado" permite definir um horário específico; "Mais opções" permite adicionar notificações de execução.
  5. Selecione Criar para aplicar o programa.

Consulte monitoramento e observabilidade para trabalhos LakeFlow para obter mais informações sobre execução de trabalhos.

Recursos adicionais