Use Hugging Face Transformers for natural language processing (NLP)

This article shows you how to use Hugging Face Transformers for natural language processing (NLP) model training and inference. Example notebooks and code demonstrate:

  • Text classification with MLflow logging

  • Fine-tuning a text classification model on a single GPU

Why use Hugging Face Transformers?

Hugging Face Transformers pipelines encode best practices and have default models selected for the different tasks, making it easy to get started. Pipelines make it easy to use GPUs when available and allow batching of items sent to the GPU for better throughput.

For many applications, such as sentiment analysis and text summarization, pretrained models work well without any additional model training. Hugging Face has the following:

  • A model hub containing many pretrained models.

  • The 🤗 Transformers library that supports the download and use of these models for NLP applications and fine-tuning. It is common to need both a tokenizer and a model for natural language processing tasks, as shown in the sketch after this list.

  • 🤗 Transformers pipelines that have a simple interface for most natural language processing tasks.
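As a minimal sketch of the tokenizer-and-model pairing mentioned above, you can load a matching pair with the Auto classes; the model name below is only an illustration:

from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Load a tokenizer and model that were trained together; a mismatched pair can
# tokenize text differently than the model expects.
model_name = "distilbert-base-uncased-finetuned-sst-2-english"  # example model from the Hub
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)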

Batch inference: Hugging Face Transformers on CPUs or GPUs

You can use Hugging Face Transformers models on Spark to scale out your NLP batch applications. The following sections describe best practices for using Hugging Face Transformers pipelines:

  • Using Pandas UDFs to distribute the model for computation on a cluster.

  • Understanding and tuning performance.

  • Saving models using MLflow for improved governance or deployment to Model serving on Databricks.

Cluster requirements

Any cluster with the Hugging Face transformers library installed can be used for batch inference. The transformers library comes preinstalled on Databricks Runtime 10.4 LTS ML and above. Many of the popular NLP models work best on GPU hardware, so you may get the best performance using recent GPU hardware unless you use a model specifically optimized for use on CPUs.
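If your cluster runs a Databricks Runtime version that does not include the library, you can install it as a notebook-scoped library, for example:

%pip install transformers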

Notebook: Hugging Face Transformers inference and MLflow logging

To get started quickly with example code, this notebook provides an end-to-end example of text summarization that uses Hugging Face Transformers pipeline inference and MLflow logging. The subsequent sections of this article go into more detail around using Hugging Face Transformers on Databricks.

Hugging Face Transformers pipelines inference notebook


Use Pandas UDFs to distribute model computation

When experimenting with pretrained models, you can use Pandas UDFs to wrap the model and perform computation on worker CPUs or GPUs. Pandas UDFs distribute the model to each worker. For example, you can create a 🤗 Transformers pipeline for machine translation as follows:

from transformers import pipeline
import torch
device = 0 if torch.cuda.is_available() else -1
translation_pipeline = pipeline(task="translation_en_to_fr", model="t5-base", device=device)

Setting the device in this manner ensures that GPUs are used if they are available on the cluster. While this example is for machine translation, 🤗 Transformers pipelines support a wide range of NLP tasks that you can easily use on Databricks.
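For example, a summarization pipeline can be constructed the same way; omitting the model argument falls back to the default model that 🤗 Transformers selects for the task:

from transformers import pipeline
import torch

device = 0 if torch.cuda.is_available() else -1
# With no model specified, the pipeline uses the default pretrained model for the task.
summarization_pipeline = pipeline(task="summarization", device=device)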

Use a Pandas UDF to run the pipeline on a Databricks cluster

Use a Pandas UDF to run the pipeline on the workers of a Spark cluster:

import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('string')
def translation_udf(texts: pd.Series) -> pd.Series:
  translations = [result['translation_text'] for result in translation_pipeline(texts.to_list(), batch_size=1)]
  return pd.Series(translations)

The Hugging Face pipelines for translation return a list of Python dict objects, each with a single key translation_text and a value containing the translated text. This UDF extracts the translation from the results to return a Pandas series with just the translated text. If your pipeline was constructed to use GPUs by setting device=0, then Spark automatically reassigns GPUs on the worker nodes if your cluster has instances with multiple GPUs.

To use the UDF to translate a text column, you can call the UDF in a select statement:

texts = ["Hugging Face is a French company based in New York City.", "Databricks is based in San Francisco."]
df = spark.createDataFrame(pd.DataFrame(texts, columns=["text"]))
display(df.select(df.text, translation_udf(df.text).alias('translation')))

Return complex result types

Using Pandas UDFs you can also return more structured output. For example, in named-entity recognition, pipelines return a list of dict objects containing the entity, its span, type, and an associated score. While similar to the example for translation, the return type for the @pandas_udf annotation is more complex in the case of named-entity recognition. You can get a sense of the return types to use through inspection of pipeline results, for example by running the pipeline on the driver. In this example, use the following code:

from transformers import pipeline
import torch

device = 0 if torch.cuda.is_available() else -1
ner_pipeline = pipeline(task="ner", model="Davlan/bert-base-multilingual-cased-ner-hrl", aggregation_strategy="simple", device=device)

ner_pipeline(texts)

Running the pipeline yields the following annotations:

[[{'entity_group': 'ORG',
   'score': 0.99933606,
   'word': 'Hugging Face',
   'start': 0,
   'end': 12},
  {'entity_group': 'LOC',
   'score': 0.99967843,
   'word': 'New York City',
   'start': 42,
   'end': 55}],
 [{'entity_group': 'ORG',
   'score': 0.9996372,
   'word': 'Databricks',
   'start': 0,
   'end': 10},
  {'entity_group': 'LOC',
   'score': 0.999588,
   'word': 'San Francisco',
   'start': 23,
   'end': 36}]]

To represent this as a return type, you can use an array of struct fields, listing the dict entries as the fields of the struct:

import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('array<struct<word string, entity_group string, score float, start integer, end integer>>')
def ner_udf(texts: pd.Series) -> pd.Series:
  return pd.Series(ner_pipeline(texts.to_list(), batch_size=1))

display(df.select(df.text, ner_udf(df.text).alias('entities')))

Tune performance

There are several key aspects to tuning performance of the UDF. The first is to use each GPU effectively, which you can adjust by changing the size of batches sent to the GPU by the Transformers pipeline. The second is to make sure the DataFrame is well-partitioned to utilize the entire cluster. Finally, you may wish to cache the Hugging Face model to save model load time or ingress costs.

Choose a batch size

While the UDFs described above should work out of the box with a batch_size of 1, this may not use the resources available to the workers efficiently. To improve performance, tune the batch size to the model and hardware in the cluster. Databricks recommends trying various batch sizes for the pipeline on your cluster to find the best performance. Read more about pipeline batching and other performance options in the Hugging Face documentation.

Try to find a batch size that is large enough to drive full GPU utilization but does not result in CUDA out of memory errors. When you receive CUDA out of memory errors during tuning, detach and reattach the notebook to release the memory used by the model and data on the GPU.
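As a minimal sketch, you might expose the batch size as a variable so the UDF is easy to rerun with different values; the value of 8 below is only a starting point, not a recommendation:

import pandas as pd
from pyspark.sql.functions import pandas_udf

batch_size = 8  # tune for your model and GPU memory

@pandas_udf('string')
def translation_udf(texts: pd.Series) -> pd.Series:
  # Larger batches generally increase GPU utilization until memory becomes the limit.
  translations = [result['translation_text'] for result in translation_pipeline(texts.to_list(), batch_size=batch_size)]
  return pd.Series(translations)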

Monitor GPU performance by viewing the live Ganglia metrics for a cluster and choosing a metric, such as gpu0-util for GPU processor utilization or gpu0_mem_util for GPU memory utilization.

Repartition data to use all available hardware

The second consideration for performance is making full use of the hardware in your cluster. Generally, a small multiple of the number of GPUs on your workers (for GPU clusters) or the number of cores across the workers in your cluster (for CPU clusters) works well. Your input DataFrame may already have enough partitions to take advantage of the cluster’s parallelism. To see how many partitions the DataFrame contains, use df.rdd.getNumPartitions(). You can repartition a DataFrame using repartitioned_df = df.repartition(desired_partition_count).
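For example, the following sketch checks and adjusts the partition count; the target of 16 is illustrative, and you should choose a small multiple of the GPUs or cores in your cluster:

# Inspect the current number of partitions.
df.rdd.getNumPartitions()

# Repartition to better match the cluster's parallelism.
desired_partition_count = 16  # illustrative value
repartitioned_df = df.repartition(desired_partition_count)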

Cache the model in DBFS or on mount points

If you are frequently loading a model from different or restarted clusters, you may also wish to cache the Hugging Face model in the DBFS root volume or on a mount point. This can decrease ingress costs and reduce the time to load the model on a new or restarted cluster. To do this, set the TRANSFORMERS_CACHE environment variable in your code before loading the pipeline. For example:

import os
os.environ['TRANSFORMERS_CACHE'] = '/dbfs/hugging_face_transformers_cache/'

Alternatively, you can achieve similar results by logging the model to MLflow, as described below.

Log to MLflow

You can log 🤗 Transformers pipeline models to MLflow using a custom pyfunc model. These models can be used for batch or real-time inference. First, create a custom MLflow model that wraps the pipeline loading and inference. For instance, for the machine translation pipeline above, the class could be written as follows:

import mlflow
from transformers import pipeline
import torch
import pandas as pd

class TranslationPipelineModel(mlflow.pyfunc.PythonModel):
  def load_context(self, context):
    device = 0 if torch.cuda.is_available() else -1
    self.pipeline = pipeline("translation_en_to_fr", context.artifacts["pipeline"], device=device)

  def predict(self, context, model_input):
    texts = model_input.iloc[:,0].to_list() # get the first column
    translations = [result['translation_text'] for result in self.pipeline(texts, batch_size=1)]
    return pd.Series(translations)

The load_context function loads the pipeline from the model’s artifacts. When logging the model using the custom PyFunc model, first write out the pipeline to local disk, then invoke mlflow.pyfunc.log_model:

translation_pipeline.save_pretrained("./pipeline")
with mlflow.start_run() as run:
  mlflow.pyfunc.log_model(artifacts={'pipeline': "./pipeline"}, artifact_path="translation_model", python_model=TranslationPipelineModel())

If the MLflow model returns a series of scalars, a series of lists of scalars, or a series of structs of scalars, you can use the model on Spark by creating a UDF using mlflow.pyfunc.spark_udf:

logged_model_uri = f"runs:/{run.info.run_id}/translation_model"
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model_uri, result_type='string')
display(df.select(df.text, loaded_model(df.text).alias("translated_text")))

When creating the UDF, you must set the result_type to match the results that the UDF returns.
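For example, if you logged a model that returns structured output like the named-entity results shown earlier, you might pass a matching complex type; this is a sketch only, and ner_model_uri is a hypothetical URI for such a model:

# result_type must describe the values that the wrapped model's predict function returns.
ner_model_uri = "runs:/<run_id>/ner_model"  # hypothetical URI for a logged NER pipeline model
loaded_ner_model = mlflow.pyfunc.spark_udf(
  spark,
  model_uri=ner_model_uri,
  result_type='array<struct<word string, entity_group string, score float, start integer, end integer>>'
)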

Fine-tune Hugging Face models for a single GPU

The Hugging Face transformers library provides the Trainer utility and Auto Model classes that enable loading and fine-tuning Transformers models.

These tools work well with little modification for:

  • Loading models to fine-tune.

  • Constructing the configuration for the Hugging Face Transformers Trainer utility.

  • Performing training on a single GPU.

This article has Databricks-specific recommendations for loading data from the Lakehouse and logging models to MLflow, which enables you to use and govern your models on Databricks.

GPU cluster requirements

For single GPU fine-tuning, Databricks recommends a single-node cluster with one GPU on the driver. This documentation works with the GPU version of Databricks Runtime 9.1 ML and above. In addition to 🤗 Transformers, these examples for fine-tuning also require 🤗 Datasets and 🤗 Evaluate. Install the required libraries using %pip install --upgrade 'transformers>=4.20.1' datasets evaluate.

Notebook: Fine-tune text classification on a single GPU

To get started quickly with example code, this example notebook provides an end-to-end example for fine-tuning a model for text classification. The subsequent sections of this article go into more detail around using Hugging Face for fine-tuning on Databricks.

Fine-tuning Hugging Face text classification models notebook


Load public datasets from Hugging Face

The Hugging Face datasets library has utilities for reading datasets from the Hugging Face Hub. Many datasets can be downloaded and read from the Hugging Face Hub by using the load_dataset function. For example:

from datasets import load_dataset
dataset = load_dataset("imdb")

Create datasets from your Databricks Lakehouse

To use your own data for model fine-tuning, you must first format your training and evaluation data into Spark DataFrames. Then, convert the DataFrames into a format that the Hugging Face datasets library recognizes, typically Parquet.

Format your training and evaluation data

Start by formatting your training data into a table meeting the expectations of the trainer. For text classification, this is a table with two columns: a text column and a column of labels.
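For example, a minimal sketch of the expected shape, with one text column and one string label column; the rows are illustrative only, and in practice the DataFrame comes from your Lakehouse tables:

df = spark.createDataFrame(
  [("The movie was wonderful.", "positive"),
   ("The plot made no sense.", "negative")],
  ["text", "label"]
)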

To perform fine-tuning, you need to provide a model. Hugging Face transformers Auto Classes make it easy to load models or configuration settings, including a wide range of Auto Models for Natural Language Processing.

For example, Hugging Face transformers provides AutoModelForSequenceClassification as a model loader for text classification, which expects integer IDs as the category labels. However, you must also specify mappings between the integer labels and string labels when creating the model. If you have a DataFrame with string labels, you can collect this information as follows:

labels = df.select(df.label).groupBy(df.label).count().collect()
id2label = {index: row.label for (index, row) in enumerate(labels)}
label2id = {row.label: index for (index, row) in enumerate(labels)}

Then create the integer IDs as a label column with a Pandas UDF:

from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf('integer')
def replace_labels_with_ids(labels: pd.Series) -> pd.Series:
  return labels.apply(lambda x: label2id[x])

df_id_labels = df.select(replace_labels_with_ids(df.label).alias('label'), df.text)

Load a Hugging Face dataset from a Spark DataFrame

Hugging Face datasets do not directly support Spark DataFrames, so you must convert the DataFrame to a format datasets recognizes.

This article describes model training on the driver, so you must also make the data available to the driver. One way to do so is to write the Spark DataFrames out to Parquet files on the DBFS root volume or mount points. For example, if you have train and test DataFrames you can write them to the DBFS root volume using:

train_dbfs_path = f"{tutorial_path}/train"
test_dbfs_path = f"{tutorial_path}/test"
train.write.parquet(train_dbfs_path, mode="overwrite")
test.write.parquet(test_dbfs_path, mode="overwrite")

These Parquet files are now available on the driver’s filesystem through the /dbfs mount path. You can pass paths to the Parquet files to the 🤗 Datasets utilities to create training and evaluation datasets.

from datasets import load_dataset
train_test_dataset = load_dataset("parquet", data_files={"train":f"/dbfs{train_dbfs_path}/*.parquet", "test":f"/dbfs{test_dbfs_path}/*.parquet"})

The DBFS root volume is accessible to all users of the workspace and should only be used for data where it is acceptable for all users to have access. Using mount points on clusters that have restricted access can mitigate this concern. For datasets that fit in memory, you can avoid using the DBFS root volume or mount points by collecting the DataFrame as a Pandas DataFrame on the driver.

train_file = "train.parquet"
test_file = "test.parquet"

train.toPandas().to_parquet(train_file)
test.toPandas().to_parquet(test_file)

from datasets import load_dataset
train_test_dataset = load_dataset("parquet", data_files={"train": train_file, "test": test_file})

Tokenize a Hugging Face dataset

Hugging Face Transformers models expect tokenized input, rather than the text in the downloaded data. To ensure compatibility with the base model, use an AutoTokenizer loaded from the base model. Hugging Face datasets allows you to directly apply the tokenizer consistently to both the training and testing data. For example:

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained(base_model)
def tokenize_function(examples):
    return tokenizer(examples["text"], padding=False, truncation=True)

train_test_tokenized = train_test_dataset.map(tokenize_function, batched=True)

Set up the training configuration

Hugging Face training configuration tools can be used to configure a Trainer. The Trainer classes require the user to provide:

  • Metrics

  • A base model

  • A training configuration

By default, the Trainer computes and uses loss as a metric, which can be hard to interpret. Below is an example of creating a metrics function that additionally computes accuracy during model training.

import numpy as np
import evaluate
metric = evaluate.load("accuracy")
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

Use the Auto Model classes for NLP to load the appropriate model for your task.

For text classification, use AutoModelForSequenceClassification to load a base model for text classification. When creating the model, provide the number of classes and the label mappings created during dataset preparation.

from transformers import AutoModelForSequenceClassification
model = AutoModelForSequenceClassification.from_pretrained(base_model, num_labels=len(label2id), label2id=label2id, id2label=id2label)

Next, create the training configuration. The TrainingArguments class allows specification of the output directory, evaluation strategy, learning rate, and other parameters.

from transformers import TrainingArguments, Trainer
training_args = TrainingArguments(output_dir=training_output_dir, evaluation_strategy="epoch")

A data collator batches input from the training and evaluation datasets. DataCollatorWithPadding gives good baseline performance for text classification.

from transformers import DataCollatorWithPadding
data_collator = DataCollatorWithPadding(tokenizer)

With all of these parameters constructed, you can now create a Trainer.

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_test_tokenized["train"],
    eval_dataset=train_test_tokenized["test"],
    compute_metrics=compute_metrics,
    data_collator=data_collator,
)

Train and log to MLflow

Hugging Face interfaces nicely with MLflow, automatically logging metrics during model training using the MLflowCallback. However, you must log the trained model yourself. Similar to the example for logging pretrained models for inference, Databricks recommends wrapping the trained model in a Transformers pipeline and using MLflow’s pyfunc log_model capabilities. To do so, you need a custom model class. For example, for a text classification model you may write:

import mlflow
import pandas as pd
import torch
from transformers import pipeline

pipeline_artifact_name = "pipeline"
class TextClassificationPipelineModel(mlflow.pyfunc.PythonModel):

  def load_context(self, context):
    device = 0 if torch.cuda.is_available() else -1
    self.pipeline = pipeline("text-classification", context.artifacts[pipeline_artifact_name], device=device)

  def predict(self, context, model_input):
    texts = model_input[model_input.columns[0]].to_list()
    pipe = self.pipeline(texts, truncation=True, batch_size=1)
    labels = [prediction['label'] for prediction in pipe]
    return pd.Series(labels)

Wrap training in an MLflow run, construct a Transformers pipeline from the tokenizer and the trained model, and write it to local disk. Finally, log the model to MLflow.

from transformers import pipeline

with mlflow.start_run() as run:
  trainer.train()
  trainer.save_model(model_output_dir)
  pipe = pipeline("text-classification", model=AutoModelForSequenceClassification.from_pretrained(model_output_dir), batch_size=1, tokenizer=tokenizer)
  pipe.save_pretrained(pipeline_output_dir)
  mlflow.pyfunc.log_model(artifacts={pipeline_artifact_name: pipeline_output_dir}, artifact_path=model_artifact_path, python_model=TextClassificationPipelineModel())

Load the model for inference

When your model is logged and ready, loading the model for inference is the same as loading the MLflow wrapped pretrained model.

logged_model = "runs:/{run_id}/{model_artifact_path}".format(run_id=run.info.run_id, model_artifact_path=model_artifact_path)

# Load model as a Spark UDF. Override result_type if the model does not return double values.
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model, result_type='string')

test = test.select(test.text, test.label, loaded_model_udf(test.text).alias("prediction"))
display(test)

See Model serving with Databricks.