Consultar o Amazon Redshift usando o Databricks
O senhor pode ler e gravar tabelas do Amazon Redshift com o Databricks.
Experimental
As configurações descritas neste artigo são experimentais. Os recursos experimentais são fornecidos no estado em que se encontram e não recebem suporte do site Databricks por meio do suporte técnico ao cliente. Para obter suporte completo à federação de consultas, o senhor deve usar a lakehouse Federation, que permite que os usuários do Databricks aproveitem as ferramentas de sintaxe e governança de dados do Unity Catalog.
A Databricks Redshift fonte de dados usa a Amazon S3 para transferir dados com eficiência para dentro e para fora da Redshift e usa a JDBC para acionar automaticamente os comandos COPY
e UNLOAD
apropriados na Redshift.
Em Databricks Runtime 11.3 LTS e acima, Databricks Runtime inclui o driver Redshift JDBC , acessível usando a palavra-chave redshift
para a opção format. Consulte Databricks Runtime notas sobre as versões e a compatibilidade das versões de driver incluídas em cada Databricks Runtime. Os drivers fornecidos pelo usuário ainda são suportados e têm precedência sobre o driver JDBC incluído.
No Databricks Runtime 10.4 LTS e abaixo, é necessária a instalação manual do driver JDBC do Redshift, e as consultas devem usar o driver (com.databricks.spark.redshift
) para o formato. Consulte Instalação do driver do Redshift.
Uso
Os exemplos a seguir demonstram a conexão com o driver do Redshift. Substitua os valores do parâmetro url
se estiver usando o driver JDBC do PostgreSQL.
Depois de configurar suas credenciais AWS, o senhor pode usar a fonte de dados com a Spark fonte de dados API em Python, SQL, R ou Scala.
Os locais externos definidos no Unity Catalog não são suportados como locais tempdir
.
- Python
- SQL
- R
- Scala
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
Read data using SQL on Databricks Runtime 10.4 LTS and below:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Read data using SQL on Databricks Runtime 11.3 LTS and above:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Write data using SQL:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
The SQL API supports only the creation of new tables and not overwriting or appending.
Read data using R on Databricks Runtime 10.4 LTS and below:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Read data using R on Databricks Runtime 11.3 LTS and above:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Recomendações para trabalhar com o Redshift
A execução de consultas pode extrair grandes quantidades de dados para o S3. Se o senhor planeja realizar várias consultas com os mesmos dados em Redshift, Databricks recomenda salvar a extração de dados usando Delta Lake.
O senhor não deve criar um Redshift clustering dentro do Databricks gerenciar VPC, pois isso pode levar a problemas de permissões devido ao modelo de segurança no Databricks VPC. O senhor deve criar seu próprio VPC e, em seguida, realizar o peering de VPC para conectar o Databricks à sua instância do Redshift.
Configuração
Autenticação no S3 e no Redshift
A fonte de dados envolve várias conexões de rede, ilustradas no diagrama a seguir:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
A fonte de dados lê e grava dados em S3 ao transferir dados de/para Redshift. Como resultado, ele requer credenciais do AWS com acesso de leitura e gravação a um bucket do S3 (especificado usando o parâmetro de configuração tempdir
).
A fonte de dados não limpa os arquivos temporários que cria em S3. Como resultado, recomendamos que o senhor use um bucket S3 temporário dedicado com uma configuração de ciclo de vida de objeto para garantir que os arquivos temporários sejam excluídos automaticamente após um período de expiração especificado. Consulte a seção Criptografia deste documento para uma discussão sobre como criptografar esses arquivos. O senhor não pode usar um local externo definido no Unity Catalog como um local tempdir
.
As seções a seguir descrevem as opções de configuração de autenticação de cada conexão:
Driver Spark para Redshift
O driver Spark se conecta ao Redshift via JDBC usando um nome de usuário e uma senha. Redshift não é compatível com o uso da função IAM para autenticar essa conexão. Por default, essa conexão usa a criptografia SSL; para obter mais detalhes, consulte Criptografia.
Spark para S3
O S3 atua como um intermediário para armazenar dados em massa ao ler ou gravar no Redshift. O Spark se conecta ao S3 usando as interfaces do Hadoop FileSystem e diretamente usando o cliente S3 do Amazon Java SDK.
O senhor não pode usar montagens DBFS para configurar o acesso ao S3 para Redshift.
-
Cadeia de provedores de credenciais padrão (a melhor opção para a maioria dos usuários): as credenciais do site AWS são recuperadas automaticamente por meio da cadeia DefaultAWSCredentialsProviderChain. Se o senhor usar o perfil de instância para se autenticar em S3, provavelmente deverá usar esse método.
Os seguintes métodos de fornecimento de credenciais têm precedência sobre este default.
-
Ao assumir um IAM role : O senhor pode usar um IAM role que o instance profile pode assumir. Para especificar a função ARN, o senhor deve anexar um instance profile ao clustering e fornecer a seguinte chave de configuração:
- Scala
- Python
sc.hadoopConfiguration.set("fs.s3a.credentialsType", "AssumeRole")
sc.hadoopConfiguration.set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)
// An optional duration, expressed as a quantity and a unit of
// time, such as "15m" or "1h"
sc.hadoopConfiguration.set("fs.s3a.assumed.role.session.duration", <duration>)
sc._jsc.hadoopConfiguration().set("fs.s3a.credentialsType", "AssumeRole")
sc._jsc.hadoopConfiguration().set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)
# An optional duration, expressed as a quantity and a unit of
# time, such as "15m" or "1h"
sc._jsc.hadoopConfiguration().set("fs.s3a.assumed.role.session.duration", <duration>)
- Defina a chave em Hadoop conf: O senhor pode especificar a chave AWS usando as propriedades de configuraçãoHadoop. Se a sua configuração
tempdir
apontar para um sistema de arquivoss3a://
, o senhor poderá definir as propriedadesfs.s3a.access.key
efs.s3a.secret.key
em um arquivo de configuração XML do Hadoop ou chamarsc.hadoopConfiguration.set()
para configurar a configuração global do Spark no Hadoop. Se o senhor usar um sistema de arquivoss3n://
, poderá fornecer a chave de configuração herdada, conforme mostrado no exemplo a seguir.
- Scala
- Python
For example, if you are using the s3a
filesystem, add:
sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
For the legacy s3n
filesystem, add:
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
The following command relies on some Spark internals, but should work with all PySpark versions and is unlikely to change in the future:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift para S3
O Redshift também se conecta ao S3 durante as consultas COPY
e UNLOAD
. Há três métodos para autenticar essa conexão:
-
Faça com que Redshift assuma uma função IAM role (mais segura) : O senhor pode conceder a Redshift permissão para assumir uma IAM role durante
COPY
ouUNLOAD
operações e, em seguida, configurar a fonte de dados para instruir Redshift a usar essa função:- Crie um IAM role concedendo as permissões S3 apropriadas ao seu bucket.
- Siga o guia Authorizing Amazon Redshift to Access Other AWS serviço On Your Behalf para configurar a política de confiança dessa função a fim de permitir que Redshift assuma essa função.
- Siga as passos no guia Autorizando as operações COPY e UNLOAD usando a IAM role para associar essa IAM role aos clusters do Redshift.
- Defina a opção
aws_iam_role
da fonte de dados como ARN da função.
-
Spark S3 Encaminhar as credenciais de Redshift para: se a
forward_spark_s3_credentials
opção for definida como,true
a fonte de dados descobrirá automaticamente as credenciais que Spark está usando para se conectar a S3 e encaminhará essas credenciais para Redshift JDBCpor meio de. Se Spark estiver autenticando em S3 usando um instance profile, um conjunto de credenciais STS temporárias será encaminhado para Redshift; caso contrário, a chave AWS será encaminhada. A consulta JDBC incorpora essas credenciais, portanto, a Databricks recomenda enfaticamente que o senhor ative a criptografia SSL da conexão JDBC ao usar esse método de autenticação. -
Use as credenciais do Serviço de tokens de segurança (STS): O senhor pode configurar
temporary_aws_access_key_id
astemporary_aws_secret_access_key
temporary_aws_session_token
propriedades de configuração, e para apontar para a chave temporária criada por meio do AWS serviço de tokens de segurança. A consulta JDBC incorpora essas credenciais, portanto, é altamente recomendável ativar a criptografia SSL da conexão JDBC ao usar esse método de autenticação. Se o senhor escolher essa opção, esteja ciente do risco de as credenciais expirarem antes que as operações de leitura/gravação sejam bem-sucedidas.
Essas três opções são mutuamente exclusivas e você deve escolher explicitamente qual delas usar.
Criptografia
-
Protegendo JDBC : A menos que haja alguma configuração relacionada a SSLno URL JDBC, a fonte de dados por default habilita a criptografia SSL e também verifica se o servidor Redshift é confiável (ou seja,
sslmode=verify-full
). Para isso, um certificado de servidor é baixado automaticamente dos servidores Amazon na primeira vez em que for necessário. Em caso de falha, um arquivo de certificado pré-empacotado é usado como fallback. Isso se aplica aos drivers JDBC do Redshift e do PostgreSQL.Se houver algum problema com esse recurso, ou se o senhor simplesmente quiser desativar o SSL, pode ligar para
.option("autoenablessl", "false")
no seuDataFrameReader
ouDataFrameWriter
.Se quiser especificar configurações personalizadas relacionadas ao SSL, o senhor pode seguir as instruções da documentação do Redshift: Using SSL and Server Certificates in Java and JDBC Driver Configuration Options Any SSL-related options present in the JDBC
url
used with the fonte de dados take precedence (that is, the auto-configuration will not trigger). -
Criptografar dados do UNLOAD armazenados no S3 (dados armazenados durante a leitura do Redshift) : De acordo com a documentação do Redshift sobre o descarregamento de dados para o S3, "o UNLOAD criptografa automaticamente os arquivos de dados usando a criptografia do lado do servidor do Amazon S3 (SSE-S3)".
Redshift Também oferece suporte à criptografia no lado do cliente com um key personalizado (consulte: Descarregamento de arquivos de dados criptografados), mas a fonte de dados não tem o recurso de especificar o key simétrico necessário.
-
Criptografar dados COPY armazenados no S3 (dados armazenados ao gravar no Redshift) : De acordo com a documentação do Redshift sobre o carregamento de arquivos de dados criptografados do Amazon S3:
O senhor pode usar o comando COPY
para carregar arquivos de dados que foram enviados para Amazon S3 usando criptografia do lado do servidor com chave de criptografia AWS-gerenciar (SSE-S3 ou SSE-KMS), criptografia do lado do cliente ou ambas. O COPY não oferece suporte à criptografia do lado do servidor Amazon S3 com um key (SSE-C) fornecido pelo cliente.
Para usar esse recurso, configure o sistema de arquivos S3 do Hadoop para usar a criptografia do Amazon S3. Isso não criptografará o arquivo MANIFEST
que contém uma lista de todos os arquivos gravados.
Parâmetros
O mapa de parâmetros ou OPTIONS fornecido no Spark SQL suporta as seguintes configurações:
Parâmetro | Obrigatório | Padrão | Descrição |
---|---|---|---|
duvidoso | Sim, a menos que a consulta seja especificada. | Nenhuma | A tabela para criar ou ler no Redshift. Esse parâmetro é necessário ao salvar os dados de volta no Redshift. |
query | Sim, a menos que a tabela de dívidas seja especificada. | Nenhuma | A consulta para leitura no Redshift. |
usuário | Não | Nenhuma | O nome de usuário do Redshift. Deve ser usado em conjunto com a opção de senha. Só pode ser usado se o usuário e a senha não forem passados na URL; passar ambos resultará em um erro. Use esse parâmetro quando o nome de usuário contiver caracteres especiais que precisam ser escapados. |
Senha | Não | Nenhuma | A senha do Redshift. Deve ser usado em conjunto com a opção |
URL | Sim | Nenhuma | Um URL JDBC, no formato
|
caminho_de_pesquisa | Não | Nenhuma | Definir o caminho de pesquisa do esquema no Redshift. Será definido usando o comando |
aws_iam_role | Somente se o senhor usar a função IAM para autorizar. | Nenhuma | Totalmente especificado ARN das operaçõesIAM Redshift COPY/UNLOAD Função anexada ao cluster Redshift, por exemplo, |
forward_spark_s3_credentials | Não |
| Se |
id_chave de acesso temporário_aws_access | Não | Nenhuma | AWS acessar key, deve ter permissões de gravação no bucket S3. |
key_aws_secret_access_key temporária | Não | Nenhuma | AWS acesso secreto key correspondente ao acesso fornecido key. |
token de sessão aws_temporário | Não | Nenhuma | AWS tokens de sessão correspondentes ao acesso fornecido key. |
tempdir | Sim | Nenhuma | Um local gravável no Amazon S3, a ser usado para dados não carregados durante a leitura e dados Avro a serem carregados no Redshift durante a gravação. Se estiver usando Redshift fonte de dados para Spark como parte de um ETL pipeline regular, pode ser útil definir uma política de ciclo de vida em um bucket e usá-lo como um local temporário para esses dados. O senhor não pode usar locais externos definidos no Unity Catalog como locais |
driver jdbc | Não | Determinado pelo subprotocolo do URL do JDBC. | O nome da classe do driver JDBC a ser usado. Essa classe deve estar no classpath. Na maioria dos casos, não deve ser necessário especificar essa opção, pois o nome da classe de driver apropriada deve ser determinado automaticamente pelo subprotocolo do URL do JDBC. |
disttyle | Não |
| O estilo de distribuição do Redshift a ser usado ao criar uma tabela. Pode ser um dos endereços |
desaversão | Não, a menos que esteja usando | Nenhuma | O nome de uma coluna na tabela a ser usada como distribuição key ao criar uma tabela. |
especificação de chave de classificação | Não | Nenhuma | Uma Redshift definição completa da chave Sort. Os exemplos incluem:
|
usestagingtable (Obsoleto) | Não |
| Definir essa opção obsoleta como Como a configuração |
Descrição | Não | Nenhuma | Uma descrição para a tabela. Será definido usando o comando SQL COMMENT e deverá aparecer na maioria das ferramentas de consulta. Veja também os metadados |
preações | Não | Nenhuma | Uma lista separada por Observe que se esse comando falhar, ele será tratado como um erro e uma exceção será lançada. Se estiver usando uma tabela de teste, as alterações serão revertidas e a tabela de backup restaurada se as pré-ações falharem. |
postações | Não | Nenhuma | Uma lista separada por Observe que se esse comando falhar, ele será tratado como um erro e uma exceção será lançada. Se estiver usando uma tabela intermediária, as alterações serão revertidas e a tabela de backup restaurada se as ações de postagem falharem. |
opções de cópia extra | Não | Nenhuma | Uma lista de opções extras para anexar ao comando Como essas opções são anexadas ao final do comando |
formato temporário | Não |
| O formato no qual salvar arquivos temporários no S3 ao gravar no Redshift. tem como padrão O Redshift é significativamente mais rápido ao carregar arquivos CSV do que ao carregar arquivos Avro, portanto, usar esse formato de tempode proporcionar um grande aumento de desempenho ao gravar no Redshift. |
string null csv | Não |
| O valor da cadeia de caracteres a ser gravado para nulos ao usar o formato de temp CSV. Esse deve ser um valor que não apareça em seus dados reais. |
separador csv | Não |
| Separador a ser usado ao escrever arquivos temporários com tempformat definido como |
csv ignora os principais espaços em branco | Não |
| Quando definido como verdadeiro, remove o espaço em branco inicial dos valores durante as gravações quando |
csv ignore o espaço em branco à direita | Não |
| Quando definido como verdadeiro, remove os espaços em branco à direita dos valores durante as gravações quando |
infer_timestamp_ntz_type | Não |
| Se |
Opções adicionais de configuração
Configuração do tamanho máximo das colunas de strings
Ao criar tabelas Redshift, o comportamento default é criar colunas TEXT
para colunas de strings. O Redshift armazena as colunas TEXT
como VARCHAR(256)
, portanto, essas colunas têm um tamanho máximo de 256 caracteres(fonte).
Para dar suporte a colunas maiores, o senhor pode usar o campo de metadados da coluna maxlength
para especificar o comprimento máximo de colunas de strings individuais. Isso também é útil para implementar otimizações de desempenho que economizam espaço, declarando colunas com um comprimento máximo menor do que o default.
Devido às limitações do Spark, as APIs das linguagens SQL e R não oferecem suporte à modificação de metadados de coluna.
- Python
- Scala
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Here is an example of updating multiple columns’ metadata fields using Spark’s Scala API:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Definir um tipo de coluna personalizado
Se precisar definir manualmente um tipo de coluna, você pode usar os metadados da coluna redshift_type
. Por exemplo, se você quiser substituir o matcher de tipos Spark SQL Schema -> Redshift SQL
para atribuir um tipo de coluna definido pelo usuário, você pode fazer o seguinte:
- Python
- Scala
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Configurar a codificação da coluna
Ao criar uma tabela, use o campo de metadados da coluna encoding
para especificar uma codificação de compactação para cada coluna (consulte os documentos da Amazon para ver as codificações disponíveis).
Definindo descrições em colunas
O Redshift permite que as colunas tenham descrições anexadas que devem ser exibidas na maioria das ferramentas de consulta (usando o comando COMMENT
). Você pode definir o campo de metadados da coluna description
para especificar uma descrição para
colunas individuais.
Envio de consulta para o Redshift
O otimizador do Spark transfere os seguintes operadores para o Redshift:
Filter
Project
Sort
Limit
Aggregation
Join
Em Project
e Filter
, ele suporta as seguintes expressões:
- A maioria dos Boolean operadores lógicos
- Comparações
- Operações aritméticas básicas
- Conversão de números e strings
- A maioria das funções de cadeias de caracteres
- Subconsultas escalares, se elas puderem ser totalmente transferidas para o Redshift.
Esse pushdown não suporta expressões que operam em datas e carimbos de data/hora.
No Aggregation
, ele suporta as seguintes funções de agregação:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
combinado com a cláusula DISTINCT
, quando aplicável.
Em Join
, ele oferece suporte aos seguintes tipos de união:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- Subconsultas que são reescritas em
Join
pelo otimizador, por exemploWHERE EXISTS
,WHERE NOT EXISTS
O join pushdown não é compatível com FULL OUTER JOIN
.
O pushdown pode ser mais benéfico em consultas com LIMIT
. Uma consulta como SELECT * FROM large_redshift_table LIMIT 10
pode demorar muito, pois a tabela inteira seria primeiro descarregada no S3 como um resultado intermediário. Com o pushdown, o LIMIT
é executado no Redshift. Em consultas com agregações, empurrar a agregação para o Redshift também ajuda a reduzir a quantidade de dados que precisam ser transferidos.
O pushdown de consulta em Redshift é ativado por default. Ele pode ser desativado definindo spark.databricks.redshift.pushdown
para false
. Mesmo quando desativado, o Spark ainda envia filtros para baixo e executa a eliminação de colunas no Redshift.
Instalação do driver do Redshift
A Redshift fonte de dados também requer um Redshift JDBC driver compatível com. Como o Redshift é baseado no sistema de banco de dados PostgreSQL, o senhor pode usar o driver PostgreSQL JDBC incluído no Databricks Runtime ou o driver Redshift JDBC recomendado pela Amazon. Não é necessária nenhuma instalação para usar o driver JDBC do PostgreSQL. A versão do driver PostgreSQL JDBC incluída em cada versão Databricks Runtime está listada em Databricks Runtime notas sobre a versão.
Para instalar manualmente o driver JDBC do Redshift:
- Faça o download do driver da Amazon.
- Carregue o driver em seu site Databricks workspace. Ver biblioteca.
- Instale a biblioteca em seu cluster.
A Databricks recomenda usar a versão mais recente do driver JDBC do Redshift. As versões do driver JDBC do Redshift abaixo da 1.2.41 têm as seguintes limitações:
- A versão 1.2.16 do driver retorna dados vazios ao usar uma cláusula
where
em uma consulta SQL. - As versões do driver abaixo da 1.2.41 podem retornar resultados inválidos porque a anulabilidade de uma coluna é informada incorretamente como "Not Nullable" em vez de "Unknown".
Garantias transacionais
Esta seção descreve as garantias transacionais da Redshift fonte de dados para Spark.
Informações gerais sobre as propriedades do Redshift e do S3
Para obter informações gerais sobre as garantias transacionais do site Redshift, consulte o capítulo Managing concorrente Write operações na documentação do site Redshift. Em resumo, o Redshift fornece isolamento serializável de acordo com a documentação do comando BEGIN do Redshift:
[Embora o senhor possa usar qualquer um dos quatro níveis de isolamento de transação, o Amazon Redshift processa todos os níveis de isolamento como serializáveis.
De acordo com a documentação do Redshift:
Amazon Redshift suporta um comportamento default automático commit no qual cada comando SQL executado separadamente é confirmado individualmente.
Assim, comandos individuais como COPY
e UNLOAD
são atômicos e transacionais, enquanto BEGIN
e END
explícitos devem ser necessários apenas para impor a atomicidade de vários comandos ou consultas.
Ao ler e gravar em Redshift, a fonte de dados lê e grava dados em S3. Tanto o Spark quanto o Redshift produzem resultados particionados e os armazenam em vários arquivos no S3. De acordo com a documentação do Modelo de Consistência de Dados do Amazon S3, as operações de listagem de buckets do S3 são eventualmente consistentes, de modo que os arquivos devem fazer um esforço especial para evitar dados ausentes ou incompletos devido a essa fonte de consistência eventual.
Garantias do site Redshift fonte de dados para Spark
Anexar a uma tabela existente
Ao inserir linhas em Redshift, a fonte de dados usa o comando COPY
e especifica manifestos para se proteger contra determinadas S3 operações eventualmente consistentes. Como resultado, os appends spark-redshift
às tabelas existentes têm as mesmas propriedades atômicas e transacionais que o Redshift COPY
comando regular.
Crie uma nova tabela (SaveMode.CreateIfNotExists
)
A criação de uma nova tabela é um processo de duas passos, que consiste em um comando CREATE TABLE
seguido por um comando COPY para anexar o conjunto inicial de linhas. Ambas as operações são realizadas na mesma transação.
Substituir uma tabela existente
Em default, a fonte de dados usa transações para realizar substituições, que são implementadas pela exclusão da tabela de destino, criando uma nova tabela vazia e acrescentando linhas a ela.
Se a configuração depreciada usestagingtable
for definida como false
, a fonte de dados confirma o comando DELETE TABLE
antes de anexar linhas à nova tabela, sacrificando a atomicidade das operações de substituição, mas reduzindo a quantidade de espaço de preparação que o site Redshift precisa durante a substituição.
Consultar a tabela do Redshift
As consultas usam o comando UNLOAD do Redshift para executar uma consulta e salvar seus resultados no S3 e usam manifestos para se proteger contra determinadas operações do S3 eventualmente consistentes. Como resultado, as consultas de Redshift fonte de dados para Spark devem ter as mesmas propriedades de consistência que as consultas regulares de Redshift.
Problemas e soluções comuns
S3 bucket e clustering estão em regiões diferentes Redshift AWS
Por default, as cópias S3 <-> Redshift não funcionam se o bucket S3 e o clustering Redshift estiverem em regiões AWS diferentes.
Se o senhor tentar ler uma tabela do Redshift quando o bucket S3 estiver em uma região diferente, poderá ver um erro como:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Da mesma forma, a tentativa de gravar no Redshift usando um bucket S3 em uma região diferente pode causar o seguinte erro:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
-
Gravações: O comando Redshift COPY oferece suporte à especificação explícita da região do bucket S3, portanto, o senhor pode fazer com que as gravações no Redshift funcionem corretamente nesses casos, adicionando
region 'the-region-name'
à configuraçãoextracopyoptions
. Por exemplo, com um bucket na região leste dos EUA (Virgínia) e a API Scala, use:Scala.option("extracopyoptions", "region 'us-east-1'")
Como alternativa, você pode usar a configuração
awsregion
:Scala.option("awsregion", "us-east-1")
-
Lê: O comando UNLOAD do Redshift também oferece suporte à especificação explícita da região do bucket S3. Você pode fazer as leituras funcionarem corretamente adicionando a região à configuração
awsregion
:Scala.option("awsregion", "us-east-1")
Erro inesperado de credenciais S3ServiceException quando o usuário usa o perfil de instância para se autenticar no S3
Se o senhor estiver usando o perfil de instância para se autenticar em S3 e receber um erro S3ServiceException
inesperado, verifique se a chave de acesso AWS está especificada no tempdir
URI. S3 URI, nas configurações de Hadoop ou em qualquer uma das fontes verificadas pela DefaultAWSCredentialsProviderChain: essas fontes têm precedência sobre as credenciais de instance profile.
Aqui está um exemplo de mensagem de erro que pode ser um sintoma de que a chave acidentalmente tem precedência sobre o perfil da instância:
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;
Erro de autenticação ao usar uma senha com caracteres especiais na url do JDBC
Se estiver fornecendo o nome de usuário e a senha como parte do URL do JDBC e a senha contiver caracteres especiais, como ;
, ?
ou &
, o senhor poderá ver a seguinte exceção:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Isso é causado por caracteres especiais no nome de usuário ou na senha que não são escapados corretamente pelo driver JDBC. Certifique-se de especificar o nome de usuário e a senha usando as opções correspondentes do DataFrame user
e password
. Para obter mais informações, consulte Parâmetros.
A consulta de longa duração Spark fica suspensa indefinidamente, mesmo que as Redshift operações correspondentes tenham sido concluídas
Se o senhor estiver lendo ou gravando grandes quantidades de dados de e para Redshift, sua consulta Spark poderá ficar suspensa indefinidamente, mesmo que a página de monitoramento AWS Redshift mostre que as operações LOAD
ou UNLOAD
correspondentes foram concluídas e que o clustering está parado. Isso é causado pelo tempo limite da conexão entre o Redshift e o Spark. Para evitar isso, verifique se o sinalizador tcpKeepAlive
JDBC está ativado e se TCPKeepAliveMinutes
está definido com um valor baixo (por exemplo, 1).
Para obter informações adicionais, consulte Amazon Redshift JDBC Driver Configuration.
Carimbo de data/hora com semântica de fuso horário
Ao ler os dados, os tipos de dados do Redshift TIMESTAMP
e TIMESTAMPTZ
são mapeados para o Spark TimestampType
, e um valor é convertido em Coordinated Universal Time (UTC) e armazenado como o registro de data e hora UTC. Para um Redshift TIMESTAMP
, o fuso horário local é assumido, pois o valor não tem nenhuma informação de fuso horário. Ao gravar dados em uma tabela do Redshift, um Spark TimestampType
é mapeado para o tipo de dados do Redshift TIMESTAMP
.
Guia de migração
A fonte de dados agora exige que o senhor defina explicitamente forward_spark_s3_credentials
antes que as credenciais de Spark S3 sejam encaminhadas para Redshift. Essa alteração não terá impacto se você usar os mecanismos de autenticação aws_iam_role
ou temporary_aws_*
. No entanto, se o senhor confiava no antigo comportamento default, agora deve definir explicitamente forward_spark_s3_credentials
como true
para continuar usando o mecanismo de autenticação anterior Redshift para S3. Para uma discussão sobre os três mecanismos de autenticação e suas compensações de segurança, consulte a seção Autenticação no S3 e no Redshift deste documento.