Teste de unidade para Notebook

Você pode usar o teste de unidade para ajudar a melhorar a qualidade e a consistência do código do seu Notebook . O teste de unidade é uma abordagem para testar unidades de código independentes, como funções, antecipadamente e com frequência. Isso ajuda você a encontrar problemas com seu código mais rapidamente, descobrir suposições errôneas sobre seu código mais cedo e simplificar seus esforços gerais de codificação.

Este artigo é uma introdução ao teste básico de unidade com funções. Conceitos avançados, como classes e interfaces de teste de unidade, bem como o uso de stubs, mocks e test chicotes, embora também suportados em testes de unidade para Notebook, estão fora do escopo deste artigo. Este artigo também não abrange outros tipos de métodos de teste, como teste de integração, teste de sistema, teste de aceitação ou métodos de teste não funcionais, como teste de desempenho ou teste de usabilidade.

Este artigo demonstra o seguinte:

  • Como organizar funções e seus testes de unidade.

  • Como escrever funções em Python, R, Scala, bem como funções definidas pelo usuário em SQL, que são bem projetadas para serem testadas na unidade.

  • Como chamar essas funções do Python, R, Scala e SQL Notebook.

  • Como escrever testes de unidade em Python, R e Scala usando as estruturas de teste populares pytest para Python, testthat para R e ScalaTest para Scala. Também como escrever SQL que testa funções SQL definidas pelo usuário (SQL UDFs).

  • Como executar esses testes de unidade do Python, R, Scala e SQL Notebook.

Organizar funções e testes de unidade

Existem algumas abordagens comuns para organizar suas funções e seus testes de unidade com Notebook. Cada abordagem tem seus benefícios e desafios.

Para Python, R e Scala Notebook, as abordagens comuns incluem o seguinte:

  • Armazene funções e seus testes de unidade fora do Notebook..

    • Benefícios: Você pode chamar essas funções com e fora do Notebook. Frameworks de teste são mais bem projetados para testes de execução fora do Notebook.

    • Desafios: Essa abordagem não é compatível com o Scala Notebook. Essa abordagem também aumenta o número de arquivos a serem rastreados e mantidos.

  • Armazene funções em um Notebook e seus testes de unidade em um Notebook separado..

    • Benefícios: Essas funções são mais fáceis de reutilizar no Notebook.

    • Desafios: O número de Notebook para rastrear e manter aumenta. Essas funções não podem ser usadas fora do Notebook. Essas funções também podem ser mais difíceis de testar fora do Notebook.

  • Armazene funções e seus testes de unidade no mesmo Notebook..

    • Benefícios: Funções e seus testes unitários são armazenados em um único Notebook para facilitar o acompanhamento e manutenção.

    • Desafios: essas funções podem ser mais difíceis de reutilizar no Notebook. Essas funções não podem ser usadas fora do Notebook. Essas funções também podem ser mais difíceis de testar fora do Notebook.

Para Python e R Notebook, o Databricks recomenda armazenar funções e seus testes de unidade fora do Notebook. Para Scala Notebook, Databricks recomenda incluir funções em um Notebook e seus testes de unidade em um Notebook separado.

Para SQL Notebook, Databricks recomenda que você armazene funções como funções SQL definidas pelo usuário (SQL UDFs) em seus esquemas (também conhecidos como bancos de dados). Você pode chamar essas UDFs SQL e seus testes de unidade do SQL Notebook.

Escrever funções

Esta seção descreve um conjunto simples de funções de exemplo que determinam o seguinte:

  • Se uma tabela existe em um banco de dados.

  • Se uma coluna existe em uma tabela.

  • Quantas linhas existem em uma coluna para um valor dentro dessa coluna.

Essas funções devem ser simples, para que você possa se concentrar nos detalhes do teste de unidade neste artigo, em vez de se concentrar nas próprias funções.

Para obter os melhores resultados de teste de unidade, uma função deve retornar um único resultado previsível e ser de um único tipo de dados. Por exemplo, para verificar se algo existe, a função deve retornar um valor Boolean verdadeiro ou falso. Para retornar o número de linhas existentes, a função deve retornar um número inteiro não negativo. Não deve, no primeiro exemplo, retornar falso se algo não existir ou a própria coisa se existir. Da mesma forma, para o segundo exemplo, ele não deve retornar o número de linhas existentes ou false se não houver linhas.

