Pular para o conteúdo principal

Funções definidas pelo usuário no Databricks Connect para Python

nota

Este artigo aborda Databricks Connect para Databricks Runtime 13.3 e acima.

O Databricks Connect for Python oferece suporte a funções definidas pelo usuário (UDF). Quando um DataFrame operações que inclui UDFs é executado, os UDFs são serializados pelo Databricks Connect e enviados ao servidor como parte da solicitação.

Para obter informações sobre UDFs para Databricks Connect para Scala, consulte Funções definidas pelo usuário em Databricks Connect para Scala.

nota

Como a função definida pelo usuário é serializada e desserializada, a versão Python do cliente deve corresponder à versão Python no site Databricks compute. Para ver as versões compatíveis, consulte a matriz de suporte de versões.

Definir um UDF

Para criar uma UDF no Databricks Connect for Python, use uma das seguintes funções compatíveis:

Por exemplo, o Python a seguir configura um UDF simples que eleva os valores em uma coluna ao quadrado.

Python
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Gerenciar dependências UDF

info

Visualização

Esse recurso está em Public Preview e requer o site Databricks Connect para Python 16.4 ou acima, e um clustering executando Databricks Runtime 16.4 ou acima. Para usar esse recurso, ative a visualização de UDFs aprimorados em Python em Unity Catalog em seu workspace.

O Databricks Connect suporta a especificação de dependências Python necessárias para UDFs. Essas dependências são instaladas em Databricks compute como parte do ambiente de UDF's Python.

Esse recurso permite que os usuários especifiquem as dependências de que o UDF precisa, além do pacote fornecido no ambiente básico. Ele também pode ser usado para instalar uma versão diferente do pacote em relação ao que é fornecido no ambiente básico.

As dependências podem ser instaladas a partir das seguintes fontes:

  • PyPI pacote

    • PyPI O pacote pode ser especificado de acordo com a PEP 508, por exemplo, dice, pyjokes<1 ou simplejson==3.19.*.
  • Arquivos armazenados nos volumes do Unity Catalog

    • Tanto o pacote wheel (.whl) quanto os arquivos tar gzipados (.tar.gz) são suportados. O usuário deve receber a permissão READ_FILE no arquivo no volume re: [UC].
    • Ao instalar o pacote a partir de volumes Unity Catalog, para invocar os UDFs, os usuários precisam da permissão READ VOLUME no volume de origem. Conceder essa permissão a todos os usuários do site account automaticamente a habilita para novos usuários.
    • Os arquivos de volumes do Unity Catalog devem ser especificados como dbfs:<path>, por exemplo, dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl ou dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

Para incluir dependências personalizadas em seu UDF, especifique-as em um ambiente usando withDependencies e, em seguida, use esse ambiente para criar uma sessão do Spark. As dependências são instaladas em seu Databricks compute e estarão disponíveis em todos os UDFs que usam essa sessão Spark.

O código a seguir declara o pacote PyPI dice como uma dependência:

Python
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Ou, para especificar a dependência de uma roda em um volume:

Python
from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Comportamento em Databricks Notebook e Job

No Notebook e no Job, as dependências do UDF precisam ser instaladas diretamente no REPL. O Databricks Connect valida o ambiente REPL Python verificando se todas as dependências especificadas já estão instaladas e lança uma exceção se alguma não estiver instalada.

Notebook A validação do ambiente ocorre para as dependências de volume PyPI e Unity Catalog. As dependências de volume precisam ser empacotadas de acordo com as especificações de empacotamento padrão do Python do PEP-427 ou posterior para arquivos de roda e do PEP-241 ou posterior para arquivos de distribuição de código-fonte. Para obter mais informações sobre os padrões de embalagem do site Python, consulte a documentação do PyPA.

Limitações

  • Arquivos como o Python wheel ou a distribuição do código-fonte em sua máquina de desenvolvimento local não podem ser especificados diretamente como uma dependência. Primeiro, eles devem ser carregados em Unity Catalog volumes.
  • O suporte para dependências UDF para pyspark.sql.streaming.DataStreamWriter.foreach requer Databricks Connect para Python 18.0 ou superior e um cluster executando Databricks Runtime 18.0 ou superior.
  • O suporte para dependências UDF para pyspark.sql.streaming.DataStreamWriter.foreachBatch requer Databricks Connect para Python 18.0 ou superior e um cluster executando Databricks Runtime 18.0 ou superior. O recurso não é suportado em serverless.
  • UDF não são compatíveis com Pandas UDFs de agregação sobre funções de janela.

