Pular para o conteúdo principal

Funções definidas pelo usuário no Databricks Connect for Scala

nota

Este artigo aborda Databricks Connect para Databricks Runtime 14.1 e acima.

Este artigo descreve como executar funções definidas pelo usuário com Databricks Connect para Scala. Databricks Connect permite que o senhor conecte os populares IDEs, servidores de notebook e aplicativos personalizados ao clustering Databricks. Para obter a versão Python deste artigo, consulte Funções definidas pelo usuário em Databricks Connect para Python.

nota

Antes de começar a usar o Databricks Connect, o senhor deve configurar o cliente Databricks Connect.

Para Databricks Runtime 14.1 e acima, Databricks Connect para Scala suporta a execução de funções definidas pelo usuário (UDFs).

Para executar um UDF, a classe compilada e os JARs que o UDF requer devem ser carregados no clustering. O addCompiledArtifacts() API pode ser usado para especificar a classe compilada e os arquivos JAR que devem ser carregados.

nota

O Scala usado pelo cliente deve corresponder à versão Scala no clustering Databricks. Para verificar a versão Scala do clustering, consulte a seção "Ambiente do sistema" para obter a versão Databricks Runtime do clustering em Databricks Runtime notas sobre versões e compatibilidade.

O programa Scala a seguir configura uma UDF simples que eleva os valores em uma coluna ao quadrado.

Scala
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()

def squared(x: Int): Int = x * x

val squared_udf = udf(squared _)

spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}

No exemplo anterior, como o UDF está totalmente contido em Main, apenas o artefato compilado de Main é adicionado. Se o site UDF se espalhar por outras classes ou usar uma biblioteca externa (ou seja, JARs), todas essas bibliotecas também deverão ser incluídas.

Quando a sessão Spark já estiver inicializada, outras classes compiladas e JARs poderão ser carregados usando o spark.addArtifact() API.

nota

Ao fazer upload de JARs, todos os JARs de dependência transitiva devem ser incluídos para upload. As APIs não realizam nenhuma detecção automática de dependências transitivas.

Conjunto de dados digitados APIs

O mesmo mecanismo descrito na seção anterior para UDFs também se aplica ao conjunto de dados digitado APIs.

Conjunto de dados digitado APIs permite a execução de transformações como mapa, filtro e agregações no conjunto de dados resultante. Eles também são executados de forma semelhante aos UDFs no clustering Databricks.

O aplicativo Scala a seguir usa o map() API para modificar um número em uma coluna de resultados para uma cadeia de caracteres prefixada.

Scala
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()

spark.range(3).map(f => s"row-$f").show()
}
}

Embora este exemplo use map() API, isso também se aplica a outros conjuntos de dados digitados APIs como filter(), mapPartitions(), etc.