メインコンテンツまでスキップ

Ray と Spark を Databricks の同じ環境で組み合わせる

Databricks を使用すると、Ray と Spark の操作を同じ実行環境で実行して、両方の分散コンピューティング エンジンの長所を活用できます。

レイと Spark の統合は、堅牢なデータマネジメント、安全なアクセス、リネージトラッキングを提供する Delta Lake と Unity Catalogによってサポートされています。

この記事では、次のユース ケースに従って Ray 操作と Spark 操作を接続する方法について説明します。

  • Spark データを Ray データに書き込む : メモリ内のデータを効率的に Ray に転送します。
  • Ray データを Spark に書き込む: Ray から Delta Lake またはその他のストレージ ソリューションにデータを出力して、互換性とアクセスを確保します。
  • 外部 Ray アプリケーションを Unity Catalogに接続する : Databricks の外部にある Ray アプリケーションを接続して、Databricks Unity Catalog テーブルからデータをロードします。

Ray と Ray Sparkのどちらを使用するかについては、「Spark と Ray の使い分け」を参照してください。

Spark データフレーム から分散 Ray データセットを作成する

Spark データフレーム から分散 Ray データセットを作成するには、 ray.data.from_spark() 関数を使用して、データを任意の場所に書き込むことなく、Ray から Spark データフレーム を直接読み取ることができます。

インメモリ Spark から Ray への転送は、Databricks Runtime ML 15.0 以降で使用できます。

この機能を有効にするには、次のことを行う必要があります。

  • クラスターを開始する前に、 Spark クラスター設定spark.databricks.pyspark.dataFrameChunk.enabledtrue に設定します。
Python
import ray.data

source_table = "my_db.my_table"

# Read a Spark DataFrame from a Delta table in Unity Catalog
df = spark.read.table(source_table)
ray_ds = ray.data.from_spark(df)
警告

オートスケール Spark クラスター (スポットインスタンスを使用するものを含む) は、from_spark() 関数を使用するために use_spark_chunk_api パラメータを False に設定する必要があります。 APISparkそうしないと、エグゼキューターが終了すると エグゼキューターのキャッシュが失われるため、 呼び出しでキャッシュ ミスが発生します。

Python
ray_ds = ray.data.from_spark(df, use_spark_chunk_api=False)

Sparkへのレイデータの書き込み

Ray データを Spark に書き込むには、Spark がアクセスできる場所にデータセットを書き込む必要があります。

15.0 より前の Databricks Runtime ML では、ray.data モジュールから Ray Parquet ライターray_dataset.write_parquet()使用して、オブジェクト ストアの場所に直接書き込むことができます。Spark は、ネイティブ リーダーを使用してこの Parquet データを読み取ることができます。

Unity Catalog が有効なワークスペースの場合は、 ray.data.Dataset.write_databricks_table 関数を使用して Unity Catalog テーブルに書き込みます。

この関数は、Ray データセットを Unity Catalog ボリュームに一時的に保存し、Spark を使用して Unity Catalog ボリュームから読み取り、最後に Unity Catalog テーブルに書き込みます。 ray.data.Dataset.write_databricks_table関数を呼び出す前に、環境変数 "_RAY_UC_VOLUMES_FUSE_TEMP_DIR" が有効でアクセス可能な Unity Catalog ボリューム パス ("/Volumes/MyCatalog/MySchema/MyVolume/MyRayData" など) に設定されていることを確認してください。

Python
ds = ray.data
ds.write_databricks_table()

Unity Catalog が有効になっていないワークスペースの場合は、Ray Data データセットを一時ファイル (Parquet ファイルなど) として DBFS に手動で格納し、Spark でデータ ファイルを読み取ることができます。

Python
ds.write_parquet(tmp_path)
df = spark.read.parquet(tmp_path)
df.write.format("delta").saveAsTable(table_name)

Ray コア アプリケーションから Spark へのデータの書き込み

また、Databricks は Ray Core アプリケーションを Sparkと統合することもでき、Ray Core (Ray の下位APIs) と Spark ワークロードを同じ環境内で実行し、それらの間でのデータ交換を可能にすることもできます。この統合では、さまざまなワークロードとデータマネジメントのニーズに合わせていくつかのパターンが提供され、両方のフレームワークを使用してエクスペリエンスが簡素化されます。

