Funções definidas pelo usuário no Databricks Connect para Python
Este artigo aborda Databricks Connect para Databricks Runtime 13.3 e acima.
O Databricks Connect for Python oferece suporte a funções definidas pelo usuário (UDF). Quando um DataFrame operações que inclui UDFs é executado, os UDFs são serializados pelo Databricks Connect e enviados ao servidor como parte da solicitação.
Para obter informações sobre UDFs para Databricks Connect para Scala, consulte Funções definidas pelo usuário em Databricks Connect para Scala.
Como a função definida pelo usuário é serializada e desserializada, a versão Python do cliente deve corresponder à versão Python no site Databricks compute. Para ver as versões compatíveis, consulte a matriz de suporte de versões.
Definir um UDF
Para criar uma UDF no Databricks Connect for Python, use uma das seguintes funções compatíveis:
-
Funções definidas pelo usuário do PySpark
-
PySpark funções de transmissão
Por exemplo, o Python a seguir configura um UDF simples que eleva os valores em uma coluna ao quadrado.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Gerenciar dependências UDF
Visualização
Esse recurso está em Public Preview e requer o site Databricks Connect para Python 16.4 ou acima, e um clustering executando Databricks Runtime 16.4 ou acima. Para usar esse recurso, ative a visualização de UDFs aprimorados em Python em Unity Catalog em seu workspace.
O Databricks Connect suporta a especificação de dependências Python necessárias para UDFs. Essas dependências são instaladas em Databricks compute como parte do ambiente de UDF's Python.
Esse recurso permite que os usuários especifiquem as dependências de que o UDF precisa, além do pacote fornecido no ambiente básico. Ele também pode ser usado para instalar uma versão diferente do pacote em relação ao que é fornecido no ambiente básico.
As dependências podem ser instaladas a partir das seguintes fontes:
-
PyPI pacote
- PyPI O pacote pode ser especificado de acordo com a PEP 508, por exemplo,
dice,pyjokes<1ousimplejson==3.19.*.
- PyPI O pacote pode ser especificado de acordo com a PEP 508, por exemplo,
-
pacote armazenado em volumes Unity Catalog
- São suportadas tanto distribuições construídas (
.whl) quanto distribuições de origem (.tar.gz). - O pacote de volumes Unity Catalog pode ser especificado como
dbfs:<path>, por exemplo,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whloudbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz. - O usuário deve ter permissão
READ_FILEconcedida no arquivo no volume re:[UC]. Conceder essa permissão a todos os usuários account a habilita automaticamente para novos usuários.
- São suportadas tanto distribuições construídas (
-
Pacotes locais, pastas e arquivos Python .
- Distribuições locais construídas (
.whl), distribuições de origem (.tar.gz), pastas e arquivos Python podem ser especificados comolocal:<path>, por exemplo:local:/path/to/my_private_dep-3.20.2-py3-none-any.whl,local:/path/to/my_private_dep-4.0.0.tar.gz,local:/path/to/my_folder,local:/path/to/my_file.py. - São suportados caminhos absolutos e relativos, por exemplo:
local:/path/to/my_file.pyoulocal:./path/to/my_file.py.
- Distribuições locais construídas (
Para incluir dependências personalizadas em seu UDF, especifique-as em um ambiente usando withDependencies e, em seguida, use esse ambiente para criar uma sessão do Spark. As dependências são instaladas em seu Databricks compute e estarão disponíveis em todos os UDFs que usam essa sessão Spark.
O código a seguir declara o pacote PyPI dice como uma dependência:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Ou, para especificar a dependência de uma roda em um volume:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Comportamento em Databricks Notebook e Job
Nos ambientes Notebook e Job, as dependências UDF precisam ser instaladas diretamente no REPL. O Databricks Connect valida o ambiente Python do REPL verificando se todas as dependências especificadas já estão instaladas e lança uma exceção caso alguma não esteja. Validação da execução do ambiente Notebook para dependências de volume PyPI e Unity Catalog , mas não para dependências locais.
Limitações
- O suporte para dependências UDF para
pyspark.sql.streaming.DataStreamWriter.foreachrequer Databricks Connect para Python 18.0 ou superior e um cluster executando Databricks Runtime 18.0 ou superior. - O suporte para dependências UDF para
pyspark.sql.streaming.DataStreamWriter.foreachBatchrequer Databricks Connect para Python 18.0 ou superior e um cluster executando Databricks Runtime 18.0 ou superior. O recurso não é suportado em serverless. - O suporte a dependências UDF para pacotes, pastas e arquivos Python locais requer Databricks Connect para Python 18.1 ou superior e um cluster executando Databricks Runtime 18.1 ou superior.
- UDF não são compatíveis com Pandas UDFs de agregação sobre funções de janela.
- Os pacotes de volumes Unity Catalog e os pacotes locais devem seguir as especificações padrão de empacotamento Python , a partir da PEP-427 ou posterior para distribuições construídas a partir de wheel e da PEP-241 ou posterior para distribuições de código-fonte tar. Para obter mais informações sobre os padrões de empacotamento Python , consulte a documentação do PyPA.
Exemplos
O exemplo a seguir define as dependências do PyPI e dos volumes em um ambiente, cria uma sessão com esse ambiente e, em seguida, define e chama UDFs que usam essas dependências:
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Gerenciamento automático de dependências UDF
Visualização
Este recurso está em Pré-visualização Pública e requer Databricks Connect para Python 18.1 ou superior, Python 3.12 em sua máquina local e um cluster executando Databricks Runtime 18.1 ou superior. Para usar este recurso, habilite a pré-visualização "Enhanced Python UDFs" no Unity Catalog em seu workspace.
A API Databricks Connect withAutoDependencies() permite a descoberta e upload automáticos de módulos locais e dependências públicas PyPI usadas nas instruções de importação em suas UDFs. Isso elimina a necessidade de especificar dependências manualmente.
O código a seguir habilita o gerenciamento automático de dependências:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
O método withAutoDependencies() aceita os seguintes parâmetros:
upload_local: Quando definido comoTrue, os módulos locais importados em suas UDFs são descobertos, empacotados e enviados automaticamente para o sandbox UDF .use_index: Quando definido comoTrue, as dependências públicas PyPI usadas em suas UDFs são descobertas e instaladas automaticamente no Databricks compute. O processo de descoberta utiliza o pacote instalado em sua máquina local para determinar as versões, garantindo a consistência entre seu ambiente local e o ambiente de execução remoto.
Limitações
- Importações dinâmicas (por exemplo,
importlib.import_module("foo")) não são suportadas. - Pacotes de namespace (por exemplo,
azure.eventhubegoogle.cloud.aiplatform) não são suportados. - Dependências instaladas usando referências de URL diretas não são suportadas. Isso inclui aqueles instalados a partir de arquivos wheel locais.
- Dependências instaladas a partir de índices de pacotes privados não são suportadas. O pacote instalado desta forma não pode ser distinguido do pacote instalado a partir do PyPI público.
- A descoberta de dependências não funciona em um shell Python. Somente scripts Python , shell IPython e Jupyter Notebook são suportados.
Exemplos
O exemplo a seguir demonstra o gerenciamento automático de dependências com módulos locais e pacotes PyPI . Este exemplo requer que você tenha instalado simplejson e dice (usando pip install simplejson dice).
Primeiro, crie módulos auxiliares locais:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
Em seguida, no seu script principal, importe esses módulos e use-os em UDFs:
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
Registro
Para exibir as dependências descobertas, defina a variável de ambiente SPARK_CONNECT_LOG_LEVEL para info ou debug. Alternativamente, configure o framework de registro de logs do Python:
import logging
logging.basicConfig(level=logging.INFO)
Os logs relevantes são emitidos pelo módulo databricks.connect.auto_dependencies , por exemplo:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Ambiente básico do Python
Os UDFs são executados no site Databricks compute e não no cliente. O ambiente básico Python no qual os UDFs são executados depende do Databricks compute.
Para clustering, o ambiente Python base é o ambiente Python da versão Databricks Runtime em execução no clustering. A versão Python e a lista de pacotes nesse ambiente básico podem ser encontradas nas seções System environment e Installed Python biblioteca do site Databricks Runtime notas sobre a versão.
Para compute serverless , o ambiente Python base corresponde à versão do ambienteserverless de acordo com a tabela a seguir. As versões Databricks Connect não listadas nesta tabela ou ainda não oferecem suporte a serverless ou já atingiram o fim do suporte. Consulte a matriz de suporte de versões e as versões do Databricks Connect que deixaram de ter suporte.