MLflow と Rayの統合

MLflow は、機械学習と AI ワークロードを管理するためのオープンソースプラットフォームです。 Rayと MLflow を組み合わせると、Rayでワークロードを分散し、トレーニング中に生成されたモデル、メトリクス、パラメーター、メタデータを MLflowで追跡できます。

この記事では、MLflow を次の Ray コンポーネントと統合する方法について説明します。

  • Ray Core: Ray Tune および Ray Train でカバーされていない汎用分散アプリケーション

  • Ray Train: 分散モデルトレーニング

  • Ray Tune: 分散ハイパーパラメーターチューニング

Ray Core と MLflowの統合

Ray Core は、汎用分散アプリケーションの基本的な構成要素を提供します。 これにより、Pythonの関数とクラスを複数のノードにスケーリングできます。

このセクションでは、Ray Core と MLflow を統合するための次のパターンについて説明します。

  • Ray ドライバー プロセスからの MLflow モデルのログ記録

  • 子供のランからの MLflow モデルのログ記録

Ray ドライバー プロセスからの MLflow のログ

一般に、MLflow モデルはワーカー ノードからではなく、ドライバー プロセスからログに記録するのが最適です。 これは、ステートフルな参照をリモートワーカーに渡す際に複雑さが増すためです。

たとえば、次のコードは、MLflow 追跡サーバーがワーカー ノード内からの MLflow Client を使用して初期化されていないため、失敗します。

import mlflow

@ray.remote
def example_logging_task(x):
# ...

 # This method will fail
 mlflow.log_metric("x", x)
 return x

with mlflow.start_run() as run:
 ray.get([example_logging_task.remote(x) for x in range(10)])

代わりに、メトリクスをドライバーノードに戻します。 メトリクスとメタデータは、通常、メモリの問題を引き起こさずにドライバーに転送できるほど小さいです。

上記の例を例にとり、Ray タスクから返されたメトリクスをログに記録するように更新します。

import mlflow

@ray.remote
def example_logging_task(x):
 # ...
 return x

with mlflow.start_run() as run:
  results = ray.get([example_logging_task.remote(x) for x in range(10)])
 for x in results:
   mlflow.log_metric("x", x)

大きな Pandas テーブル、イメージ、プロット、モデルなど、大きなアーティファクトを保存する必要があるタスクの場合、Databricks ではアーティファクトをファイルとして永続化することをお勧めします。 次に、ドライバー コンテキスト内で成果物を再読み込みするか、保存されたファイルへのパスを指定して MLflow を使用してオブジェクトを直接ログに記録します。

import mlflow

@ray.remote
def example_logging_task(x):
# ...
# Create a large object that needs to be stored
with open("/dbfs/myLargeFilePath.txt", "w") as f:
  f.write(myLargeObject)
return x

with mlflow.start_run() as run:
 results = ray.get([example_logging_task.remote(x) for x in range(10)])
for x in results:
  mlflow.log_metric("x", x)
  # Directly log the saved file by specifying the path
  mlflow.log_artifact("/dbfs/myLargeFilePath.txt")

MLflowの子供のランとしてのRayタスクの記録

子供のランを使用して、Ray Core を MLflow と統合できます。 これには、次の手順が含まれます。

  1. 親のランを作成する: ドライバー プロセスで親のランを初期化します。 このランは、後続のすべての子供のランの階層コンテナとして機能します。

  2. 子供のランの作成: 各 Ray タスク内で、親のランの下で子供のランを開始します。 それぞれの子供のランは、独自のメトリクスを個別にログに記録できます。

このアプローチを実装するには、各 Ray タスクが必要なクライアント資格情報と親のランの run_idを受け取ることを確認します。 この設定により、ランの間の階層的な親子関係が確立されます。 次のコード スニペットは、資格情報を取得し、親のランの run_idを渡す方法を示しています。

from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

username = "" # Username path
experiment_name = f"/Users/{username}/mlflow_test"

mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
   import os
  # Set the MLflow credentials within the Ray task
   os.environ.update(mlflow_db_creds)
  # Set the active MLflow experiment within each Ray task
   mlflow.set_experiment(experiment_name)
  # Create nested child runs associated with the parent run_id
   with mlflow.start_run(run_id=run_id, nested=True):
    # Log metrics to the child run within the Ray task
       mlflow.log_metric("x", x)

  return x

# Start parent run on the main driver process
with mlflow.start_run() as run:
  # Pass the parent run's run_id to each Ray task
   results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray TrainとMLflow

Ray Trainのモデルを MLflow にログに記録する最も簡単な方法は、トレーニング 実行によって生成されたチェックポイントを使用することです。 トレーニングのランが完了したら、ネイティブのディープラーニング フレームワーク ( PyTorch や TensorFlowなど) にモデルを再読み込みし、対応する MLflow コードでログに記録します。

このアプローチにより、モデルが正しく保存され、評価またはデプロイの準備が整います。

次のコードは、Ray Trainのcheckpoint からモデルを再読み込みし、 MLflowにログを記録します。

result = trainer.fit()

