Teste de unidade para o Notebook
O senhor pode usar o teste de unidade para ajudar a melhorar a qualidade e a consistência do código do Notebook. O teste unitário é uma abordagem para testar unidades de código independentes, como funções, com antecedência e frequência. Isso ajuda você a encontrar problemas com seu código com mais rapidez, descobrir suposições errôneas sobre seu código mais cedo e otimizar seus esforços gerais de codificação.
Este artigo é uma introdução aos testes unitários básicos com funções. Conceitos avançados, como classes e interfaces de teste de unidade, bem como o uso de stubs, mocks e chicotes de teste, embora também sejam compatíveis com o teste de unidade do Notebook, estão fora do escopo deste artigo. Este artigo também não abrange outros tipos de métodos de teste, como teste de integração, teste de sistema, teste de aceitação ou métodos de teste não funcionais, como teste de desempenho ou teste de usabilidade.
Este artigo demonstra o seguinte:
- Como organizar funções e seus testes unitários.
- Como escrever funções em Python, R, Scala, bem como funções definidas pelo usuário em SQL, que são bem projetadas para serem testadas por unidades.
- Como chamar essas funções em Python, R, Scala e SQL Notebook.
- Como escrever testes unitários em Python, R e Scala usando as estruturas de teste populares pytest para Python, testthat para R e ScalaTest para Scala. Além disso, como escrever SQL para testes unitários de funções definidas pelo usuário de SQL (SQL UDFs).
- Como executar esses testes de unidade em Python, R, Scala e SQL Notebook.
Databricks recomenda escrever e executar seus testes de unidade em um Notebook. Embora o senhor possa executar alguns comandos no terminal da Web, o terminal da Web tem mais limitações, como a falta de suporte para Spark. Consulte execução shell comando em Databricks web terminal.
Organize funções e testes unitários
Há algumas abordagens comuns para organizar suas funções e seus testes de unidade com o Notebook. Cada abordagem tem seus benefícios e desafios.
Para Python, R e Scala Notebook, as abordagens comuns incluem o seguinte:
-
Armazenar funções e seus testes de unidade fora do Notebook.
- Benefícios: O senhor pode chamar essas funções com e fora do Notebook. As estruturas de teste são mais bem projetadas para executar testes fora do Notebook.
- Desafios: Essa abordagem não é compatível com o site Scala Notebook. Essa abordagem também aumenta o número de arquivos a serem rastreados e mantidos.
-
Armazene funções em um Notebook e seus testes de unidade em um Notebook separado.
- Vantagens: Essas funções são mais fáceis de reutilizar no Notebook.
- Desafios: O número de notebooks a serem monitorados e mantidos aumenta. Essas funções não podem ser usadas fora do Notebook. Essas funções também podem ser mais difíceis de testar fora do Notebook.
-
Armazenar funções e seus testes de unidade no mesmo Notebook.
- Benefícios: As funções e seus testes unitários são armazenados em um único Notebook para facilitar o acompanhamento e a manutenção.
- Desafios: Essas funções podem ser mais difíceis de reutilizar no Notebook. Essas funções não podem ser usadas fora do Notebook. Essas funções também podem ser mais difíceis de testar fora do Notebook.
Para o Python e o R Notebook, o Databricks recomenda armazenar funções e seus testes de unidade fora do Notebook. Para o Scala Notebook, o Databricks recomenda incluir funções em um Notebook e seus testes de unidade em um Notebook separado.
Para o SQL Notebook, o Databricks recomenda que o senhor armazene funções como SQL funções definidas pelo usuário (SQL UDFs) em seus esquemas (também conhecidos como bancos de dados). Em seguida, o senhor pode chamar esses UDFs SQL e seus testes unitários no SQL Notebook.
Funções de escrita
Esta seção descreve um conjunto simples de exemplos de funções que determinam o seguinte:
- Se existe uma tabela em um banco de dados.
- Se existe uma coluna em uma tabela.
- Quantas linhas existem em uma coluna para um valor dentro dessa coluna.
A intenção é que essas funções sejam simples, para que o senhor possa se concentrar nos detalhes do teste de unidade neste artigo, em vez de se concentrar nas próprias funções.
Para obter os melhores resultados de testes unitários, uma função deve retornar um único resultado previsível e ter um único tipo de dados. Por exemplo, para verificar se algo existe, a função deve retornar um valor booleano de verdadeiro ou falso. Para retornar o número de linhas existentes, a função deve retornar um número inteiro não negativo. No primeiro exemplo, não deve retornar nem falso se algo não existir, nem a coisa em si, se existir. Da mesma forma, para o segundo exemplo, ele não deve retornar o número de linhas existentes ou falso se não houver nenhuma linha.
O senhor pode adicionar essas funções a um Databricks workspace existente da seguinte forma, em Python, R, Scala, ou SQL.
- Python
- R
- Scala
- SQL
The following code assumes you have Set up Databricks Git folders (Repos), added a repo, and have the repo open in your Databricks workspace.
Create a file named myfunctions.py
within the repo, and add the following contents to the file. Other examples in this article expect this file to be named myfunctions.py
. You can use different names for your own files.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
.appName('integrity-tests') \
.getOrCreate()
# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
return spark.catalog.tableExists(f"{dbName}.{tableName}")
# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
if columnName in dataFrame.columns:
return True
else:
return False
# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
df = dataFrame.filter(col(columnName) == columnValue)
return df.count()
The following code assumes you have Set up Databricks Git folders (Repos), added a repo, and have the repo open in your Databricks workspace.
Create a file named myfunctions.r
within the repo, and add the following contents to the file. Other examples in this article expect this file to be named myfunctions.r
. You can use different names for your own files.
library(SparkR)
# Does the specified table exist in the specified database?
table_exists <- function(table_name, db_name) {
tableExists(paste(db_name, ".", table_name, sep = ""))
}
# Does the specified column exist in the given DataFrame?
column_exists <- function(dataframe, column_name) {
column_name %in% colnames(dataframe)
}
# How many rows are there for the specified value in the specified column
# in the given DataFrame?
num_rows_in_column_for_value <- function(dataframe, column_name, column_value) {
df = filter(dataframe, dataframe[[column_name]] == column_value)
count(df)
}
Create a Scala notebook named myfunctions
with the following contents. Other examples in this article expect this notebook to be named myfunctions
. You can use different names for your own notebooks.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
// Does the specified table exist in the specified database?
def tableExists(tableName: String, dbName: String) : Boolean = {
return spark.catalog.tableExists(dbName + "." + tableName)
}
// Does the specified column exist in the given DataFrame?
def columnExists(dataFrame: DataFrame, columnName: String) : Boolean = {
val nameOfColumn = null
for(nameOfColumn <- dataFrame.columns) {
if (nameOfColumn == columnName) {
return true
}
}
return false
}
// How many rows are there for the specified value in the specified column
// in the given DataFrame?
def numRowsInColumnForValue(dataFrame: DataFrame, columnName: String, columnValue: String) : Long = {
val df = dataFrame.filter(col(columnName) === columnValue)
return df.count()
}
The following code assumes you have the third-party sample dataset diamonds within a schema named default
within a catalog named main
that is accessible from your Databricks workspace. If the catalog or schema that you want to use has a different name, then change one or both of the following USE
statements to match.
Create a SQL notebook and add the following contents to this new notebook. Then attach the notebook to a cluster and run the notebook to add the following SQL UDFs to the specified catalog and schema.
The SQL UDFs table_exists
and column_exists
work only with Unity Catalog. SQL UDF support for Unity Catalog is in Public Preview.
USE CATALOG main;
USE SCHEMA default;
CREATE OR REPLACE FUNCTION table_exists(catalog_name STRING,
db_name STRING,
table_name STRING)
RETURNS BOOLEAN
RETURN if(
(SELECT count(*) FROM system.information_schema.tables
WHERE table_catalog = table_exists.catalog_name
AND table_schema = table_exists.db_name
AND table_name = table_exists.table_name) > 0,
true,
false
);
CREATE OR REPLACE FUNCTION column_exists(catalog_name STRING,
db_name STRING,
table_name STRING,
column_name STRING)
RETURNS BOOLEAN
RETURN if(
(SELECT count(*) FROM system.information_schema.columns
WHERE table_catalog = column_exists.catalog_name
AND table_schema = column_exists.db_name
AND table_name = column_exists.table_name
AND column_name = column_exists.column_name) > 0,
true,
false
);
CREATE OR REPLACE FUNCTION num_rows_for_clarity_in_diamonds(clarity_value STRING)
RETURNS BIGINT
RETURN SELECT count(*)
FROM main.default.diamonds
WHERE clarity = clarity_value
Funções de chamada
Esta seção descreve o código que chama as funções anteriores. Você pode usar essas funções, por exemplo, para contar o número de linhas na tabela em que um valor especificado existe em uma coluna especificada. No entanto, você gostaria de verificar se a tabela realmente existe e se a coluna realmente existe nessa tabela antes de continuar. O código a seguir verifica essas condições.
Se tiver adicionado as funções da seção anterior ao seu Databricks workspace, poderá chamar essas funções do seu workspace da seguinte forma.
- Python
- R
- Scala
- SQL
Create a Python notebook in the same folder as the preceding myfunctions.py
file in your repo, and add the following contents to the notebook. Change the variable values for the table name, the schema (database) name, the column name, and the column value as needed. Then attach the notebook to a cluster and run the notebook to see the results.
from myfunctions import *
tableName = "diamonds"
dbName = "default"
columnName = "clarity"
columnValue = "VVS2"
# If the table exists in the specified database...
if tableExists(tableName, dbName):
df = spark.sql(f"SELECT * FROM {dbName}.{tableName}")
# And the specified column exists in that table...
if columnExists(df, columnName):
# Then report the number of rows for the specified value in that column.
numRows = numRowsInColumnForValue(df, columnName, columnValue)
print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
else:
print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.")
Create an R notebook in the same folder as the preceding myfunctions.r
file in your repo, and add the following contents to the notebook. Change the variable values for the table name, the schema (database) name, the column name, and the column value as needed. Then attach the notebook to a cluster and run the notebook to see the results.
library(SparkR)
source("myfunctions.r")
table_name <- "diamonds"
db_name <- "default"
column_name <- "clarity"
column_value <- "VVS2"
# If the table exists in the specified database...
if (table_exists(table_name, db_name)) {
df = sql(paste("SELECT * FROM ", db_name, ".", table_name, sep = ""))
# And the specified column exists in that table...
if (column_exists(df, column_name)) {
# Then report the number of rows for the specified value in that column.
num_rows = num_rows_in_column_for_value(df, column_name, column_value)
print(paste("There are ", num_rows, " rows in table '", table_name, "' where '", column_name, "' equals '", column_value, "'.", sep = ""))
} else {
print(paste("Column '", column_name, "' does not exist in table '", table_name, "' in schema (database) '", db_name, "'.", sep = ""))
}
} else {
print(paste("Table '", table_name, "' does not exist in schema (database) '", db_name, "'.", sep = ""))
}
Create another Scala notebook in the same folder as the preceding myfunctions
Scala notebook, and add the following contents to this new notebook.
In this new notebook’s first cell, add the following code, which calls the %run magic. This magic makes the contents of the myfunctions
notebook available to your new notebook.
%run ./myfunctions
In this new notebook’s second cell, add the following code. Change the variable values for the table name, the schema (database) name, the column name, and the column value as needed. Then attach the notebook to a cluster and run the notebook to see the results.
val tableName = "diamonds"
val dbName = "default"
val columnName = "clarity"
val columnValue = "VVS2"
// If the table exists in the specified database...
if (tableExists(tableName, dbName)) {
val df = spark.sql("SELECT * FROM " + dbName + "." + tableName)
// And the specified column exists in that table...
if (columnExists(df, columnName)) {
// Then report the number of rows for the specified value in that column.
val numRows = numRowsInColumnForValue(df, columnName, columnValue)
println("There are " + numRows + " rows in '" + tableName + "' where '" + columnName + "' equals '" + columnValue + "'.")
} else {
println("Column '" + columnName + "' does not exist in table '" + tableName + "' in database '" + dbName + "'.")
}
} else {
println("Table '" + tableName + "' does not exist in database '" + dbName + "'.")
}
Add the following code to a new cell in the preceding notebook or to a cell in a separate notebook. Change the schema or catalog names if necessary to match yours, and then run this cell to see the results.
SELECT CASE
-- If the table exists in the specified catalog and schema...
WHEN
table_exists("main", "default", "diamonds")
THEN
-- And the specified column exists in that table...
(SELECT CASE
WHEN
column_exists("main", "default", "diamonds", "clarity")
THEN
-- Then report the number of rows for the specified value in that column.
printf("There are %d rows in table 'main.default.diamonds' where 'clarity' equals 'VVS2'.",
num_rows_for_clarity_in_diamonds("VVS2"))
ELSE
printf("Column 'clarity' does not exist in table 'main.default.diamonds'.")
END)
ELSE
printf("Table 'main.default.diamonds' does not exist.")
END
Escreva testes unitários
Esta seção descreve o código que testa cada uma das funções descritas no início deste artigo. Se você fizer alguma alteração nas funções no futuro, poderá usar testes de unidade para determinar se essas funções ainda funcionam conforme o esperado.
Se o senhor adicionou as funções do início deste artigo ao seu Databricks workspace, poderá adicionar testes de unidade para essas funções ao seu workspace da seguinte forma.
- Python
- R
- Scala
- SQL
Create another file named test_myfunctions.py
in the same folder as the preceding myfunctions.py
file in your repo, and add the following contents to the file. By default, pytest
looks for .py
files whose names start with test_
(or end with _test
) to test. Similarly, by default, pytest
looks inside of these files for functions whose names start with test_
to test.
In general, it is a best practice to not run unit tests against functions that work with data in production. This is especially important for functions that add, remove, or otherwise change data. To protect your production data from being compromised by your unit tests in unexpected ways, you should run unit tests against non-production data. One common approach is to create fake data that is as close as possible to the production data. The following code example creates fake data for the unit tests to run against.
import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
tableName = "diamonds"
dbName = "default"
columnName = "clarity"
columnValue = "SI2"
# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
.appName('integrity-tests') \
.getOrCreate()
# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([ \
StructField("_c0", IntegerType(), True), \
StructField("carat", FloatType(), True), \
StructField("cut", StringType(), True), \
StructField("color", StringType(), True), \
StructField("clarity", StringType(), True), \
StructField("depth", FloatType(), True), \
StructField("table", IntegerType(), True), \
StructField("price", IntegerType(), True), \
StructField("x", FloatType(), True), \
StructField("y", FloatType(), True), \
StructField("z", FloatType(), True), \
])
data = [ (1, 0.23, "Ideal", "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ), \
(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ) ]
df = spark.createDataFrame(data, schema)
# Does the table exist?
def test_tableExists():
assert tableExists(tableName, dbName) is True
# Does the column exist?
def test_columnExists():
assert columnExists(df, columnName) is True
# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
assert numRowsInColumnForValue(df, columnName, columnValue) > 0
Create another file named test_myfunctions.r
in the same folder as the preceding myfunctions.r
file in your repo, and add the following contents to the file. By default, testthat
looks for .r
files whose names start with test
to test.
In general, it is a best practice to not run unit tests against functions that work with data in production. This is especially important for functions that add, remove, or otherwise change data. To protect your production data from being compromised by your unit tests in unexpected ways, you should run unit tests against non-production data. One common approach is to create fake data that is as close as possible to the production data. The following code example creates fake data for the unit tests to run against.
library(testthat)
source("myfunctions.r")
table_name <- "diamonds"
db_name <- "default"
column_name <- "clarity"
column_value <- "SI2"
# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema <- structType(
structField("_c0", "integer"),
structField("carat", "float"),
structField("cut", "string"),
structField("color", "string"),
structField("clarity", "string"),
structField("depth", "float"),
structField("table", "integer"),
structField("price", "integer"),
structField("x", "float"),
structField("y", "float"),
structField("z", "float"))
data <- list(list(as.integer(1), 0.23, "Ideal", "E", "SI2", 61.5, as.integer(55), as.integer(326), 3.95, 3.98, 2.43),
list(as.integer(2), 0.21, "Premium", "E", "SI1", 59.8, as.integer(61), as.integer(326), 3.89, 3.84, 2.31))
df <- createDataFrame(data, schema)
# Does the table exist?
test_that ("The table exists.", {
expect_true(table_exists(table_name, db_name))
})
# Does the column exist?
test_that ("The column exists in the table.", {
expect_true(column_exists(df, column_name))
})
# Is there at least one row for the value in the specified column?
test_that ("There is at least one row in the query result.", {
expect_true(num_rows_in_column_for_value(df, column_name, column_value) > 0)
})
Create another Scala notebook in the same folder as the preceding myfunctions
Scala notebook, and add the following contents to this new notebook.
In the new notebook’s first cell, add the following code, which calls the %run
magic. This magic makes the contents of the myfunctions
notebook available to your new notebook.
%run ./myfunctions
In the second cell, add the following code. This code defines your unit tests and specifies how to run them.
In general, it is a best practice to not run unit tests against functions that work with data in production. This is especially important for functions that add, remove, or otherwise change data. To protect your production data from being compromised by your unit tests in unexpected ways, you should run unit tests against non-production data. One common approach is to create fake data that is as close as possible to the production data. The following code example creates fake data for the unit tests to run against.
import org.scalatest._
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, FloatType, StringType}
import scala.collection.JavaConverters._
class DataTests extends AsyncFunSuite {
val tableName = "diamonds"
val dbName = "default"
val columnName = "clarity"
val columnValue = "SI2"
// Create fake data for the unit tests to run against.
// In general, it is a best practice to not run unit tests
// against functions that work with data in production.
val schema = StructType(Array(
StructField("_c0", IntegerType),
StructField("carat", FloatType),
StructField("cut", StringType),
StructField("color", StringType),
StructField("clarity", StringType),
StructField("depth", FloatType),
StructField("table", IntegerType),
StructField("price", IntegerType),
StructField("x", FloatType),
StructField("y", FloatType),
StructField("z", FloatType)
))
val data = Seq(
Row(1, 0.23, "Ideal", "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43),
Row(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31)
).asJava
val df = spark.createDataFrame(data, schema)
// Does the table exist?
test("The table exists") {
assert(tableExists(tableName, dbName) == true)
}
// Does the column exist?
test("The column exists") {
assert(columnExists(df, columnName) == true)
}
// Is there at least one row for the value in the specified column?
test("There is at least one matching row") {
assert(numRowsInColumnForValue(df, columnName, columnValue) > 0)
}
}
nocolor.nodurations.nostacks.stats.run(new DataTests)
This code example uses the FunSuite
style of testing in ScalaTest. For other available testing styles, see Selecting testing styles for your project.
Before you add unit tests, you should be aware that in general, it is a best practice to not run unit tests against functions that work with data in production. This is especially important for functions that add, remove, or otherwise change data. To protect your production data from being compromised by your unit tests in unexpected ways, you should run unit tests against non-production data. One common approach is to run unit tests against views instead of tables.
To create a view, you can call the CREATE VIEW command from a new cell in either the preceding notebook or a separate notebook. The following example assumes that you have an existing table named diamonds
within a schema (database) named default
within a catalog named main
. Change these names to match your own as needed, and then run only that cell.
USE CATALOG main;
USE SCHEMA default;
CREATE VIEW view_diamonds AS
SELECT * FROM diamonds;
After you create the view, add each of the following SELECT
statements to its own new cell in the preceding notebook or to its own new cell in a separate notebook. Change the names to match your own as needed.
SELECT if(table_exists("main", "default", "view_diamonds"),
printf("PASS: The table 'main.default.view_diamonds' exists."),
printf("FAIL: The table 'main.default.view_diamonds' does not exist."));
SELECT if(column_exists("main", "default", "view_diamonds", "clarity"),
printf("PASS: The column 'clarity' exists in the table 'main.default.view_diamonds'."),
printf("FAIL: The column 'clarity' does not exists in the table 'main.default.view_diamonds'."));
SELECT if(num_rows_for_clarity_in_diamonds("VVS2") > 0,
printf("PASS: The table 'main.default.view_diamonds' has at least one row where the column 'clarity' equals 'VVS2'."),
printf("FAIL: The table 'main.default.view_diamonds' does not have at least one row where the column 'clarity' equals 'VVS2'."));
execução de testes unitários
Esta seção descreve como executar os testes unitários que o senhor codificou na seção anterior. Quando o senhor executa os testes de unidade, obtém resultados que mostram quais testes de unidade foram aprovados e quais falharam.
Se o senhor adicionou os testes de unidade da seção anterior ao site Databricks workspace, poderá executar esses testes de unidade no site workspace. O senhor pode executar esses testes unitários manualmente ou em um programador.
- Python
- R
- Scala
- SQL
Create a Python notebook in the same folder as the preceding test_myfunctions.py
file in your repo, and add the following contents.
In the new notebook’s first cell, add the following code, and then run the cell, which calls the %pip
magic. This magic installs pytest
.
%pip install pytest
In the second cell, add the following code and then run the cell. Results show which unit tests passed and failed.
import pytest
import sys
# Skip writing pyc files on a readonly filesystem.
sys.dont_write_bytecode = True
# Run pytest.
retcode = pytest.main([".", "-v", "-p", "no:cacheprovider"])
# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."
Create an R notebook in the same folder as the preceding test_myfunctions.r
file in your repo, and add the following contents.
In the first cell, add the following code, and then run the cell, which calls the install.packages
function. This function installs testthat
.
install.packages("testthat")
In the second cell, add the following code, and then run the cell. Results show which unit tests passed and failed.
library(testthat)
source("myfunctions.r")
test_dir(".", reporter = "tap")
Run the first and then second cells in the notebook from the preceding section. Results show which unit tests passed and failed.
Run each of the three cells in the notebook from the preceding section. Results show whether each unit test passed or failed.
If you no longer need the view after you run your unit tests, you can delete the view. To delete this view, you can add the following code to a new cell within one of the preceding notebooks and then run only that cell.
DROP VIEW view_diamonds;
O senhor logs pode view os resultados da execução do Notebook (incluindo os resultados dos testes de unidade) no driver do seu cluster. O senhor também pode especificar um local para a entrega log do do seu clustering.
O senhor pode configurar um sistema de integração contínua (CI) e entrega contínua (CD) ou de implantação (CI/CD), como o GitHub Actions, para executar automaticamente os testes de unidade sempre que o código for alterado. Como exemplo, veja a cobertura do site GitHub Actions nas práticas recomendadas de engenharia de software para o Notebook.
Recurso adicional
pytest
- página inicial do pytest
- Guia prático do pytest
- Guia de referência do pytest
- Melhores práticas de engenharia de software para notebooks