Você pode adicionar essas funções a um workspace Databricks existente da seguinte maneira, em Python, R, Scala ou SQL.

O código a seguir pressupõe que o senhor configurou as pastas Git do Databricks (Repos), adicionou um repo e tem o repo aberto no seu Databricks workspace.

Crie um arquivo chamado myfunctions.py no repositório e adicione o seguinte conteúdo ao arquivo. Outros exemplos neste artigo esperam que este arquivo seja nomeado myfunctions.py. Você pode usar nomes diferentes para seus próprios arquivos.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
  return spark.catalog.tableExists(f"{dbName}.{tableName}")

# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
  if columnName in dataFrame.columns:
    return True
  else:
    return False

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
  df = dataFrame.filter(col(columnName) == columnValue)

  return df.count()

O código a seguir pressupõe que o senhor configurou as pastas Git do Databricks (Repos), adicionou um repo e tem o repo aberto no seu Databricks workspace.

Crie um arquivo chamado myfunctions.r no repositório e adicione o seguinte conteúdo ao arquivo. Outros exemplos neste artigo esperam que este arquivo seja nomeado myfunctions.r. Você pode usar nomes diferentes para seus próprios arquivos.

library(SparkR)

# Does the specified table exist in the specified database?
table_exists <- function(table_name, db_name) {
  tableExists(paste(db_name, ".", table_name, sep = ""))
}

# Does the specified column exist in the given DataFrame?
column_exists <- function(dataframe, column_name) {
  column_name %in% colnames(dataframe)
}

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
num_rows_in_column_for_value <- function(dataframe, column_name, column_value) {
  df = filter(dataframe, dataframe[[column_name]] == column_value)

  count(df)
}

Crie um Scala Notebook chamado myfunctions com o seguinte conteúdo. Outros exemplos neste artigo esperam que este Notebook seja nomeado myfunctions. Você pode usar nomes diferentes para seu próprio Notebook.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Does the specified table exist in the specified database?
def tableExists(tableName: String, dbName: String) : Boolean = {
  return spark.catalog.tableExists(dbName + "." + tableName)
}

// Does the specified column exist in the given DataFrame?
def columnExists(dataFrame: DataFrame, columnName: String) : Boolean = {
  val nameOfColumn = null

  for(nameOfColumn <- dataFrame.columns) {
    if (nameOfColumn == columnName) {
      return true
    }
  }

  return false
}

// How many rows are there for the specified value in the specified column
// in the given DataFrame?
def numRowsInColumnForValue(dataFrame: DataFrame, columnName: String, columnValue: String) : Long = {
  val df = dataFrame.filter(col(columnName) === columnValue)

  return df.count()
}

O código a seguir pressupõe que você tenha os dataset diamantes amostra de terceiros em um esquema chamado default em um catálogo chamado main que pode ser acessado no seu workspace do Databricks. Se o catálogo ou esquema que você deseja usar tiver um nome diferente, altere uma ou ambas as instruções USE a seguir para corresponder.

Crie um SQL Notebook e adicione o seguinte conteúdo a este novo Notebook. Em seguida, anexe o Notebook a um clusters e execute o Notebook para adicionar as seguintes UDFs SQL ao catálogo e esquema especificados.

Observação

As UDFs SQL table_exists e column_exists funcionam apenas com o Unity Catalog. O suporte SQL UDF para Unity Catalog está em visualização pública.

USE CATALOG main;
USE SCHEMA default;

CREATE OR REPLACE FUNCTION table_exists(catalog_name STRING,
                                        db_name      STRING,
                                        table_name   STRING)
  RETURNS BOOLEAN
  RETURN if(
    (SELECT count(*) FROM system.information_schema.tables
     WHERE table_catalog = table_exists.catalog_name
       AND table_schema  = table_exists.db_name
       AND table_name    = table_exists.table_name) > 0,
    true,
    false
  );

CREATE OR REPLACE FUNCTION column_exists(catalog_name STRING,
                                         db_name      STRING,
                                         table_name   STRING,
                                         column_name  STRING)
  RETURNS BOOLEAN
  RETURN if(
    (SELECT count(*) FROM system.information_schema.columns
     WHERE table_catalog = column_exists.catalog_name
       AND table_schema  = column_exists.db_name
       AND table_name    = column_exists.table_name
       AND column_name   = column_exists.column_name) > 0,
    true,
    false
  );

