メインコンテンツまでスキップ

NLP用Hugging Face Transformersを用いたモデル推論

important
  • このドキュメントは廃止されており、更新されない可能性があります。 このコンテンツに記載されている製品、サービス、またはテクノロジはサポートされなくなりました。
  • Databricks では、代わりにバッチ推論に ai_query を使用することをお勧めします。を使用したバッチLLM 推論の実行AI Functions を参照してください。

この記事では、自然言語処理 (NLP) モデルの推論に Hugging Face Transformersを使用する方法について説明します。

Hugging Face Transformersには、事前トレーニング済みのモデルを推論に使用するための pipelines クラスが用意されています。 🤗 Transformers パイプラインは、Databricks で簡単に使用できる 幅広い NLP タスク をサポートしています。

必要条件

  • MLflow の 2.3
  • Hugging Face transformers ライブラリがインストールされているクラスターは、バッチ推論に使用できます。 transformers ライブラリは、 Databricks Runtime 10.4 LTS ML以降にプレインストールされています。 人気のあるNLPモデルの多くはGPUハードウェアで最適に機能するため、CPUでの使用に特別に最適化されたモデルを使用しない限り、最新のGPUハードウェアを使用して最高のパフォーマンスを得ることができます。

Pandas UDF を使用して、Sparkクラスターでモデル計算を分散します

エクスペリメントで事前トレーニング済みモデルを使用する場合は、 Pandas UDF を使用してモデルをラップし、ワーカー CPU または GPU で計算を実行できます。 Pandas UDF は、モデルを各ワーカーに配布します。

また、機械翻訳用の Hugging Face Transformers パイプラインを作成し、 Pandas UDF を使用して Spark クラスターのワーカーでパイプラインを実行することもできます。

Python
import pandas as pd
from transformers import pipeline
import torch
from pyspark.sql.functions import pandas_udf

device = 0 if torch.cuda.is_available() else -1
translation_pipeline = pipeline(task="translation_en_to_fr", model="t5-base", device=device)

@pandas_udf('string')
def translation_udf(texts: pd.Series) -> pd.Series:
translations = [result['translation_text'] for result in translation_pipeline(texts.to_list(), batch_size=1)]
return pd.Series(translations)

この方法で device を設定すると、GPU が で使用可能な場合は、GPU が使用されるようになります クラスター。

翻訳用の Hugging Face パイプラインは、Python dict オブジェクトのリストを返します。各オブジェクトには、それぞれに 1 つのキー translation_text と翻訳されたテキストを含む値があります。 この UDF は、結果から翻訳を抽出して、翻訳されたテキストのみを含む Pandas シリーズを返します。 device=0を設定して GPU を使用するようにパイプラインが構築されている場合、クラスターに複数の GPU を持つインスタンスがある場合、 Spark はワーカーノードで GPU を自動的に再割り当てします。

UDF を使用してテキスト列を翻訳するには、 select ステートメントで UDF を呼び出すことができます。

Python
texts = ["Hugging Face is a French company based in New York City.", "Databricks is based in San Francisco."]
df = spark.createDataFrame(pd.DataFrame(texts, columns=["texts"]))
display(df.select(df.texts, translation_udf(df.texts).alias('translation')))

複雑な結果タイプを返す

Pandas UDF を使用すると、より構造化された出力を返すこともできます。 たとえば、名前付きエンティティ認識では、パイプラインはエンティティ、その範囲、種類、および関連するスコアを含む dict オブジェクトの一覧を返します。 翻訳の例と似ていますが、名前付きエンティティ認識の場合、 @pandas_udf 注釈の戻り値の型はより複雑になります。

パイプラインの結果を調べる (たとえば、ドライバーでパイプラインを実行する) ことで、使用する戻り値の型を把握できます。

この例では、次のコードを使用します。

Python
from transformers import pipeline
import torch
device = 0 if torch.cuda.is_available() else -1
ner_pipeline = pipeline(task="ner", model="Davlan/bert-base-multilingual-cased-ner-hrl", aggregation_strategy="simple", device=device)
ner_pipeline(texts)

注釈を生成するには:

Python
[[{'entity_group': 'ORG',
'score': 0.99933606,
'word': 'Hugging Face',
'start': 0,
'end': 12},
{'entity_group': 'LOC',
'score': 0.99967843,
'word': 'New York City',
'start': 42,
'end': 55}],
[{'entity_group': 'ORG',
'score': 0.9996372,
'word': 'Databricks',
'start': 0,
'end': 10},
{'entity_group': 'LOC',
'score': 0.999588,
'word': 'San Francisco',
'start': 23,
'end': 36}]]

