Importar módulos Python de pastas Git ou arquivos workspace
O senhor pode armazenar o código Python em pastasDatabricks Git ou em arquivosworkspace e, em seguida, importar esse código Python para o pipeline DLT. Para obter mais informações sobre como trabalhar com módulos em pastas Git ou arquivos workspace, consulte Work with Python and R modules.
O senhor não pode importar o código-fonte de um Notebook armazenado em uma pasta Databricks Git ou em um arquivo workspace. Em vez disso, adicione o Notebook diretamente quando o senhor criar ou editar um pipeline. Consulte Configurar um pipeline DLT.
Importar um módulo Python para um pipeline DLT
O exemplo a seguir demonstra a importação de consultas dataset como módulos Python de arquivos workspace. Embora este exemplo descreva o uso de arquivos workspace para armazenar o código-fonte pipeline, o senhor pode usá-lo com o código-fonte armazenado em uma pasta Git.
Para executar esse exemplo, use as etapas a seguir:
-
Clique em
workspace na barra lateral de seu Databricks workspace para abrir o navegador workspace.
-
Use o navegador workspace para selecionar um diretório para os módulos Python.
-
Clique
na coluna mais à direita do diretório selecionado e clique em Criar arquivo > .
-
Insira um nome para o arquivo, por exemplo,
clickstream_raw_module.py
. O editor de arquivos é aberto. Para criar um módulo para ler os dados de origem em uma tabela, digite o seguinte na janela do editor:Pythonfrom dlt import *
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
def create_clickstream_raw_table(spark):
@table
def clickstream_raw():
return (
spark.read.json(json_path)
) -
Para criar um módulo que crie uma nova tabela contendo dados preparados, crie um novo arquivo no mesmo diretório, digite um nome para o arquivo, por exemplo,
clickstream_prepared_module.py
, e digite o seguinte na nova janela do editor:Pythonfrom clickstream_raw_module import *
from dlt import read
from pyspark.sql.functions import *
from pyspark.sql.types import *
def create_clickstream_prepared_table(spark):
create_clickstream_raw_table(spark)
@table
@expect("valid_current_page_title", "current_page_title IS NOT NULL")
@expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
return (
read("clickstream_raw")
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_title", "click_count", "previous_page_title")
) -
Em seguida, crie um pipeline Notebook. Acesse Databricks páginas de aterrissagem e selecione Create a Notebook ou clique em
New na barra lateral e selecione Notebook . O senhor também pode criar o Notebook no navegador workspace clicando em
e em Create > Notebook .
-
Dê um nome ao seu Notebook e confirme que Python é o idioma default.
-
Clique em Criar .
-
Digite o código de exemplo no Notebook.
Se o Notebook importar módulos ou pacotes de um caminho de arquivos workspace ou de um caminho de pastas Git diferente do diretório do Notebook, o senhor deverá anexar manualmente o caminho aos arquivos usando sys.path.append()
.
Se estiver importando um arquivo de uma pasta do Git, o senhor deve acrescentar /Workspace/
ao caminho. Por exemplo, sys.path.append('/Workspace/...')
. Omitir /Workspace/
do caminho resulta em um erro.
Se os módulos ou o pacote estiverem armazenados no mesmo diretório que o Notebook, o senhor não precisará acrescentar o caminho manualmente. O senhor também não precisa anexar manualmente o caminho ao importar do diretório raiz de uma pasta Git, pois o diretório raiz é automaticamente anexado ao caminho.
import sys, os
# You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
sys.path.append(os.path.abspath('<module-path>'))
import dlt
from clickstream_prepared_module import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
create_clickstream_prepared_table(spark)
@dlt.table(
comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
return (
spark.read.table("catalog_name.schema_name.clickstream_prepared")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.sort(desc("click_count"))
.select("referrer", "click_count")
.limit(10)
)
Substitua <module-path>
pelo caminho para o diretório que contém os módulos Python a serem importados.
10. Crie um pipeline usando o novo Notebook.
- Para executar o pipeline, na página de detalhes do pipeline , clique em começar .
O senhor também pode importar o código Python como um pacote. O trecho de código a seguir de um DLT Notebook importa o pacote test_utils
do diretório dlt_packages
dentro do mesmo diretório do Notebook. O diretório dlt_packages
contém os arquivos test_utils.py
e __init__.py
, e test_utils.py
define a função create_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)