メインコンテンツまでスキップ

Spark データフレーム を使用したバッチ推論の実行

この記事では、Databricks の登録済みモデルを使用して Spark データフレーム でバッチ推論を実行する方法について説明します。このワークフローは、TensorFlow、PyTorch、scikit-learn など、さまざまな機械学習モデルやディープラーニングモデルに適用されます。これには、データの読み込み、モデルの推論、およびパフォーマンスのチューニングに関するベスト プラクティスが含まれています。

ディープラーニングアプリケーションのモデル推論の場合、Databricks では次のワークフローをお勧めします。TensorFlow と PyTorch を使用するノートブックの例については、 バッチ推論の例をご覧ください。

モデル推論ワークフロー

Databricks では、 Spark データフレームを使用してバッチ推論を実行するために、次のワークフローを推奨しています。

ステップ1:環境設定

クラスタリングがトレーニング環境に一致する互換性のある Databricks ML Runtime バージョンを実行していることを確認します。 MLflow を使用してログに記録されるモデルには、トレーニング環境と推論環境が一致するようにインストールできる要件が含まれています。

Python
requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
dbutils.fs.put("file:" + requirements_path, "", True)

%pip install -r $requirements_path
%restart_python

ステップ 2: Spark データフレーム にデータを読み込む

データ型に応じて、適切なメソッドを使用して Spark データフレーム にデータを読み込みます。

データ型

メソッド

Unity Catalogの表(推奨)

table = spark.table(input_table_name)

画像ファイル(JPG、PNG)

files_df = spark.createDataFrame(map(lambda path: (path,), file_paths), ["path"])

TFRecords

df = spark.read.format("tfrecords").load(image_path)

その他の形式 (Parquet、CSV、JSON、JDBC)

Sparkデータソースを使用して読み込みます。

ステップ3:モデルレジストリからモデルをロードする

この例では、推論に Databricks Model Registry のモデルを使用します。

Python
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

ステップ 4: Pandas UDF を使用してモデル推論を実行する

Pandas UDFは、 Apache Arrowを活用して効率的なデータ転送と処理 Pandas を実現します。 Pandas UDF を使用した推論の一般的な手順は次のとおりです。

  1. トレーニング済みモデルを読み込む: MLflow を使用して、推論用の Spark UDF を作成します。
  2. 入力データの前処理: 入力スキーマがモデル要件と一致していることを確認します。
  3. モデル予測の実行: データフレーム でモデルの UDF 関数を使用します。
Python
df_result = df_spark.withColumn("prediction", predict_udf(*df_spark.columns))
  1. (推奨)予測を Unity Catalog に保存します。

次の例では、予測を Unity Catalog に保存します。

Python
df_result.write.mode("overwrite").saveAsTable(output_table)

モデル推論のためのパフォーマンス チューニング

このセクションでは、Databricks でのモデル推論のデバッグとパフォーマンス チューニングのヒントをいくつか紹介します。概要については、「 Spark データフレーム を使用したバッチ推論の実行」を参照してください。

通常、モデル推論には、データ入力パイプラインとモデル推論の 2 つの主要な部分があります。データ入力パイプラインはデータ I/O 入力に重く、モデルの推論は計算に重くなります。ワークフローのボトルネックを特定するのは簡単です。次に、いくつかのアプローチを示します。

  • モデルを自明なモデルに縮小し、1 秒あたりの例を測定します。フルモデルとトリビアルモデルの間のエンドツーエンドの時間の差が最小の場合は、データ入力パイプラインがボトルネックである可能性が高く、そうでない場合はモデルの推論がボトルネックです。
  • GPU を使用してモデル推論を実行している場合は、GPU 使用率 メトリクスを確認します。GPU の使用率が継続的に高くない場合は、データ入力パイプラインがボトルネックになっている可能性があります。

データ入力パイプラインの最適化

GPUを使用すると、モデル推論の実行速度を効率的に最適化できます。GPU やその他のアクセラレータが高速化するにつれて、データ入力パイプラインが需要に対応することが重要です。データ入力パイプラインは、データを Spark データフレームに読み込み、変換して、モデル推論の入力として読み込みます。 データ入力がボトルネックである場合は、I/O スループットを向上させるためのヒントをいくつか紹介します。

  • バッチあたりの最大レコード数を設定します。最大レコード数が多いほど、レコードがメモリに収まる限り、UDF 関数を呼び出すための I/O オーバーヘッドを削減できます。バッチサイズを設定するには、次の設定を設定します。

    Python
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
  • データをバッチで読み込み、 Pandas UDF内の入力データを前処理するときにプリフェッチします。

    TensorFlow の場合、Databricks では tf.data API の使用をお勧めします。map関数で num_parallel_calls を設定し、プリフェッチとバッチ処理のために prefetchbatch を呼び出すことで、マップを並列に解析できます。

    Python
    dataset.map(parse_example, num_parallel_calls=num_process).prefetch(prefetch_size).batch(batch_size)

    PyTorch の場合、Databricks では DataLoader クラスの使用をお勧めします。バッチ処理の batch_size と並列データの読み込みの num_workers を設定できます。

    Python
    torch.utils.data.DataLoader(images, batch_size=batch_size, num_workers=num_process)

バッチ推論の例

このセクションの例は、推奨されるディープラーニング推論ワークフローに従います。 これらの例は、事前学習済みの深層残差ネットワーク (ResNets) ニューラルネットワーク モデルを使用してモデル推論を実行する方法を示しています。