checkpoint = result.checkpoint
with checkpoint.as_directory() as checkpoint_dir:
     # Change as needed for different DL frameworks
    checkpoint_path = f"{checkpoint_dir}/checkpoint.ckpt"
    # Load the model from the checkpoint
    model = MyModel.load_from_checkpoint(checkpoint_path)

with mlflow.start_run() as run:
    # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

一般的には、オブジェクトをドライバーノードに戻すのがベストプラクティスですが、Ray Trainを使用すると、ワーカープロセスからのトレーニング履歴全体を保存するよりも最終結果を保存する方が簡単です。

トレーニング実行から複数のモデルを保存するには、 ray.train.CheckpointConfigに保持するチェックポイントの数を指定します。 その後、1 つのモデルを格納するのと同じ方法でモデルを読み取ってログに記録できます。

注:

MLflow は、モデル トレーニング中のフォールト トレランスの処理ではなく、モデルのライフサイクルの追跡を担当します。 フォールトトレランスは、代わりに Ray Train 自体によって管理されます。

Ray Trainで指定したトレーニング メトリクスを格納するには、result オブジェクトから取得して MLflowを使用して格納します。

result = trainer.fit()

with mlflow.start_run() as run:
    mlflow.log_metrics(result.metrics_dataframe.to_dict(orient='dict'))

  # Change the MLflow flavor as needed
    mlflow.pytorch.log_model(model, "model")

Spark クラスターと Ray クラスターを適切に構成し、リソース割り当ての問題を回避するには、 resources_per_worker 設定を調整する必要があります。 具体的には、各 Ray ワーカーの CPU の数を、Ray ワーカー ノードで使用可能な CPU の合計数より 1 つ少なく設定します。 トレーナーが使用可能なすべてのコアを Ray アクター用に予約すると、リソース競合エラーにつながる可能性があるため、この調整は非常に重要です。

Ray Tune と MLflow

Ray Tune を MLflow と統合すると、 Databricks内でハイパーパラメーターチューニング エクスペリメントを効率的に追跡および記録できます。 この統合は、 MLflowのエクスペリメント追跡機能を活用して、Ray タスクから直接メトリクスと結果を記録します。

ログ記録のための子供のランのアプローチ

Ray Core タスクからのログ記録と同様に、Ray Tune アプリケーションでは、子供のランのアプローチを使用して、各トライアルまたはチューニング イテレーションのメトリクスをログに記録できます。 次のステップを使用して、子供のランのアプローチを実装します。

  1. 親のランを作成する: ドライバー プロセスで親のランを初期化します。 このランは、後続のすべての子供のランのメイン コンテナとして機能します。

  2. 子供のランの記録: 各 Ray Tune タスクは、エクスペリメント結果の明確な階層を維持しながら、親のランの下に子供のランを作成します。

次の例は、MLflow を使用して Ray Tune タスクから認証およびログを記録する方法を示しています。

import os
import tempfile
import time

import mlflow
from mlflow.utils.databricks_utils import get_databricks_env_vars

from ray import train, tune
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow


mlflow_db_creds = get_databricks_env_vars("databricks")

EXPERIMENT_NAME = "/Users/<WORKSPACE_USERNAME>/setup_mlflow_example"
mlflow.set_experiment(EXPERIMENT_NAME)


def evaluation_fn(step, width, height):
   return (0.1 + width * step / 100) ** (-1) + height * 0.1


def train_function_mlflow(config, run_id):
   os.environ.update(mlflow_db_creds)
   mlflow.set_experiment(EXPERIMENT_NAME)

   # Hyperparameters
   width = config["width"]
   height = config["height"]

   with mlflow.start_run(run_id=run_id, nested=True):
       for step in range(config.get("steps", 100)):
           # Iterative training function - can be any arbitrary training procedure
           intermediate_score = evaluation_fn(step, width, height)
           # Log the metrics to MLflow
           mlflow.log_metrics({"iterations": step, "mean_loss": intermediate_score})
           # Feed the score back to Tune.
           train.report({"iterations": step, "mean_loss": intermediate_score})
           time.sleep(0.1)


def tune_with_setup(run_id, finish_fast=True):
   os.environ.update(mlflow_db_creds)
   # Set the experiment or create a new one if it does not exist.
   mlflow.set_experiment(experiment_name=EXPERIMENT_NAME)

   tuner = tune.Tuner(
       tune.with_parameter(train_function_mlflow, run_id),
       tune_config=tune.TuneConfig(num_samples=5),
       run_config=train.RunConfig(
           name="mlflow",
       ),
       param_space={
           "width": tune.randint(10, 100),
           "height": tune.randint(0, 100),
           "steps": 20 if finish_fast else 100,
       },
   )
   results = tuner.fit()


with mlflow.start_run() as run:
   mlflow_tracking_uri = mlflow.get_tracking_uri()
   tune_with_setup(run.info.run_id)

モデルサービング

Databricks クラスターで Ray Serve を使用してリアルタイム推論を行うと、外部アプリケーションと対話する際のネットワーク セキュリティと接続の制限により、問題が発生します。

Databricksでは モデルサービング を使用して、本番運用の機械学習モデルを REST API エンドポイントにデプロイすることをお勧めします。 詳細については、「 カスタム モデルのデプロイ」を参照してください。