CREATE OR REPLACE FUNCTION num_rows_for_clarity_in_diamonds(clarity_value STRING)
  RETURNS BIGINT
  RETURN SELECT count(*)
         FROM main.default.diamonds
         WHERE clarity = clarity_value

funções de chamada

Esta seção descreve o código que chama as funções anteriores. Você pode usar essas funções, por exemplo, para contar o número de linhas na tabela em que existe um valor especificado em uma coluna especificada. No entanto, você deve verificar se a tabela realmente existe e se a coluna realmente existe nessa tabela antes de prosseguir. O código a seguir verifica essas condições.

Se você adicionou as funções da seção anterior ao seu workspace do Databricks, pode chamar essas funções do seu workspace da seguinte maneira.

Crie um Python Notebook na mesma pasta que o arquivo myfunctions.py anterior em seu repositório e adicione o seguinte conteúdo ao Notebook. Altere os valores das variáveis para o nome da tabela, o nome do esquema (banco de dados), o nome da coluna e o valor da coluna conforme necessário. Em seguida, anexe o Notebook a um clusters e execute o Notebook para ver os resultados.

from myfunctions import *

tableName   = "diamonds"
dbName      = "default"
columnName  = "clarity"
columnValue = "VVS2"

# If the table exists in the specified database...
if tableExists(tableName, dbName):

  df = spark.sql(f"SELECT * FROM {dbName}.{tableName}")

  # And the specified column exists in that table...
  if columnExists(df, columnName):
    # Then report the number of rows for the specified value in that column.
    numRows = numRowsInColumnForValue(df, columnName, columnValue)

    print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
  else:
    print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
  print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.") 

Crie um R Notebook na mesma pasta que o arquivo myfunctions.r anterior em seu repositório e adicione o seguinte conteúdo ao Notebook. Altere os valores das variáveis para o nome da tabela, o nome do esquema (banco de dados), o nome da coluna e o valor da coluna conforme necessário. Em seguida, anexe o Notebook a um clusters e execute o Notebook para ver os resultados.

library(SparkR)
source("myfunctions.r")

table_name   <- "diamonds"
db_name      <- "default"
column_name  <- "clarity"
column_value <- "VVS2"

# If the table exists in the specified database...
if (table_exists(table_name, db_name)) {

  df = sql(paste("SELECT * FROM ", db_name, ".", table_name, sep = ""))

  # And the specified column exists in that table...
  if (column_exists(df, column_name)) {
    # Then report the number of rows for the specified value in that column.
    num_rows = num_rows_in_column_for_value(df, column_name, column_value)

    print(paste("There are ", num_rows, " rows in table '", table_name, "' where '", column_name, "' equals '", column_value, "'.", sep = "")) 
  } else {
    print(paste("Column '", column_name, "' does not exist in table '", table_name, "' in schema (database) '", db_name, "'.", sep = ""))
  }

} else {
  print(paste("Table '", table_name, "' does not exist in schema (database) '", db_name, "'.", sep = ""))
}

Crie outro Scala Notebook na mesma pasta que o anterior myfunctions Scala Notebook, e adicione o seguinte conteúdo a este novo Notebook.

Na primeira célula deste novo Notebook , adicione o seguinte código, que chama o %run magic. Essa mágica disponibiliza o conteúdo do Notebook myfunctions para seu novo Notebook.

%run ./myfunctions

Na segunda célula deste novo Notebook , adicione o seguinte código. Altere os valores das variáveis para o nome da tabela, o nome do esquema (banco de dados), o nome da coluna e o valor da coluna conforme necessário. Em seguida, anexe o Notebook a um clusters e execute o Notebook para ver os resultados.

val tableName   = "diamonds"
val dbName      = "default"
val columnName  = "clarity"
val columnValue = "VVS2"

// If the table exists in the specified database...
if (tableExists(tableName, dbName)) {

  val df = spark.sql("SELECT * FROM " + dbName + "." + tableName)

  // And the specified column exists in that table...
  if (columnExists(df, columnName)) {
    // Then report the number of rows for the specified value in that column.
    val numRows = numRowsInColumnForValue(df, columnName, columnValue)

    println("There are " + numRows + " rows in '" + tableName + "' where '" + columnName + "' equals '" + columnValue + "'.")
  } else {
    println("Column '" + columnName + "' does not exist in table '" + tableName + "' in database '" + dbName + "'.")
  }

} else {
  println("Table '" + tableName + "' does not exist in database '" + dbName + "'.")
}

