チュートリアル: ネイティブ空間タイプを使用して地理空間パイプラインを構築する
データ オーケストレーションと自動ローダー用のLakeFlow Spark宣言型パイプライン (SDP) を使用して、GPS データを取り込み、座標をネイティブの空間タイプに変換し、ウェアハウスのジオフェンスに結合して到着を追跡するパイプラインを作成およびAuto Loader方法を学びます。 このチュートリアルでは、Databricks ネイティブの空間タイプ ( GEOMETRY 、 GEOGRAPHY ) と、 ST_Point 、 ST_GeomFromWKT 、 ST_Containsなどの組み込みの空間関数を使用しているため、外部ライブラリを使用せずに大規模な地理空間ワークフローを実行できます。
このチュートリアルでは、次のことを行います。
- パイプラインを作成し、サンプル GPS データとジオフェンス データをUnity Catalogボリュームに生成します。
- Auto Loaderを使用して、生の GPS ping を Bronze ストリーミング テーブルに段階的に取り込みます。
- 緯度と経度をネイティブの
GEOMETRYポイントに変換するシルバーのストリーミング テーブルを構築します。 - WKT ポリゴンからウェアハウス ジオフェンスのマテリアライズドビューを作成します。
- 空間結合を実行して、ウェアハウスの到着テーブル (どのデバイスがどのジオフェンスに入ったか) を作成します。
結果は、ブロンズ (生の GPS)、シルバー (ジオメトリとしてのポイント)、ゴールド (ジオフェンスと到着イベント) のメダリオン スタイルのパイプラインです。メダリオン レイクハウス建築とは何かをご覧ください。詳細については。
要件
このチュートリアルを完了するには、以下の条件を満たす必要があります。
- Databricks ワークスペースにログインしている必要があります。
- ワークスペースでUnity Catalogを有効にします。
- サーバレスLakeFlow Spark宣言型パイプラインを使用する場合は、アカウントのサーバレスコンピュートを有効にしてください。 サーバレスコンピュートが有効になっていない場合、ステップはワークスペースの確実コンピュートで動作します。
- コンピュート リソースを作成する権限、またはコンピュート リソースにアクセスする権限を持っています。
- カタログに新しいスキーマを作成する権限を持っています。必要な権限は
USE CATALOGとCREATE SCHEMAです。 - 既存のスキーマに新しいボリュームを作成する権限を持っています。必要な権限は
USE SCHEMAとCREATE VOLUMEです。 - ネイティブの空間タイプと空間関数をサポートするランタイムを使用します。
ステップ 1: パイプラインを作成する
新しい ETL パイプラインを作成し、テーブルのデフォルトのカタログとスキーマを設定します。
-
ワークスペースで、
左上隅に新しいものが表示されます。
-
ETL パイプライン をクリックします。
-
パイプラインのタイトルを
Spatial pipeline tutorialまたは任意の名前に変更します。 -
タイトルの下で、書き込み権限を持つカタログとスキーマを選択します。
コード内でカタログまたはスキーマを指定しない場合は、このカタログとスキーマがデフォルトで使用されます。次の ステップ の
<catalog>と<schema>、ここで選択した値に置き換えます。 -
詳細オプション から、 空のファイルで開始を 選択します。
-
コード用のフォルダーを選択します。 [参照] を選択してフォルダーを選択できます。バージョン管理には Git フォルダーを使用できます。
-
最初のファイルの言語として Python または SQL を選択します。後から他の言語のファイルを追加できます。
-
[選択] をクリックしてパイプラインを作成し、 LakeFlow Pipelinesエディターを開きます。
これで、デフォルトのカタログとスキーマを持つ空のパイプラインが作成されました。次に、サンプルの GPS とジオフェンス データを作成します。
ステップ 2: サンプル GPS とジオフェンス データを作成する
このステップでは、ボリューム内にサンプル データ (生の GPS ping (JSON) とウェアハウス ジオフェンス (WKT ポリゴンを含む JSON)) が生成されます。GPS ポイントは、2 つのウェアハウス ポリゴンと重なる境界ボックス内に生成されるため、後のステップの空間結合によって到着行が返されます。 ボリュームまたはテーブルに独自のデータがすでに存在する場合は、このステップをスキップできます。
-
LakeFlow Pipelinesエディターのアセットブラウザで、
を追加し 、次に 探索を追加します 。
-
名前 を
Setup spatial dataに設定し、 Python を選択して、デフォルトの宛先フォルダーのままにします。 -
作成 をクリックします。
-
新しいノートブックに次のコードを貼り付けます。
<catalog>と<schema>ステップ 1 で設定したカタログとスキーマに置き換えます。ノートブックで次のコードを使用して、GPS およびジオフェンス データを生成します。
Pythonfrom pyspark.sql import functions as F
catalog = "<catalog>" # for example, "main"
schema = "<schema>" # for example, "default"
spark.sql(f"USE CATALOG `{catalog}`")
spark.sql(f"USE SCHEMA `{schema}`")
spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
# GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
gps_path = f"{volume_base}/gps"
df_gps = (
spark.range(0, 5000)
.repartition(10)
.select(
F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
F.current_timestamp().alias("timestamp"),
(-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1
(34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2
)
)
df_gps.write.format("json").mode("overwrite").save(gps_path)
print(f"Wrote 5000 GPS rows to {gps_path}")
# Geofences: two warehouse polygons (WKT) in the same region
geofences_path = f"{volume_base}/geofences"
geofences_data = [
("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
]
df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
df_geo.write.format("json").mode("overwrite").save(geofences_path)
print(f"Wrote {len(geofences_data)} geofences to {geofences_path}") -
ノートブック セルを実行します (Shift + Enter)。
実行が完了すると、ボリュームにはgps (生の ping) とgeofences (WKT 内のポリゴン) が含まれます。次のステップでは、GPS データをブロンズ テーブルに取り込みます。
ステップ 3: GPS データを Bronze ストリーミング テーブルに取り込む
Auto Loader使用してボリュームから生の GPS JSON段階的に取り込み、Bronze ストリーミング テーブルに書き込みます。
-
アセットブラウザで、
を追加し 、次に 変換します 。
-
名前 を
gps_bronzeに設定し、 SQL または Python を選択して、 作成を クリックします。 -
ファイルの内容を次のように置き換えます (ご使用の言語に一致するタブを使用します)。
<catalog>と<schema>デフォルトのカタログとスキーマに置き換えます。
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE gps_bronze
COMMENT "Raw GPS pings ingested from volume using Auto Loader";
CREATE FLOW gps_bronze_ingest_flow AS
INSERT INTO gps_bronze BY NAME
SELECT *
FROM STREAM read_files(
"/Volumes/<catalog>/<schema>/raw_data/gps",
format => "json",
inferColumnTypes => "true"
)
from pyspark import pipelines as dp
path = "/Volumes/<catalog>/<schema>/raw_data/gps"
dp.create_streaming_table(
name="gps_bronze",
comment="Raw GPS pings ingested from volume using Auto Loader",
)
@dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
def gps_bronze_ingest_flow():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load(path)
)
- クリック
実行ファイル または 実行パイプラインを使用し て更新を実行します。
更新が完了すると、パイプライン グラフにgps_bronzeテーブルが表示されます。次に、座標をネイティブ ジオメトリ ポイントに変換するシルバー テーブルを追加します。
ステップ 4: ジオメトリ ポイントを備えたシルバーのストリーミング テーブルを追加します
Bronze テーブルから読み取り、 ST_Point(longitude, latitude)を使用してGEOMETRY列を追加するストリーミング テーブルを作成します。
-
アセットブラウザで、
を追加し 、次に 変換します 。
-
名前 を
raw_gps_silverに設定し、 SQL または Python を選択して、 作成を クリックします。 -
次のコードを新しいファイルに貼り付けます。
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE raw_gps_silver
COMMENT "GPS pings with native geometry point for spatial joins";
CREATE FLOW raw_gps_silver_flow AS
INSERT INTO raw_gps_silver BY NAME
SELECT
device_id,
timestamp,
longitude,
latitude,
ST_Point(longitude, latitude) AS point_geom
FROM STREAM(gps_bronze)
from pyspark import pipelines as dp
from pyspark.sql import functions as F
dp.create_streaming_table(
name="raw_gps_silver",
comment="GPS pings with native geometry point for spatial joins",
)
@dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
def raw_gps_silver_flow():
return (
spark.readStream.table("gps_bronze")
.select(
"device_id",
"timestamp",
"longitude",
"latitude",
F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
)
)
- クリック
ファイルを実行する か、 パイプラインを実行します 。
パイプライン グラフにgps_bronzeとraw_gps_silverが表示されるようになりました。次に、ウェアハウス ジオフェンスをマテリアライズドビューとして追加します。
ステップ 5: ウェアハウス ジオフェンス ゴールド テーブルを作成する
ボリュームからジオフェンスを読み取り、 ST_GeomFromWKTを使用して WKT 列をGEOMETRY列に変換するマテリアライズドビューを作成します。
-
アセットブラウザで、
を追加し 、次に 変換します 。
-
名前 を
warehouse_geofences_goldに設定し、 SQL または Python を選択して、 作成を クリックします。 -
次のコードを貼り付けます。
<catalog>と<schema>デフォルトのカタログとスキーマに置き換えます。
- SQL
- Python
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
SELECT
warehouse_name,
ST_GeomFromWKT(boundary_wkt) AS boundary_geom
FROM read_files(
"/Volumes/<catalog>/<schema>/raw_data/geofences",
format => "json"
)
from pyspark import pipelines as dp
from pyspark.sql import functions as F
path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
@dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
def warehouse_geofences_gold():
return (
spark.read.format("json").load(path).select(
"warehouse_name",
F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
)
)
- クリック
ファイルを実行する か、 パイプラインを実行します 。
パイプラインに geofences テーブルが含まれるようになりました。次に、コンピュート ウェアハウスの到着に空間結合を追加します。
ステップ 6: 空間結合を使用してウェアハウス到着テーブルを作成する
ST_Contains(boundary_geom, point_geom)を使用して銀色の GPS ポイントをジオフェンスに結合するマテリアライズドビューを追加し、デバイスがウェアハウス ポリゴン内にいつあるかを判断します。
-
アセットブラウザで、
を追加し 、次に 変換します 。
-
名前 を
warehouse_arrivalsに設定し、 SQL または Python を選択して、 作成を クリックします。 -
次のコードを貼り付けます。
- SQL
- Python
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
SELECT
g.device_id,
g.timestamp,
w.warehouse_name
FROM raw_gps_silver g
JOIN warehouse_geofences_gold w
ON ST_Contains(w.boundary_geom, g.point_geom)
from pyspark import pipelines as dp
from pyspark.sql import functions as F
@dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
def warehouse_arrivals():
g = spark.read.table("raw_gps_silver")
w = spark.read.table("warehouse_geofences_gold")
return (
g.alias("g")
.join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
.select(
F.col("g.device_id").alias("device_id"),
F.col("g.timestamp").alias("timestamp"),
F.col("w.warehouse_name").alias("warehouse_name"),
)
)
- クリック
ファイルを実行する か、 パイプラインを実行します 。
更新が完了すると、パイプライン グラフに 4 つのデータセットすべて( gps_bronze 、 raw_gps_silver 、 warehouse_geofences_gold 、 warehouse_arrivalsが表示されます。
空間結合を確認する
空間結合によって生成された行を確認します。ジオフェンス内にあるシルバー テーブルのポイントがwarehouse_arrivalsに表示されます。ノートブックまたは SQL エディターで次のいずれかを実行します (パイプライン ターゲットと同じカタログとスキーマを使用します)。
ウェアハウスごとの到着数のカウント ( SQL ):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Warehouse_AとWarehouse_Bカウントがゼロ以外になっているはずです (サンプル GPS データは両方のポリゴンに重なっています)。サンプル行を検査するには:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Python (ノートブック) での同じチェック:
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
warehouse_arrivalsに行が表示されている場合は、 ST_Contains(boundary_geom, point_geom)結合は正しく機能しています。
ステップ 7: パイプラインをスケジュールする (オプション)
新しい GPS データがボリュームに入力されたときにパイプラインを最新の状態に保つには、スケジュールに従ってパイプラインを実行するジョブを作成します。
- エディターの上部にある [スケジュール] ボタンを選択します。
- [スケジュール] ダイアログが表示されたら、 [スケジュールの追加] を選択します。
- 必要に応じて、ジョブに名前を付けます。
- デフォルトでは、スケジュールは 1 日に 1 回実行されます。これを受け入れるか、独自に設定することができます。 「詳細」 を選択すると特定の時間を設定できます。「 その他のオプション」 を選択すると実行通知を追加できます。
- スケジュールを適用するには、 [作成] を選択します。
ジョブの実行の詳細については、「Lakeflowジョブのモニタリングと可観測性」を参照してください。