Ray から Spark にデータを書き込むには、主に 3 つのパターンがあります。

  • 出力を一時的な場所に保持 する: Ray タスクの出力を DBFS または Unity Catalog ボリュームに一時的に格納してから、Spark データフレーム に統合します。
  • Spark Connectで接続 する:RayタスクをSparkクラスターに直接接続して、RayがSpark データフレームやテーブルと対話できるようにします。
  • サードパーティ ライブラリを使用する : deltalakedeltarayなどの外部ライブラリを使用して、Ray Core タスクから Delta Lake または Spark テーブルにデータを書き込みます。

パターン1:出力を一時的な場所に永続化する

Ray から Spark にデータを書き込む最も一般的なパターンは、出力データを Unity Catalog ボリュームや DBFS などの一時的な場所に格納することです。 データを格納した後、Ray ドライバー スレッドはワーカー ノード上のファイルの各部分を読み取り、それらを最終的な データフレーム に統合してさらに処理します。 通常、一時ファイルはCSVなどの標準形式です。 このアプローチは、出力データが表形式 (Ray Core タスクによって生成された Pandas データフレーム など) の場合に最適です。

この方法は、Ray タスクからの出力が大きすぎてドライバー ノードまたは共有オブジェクト ストアのメモリに収まらない場合に使用します。 データをストレージに保持せずに大規模なデータセットを処理する必要がある場合は、パフォーマンスを向上させるために、 Databricks クラスターのドライバー ノードに割り当てられるメモリを増やすことを検討してください。

Python
import os
import uuid
import numpy as np
import pandas as pd

@ray.remote
def write_example(task_id, path_prefix):

num_rows = 100

df = pd.DataFrame({
'foo': np.random.rand(num_rows),
'bar': np.random.rand(num_rows)
})

# Write the DataFrame to a CSV file
df.to_csv(os.path.join(path_prefix, f"result_part_{task_id}.csv"))

n_tasks = 10

# Put a unique DBFS prefix for the temporary file path
dbfs_prefix = f"/dbfs/<USERNAME>"

# Create a unique path for the temporary files
path_prefix = os.path.join(dbfs_prefix, f"/ray_tmp/write_task_{uuid.uuid4()}")

tasks = ray.get([write_example.remote(i, path_prefix) for i in range(n_tasks)])

# Read all CSV files in the directory into a single DataFrame
df = spark.read.csv(path_prefix.replace("/dbfs", "dbfs:"), header=True, inferSchema=True)

パターン 2: Spark Connect を使用して接続する

Ray Core タスクがリモート タスク内の Spark と対話する別の方法は、 Spark Connect を使用することです。 これにより、Ray ワーカーの Spark コンテキストを設定して、ドライバー ノードから実行中の Spark クラスターを指すようにすることができます。

これを設定するには、Ray クラスター リソースを設定して、 Sparkにスペースを割り当てる必要があります。 たとえば、ワーカー・ノードに 8 つの CPU がある場合は、num_cpus_worker_node を 7 に設定し、Spark 用に 1 つの CPU を残します。 大規模な Spark タスクの場合は、より大きなリソースを共有することを割り当てることをお勧めします。

Python
from databricks.connect import DatabricksSession
import ray

@ray.remote
class SparkHandler(object):

def __init__(self, access_token=None, cluster_id=None, host_url=None):
self.spark = (DatabricksSession
.builder
.remote(host=host_url,
token=access_token,
cluster_id=cluster_id)
.getOrCreate()
)
def test(self):
df = self.spark.sql("select * from samples.nyctaxi.trips")

df.write.format("delta").mode(
"overwrite").saveAsTable("catalog.schema.taxi_trips")
return df.count()

access_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
cluster_id = dbutils.notebook.entry_point.getDbutils().notebook().getContext().clusterId().get()
host_url = f"https://{dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get('browserHostName').get()}"

sh = SparkHandler.remote(access_token=access_token,
cluster_id=cluster_id,
host_url=host_url)
print(ray.get(sh.test.remote()))

この例では、ノートブックで生成されたトークンを使用します。 ただし、 Databricks では、本番運用のユースケースでは、 Databricks シークレットに保存されたアクセストークンを使用することをお勧めします。

このプロセスでは 1 つの Spark ドライバーが呼び出されるため、 スレッド ロック が作成され、すべてのタスクが前の Spark タスクの完了を待機します。 したがって、並列タスクがあまりない場合、 Spark タスクが完了するとすべての並列タスクが順次動作するため、これを使用することをお勧めします。 このような状況では、出力を保持し、最後に 1 つの Spark データフレームに結合してから、出力テーブルに書き込むことをお勧めします。

