Ray と Spark を Databricks の同じ環境で組み合わせる
Databricks を使用すると、Ray と Spark の操作を同じ実行環境で実行して、両方の分散コンピューティング エンジンの長所を活用できます。
レイと Spark の統合は、堅牢なデータマネジメント、安全なアクセス、リネージトラッキングを提供する Delta Lake と Unity Catalogによってサポートされています。
この記事では、次のユース ケースに従って Ray 操作と Spark 操作を接続する方法について説明します。
- Spark データを Ray データに書き込む : メモリ内のデータを効率的に Ray に転送します。
- Ray データを Spark に書き込む: Ray から Delta Lake またはその他のストレージ ソリューションにデータを出力して、互換性とアクセスを確保します。
- 外部 Ray アプリケーションを Unity Catalogに接続する : Databricks の外部にある Ray アプリケーションを接続して、Databricks Unity Catalog テーブルからデータをロードします。
Ray と Ray Sparkのどちらを使用するかについては、「Spark と Ray の使い分け」を参照してください。
Spark データフレーム から分散 Ray データセットを作成する
Spark データフレーム から分散 Ray データセットを作成するには、 ray.data.from_spark()
関数を使用して、データを任意の場所に書き込むことなく、Ray から Spark データフレーム を直接読み取ることができます。
インメモリ Spark から Ray への転送は、Databricks Runtime ML 15.0 以降で使用できます。
この機能を有効にするには、次のことを行う必要があります。
- クラスターを開始する前に、 Spark クラスター設定
spark.databricks.pyspark.dataFrameChunk.enabled
をtrue
に設定します。
import ray.data
source_table = "my_db.my_table"
# Read a Spark DataFrame from a Delta table in Unity Catalog
df = spark.read.table(source_table)
ray_ds = ray.data.from_spark(df)
オートスケール Spark クラスター (スポットインスタンスを使用するものを含む) は、from_spark()
関数を使用するために use_spark_chunk_api
パラメータを False
に設定する必要があります。 APISparkそうしないと、エグゼキューターが終了すると エグゼキューターのキャッシュが失われるため、 呼び出しでキャッシュ ミスが発生します。
ray_ds = ray.data.from_spark(df, use_spark_chunk_api=False)
Sparkへのレイデータの書き込み
Ray データを Spark に書き込むには、Spark がアクセスできる場所にデータセットを書き込む必要があります。
15.0 より前の Databricks Runtime ML では、ray.data
モジュールから Ray Parquet ライターray_dataset.write_parquet()
使用して、オブジェクト ストアの場所に直接書き込むことができます。Spark は、ネイティブ リーダーを使用してこの Parquet データを読み取ることができます。
Unity Catalog が有効なワークスペースの場合は、 ray.data.Dataset.write_databricks_table
関数を使用して Unity Catalog テーブルに書き込みます。
この関数は、Ray データセットを Unity Catalog ボリュームに一時的に保存し、Spark を使用して Unity Catalog ボリュームから読み取り、最後に Unity Catalog テーブルに書き込みます。 ray.data.Dataset.write_databricks_table
関数を呼び出す前に、環境変数 "_RAY_UC_VOLUMES_FUSE_TEMP_DIR"
が有効でアクセス可能な Unity Catalog ボリューム パス ("/Volumes/MyCatalog/MySchema/MyVolume/MyRayData"
など) に設定されていることを確認してください。
ds = ray.data
ds.write_databricks_table()
Unity Catalog が有効になっていないワークスペースの場合は、Ray Data データセットを一時ファイル (Parquet ファイルなど) として DBFS に手動で格納し、Spark でデータ ファイルを読み取ることができます。
ds.write_parquet(tmp_path)
df = spark.read.parquet(tmp_path)
df.write.format("delta").saveAsTable(table_name)
Ray コア アプリケーションから Spark へのデータの書き込み
また、Databricks は Ray Core アプリケーションを Sparkと統合することもでき、Ray Core (Ray の下位APIs) と Spark ワークロードを同じ環境内で実行し、それらの間でのデータ交換を可能にすることもできます。この統合では、さまざまなワークロードとデータマネジメントのニーズに合わせていくつかのパターンが提供され、両方のフレームワークを使用してエクスペリエンスが簡素化されます。
Ray から Spark にデータを書き込むには、主に 3 つのパターンがあります。
- 出力を一時的な場所に保持 する: Ray タスクの出力を DBFS または Unity Catalog ボリュームに一時的に格納してから、Spark データフレーム に統合します。
- Spark Connectで接続 する:RayタスクをSparkクラスターに直接接続して、RayがSpark データフレームやテーブルと対話できるようにします。
- サードパーティ ライブラリを使用する :
deltalake
やdeltaray
などの外部ライブラリを使用して、Ray Core タスクから Delta Lake または Spark テーブルにデータを書き込みます。
パターン1:出力を一時的な場所に永続化する
Ray から Spark にデータを書き込む最も一般的なパターンは、出力データを Unity Catalog ボリュームや DBFS などの一時的な場所に格納することです。 データを格納した後、Ray ドライバー スレッドはワーカー ノード上のファイルの各部分を読み取り、それらを最終的な データフレーム に統合してさらに処理します。 通常、一時ファイルはCSVなどの標準形式です。 このアプローチは、出力データが表形式 (Ray Core タスクによって生成された Pandas データフレーム など) の場合に最適です。
この方法は、Ray タスクからの出力が大きすぎてドライバー ノードまたは共有オブジェクト ストアのメモリに収まらない場合に使用します。 データをストレージに保持せずに大規模なデータセットを処理する必要がある場合は、パフォーマンスを向上させるために、 Databricks クラスターのドライバー ノードに割り当てられるメモリを増やすことを検討してください。
import os
import uuid
import numpy as np
import pandas as pd
@ray.remote
def write_example(task_id, path_prefix):
num_rows = 100
df = pd.DataFrame({
'foo': np.random.rand(num_rows),
'bar': np.random.rand(num_rows)
})
# Write the DataFrame to a CSV file
df.to_csv(os.path.join(path_prefix, f"result_part_{task_id}.csv"))
n_tasks = 10
# Put a unique DBFS prefix for the temporary file path
dbfs_prefix = f"/dbfs/<USERNAME>"
# Create a unique path for the temporary files
path_prefix = os.path.join(dbfs_prefix, f"/ray_tmp/write_task_{uuid.uuid4()}")
tasks = ray.get([write_example.remote(i, path_prefix) for i in range(n_tasks)])
# Read all CSV files in the directory into a single DataFrame
df = spark.read.csv(path_prefix.replace("/dbfs", "dbfs:"), header=True, inferSchema=True)
パターン 2: Spark Connect を使用して接続する
Ray Core タスクがリモート タスク内の Spark と対話する別の方法は、 Spark Connect を使用することです。 これにより、Ray ワーカーの Spark コンテキストを設定して、ドライバー ノードから実行中の Spark クラスターを指すようにすることができます。
これを設定するには、Ray クラスター リソースを設定して、 Sparkにスペースを割り当てる必要があります。 たとえば、ワーカー・ノードに 8 つの CPU がある場合は、num_cpus_worker_node を 7 に設定し、Spark 用に 1 つの CPU を残します。 大規模な Spark タスクの場合は、より大きなリソースを共有することを割り当てることをお勧めします。
from databricks.connect import DatabricksSession
import ray
@ray.remote
class SparkHandler(object):
def __init__(self, access_token=None, cluster_id=None, host_url=None):
self.spark = (DatabricksSession
.builder
.remote(host=host_url,
token=access_token,
cluster_id=cluster_id)
.getOrCreate()
)
def test(self):
df = self.spark.sql("select * from samples.nyctaxi.trips")
df.write.format("delta").mode(
"overwrite").saveAsTable("catalog.schema.taxi_trips")
return df.count()
access_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
cluster_id = dbutils.notebook.entry_point.getDbutils().notebook().getContext().clusterId().get()
host_url = f"https://{dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get('browserHostName').get()}"
sh = SparkHandler.remote(access_token=access_token,
cluster_id=cluster_id,
host_url=host_url)
print(ray.get(sh.test.remote()))
この例では、ノートブックで生成されたトークンを使用します。 ただし、 Databricks では、本番運用のユースケースでは、 Databricks シークレットに保存されたアクセストークンを使用することをお勧めします。
このプロセスでは 1 つの Spark ドライバーが呼び出されるため、 スレッド ロック が作成され、すべてのタスクが前の Spark タスクの完了を待機します。 したがって、並列タスクがあまりない場合、 Spark タスクが完了するとすべての並列タスクが順次動作するため、これを使用することをお勧めします。 このような状況では、出力を保持し、最後に 1 つの Spark データフレームに結合してから、出力テーブルに書き込むことをお勧めします。
パターン 3: サードパーティ ライブラリ
別のオプションは、Delta Lake および Spark と対話するサードパーティのライブラリを使用することです。 Databricks は、これらのサードパーティライブラリを公式にサポートしていません。この例は、delta-rs
プロジェクトの deltalake
ライブラリです。このアプローチは現在、 Hive metastore テーブルでのみ機能し、 Unity Catalog テーブルでは機能しません。
from deltalake import DeltaTable, write_deltalake
import pandas as pd
import numpy as np
import ray
@ray.remote
def write_test(table_name):
random_df_id_vals = [int(np.random.randint(1000)), int(np.random.randint(1000))]
pdf = pd.DataFrame({"id": random_df_id_vals, "value": ["foo", "bar"]})
write_deltalake(table_name, pdf, mode="append")
def main():
table_name = "database.mytable"
ray.get([write_test.remote(table_name) for _ in range(100)])
利用可能な別のサードパーティライブラリは、 Delta Incubatorプロジェクト https://github.com/delta-incubator/deltaray)を通じて利用可能なdeltarayライブラリです
# Standard Libraries
import pathlib
# External Libraries
import deltaray
import deltalake as dl
import pandas as pd
# Creating a Delta Table
cwd = pathlib.Path().resolve()
table_uri = f'{cwd}/tmp/delta-table'
df = pd.DataFrame({'id': [0, 1, 2, 3, 4, ], })
dl.write_deltalake(table_uri, df)
# Reading our Delta Table
ds = deltaray.read_delta(table_uri)
ds.show()
外部の Ray アプリケーションを Databricks に接続する
Databricks ウェアハウス クエリから Ray データセットを作成する
Ray 2.8.0 以降では、Databricks の外部にある Ray アプリケーションを Databricks 内のテーブルに接続するために、 ray.data.read_databricks_tables
API を呼び出して Unity Catalog テーブルからデータを読み込むことができます。
まず、DATABRICKS_TOKEN
環境変数を SQLウェアハウス アクセス トークンに設定します。 Databricks Runtime でプログラムを実行していない場合は、次に示すように、 DATABRICKS_HOST
環境変数も Databricks ワークスペース URL に設定します。
export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net
次に、ray.data.read_databricks_tables()
に電話して、 SQLウェアハウスから読み取ります。
import ray
ray_dataset = ray.data.read_databricks_tables(
warehouse_id='...', # Databricks SQL warehouse ID
catalog='catalog_1', # Unity Catalog name
schema='db_1', # Schema name
query="SELECT title, score FROM movie WHERE year >= 1980",
)
Databricks ウェアハウスでは、クエリ結果を約 2 時間しかキャッシュできません。 実行時間の長いワークロードの場合は、 ray.data.Dataset.materialize
メソッドを呼び出して、Ray データセットを Ray 分散オブジェクト ストアに具体化します。
Databricks デルタ共有テーブルから Ray データセットを作成する
Databricks デルタ共有テーブルからデータを読み取ることもできます。 差分共有テーブルからの読み取りは、Databricks ウェアハウス キャッシュからの読み取りよりも信頼性が高くなります。
ray.data.read_delta_sharing_tables
API は Ray 2.33 以降で使用できます。
import ray
ds = ray.data.read_delta_sharing_tables(
url=f"<profile-file-path>#<share-name>.<schema-name>.<table-name>",
limit=100000,
version=1,
)
ベストプラクティス
- Ray クラスターのベスト プラクティス ガイドで説明されている手法を常に使用して、クラスターが十分に活用されるようにします。
- Unity Catalog ボリュームを使用して、出力データを表形式以外の形式で格納し、ガバナンスを提供することを検討してください。
num_cpus_worker_node
構成が Spark ワーカー ノードの数と一致するように CPU コア数が設定されていることを確認します。同様に、num_gpus_worker_node
を Spark ワーカー ノードあたりの GPU の数に設定します。 この構成では、各 Spark ワーカー ノードは、Spark ワーカー ノードのリソースを最大限に活用する 1 つの Ray ワーカー ノードを起動します。
制限
現在、Unity Catalog は、Spark 以外のライターからテーブルに書き込むための資格情報を共有していません。 したがって、Ray Core タスクから Unity Catalog テーブルに書き込まれるすべてのデータでは、データを永続化してから Spark で読み取るか、Databricks Connect を Ray タスク内で設定する必要があります。