Pular para o conteúdo principal

Funções escalares definidas pelo usuário - Python

Este artigo contém Python exemplos de funções definidas pelo usuário (UDF). Ele mostra como registrar UDFs, como invocar UDFs e fornece advertências sobre a ordem de avaliação de subexpressões em Spark SQL.

No Databricks Runtime 14.0 e acima, é possível usar as funções de tabela definidas pelo usuário (UDTFs) do Python para registrar funções que retornam relações inteiras em vez de valores escalares. Consulte Funções de tabela definidas pelo usuário (UDTFs) do Python.

nota

Em Databricks Runtime 14.0 e abaixo, Python UDFs e Pandas UDFs não são suportados em Unity Catalog clustering que usam o modo de acesso padrão. Os UDFs escalares Python e Pandas são compatíveis com todos os modos de acesso em Databricks Runtime 14.1 e acima.

Em Databricks Runtime 14.1 e acima, o senhor pode registrar UDFs escalares Python para Unity Catalog usando a sintaxe SQL. Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog.

registrar uma função como um UDF

Python
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)

Opcionalmente, o senhor pode definir o tipo de retorno do seu UDF. O tipo de retorno do default é StringType.

Python
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Chamar o UDF no Spark SQL

Python
spark.range(1, 20).createOrReplaceTempView("test")
SQL
%sql select id, squaredWithPython(id) as id_squared from test

Usar UDF com DataFrames

Python
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Como alternativa, o senhor pode declarar o mesmo UDF usando a sintaxe de anotação:

Python
from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Ordem de avaliação e verificação de nulos

Spark SQL (incluindo SQL e DataFrame e o conjunto de dados API) não garante a ordem de avaliação das subexpressões. Em particular, as entradas de um operador ou função não são necessariamente avaliado da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, lógico AND e as expressões OR não têm semântica de “curto-circuito” da esquerda para a direita.

Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação das expressões Boolean e na ordem das cláusulas WHERE e HAVING, pois essas expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento da consulta. Especificamente, se um UDF depender da semântica de curto-circuito no SQL para verificação de nulidade, não há garantia de que a verificação de nulidade ocorrerá antes de invocar o UDF. Por exemplo,

Python
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Essa cláusula WHERE não garante que o UDF strlen seja invocado após a filtragem de nulos.

Para realizar uma verificação de nulo adequada, recomendamos que você faça o seguinte:

  • Tornar o próprio UDF sensível a nulidade e fazer a verificação de nulidade dentro do próprio UDF
  • Use as expressões IF ou CASE WHEN para fazer a verificação de nulidade e chamar o UDF em uma ramificação condicional
Python
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok

Credenciais de serviço em UDFs do Scalar Python

As UDFs escalares Python podem utilizar credenciais de serviço Unity Catalog para acessar com segurança serviços externos na nuvem. Isso é útil para integrar operações como tokenização baseada em nuvem, criptografia ou gerenciamento de segredos diretamente em suas transformações de dados.

As credenciais de serviço para UDFs escalares Python são suportadas apenas em SQL warehouse e compute em geral.

Para criar uma credencial de serviço, consulte Criar credenciais de serviço.

Para acessar a credencial do serviço, utilize as utilidades databricks.service_credentials.getServiceCredentialsProvider() em sua lógica UDF para inicializar os SDKs da nuvem com a credencial apropriada. Todo o código deve ser encapsulado no corpo da UDF.

Python
@udf
def use_service_credential():
from google.cloud import storage
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
client = storage.Client(project='your-project', credentials=getServiceCredentialsProvider('testcred'))
# Use the client to perform operations

Permissões de credenciais de serviço

O criador da UDF deve ter permissão de ACESSO nas credenciais do serviço Unity Catalog.

As UDFs que são executadas no escopo No-PE, também conhecido como clustering dedicado, requerem permissões de gerenciamento na credencial do serviço.

credenciais padrão

Quando utilizado em UDFs escalares Python, Databricks utiliza automaticamente a credencial de serviço default da variável de ambiente compute. Este comportamento permite referenciar serviços externos de forma segura sem gerenciar explicitamente aliases de credenciais no código do seu aplicativo ( UDF ). Consulte Especificar uma credencial de serviço default para um recurso compute.

O suporte a credenciais padrão está disponível apenas no modo de acesso Padrão e Dedicado. Ele não está disponível no DBSQL.

Python
@udf
def use_service_credential():
import google.auth # this import is always needed to trigger the SDK monkeypatching
from google.cloud import storage
# the client automatically uses the default credential
client = storage.Client(project='your-project')

# you can also get the token explicitly from the default provider
from google.auth import default
credentials, _ = default()
token = credentials.token
# Use the client to perform operations

Obter o contexto de execução da tarefa

Use o TaskContext PySpark API para obter informações de contexto, como a identidade do usuário, a tag do cluster, o ID do spark job e muito mais. Consulte Obter contexto de tarefa em um UDF.

Limitações

As seguintes limitações se aplicam às UDFs do PySpark:

  • Restrições de acesso a arquivos: Em Databricks Runtime 14.2 e abaixo, os UDFs PySpark em clustering compartilhado não podem acessar pastas Git, arquivos workspace ou volumes Unity Catalog.

  • Variáveis de difusão: PySpark UDFs em clustering de modo de acesso padrão e serverless compute não oferecem suporte a variáveis de broadcast.

  • Credenciais de serviço: as credenciais de serviço estão disponíveis apenas em lotes Unity Catalog Python UDFs e escalares Python UDFs. Não são suportados nas UDFs padrão Unity Catalog Python .

  • Credenciais de serviço : as credenciais de serviço não são compatíveis com o site dedicado compute.

  • Credenciais de serviço : as credenciais de serviço só estão disponíveis em serverless compute quando o senhor usa o ambiente serverless versão 3 ou acima. Consulte as versões do ambiente sem servidor.

  • Limite de memória em serverless : PySpark Os UDFs em serverless compute têm um limite de memória de 1 GB por PySpark UDF. Exceder esse limite resulta em um erro do tipo UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT_SERVERLESS.

  • Limite de memória no modo de acesso padrão : as UDFs do PySpark no modo de acesso padrão têm um limite de memória com base na memória disponível do tipo de instância escolhido. Exceder a memória disponível resulta em um erro do tipo UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT.