Funções definidas pelo usuário no Databricks Connect para Python
Este artigo aborda Databricks Connect para Databricks Runtime 13.3 e acima.
O Databricks Connect for Python oferece suporte a funções definidas pelo usuário (UDF). Quando um DataFrame operações que inclui UDFs é executado, os UDFs são serializados pelo Databricks Connect e enviados ao servidor como parte da solicitação.
Para obter informações sobre UDFs para Databricks Connect para Scala, consulte Funções definidas pelo usuário em Databricks Connect para Scala.
Como a função definida pelo usuário é serializada e desserializada, a versão Python do cliente deve corresponder à versão Python no site Databricks compute. Para ver as versões compatíveis, consulte a matriz de suporte de versões.
Definir um UDF
Para criar uma UDF no Databricks Connect for Python, use uma das seguintes funções compatíveis:
-
Funções definidas pelo usuário do PySpark
-
PySpark funções de transmissão
Por exemplo, o Python a seguir configura um UDF simples que eleva os valores em uma coluna ao quadrado.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Gerenciar dependências UDF
Visualização
Esse recurso está em Public Preview e requer o site Databricks Connect para Python 16.4 ou acima, e um clustering executando Databricks Runtime 16.4 ou acima. Para usar esse recurso, ative a visualização de UDFs aprimorados em Python em Unity Catalog em seu workspace.
O Databricks Connect suporta a especificação de dependências Python necessárias para UDFs. Essas dependências são instaladas em Databricks compute como parte do ambiente de UDF's Python.
Esse recurso permite que os usuários especifiquem as dependências de que o UDF precisa, além do pacote fornecido no ambiente básico. Ele também pode ser usado para instalar uma versão diferente do pacote em relação ao que é fornecido no ambiente básico.
As dependências podem ser instaladas a partir das seguintes fontes:
-
PyPI pacote
- PyPI O pacote pode ser especificado de acordo com a PEP 508, por exemplo,
dice,pyjokes<1ousimplejson==3.19.*.
- PyPI O pacote pode ser especificado de acordo com a PEP 508, por exemplo,
-
Arquivos armazenados nos volumes do Unity Catalog
- Tanto o pacote wheel (
.whl) quanto os arquivos tar gzipados (.tar.gz) são suportados. O usuário deve receber a permissãoREAD_FILEno arquivo no volume re: [UC]. - Ao instalar o pacote a partir de volumes Unity Catalog, para invocar os UDFs, os usuários precisam da permissão
READ VOLUMEno volume de origem. Conceder essa permissão a todos os usuários do site account automaticamente a habilita para novos usuários. - Os arquivos de volumes do Unity Catalog devem ser especificados como
dbfs:<path>, por exemplo,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whloudbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.
- Tanto o pacote wheel (
Para incluir dependências personalizadas em seu UDF, especifique-as em um ambiente usando withDependencies e, em seguida, use esse ambiente para criar uma sessão do Spark. As dependências são instaladas em seu Databricks compute e estarão disponíveis em todos os UDFs que usam essa sessão Spark.
O código a seguir declara o pacote PyPI dice como uma dependência:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Ou, para especificar a dependência de uma roda em um volume:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Comportamento em Databricks Notebook e Job
No Notebook e no Job, as dependências do UDF precisam ser instaladas diretamente no REPL. O Databricks Connect valida o ambiente REPL Python verificando se todas as dependências especificadas já estão instaladas e lança uma exceção se alguma não estiver instalada.
Notebook A validação do ambiente ocorre para as dependências de volume PyPI e Unity Catalog. As dependências de volume precisam ser empacotadas de acordo com as especificações de empacotamento padrão do Python do PEP-427 ou posterior para arquivos de roda e do PEP-241 ou posterior para arquivos de distribuição de código-fonte. Para obter mais informações sobre os padrões de embalagem do site Python, consulte a documentação do PyPA.
Limitações
- Arquivos como o Python wheel ou a distribuição do código-fonte em sua máquina de desenvolvimento local não podem ser especificados diretamente como uma dependência. Primeiro, eles devem ser carregados em Unity Catalog volumes.
- O suporte para dependências UDF para
pyspark.sql.streaming.DataStreamWriter.foreachrequer Databricks Connect para Python 18.0 ou superior e um cluster executando Databricks Runtime 18.0 ou superior. - O suporte para dependências UDF para
pyspark.sql.streaming.DataStreamWriter.foreachBatchrequer Databricks Connect para Python 18.0 ou superior e um cluster executando Databricks Runtime 18.0 ou superior. O recurso não é suportado em serverless. - UDF não são compatíveis com Pandas UDFs de agregação sobre funções de janela.
Exemplos
O exemplo a seguir define as dependências do PyPI e dos volumes em um ambiente, cria uma sessão com esse ambiente e, em seguida, define e chama UDFs que usam essas dependências:
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Gerenciamento automático de dependências UDF
Visualização
Este recurso está em Pré-visualização Pública e requer Databricks Connect para Python 18.1 ou superior, Python 3.12 em sua máquina local e um cluster executando Databricks Runtime 18.1 ou superior. Para usar este recurso, habilite a pré-visualização "Enhanced Python UDFs" no Unity Catalog em seu workspace.
A API Databricks Connect withAutoDependencies() permite a descoberta e upload automáticos de módulos locais e dependências públicas PyPI usadas nas instruções de importação em suas UDFs. Isso elimina a necessidade de especificar dependências manualmente.
O código a seguir habilita o gerenciamento automático de dependências:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
O método withAutoDependencies() aceita os seguintes parâmetros:
upload_local: Quando definido comoTrue, os módulos locais importados em suas UDFs são descobertos, empacotados e enviados automaticamente para o sandbox UDF .use_index: Quando definido comoTrue, as dependências públicas PyPI usadas em suas UDFs são descobertas e instaladas automaticamente no Databricks compute. O processo de descoberta utiliza o pacote instalado em sua máquina local para determinar as versões, garantindo a consistência entre seu ambiente local e o ambiente de execução remoto.
Limitações
- Importações dinâmicas (por exemplo,
importlib.import_module("foo")) não são suportadas. - Pacotes de namespace (por exemplo,
azure.eventhubegoogle.cloud.aiplatform) não são suportados. - Dependências instaladas usando referências de URL diretas não são suportadas. Isso inclui aqueles instalados a partir de arquivos wheel locais.
- Dependências instaladas a partir de índices de pacotes privados não são suportadas. O pacote instalado desta forma não pode ser distinguido do pacote instalado a partir do PyPI público.
- A descoberta de dependências não funciona em um shell Python. Somente scripts Python , shell IPython e Jupyter Notebook são suportados.
Exemplos
O exemplo a seguir demonstra o gerenciamento automático de dependências com módulos locais e pacotes PyPI . Este exemplo requer que você tenha instalado simplejson e dice (usando pip install simplejson dice).
Primeiro, crie módulos auxiliares locais:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
Em seguida, no seu script principal, importe esses módulos e use-os em UDFs:
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
Registro
Para exibir as dependências descobertas, defina a variável de ambiente SPARK_CONNECT_LOG_LEVEL para info ou debug. Alternativamente, configure o framework de registro de logs do Python:
import logging
logging.basicConfig(level=logging.INFO)
Os logs relevantes são emitidos pelo módulo databricks.connect.auto_dependencies , por exemplo:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Ambiente básico do Python
Os UDFs são executados no site Databricks compute e não no cliente. O ambiente básico Python no qual os UDFs são executados depende do Databricks compute.
Para clustering, o ambiente Python base é o ambiente Python da versão Databricks Runtime em execução no clustering. A versão Python e a lista de pacotes nesse ambiente básico podem ser encontradas nas seções System environment e Installed Python biblioteca do site Databricks Runtime notas sobre a versão.
Para serverless compute, o ambiente básico Python corresponde à versão do ambienteserverless de acordo com a tabela a seguir.
Versão do Databricks Connect | UDF serverless ambiente |
|---|---|
17.0 a 17.3, Python 3.12 | |
16.4.1 até abaixo de 17, Python 3.12 | |
15.4.10 até abaixo de 16, Python 3.12 | |
15.4.10 até abaixo de 16, Python 3.11 | |
15.4.0 a 15.4.9 e 16.0 a 16.3 | Mais recente serverless compute . Por favor, migre para a versão 15.4.10 LTS e acima ou 16.4.1 LTS e acima para obter um ambiente Python estável. |