Importar módulos Python de pastas Git ou arquivos de espaço de trabalho
O senhor pode armazenar o código Python em pastasDatabricks Git ou em arquivosworkspace e, em seguida, importar esse código Python para o pipeline Delta Live Tables. Para obter mais informações sobre como trabalhar com módulos em pastas Git ou arquivos workspace, consulte Work with Python and R modules.
Observação
O senhor não pode importar o código-fonte de um Notebook armazenado em uma pasta Databricks Git ou em um arquivo workspace. Em vez disso, adicione o Notebook diretamente quando o senhor criar ou editar um pipeline. Consulte Configurar um pipeline do Delta Live Tables.
Importe um módulo Python para um pipeline Delta Live Tables
O exemplo a seguir demonstra a importação de consultas dataset como módulos Python de arquivos workspace. Embora este exemplo descreva o uso de arquivos workspace para armazenar o código-fonte pipeline, o senhor pode usá-lo com o código-fonte armazenado em uma pasta Git.
Para executar este exemplo, utilize os seguintes passos:
Clique em workspace na barra lateral de seu Databricks workspace para abrir o navegador workspace.
Use o navegador workspace para selecionar um diretório para os módulos Python.
Clique em na coluna mais à direita do diretório selec ionado e clique em Create > File (Criar arquivo).
Digite um nome para o arquivo, por exemplo,
clickstream_raw_module.py
. O editor de arquivos é aberto. Para criar um módulo para ler dados de origem em uma tabela, digite o seguinte na janela do editor:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
Para criar um módulo que crie uma nova tabela contendo dados preparados, crie um novo arquivo no mesmo diretório, digite um nome para o arquivo, por exemplo,
clickstream_prepared_module.py
, e digite o seguinte na nova janela do editor:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
Em seguida, crie um pipeline Notebook. Acesse seu site Databricks páginas de aterrissagem e selecione Create a Notebook, ou clique em New na barra lateral e selecione Notebook. O senhor também pode criar o Notebook no navegador workspace clicando em e em Create > Notebook.
Nomeie seu Notebook e confirme que o Python é o idioma default.
Clique em Criar.
Digite o código de exemplo no Notebook.
Observação
Se o seu Notebook importar módulos ou pacotes de um caminho de arquivos workspace ou de um caminho de pastas Git diferente do diretório Notebook, o senhor deverá anexar manualmente o caminho aos arquivos usando
sys.path.append()
.Se estiver importando um arquivo de uma pasta Git, o senhor deve acrescentar
/Workspace/
ao caminho. Por exemplo,sys.path.append('/Workspace/...')
. A omissão de/Workspace/
no caminho resulta em um erro.Se os módulos ou o pacote estiverem armazenados no mesmo diretório que o Notebook, o senhor não precisará acrescentar o caminho manualmente. O senhor também não precisa anexar manualmente o caminho ao importar do diretório raiz de uma pasta Git porque o diretório raiz é automaticamente anexado ao caminho.
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("LIVE.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
Substitua
<module-path>
pelo caminho para o diretório que contém os módulos Python a serem importados.Crie um pipeline usando o novo Notebook.
Para executar o pipeline, na página de detalhes do Pipeline , clique em começar.
O senhor também pode importar o código Python como um pacote. O seguinte trecho de código de um Delta Live Tables Notebook importa o pacote test_utils
do diretório dlt_packages
dentro do mesmo diretório que o Notebook. O diretório dlt_packages
contém os arquivos test_utils.py
e __init__.py
, e test_utils.py
define a função create_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)