Adicione o código a seguir a uma nova célula no Notebook anterior ou a uma célula em um Notebook separado. Altere os nomes do esquema ou do catálogo, se necessário, para corresponder ao seu e, em seguida, execute esta célula para ver os resultados.

SELECT CASE
-- If the table exists in the specified catalog and schema...
WHEN
  table_exists("main", "default", "diamonds")
THEN
  -- And the specified column exists in that table...
  (SELECT CASE
   WHEN
     column_exists("main", "default", "diamonds", "clarity")
   THEN
     -- Then report the number of rows for the specified value in that column.
     printf("There are %d rows in table 'main.default.diamonds' where 'clarity' equals 'VVS2'.",
            num_rows_for_clarity_in_diamonds("VVS2"))
   ELSE
     printf("Column 'clarity' does not exist in table 'main.default.diamonds'.")
   END)
ELSE
  printf("Table 'main.default.diamonds' does not exist.")
END

Escrever testes de unidade

Esta seção descreve o código que testa cada uma das funções descritas no início deste artigo. Se você fizer alterações nas funções no futuro, poderá usar testes de unidade para determinar se essas funções ainda funcionam conforme o esperado.

Se você adicionou as funções no início deste artigo ao seu workspace do Databricks, pode adicionar testes de unidade para essas funções ao seu workspace da seguinte maneira.

Crie outro arquivo chamado test_myfunctions.py na mesma pasta que o arquivo myfunctions.py anterior em seu repositório e adicione o seguinte conteúdo ao arquivo. Por default, pytest procura por .py arquivos cujos nomes começam com test_ (ou terminam com _test) para testar. Da mesma forma, por default, pytest procura dentro desses arquivos por funções cujos nomes começam com test_ para testar.

Em geral, é uma prática recomendada não executar testes de unidade em funções que trabalham com dados em produção. Isso é especialmente importante para funções que adicionam, removem ou alteram dados. Para proteger seus dados de produção de serem comprometidos por seus testes de unidade de maneiras inesperadas, você deve executar testes de unidade contra dados de não produção. Uma abordagem comum é criar dados falsos que sejam o mais próximo possível dos dados de produção. O exemplo de código a seguir cria dados falsos para a execução dos testes de unidade.

import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

tableName    = "diamonds"
dbName       = "default"
columnName   = "clarity"
columnValue  = "SI2"

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([ \
  StructField("_c0",     IntegerType(), True), \
  StructField("carat",   FloatType(),   True), \
  StructField("cut",     StringType(),  True), \
  StructField("color",   StringType(),  True), \
  StructField("clarity", StringType(),  True), \
  StructField("depth",   FloatType(),   True), \
  StructField("table",   IntegerType(), True), \
  StructField("price",   IntegerType(), True), \
  StructField("x",       FloatType(),   True), \
  StructField("y",       FloatType(),   True), \
  StructField("z",       FloatType(),   True), \
])

data = [ (1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ), \
         (2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ) ]

df = spark.createDataFrame(data, schema)

# Does the table exist?
def test_tableExists():
  assert tableExists(tableName, dbName) is True

# Does the column exist?
def test_columnExists():
  assert columnExists(df, columnName) is True

# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
  assert numRowsInColumnForValue(df, columnName, columnValue) > 0

Crie outro arquivo chamado test_myfunctions.r na mesma pasta que o arquivo myfunctions.r anterior em seu repositório e adicione o seguinte conteúdo ao arquivo. Por default, testthat procura por .r arquivos cujos nomes começam com test para testar.

Em geral, é uma prática recomendada não executar testes de unidade em funções que trabalham com dados em produção. Isso é especialmente importante para funções que adicionam, removem ou alteram dados. Para proteger seus dados de produção de serem comprometidos por seus testes de unidade de maneiras inesperadas, você deve executar testes de unidade contra dados de não produção. Uma abordagem comum é criar dados falsos que sejam o mais próximo possível dos dados de produção. O exemplo de código a seguir cria dados falsos para a execução dos testes de unidade.

