メインコンテンツまでスキップ

チュートリアル: ネイティブ空間タイプを使用して地理空間パイプラインを構築する

データ オーケストレーションと自動ローダー用のLakeFlow Spark宣言型パイプライン (SDP) を使用して、GPS データを取り込み、座標をネイティブの空間タイプに変換し、ウェアハウスのジオフェンスに結合して到着を追跡するパイプラインを作成およびAuto Loader方法を学びます。 このチュートリアルでは、Databricks ネイティブの空間タイプ ( GEOMETRYGEOGRAPHY ) と、 ST_PointST_GeomFromWKTST_Containsなどの組み込みの空間関数を使用しているため、外部ライブラリを使用せずに大規模な地理空間ワークフローを実行できます。

このチュートリアルでは、次のことを行います。

  • パイプラインを作成し、サンプル GPS データとジオフェンス データをUnity Catalogボリュームに生成します。
  • Auto Loaderを使用して、生の GPS ping を Bronze ストリーミング テーブルに段階的に取り込みます。
  • 緯度と経度をネイティブのGEOMETRYポイントに変換するシルバーのストリーミング テーブルを構築します。
  • WKT ポリゴンからウェアハウス ジオフェンスのマテリアライズドビューを作成します。
  • 空間結合を実行して、ウェアハウスの到着テーブル (どのデバイスがどのジオフェンスに入ったか) を作成します。

結果は、ブロンズ (生の GPS)、シルバー (ジオメトリとしてのポイント)、ゴールド (ジオフェンスと到着イベント) のメダリオン スタイルのパイプラインです。メダリオン レイクハウス建築とは何かをご覧ください。詳細については。

要件

このチュートリアルを完了するには、以下の条件を満たす必要があります。

ステップ 1: パイプラインを作成する

新しい ETL パイプラインを作成し、テーブルのデフォルトのカタログとスキーマを設定します。

  1. ワークスペースで、 プラスアイコン。 左上隅に新しいものが表示されます。

  2. ETL パイプライン をクリックします。

  3. パイプラインのタイトルをSpatial pipeline tutorialまたは任意の名前に変更します。

  4. タイトルの下で、書き込み権限を持つカタログとスキーマを選択します。

    コード内でカタログまたはスキーマを指定しない場合は、このカタログとスキーマがデフォルトで使用されます。次の ステップ の<catalog><schema> 、ここで選択した値に置き換えます。

  5. 詳細オプション から、 空のファイルで開始を 選択します。

  6. コード用のフォルダーを選択します。 [参照] を選択してフォルダーを選択できます。バージョン管理には Git フォルダーを使用できます。

  7. 最初のファイルの言語として Python または SQL を選択します。後から他の言語のファイルを追加できます。

  8. [選択] をクリックしてパイプラインを作成し、 LakeFlow Pipelinesエディターを開きます。

これで、デフォルトのカタログとスキーマを持つ空のパイプラインが作成されました。次に、サンプルの GPS とジオフェンス データを作成します。

ステップ 2: サンプル GPS とジオフェンス データを作成する

