Funções definidas pelo usuário no Databricks Connect for Scala
Este artigo aborda Databricks Connect para Databricks Runtime 14.1 e acima.
Este artigo descreve como executar funções definidas pelo usuário com Databricks Connect para Scala. Databricks Connect permite que o senhor conecte os populares IDEs, servidores de notebook e aplicativos personalizados ao clustering Databricks. Para obter a versão Python deste artigo, consulte Funções definidas pelo usuário em Databricks Connect para Python.
Antes de começar a usar o Databricks Connect, o senhor deve configurar o cliente Databricks Connect.
Para Databricks Runtime 14.1 e acima, Databricks Connect para Scala suporta a execução de funções definidas pelo usuário (UDFs).
Para executar um UDF, a classe compilada e os JARs que o UDF requer devem ser carregados no clustering.
O addCompiledArtifacts()
API pode ser usado para especificar a classe compilada e os arquivos JAR que devem ser carregados.
O Scala usado pelo cliente deve corresponder à versão Scala no clustering Databricks. Para verificar a versão Scala do clustering, consulte a seção "Ambiente do sistema" para obter a versão Databricks Runtime do clustering em Databricks Runtime notas sobre versões e compatibilidade.
O programa Scala a seguir configura uma UDF simples que eleva os valores em uma coluna ao quadrado.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
No exemplo anterior, como o UDF está totalmente contido em Main
, apenas o artefato compilado de Main
é adicionado.
Se o site UDF se espalhar por outras classes ou usar uma biblioteca externa (ou seja, JARs), todas essas bibliotecas também deverão ser incluídas.
Quando a sessão Spark já estiver inicializada, outras classes compiladas e JARs poderão ser carregados usando o
spark.addArtifact()
API.
Ao fazer upload de JARs, todos os JARs de dependência transitiva devem ser incluídos para upload. As APIs não realizam nenhuma detecção automática de dependências transitivas.
Conjunto de dados digitados APIs
O mesmo mecanismo descrito na seção anterior para UDFs também se aplica ao conjunto de dados digitado APIs.
Conjunto de dados digitado APIs permite a execução de transformações como mapa, filtro e agregações no conjunto de dados resultante. Eles também são executados de forma semelhante aos UDFs no clustering Databricks.
O aplicativo Scala a seguir usa o map()
API para modificar um número em uma coluna de resultados para uma cadeia de caracteres prefixada.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Embora este exemplo use map()
API, isso também se aplica a outros conjuntos de dados digitados APIs como filter()
, mapPartitions()
, etc.