DatabricksでRayとSparkを接続

Databricks を使用すると、Ray と Spark の操作を同じ実行環境で実行して、両方の分散コンピューティング エンジンの長所を活用できます。

レイと Spark の統合は、堅牢なデータマネジメント、安全なアクセス、リネージトラッキングを提供する Delta Lake と Unity Catalogによってサポートされています。

この記事では、次のユース ケースに従って Ray 操作と Spark 操作を接続する方法について説明します。

  • Spark データを Ray データに書き込む: メモリ内のデータを効率的に Ray に転送します。

  • Ray データを Spark に書き込む: Ray から Delta Lake またはその他のストレージ ソリューションにデータを出力して、互換性とアクセスを確保します。

  • 外部 Ray アプリケーションを Unity Catalogに接続する : Databricks の外部にある Ray アプリケーションを接続して、Databricks Unity Catalog テーブルからデータをロードします。

Ray を使用する場合、Sparkを使用する場合、またはその両方を使用する場合の詳細については、「Spark と Ray を使用する場合」を参照してください。

Spark DataFrame から分散 Ray データセットを作成する

Spark DataFrame から分散 Ray データセットを作成するには、 ray.data.from_spark() 関数を使用して、データを任意の場所に書き込むことなく、Ray から Spark DataFrame を直接読み取ることができます。

インメモリ Spark から Ray への転送は、Databricks Runtime ML 15.0 以降で使用できます。

この機能を有効にするには、次の操作を行う必要があります。

  • クラスターを開始する前に、Spark クラスター構成 spark.databricks.pyspark.dataFrameChunk.enabledtrue に設定します。

import ray.data

source_table = "my_db.my_table"

# Read a Spark DataFrame from a Delta table in Unity Catalog
df = spark.read.table(source_table)
ray_ds = ray.data.from_spark(df)

警告

オートスケール Spark クラスター (スポットインスタンスを使用するものを含む) は、from_spark() 関数を使用するには、use_spark_chunk_api パラメーターを False に設定する必要があります。 APISparkそうしないと、エグゼキューターが終了すると エグゼキューターのキャッシュが失われるため、 呼び出しでキャッシュ ミスが発生します。

ray_ds = ray.data.from_spark(df, use_spark_chunk_api=False)

Sparkへのレイデータの書き込み

Ray データを Spark に書き込むには、Spark がアクセスできる場所にデータセットを書き込む必要があります。

15.0 より前の Databricks Runtime ML では、ray.data モジュールから Ray Parquet ライターray_dataset.write_parquet()使用して、オブジェクト ストアの場所に直接書き込むことができます。Spark は、ネイティブ リーダーを使用してこの Parquet データを読み取ることができます。

Unity Catalog が有効なワークスペースの場合は、 ray.data.Dataset.write_databricks_table 関数を使用して Unity Catalog テーブルに書き込みます。

この関数は、Ray データセットを Unity Catalog ボリュームに一時的に保存し、Spark を使用して Unity Catalog ボリュームから読み取り、最後に Unity Catalog テーブルに書き込みます。 ray.data.Dataset.write_databricks_table関数を呼び出す前に、環境変数 "_RAY_UC_VOLUMES_FUSE_TEMP_DIR" が有効でアクセス可能な Unity Catalog ボリューム パス ("/Volumes/MyCatalog/MySchema/MyVolume/MyRayData" など) に設定されていることを確認してください。

ds = ray.data
ds.write_databricks_table()

Unity Catalog が有効になっていないワークスペースの場合は、Ray Data データセットを一時ファイル (Parquet ファイルなど) として DBFS に手動で格納し、Spark でデータ ファイルを読み取ることができます。

ds.write_parquet(tmp_path)
df = spark.read.parquet(tmp_path)
df.write.format("delta").saveAsTable(table_name)

Ray コア アプリケーションから Spark へのデータの書き込み

また、Databricks は Ray Core アプリケーションを Sparkと統合することもでき、Ray Core (Ray の下位APIs) と Spark ワークロードを同じ環境内で実行し、それらの間でのデータ交換を可能にすることもできます。この統合では、さまざまなワークロードとデータマネジメントのニーズに合わせていくつかのパターンが提供され、両方のフレームワークを使用してエクスペリエンスが簡素化されます。