このステップでは、ボリューム内にサンプル データ (生の GPS ping (JSON) とウェアハウス ジオフェンス (WKT ポリゴンを含む JSON)) が生成されます。GPS ポイントは、2 つのウェアハウス ポリゴンと重なる境界ボックス内に生成されるため、後のステップの空間結合によって到着行が返されます。 ボリュームまたはテーブルに独自のデータがすでに存在する場合は、このステップをスキップできます。

  1. LakeFlow Pipelinesエディターのアセットブラウザで、プラスアイコン。 を追加し 、次に 探索を追加します

  2. 名前Setup spatial dataに設定し、 Python を選択して、デフォルトの宛先フォルダーのままにします。

  3. 作成 をクリックします。

  4. 新しいノートブックに次のコードを貼り付けます。<catalog><schema>ステップ 1 で設定したカタログとスキーマに置き換えます。

    ノートブックで次のコードを使用して、GPS およびジオフェンス データを生成します。

    Python
    from pyspark.sql import functions as F

    catalog = "<catalog>" # for example, "main"
    schema = "<schema>" # for example, "default"

    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"

    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
    spark.range(0, 5000)
    .repartition(10)
    .select(
    F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
    F.current_timestamp().alias("timestamp"),
    (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1
    (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2
    )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")

    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
    ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
    ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
  5. ノートブック セルを実行します (Shift + Enter)。

実行が完了すると、ボリュームにはgps (生の ping) とgeofences (WKT 内のポリゴン) が含まれます。次のステップでは、GPS データをブロンズ テーブルに取り込みます。

ステップ 3: GPS データを Bronze ストリーミング テーブルに取り込む

Auto Loader使用してボリュームから生の GPS JSON段階的に取り込み、Bronze ストリーミング テーブルに書き込みます。

  1. アセットブラウザで、プラスアイコン。 を追加し 、次に 変換します

  2. 名前gps_bronzeに設定し、 SQL または Python を選択して、 作成を クリックします。

  3. ファイルの内容を次のように置き換えます (ご使用の言語に一致するタブを使用します)。<catalog><schema>デフォルトのカタログとスキーマに置き換えます。

SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze
COMMENT "Raw GPS pings ingested from volume using Auto Loader";

CREATE FLOW gps_bronze_ingest_flow AS
INSERT INTO gps_bronze BY NAME
SELECT *
FROM STREAM read_files(
"/Volumes/<catalog>/<schema>/raw_data/gps",
format => "json",
inferColumnTypes => "true"
)
  1. クリック再生アイコン。 実行ファイル または 実行パイプラインを使用し て更新を実行します。

更新が完了すると、パイプライン グラフにgps_bronzeテーブルが表示されます。次に、座標をネイティブ ジオメトリ ポイントに変換するシルバー テーブルを追加します。

ステップ 4: ジオメトリ ポイントを備えたシルバーのストリーミング テーブルを追加します

Bronze テーブルから読み取り、 ST_Point(longitude, latitude)を使用してGEOMETRY列を追加するストリーミング テーブルを作成します。

  1. アセットブラウザで、プラスアイコン。 を追加し 、次に 変換します

  2. 名前raw_gps_silverに設定し、 SQL または Python を選択して、 作成を クリックします。

  3. 次のコードを新しいファイルに貼り付けます。

SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver
COMMENT "GPS pings with native geometry point for spatial joins";

CREATE FLOW raw_gps_silver_flow AS
INSERT INTO raw_gps_silver BY NAME
SELECT
device_id,
timestamp,
longitude,
latitude,
ST_Point(longitude, latitude) AS point_geom
FROM STREAM(gps_bronze)
  1. クリック再生アイコン。 ファイルを実行する か、 パイプラインを実行します

パイプライン グラフにgps_bronzeraw_gps_silverが表示されるようになりました。次に、ウェアハウス ジオフェンスをマテリアライズドビューとして追加します。

ステップ 5: ウェアハウス ジオフェンス ゴールド テーブルを作成する

ボリュームからジオフェンスを読み取り、 ST_GeomFromWKTを使用して WKT 列をGEOMETRY列に変換するマテリアライズドビューを作成します。

  1. アセットブラウザで、プラスアイコン。 を追加し 、次に 変換します

  2. 名前warehouse_geofences_goldに設定し、 SQL または Python を選択して、 作成を クリックします。

  3. 次のコードを貼り付けます。<catalog><schema>デフォルトのカタログとスキーマに置き換えます。

SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
SELECT
warehouse_name,
ST_GeomFromWKT(boundary_wkt) AS boundary_geom
FROM read_files(
"/Volumes/<catalog>/<schema>/raw_data/geofences",
format => "json"
)
  1. クリック再生アイコン。 ファイルを実行する か、 パイプラインを実行します

パイプラインに geofences テーブルが含まれるようになりました。次に、コンピュート ウェアハウスの到着に空間結合を追加します。

ステップ 6: 空間結合を使用してウェアハウス到着テーブルを作成する

ST_Contains(boundary_geom, point_geom)を使用して銀色の GPS ポイントをジオフェンスに結合するマテリアライズドビューを追加し、デバイスがウェアハウス ポリゴン内にいつあるかを判断します。

  1. アセットブラウザで、プラスアイコン。 を追加し 、次に 変換します

  2. 名前warehouse_arrivalsに設定し、 SQL または Python を選択して、 作成を クリックします。

  3. 次のコードを貼り付けます。

SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
SELECT
g.device_id,
g.timestamp,
w.warehouse_name
FROM raw_gps_silver g
JOIN warehouse_geofences_gold w
ON ST_Contains(w.boundary_geom, g.point_geom)
  1. クリック再生アイコン。 ファイルを実行する か、 パイプラインを実行します

更新が完了すると、パイプライン グラフに 4 つのデータセットすべて( gps_bronzeraw_gps_silverwarehouse_geofences_goldwarehouse_arrivalsが表示されます。

空間結合を確認する

空間結合によって生成された行を確認します。ジオフェンス内にあるシルバー テーブルのポイントがwarehouse_arrivalsに表示されます。ノートブックまたは SQL エディターで次のいずれかを実行します (パイプライン ターゲットと同じカタログとスキーマを使用します)。

ウェアハウスごとの到着数のカウント ( SQL ):

SQL
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Warehouse_AWarehouse_Bカウントがゼロ以外になっているはずです (サンプル GPS データは両方のポリゴンに重なっています)。サンプル行を検査するには:

SQL
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Python (ノートブック) での同じチェック:

Python
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

warehouse_arrivalsに行が表示されている場合は、 ST_Contains(boundary_geom, point_geom)結合は正しく機能しています。

ステップ 7: パイプラインをスケジュールする (オプション)

新しい GPS データがボリュームに入力されたときにパイプラインを最新の状態に保つには、スケジュールに従ってパイプラインを実行するジョブを作成します。

  1. エディターの上部にある [スケジュール] ボタンを選択します。
  2. [スケジュール] ダイアログが表示されたら、 [スケジュールの追加] を選択します。
  3. 必要に応じて、ジョブに名前を付けます。
  4. デフォルトでは、スケジュールは 1 日に 1 回実行されます。これを受け入れるか、独自に設定することができます。 「詳細」 を選択すると特定の時間を設定できます。「 その他のオプション」 を選択すると実行通知を追加できます。
  5. スケジュールを適用するには、 [作成] を選択します。

ジョブの実行の詳細については、「Lakeflowジョブのモニタリングと可観測性」を参照してください。

その他のリソース