Databricks SQL Conector para Python
O Databricks SQL Connector for Python é uma Python biblioteca que permite que o senhor use o código Python para executar SQL comando em Databricks clustering e Databricks SQL warehouses. O conector Databricks SQL para Python é mais fácil de configurar e usar do que uma biblioteca Python semelhante, como pyodbc. Essa biblioteca segue a PEP 249 - Python Database API Specification v2.0.
O conector Databricks SQL para Python também é compatível com o dialeto SQLAlchemy para Databricks, mas deve ser instalado para usar esses recursos. Consulte Usar o SQLAlchemy com Databricks.
Requisitos
- Uma máquina de desenvolvimento executando Python >=3.8 e <=3.11.
- A Databricks recomenda que o senhor use ambientes virtuais Python, como os fornecidos pelo venv que estão incluídos no Python. Os ambientes virtuais ajudam a garantir que o senhor esteja usando as versões corretas do Python e do Databricks SQL Connector for Python juntos. A configuração e o uso de ambientes virtuais estão fora do escopo deste artigo. Para obter mais informações, consulte Criação de ambientes virtuais.
- Um clustering existente ou SQL warehouse.
Começar
-
Instale o Databricks SQL Connector for Python. O PyArrow é uma dependência opcional do Databricks SQL Connector for Python e não é instalado pelo default na versão 4.0.0 e acima do conector. Se o PyArrow não estiver instalado, recursos como o CloudFetch e outras funcionalidades do Apache Arrow não estarão disponíveis, o que pode afetar o desempenho de grandes volumes de dados.
- Para instalar o conector lean, use
pip install databricks-sql-connector
. - Para instalar o conector completo, incluindo o PyArrow, use
pip install databricks-sql-connector[pyarrow]
.
- Para instalar o conector lean, use
-
Reúna as seguintes informações para o clustering ou SQL warehouse que o senhor deseja usar:
- Cluster
- SQL warehouse
-
The server hostname of the cluster. You can get this from the Server Hostname value in the Advanced Options > JDBC/ODBC tab for your cluster.
-
The HTTP path of the cluster. You can get this from the HTTP Path value in the Advanced Options > JDBC/ODBC tab for your cluster.
-
The server hostname of the SQL warehouse. You can get this from the Server Hostname value in the Connection Details tab for your SQL warehouse.
-
The HTTP path of the SQL warehouse. You can get this from the HTTP Path value in the Connection Details tab for your SQL warehouse.
Autenticação
O Databricks SQL Connector for Python suporta os seguintes tipos de autenticação Databricks:
- Autenticação de access tokens pessoais do Databricks
- Autenticação OAuth machine-to-machine (M2M)
- Autenticação OAuth user-to-machine (U2M)
A autenticação básica usando um nome de usuário e senha da Databricks chegou ao fim da vida útil em 10 de julho de 2024. Consulte End of life para Databricks-gerenciar senhas.
Databricks autenticação de tokens de acesso pessoal
Para usar o Databricks SQL Conector para Python com Databricks autenticação de tokens de acesso pessoal, o senhor deve primeiro criar um Databricks tokens de acesso pessoal. Para fazer isso, siga as etapas em Databricks personal access tokens for workspace users.
Para autenticar o Databricks SQL Connector for Python, use o seguinte trecho de código. Esse snippet pressupõe que o senhor tenha definido a seguinte variável de ambiente:
DATABRICKS_SERVER_HOSTNAME
definido como o valor do Server Hostname para seu clustering ou SQL warehouse.DATABRICKS_HTTP_PATH
Defina o valor do caminho HTTP para seu clustering ou SQL warehouse.DATABRICKS_TOKEN
, definido como os tokens de acesso pessoal Databricks.
Para definir variáveis de ambiente, consulte a documentação do sistema operacional.
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...
Autenticação OAuth máquina a máquina (M2M)
Databricks SQL Connector for Python versões 2.5.0 e acima suportam autenticação máquina a máquina (M2M)OAuth. O senhor também deve instalar o Databricks SDK para Python (por exemplo, executando pip install databricks-sdk
ou python -m pip install databricks-sdk
).
Para usar o Databricks SQL Connector for Python com a autenticação OAuth M2M, o senhor deve fazer o seguinte:
-
Crie uma Databricks entidade de serviço em seu Databricks workspace e crie um OAuth secret para essa entidade de serviço.
Para criar a entidade de serviço e seu segredo OAuth, consulte Autorizar o acesso autônomo a Databricks recurso com uma entidade de serviço usando OAuth. Anote o valor do UUID da entidade de serviço ou do ID do aplicativo e o valor do segredo do OAuth da entidade de serviço.
-
Dê a essa entidade de serviço acesso ao seu clustering ou depósito.
Para conceder à entidade de serviço acesso ao seu clustering ou warehouse, consulte compute permissions ou gerenciar a SQL warehouse.
Para autenticar o Databricks SQL Connector for Python, use o seguinte trecho de código. Esse snippet pressupõe que o senhor tenha definido a seguinte variável de ambiente:
DATABRICKS_SERVER_HOSTNAME
definido como o valor do Server Hostname para seu clustering ou SQL warehouse.DATABRICKS_HTTP_PATH
Defina o valor do caminho HTTP para seu clustering ou SQL warehouse.DATABRICKS_CLIENT_ID
definido como o valor do UUID da entidade de serviço ou do ID do aplicativo .DATABRICKS_CLIENT_SECRET
definido como o valor Secret do segredo OAuth da entidade de serviço.
Para definir variáveis de ambiente, consulte a documentação do sistema operacional.
from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")
def credential_provider():
config = Config(
host = f"https://{server_hostname}",
client_id = os.getenv("DATABRICKS_CLIENT_ID"),
client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
return oauth_service_principal(config)
with sql.connect(server_hostname = server_hostname,
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
credentials_provider = credential_provider) as connection:
# ...
Autenticação OAuth de usuário para máquina (U2M)
Databricks SQL Connector for Python versões 2.1.0 e acima suportam a autenticaçãoOAuth user-to-machine (U2M).
Para autenticar o Databricks SQL Connector for Python com a autenticação OAuth U2M, use o seguinte trecho de código. OAuth A autenticação U2M usa o tempo de login humano real e o consentimento para autenticar o usuário de destino Databricks account. Esse snippet pressupõe que o senhor tenha definido a seguinte variável de ambiente:
- Defina
DATABRICKS_SERVER_HOSTNAME
como o valor do Server Hostname do seu clustering ou SQL warehouse. - Defina
DATABRICKS_HTTP_PATH
como o valor do caminho HTTP para seu clustering ou SQL warehouse.
Para definir variáveis de ambiente, consulte a documentação do sistema operacional.
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
auth_type = "databricks-oauth") as connection:
# ...
Exemplos
Os exemplos de código a seguir demonstram como usar o Databricks SQL Connector for Python para consultar e inserir dados, consultar metadados, gerenciar cursores e conexões e configurar o registro.
Os exemplos de código a seguir demonstram como usar tokens de acesso pessoal Databricks para autenticação. Para usar outros tipos de autenticação disponíveis no Databricks, consulte Autenticação.
Esses exemplos de código recuperam os valores das variáveis de conexão server_hostname
, http_path
e access_token
a partir dessas variáveis de ambiente:
DATABRICKS_SERVER_HOSTNAME
, que representa o valor do nome do host do servidor a partir dos requisitos.DATABRICKS_HTTP_PATH
, que representa o valor do caminho HTTP dos requisitos.DATABRICKS_TOKEN
que representa seus tokens de acesso dos requisitos.
Você pode usar outras abordagens para recuperar esses valores de variáveis de conexão. O uso da variável de ambiente é apenas uma abordagem entre muitas outras.
- Definir agente de usuário
- Consultar dados
- Inserir dados
- Metadados da consulta
- gerenciar cursores e conexões
- Gerenciar arquivos em volumes do Unity Catalog
- Configurar o registro
Definir agente de usuário
O exemplo de código a seguir demonstra como definir o aplicativo User-Agent product_name
para acompanhamento do uso.
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"),
_user_agent_entry = "product_name") as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT 1 + 1")
result = cursor.fetchall()
for row in result:
print(row)
Dados de consulta
O exemplo de código a seguir demonstra como chamar o conector Databricks SQL para Python para executar um comando básico SQL em um clustering ou SQL warehouse. Esse comando retorna as duas primeiras linhas da tabela trips
no esquema nyctaxi
do catálogo samples
.
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
result = cursor.fetchall()
for row in result:
print(row)
Inserir dados
O exemplo a seguir demonstra como inserir pequenas quantidades de dados (milhares de linhas):
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")
squares = [(i, i * i) for i in range(100)]
values = ",".join([f"({x}, {y})" for (x, y) in squares])
cursor.execute(f"INSERT INTO squares VALUES {values}")
cursor.execute("SELECT * FROM squares LIMIT 10")
result = cursor.fetchall()
for row in result:
print(row)
Para grandes quantidades de dados, o senhor deve primeiro fazer upload dos dados para o armazenamento em nuvem e, em seguida, executar o comando COPY INTO.
Metadados da consulta
Existem métodos dedicados para recuperar metadados. O exemplo a seguir recupera metadados sobre colunas em uma tabela de amostra:
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
with connection.cursor() as cursor:
cursor.columns(schema_name="default", table_name="squares")
print(cursor.fetchall())
gerenciar cursores e conexões
É uma prática recomendada fechar todas as conexões e cursores que não estejam mais em uso. Isso libera o recurso em Databricks clustering e Databricks SQL warehouses.
O senhor pode usar um gerenciador de contexto (a sintaxe with
usada nos exemplos anteriores) para gerenciar o recurso ou chamar explicitamente close
:
from databricks import sql
import os
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())
cursor.close()
connection.close()
Gerenciar arquivos em Unity Catalog volumes
O Databricks SQL conector permite que o senhor grave arquivos locais em Unity Catalog volumes, download arquivos de volumes e exclua arquivos de volumes, conforme mostrado no exemplo a seguir:
from databricks import sql
import os
# For writing local files to volumes and downloading files from volumes,
# you must set the staging_allows_local_path argument to the path to the
# local folder that contains the files to be written or downloaded.
# For deleting files in volumes, you must also specify the
# staging_allows_local_path argument, but its value is ignored,
# so in that case its value can be set for example to an empty string.
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"),
staging_allowed_local_path = "/tmp/") as connection:
with connection.cursor() as cursor:
# Write a local file to the specified path in a volume.
# Specify OVERWRITE to overwrite any existing file in that path.
cursor.execute(
"PUT '/temp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
)
# Download a file from the specified path in a volume.
cursor.execute(
"GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
)
# Delete a file from the specified path in a volume.
cursor.execute(
"REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
)
Configurar o registro
O Databricks SQL Connector usa o módulo de registro padrão do Python. Você pode configurar o nível de registro semelhante ao seguinte:
from databricks import sql
import os, logging
logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
level = logging.DEBUG)
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
result = cursor.fetchall()
for row in result:
logging.debug(row)
cursor.close()
connection.close()
Testando
Para testar seu código, use estruturas de teste Python, como o pytest. Para testar seu código em condições simuladas sem chamar o endpoint Databricks REST API ou alterar o estado de sua conta ou espaço de trabalho Databricks, o senhor pode usar a biblioteca de simulação Python, como unittest.mock.
Por exemplo, dado o seguinte arquivo denominado helpers.py
contendo uma função get_connection_personal_access_token
que usa um Databricks tokens de acesso pessoal para retornar uma conexão a um Databricks workspace e uma função select_nyctaxi_trips
que usa a conexão para obter o número especificado de linhas de dados da tabela trips
no esquema nyctaxi
do catálogo samples
:
# helpers.py
from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor
def get_connection_personal_access_token(
server_hostname: str,
http_path: str,
access_token: str
) -> Connection:
return sql.connect(
server_hostname = server_hostname,
http_path = http_path,
access_token = access_token
)
def select_nyctaxi_trips(
connection: Connection,
num_rows: int
) -> List[Row]:
cursor: Cursor = connection.cursor()
cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
result: List[Row] = cursor.fetchall()
return result
E dado o seguinte arquivo chamado main.py
que chama as funções get_connection_personal_access_token
e select_nyctaxi_trips
:
# main.py
from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips
connection: Connection = get_connection_personal_access_token(
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")
)
rows: List[Row] = select_nyctaxi_trips(
connection = connection,
num_rows = 2
)
for row in rows:
print(row)
O arquivo a seguir chamado test_helpers.py
testa se a função select_nyctaxi_trips
retorna a resposta esperada. Em vez de criar uma conexão real com o destino workspace, esse teste simula um objeto Connection
. O teste também simula alguns dados que estão em conformidade com o esquema e os valores que estão nos dados reais. O teste retorna os dados simulados por meio da conexão simulada e, em seguida, verifica se um dos valores das linhas de dados simuladas corresponde ao valor esperado.
# test_helpers.py
import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec
@pytest.fixture
def mock_data() -> List[Row]:
return [
Row(
tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
trip_distance = 4.94,
fare_amount = 19.0,
pickup_zip = 10282,
dropoff_zip = 10171
),
Row(
tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
trip_distance = 0.28,
fare_amount = 3.5,
pickup_zip = 10110,
dropoff_zip = 10110
)
]
def test_select_nyctaxi_trips(mock_data: List[Row]):
# Create a mock Connection.
mock_connection = create_autospec(Connection)
# Set the mock Connection's cursor().fetchall() to the mock data.
mock_connection.cursor().fetchall.return_value = mock_data
# Call the real function with the mock Connection.
response: List[Row] = select_nyctaxi_trips(
connection = mock_connection,
num_rows = 2)
# Check the value of one of the mocked data row's columns.
assert response[1].fare_amount == 3.5
Como a função select_nyctaxi_trips
contém uma instrução SELECT
e, portanto, não altera o estado da tabela trips
, a simulação não é absolutamente necessária neste exemplo. No entanto, o mocking permite que o senhor execute rapidamente seus testes sem esperar que seja feita uma conexão real com o site workspace. Além disso, o mocking permite que o senhor execute testes simulados várias vezes para funções que possam alterar o estado de uma tabela, como INSERT INTO
, UPDATE
e DELETE FROM
.
Referência da API
pacote
databricks-sql-connector
Uso: pip install databricks-sql-connector
Consulte também databricks-sql-connector no Python pacote Index (PyPI).
Módulo
databricks.sql
Uso: from databricks import sql
Aulas
As aulas selecionadas incluem o seguinte:
Aulas |
---|
|
|
|
ClasseConnection
Para criar um objeto Connection
, chame o método databricks.sql.connect
com os seguintes parâmetros:
Parâmetros |
---|
|
|
|
|
|
|
|
|
Os métodos Connection
selecionados incluem o seguinte:
Métodos |
---|
|
|
ClasseCursor
Para criar um objeto Cursor
, chame o método cursor
da classe Connection
.
Os atributos Cursor
selecionados incluem o seguinte:
Atributos |
---|
|
|
Os métodos Cursor
selecionados incluem o seguinte:
Métodos |
---|
|
|
|
|
|
|
|
|
|
|
|
|
|
ClasseRow
A classe row é uma estrutura de dados semelhante a uma tupla que representa uma linha de resultado individual.
Se a linha contiver uma coluna com o nome "my_column"
, você poderá acessar o campo "my_column"
de row
via
row.my_column
. Você também pode usar índices numéricos para acessar campos, por exemplo, row[0]
.
Se o nome da coluna não for permitido como um nome de método de atributo (por exemplo, ele começa com um dígito),
então você pode acessar o campo como row["1_my_column"]
.
Desde a versão 1.0
Os métodos Row
selecionados incluem:
| asDict
Retorna uma representação de dicionário da linha, que é indexada por nomes de campo. Se houver nomes de campo duplicados, um dos campos duplicados (mas somente um) será retornado no dicionário. O campo duplicado retornado não está definido.
Sem parâmetros.
Retorna um dict
de campos. |
Conversões de tipo
A tabela a seguir mapeia os tipos de dados SQL do Apache Spark para seus equivalentes em Python.
Tipo de dados SQL do Apache Spark | Tipo de dados Python |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Solução de problemas
MensagemtokenAuthWrapperInvalidAccessToken: Invalid access token
Problema : Ao executar o código, o senhor vê uma mensagem semelhante a Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access token
.
Possible cause : O valor passado para access_token
não é um Databricks válido de tokens de acesso pessoal.
Correção recomendada : verifique se o valor passado para access_token
está correto e tente novamente.
Mensagemgaierror(8, 'nodename nor servname provided, or not known')
Problema : Ao executar o código, o senhor vê uma mensagem semelhante a Error during request to server: gaierror(8, 'nodename nor servname provided, or not known')
.
Causa possível : o valor passado para server_hostname
não é o nome de host correto.
Correção recomendada : verifique se o valor passado para server_hostname
está correto e tente novamente.
Para obter mais informações sobre como encontrar o nome do host do servidor, consulte Obter detalhes da conexão para um recurso Databricks compute.
MensagemIpAclError
Problema : Ao executar o código, o senhor vê a mensagem Error during request to server: IpAclValidation
quando tenta usar o conector em um notebook Databricks.
Possível causa : O senhor pode ter habilitado a listagem de permissão de IP para o site Databricks workspace. Com a listagem de permissão de IP, as conexões do Spark clustering de volta ao plano de controle não são permitidas pelo default.
Correção recomendada : Peça ao administrador para adicionar a sub-rede do plano compute à lista de permissões de IP.
Recurso adicional
Para saber mais, consulte:
- O repositório do Databricks SQL Connector for Python no GitHub
- Tipos de dados
- Tipos integrados (para
bool
,bytearray
,float
,int
estr
) no site Python - datetime (para
datetime.date
edatatime.datetime
) no site do Python - decimal (para
decimal.Decimal
) no site do Python - Constantes integradas (para
NoneType
) no site Python