Pular para o conteúdo principal

Importar módulos Python de pastas Git ou arquivos workspace

Você pode armazenar código Python em pastas Git Databricks ou em arquivos deworkspace e, em seguida, importar esse código Python para seu pipeline LakeFlow Declarative. Para obter mais informações sobre como trabalhar com módulos em pastas Git ou arquivos workspace , consulte Trabalhar com módulos Python e R.

Para importar um arquivo Python, você tem várias opções:

  • Inclua o módulo Python no seu pipeline, como um arquivo utilitários. Isso funciona melhor se o módulo for específico para o pipeline.
  • Adicione um módulo compartilhado ao seu ambiente de pipeline, em qualquer pipeline que precise usá-lo.
  • Importe um módulo do seu workspace diretamente para o seu código-fonte Python , com uma instrução import .

Incluir um módulo Python em um pipeline

Você pode criar um módulo Python como parte do seu pipeline. A pasta raiz do seu pipeline é automaticamente anexada ao sys.path. Isso permite que você faça referência ao módulo diretamente no código-fonte Python do seu pipeline.

O exemplo a seguir demonstra como criar um módulo Python na pasta raiz do seu pipeline e referenciá-lo a partir de um arquivo de origem Python no código-fonte do seu pipeline:

  1. Abra seu pipeline no editor de pipeline.

  2. No navegador ativo pipeline à esquerda, clique em Ícone de mais. Adicione e escolha utilidades no menu.

  3. Digite my_utils.py para o Nome .

  4. Deixe o caminho default e clique em Criar .

    Isso cria o arquivo my_utils.py na pasta utilities do seu pipeline e cria a pasta utilities , se ela não existir. Os arquivos nesta pasta não são adicionados à fonte pipeline por default, mas estão disponíveis para serem chamados a partir dos arquivos .py que fazem parte do seu código-fonte.

    Por default, o arquivo utilidades tem uma função de exemplo chamada distance_km() que pega uma distância em milhas e a converte.

  5. Em um arquivo de origem Python na sua pasta transformações (você pode criar um escolhendo Ícone de mais. Adicione , selecionando transformações no menu), adicione o seguinte código:

    Python
    from utilities import my_utils

Agora você pode chamar funções em my_utils a partir desse arquivo Python. Você deve adicionar a instrução import de qualquer arquivo Python que precise chamar funções no módulo.

Adicione um módulo Python ao seu ambiente de pipeline

Se você quiser compartilhar um módulo Python em vários pipelines, poderá salvar o módulo em qualquer lugar nos arquivos do seu workspace e referenciá-lo no ambiente de qualquer pipeline que precise usá-lo. Você pode referenciar módulos Python que são:

  • Arquivos Python individuais (.py).
  • Pacote do projeto Python como um arquivo Python wheel (.whl).
  • Projeto Python descompactado com um arquivo pyproject.toml (para definir o nome e a versão do projeto).

O exemplo a seguir mostra como adicionar uma dependência a um pipeline.

  1. Abra seu pipeline no editor de pipeline.

  2. Clique Ícone de engrenagem. Configurações na barra superior.

  3. No slide de configurações do pipeline , em ambiente do pipeline , clique em Ícone de lápis. Editar ambiente .

  4. Adicione uma dependência. Por exemplo, para adicionar um arquivo em seu workspace, você pode adicionar /Volumes/libraries/path/to/python_files/file.py. Para uma Python wheel armazenada em pastas Git , seu caminho pode ser parecido com /Workspace/libraries/path/to/wheel_files/file.whl.

    Você pode adicionar um arquivo sem caminho, ou um caminho relativo, se ele estiver na pasta raiz do pipeline.

nota

Você também pode adicionar um caminho para uma pasta compartilhada usando as dependências para permitir que instruções import no seu código encontrem os módulos que você deseja importar. Por exemplo, -e /Workspace/Users/<user_name>/path/to/add/.

Importe um módulo Python usando a instrução import