Exemplos

O exemplo a seguir define as dependências do PyPI e dos volumes em um ambiente, cria uma sessão com esse ambiente e, em seguida, define e chama UDFs que usam essas dependências:

Python
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Gerenciamento automático de dependências UDF

info

Visualização

Este recurso está em Pré-visualização Pública e requer Databricks Connect para Python 18.1 ou superior, Python 3.12 em sua máquina local e um cluster executando Databricks Runtime 18.1 ou superior. Para usar este recurso, habilite a pré-visualização "Enhanced Python UDFs" no Unity Catalog em seu workspace.

A API Databricks Connect withAutoDependencies() permite a descoberta e upload automáticos de módulos locais e dependências públicas PyPI usadas nas instruções de importação em suas UDFs. Isso elimina a necessidade de especificar dependências manualmente.

O código a seguir habilita o gerenciamento automático de dependências:

Python
from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

O método withAutoDependencies() aceita os seguintes parâmetros:

  • upload_local: Quando definido como True, os módulos locais importados em suas UDFs são descobertos, empacotados e enviados automaticamente para o sandbox UDF .
  • use_index: Quando definido como True, as dependências públicas PyPI usadas em suas UDFs são descobertas e instaladas automaticamente no Databricks compute. O processo de descoberta utiliza o pacote instalado em sua máquina local para determinar as versões, garantindo a consistência entre seu ambiente local e o ambiente de execução remoto.

Limitações

  • Importações dinâmicas (por exemplo, importlib.import_module("foo")) não são suportadas.
  • Pacotes de namespace (por exemplo, azure.eventhub e google.cloud.aiplatform) não são suportados.
  • Dependências instaladas usando referências de URL diretas não são suportadas. Isso inclui aqueles instalados a partir de arquivos wheel locais.
  • Dependências instaladas a partir de índices de pacotes privados não são suportadas. O pacote instalado desta forma não pode ser distinguido do pacote instalado a partir do PyPI público.
  • A descoberta de dependências não funciona em um shell Python. Somente scripts Python , shell IPython e Jupyter Notebook são suportados.

Exemplos

O exemplo a seguir demonstra o gerenciamento automático de dependências com módulos locais e pacotes PyPI . Este exemplo requer que você tenha instalado simplejson e dice (usando pip install simplejson dice).

Primeiro, crie módulos auxiliares locais:

Python
# my_helper.py
def double(x):
return 2 * x
Python
# my_json.py
import simplejson

def loads(x):
return simplejson.loads(x)

def dumps(x):
return simplejson.dumps(x)

Em seguida, no seu script principal, importe esses módulos e use-os em UDFs:

Python
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType

import my_json
from my_helper import double

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))

@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)

df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()

Registro

Para exibir as dependências descobertas, defina a variável de ambiente SPARK_CONNECT_LOG_LEVEL para info ou debug. Alternativamente, configure o framework de registro de logs do Python:

Python
import logging
logging.basicConfig(level=logging.INFO)

Os logs relevantes são emitidos pelo módulo databricks.connect.auto_dependencies , por exemplo:

DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0

Ambiente básico do Python

Os UDFs são executados no site Databricks compute e não no cliente. O ambiente básico Python no qual os UDFs são executados depende do Databricks compute.

Para clustering, o ambiente Python base é o ambiente Python da versão Databricks Runtime em execução no clustering. A versão Python e a lista de pacotes nesse ambiente básico podem ser encontradas nas seções System environment e Installed Python biblioteca do site Databricks Runtime notas sobre a versão.

Para serverless compute, o ambiente básico Python corresponde à versão do ambienteserverless de acordo com a tabela a seguir.

Versão do Databricks Connect

UDF serverless ambiente

17.0 a 17.3, Python 3.12

Versão 4

16.4.1 até abaixo de 17, Python 3.12

Versão 3

15.4.10 até abaixo de 16, Python 3.12

Versão 3

15.4.10 até abaixo de 16, Python 3.11

Versão 2

15.4.0 a 15.4.9 e 16.0 a 16.3

Mais recente serverless compute . Por favor, migre para a versão 15.4.10 LTS e acima ou 16.4.1 LTS e acima para obter um ambiente Python estável.