library(testthat)
source("myfunctions.r")

table_name   <- "diamonds"
db_name      <- "default"
column_name  <- "clarity"
column_value <- "SI2"

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema <- structType(
  structField("_c0",     "integer"),
  structField("carat",   "float"),
  structField("cut",     "string"),
  structField("color",   "string"),
  structField("clarity", "string"),
  structField("depth",   "float"),
  structField("table",   "integer"),
  structField("price",   "integer"),
  structField("x",       "float"),
  structField("y",       "float"),
  structField("z",       "float"))

data <- list(list(as.integer(1), 0.23, "Ideal",   "E", "SI2", 61.5, as.integer(55), as.integer(326), 3.95, 3.98, 2.43),
             list(as.integer(2), 0.21, "Premium", "E", "SI1", 59.8, as.integer(61), as.integer(326), 3.89, 3.84, 2.31))

df <- createDataFrame(data, schema)

# Does the table exist?
test_that ("The table exists.", {
  expect_true(table_exists(table_name, db_name))
})

# Does the column exist?
test_that ("The column exists in the table.", {
  expect_true(column_exists(df, column_name))
})

# Is there at least one row for the value in the specified column?
test_that ("There is at least one row in the query result.", {
  expect_true(num_rows_in_column_for_value(df, column_name, column_value) > 0)
})

Crie outro Scala Notebook na mesma pasta que o anterior myfunctions Scala Notebook, e adicione o seguinte conteúdo a este novo Notebook.

Na primeira célula do novo Notebook , adicione o seguinte código, que chama a mágica %run. Essa mágica disponibiliza o conteúdo do Notebook myfunctions para seu novo Notebook.

%run ./myfunctions

Na segunda célula, adicione o seguinte código. Este código define seus testes de unidade e especifica como executá-los.

Em geral, é uma prática recomendada não executar testes de unidade em funções que trabalham com dados em produção. Isso é especialmente importante para funções que adicionam, removem ou alteram dados. Para proteger seus dados de produção de serem comprometidos por seus testes de unidade de maneiras inesperadas, você deve executar testes de unidade contra dados de não produção. Uma abordagem comum é criar dados falsos que sejam o mais próximo possível dos dados de produção. O exemplo de código a seguir cria dados falsos para a execução dos testes de unidade.

import org.scalatest._
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, FloatType, StringType}
import scala.collection.JavaConverters._

class DataTests extends AsyncFunSuite {

  val tableName   = "diamonds"
  val dbName      = "default"
  val columnName  = "clarity"
  val columnValue = "SI2"

  // Create fake data for the unit tests to run against.
  // In general, it is a best practice to not run unit tests
  // against functions that work with data in production.
  val schema = StructType(Array(
                 StructField("_c0",     IntegerType),
                 StructField("carat",   FloatType),
                 StructField("cut",     StringType),
                 StructField("color",   StringType),
                 StructField("clarity", StringType),
                 StructField("depth",   FloatType),
                 StructField("table",   IntegerType),
                 StructField("price",   IntegerType),
                 StructField("x",       FloatType),
                 StructField("y",       FloatType),
                 StructField("z",       FloatType)
               ))

  val data = Seq(
                  Row(1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43),
                  Row(2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31)
                ).asJava

  val df = spark.createDataFrame(data, schema)

  // Does the table exist?
  test("The table exists") {
    assert(tableExists(tableName, dbName) == true)
  }

  // Does the column exist?
  test("The column exists") {
    assert(columnExists(df, columnName) == true)
  }

  // Is there at least one row for the value in the specified column?
  test("There is at least one matching row") {
    assert(numRowsInColumnForValue(df, columnName, columnValue) > 0)
  }
}

nocolor.nodurations.nostacks.stats.run(new DataTests)

Observação

Este exemplo de código usa o estilo FunSuite de teste em ScalaTest. Para outros estilos de teste disponíveis, consulte Selecionando estilos de teste para seu projeto.

Antes de adicionar testes de unidade, você deve estar ciente de que, em geral, é uma prática recomendada não executar testes de unidade em funções que trabalham com dados em produção. Isso é especialmente importante para funções que adicionam, removem ou alteram dados. Para proteger seus dados de produção de serem comprometidos por seus testes de unidade de maneiras inesperadas, você deve executar testes de unidade contra dados de não produção. Uma abordagem comum é executar testes de unidade em exibições em vez de tabelas.