Ray から Spark にデータを書き込むには、主に 3 つのパターンがあります。

  • 出力を一時的な場所に保持する: Ray タスクの出力を DBFS または Unity Catalog ボリュームに一時的に格納してから、Spark DataFrame に統合します。

  • Spark Connect との接続: Ray タスクを Spark クラスターに直接接続して、Ray がSpark DataFramesやテーブルと対話できるようにします。

  • サードパーティ ライブラリを使用する: deltalakedeltarayなどの外部ライブラリを使用して、Ray Core タスクから Delta Lake または Spark テーブルにデータを書き込みます。

パターン 1: 出力を一時的な場所に保持する

Ray から Spark にデータを書き込む最も一般的なパターンは、出力データを Unity Catalog ボリュームや DBFS などの一時的な場所に格納することです。 データを格納した後、Ray ドライバー スレッドはワーカー ノード上のファイルの各部分を読み取り、それらを最終的な DataFrame に統合してさらに処理します。 通常、一時ファイルはCSVなどの標準形式です。 このアプローチは、出力データが表形式 (Ray Core タスクによって生成された Pandas DataFrame など) の場合に最適です。

この方法は、Ray タスクからの出力が大きすぎてドライバー ノードまたは共有オブジェクト ストアのメモリに収まらない場合に使用します。 データをストレージに保持せずに大規模なデータセットを処理する必要がある場合は、Databricks クラスターのドライバー ノードに割り当てられるメモリを増やしてパフォーマンスを向上させることを検討してください。

import os
import uuid
import numpy as np
import pandas as pd

@ray.remote
def write_example(task_id, path_prefix):

  num_rows = 100

  df = pd.DataFrame({
      'foo': np.random.rand(num_rows),
      'bar': np.random.rand(num_rows)
  })

  # Write the DataFrame to a CSV file
  df.to_csv(os.path.join(path_prefix, f"result_part_{task_id}.csv"))

n_tasks = 10

# Put a unique DBFS prefix for the temporary file path
dbfs_prefix = f"/dbfs/<USERNAME>"

# Create a unique path for the temporary files
path_prefix = os.path.join(dbfs_prefix, f"/ray_tmp/write_task_{uuid.uuid4()}")

tasks = ray.get([write_example.remote(i, path_prefix) for i in range(n_tasks)])

# Read all CSV files in the directory into a single DataFrame
df = spark.read.csv(path_prefix.replace("/dbfs", "dbfs:"), header=True, inferSchema=True)

パターン 2: Spark Connect を使用して接続する

Ray Core タスクがリモート タスク内の Spark と対話する別の方法は、 Spark Connect を使用することです。 これにより、ドライバー ノードから実行されている Spark クラスターを指すように Ray ワーカーの Spark コンテキストを設定できます。

これを設定するには、Spark のスペースを割り当てるように Ray クラスター リソースを構成する必要があります。 たとえば、ワーカー・ノードに 8 つの CPU がある場合は、num_cpus_worker_node を 7 に設定し、Spark 用に 1 つの CPU を残します。 大規模な Spark タスクの場合は、より大きなリソースを共有することを割り当てることをお勧めします。

from databricks.connect import DatabricksSession
import ray

@ray.remote
class SparkHandler(object):

   def __init__(self, access_token=None, cluster_id=None, host_url=None):
       self.spark = (DatabricksSession
                     .builder
                     .remote(host=host_url,
                             token=access_token,
                             cluster_id=cluster_id)
                     .getOrCreate()
                     )
   def test(self):
       df = self.spark.sql("select * from samples.nyctaxi.trips")

       df.write.format("delta").mode(
"overwrite").saveAsTable("catalog.schema.taxi_trips")
       return df.count()

access_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
cluster_id = dbutils.notebook.entry_point.getDbutils().notebook().getContext().clusterId().get()
host_url = f"https://{dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().get('browserHostName').get()}"

sh = SparkHandler.remote(access_token=access_token,
                        cluster_id=cluster_id,
                        host_url=host_url)
print(ray.get(sh.test.remote()))

この例では、ノートブックで生成されたトークンを使用します。 ただし、 Databricks では、本番運用のユースケースでは、 Databricks シークレットに保存されたアクセストークンを使用することをお勧めします。

このプロセスでは 1 つの Spark ドライバーが呼び出されるため、 スレッド ロック が作成され、すべてのタスクが前の Spark タスクの完了を待機します。 したがって、並列タスクがあまりない場合、 Spark タスクが完了するとすべての並列タスクが順次動作するため、これを使用することをお勧めします。 このような状況では、出力を保持し、最後に 1 つの Spark データフレームに結合してから、出力テーブルに書き込むことをお勧めします。

