Pular para o conteúdo principal

Funções definidas pelo usuário no Databricks Connect for Scala

nota

Este artigo aborda Databricks Connect para Databricks Runtime 14.1 e acima.

O Databricks Connect para Scala permite executar funções definidas pelo usuário (UDFs) em clusters Databricks a partir do seu ambiente de desenvolvimento local.

Esta página descreve como executar funções definidas pelo usuário com o Databricks Connect para Scala.

Para a versão Python deste artigo, consulte Funções definidas pelo usuário no Databricks Connect para Python.

Faça o upload da classe compilada e dos arquivos JAR.

Para que as UDFs funcionem, as classes compiladas e os JARs devem ser carregados no cluster usando a API addCompiledArtifacts() .

nota

O Scala usado pelo cliente deve corresponder à versão Scala no clustering Databricks. Para verificar a versão Scala do clustering, consulte a seção "Ambiente do sistema" para obter a versão Databricks Runtime do clustering em Databricks Runtime notas sobre versões e compatibilidade.

O programa Scala a seguir configura uma UDF simples que eleva os valores em uma coluna ao quadrado.

Scala
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
def main(args: Array[String]): Unit = {
val spark = getSession()

val squared = udf((x: Long) => x * x)

spark.range(3)
.withColumn("squared", squared(col("id")))
.select("squared")
.show()

}
}

def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
// On a Databricks cluster — reuse the active session
SparkSession.active
} else {
// Locally with Databricks Connect — upload local JARs and classes
DatabricksSession
.builder()
.addCompiledArtifacts(
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
)
.getOrCreate()
}
}
}

Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI Aponta para o mesmo local que a saída compilada do projeto (por exemplo, target/classes ou o JAR gerado). Todas as classes compiladas são enviadas para Databricks, não apenas Main.

target/scala-2.13/classes/
├── com/
│ ├── examples/
│ │ ├── Main.class
│ │ └── MyUdfs.class
│ └── utils/
│ └── Helper.class

Quando a sessão Spark já estiver inicializada, outras classes compiladas e JARs poderão ser carregados usando o spark.addArtifact() API.

nota

Ao fazer upload de JARs, todos os JARs de dependência transitiva devem ser incluídos para upload. As APIs não realizam nenhuma detecção automática de dependências transitivas.

UDFs com dependências de terceiros

Se você adicionou uma dependência do Maven em build.sbt que é usada em uma UDF, mas não está disponível no cluster Databricks, por exemplo:

// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
Scala
// In your code
import org.apache.commons.text.StringEscapeUtils

// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
StringEscapeUtils.escapeHtml4(text)
})

Use spark.addArtifact() com ivy:// para download dependências do Maven:

  1. Adicione a biblioteca oro ao seu arquivo build.sbt

    libraryDependencies ++= Seq(
    "org.apache.commons" % "commons-text" % "1.10.0" % Provided,
    "oro" % "oro" % "2.0.8" // Required for ivy:// to work
    )
  2. Adicione o artefato após criar a sessão com a API addArtifact() :

    Scala
    def getSession(): SparkSession = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
    } else {
    val spark = DatabricksSession.builder()
    .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
    .getOrCreate()

    // Convert Maven coordinates to ivy:// format
    // From: "org.apache.commons" % "commons-text" % "1.10.0"
    // To: ivy://org.apache.commons:commons-text:1.10.0
    spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0")

    spark
    }
    }

Conjunto de dados digitados APIs

APIs conjuntos de dados tipados permitem executar transformações como map(), filter(), mapPartitions() e agregações no conjunto de dados resultante. O upload da classe compilada e dos JARs para o cluster usando a API addCompiledArtifacts() também se aplica a estes, portanto, seu código deve se comportar de maneira diferente dependendo de onde for executado:

  • Desenvolvimento local com Databricks Connect: faça upload de artefatos para o cluster remoto.
  • Implantado no Databricks em execução no cluster: Não é necessário fazer upload de nada, pois as classes já estão lá.

O aplicativo Scala a seguir usa o map() API para modificar um número em uma coluna de resultados para uma cadeia de caracteres prefixada.

Scala
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()

spark.range(3).map(f => s"row-$f").show()
}
}

Dependências externas de JAR

Se você estiver usando uma biblioteca privada ou de terceiros que não esteja no cluster:

Scala
import com.mycompany.privatelib.DataProcessor

// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
DataProcessor.process(data)
})

Faça o upload de arquivos JAR externos da sua pasta lib/ ao criar a sessão:

Scala
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
val builder = DatabricksSession.builder()
.addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)

// Add all JARs from lib/ folder
val libFolder = new java.io.File("lib")
builder.addCompiledArtifacts(libFolder.toURI)


builder.getOrCreate()
}
}

Isso fará o upload automático de todos os arquivos JAR do seu diretório lib/ para Databricks quando executado localmente.

Projetos com múltiplos módulos

Em um projeto SBT com vários módulos, getClass.getProtectionDomain.getCodeSource.getLocation.toURI retorna apenas a localização do módulo atual. Se sua UDF usar classes de outros módulos, você receberá ClassNotFoundException.

my-project/
├── module-a/ (main application)
├── module-b/ (utilities - module-a depends on this)

Use getClass de uma classe em cada módulo para obter a localização de todos eles e upload los separadamente:

Scala
// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor // From module-b

def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
// Get location using a class FROM module-a
val moduleALocation = Main.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI

// Get location using a class FROM module-b
val moduleBLocation = DataProcessor.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI

DatabricksSession.builder()
.addCompiledArtifacts(moduleALocation) // Upload module-a
.addCompiledArtifacts(moduleBLocation) // Upload module-b
.getOrCreate()
}
}