Você também pode referenciar diretamente um arquivo workspace no seu código-fonte Python .

  • Se o arquivo estiver na pasta utilities do seu pipeline, você poderá referenciá-lo sem um caminho:

    Python
    from utilities import my_module
  • Se o arquivo estiver em qualquer outro lugar, você pode importá-lo anexando primeiro o caminho do módulo ao sys.path:

    Python
    import sys, os
    sys.path.append(os.path.abspath('<module-path>'))

    from my_module import *
  • Você também pode anexar ao sys.path todos os arquivos de origem do pipeline adicionando o caminho para o ambiente do pipeline, conforme descrito na seção anterior.

Exemplo de importação de consultas como módulos Python

O exemplo a seguir demonstra a importação de consultas dataset como módulos Python de arquivos workspace . Embora este exemplo descreva o uso de arquivos workspace para armazenar o código-fonte pipeline , você pode usá-lo com o código-fonte armazenado em uma pasta Git .

Para executar este exemplo, utilize os seguintes passos:

  1. Clique ícone do espaço de trabalho. espaço de trabalho na barra lateral do seu workspace Databricks para abrir o navegador workspace .

  2. Use o navegador workspace para selecionar um diretório para os módulos Python .

  3. Clique Ícone do menu de kebab. na coluna mais à direita do diretório selecionado e clique em Criar > Arquivo .

  4. Digite um nome para o arquivo, por exemplo, clickstream_raw_module.py. O editor de arquivos é aberto. Para criar um módulo para ler dados de origem em uma tabela, digite o seguinte na janela do editor:

    Python
    from pyspark import pipelines as dp

    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"

    def create_clickstream_raw_table(spark):
    @dp.table
    def clickstream_raw():
    return (
    spark.read.json(json_path)
    )
  5. Para criar um módulo que cria uma nova tabela contendo dados preparados, crie um novo arquivo no mesmo diretório, insira um nome para o arquivo, por exemplo, clickstream_prepared_module.py, e insira o seguinte na nova janela do editor:

    Python
    from clickstream_raw_module import *
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    def create_clickstream_prepared_table(spark):
    create_clickstream_raw_table(spark)
    @dp.table
    @dp.expect("valid_current_page_title", "current_page_title IS NOT NULL")
    @dp.expect_or_fail("valid_count", "click_count > 0")
    def clickstream_prepared():
    return (
    spark.read("clickstream_raw")
    .withColumn("click_count", expr("CAST(n AS INT)"))
    .withColumnRenamed("curr_title", "current_page_title")
    .withColumnRenamed("prev_title", "previous_page_title")
    .select("current_page_title", "click_count", "previous_page_title")
    )
  6. Em seguida, crie um arquivo Python no código-fonte do seu pipeline. No editor de pipeline, escolha Ícone de mais. Adicione e depois transforme.

  7. Nomeie seu arquivo e confirme que Python é a linguagem default .

  8. Clique em Criar .

  9. Insira o seguinte código de exemplo no Notebook.

nota

Se o seu Notebook importar módulos ou pacotes de um caminho de arquivos workspace ou de um caminho de pastas Git diferente do diretório do Notebook, você deverá anexar manualmente o caminho aos arquivos usando sys.path.append().

Se você estiver importando um arquivo de uma pasta Git, você deve acrescentar /Workspace/ ao caminho. Por exemplo, sys.path.append('/Workspace/...'). Omitir /Workspace/ do caminho resulta em um erro.

Se os módulos ou pacotes estiverem armazenados no mesmo diretório que o Notebook, você não precisará anexar o caminho manualmente. Você também não precisa anexar manualmente o caminho ao importar do diretório raiz de uma pasta Git porque o diretório raiz é anexado automaticamente ao caminho.

Python
import sys, os
sys.path.append(os.path.abspath('<module-path>'))

from pyspark import pipelines as dp
from clickstream_prepared_module import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

create_clickstream_prepared_table(spark)

@dp.table(
comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
return (
spark.read.table("catalog_name.schema_name.clickstream_prepared")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.sort(desc("click_count"))
.select("referrer", "click_count")
.limit(10)
)

Substitua <module-path> pelo caminho para o diretório que contém os módulos Python a serem importados. 10. Para executar o pipeline, clique em pipelinede execução .