Teste de unidade para o Notebook
O senhor pode usar o teste de unidade para ajudar a melhorar a qualidade e a consistência do código do Notebook. O teste unitário é uma abordagem para testar unidades de código independentes, como funções, com antecedência e frequência. Isso ajuda você a encontrar problemas com seu código com mais rapidez, descobrir suposições errôneas sobre seu código mais cedo e otimizar seus esforços gerais de codificação.
Este artigo é uma introdução aos testes unitários básicos com funções. Conceitos avançados, como classes e interfaces de teste de unidade, bem como o uso de stubs, mocks e chicotes de teste, embora também sejam compatíveis com o teste de unidade do Notebook, estão fora do escopo deste artigo. Este artigo também não abrange outros tipos de métodos de teste, como teste de integração, teste de sistema, teste de aceitação ou métodos de teste não funcionais, como teste de desempenho ou teste de usabilidade.
Este artigo demonstra o seguinte:
- Como organizar funções e seus testes unitários.
- Como escrever funções em Python, R, Scala, bem como funções definidas pelo usuário em SQL, que são bem projetadas para serem testadas por unidades.
- Como chamar essas funções em Python, R, Scala e SQL Notebook.
- Como escrever testes unitários em Python, R e Scala usando as estruturas de teste populares pytest para Python, testthat para R e ScalaTest para Scala. Além disso, como escrever SQL para testes unitários de funções definidas pelo usuário de SQL (SQL UDFs).
- Como executar esses testes de unidade em Python, R, Scala e SQL Notebook.
Databricks recomenda escrever e executar seus testes de unidade em um Notebook. Embora o senhor possa executar alguns comandos no terminal da Web, o terminal da Web tem mais limitações, como a falta de suporte para Spark. Consulte execução shell comando em Databricks web terminal.
Organize funções e testes unitários
Há algumas abordagens comuns para organizar suas funções e seus testes de unidade com o Notebook. Cada abordagem tem seus benefícios e desafios.
Para Python, R e Scala Notebook, as abordagens comuns incluem o seguinte:
-
Armazenar funções e seus testes de unidade fora do Notebook.
- Benefícios: O senhor pode chamar essas funções com e fora do Notebook. As estruturas de teste são mais bem projetadas para executar testes fora do Notebook.
- Desafios: Essa abordagem não é compatível com o site Scala Notebook. Essa abordagem também aumenta o número de arquivos a serem rastreados e mantidos.
-
Armazene funções em um Notebook e seus testes de unidade em um Notebook separado.
- Vantagens: Essas funções são mais fáceis de reutilizar no Notebook.
- Desafios: O número de notebooks a serem monitorados e mantidos aumenta. Essas funções não podem ser usadas fora do Notebook. Essas funções também podem ser mais difíceis de testar fora do Notebook.
-
Armazenar funções e seus testes de unidade no mesmo Notebook.
- Benefícios: As funções e seus testes unitários são armazenados em um único Notebook para facilitar o acompanhamento e a manutenção.
- Desafios: Essas funções podem ser mais difíceis de reutilizar no Notebook. Essas funções não podem ser usadas fora do Notebook. Essas funções também podem ser mais difíceis de testar fora do Notebook.
Para o Python e o R Notebook, o Databricks recomenda armazenar funções e seus testes de unidade fora do Notebook. Para o Scala Notebook, o Databricks recomenda incluir funções em um Notebook e seus testes de unidade em um Notebook separado.
Para o SQL Notebook, o Databricks recomenda que o senhor armazene funções como SQL funções definidas pelo usuário (SQL UDFs) em seus esquemas (também conhecidos como bancos de dados). Em seguida, o senhor pode chamar esses UDFs SQL e seus testes unitários no SQL Notebook.
Funções de escrita
Esta seção descreve um conjunto simples de exemplos de funções que determinam o seguinte:
- Se existe uma tabela em um banco de dados.
- Se existe uma coluna em uma tabela.
- Quantas linhas existem em uma coluna para um valor dentro dessa coluna.
A intenção é que essas funções sejam simples, para que o senhor possa se concentrar nos detalhes do teste de unidade neste artigo, em vez de se concentrar nas próprias funções.
Para obter os melhores resultados de testes unitários, uma função deve retornar um único resultado previsível e ter um único tipo de dados. Por exemplo, para verificar se algo existe, a função deve retornar um valor booleano de verdadeiro ou falso. Para retornar o número de linhas existentes, a função deve retornar um número inteiro não negativo. No primeiro exemplo, não deve retornar nem falso se algo não existir, nem a coisa em si, se existir. Da mesma forma, para o segundo exemplo, ele não deve retornar o número de linhas existentes ou falso se não houver nenhuma linha.
O senhor pode adicionar essas funções a um Databricks workspace existente da seguinte forma, em Python, R, Scala, ou SQL.
- Python
- R
- Scala
- SQL
O código a seguir pressupõe que o senhor tenha configurado as pastas Databricks Git, adicionado um repositório e que o repositório esteja aberto em seu Databricks workspace.
Crie um arquivo chamado myfunctions.py
no repositório e adicione o seguinte conteúdo ao arquivo. Outros exemplos neste artigo esperam que esse arquivo tenha o nome myfunctions.py
. Você pode usar nomes diferentes para seus próprios arquivos.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
.appName('integrity-tests') \
.getOrCreate()
# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
return spark.catalog.tableExists(f"{dbName}.{tableName}")
# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
if columnName in dataFrame.columns:
return True
else:
return False
# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
df = dataFrame.filter(col(columnName) == columnValue)
return df.count()
O código a seguir pressupõe que o senhor tenha configurado as pastas Databricks Git, adicionado um repositório e que o repositório esteja aberto em seu Databricks workspace.
Crie um arquivo chamado myfunctions.r
no repositório e adicione o seguinte conteúdo ao arquivo. Outros exemplos neste artigo esperam que esse arquivo tenha o nome myfunctions.r
. Você pode usar nomes diferentes para seus próprios arquivos.
library(SparkR)
# Does the specified table exist in the specified database?
table_exists <- function(table_name, db_name) {
tableExists(paste(db_name, ".", table_name, sep = ""))
}
# Does the specified column exist in the given DataFrame?
column_exists <- function(dataframe, column_name) {
column_name %in% colnames(dataframe)
}
# How many rows are there for the specified value in the specified column
# in the given DataFrame?
num_rows_in_column_for_value <- function(dataframe, column_name, column_value) {
df = filter(dataframe, dataframe[[column_name]] == column_value)
count(df)
}
Crie um NotebookScala chamado myfunctions
com o seguinte conteúdo. Outros exemplos neste artigo esperam que este Notebook seja chamado de myfunctions
. O senhor pode usar nomes diferentes para seu próprio Notebook.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
// Does the specified table exist in the specified database?
def tableExists(tableName: String, dbName: String) : Boolean = {
return spark.catalog.tableExists(dbName + "." + tableName)
}
// Does the specified column exist in the given DataFrame?
def columnExists(dataFrame: DataFrame, columnName: String) : Boolean = {
val nameOfColumn = null
for(nameOfColumn <- dataFrame.columns) {
if (nameOfColumn == columnName) {
return true
}
}
return false
}
// How many rows are there for the specified value in the specified column
// in the given DataFrame?
def numRowsInColumnForValue(dataFrame: DataFrame, columnName: String, columnValue: String) : Long = {
val df = dataFrame.filter(col(columnName) === columnValue)
return df.count()
}
O código a seguir pressupõe que o senhor tenha o dataset diamante de amostra de terceiros em um esquema chamado default
em um catálogo chamado main
que pode ser acessado Databricks workspace em . Se o catálogo ou esquema que você deseja usar tiver um nome diferente, altere uma ou ambas as instruções USE
a seguir para que correspondam.
Crie um NotebookSQL e adicione os seguintes conteúdos a esse novo Notebook. Em seguida, anexe o Notebook a um cluster e execute o Notebook para adicionar os seguintes UDFs SQL ao catálogo e ao esquema especificados.
Os UDFs SQL table_exists
e column_exists
funcionam somente com o Unity Catalog. O suporte a SQL UDF para o Unity Catalog está em visualização pública.
USE CATALOG main;
USE SCHEMA default;
CREATE OR REPLACE FUNCTION table_exists(catalog_name STRING,
db_name STRING,
table_name STRING)
RETURNS BOOLEAN
RETURN if(
(SELECT count(*) FROM system.information_schema.tables
WHERE table_catalog = table_exists.catalog_name
AND table_schema = table_exists.db_name
AND table_name = table_exists.table_name) > 0,
true,
false
);
CREATE OR REPLACE FUNCTION column_exists(catalog_name STRING,
db_name STRING,
table_name STRING,
column_name STRING)
RETURNS BOOLEAN
RETURN if(
(SELECT count(*) FROM system.information_schema.columns
WHERE table_catalog = column_exists.catalog_name
AND table_schema = column_exists.db_name
AND table_name = column_exists.table_name
AND column_name = column_exists.column_name) > 0,
true,
false
);
CREATE OR REPLACE FUNCTION num_rows_for_clarity_in_diamonds(clarity_value STRING)
RETURNS BIGINT
RETURN SELECT count(*)
FROM main.default.diamonds
WHERE clarity = clarity_value
Funções de chamada
Esta seção descreve o código que chama as funções anteriores. Você pode usar essas funções, por exemplo, para contar o número de linhas na tabela em que um valor especificado existe em uma coluna especificada. No entanto, você gostaria de verificar se a tabela realmente existe e se a coluna realmente existe nessa tabela antes de continuar. O código a seguir verifica essas condições.
Se tiver adicionado as funções da seção anterior ao seu Databricks workspace, poderá chamar essas funções do seu workspace da seguinte forma.
- Python
- R
- Scala
- SQL
Crie um Notebook Python na mesma pasta que o arquivo myfunctions.py
anterior em seu repositório e adicione o seguinte conteúdo ao Notebook. Altere os valores das variáveis para o nome da tabela, o nome do esquema (banco de dados), o nome da coluna e o valor da coluna conforme necessário. Em seguida, anexe o Notebook a um cluster e execute o Notebook para ver os resultados.
from myfunctions import *
tableName = "diamonds"
dbName = "default"
columnName = "clarity"
columnValue = "VVS2"
# If the table exists in the specified database...
if tableExists(tableName, dbName):
df = spark.sql(f"SELECT * FROM {dbName}.{tableName}")
# And the specified column exists in that table...
if columnExists(df, columnName):
# Then report the number of rows for the specified value in that column.
numRows = numRowsInColumnForValue(df, columnName, columnValue)
print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
else:
print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.")
Crie um R Notebook na mesma pasta que o arquivo myfunctions.r
anterior em seu repositório e adicione o seguinte conteúdo ao Notebook. Altere os valores das variáveis para o nome da tabela, o nome do esquema (banco de dados), o nome da coluna e o valor da coluna conforme necessário. Em seguida, anexe o Notebook a um cluster e execute o Notebook para ver os resultados.
library(SparkR)
source("myfunctions.r")
table_name <- "diamonds"
db_name <- "default"
column_name <- "clarity"
column_value <- "VVS2"
# If the table exists in the specified database...
if (table_exists(table_name, db_name)) {
df = sql(paste("SELECT * FROM ", db_name, ".", table_name, sep = ""))
# And the specified column exists in that table...
if (column_exists(df, column_name)) {
# Then report the number of rows for the specified value in that column.
num_rows = num_rows_in_column_for_value(df, column_name, column_value)
print(paste("There are ", num_rows, " rows in table '", table_name, "' where '", column_name, "' equals '", column_value, "'.", sep = ""))
} else {
print(paste("Column '", column_name, "' does not exist in table '", table_name, "' in schema (database) '", db_name, "'.", sep = ""))
}
} else {
print(paste("Table '", table_name, "' does not exist in schema (database) '", db_name, "'.", sep = ""))
}
Crie outro Notebook Scala na mesma pasta que o Notebook myfunctions
Scala anterior e adicione o seguinte conteúdo a esse novo Notebook.
Na primeira célula desse novo Notebook, adicione o seguinte código, que chama a mágica de execução%. Essa mágica torna o conteúdo do Notebook myfunctions
disponível para o novo Notebook.
%run ./myfunctions
Na segunda célula desse novo Notebook, adicione o seguinte código. Altere os valores das variáveis para o nome da tabela, o nome do esquema (banco de dados), o nome da coluna e o valor da coluna conforme necessário. Em seguida, anexe o Notebook a um cluster e execute o Notebook para ver os resultados.
val tableName = "diamonds"
val dbName = "default"
val columnName = "clarity"
val columnValue = "VVS2"
// If the table exists in the specified database...
if (tableExists(tableName, dbName)) {
val df = spark.sql("SELECT * FROM " + dbName + "." + tableName)
// And the specified column exists in that table...
if (columnExists(df, columnName)) {
// Then report the number of rows for the specified value in that column.
val numRows = numRowsInColumnForValue(df, columnName, columnValue)
println("There are " + numRows + " rows in '" + tableName + "' where '" + columnName + "' equals '" + columnValue + "'.")
} else {
println("Column '" + columnName + "' does not exist in table '" + tableName + "' in database '" + dbName + "'.")
}
} else {
println("Table '" + tableName + "' does not exist in database '" + dbName + "'.")
}
Adicione o código a seguir a uma nova célula no Notebook anterior ou a uma célula em um Notebook separado. Se necessário, altere os nomes do esquema ou do catálogo para que correspondam aos seus e, em seguida, execute essa célula para ver os resultados.
SELECT CASE
-- If the table exists in the specified catalog and schema...
WHEN
table_exists("main", "default", "diamonds")
THEN
-- And the specified column exists in that table...
(SELECT CASE
WHEN
column_exists("main", "default", "diamonds", "clarity")
THEN
-- Then report the number of rows for the specified value in that column.
printf("There are %d rows in table 'main.default.diamonds' where 'clarity' equals 'VVS2'.",
num_rows_for_clarity_in_diamonds("VVS2"))
ELSE
printf("Column 'clarity' does not exist in table 'main.default.diamonds'.")
END)
ELSE
printf("Table 'main.default.diamonds' does not exist.")
END
Escreva testes unitários
Esta seção descreve o código que testa cada uma das funções descritas no início deste artigo. Se você fizer alguma alteração nas funções no futuro, poderá usar testes de unidade para determinar se essas funções ainda funcionam conforme o esperado.
Se o senhor adicionou as funções do início deste artigo ao seu Databricks workspace, poderá adicionar testes de unidade para essas funções ao seu workspace da seguinte forma.
- Python
- R
- Scala
- SQL
Crie outro arquivo chamado test_myfunctions.py
na mesma pasta do arquivo myfunctions.py
anterior em seu repositório e adicione o conteúdo a seguir ao arquivo. Por default, pytest
procura por arquivos .py
cujos nomes comecem com test_
(ou terminem com _test
) para testar. Da mesma forma, por default, pytest
procura dentro desses arquivos as funções cujos nomes começam com test_
para testar.
Em geral, é uma prática recomendada não executar testes de unidade em funções que trabalham com dados em produção. Isso é especialmente importante para funções que adicionam, removem ou alteram dados. Para evitar que os dados de produção sejam comprometidos pelos testes de unidade de forma inesperada, o senhor deve executar testes de unidade em dados que não sejam de produção. Uma abordagem comum é criar dados falsos que estejam o mais próximo possível dos dados de produção. O exemplo de código a seguir cria dados falsos para a execução dos testes de unidade.
import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
tableName = "diamonds"
dbName = "default"
columnName = "clarity"
columnValue = "SI2"
# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
.appName('integrity-tests') \
.getOrCreate()
# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([ \
StructField("_c0", IntegerType(), True), \
StructField("carat", FloatType(), True), \
StructField("cut", StringType(), True), \
StructField("color", StringType(), True), \
StructField("clarity", StringType(), True), \
StructField("depth", FloatType(), True), \
StructField("table", IntegerType(), True), \
StructField("price", IntegerType(), True), \
StructField("x", FloatType(), True), \
StructField("y", FloatType(), True), \
StructField("z", FloatType(), True), \
])
data = [ (1, 0.23, "Ideal", "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ), \
(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ) ]
df = spark.createDataFrame(data, schema)
# Does the table exist?
def test_tableExists():
assert tableExists(tableName, dbName) is True
# Does the column exist?
def test_columnExists():
assert columnExists(df, columnName) is True
# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
assert numRowsInColumnForValue(df, columnName, columnValue) > 0
Crie outro arquivo chamado test_myfunctions.r
na mesma pasta do arquivo myfunctions.r
anterior em seu repositório e adicione o conteúdo a seguir ao arquivo. Em default, testthat
procura por arquivos .r
cujos nomes comecem com test
para testar.
Em geral, é uma prática recomendada não executar testes de unidade em funções que trabalham com dados em produção. Isso é especialmente importante para funções que adicionam, removem ou alteram dados. Para evitar que os dados de produção sejam comprometidos pelos testes de unidade de forma inesperada, o senhor deve executar testes de unidade em dados que não sejam de produção. Uma abordagem comum é criar dados falsos que estejam o mais próximo possível dos dados de produção. O exemplo de código a seguir cria dados falsos para a execução dos testes de unidade.
library(testthat)
source("myfunctions.r")
table_name <- "diamonds"
db_name <- "default"
column_name <- "clarity"
column_value <- "SI2"
# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema <- structType(
structField("_c0", "integer"),
structField("carat", "float"),
structField("cut", "string"),
structField("color", "string"),
structField("clarity", "string"),
structField("depth", "float"),
structField("table", "integer"),
structField("price", "integer"),
structField("x", "float"),
structField("y", "float"),
structField("z", "float"))
data <- list(list(as.integer(1), 0.23, "Ideal", "E", "SI2", 61.5, as.integer(55), as.integer(326), 3.95, 3.98, 2.43),
list(as.integer(2), 0.21, "Premium", "E", "SI1", 59.8, as.integer(61), as.integer(326), 3.89, 3.84, 2.31))
df <- createDataFrame(data, schema)
# Does the table exist?
test_that ("The table exists.", {
expect_true(table_exists(table_name, db_name))
})
# Does the column exist?
test_that ("The column exists in the table.", {
expect_true(column_exists(df, column_name))
})
# Is there at least one row for the value in the specified column?
test_that ("There is at least one row in the query result.", {
expect_true(num_rows_in_column_for_value(df, column_name, column_value) > 0)
})
Crie outro Notebook Scala na mesma pasta que o Notebook myfunctions
Scala anterior e adicione o seguinte conteúdo a esse novo Notebook.
Na primeira célula do novo Notebook, adicione o seguinte código, que chama a mágica %run
. Essa mágica torna o conteúdo do Notebook myfunctions
disponível para o novo Notebook.
%run ./myfunctions
Na segunda célula, adicione o código a seguir. Esse código define seus testes unitários e especifica como executá-los.
Em geral, é uma prática recomendada não executar testes de unidade em funções que trabalham com dados em produção. Isso é especialmente importante para funções que adicionam, removem ou alteram dados. Para evitar que os dados de produção sejam comprometidos pelos testes de unidade de forma inesperada, o senhor deve executar testes de unidade em dados que não sejam de produção. Uma abordagem comum é criar dados falsos que estejam o mais próximo possível dos dados de produção. O exemplo de código a seguir cria dados falsos para a execução dos testes de unidade.
import org.scalatest._
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, FloatType, StringType}
import scala.collection.JavaConverters._
class DataTests extends AsyncFunSuite {
val tableName = "diamonds"
val dbName = "default"
val columnName = "clarity"
val columnValue = "SI2"
// Create fake data for the unit tests to run against.
// In general, it is a best practice to not run unit tests
// against functions that work with data in production.
val schema = StructType(Array(
StructField("_c0", IntegerType),
StructField("carat", FloatType),
StructField("cut", StringType),
StructField("color", StringType),
StructField("clarity", StringType),
StructField("depth", FloatType),
StructField("table", IntegerType),
StructField("price", IntegerType),
StructField("x", FloatType),
StructField("y", FloatType),
StructField("z", FloatType)
))
val data = Seq(
Row(1, 0.23, "Ideal", "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43),
Row(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31)
).asJava
val df = spark.createDataFrame(data, schema)
// Does the table exist?
test("The table exists") {
assert(tableExists(tableName, dbName) == true)
}
// Does the column exist?
test("The column exists") {
assert(columnExists(df, columnName) == true)
}
// Is there at least one row for the value in the specified column?
test("There is at least one matching row") {
assert(numRowsInColumnForValue(df, columnName, columnValue) > 0)
}
}
nocolor.nodurations.nostacks.stats.run(new DataTests)
Este exemplo de código usa o estilo FunSuite
de teste no ScalaTest. Para outros estilos de teste disponíveis, consulte Seleção de estilos de teste para seu projeto.
Antes de adicionar testes unitários, o senhor deve estar ciente de que, em geral, é uma prática recomendada não executar testes unitários em funções que trabalham com dados em produção. Isso é especialmente importante para funções que adicionam, removem ou alteram dados. Para evitar que os dados de produção sejam comprometidos pelos testes de unidade de forma inesperada, o senhor deve executar testes de unidade em dados que não sejam de produção. Uma abordagem comum é executar testes de unidade em relação à visualização em vez de tabelas.
Para criar um view, o senhor pode chamar o comando CREATE VIEW de uma nova célula no Notebook anterior ou em um Notebook separado. O exemplo a seguir pressupõe que você tenha uma tabela existente chamada diamonds
em um esquema (banco de dados) chamado default
em um catálogo chamado main
. Altere esses nomes para que correspondam aos seus, conforme necessário, e execute apenas essa célula.
USE CATALOG main;
USE SCHEMA default;
CREATE VIEW view_diamonds AS
SELECT * FROM diamonds;
Depois de criar o view, adicione cada uma das seguintes declarações do SELECT
à sua própria célula nova no Notebook anterior ou à sua própria célula nova em um Notebook separado. Altere os nomes para que correspondam aos seus, conforme necessário.
SELECT if(table_exists("main", "default", "view_diamonds"),
printf("PASS: The table 'main.default.view_diamonds' exists."),
printf("FAIL: The table 'main.default.view_diamonds' does not exist."));
SELECT if(column_exists("main", "default", "view_diamonds", "clarity"),
printf("PASS: The column 'clarity' exists in the table 'main.default.view_diamonds'."),
printf("FAIL: The column 'clarity' does not exists in the table 'main.default.view_diamonds'."));
SELECT if(num_rows_for_clarity_in_diamonds("VVS2") > 0,
printf("PASS: The table 'main.default.view_diamonds' has at least one row where the column 'clarity' equals 'VVS2'."),
printf("FAIL: The table 'main.default.view_diamonds' does not have at least one row where the column 'clarity' equals 'VVS2'."));
execução de testes unitários
Esta seção descreve como executar os testes unitários que o senhor codificou na seção anterior. Quando o senhor executa os testes de unidade, obtém resultados que mostram quais testes de unidade foram aprovados e quais falharam.
Se o senhor adicionou os testes de unidade da seção anterior ao site Databricks workspace, poderá executar esses testes de unidade no site workspace. O senhor pode executar esses testes unitários manualmente ou em um programador.
- Python
- R
- Scala
- SQL
Crie um Notebook Python na mesma pasta que o arquivo test_myfunctions.py
anterior em seu repositório e adicione o seguinte conteúdo.
Na primeira célula do novo Notebook, adicione o seguinte código e, em seguida, execute a célula que chama a mágica %pip
. Essa mágica instala o pytest
.
%pip install pytest
Na segunda célula, adicione o seguinte código e, em seguida, execute a célula. Os resultados mostram quais testes unitários foram aprovados e falharam.
import pytest
import sys
# Skip writing pyc files on a readonly filesystem.
sys.dont_write_bytecode = True
# Run pytest.
retcode = pytest.main([".", "-v", "-p", "no:cacheprovider"])
# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."
Crie um R Notebook na mesma pasta que o arquivo test_myfunctions.r
anterior em seu repositório e adicione o seguinte conteúdo.
Na primeira célula, adicione o seguinte código e, em seguida, execute a célula que chama a função install.packages
. Essa função instala testthat
.
install.packages("testthat")
Na segunda célula, adicione o seguinte código e, em seguida, execute a célula. Os resultados mostram quais testes unitários foram aprovados e falharam.
library(testthat)
source("myfunctions.r")
test_dir(".", reporter = "tap")
Execute a primeira e a segunda células no Notebook da seção anterior. Os resultados mostram quais testes unitários foram aprovados e falharam.
Execute cada uma das três células do Notebook da seção anterior. Os resultados mostram se cada teste unitário foi aprovado ou falhado.
Se o senhor não precisar mais do view depois de executar os testes de unidade, poderá excluir o view. Para excluir esse view, o senhor pode adicionar o seguinte código a uma nova célula em um dos Notebooks anteriores e, em seguida, executar somente essa célula.
DROP VIEW view_diamonds;
O senhor logs pode view os resultados da execução do Notebook (incluindo os resultados dos testes de unidade) no driver do seu cluster. O senhor também pode especificar um local para a entrega log do do seu clustering.
O senhor pode configurar um sistema de integração contínua (CI) e entrega contínua (CD) ou de implantação (CI/CD), como o GitHub Actions, para executar automaticamente os testes de unidade sempre que o código for alterado. Como exemplo, veja a cobertura do site GitHub Actions nas práticas recomendadas de engenharia de software para o Notebook.
Recurso adicional
pytest
- página inicial do pytest
- Guia prático do pytest
- Guia de referência do pytest
- Melhores práticas de engenharia de software para notebooks