これを戻り値の型として表すには、structフィールドのarrayを使用して、dictエントリをstructのフィールドとしてリストできます。

Python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('array<struct<word string, entity_group string, score float, start integer, end integer>>')
def ner_udf(texts: pd.Series) -> pd.Series:
return pd.Series(ner_pipeline(texts.to_list(), batch_size=1))

display(df.select(df.texts, ner_udf(df.texts).alias('entities')))

パフォーマンスの調整

UDF のパフォーマンスのチューニングには、いくつかの重要な側面があります。 1 つ目は、各 GPU を効果的に使用する方法で、Transformers パイプラインによって GPU に送信されるバッチのサイズを変更することで調整できます。 2 つ目は、クラスター全体を利用するために、 データフレーム が適切にパーティション分割されていることを確認することです。

最後に、Hugging Face モデルをキャッシュして、モデルの読み込み時間やイングレス コストを節約することもできます。

バッチサイズを選択する

上記の UDF は、 batch_size 1 でそのまま動作するはずですが、ワーカーが利用できるリソースを効率的に使用できない場合があります。 パフォーマンスを向上させるには、クラスター内のモデルとハードウェアに合わせてバッチ サイズを調整します。 Databricks では、クラスター上のパイプラインのさまざまなバッチ サイズを試して、最適なパフォーマンスを見つけることをお勧めします。 パイプラインのバッチ処理とその他のパフォーマンスオプションの詳細については、Hugging Face のドキュメントを参照してください。

GPU の全使用率を駆動し、 CUDA out of memory エラーを引き起こさないように十分な大きさのバッチ サイズを見つけてみてください。 チューニング中に CUDA out of memory エラーが発生した場合は、ノートブックをデタッチして再アタッチし、GPU 内のモデルとデータによって使用されているメモリを解放する必要があります。

GPU パフォーマンスを監視するには、クラスターのライブ クラスター メトリクス を表示し、GPU プロセッサ使用率の gpu0-util や GPU メモリ使用率の gpu0_mem_util などのメトリクスを選択します。

ステージレベルのスケジューリングによる並列処理の調整

デフォルトでは、Spark は各マシンの GPU ごとに 1 つのタスクをスケジュールします。 並列処理を増やすには、ステージ レベルのスケジューリングを使用して、GPU ごとに実行するタスクの数を Spark に指示できます。 たとえば、Spark で GPU ごとに 2 つのタスクを実行する場合は、次のように指定できます。

Python
from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

task_requests = TaskResourceRequests().resource("gpu", 0.5)

builder = ResourceProfileBuilder()
resource_profile = builder.require(task_requests).build

rdd = df.withColumn('predictions', loaded_model(struct(*map(col, df.columns)))).rdd.withResources(resource_profile)

データを再パーティション化して、使用可能なすべてのハードウェアを使用

パフォーマンスに関する 2 つ目の考慮事項は、クラスタリングでハードウェアを最大限に活用することです。 一般に、ワーカーの GPU の数 (GPU クラスタリングの場合) またはクラスタリングのワーカー全体のコアの数 (CPU クラスタリングの場合) の小さな倍数が適切です。 入力 DataFrame には、クラスタリングの並列処理を利用するのに十分なパーティションが既にある場合があります。 DataFrame に含まれるパーティションの数を確認するには、 df.rdd.getNumPartitions()を使用します。DataFrame は、 repartitioned_df = df.repartition(desired_partition_count)を使用して再パーティション分割できます。

モデルを DBFS またはマウント ポイントにキャッシュします

異なるクラスターからモデルを頻繁にロードする場合や、再起動したクラスターからモデルをロードする場合は、Hugging Face モデルをDBFS ルートボリューム またはマウントポイント にキャッシュすることもできます。これにより、イングレス コストが削減され、新しいクラスターまたは再起動されたクラスターにモデルを読み込む時間が短縮されます。 これを行うには、パイプラインを読み込む前に、コードで TRANSFORMERS_CACHE 環境変数を設定します。

例えば:

Python
import os
os.environ['TRANSFORMERS_CACHE'] = '/dbfs/hugging_face_transformers_cache/'

または、 MLflow transformers フレーバーを使用してモデルを MLflow にログ記録することで、同様の結果を得ることができます。

ノートブック: Hugging Face Transformers 推論と MLflow ログ

サンプル コードをすぐに使い始めるために、このノートブックは、Hugging Face Transformers パイプライン推論と MLflow ログを使用したテキスト要約のエンドツーエンドの例です。

Hugging Face Transformers パイプライン inference ノートブック

Open notebook in new tab

追加のリソース

Hugging Face モデルは、次のガイドで微調整できます。

詳細については、「Hugging Face Transformersとは」をご覧ください。