パターン 3: サードパーティ ライブラリ

別のオプションは、Delta Lake および Spark と対話するサードパーティのライブラリを使用することです。 Databricks は、これらのサードパーティライブラリを公式にサポートしていません。 この例は、delta-rs プロジェクトの deltalake ライブラリです。このアプローチは現在、 Hive metastore テーブルでのみ機能し、 Unity Catalog テーブルでは機能しません。

from deltalake import DeltaTable, write_deltalake
import pandas as pd
import numpy as np
import ray

@ray.remote
def write_test(table_name):
   random_df_id_vals = [int(np.random.randint(1000)), int(np.random.randint(1000))]
   pdf = pd.DataFrame({"id": random_df_id_vals, "value": ["foo", "bar"]})
   write_deltalake(table_name, pdf, mode="append")


def main():
   table_name = "database.mytable"
   ray.get([write_test.remote(table_name) for _ in range(100)])

利用可能な別のサードパーティライブラリは、 Delta Incubatorプロジェクト https://github.com/delta-incubator/deltaray)を通じて利用可能なdeltarayライブラリです

# Standard Libraries
import pathlib

# External Libraries
import deltaray
import deltalake as dl
import pandas as pd

# Creating a Delta Table
cwd = pathlib.Path().resolve()
table_uri = f'{cwd}/tmp/delta-table'
df = pd.DataFrame({'id': [0, 1, 2, 3, 4, ], })
dl.write_deltalake(table_uri, df)

# Reading our Delta Table
ds = deltaray.read_delta(table_uri)
ds.show()

外部の Ray アプリケーションを Databricks に接続する

Databricks ウェアハウス クエリから Ray データセットを作成する

Ray 2.8.0 以降では、Databricks の外部にある Ray アプリケーションを Databricks 内のテーブルに接続するために、 ray.data.read_databricks_tables API を呼び出して Unity Catalog テーブルからデータを読み込むことができます。

まず、DATABRICKS_TOKEN 環境変数を SQLウェアハウス アクセストークンに設定します。 Databricks Runtime でプログラムを実行していない場合は、次に示すように、 DATABRICKS_HOST 環境変数も Databricks ワークスペース URL に設定します。

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

次に、ray.data.read_databricks_tables()に電話して、 SQLウェアハウスから読み取ります。

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity Catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

警告

Databricks ウェアハウスでは、クエリ結果を約 2 時間しかキャッシュできません。 実行時間の長いワークロードの場合は、 ray.data.Dataset.materialize メソッドを呼び出して、Ray データセットを Ray 分散オブジェクト ストアに具体化します。

Databricks デルタ共有テーブルから Ray データセットを作成する

Databricks デルタ共有テーブルからデータを読み取ることもできます。 差分共有テーブルからの読み取りは、Databricks ウェアハウス キャッシュからの読み取りよりも信頼性が高くなります。

ray.data.read_delta_sharing_tables API は Ray 2.33 以降で使用できます。

import ray

ds = ray.data.read_delta_sharing_tables(
    url=f"<profile-file-path>#<share-name>.<schema-name>.<table-name>",
    limit=100000,
    version=1,
)

ベストプラクティス

  • Ray クラスターのベスト プラクティス ガイドで説明されている手法を常に使用して、クラスターが完全に活用されるようにします。

  • Unity Catalog ボリュームを使用して、出力データを表形式以外の形式で格納し、ガバナンスを提供することを検討してください。

  • num_cpus_worker_node 構成が Spark ワーカー ノードの数と一致するように CPU コア数が設定されていることを確認します。同様に、 num_gpus_worker_node を Spark ワーカー ノードあたりの GPU の数に設定します。 この構成では、各 Spark ワーカー ノードは、Spark ワーカー ノードのリソースを最大限に活用する 1 つの Ray ワーカー ノードを起動します。

制限事項

現在、Unity Catalog は、Spark 以外のライターからテーブルに書き込むための資格情報を共有していません。 したがって、Ray Core タスクから Unity Catalog テーブルに書き込まれるすべてのデータでは、データを永続化してから Spark で読み取るか、Databricks Connect を Ray タスク内で設定する必要があります。