Databricks Connect para Databricks Runtime 12.2 LTS e abaixo
Observação
O Databricks Connect recomenda que você use o Databricks Connect para Databricks Runtime 13.0 e acima .
Databricks não planeja nenhum trabalho de novo recurso para Databricks Connect para Databricks Runtime 12.2 LTS e abaixo.
O Databricks Connect permite conectar IDEs populares, como Visual Studio Code e PyCharm, servidores Notebook e outros aplicativos personalizados a clusters Databricks.
Este artigo explica como o Databricks Connect funciona, orienta você nas passos para começar a usar o Databricks Connect, explica como solucionar problemas que podem surgir ao usar o Databricks Connect e as diferenças entre a execução usando o Databricks Connect e a execução em um Databricks Notebook.
Visão geral
Databricks Connect é uma biblioteca de cliente para o Databricks Runtime. Ele permite que você escreva Job usando APIs do Spark e execute-os remotamente em clusters Databricks em vez de na sessão local do Spark.
Por exemplo, quando você executa o comando DataFrame spark.read.format(...).load(...).groupBy(...).agg(...).show()
usando o Databricks Connect, a representação lógica do comando é enviada para o servidor Spark em execução no Databricks para execução nos clusters remotos.
Com o Databricks Connect, você pode:
execução Spark Job em larga escala de qualquer aplicativo Python, R, Scala ou Java. Em qualquer lugar que você puder
import pyspark
,require(SparkR)
ouimport org.apache.spark
, agora você pode executar o Spark Job diretamente do seu aplicativo, sem precisar instalar nenhum plug-in IDE ou usar scripts de envio do Spark.passo e código de depuração em seu IDE, mesmo ao trabalhar com clusters remotos.
Itere rapidamente ao desenvolver bibliotecas. Você não precisa reiniciar os clusters depois de alterar as dependências da biblioteca Python ou Java no Databricks Connect, porque cada sessão do cliente é isolada uma da outra nos clusters.
Desligue os clusters do Parado sem perder trabalho. Como o aplicativo cliente é desacoplado dos clusters, ele não é afetado por reinicializações ou atualizações clusters , o que normalmente faria com que você perdesse todas as variáveis, RDDs e objetos DataFrame definidos em um Notebook.
Observação
Para desenvolvimento Python com query SQL, Databricks recomenda que você use o Databricks SQL Connector para Python em vez de Databricks Connect. o Databricks SQL Connector para Python é mais fácil de configurar do que o Databricks Connect. Além disso, o Databricks Connect analisa e planeja a execução Job em sua máquina local, enquanto a execução Job em recursos compute remota. Isso pode tornar especialmente difícil depurar erros Runtime . O Databricks SQL Connector para Python envia query SQL diretamente para recursos compute remotos e busca resultados.
Requisitos
Esta seção lista os requisitos para o Databricks Connect.
Apenas as seguintes versões do Databricks Runtime são suportadas:
Databricks Runtime 12.2 LTS ML, Databricks Runtime 12.2 LTS
Databricks Runtime 11.3 LTS ML, Databricks Runtime 11.3 LTS
Databricks Runtime 10.4 LTS ML, Databricks Runtime 10.4 LTS
Databricks Runtime 9.1 LTS ML, Databricks Runtime 9.1 LTS
Databricks Runtime 7.3 LTS
Você deve instalar o Python 3 em sua máquina de desenvolvimento e a versão secundária da instalação do cliente Python deve ser igual à versão secundária do Python de seus clusters Databricks. A tabela a seguir mostra a versão do Python instalada com cada Databricks Runtime.
Versão do Databricks Runtime
versão Python
12,2 LTS ML, 12,2 LTS
3.9
11,3 LTS ML, 11,3 LTS
3.9
10,4 LTSML, 10,4 LTS
3.8
9.1 LTS ML, 9.1 LTS
3.8
7.3 LTS
3.7
Databricks recomenda enfaticamente que você tenha um ambiente virtual Python ativado para cada versão Python que você usa com Databricks Connect. Os ambientes virtuais do Python ajudam a garantir que você esteja usando as versões corretas do Python e do Databricks Connect juntos. Isso pode ajudar a reduzir o tempo gasto na resolução de problemas técnicos relacionados.
Por exemplo, se estiver usando venv em sua máquina de desenvolvimento e seus clusters estiverem executando o Python 3.9, você deverá criar um ambiente
venv
com essa versão. O comando de exemplo a seguir gera os scripts para ativar um ambientevenv
com Python 3.9 e, em seguida, coloca esses scripts em uma pasta oculta chamada.venv
no diretório de trabalho atual:# Linux and macOS python3.9 -m venv ./.venv # Windows python3.9 -m venv .\.venv
Para usar esses scripts para ativar este ambiente
venv
, consulte Como funcionam os venvs.Como outro exemplo, se você estiver usando o Conda em sua máquina de desenvolvimento e seus clusters estiverem executando o Python 3.9, você deve criar um ambiente Conda com essa versão, por exemplo:
conda create --name dbconnect python=3.9
Para ativar o ambiente Conda com este nome de ambiente, execute
conda activate dbconnect
.A versão do pacote principal e secundário do Databricks Connect deve sempre corresponder à sua versão do Databricks Runtime. Databricks recomenda que você sempre use o pacote mais recente do Databricks Connect que corresponda à sua versão do Databricks Runtime. Por exemplo, quando você usa clusters Databricks Runtime 12.2 LTS, também deve usar o pacote
databricks-connect==12.2.*
.Observação
Consulte as notas sobre a versão do Databricks Connect para obter uma lista de lançamentos e atualizações de manutenção disponíveis do Databricks Connect.
Java Runtime Environment (JRE) 8. O cliente foi testado com o OpenJDK 8 JRE. O cliente não suporta Java 11.
Observação
No Windows, se você vir um erro que o Databricks Connect não consegue localizar winutils.exe
, consulte Cannot find winutils.exe on Windows.
Configurar o cliente
Conclua as passos a seguir para configurar o cliente local para Databricks Connect.
Observação
Antes de começar a configurar o cliente Databricks Connect local, você deve atender aos requisitos do Databricks Connect.
passo 1: Instalar o cliente Databricks Connect
Com seu ambiente virtual ativado, desinstale o PySpark, caso já esteja instalado, executando o comando
uninstall
. Isso é necessário porque o pacotedatabricks-connect
entra em conflito com o PySpark. Para obter detalhes, consulte Instalações conflitantes do PySpark. Para verificar se o PySpark já está instalado, execute o comandoshow
.# Is PySpark already installed? pip3 show pyspark # Uninstall PySpark pip3 uninstall pyspark
Com seu ambiente virtual ainda ativado, instale o cliente Databricks Connect executando o comando
install
. Use a opção--upgrade
para atualizar qualquer instalação de cliente existente para a versão especificada.pip3 install --upgrade "databricks-connect==12.2.*" # Or X.Y.* to match your cluster version.
Observação
Databricks recomenda que você anexe a notação “ponto-asterisco” para especificar
databricks-connect==X.Y.*
em vez dedatabricks-connect=X.Y
, para garantir que o pacote mais recente esteja instalado.
passo 2: configurar as propriedades da conexão
Colete as seguintes propriedades de configuração.
A URL do espaço de trabalho Databricks.
Seu access tokenpessoal do Databricks.
O ID de seus clusters. Você pode obter o ID clusters na URL. Aqui, o ID clusters é
0304-201045-hoary804
.A porta à qual o Databricks Connect se conecta em seus clusters. A porta default é
15001
.
Configure a conexão da seguinte maneira.
Você pode usar a CLI, configurações SQL ou variável de ambiente. A precedência dos métodos de configuração do maior para o menor é: SQL config key, CLI e variável de ambiente.
CLI
Execute
databricks-connect
.databricks-connect configure
A licença exibe:
Copyright (2018) Databricks, Inc. This library (the "Software") may not be used except in connection with the Licensee's use of the Databricks Platform Services pursuant to an Agreement ...
Aceite a licença e forneça os valores de configuração. Para Databricks Host e Databricks tokenss, insira a URL workspace e os access tokens pessoal que você anotou na passo 1.
Do you accept the above agreement? [y/N] y Set new config values (leave input empty to accept default): Databricks Host [no current value, must start with https://]: <databricks-url> Databricks Token [no current value]: <databricks-token> Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id> Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id> Port [15001]: <port>
SQL configs ou variável de ambiente. A tabela a seguir mostra a key de configuração SQL e a variável de ambiente que correspondem às propriedades de configuração que você anotou na passo 1. Para definir uma key de configuração SQL, use
sql("set config=value")
. Por exemplo:sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh")
.Parâmetro
keyde configuração SQL
Nome da variável de ambiente
Host Databricks
spark.databricks.serviço.address
DATABRICKS_ADDRESS
tokensde databricks
spark.databricks.serviço.tokens
DATABRICKS_API_TOKEN
ID clusters
spark.databricks.serviço.clusterId
clusters
ID da organização
spark.databricks.serviço.orgId
DATABRICKS_ORG_ID
Porta
spark.databricks.serviço.port
DATABRICKS_PORT
Com seu ambiente virtual ainda ativado, teste a conectividade com o Databricks da seguinte maneira.
databricks-connect test
Se os clusters que você configurou não estiverem em execução, o teste começará os clusters que permanecerão em execução até o seu tempo de autoterminação configurado. A saída deve ser semelhante à seguinte:
* PySpark is installed at /.../.../pyspark * Checking java version java version "1.8..." Java(TM) SE Runtime Environment (build 1.8...) Java HotSpot(TM) 64-Bit Server VM (build 25..., mixed mode) * Testing scala command ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab..., invalidating prev state ../../.. ..:..:.. WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2... /_/ Using Scala version 2.... (Java HotSpot(TM) 64-Bit Server VM, Java 1.8...) Type in expressions to have them evaluated. Type :help for more information. scala> spark.range(100).reduce(_ + _) Spark context Web UI available at https://... Spark context available as 'sc' (master = local[*], app id = local-...). Spark session available as 'spark'. View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi res0: Long = 4950 scala> :quit * Testing python command ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab.., invalidating prev state View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
Se nenhum erro relacionado à conexão for exibido (
WARN
mensagens estão corretas), você se conectou com sucesso.
Usar o Databricks Connect
A seção descreve como configurar seu servidor IDE ou Notebook preferido para usar o cliente para Databricks Connect.
Nesta secção:
JupyterLabGenericName
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
Para usar Databricks Connect com JupyterLab e Python, siga estas instruções.
Para instalar o JupyterLab, com seu ambiente virtual Python ativado, execute o seguinte comando no seu terminal ou Prompt de Comando:
pip3 install jupyterlab
Para iniciar o JupyterLab em seu navegador, execute o seguinte comando de seu ambiente virtual Python ativado:
jupyter lab
Se o JupyterLab não aparecer no seu navegador, copie o URL que começa com
localhost
ou127.0.0.1
do seu ambiente virtual e insira-o na barra de endereço do navegador.Crie um novo notebook: no JupyterLab, clique em File > New > Notebook no menu principal, selecione Python 3 (ipykernel) e clique em Select.
Na primeira célula do Notebook , insira o código de exemplo ou seu próprio código. Se você usar seu próprio código, deverá instanciar, no mínimo, uma instância de
SparkSession.builder.getOrCreate()
, conforme mostrado no código de exemplo.Para executar o Notebook, clique em execução > execução Todas as Células.
Para depurar o Notebook, clique no ícone do bug (Ativar depurador) ao lado de Python 3 (ipykernel) na barra de ferramentas do Notebook . Defina um ou mais pontos de interrupção e clique em execução > execução Todas as células.
Para desligar o JupyterLab, clique em Arquivo > Desligar. Se o processo do JupyterLab ainda estiver em execução em seu terminal ou prompt de comando, interrompa esse processo pressionando
Ctrl + c
e inserindoy
para confirmar.
Para obter instruções de depuração mais específicas, consulte Debugger.
Notebook Jupyter Clássico
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
O script de configuração do Databricks Connect adiciona automaticamente o pacote à configuração do seu projeto. Para começar em um kernel Python, execute:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Para ativar a abreviação %sql
para executar e visualizar query SQL, use o seguinte snippet:
from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class
@magics_class
class DatabricksConnectMagics(Magics):
@line_cell_magic
def sql(self, line, cell=None):
if cell and line:
raise ValueError("Line must be empty for cell magic", line)
try:
from autovizwidget.widget.utils import display_dataframe
except ImportError:
print("Please run `pip install autovizwidget` to enable the visualization widget.")
display_dataframe = lambda x: x
return display_dataframe(self.get_spark().sql(cell or line).toPandas())
def get_spark(self):
user_ns = get_ipython().user_ns
if "spark" in user_ns:
return user_ns["spark"]
else:
from pyspark.sql import SparkSession
user_ns["spark"] = SparkSession.builder.getOrCreate()
return user_ns["spark"]
ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)
Código do Visual Studio
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
Para usar o Databricks Connect com o Visual Studio Code, faça o seguinte:
Verifique se a extensão Python está instalada.
Abra a paleta de comandos (comando+Shift+P no macOS e Ctrl+Shift+P no Windows/Linux).
Selecione um interpretador Python. Vá para Code > Preferences > Settings e escolha Python settings.
Execute
databricks-connect get-jar-dir
.Adicione o diretório retornado do comando ao JSON de configurações do usuário em
python.venvPath
. Isso deve ser adicionado à configuração do Python.Desabilite o linter. Clique em … no lado direito e edite as configurações JSON . As configurações modificadas são as seguintes:
Se estiver executando com um ambiente virtual, que é a maneira recomendada de desenvolver para Python no VS Code, na Paleta de comandos, digite
select python interpreter
e aponte para o ambiente que corresponde à versão do Python clusters .Por exemplo, se seus clusters forem Python 3.9, seu ambiente de desenvolvimento deverá ser Python 3.9.
PyCharmGenericName
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
O script de configuração do Databricks Connect adiciona automaticamente o pacote à configuração do seu projeto.
Clusters do Python 3
Ao criar um projeto PyCharm, selecione Existing Interpreter. No menu suspenso, selecione o ambiente Conda que você criou (consulte Requisitos).
Vá em execução > Editar configurações.
Adicione
PYSPARK_PYTHON=python3
como uma variável de ambiente.
SparkR e RStudio Desktop
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
Para usar Databricks Connect com SparkR e RStudio Desktop, faça o seguinte:
Baixe e descompacte a distribuição Spark de código aberto em sua máquina de desenvolvimento. Escolha a mesma versão em seus clusters Databricks (Hadoop 2.7).
execução
databricks-connect get-jar-dir
. Este comando retorna um caminho como/usr/local/lib/python3.5/dist-packages/pyspark/jars
. Copie o caminho do arquivo de um diretório acima do caminho do arquivo do diretório JAR, por exemplo,/usr/local/lib/python3.5/dist-packages/pyspark
, que é o diretórioSPARK_HOME
.Configure o caminho da biblioteca do Spark e a página inicial do Spark adicionando-os ao topo do seu script R. Defina
<spark-lib-path>
para o diretório onde você descompactou o pacote Spark de código aberto na passo 1. Defina<spark-home-path>
para o diretório Databricks Connect da passo 2.# Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7 library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths()))) # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark Sys.setenv(SPARK_HOME = "<spark-home-path>")
Inicie uma sessão do Spark e comece a executar os comandos do SparkR.
sparkR.session() df <- as.DataFrame(faithful) head(df) df1 <- dapply(df, function(x) { x }, schema(df)) collect(df1)
sparklyr e RStudio Desktop
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
Visualização
Este recurso está em visualização pública.
Você pode copiar o código dependente do sparklyr que desenvolveu localmente usando o Databricks Connect e executá-lo em um Databricks Notebook ou RStudio Server hospedado em seu workspace do Databricks com alterações mínimas ou nenhuma alteração no código.
Nesta secção:
Requisitos
sparklyr 1.2 ouacima.
Databricks Runtime 7.3 LTS ouacima com a versão correspondente do Databricks Connect.
Instalar, configurar e usar o sparklyr
No RStudio Desktop, instale o sparklyr 1.2 ouacima do CRAN ou instale a versão master mais recente do GitHub.
# Install from CRAN install.packages("sparklyr") # Or install the latest master version from GitHub install.packages("devtools") devtools::install_github("sparklyr/sparklyr")
Ative o ambiente Python com a versão correta do Databricks Connect instalada e execute o seguinte comando no terminal para obter o
<spark-home-path>
:databricks-connect get-spark-home
Inicie uma sessão do Spark e comece a executar os comandos do sparklyr.
library(sparklyr) sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>") iris_tbl <- copy_to(sc, iris, overwrite = TRUE) library(dplyr) src_tbls(sc) iris_tbl %>% count
Feche a conexão.
spark_disconnect(sc)
recurso
Para obter mais informações, consulte o README do GitHub sparklyr.
Para obter exemplos de código, consulte sparklyr.
Limitações do Sparklyr e do RStudio Desktop
Os seguintes recursos não são suportados:
APIs de transmissão sparklyr
APIs de ML brilhantes
vassoura APIs
modo de serialização csv_file
faísca enviar
IntelliJ (Scala ou Java)
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
Para usar Databricks Connect com IntelliJ (Scala ou Java), faça o seguinte:
Execute
databricks-connect get-jar-dir
.Aponte as dependências para o diretório retornado do comando. Vá para Arquivo > Estrutura do Projeto > Módulos > Dependências > sinal '+' > JARs ou Diretórios.
Para evitar conflitos, é altamente recomendável remover qualquer outra instalação do Spark do seu caminho de classe. Se isso não for possível, certifique-se de que os JARs incluídos estejam na frente do caminho de classe. Em particular, eles devem estar à frente de qualquer outra versão instalada do Spark (caso contrário, você usará uma dessas outras versões e execução do Spark localmente ou lançará um
ClassDefNotFoundError
).Verifique a configuração da opção breakout no IntelliJ. O default é Todos e causará tempos limite de rede se você definir pontos de interrupção para depuração. Defina-o como Thread para evitar a interrupção dos threads de rede em segundo plano.
PyDev com Eclipse
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
Para usar Databricks Connect e PyDev com Eclipse, siga estas instruções.
começar Eclipse.
Crie um projeto: clique em Arquivo > Novo > Projeto > PyDev > Projeto PyDev e clique em Avançar.
Especifique um nome de projeto.
Para Conteúdo do projeto, especifique o caminho para seu ambiente virtual Python.
Clique em Configure um interpretador antes de prosseguir.
Clique em Configuração manual.
Clique em Novo > Procurar Python/pypy exe.
Procure e selecione o caminho completo para o interpretador Python referenciado no ambiente virtual e, em seguida, clique em Abrir.
Na caixa de diálogo Selecionar intérprete , clique em OK.
Na caixa de diálogo Seleção necessária , clique em OK.
Na caixa de diálogo Preferências , clique em Aplicar e Fechar.
Na caixa de diálogo Projeto PyDev , clique em Concluir.
Clique em Abrir perspectiva.
Adicione ao projeto um arquivo de código Python (
.py
) que contenha o código de exemplo ou seu próprio código. Se você usar seu próprio código, deverá instanciar, no mínimo, uma instância deSparkSession.builder.getOrCreate()
, conforme mostrado no código de exemplo.Com o arquivo de código Python aberto, defina quaisquer pontos de interrupção onde deseja que seu código seja pausado durante a execução.
Clique em execução > execução ou execução > Debug.
Para obter instruções de execução e depuração mais específicas, consulte Executando um programa.
Eclipse
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
Para usar Databricks Connect e Eclipse, faça o seguinte:
Execute
databricks-connect get-jar-dir
.Aponte a configuração dos JARs externos para o diretório retornado do comando. Vá para o menu Projeto > Propriedades > Java Build Path > biblioteca > Adicionar jars externos.
Para evitar conflitos, é altamente recomendável remover qualquer outra instalação do Spark do seu caminho de classe. Se isso não for possível, certifique-se de que os JARs incluídos estejam na frente do caminho de classe. Em particular, eles devem estar à frente de qualquer outra versão instalada do Spark (caso contrário, você usará uma dessas outras versões e execução do Spark localmente ou lançará um
ClassDefNotFoundError
).
SBT
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
Para usar o Databricks Connect com SBT, você deve configurar seu arquivo build.sbt
para vincular aos JARs do Databricks Connect em vez da dependência usual da biblioteca Spark. Você faz isso com a diretiva unmanagedBase
no arquivo de construção de exemplo a seguir, que assume um aplicativo Scala que possui um objeto principal com.example.Test
:
CascaSpark
Observação
Antes de começar a usar o Databricks Connect, você deve atender aos requisitos e configurar o cliente para Databricks Connect.
Para usar Databricks Connect com o shell Spark e Python ou Scala, siga estas instruções.
Com seu ambiente virtual ativado, certifique-se de que o comando
databricks-connect test
seja executado com sucesso em <span texecuçãoslate="no">1<span texecuçãoslate="no">2Configure o cliente <span texecuçãoslate="no">3<span texecuçãoslate="no">4.Com seu ambiente virtual ativado, comece o shell Spark. Para Python, execute o comando
pyspark
. Para Scala, execute o comandospark-shell
.# For Python: pyspark
# For Scala: spark-shell
O shell Spark aparece, por exemplo, para Python:
Python 3... (v3...) [Clang 6... (clang-6...)] on darwin Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 3.... /_/ Using Python version 3... (v3...) Spark context Web UI available at http://...:... Spark context available as 'sc' (master = local[*], app id = local-...). SparkSession available as 'spark'. >>>
Para Scala:
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://... Spark context available as 'sc' (master = local[*], app id = local-...). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3... /_/ Using Scala version 2... (OpenJDK 64-Bit Server VM, Java 1.8...) Type in expressions to have them evaluated. Type :help for more information. scala>
Consulte Análise interativa com o Spark Shell para obter informações sobre como usar o Spark shell com Python ou Scala para executar comandos em seus clusters.
Use a variável integrada
spark
para representar oSparkSession
em seus clusterss em execução, por exemplo, para Python:>>> df = spark.read.table("samples.nyctaxi.trips") >>> df.show(5) +--------------------+---------------------+-------------+-----------+----------+-----------+ |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip| +--------------------+---------------------+-------------+-----------+----------+-----------+ | 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171| | 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110| | 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023| | 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017| | 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282| +--------------------+---------------------+-------------+-----------+----------+-----------+ only showing top 5 rows
Para Scala:
>>> val df = spark.read.table("samples.nyctaxi.trips") >>> df.show(5) +--------------------+---------------------+-------------+-----------+----------+-----------+ |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip| +--------------------+---------------------+-------------+-----------+----------+-----------+ | 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171| | 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110| | 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023| | 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017| | 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282| +--------------------+---------------------+-------------+-----------+----------+-----------+ only showing top 5 rows
Para interromper o shell do Spark, pressione
Ctrl + d
ouCtrl + z
ou execute o comandoquit()
ouexit()
para Python ou:q
ou:quit
para Scala.
Exemplos de código
Este exemplo de código simples query a tabela especificada e mostra as primeiras 5 linhas da tabela especificada. Para usar uma tabela diferente, ajuste a chamada para spark.read.table
.
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
Este exemplo de código mais longo faz o seguinte:
Cria um DataFrame na memória.
Cria uma tabela com o nome
zzz_demo_temps_table
no esquemadefault
. Se a tabela com este nome já existir, a tabela será excluída primeiro. Para usar um esquema ou tabela diferente, ajuste as chamadas paraspark.sql
,temps.write.saveAsTable
ou ambos.Salva o conteúdo do DataFrame na tabela.
execução de uma query
SELECT
sobre o conteúdo da tabela.Mostra o resultado da query .
Exclui a tabela.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date
spark = SparkSession.builder.appName('temps-demo').getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date
object Demo {
def main(args: Array[String]) {
val spark = SparkSession.builder.master("local").getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(Array(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
))
val data = List(
Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
)
val rdd = spark.sparkContext.makeRDD(data)
val temps = spark.createDataFrame(rdd, schema)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;
public class App {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession
.builder()
.appName("Temps Demo")
.config("spark.master", "local")
.getOrCreate();
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
StructType schema = new StructType(new StructField[] {
new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
});
List<Row> dataList = new ArrayList<Row>();
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));
Dataset<Row> temps = spark.createDataFrame(dataList, schema);
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default");
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table");
temps.write().saveAsTable("zzz_demo_temps_table");
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
Dataset<Row> df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC");
df_temps.show();
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table");
}
}
Trabalhar com dependências
Normalmente, sua classe principal ou arquivo Python terá outros JARs e arquivos de dependência. Você pode incluir esses arquivos e JARs de dependência chamando sparkContext.addJar("path-to-the-jar")
ou sparkContext.addPyFile("path-to-the-file")
. Você também pode adicionar arquivos Egg e arquivos zip com a interface addPyFile()
. Toda vez que você executa o código em seu IDE, os JARs e arquivos de dependência são instalados nos clusters.
from lib import Foo
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
#sc.setLogLevel("INFO")
print("Testing simple count")
print(spark.range(100).count())
print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())
class Foo(object):
def __init__(self, x):
self.x = x
UDFs Python + Java
from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column
## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
# val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}
spark = SparkSession.builder \
.config("spark.jars", "/path/to/udf.jar") \
.getOrCreate()
sc = spark.sparkContext
def plus_one_udf(col):
f = sc._jvm.com.example.Test.plusOne()
return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()
package com.example
import org.apache.spark.sql.SparkSession
case class Foo(x: String)
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
...
.getOrCreate();
spark.sparkContext.setLogLevel("INFO")
println("Running simple show query...")
spark.read.format("parquet").load("/tmp/x").show()
println("Running simple UDF query...")
spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
spark.udf.register("f", (x: Int) => x + 1)
spark.range(10).selectExpr("f(id)").show()
println("Running custom objects query...")
val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
println(objs.toSeq)
}
}
Utilidades do Access Databricks
Esta seção descreve como usar o Databricks Connect para acessar as utilidades do Databricks.
Você pode usar dbutils.fs
e dbutils.secrets
russas do módulo de referência Databricks russas (dbutils) . Os comandos suportados são dbutils.fs.cp
, dbutils.fs.head
, dbutils.fs.ls
, dbutils.fs.mkdirs
, dbutils.fs.mv
, dbutils.fs.put
, dbutils.fs.rm
, dbutils.secrets.get
, dbutils.secrets.getBytes
, dbutils.secrets.list
, dbutils.secrets.listScopes
. Veja Sistema de arquivos russos (dbutils.fs) ou execução dbutils.fs.help()
e Secrets russas (dbutils.secrets) ou execução dbutils.secrets.help()
.
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())
Ao usar Databricks Runtime 7.3 LTS ouacima, para acessar o módulo DBUtils de uma maneira que funcione localmente e em clusters Databricks, use o seguinte get_dbutils()
:
def get_dbutils(spark):
from pyspark.dbutils import DBUtils
return DBUtils(spark)
Caso contrário, use o seguinte get_dbutils()
:
def get_dbutils(spark):
if spark.conf.get("spark.databricks.service.client.enabled") == "true":
from pyspark.dbutils import DBUtils
return DBUtils(spark)
else:
import IPython
return IPython.get_ipython().user_ns["dbutils"]
val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())
Copiando arquivos entre sistemas de arquivos locais e remotos
Você pode usar dbutils.fs
para copiar arquivos entre seu cliente e sistemas de arquivos remotos. Esquema file:/
refere-se ao sistema de arquivos local no cliente.
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')
O tamanho máximo de arquivo que pode ser transferido dessa forma é de 250 MB.
Definir configurações do Hadoop
No cliente, você pode definir as configurações do Hadoop usando a API spark.conf.set
, que se aplica a operações SQL e DataFrame. As configurações do Hadoop definidas no sparkContext
devem ser definidas na configuração clusters ou usando um Notebook. Isso ocorre porque as configurações definidas em sparkContext
não estão vinculadas às sessões do usuário, mas se aplicam a todos os clusters.
Solução de problemas
execução databricks-connect test
para verificar problemas de conectividade. Esta seção descreve alguns problemas comuns que você pode encontrar com o Databricks Connect e como resolvê-los.
Nesta secção:
Incompatibilidade de versão do Python
Verifique se a versão do Python que você está usando localmente tem pelo menos a mesma versão secundária da versão nos clusters (por exemplo, 3.9.16
versus 3.9.15
está OK, 3.9
versus 3.8
não está).
Se você tiver várias versões do Python instaladas localmente, verifique se o Databricks Connect está usando a correta definindo a variável de ambiente PYSPARK_PYTHON
(por exemplo, PYSPARK_PYTHON=python3
).
Servidor não ativado
Certifique-se de que os clusters tenham o servidor Spark ativado com spark.databricks.service.server.enabled true
. Você deve ver as seguintes linhas nos logs do driver, se for:
../../.. ..:..:.. INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
../../.. ..:..:.. INFO SparkContext: Loading Spark Service RPC Server
../../.. ..:..:.. INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
../../.. ..:..:.. INFO Server: jetty-9...
../../.. ..:..:.. INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
../../.. ..:..:.. INFO Server: Started @5879ms
Instalações conflitantes do PySpark
O pacote databricks-connect
está em conflito com o PySpark. Ter ambos instalados causará erros ao inicializar o contexto do Spark em Python. Isso pode se manifestar de várias maneiras, incluindo erros de “transmissão corrompida” ou “classe não encontrada”. Se você tiver o PySpark instalado em seu ambiente Python, verifique se ele foi desinstalado antes de instalar o databricks-connect. Depois de desinstalar o PySpark, certifique-se de reinstalar totalmente o pacote Databricks Connect:
pip3 uninstall pyspark
pip3 uninstall databricks-connect
pip3 install --upgrade "databricks-connect==12.2.*" # or X.Y.* to match your specific cluster version.
Conflitante SPARK_HOME
Se você já usou o Spark em sua máquina, seu IDE pode ser configurado para usar uma dessas outras versões do Spark em vez do Databricks Connect Spark. Isso pode se manifestar de várias maneiras, incluindo erros de “transmissão corrompida” ou “classe não encontrada”. Você pode ver qual versão do Spark está sendo usada verificando o valor da variável de ambiente SPARK_HOME
:
import os
print(os.environ['SPARK_HOME'])
println(sys.env.get("SPARK_HOME"))
System.out.println(System.getenv("SPARK_HOME"));
Resolução
Se SPARK_HOME
estiver definido para uma versão do Spark diferente da do cliente, desative a variável SPARK_HOME
e tente novamente.
Verifique suas configurações de variável de ambiente IDE, seu arquivo .bashrc
, .zshrc
ou .bash_profile
e qualquer outro lugar onde as variáveis de ambiente possam ser definidas. Você provavelmente terá que sair e reiniciar seu IDE para limpar o estado antigo e pode até precisar criar um novo projeto se o problema persistir.
Você não precisa definir SPARK_HOME
para um novo valor; desativá-lo deve ser suficiente.
Entrada PATH
conflitante ou ausente para binários
É possível que seu PATH esteja configurado para que comandos como spark-shell
executem algum outro binário instalado anteriormente em vez daquele fornecido com o Databricks Connect. Isso pode fazer com que databricks-connect test
falhe. Você deve certificar-se de que os binários do Databricks Connect tenham precedência ou remova os instalados anteriormente.
Se você não pode executar comandos como spark-shell
, também é possível que seu PATH não tenha sido configurado automaticamente por pip3 install
e você precisará adicionar o diretório de instalação bin
ao seu PATH manualmente. É possível usar Databricks Connect com IDEs mesmo que isso não esteja configurado. No entanto, o comando databricks-connect test
não funcionará.
Configurações de serialização conflitantes nos clusters
Se você vir erros de “transmissão corrompida” ao executar databricks-connect test
, isso pode ser devido a configurações de serialização clusters incompatíveis. Por exemplo, definir a configuração spark.io.compression.codec
pode causar esse problema. Para resolver esse problema, considere remover essas configurações das configurações clusters ou definir a configuração no cliente Databricks Connect.
Não é possível encontrar winutils.exe
no Windows
Se você estiver usando Databricks Connect no Windows e veja:
ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
Siga as instruções para configurar o caminho do Hadoop no Windows.
O nome do arquivo, nome do diretório ou sintaxe do rótulo do volume está incorreto no Windows
Se você estiver usando Windows e Databricks Connect e veja:
The filename, directory name, or volume label syntax is incorrect.
Java ou Databricks Connect foi instalado em um diretório com um espaço em seu caminho. Você pode contornar isso instalando em um caminho de diretório sem espaços ou configurando seu caminho usando o formulário de nome curto.
Limitações
transmissão estruturada.
Executar código arbitrário que não faz parte de um Spark Job nos clusters remotos.
As APIs nativas Scala, Python e R para operações de tabela Delta (por exemplo,
DeltaTable.forPath
) não são suportadas. No entanto, a API SQL (spark.sql(...)
) com operações Delta Lake e a Spark API (por exemplo,spark.read.load
) em tabelas Delta são suportadas.Copie em.
Utilizando funções SQL, Python ou Scala UDFs que fazem parte do catálogo do servidor. No entanto, os UDFs Scala e Python introduzidos localmente funcionam.
Apache Zeppelin 0.7.xe abaixo.
Conectando-se a clusters com controle de acesso da tabela.
Conectando-se a clusters com isolamento de processo ativado (em outras palavras, em que
spark.databricks.pyspark.enableProcessIsolation
é definido comotrue
).Comando SQL Delta
CLONE
.view temporária global.
Koalas e
pyspark.pandas
.CREATE TABLE table AS SELECT ...
Os comandos SQL nem sempre funcionam. Em vez disso, usespark.sql("SELECT ...").write.saveAsTable("table")
.