Consulte o Amazon Redshift usando Databricks
Você pode ler e gravar tabelas do Amazon Redshift com Databricks.
Experimental
As configurações descritas neste artigo são Experimentais. Os recursos experimentais são fornecidos como estão e não são suportados pelo Databricks por meio do suporte técnico ao cliente. Para obter suporte completo à federação query , você deve usar lakehouse Federation, que permite que os usuários do Databricks aproveitem a sintaxe do Unity Catalog e as ferramentas de governança de dados.
A fonte de dados Databricks Redshift usa o Amazon S3 para transferir dados de forma eficiente para dentro e para fora do Redshift e usa JDBC para acionar automaticamente os comandos COPY
e UNLOAD
apropriados no Redshift.
Observação
No Databricks Runtime 11.3 LTS e acima, o Databricks Runtime inclui o driver Redshift JDBC, acessível usando a palavra-chave redshift
para a opção de formato. Consulte as notas do Databricks Runtime sobre versões de versão e compatibilidade para versões de driver incluídas em cada Databricks Runtime. Os drivers fornecidos pelo usuário ainda são suportados e têm precedência sobre o driver JDBC incluído.
No Databricks Runtime 10.4 LTS e versões anteriores, é necessária a instalação manual do driver Redshift JDBC e query deve usar o driver (com.databricks.spark.redshift
) para o formato. Consulte Instalação do driver Redshift.
Uso
Os exemplos a seguir demonstram a conexão com o driver Redshift. Substitua os valores do parâmetro url
se estiver usando o driver PostgreSQL JDBC.
Depois de configurar suas credenciais da AWS, você pode usar a fonte de dados com a API Spark fonte de dados em Python, SQL, R ou Scala.
Importante
Os locais externos definidos no Unity Catalog não são suportados como tempdir
locais.
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
Leia o uso de dados SQL no Databricks Runtime 10.4 LTS e abaixo:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Leia o uso de dados SQL no Databricks Runtime 11.3 LTS e acima:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Escreva o uso de dados SQL:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
A API SQL oferece suporte apenas à criação de novas tabelas e não à substituição ou acréscimo.
Leia o uso de dados R no Databricks Runtime 10.4 LTS e abaixo:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Leia o uso de dados R no Databricks Runtime 11.3 LTS e acima:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Recomendações para trabalhar com Redshift
a execução query pode extrair grandes quantidades de dados para o S3. Se você planeja realizar várias query nos mesmos dados no Redshift, a Databricks recomenda salvar a extração de dados usando Delta Lake.
Observação
Você não deve criar clusters Redshift dentro do VPC de gerenciamento do Databricks, pois isso pode levar a problemas de permissões devido ao modelo de segurança no VPC do Databricks. Você deve criar sua própria VPC e, em seguida, realizar o peering de VPC para conectar o Databricks à sua instância do Redshift.
Configuração
Autenticando para S3 e Redshift
A fonte de dados envolve diversas conexões de rede, ilustradas no diagrama a seguir:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
A fonte de dados lê e grava dados no S3 ao transferir dados de/para o Redshift. Como resultado, são necessárias credenciais da AWS com acesso de leitura e gravação a um bucket S3 (especificado usando o parâmetro de configuração tempdir
).
Observação
A fonte de dados não limpa os arquivos temporários que cria no S3. Como resultado, recomendamos que você use um bucket S3 temporário dedicado com uma configuração de ciclo de vida de objeto para garantir que os arquivos temporários sejam excluídos automaticamente após um período de expiração especificado. Consulte a seção Criptografia deste documento para obter uma discussão sobre como criptografar esses arquivos. Você não pode usar um local externo definido no Unity Catalog como um local tempdir
.
As seções a seguir descrevem as opções de configuração de autenticação de cada conexão:
Driver Spark para Redshift
O driver Spark se conecta ao Redshift via JDBC usando nome de usuário e senha. O Redshift não oferece suporte ao uso da IAM role para autenticar esta conexão. Por default, esta conexão usa criptografia SSL; para mais detalhes, veja Criptografia.
Spark para S3
S3 atua como um intermediário para armazenar dados em massa ao ler ou gravar no Redshift. O Spark se conecta ao S3 usando as interfaces do Hadoop FileSystem e diretamente usando o cliente S3 do Amazon Java SDK.
Observação
Você não pode usar montagens DBFS para configurar o acesso ao S3 para Redshift.
cadeia de provedores de credenciaisdefault (melhor opção para a maioria dos usuários): as credenciais da AWS são recuperadas automaticamente por meio do DefaultAWSCredentialsProviderChain. Se você usar instance profile para autenticar no S3, provavelmente deverá usar este método.
Os métodos a seguir para fornecer credenciais têm precedência sobre esse default.
Ao assumir uma IAM role: você pode usar uma IAM role que o instance profile possa assumir. Para especificar o ARN da função, você deve anexar um instance profile aos clusters e fornecer a seguinte key de configuração:
sc.hadoopConfiguration.set("fs.s3a.credentialsType", "AssumeRole") sc.hadoopConfiguration.set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>) // An optional duration, expressed as a quantity and a unit of // time, such as "15m" or "1h" sc.hadoopConfiguration.set("fs.s3a.assumed.role.session.duration", <duration>)
sc._jsc.hadoopConfiguration().set("fs.s3a.credentialsType", "AssumeRole") sc._jsc.hadoopConfiguration().set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>) # An optional duration, expressed as a quantity and a unit of # time, such as "15m" or "1h" sc._jsc.hadoopConfiguration().set("fs.s3a.assumed.role.session.duration", <duration>)
Definir key na configuração do Hadoop: você pode especificar key AWS usando as propriedades de configuração do Hadoop. Se sua configuração
tempdir
apontar para um sistema de arquivoss3a://
, você poderá definir as propriedadesfs.s3a.access.key
efs.s3a.secret.key
em um arquivo de configuração XML do Hadoop ou chamarsc.hadoopConfiguration.set()
para definir a configuração global do Hadoop do Spark. Se você usar um sistema de arquivoss3n://
, poderá fornecer a key configuração herdada conforme mostrado no exemplo a seguir.Por exemplo, se você estiver usando o sistema de arquivos
s3a
, adicione:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
Para o sistema de arquivos herdado
s3n
, adicione:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
O comando a seguir depende de alguns componentes internos do Spark, mas deve funcionar com todas as versões do PySpark e é improvável que mude no futuro:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift para S3
O Redshift também se conecta ao S3 durante query COPY
e UNLOAD
. Existem três métodos para autenticar esta conexão:
Faça com que o Redshift assuma uma IAM role (mais segura): você pode conceder permissão ao Redshift para assumir uma IAM role durante as operações
COPY
ouUNLOAD
e, em seguida, configurar a fonte de dados para instruir o Redshift a usar essa função:Crie uma IAM role concedendo permissões S3 apropriadas ao seu bucket.
Siga o guia Autorizando o Amazon Redshift a acessar outro serviço da AWS em seu nome para configurar a política de confiança dessa função para permitir que o Redshift assuma essa função.
Siga as passos no guia Autorizando as operações COPY e UNLOAD usando a IAM role para associar essa IAM role aos clusters do Redshift.
Defina a opção
aws_iam_role
da fonte de dados para o ARN da função.
Encaminhar as credenciais S3 do Spark para o Redshift: se a opção
forward_spark_s3_credentials
estiver definida comotrue
, a fonte de dados descobrirá automaticamente as credenciais que o Spark está usando para se conectar ao S3 e encaminhará essas credenciais para o Redshift por JDBC. Se o Spark estiver autenticando no S3 usando um instance profile , um conjunto de credenciais STS temporárias será encaminhado para o Redshift; caso contrário, key AWS serão encaminhadas. A query JDBC incorpora estas credenciais, portanto, a Databricks recomenda vivamente que ative a encriptação SSL da ligação JDBC ao utilizar este método de autenticação.Use credenciais do serviço tokens de segurança (STS): você pode configurar as propriedades de configuração
temporary_aws_access_key_id
,temporary_aws_secret_access_key
etemporary_aws_session_token
para apontar para key temporária criada por meio do serviço tokens de segurança da AWS. A query JDBC incorpora essas credenciais, portanto, é altamente recomendável ativar a criptografia SSL da conexão JDBC ao usar esse método de autenticação. Se você escolher esta opção, esteja ciente do risco de as credenciais expirarem antes que as operações de leitura/gravação sejam bem-sucedidas.
Essas três opções são mutuamente exclusivas e você deve escolher explicitamente qual delas usar.
Criptografia
Protegendo JDBC: a menos que alguma configuração relacionada a SSL esteja presente na URL JDBC, a fonte de dados por default habilita a criptografia SSL e também verifica se o servidor Redshift é confiável (ou seja,
sslmode=verify-full
). Para isso, um certificado de servidor é downloads automaticamente dos servidores Amazon na primeira vez que for necessário. Caso isso falhe, um arquivo de certificado pré-agrupado será usado como fallback. Isso vale para os drivers Redshift e PostgreSQL JDBC.Caso haja algum problema com este recurso ou você simplesmente queira desativar o SSL, você pode ligar para
.option("autoenablessl", "false")
no seuDataFrameReader
ouDataFrameWriter
.Se desejar especificar configurações personalizadas relacionadas ao SSL, você pode seguir as instruções na documentação do Redshift: Usando SSL e certificados de servidor em Java e opções de configuração do driver JDBC Quaisquer opções relacionadas ao SSL presentes no JDBC
url
usado com a fonte de os dados têm precedência (ou seja, a configuração automática não será acionada).Criptografando dados UNLOAD armazenados no S3 (dados armazenados durante a leitura do Redshift): De acordo com a documentação do Redshift sobre Descarregamento de dados para S3, “UNLOAD criptografa automaticamente arquivos de dados usando a criptografia do lado do servidor Amazon S3 (SSE-S3).”
O Redshift também oferece suporte à criptografia do lado do cliente com uma key personalizada (consulte: Descarregando arquivos de dados criptografados), mas a fonte de dados não tem a capacidade de especificar a key simétrica necessária.
Criptografando dados COPY armazenados no S3 (dados armazenados ao gravar no Redshift): De acordo com a documentação do Redshift em Carregamento de arquivos de dados criptografados do Amazon S3:
Você pode usar o comando COPY
para carregar arquivos de dados que foram upload no Amazon S3 usando criptografia no lado do servidor com key de criptografia gerenciada pela AWS (SSE-S3 ou SSE-KMS), criptografia no lado do cliente ou ambas. COPY não oferece suporte à criptografia no lado do servidor do Amazon S3 com uma key fornecida pelo cliente (SSE-C).
Para usar esse recurso, configure seu sistema de arquivos Hadoop S3 para usar a criptografia do Amazon S3. Isso não criptografará o arquivo MANIFEST
que contém uma lista de todos os arquivos gravados.
Parâmetros
O mapa de parâmetros ou OPTIONS fornecido no Spark SQL oferece suporte às seguintes configurações:
Parâmetro |
Obrigatório |
Padrão |
Descrição |
---|---|---|---|
tabela de banco de dados |
Sim, a menos que query seja especificada. |
Nenhuma |
A tabela a ser criada ou lida no Redshift. Este parâmetro é necessário ao salvar dados no Redshift. |
query |
Sim, a menos que dbtable seja especificado. |
Nenhuma |
A query para leitura no Redshift. |
usuário |
Não |
Nenhuma |
O nome de usuário do Redshift. Deve ser usado em conjunto com a opção de senha. Pode ser utilizado somente se o usuário e a senha não forem passados na URL, passar ambos resultará em erro. Use este parâmetro quando o nome de usuário contiver caracteres especiais que precisam ser escapados. |
Senha |
Não |
Nenhuma |
A senha do Redshift. Deve ser usado em conjunto com a opção |
url |
Sim |
Nenhuma |
Uma URL JDBC, no formato jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>
|
caminho_pesquisa |
Não |
Nenhuma |
Defina o caminho de pesquisa do esquema no Redshift. Será definido usando o comando |
aws_iam_role |
Somente se estiver usando IAM role para autorizar. |
Nenhuma |
ARN totalmente especificado da função de operações COPY/UNLOAD do IAM Redshift anexada aos clusters Redshift, por exemplo, |
forward_spark_s3_credentials |
Não |
|
Se |
temporário_aws_access_key_id |
Não |
Nenhuma |
key de acesso AWS, deve ter permissões de gravação no bucket S3. |
temporário_aws_secret_access_key |
Não |
Nenhuma |
key de acesso secreta da AWS correspondente à key de acesso fornecida. |
temporário_aws_session_token |
Não |
Nenhuma |
tokens de sessão da AWS correspondentes à key de acesso fornecida. |
tempdir |
Sim |
Nenhuma |
Um local gravável no Amazon S3, para ser usado para dados descarregados durante a leitura e dados Avro para serem carregados no Redshift durante a gravação. Se você estiver usando a fonte de dados do Redshift para Spark como parte de um pipeline ETL regular, pode ser útil definir uma política de ciclo de vida em um bucket e usá-la como um local temporário para esses dados. Você não pode usar locais externos definidos no Unity Catalog como |
jdbcdriver |
Não |
Determinado pelo subprotocolo do URL JDBC. |
O nome da classe do driver JDBC a ser usado. Esta classe deve estar no classpath. Na maioria dos casos, não será necessário especificar esta opção, pois o nome da classe do driver apropriado deverá ser determinado automaticamente pelo subprotocolo da URL JDBC. |
estilo dist |
Não |
|
O estilo de distribuição Redshift a ser usado ao criar uma tabela. Pode ser um de |
tecla de disquete |
Não, a menos que use |
Nenhuma |
O nome de uma coluna na tabela a ser usada como key de distribuição ao criar uma tabela. |
sortkeyspec |
Não |
Nenhuma |
Uma definição completa keyRedshift Sort . Exemplos incluem:
|
usestagingtable (obsoleto) |
Não |
|
Definir esta opção obsoleta como Como a configuração de |
Descrição |
Não |
Nenhuma |
Uma descrição para a tabela. Será definido usando o comando SQL COMMENT e deverá aparecer na maioria das ferramentas query . Consulte também os metadados |
pré-ações |
Não |
Nenhuma |
Uma lista separada por Esteja avisado que se esses comandos falharem, isso será tratado como um erro e uma exceção será lançada. Se estiver usando uma tabela intermediária, as alterações serão revertidas e a tabela de backup restaurada se as pré-ações falharem. |
postagens |
Não |
Nenhuma |
Uma lista separada por Esteja avisado que se esses comandos falharem, isso será tratado como um erro e uma exceção será lançada. Se estiver usando uma tabela intermediária, as alterações serão revertidas e a tabela de backup restaurada se as ações posteriores falharem. |
opções extras |
Não |
Nenhuma |
Uma lista de opções extras para anexar ao comando Redshift Como essas opções são anexadas ao final do comando |
formato temporário |
Não |
|
O formato no qual salvar arquivos temporários no S3 ao gravar no Redshift. default é O Redshift é significativamente mais rápido ao carregar CSV do que ao carregar arquivos Avro, portanto, usar esse formato temporário pode fornecer um grande aumento de desempenho ao gravar no Redshift. |
csvnullstring |
Não |
|
O valor strings a serem gravadas para nulos ao usar o formato temporário CSV. Este deve ser um valor que não aparece nos seus dados reais. |
separador csv |
Não |
|
Separador a ser usado ao gravar arquivos temporários com formato temporário definido como |
csvignoreleaderwhitespace |
Não |
|
Quando definido como verdadeiro, remove os espaços em branco iniciais dos valores durante gravações quando |
csvignoretrailingwhitespace |
Não |
|
Quando definido como verdadeiro, remove os espaços em branco finais dos valores durante as gravações quando |
infer_timestamp_ntz_type |
Não |
|
Se |
Opções de configuração adicionais
Configurando o tamanho máximo das colunas strings
Ao criar tabelas do Redshift, o comportamento default é criar TEXT
colunas para colunas strings . O Redshift armazena TEXT
colunas como VARCHAR(256)
, portanto essas colunas têm um tamanho máximo de 256 caracteres (source).
Para suportar colunas maiores, você pode usar o campo de metadados de coluna maxlength
para especificar o comprimento máximo de colunas de strings individuais. Isso também é útil para implementar otimizações de desempenho que economizam espaço, declarando colunas com comprimento máximo menor que o default.
Observação
Devido às limitações do Spark, as APIs das linguagens SQL e R não oferecem suporte à modificação de metadados de coluna.
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Aqui está um exemplo de atualização de campos de metadados de múltiplas colunas usando a API Scala do Spark:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Defina um tipo de coluna personalizado
Se precisar definir manualmente um tipo de coluna, você poderá usar os metadados da coluna redshift_type
. Por exemplo, se desejar substituir a correspondência de tipo Spark SQL Schema -> Redshift SQL
para atribuir um tipo de coluna definido pelo usuário, você poderá fazer o seguinte:
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Configurar codificação de coluna
Ao criar uma tabela, use o campo de metadados da coluna encoding
para especificar uma codificação de compactação para cada coluna (consulte a documentação da Amazon para ver as codificações disponíveis).
Configurando descrições em colunas
O Redshift permite que as colunas tenham descrições anexadas que devem aparecer na maioria das ferramentas query (usando o comando COMMENT
). Você pode definir o campo de metadados da coluna description
para especificar uma descrição para colunas individuais.
Pushdown de consulta no Redshift
O otimizador Spark envia os seguintes operadores para o Redshift:
Filter
Project
Sort
Limit
Aggregation
Join
Dentro de Project
e Filter
, ele suporta as seguintes expressões:
A maioria dos operadores lógicos Boolean
Comparações
Operações aritméticas básicas
Conversões numéricas e strings
A maioria das funções strings
Subconsultas escalares, se puderem ser totalmente transferidas para o Redshift.
Observação
Este pushdown não oferece suporte a expressões que operam em datas e carimbos de data/hora.
Dentro de Aggregation
, ele oferece suporte às seguintes funções de agregação:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
combinado com a cláusula DISTINCT
, quando aplicável.
Dentro de Join
, ele suporta os seguintes tipos de join:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
Subconsultas que são reescritas em
Join
pelo otimizador, por exemploWHERE EXISTS
,WHERE NOT EXISTS
Observação
O pushdown join não suporta FULL OUTER JOIN
.
O pushdown será mais útil em query com LIMIT
. Uma query como SELECT * FROM large_redshift_table LIMIT 10
pode demorar muito, pois a tabela inteira seria primeiro UNLOADed para S3 como resultado intermediário. Com pushdown, o LIMIT
é executado no Redshift. Em query com agregações, empurrar a agregação para o Redshift também ajuda a reduzir a quantidade de dados que precisam ser transferidos.
o pushdown query no Redshift está habilitado por default. Ele pode ser desativado configurando spark.databricks.redshift.pushdown
como false
. Mesmo quando desabilitado, o Spark ainda empurra filtros e executa a eliminação de colunas no Redshift.
Instalação do driver Redshift
A fonte de dados do Redshift também requer um driver JDBC compatível com Redshift. Como o Redshift é baseado no sistema de banco de dados PostgreSQL, você pode usar o driver PostgreSQL JDBC incluído no Databricks Runtime ou o driver Redshift JDBC recomendado pela Amazon. Nenhuma instalação é necessária para usar o driver PostgreSQL JDBC. A versão do driver PostgreSQL JDBC incluída em cada versão do Databricks Runtime está listada nas notas sobre a versão do Databricks Runtime.
Para instalar manualmente o driver Redshift JDBC:
downloads o driver da Amazon.
Faça o upload do driver para o site da Databricks workspace. Ver biblioteca.
Instale a biblioteca em seus clusters.
Observação
A Databricks recomenda usar a versão mais recente do driver Redshift JDBC. As versões do driver Redshift JDBC abaixo de 1.2.41 têm as seguintes limitações:
A versão 1.2.16 do driver retorna dados vazios ao usar uma cláusula
where
em uma query SQL.Versões do driver abaixo de 1.2.41 podem retornar resultados inválidos porque a nulidade de uma coluna é informada incorretamente como “Não anulável” em vez de “Desconhecido”.
Garantias transacionais
Esta seção descreve as garantias transacionais da fonte de dados Redshift para Spark.
Informações gerais sobre propriedades Redshift e S3
Para obter informações gerais sobre garantias transacionais do Redshift, consulte o capítulo Gerenciando operações de gravação simultâneas na documentação do Redshift. Resumindo, o Redshift fornece isolamento serializável de acordo com a documentação do comando Redshift BEGIN :
[embora] você possa usar qualquer um dos quatro níveis de isolamento de transação, o Amazon Redshift processa todos os níveis de isolamento como serializáveis.
De acordo com a documentação do Redshift:
O Amazon Redshift oferece suporte a um default comportamento automática , no qual cada comando SQL executado commit commit separadamente individualmente.
Assim, comandos individuais como COPY
e UNLOAD
são atômicos e transacionais, enquanto BEGIN
e END
explícitos só devem ser necessários para impor a atomicidade de múltiplos comandos ou query.
Ao ler e gravar no Redshift, a fonte de dados lê e grava dados no S3. Tanto o Spark quanto o Redshift produzem saída particionada e a armazenam em vários arquivos no S3. De acordo com a documentação do modelo de consistência de dados do Amazon S3 , as operações de listagem de buckets do S3 são eventualmente consistentes, portanto, os arquivos devem ter esforços especiais para evitar dados ausentes ou incompletos devido a essa fonte de consistência eventual.
Garantias da fonte de dados Redshift para Spark
Anexar a uma tabela existente
Ao inserir linhas no Redshift, a fonte de dados usa o comando COPY e especifica manifestos para proteger contra certas operações S3 eventualmente consistentes. Como resultado, os anexos spark-redshift
às tabelas existentes têm as mesmas propriedades atômicas e transacionais que o comando COPY
normal do Redshift.
Crie uma nova tabela (SaveMode.CreateIfNotExists
)
A criação de uma nova tabela é um processo de duas passos, que consiste em um comando CREATE TABLE
seguido por um comando COPY para anexar o conjunto inicial de linhas. Ambas as operações são realizadas na mesma transação.
Substituir uma tabela existente
Por default, a fonte de dados utiliza transações para realizar sobrescrições, que são implementadas excluindo a tabela de destino, criando uma nova tabela vazia e anexando linhas a ela.
Se a configuração obsoleta usestagingtable
for definida como false
, a fonte de dados commit o comando DELETE TABLE
antes de anexar linhas à nova tabela, sacrificando a atomicidade das operações de substituição, mas reduzindo a quantidade de espaço de teste que o Redshift precisa durante a substituição.
Consultar tabela Redshift
query use o comando Redshift UNLOAD para executar uma query e salvar seus resultados no S3 e usar manifestos para se proteger contra certas operações S3 eventualmente consistentes. Como resultado, query da fonte de dados do Redshift para Spark deve ter as mesmas propriedades de consistência que a query normal do Redshift.
Problemas e soluções comuns
O bucket S3 e clusters Redshift estão em diferentes regiões da AWS
Por default, as cópias S3 <-> Redshift não funcionam se o bucket S3 e clusters Redshift estiverem em regiões diferentes da AWS.
Se você tentar ler uma tabela Redshift quando o bucket S3 estiver em uma região diferente, poderá ver um erro como:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Da mesma forma, tentar gravar no Redshift usando um bucket S3 em uma região diferente pode causar o seguinte erro:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Gravações: o comando COPY do Redshift oferece suporte à especificação explícita da região do bucket S3, portanto, você pode fazer com que as gravações no Redshift funcionem corretamente nesses casos adicionando
region 'the-region-name'
à configuraçãoextracopyoptions
. Por exemplo, com um bucket na região Leste dos EUA (Virgínia) e a API Scala, use:.option("extracopyoptions", "region 'us-east-1'")
Como alternativa, você pode usar a configuração
awsregion
:.option("awsregion", "us-east-1")
Leituras: O comando Redshift UNLOAD também oferece suporte à especificação explícita da região do bucket S3. Você pode fazer com que as leituras funcionem corretamente adicionando a região à configuração
awsregion
:.option("awsregion", "us-east-1")
Erro inesperado de credenciais S3ServiceException ao usar instance profile para autenticar no S3
Se você estiver usando instance profile para autenticar no S3 e receber um erro S3ServiceException
inesperado, verifique se key de acesso da AWS está especificada no URI do S3 tempdir
, nas configurações do Hadoop ou em qualquer uma das fontes verificadas pelo DefaultAWSCredentialsProviderChain: aquelas as fontes têm precedência sobre as credenciais instance profile .
Aqui está um exemplo de mensagem de erro que pode ser um sintoma de que key acidentalmente tem precedência sobre instance profile:
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;
Erro de autenticação ao usar uma senha com caracteres especiais na url JDBC
Se você estiver fornecendo o nome de usuário e a senha como parte do URL JDBC e a senha contiver caracteres especiais como ;
, ?
ou &
, você poderá ver a seguinte exceção:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Isso é causado por caracteres especiais no nome de usuário ou senha que não são escapados corretamente pelo driver JDBC. Certifique-se de especificar o nome de usuário e a senha usando as opções correspondentes do DataFrame user
e password
. Para mais informações, consulte Parâmetros.
A consulta Spark de longa duração trava indefinidamente, mesmo que as operações correspondentes do Redshift sejam concluídas
Se você estiver lendo ou gravando grandes quantidades de dados de e para o Redshift, sua query do Spark poderá travar indefinidamente, mesmo que a página de monitoramento do AWS Redshift mostre que as operações LOAD
ou UNLOAD
correspondentes foram concluídas e que os clusters estão parados. Isso é causado pelo tempo limite da conexão entre o Redshift e o Spark. Para evitar isso, certifique-se de que o sinalizador JDBC tcpKeepAlive
esteja ativado e TCPKeepAliveMinutes
esteja definido com um valor baixo (por exemplo, 1).
Para obter informações adicionais, consulte Configuração do driver JDBC do Amazon Redshift.
Carimbo de data/hora com semântica de fuso horário
Ao ler dados, os tipos de dados Redshift TIMESTAMP
e TIMESTAMPTZ
são mapeados para Spark TimestampType
e um valor é convertido em Tempo Universal Coordenado (UTC) e armazenado como o carimbo de data/hora UTC. Para um Redshift TIMESTAMP
, o fuso horário local é assumido, pois o valor não possui nenhuma informação de fuso horário. Ao gravar dados em uma tabela Redshift, um Spark TimestampType
é mapeado para o tipo de dados Redshift TIMESTAMP
.
Guia de migração
A fonte de dados agora exige que você defina explicitamente forward_spark_s3_credentials
antes que as credenciais do Spark S3 sejam encaminhadas para o Redshift. Esta alteração não terá impacto se você usar os mecanismos de autenticação aws_iam_role
ou temporary_aws_*
. No entanto, se você confiava no comportamento default antigo, agora deverá definir explicitamente forward_spark_s3_credentials
como true
para continuar usando seu mecanismo de autenticação anterior do Redshift para S3. Para uma discussão sobre os três mecanismos de autenticação e suas compensações de segurança, consulte a seção Autenticação para S3 e Redshift deste documento.