パターン 3: サードパーティ ライブラリ

別のオプションは、Delta Lake および Spark と対話するサードパーティのライブラリを使用することです。 Databricks は、これらのサードパーティライブラリを公式にサポートしていません。この例は、delta-rs プロジェクトの deltalake ライブラリです。このアプローチは現在、 Hive metastore テーブルでのみ機能し、 Unity Catalog テーブルでは機能しません。

Python
from deltalake import DeltaTable, write_deltalake
import pandas as pd
import numpy as np
import ray

@ray.remote
def write_test(table_name):
random_df_id_vals = [int(np.random.randint(1000)), int(np.random.randint(1000))]
pdf = pd.DataFrame({"id": random_df_id_vals, "value": ["foo", "bar"]})
write_deltalake(table_name, pdf, mode="append")

def main():
table_name = "database.mytable"
ray.get([write_test.remote(table_name) for _ in range(100)])

利用可能な別のサードパーティライブラリは、 Delta Incubatorプロジェクト https://github.com/delta-incubator/deltaray)を通じて利用可能なdeltarayライブラリです

Python
# Standard Libraries
import pathlib

# External Libraries
import deltaray
import deltalake as dl
import pandas as pd

# Creating a Delta Table
cwd = pathlib.Path().resolve()
table_uri = f'{cwd}/tmp/delta-table'
df = pd.DataFrame({'id': [0, 1, 2, 3, 4, ], })
dl.write_deltalake(table_uri, df)

# Reading our Delta Table
ds = deltaray.read_delta(table_uri)
ds.show()

外部の Ray アプリケーションを Databricks に接続する

Databricks ウェアハウス クエリから Ray データセットを作成する

Ray 2.8.0 以降では、Databricks の外部にある Ray アプリケーションを Databricks 内のテーブルに接続するために、 ray.data.read_databricks_tables API を呼び出して Unity Catalog テーブルからデータを読み込むことができます。

まず、DATABRICKS_TOKEN 環境変数を SQLウェアハウス アクセス トークンに設定します。 Databricks Runtime でプログラムを実行していない場合は、次に示すように、 DATABRICKS_HOST 環境変数も Databricks ワークスペース URL に設定します。

Python
export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

次に、ray.data.read_databricks_tables()に電話して、 SQLウェアハウスから読み取ります。

Python
import ray

ray_dataset = ray.data.read_databricks_tables(
warehouse_id='...', # Databricks SQL warehouse ID
catalog='catalog_1', # Unity Catalog name
schema='db_1', # Schema name
query="SELECT title, score FROM movie WHERE year >= 1980",
)
警告

Databricks ウェアハウスでは、クエリ結果を約 2 時間しかキャッシュできません。 実行時間の長いワークロードの場合は、 ray.data.Dataset.materialize メソッドを呼び出して、Ray データセットを Ray 分散オブジェクト ストアに具体化します。

Databricks デルタ共有テーブルから Ray データセットを作成する

Databricks デルタ共有テーブルからデータを読み取ることもできます。 差分共有テーブルからの読み取りは、Databricks ウェアハウス キャッシュからの読み取りよりも信頼性が高くなります。

ray.data.read_delta_sharing_tables API は Ray 2.33 以降で使用できます。

Python
import ray

ds = ray.data.read_delta_sharing_tables(
url=f"<profile-file-path>#<share-name>.<schema-name>.<table-name>",
limit=100000,
version=1,
)

ベストプラクティス

  • Ray クラスターのベスト プラクティス ガイドで説明されている手法を常に使用して、クラスターが十分に活用されるようにします。
  • Unity Catalog ボリュームを使用して、出力データを表形式以外の形式で格納し、ガバナンスを提供することを検討してください。
  • num_cpus_worker_node 構成が Spark ワーカー ノードの数と一致するように CPU コア数が設定されていることを確認します。同様に、 num_gpus_worker_node を Spark ワーカー ノードあたりの GPU の数に設定します。 この構成では、各 Spark ワーカー ノードは、Spark ワーカー ノードのリソースを最大限に活用する 1 つの Ray ワーカー ノードを起動します。

制限

現在、Unity Catalog は、Spark 以外のライターからテーブルに書き込むための資格情報を共有していません。 したがって、Ray Core タスクから Unity Catalog テーブルに書き込まれるすべてのデータでは、データを永続化してから Spark で読み取るか、Databricks Connect を Ray タスク内で設定する必要があります。