Consultar bancos de dados com JDBC

O Databricks suporta a conexão com bancos de dados externos usando JDBC. Este artigo fornece a sintaxe básica para configurar e usar essas conexões, com exemplos em Python, SQL e Scala.

Experimental

As configurações descritas neste artigo são Experimentais. Os recursos experimentais são fornecidos como estão e não são suportados pelo Databricks por meio do suporte técnico ao cliente. Para obter suporte completo à federação de consultas, você deve usar o Lakehouse Federation, que permite que seus usuários do Databricks aproveitem a sintaxe do Unity Catalog e as ferramentas de governança de dados.

O Partner Connect apresenta integrações otimizadas para sincronizar dados com muitas fontes de dados externas. Consulte O que é Databricks Partner Connect?.

Importante

Os exemplos neste artigo não incluem nomes de usuário e senhas nas URLs JDBC. O Databricks recomenda usar segredos para armazenar as credenciais do seu banco de dados. Por exemplo:

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Para fazer referência a segredos do Databricks com SQL, você deve configurar uma propriedade de configuração do Spark durante a inicialização do cluster.

Para obter um exemplo completo de gerenciamento de segredos, consulte o tutorial: Criar e usar um segredo Databricks .

Estabelecer conectividade em cloud

As VPCs do Databricks são configuradas para permitir somente clusters Spark. Na conexão com outra infraestrutura, a prática recomendada é usar o emparelhamento de VPC. Assim que o emparelhamento de VPC for estabelecido, você poderá verificar com o utilitário netcat no cluster.

%sh nc -vz <jdbcHostname> <jdbcPort>

Ler dados com JDBC

Você deve definir várias configurações para ler dados com JDBC. Observe que cada banco de dados usa um formato diferente para o <jdbc-url>.

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)
val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

O Spark lê automaticamente o esquema da tabela do banco de dados e associa seus tipos aos tipos Spark SQL.

employees_table.printSchema
DESCRIBE employees_table_vw
employees_table.printSchema

Você pode executar consultas nesta tabela JDBC:

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Gravar dados com JDBC

Salvar dados em tabelas com JDBC utiliza configurações semelhantes à leitura. Veja o seguinte exemplo:

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)
CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw
employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

O comportamento padrão tenta criar uma nova tabela e gera um erro se uma tabela com esse nome já existir.

Você pode adicionar dados a uma tabela existente usando a seguinte sintaxe:

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)
CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Você pode sobrescrever uma tabela existente usando a seguinte sintaxe:

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)
CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;
employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Controlar o paralelismo para consultas JDBC

Por padrão, o driver JDBC consulta o banco de dados de origem com apenas um único encadeamento. Para melhorar o desempenho das leituras, você precisa especificar várias opções para controlar quantas consultas simultâneas o Databricks faz ao seu banco de dados. Para clusters pequenos, definir a opção numPartitions igual ao número de núcleos de executor no seu cluster garante que todos os nós consultem os dados em paralelo.

Aviso

Definir numPartitions para um valor alto em um cluster grande pode resultar em desempenho negativo para o banco de dados remoto, pois muitas consultas simultâneas podem sobrecarregar o serviço. Isso é especialmente problemático para bancos de dados de aplicativos. Tenha cautela ao definir esse valor acima de 50.

Observação

Acelere as consultas selecionando uma coluna com um índice calculado no banco de dados de origem para o partitionColumn.

O exemplo de código a seguir demonstra como configurar o paralelismo para um cluster com oito núcleos:

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)
val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Observação

O Databricks suporta todas as opções do Apache Spark para configurar JDBC.

Ao gravar em bancos de dados usando JDBC, o Apache Spark usa o número de partições na memória para controlar o paralelismo. Você pode reparticionar os dados antes de gravar para controlar o paralelismo. Evite um número alto de partições em clusters grandes para não sobrecarregar seu banco de dados remoto. O exemplo a seguir demonstra como reparticionar em oito partições antes de gravar:

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)
CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Transfira uma consulta para o mecanismo do banco de dados

Você pode transferir uma consulta inteira para o banco de dados e retornar apenas o resultado. O parâmetro table identifica a tabela JDBC a ser lida. Você pode usar qualquer coisa que seja válida em uma cláusula FROM de consulta SQL.

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Controle o número de linhas buscadas por consulta

Os drivers JDBC têm um parâmetro fetchSize que controla o número de linhas buscadas por vez no banco de dados remoto.

Contexto

Resultado

Muito baixo

Alta latência devido a muitas idas e vindas (poucas linhas retornadas por consulta).

Muito alto

Erro de falta de memória (muitos dados retornados em uma consulta)

O valor ideal depende da carga de trabalho. As considerações incluem:

  • Quantas colunas são retornadas pela consulta?

  • Quais tipos de dados são retornados?

  • Por quanto tempo as strings em cada coluna são retornadas?

Sistemas podem ter valores default muito pequenos e podem se beneficiar de ajustes. Por exemplo: o valor padrão de fetchSize no Oracle é 10. Aumentá-lo para 100 reduz o número total de consultas que precisam ser executadas em um fator de 10. Resultados JDBC são tráfego de rede, então evite números muito grandes, mas valores ideais podem estar na casa dos milhares para muitos conjuntos de dados.

Use a opção fetchSize como no exemplo a seguir:

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)
val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()