Test the Databricks ODBC Driver (Simba)

This page describes how to test code that uses the Databricks ODBC Driver.

You can use any test framework for a language that supports ODBC. The following examples use pyodbc, pytest, and unittest.mock to test ODBC driver connections. The code is based on the example in Connect Python and pyodbc to Databricks.

Helper functions

The helpers.py file contains utility functions for working with ODBC connections:

  • connect_to_dsn: Opens a connection to a Databricks compute resource.
  • get_cursor_from_connection: Gets a cursor for running queries.
  • select_from_nyctaxi_trips: Queries the specified number of rows from samples.nyctaxi.trips.
  • print_rows: Prints the contents of the result set to the console.
Python
# helpers.py

from pyodbc import connect, Connection, Cursor

def connect_to_dsn(
    connstring: str,
    autocommit: bool
) -> Connection:
    # Open a connection from an ODBC connection string such as "DSN=<your-dsn-name>".
    connection = connect(
        connstring,
        autocommit = autocommit
    )

    return connection

def get_cursor_from_connection(
    connection: Connection
) -> Cursor:
    # Create a cursor for running statements on the connection.
    cursor = connection.cursor()
    return cursor

def select_from_nyctaxi_trips(
    cursor: Cursor,
    num_rows: int
) -> Cursor:
    # pyodbc's execute() returns the cursor itself, ready for fetching.
    select_cursor = cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
    return select_cursor

def print_rows(cursor: Cursor):
    # Print each row of the result set on its own line.
    for row in cursor.fetchall():
        print(row)
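
Because select_from_nyctaxi_trips interpolates num_rows directly into the SQL text, and Python does not enforce type hints at run time, one option is to build the statement in a small pure function that validates its input. The following is only a sketch: build_trips_query is a hypothetical name that is not part of helpers.py, and the checks shown are one possible policy.

```python
# Hypothetical helper (not part of the original helpers.py): builds the
# SELECT statement separately so it can be validated and tested without
# any connection or mock.

def build_trips_query(num_rows: int) -> str:
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(num_rows, bool) or not isinstance(num_rows, int):
        raise TypeError("num_rows must be an int")
    if num_rows < 0:
        raise ValueError("num_rows must not be negative")
    return f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}"
```

A function like this needs no mocking at all in a unit test, because it only returns a string.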

Main file

The main.py file calls the helper functions to connect and query data:

Python
# main.py

from helpers import *

connection = connect_to_dsn(
    connstring = "DSN=<your-dsn-name>",
    autocommit = True
)

cursor = get_cursor_from_connection(
    connection = connection
)

select_cursor = select_from_nyctaxi_trips(
    cursor = cursor,
    num_rows = 2
)

print_rows(
    cursor = select_cursor
)
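
main.py opens a connection as soon as the file runs and never closes it, which makes the script itself hard to test. One possible restructuring, sketched here on the assumption that you are free to change main.py, passes the connect function in as an argument so that a test can supply a stand-in. The names run and connect_fn are hypothetical and do not appear in the original example.

```python
from typing import Any, Callable, List

# Hypothetical restructuring of main.py: the caller supplies the function
# that opens the connection, so tests can pass a mock instead of pyodbc.connect.
def run(connect_fn: Callable[..., Any], connstring: str, num_rows: int) -> List[Any]:
    """Open a connection, fetch the requested rows, and always close the connection."""
    connection = connect_fn(connstring, autocommit=True)
    try:
        cursor = connection.cursor()
        cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {int(num_rows)}")
        return cursor.fetchall()
    finally:
        # Runs whether or not the query raised an exception.
        connection.close()
```

In production code you would call run(pyodbc.connect, "DSN=<your-dsn-name>", 2); in a test you would pass a unittest.mock.MagicMock and then check that its close method was called.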

Unit tests with mocking

The test_helpers.py file uses pytest and unittest.mock to test the helper functions, including select_from_nyctaxi_trips. Mocking simulates database connections without using real compute resources, so the tests run in seconds and do not affect your Databricks workspace.

Python
# test_helpers.py

import datetime
from unittest.mock import patch

from pyodbc import SQL_DBMS_NAME
from helpers import *

# Each test patches "helpers.connect", the name that helpers.py actually
# looks up when it opens a connection, so pyodbc never contacts a real DSN
# and the real helper functions are the code under test.

@patch("helpers.connect")
def test_connect_to_dsn(mock_connect):
    mock_connect.return_value.getinfo.return_value = "Spark SQL"

    connection = connect_to_dsn(
        connstring = "DSN=<your-dsn-name>",
        autocommit = True
    )

    mock_connect.assert_called_once_with("DSN=<your-dsn-name>", autocommit = True)
    assert connection.getinfo(SQL_DBMS_NAME) == "Spark SQL"

@patch("helpers.connect")
def test_get_cursor_from_connection(mock_connect):
    mock_cursor = mock_connect.return_value.cursor
    mock_cursor.return_value.rowcount = -1

    connection = connect_to_dsn(
        connstring = "DSN=<your-dsn-name>",
        autocommit = True
    )

    cursor = get_cursor_from_connection(
        connection = connection
    )

    assert cursor.rowcount == -1

@patch("helpers.connect")
def test_select_from_nyctaxi_trips(mock_connect):
    mock_cursor = mock_connect.return_value.cursor
    mock_execute = mock_cursor.return_value.execute
    mock_execute.return_value.arraysize = 1

    connection = connect_to_dsn(
        connstring = "DSN=<your-dsn-name>",
        autocommit = True
    )

    cursor = get_cursor_from_connection(
        connection = connection
    )

    select_cursor = select_from_nyctaxi_trips(
        cursor = cursor,
        num_rows = 2
    )

    mock_execute.assert_called_once_with("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
    assert select_cursor.arraysize == 1

@patch("helpers.connect")
def test_print_rows(mock_connect, capsys):
    mock_cursor = mock_connect.return_value.cursor
    mock_execute = mock_cursor.return_value.execute
    mock_execute.return_value.fetchall.return_value = [
        (datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171),
        (datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)
    ]

    connection = connect_to_dsn(
        connstring = "DSN=<your-dsn-name>",
        autocommit = True
    )

    cursor = get_cursor_from_connection(
        connection = connection
    )

    select_cursor = select_from_nyctaxi_trips(
        cursor = cursor,
        num_rows = 2
    )

    print_rows(
        cursor = select_cursor
    )

    captured = capsys.readouterr()
    assert captured.out == "(datetime.datetime(2016, 2, 14, 16, 52, 13), datetime.datetime(2016, 2, 14, 17, 16, 4), 4.94, 19.0, 10282, 10171)\n" \
                           "(datetime.datetime(2016, 2, 4, 18, 44, 19), datetime.datetime(2016, 2, 4, 18, 46), 0.28, 3.5, 10110, 10110)\n"

Because select_from_nyctaxi_trips only runs a SELECT statement, mocking is not strictly necessary here. However, mocking is especially useful when you test functions that modify data (INSERT INTO, UPDATE, DELETE FROM), because you can run the tests repeatedly without affecting the state of the table.
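
To illustrate that last point, the following sketch tests a data-modifying helper entirely against a mock cursor. The function delete_short_trips and the table name main.default.trips_copy are hypothetical and are not part of helpers.py. The example also assumes that the driver accepts pyodbc's ? parameter markers and reports a meaningful rowcount, which you should verify for your setup.

```python
from unittest.mock import MagicMock

# Hypothetical helper, not part of the original helpers.py.
def delete_short_trips(cursor, max_distance: float) -> int:
    """Delete trips shorter than max_distance and return the reported row count."""
    cursor.execute(
        "DELETE FROM main.default.trips_copy WHERE trip_distance < ?",
        max_distance,
    )
    return cursor.rowcount

def test_delete_short_trips():
    mock_cursor = MagicMock()
    mock_cursor.rowcount = 3  # pretend that three rows were deleted

    deleted = delete_short_trips(mock_cursor, 0.5)

    # The statement was issued exactly once, and no real table was touched.
    mock_cursor.execute.assert_called_once_with(
        "DELETE FROM main.default.trips_copy WHERE trip_distance < ?",
        0.5,
    )
    assert deleted == 3
```

Because the cursor is a mock, this test can run any number of times and always starts from the same state.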