Para criar uma exibição, você pode chamar o comando CREATE VIEW de uma nova célula no Notebook anterior ou em um Notebook separado. O exemplo a seguir supõe que você tenha uma tabela existente denominada diamonds em um esquema (banco de dados) denominado default em um catálogo denominado main. Altere esses nomes para corresponder aos seus conforme necessário e, em seguida, execute apenas essa célula.

USE CATALOG main;
USE SCHEMA default;

CREATE VIEW view_diamonds AS
SELECT * FROM diamonds;

Depois de criar a view, adicione cada uma das instruções SELECT a seguir à sua própria nova célula no Notebook anterior ou à sua própria nova célula em um Notebook separado. Altere os nomes para corresponder aos seus, conforme necessário.

SELECT if(table_exists("main", "default", "view_diamonds"),
          printf("PASS: The table 'main.default.view_diamonds' exists."),
          printf("FAIL: The table 'main.default.view_diamonds' does not exist."));

SELECT if(column_exists("main", "default", "view_diamonds", "clarity"),
          printf("PASS: The column 'clarity' exists in the table 'main.default.view_diamonds'."),
          printf("FAIL: The column 'clarity' does not exists in the table 'main.default.view_diamonds'."));

SELECT if(num_rows_for_clarity_in_diamonds("VVS2") > 0,
          printf("PASS: The table 'main.default.view_diamonds' has at least one row where the column 'clarity' equals 'VVS2'."),
          printf("FAIL: The table 'main.default.view_diamonds' does not have at least one row where the column 'clarity' equals 'VVS2'."));

testes de unidade de execução

Esta seção descreve como executar os testes de unidade que você codificou na seção anterior. Ao executar os testes de unidade, você obtém resultados mostrando quais testes de unidade passaram e falharam.

Se você adicionou os testes de unidade da seção anterior ao seu workspace Databricks, você pode executar esses testes de unidade do seu workspace. Você pode executar esses testes de unidade manualmente ou em um programa.

Crie um Python Notebook na mesma pasta que o arquivo test_myfunctions.py anterior em seu repositório e adicione o seguinte conteúdo.

Na primeira célula do novo Notebook , adicione o seguinte código e, em seguida, execute a célula, que chama a magia %pip. Esta mágica instala pytest.

%pip install pytest

Na segunda célula, adicione o seguinte código e, em seguida, execute a célula. Os resultados mostram quais testes de unidade foram aprovados e quais falharam.

import pytest
import sys

# Skip writing pyc files on a readonly filesystem.
sys.dont_write_bytecode = True

# Run pytest.
retcode = pytest.main([".", "-v", "-p", "no:cacheprovider"])

# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."

Crie um R Notebook na mesma pasta que o arquivo test_myfunctions.r anterior em seu repositório e adicione o seguinte conteúdo.

Na primeira célula, adicione o código a seguir e execute a célula, que chama a função install.packages . Esta função instala testthat.

install.packages("testthat")

Na segunda célula, adicione o seguinte código e, em seguida, execute a célula. Os resultados mostram quais testes de unidade passaram e falharam.

library(testthat)
source("myfunctions.r")

test_dir(".", reporter = "tap")

execução da primeira e depois da segunda células no Notebook da seção anterior. Os resultados mostram quais testes de unidade passaram e falharam.

execução de cada uma das três células no Notebook da seção anterior. Os resultados mostram se cada teste de unidade passou ou falhou.

Se você não precisar mais da view depois de executar seus testes de unidade, poderá excluir a view. Para excluir esta view, você pode adicionar o código a seguir a uma nova célula dentro de um dos Notebook anteriores e, em seguida, executar apenas essa célula.

DROP VIEW view_diamonds;

Dica

Você pode view os resultados da execução do seu Notebook (incluindo resultados de testes de unidade) nos logs do driver do seu cluster. Você também pode especificar um local para a clusters logs entrega dos.

Você pode configurar um sistema de integração contínua (CI) e entrega contínua (CD) ou implantação (CI/CD), como GitHub Actions, para executar automaticamente seus testes de unidade sempre que seu código mudar. Por exemplo, consulte a cobertura do GitHub Actions em práticas recomendadas de engenharia de software para Notebook.