Consultar bancos de dados com JDBC

O Databricks é compatível com a conexão com bancos de dados externos com JDBC. Este artigo informa a sintaxe básica para configurar e usar essas conexões com exemplos em Python, SQL e Scala.

Observação

O senhor pode preferir o lakehouse Federation para gerenciar consultas a sistemas de bancos de dados externos. Veja o que é lakehouse Federation.

O Partner Connect apresenta integrações otimizadas para sincronizar dados com muitas fontes de dados externas. Consulte O que é o Databricks Partner Connect?.

Importante

Os exemplos neste artigo não incluem nomes de usuários e senhas em URLs JDBC. Databricks recomenda usar segredos para armazenar suas credenciais de banco de dados. Por exemplo:

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Para fazer referência a segredos do Databricks com SQL, você deve configurar uma propriedade de configuração do Spark durante a inicialização do cluster.

Para obter um exemplo completo de gerenciamento de segredo, consulte Exemplo de fluxo de trabalho secreto.

Estabeleça conectividade em nuvem

As VPCs do Databricks são configuradas para permitir somente clusters Spark. Na conexão com outra infraestrutura, a prática recomendada é usar o emparelhamento de VPC. Assim que o emparelhamento de VPC for estabelecido, você poderá verificar com o utilitário netcat no cluster.

%sh nc -vz <jdbcHostname> <jdbcPort>

Ler dados com JDBC

Você deve definir várias configurações para ler dados com JDBC. Observe que cada banco de dados usa um formato diferente para o <jdbc-url>.

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)
val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

O Spark lê automaticamente o esquema da tabela do banco de dados e associa seus tipos aos tipos Spark SQL.

employees_table.printSchema
DESCRIBE employees_table_vw
employees_table.printSchema

Você pode executar consultas nesta tabela JDBC:

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Gravar dados com JDBC

Salvar dados em tabelas com JDBC utiliza configurações semelhantes à leitura. Veja o seguinte exemplo:

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)
CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw
employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

O comportamento padrão tenta criar uma nova tabela e gera um erro se já existir uma tabela com esse nome.

Você pode anexar dados a uma tabela existente utilizando a seguinte sintaxe:

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)
CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Você pode substituir uma tabela existente usando a seguinte sintaxe:

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)
CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;
employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Controlar o paralelismo para consultas JDBC

Por padrão, o driver JDBC consulta o banco de dados de origem com apenas um único encadeamento. Para melhorar o desempenho das leituras, você precisa especificar várias opções para controlar quantas consultas simultâneas o Databricks faz ao seu banco de dados. Para clusters pequenos, a definição da opção numPartitions igual ao número de núcleos executores em seu cluster garante que todos os nós consultem dados em paralelo.

Aviso

A definição de numPartitions como um valor alto em um cluster grande pode resultar em desempenho negativo para o banco de dados remoto, pois muitas consultas simultâneas podem sobrecarregar o serviço. Isso é especialmente problemático para bancos de dados de aplicativos. Tenha cuidado se definir esse valor acima de 50.

Observação

Acelere as consultas selecionando uma coluna com um índice calculado no banco de dados de origem para o partitionColumn.

O exemplo de código a seguir demonstra a configuração do paralelismo para um cluster com oito núcleos:

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)
val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Observação

O Databricks oferece suporte a todas as opções do Apache Spark para configurar o JDBC.

Ao gravar em bancos de dados com JDBC, o Apache Spark usa o número de partições na memória para controlar o paralelismo. Você pode reparticionar dados antes de gravar para controlar o paralelismo. Evite um grande número de partições em clusters grandes para evitar sobrecarregar seu banco de dados remoto. O exemplo a seguir demonstra o reparticionamento em oito partições antes da gravação:

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)
CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Empurre uma consulta para o mecanismo de banco de dados

Você pode enviar uma consulta inteira para o banco de dados e retornar somente o resultado. O parâmetro table identifica a tabela JDBC a ser lida. Você pode usar qualquer coisa que seja válida em uma cláusula de consulta SQL FROM.

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Controle o número de linhas buscadas por consulta

Os drivers JDBC têm um parâmetro fetchSize que controla o número de linhas buscadas por vez no banco de dados remoto.

Contexto

Resultado

Muito baixo

Alta latência devido a muitas idas e voltas (poucas linhas retornadas por consulta)

Muito alto

Erro de falta de memória (muitos dados retornados em uma consulta)

O valor ideal depende da carga de trabalho. As considerações são:

  • Quantas colunas são retornadas pela consulta?

  • Quais tipos de dados são retornados?

  • Por quanto tempo as strings em cada coluna são retornadas?

Os sistemas podem ter um padrão muito pequeno e se beneficiarem do ajuste. Por exemplo: o fetchSize padrão do Oracle é 10. Aumentá-lo para 100 reduz o número total de consultas que precisam ser executadas por um fator de 10. Os resultados do JDBC são tráfego de rede, portanto evite números muito grandes, mas os valores ideais podem estar na casa dos milhares para muitos conjuntos de dados.

Use a opção fetchSize como no exemplo a seguir:

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)
val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()