Funções definidas pelo usuário no Databricks Connect para Python
Este artigo aborda Databricks Connect para Databricks Runtime 13.1 e acima.
O Databricks Connect for Python oferece suporte a funções definidas pelo usuário (UDF). Quando um DataFrame operações que inclui UDFs é executado, os UDFs são serializados pelo Databricks Connect e enviados ao servidor como parte da solicitação.
Para obter informações sobre UDFs para Databricks Connect para Scala, consulte Funções definidas pelo usuário em Databricks Connect para Scala.
Como a função definida pelo usuário é serializada e desserializada, a versão Python do cliente deve corresponder à versão Python no site Databricks compute. Para ver as versões compatíveis, consulte a matriz de suporte de versões.
Definir um UDF
Para criar uma UDF no Databricks Connect for Python, use uma das seguintes funções compatíveis:
-
Funções definidas pelo usuário do PySpark
-
PySpark funções de transmissão
Por exemplo, o Python a seguir configura um UDF simples que eleva os valores em uma coluna ao quadrado.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
UDFs com dependências
Visualização
Esse recurso está em Public Preview e requer o site Databricks Connect para Python 16.4 ou acima, e um clustering executando Databricks Runtime 16.4 ou acima. Para usar esse recurso, ative a visualização de UDFs aprimorados em Python em Unity Catalog em seu workspace.
O Databricks Connect suporta a especificação de dependências Python necessárias para UDFs. Essas dependências são instaladas em Databricks compute como parte do ambiente de UDF's Python.
Esse recurso permite que os usuários especifiquem as dependências de que o UDF precisa, além do pacote fornecido no ambiente básico. Ele também pode ser usado para instalar uma versão diferente do pacote em relação ao que é fornecido no ambiente básico.
As dependências podem ser instaladas a partir das seguintes fontes:
-
PyPI pacote
- PyPI O pacote pode ser especificado de acordo com a PEP 508, por exemplo,
dice
,pyjokes<1
ousimplejson==3.19.*
.
- PyPI O pacote pode ser especificado de acordo com a PEP 508, por exemplo,
-
Arquivos armazenados nos volumes do Unity Catalog
- Tanto o pacote wheel (
.whl
) quanto os arquivos tar gzipados (.tar.gz
) são suportados. O usuário deve receber a permissãoREAD_FILE
no arquivo no volume re: [UC]. - Ao instalar o pacote a partir de volumes Unity Catalog, para invocar os UDFs, os usuários precisam da permissão
READ VOLUME
no volume de origem. Conceder essa permissão a todos os usuários do site account automaticamente a habilita para novos usuários. - Os arquivos de volumes do Unity Catalog devem ser especificados como
dbfs:<path>
, por exemplo,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl
oudbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz
.
- Tanto o pacote wheel (
Para incluir dependências personalizadas em seu UDF, especifique-as em um ambiente usando withDependencies
e, em seguida, use esse ambiente para criar uma sessão do Spark. As dependências são instaladas em seu Databricks compute e estarão disponíveis em todos os UDFs que usam essa sessão Spark.
O código a seguir declara o pacote PyPI dice
como uma dependência:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Ou, para especificar a dependência de uma roda em um volume:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Comportamento em Databricks Notebook e Job
No Notebook e no Job, as dependências do UDF precisam ser instaladas diretamente no REPL. O Databricks Connect valida o ambiente REPL Python verificando se todas as dependências especificadas já estão instaladas e lança uma exceção se alguma não estiver instalada.
Notebook A validação do ambiente ocorre para as dependências de volume PyPI e Unity Catalog. As dependências de volume precisam ser empacotadas de acordo com as especificações de empacotamento padrão do Python do PEP-427 ou posterior para arquivos de roda e do PEP-241 ou posterior para arquivos de distribuição de código-fonte. Para obter mais informações sobre os padrões de embalagem do site Python, consulte a documentação do PyPA.
Limitações
- Arquivos como o Python wheel ou a distribuição do código-fonte em sua máquina de desenvolvimento local não podem ser especificados diretamente como uma dependência. Primeiro, eles devem ser carregados em Unity Catalog volumes.
- UDF não são compatíveis com Pandas UDFs de agregação sobre funções de janela.
Exemplos
O exemplo a seguir define as dependências do PyPI e dos volumes em um ambiente, cria uma sessão com esse ambiente e, em seguida, define e chama UDFs que usam essas dependências:
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
O exemplo a seguir define uma dependência em um ambiente, cria uma sessão com esse ambiente e, em seguida, define e chama uma função que usa essa dependência em uma transmissão de lotes:
import time
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("simplejson==3.19.3")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
def feb_func(batch_df, batch_id):
from simplejson import loads, dumps
assert loads(dumps(batch_id)) == batch_id
batch_df.collect()
df = spark.readStream.format("rate").load()
q = df.writeStream.foreachBatch(feb_func).start()
time.sleep(3)
q.stop()
Ambiente básico do Python
Os UDFs são executados no site Databricks compute e não no cliente. O ambiente básico Python no qual os UDFs são executados depende do Databricks compute escolhido.
O ambiente base Python é o ambiente Python da versão Databricks Runtime em execução no clustering. A versão Python e a lista de pacotes nesse ambiente básico podem ser encontradas nas seções System environment e Installed Python biblioteca do site Databricks Runtime notas sobre a versão.