Amazon Redshift
This article describes a data source that lets you load data into Apache Spark SQL DataFrames from Amazon Redshift,
and write them back to Redshift tables. This data source uses Amazon S3 to efficiently transfer data in and out of Redshift, and uses JDBC to automatically trigger the appropriate COPY
and UNLOAD
commands on Redshift.
This data source is more suited to ETL than interactive queries, since each query execution may extract large amounts of data to S3. If you plan to perform several queries against the same Redshift data, we recommend saving the extracted data in a format such as Parquet.
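For example, once a DataFrame df has been loaded from Redshift (see Usage below), a minimal sketch of caching it as Parquet for repeated querying might look like this; the path is illustrative:
# Persist the extracted data once; later queries read the Parquet copy
# instead of triggering another UNLOAD from Redshift.
df.write.parquet("s3a://my-bucket/cached/my_table")
cached = spark.read.parquet("s3a://my-bucket/cached/my_table")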
Note
You should not create a Redshift cluster inside the Databricks managed VPC as it can lead to permissions issues due to the security model in the Databricks VPC. You should create your own VPC and then perform VPC peering to connect Databricks to your Redshift instance.
Installation
The installation steps vary depending on your Spark cluster’s cluster image version:
- Spark 2.1.0-db2 and later versions: the data source is automatically included in these cluster images and is documented by this page.
- Spark 2.0.x through 2.1.0-db1: the data source is not included in these cluster images. Instead, you can install version 3.0.0-preview1 of the spark-redshift library using Databricks’ Maven library installer. For documentation specific to that version of the library, see the version 3.0.0-preview1 README in the databricks/spark-redshift repository.
- Spark 1.6.x: version 0.6.0 of the spark-redshift library is automatically included in the cluster image. For documentation specific to that version of the library, see the version 0.6.0 README in the databricks/spark-redshift repository.
In addition, you must configure a Redshift-compatible JDBC driver. There are two options:
- Use the bundled PostgreSQL JDBC driver: Databricks cluster images automatically include the PostgreSQL JDBC driver, so you can specify a JDBC URL of the form jdbc:postgresql://... to use that driver.
- Download and install the official Redshift JDBC driver: download the official Amazon Redshift JDBC driver, upload it to Databricks, and attach the library to your cluster. Then, use JDBC URLs of the form jdbc:redshift://....
Databricks purposely does not bundle the Redshift driver in order to avoid dependency conflicts: an automatically bundled Redshift JDBC driver would interfere with your ability to choose between the JDBC 4.0, 4.1, or 4.2 drivers when working directly with Redshift over JDBC, and would also adversely impact PostgreSQL users, because the Redshift driver registers itself as a handler of both the postgresql and redshift JDBC subprotocols (source).
Note
The latest version of the Redshift JDBC driver (v1.2.16) is known to return empty data when a where clause is used in the SQL string. We recommend using v1.2.12 or lower with Databricks clusters.
Usage
Once you have configured your AWS credentials, you can use the data source via the Spark data source API in Scala, Python, or SQL, as follows:
# Read data from a table
df = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable", "my_table") \
.option("tempdir", "s3n://path/for/temp/data") \
.load()
# Read data from a query
df = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("query", "select x, count(*) my_table group by x") \
.option("tempdir", "s3n://path/for/temp/data") \
.load()
# Write back to a table
df.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3n://path/for/temp/data") \
.mode("error") \
.save()
# Using IAM Role based authentication
df.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3n://path/for/temp/data") \
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role") \
.mode("error") \
.save()
// Get some data from a Redshift table
val df: DataFrame = spark.read
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table")
.option("tempdir", "s3n://path/for/temp/data")
.load()
// Also load data from a Redshift query
val df: DataFrame = spark.read
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("query", "select x, count(*) my_table group by x")
.option("tempdir", "s3n://path/for/temp/data")
.load()
// Apply some transformations to the data as per normal, then you can use the
// data source API to write the data back to another table
df.write
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("tempdir", "s3n://path/for/temp/data")
.mode("error")
.save()
// Using IAM Role based authentication
df.write
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.option("tempdir", "s3n://path/for/temp/data")
.mode("error")
.save()
Read data using R:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3n://path/for/temp/data",
dbtable = "my_table",
url = "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
Read data using SQL:
CREATE TABLE my_table
USING com.databricks.spark.redshift
OPTIONS (
dbtable 'my_table',
tempdir 's3n://path/for/temp/data',
url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
);
Write data using SQL:
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE my_table
USING com.databricks.spark.redshift
OPTIONS (
dbtable 'my_table',
tempdir 's3n://path/for/temp/data',
url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
)
AS SELECT * FROM tabletosave;
The SQL API supports only the creation of new tables and not overwriting or appending; this corresponds to the default save mode of the other language APIs.
Configuration
Authenticating to S3 and Redshift
The data source involves several network connections, illustrated in the following diagram:
                           ┌───────┐
      ┌───────────────────>│  S3   │<──────────────────┐
      │    IAM or keys     └───────┘    IAM or keys    │
      │                        ^                       │
      │                        │ IAM or keys           │
      v                        v                       v
┌────────────┐           ┌───────────┐           ┌───────────┐
│  Redshift  │           │   Spark   │           │   Spark   │
│            │<─────────>│  Driver   │<─────────>│ Executors │
└────────────┘           └───────────┘           └───────────┘
   JDBC with                Configured
   username /               in
   password                 Spark
  (SSL enabled by default)
The data source reads and writes data to S3 when transferring data to/from
Redshift. As a result, it requires AWS credentials with read and write
access to an S3 bucket (specified using the tempdir
configuration
parameter).
Note
The data source does not clean up the temporary files that it creates in S3. As a result, we recommend that you use a dedicated temporary S3 bucket with an object lifecycle configuration to ensure that temporary files are automatically deleted after a specified expiration period. See the Encryption section of this document for a discussion of how to encrypt these files.
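One possible way to set up such a rule (outside the data source itself) is with boto3; the bucket name, prefix, and expiration period below are illustrative:
import boto3

# Expire objects under the tempdir prefix one day after creation (values are illustrative).
s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="my-redshift-temp-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-spark-redshift-tempdir",
                "Filter": {"Prefix": "path/for/temp/data/"},
                "Status": "Enabled",
                "Expiration": {"Days": 1},
            }
        ]
    },
)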
The following sections describe each connection’s authentication configuration options:
Spark driver to Redshift
The Spark driver connects to Redshift via JDBC using a username and password. Redshift does not support the use of IAM roles to authenticate this connection. By default, this connection uses SSL encryption; for more details, see Encryption.
Spark to S3
S3 acts as an intermediary to store bulk data when reading from or writing to Redshift. Spark connects to S3 using both the Hadoop FileSystem interfaces and directly using the Amazon Java SDK’s S3 client. This connection supports either AWS keys or instance profiles (DBFS mount points are not supported, so if you do not want to rely on AWS keys you should use cluster instance profiles instead). There are four methods of providing these credentials:
Default Credential Provider Chain (best option for most users): AWS credentials are automatically retrieved through the DefaultAWSCredentialsProviderChain. If you use instance profiles to authenticate to S3 then you should probably use this method.
The following methods of providing credentials take precedence over this default.
Set keys in Hadoop conf: You can specify AWS keys using Hadoop configuration properties. If your tempdir configuration points to an s3a:// filesystem, you can set the fs.s3a.access.key and fs.s3a.secret.key properties in a Hadoop XML configuration file or call sc.hadoopConfiguration.set() to configure Spark’s global Hadoop configuration. If you use an s3n:// filesystem, you can provide the legacy configuration keys as shown in the following example.
For example, if you are using the s3a filesystem, add:
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_KEY_ID")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_ACCESS_KEY")
For the legacy s3n filesystem, add:
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
The following command relies on some Spark internals, but should work with all PySpark versions and is unlikely to change in the future:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_KEY_ID") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_ACCESS_KEY")
Encode keys in tempdir URI: For example, the URI s3a://ACCESSKEY:SECRETKEY@bucket/path/to/temp/dir encodes the key pair (ACCESSKEY, SECRETKEY). Due to Hadoop limitations, this approach does not work for secret keys that contain forward slash (/) characters, even if those characters are URL-encoded.
By assuming an IAM role: You can use an IAM role that the instance profile can assume. To specify the role ARN, you must attach an instance profile to the cluster and provide the following configuration keys:
sc.hadoopConfiguration.set("fs.s3a.credentialsType", "AssumeRole") sc.hadoopConfiguration.set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)
sc._jsc.hadoopConfiguration().set("fs.s3a.credentialsType", "AssumeRole") sc._jsc.hadoopConfiguration().set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)
Redshift to S3
Redshift also connects to S3 during COPY
and UNLOAD
queries. There are three methods of authenticating this connection:
- Have Redshift assume an IAM role (most secure): You can grant Redshift permission to assume an IAM role during COPY or UNLOAD operations and then configure the data source to instruct Redshift to use that role:
  - Create an IAM role granting appropriate S3 permissions to your bucket.
  - Follow the guide Authorizing Amazon Redshift to Access Other AWS Services On Your Behalf to configure this role’s trust policy in order to allow Redshift to assume this role.
  - Follow the steps in the Authorizing COPY and UNLOAD Operations Using IAM Roles guide to associate that IAM role with your Redshift cluster.
  - Set the data source’s aws_iam_role option to the role’s ARN.
- Forward Spark’s S3 credentials to Redshift: If the forward_spark_s3_credentials option is set to true, the data source automatically discovers the credentials that Spark is using to connect to S3 and forwards those credentials to Redshift over JDBC. If Spark is authenticating to S3 using an instance profile, a set of temporary STS credentials is forwarded to Redshift; otherwise, AWS keys are forwarded. Because the JDBC query embeds these credentials, Databricks strongly recommends that you enable SSL encryption of the JDBC connection when using this authentication method.
- Use Security Token Service (STS) credentials: You can configure the temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token configuration properties to point to temporary keys created via the AWS Security Token Service (see the sketch below). Because the JDBC query embeds these credentials, it is strongly recommended to enable SSL encryption of the JDBC connection when using this authentication method. If you choose this option, be aware that the credentials may expire before the read or write operation succeeds.
These three options are mutually exclusive and you must explicitly choose which one to use.
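As an illustration of the STS option, here is a hedged Python sketch that obtains temporary keys with boto3 and passes them through the temporary_aws_* options; the role ARN, session name, and table names are hypothetical:
import boto3

# Obtain short-lived credentials from STS (role ARN is hypothetical).
sts = boto3.client("sts")
creds = sts.assume_role(
    RoleArn="arn:aws:iam::123456789000:role/redshift-s3-access",
    RoleSessionName="spark-redshift-write",
)["Credentials"]

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option("dbtable", "my_table_copy") \
  .option("tempdir", "s3n://path/for/temp/data") \
  .option("temporary_aws_access_key_id", creds["AccessKeyId"]) \
  .option("temporary_aws_secret_access_key", creds["SecretAccessKey"]) \
  .option("temporary_aws_session_token", creds["SessionToken"]) \
  .mode("error") \
  .save()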
Encryption
Securing JDBC: Unless any SSL-related settings are present in the JDBC URL, the data source enables SSL encryption by default and also verifies that the Redshift server is trustworthy (that is, sslmode=verify-full). For that, a server certificate is automatically downloaded from the Amazon servers the first time it is needed. If that fails, a pre-bundled certificate file is used as a fallback. This holds for both the Redshift and the PostgreSQL JDBC drivers. Automatic SSL configuration was introduced in the 2.1.1-db4 cluster image (Unsupported); earlier releases do not automatically configure SSL and use the default JDBC driver configuration (SSL disabled).
If there are any issues with this feature, or you simply want to disable SSL, you can call .option("autoenablessl", "false") on your DataFrameReader or DataFrameWriter.
If you want to specify custom SSL-related settings, you can follow the instructions in the Redshift documentation: Using SSL and Server Certificates in Java and JDBC Driver Configuration Options. Any SSL-related options present in the JDBC url used with the data source take precedence (that is, the auto-configuration will not trigger).
Encrypting UNLOAD data stored in S3 (data stored when reading from Redshift): According to the Redshift documentation on Unloading Data to S3, “UNLOAD automatically encrypts data files using Amazon S3 server-side encryption (SSE-S3).”
Redshift also supports client-side encryption with a custom key (see: Unloading Encrypted Data Files) but the data source lacks the capability to specify the required symmetric key.
Encrypting COPY data stored in S3 (data stored when writing to Redshift): According to the Redshift documentation on Loading Encrypted Data Files from Amazon S3:
You can use the COPY
command to load data files that were
uploaded to Amazon S3 using server-side encryption with
AWS-managed encryption keys (SSE-S3 or SSE-KMS), client-side
encryption, or both. COPY does not support Amazon S3 server-side
encryption with a customer-supplied key (SSE-C).
To use this capability, you should configure your Hadoop S3 filesystem to use encryption by setting the appropriate configuration properties (which will vary depending on whether you are using s3a, s3n, and so on). This will not encrypt the MANIFEST file (a list of all files written).
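As an illustration, assuming the s3a filesystem and a Hadoop version that supports this property (the property name differs across Hadoop releases), SSE-S3 encryption of the temporary files can be requested like this:
# Ask the S3A filesystem to write objects with SSE-S3 server-side encryption.
# (Property name is Hadoop-version dependent; shown here as an assumption for S3A.)
sc._jsc.hadoopConfiguration().set("fs.s3a.server-side-encryption-algorithm", "AES256")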
Parameters
The parameter map or OPTIONS provided in Spark SQL support the following settings:
Parameter | Required | Default | Description |
---|---|---|---|
dbtable | Yes, unless query is specified. | None | The table to create or read from in Redshift. This parameter is required when saving data back to Redshift. |
query | Yes, unless dbtable is specified. | None | The query to read from in Redshift. |
user | No | None | The Redshift username. Must be used in tandem with the password option. Can be used only if the user and password are not passed in the URL; passing both will result in an error. |
password | No | None | The Redshift password. Must be used in tandem with the user option. Can be used only if the user and password are not passed in the URL; passing both will result in an error. |
url | Yes | None | A JDBC URL of the format jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>. |
search_path | No | None | Sets the schema search path in Redshift. Will be set using the SET search_path to command. Should be a comma-separated list of schema names to search for tables in. See the Redshift documentation of search_path. |
aws_iam_role | Only if using IAM roles to authorize. | None | Fully specified ARN of the IAM role attached to the Redshift cluster for COPY/UNLOAD operations. For example, arn:aws:iam::123456789000:role/<redshift-iam-role>. |
forward_spark_s3_credentials | No | false | If true, the data source automatically discovers the credentials that Spark is using to connect to S3 and forwards those credentials to Redshift over JDBC. These credentials are sent as part of the JDBC query, so it is strongly recommended to enable SSL encryption of the JDBC connection when using this option. |
temporary_aws_access_key_id | No | None | AWS access key, must have write permissions to the S3 bucket. |
temporary_aws_secret_access_key | No | None | AWS secret access key corresponding to provided access key. |
temporary_aws_session_token | No | None | AWS session token corresponding to provided access key. |
tempdir | Yes | None | A writeable location in Amazon S3, to be used for unloaded data when reading and Avro data to be loaded into Redshift when writing. If you’re using the Redshift data source for Spark as part of a regular ETL pipeline, it can be useful to set a Lifecycle Policy on a bucket and use that as a temp location for this data. |
jdbcdriver | No | Determined by the JDBC URL’s subprotocol. | The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver classname should automatically be determined by the JDBC URL’s subprotocol. |
diststyle | No | EVEN | The Redshift Distribution Style to be used when creating a table. Can be one of EVEN, KEY, or ALL (see Redshift docs). When using KEY, you must also set a distribution key with the distkey option. |
distkey | No, unless using DISTSTYLE KEY | None | The name of a column in the table to use as the distribution key when creating a table. |
sortkeyspec | No | None | A full Redshift Sort Key definition, for example SORTKEY(my_sort_column) or COMPOUND SORTKEY(sort_col_1, sort_col_2). |
usestagingtable (Deprecated) | No | true | Setting this deprecated option to false causes the destination table of an overwrite to be deleted immediately before the new rows are appended, sacrificing the atomicity of the overwrite but reducing the amount of staging space that Redshift needs (see Overwriting an existing table below). Since setting usestagingtable=false sacrifices the atomicity of the overwrite operation, it is deprecated. |
description | No | None | A description for the table. Will be set using the SQL COMMENT command, and should show up in most query tools. See also the description metadata to set descriptions on individual columns. |
preactions | No | None | A semicolon-separated list of SQL commands to be executed before loading data. Be warned that if these commands fail, it is treated as an error and you’ll get an exception. If using a staging table, the changes are reverted and the backup table restored if pre actions fail. |
postactions | No | None | A semicolon-separated list of SQL commands to be executed after data is successfully loaded. Be warned that if these commands fail, it is treated as an error and you’ll get an exception. If using a staging table, the changes are reverted and the backup table restored if post actions fail. |
extracopyoptions | No | None | A list of extra options to append to the Redshift COPY command when loading data, for example region 'us-east-1' (see S3 bucket and Redshift cluster are in different AWS regions below). Since these options are appended to the end of the COPY command, only options that make sense at the end of the command can be used. |
tempformat (Experimental) | No | AVRO | The format in which to save temporary files in S3 when writing to Redshift. Defaults to AVRO; the other allowed values are CSV and CSV GZIP for CSV and gzipped CSV, respectively. Redshift is significantly faster when loading CSV than when loading Avro files, so using that tempformat may provide a large performance boost when writing to Redshift. |
csvnullstring (Experimental) | No | @NULL@ | The String value to write for nulls when using the CSV tempformat. This should be a value that does not appear in your actual data. |
csvseparator (Experimental) | No | , | Separator to use when writing temporary files with tempformat set to CSV or CSV GZIP. This must be a valid ASCII character, for example, “,” or “|”. |
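To illustrate how several of these parameters combine, here is a hedged write sketch; the table, column, and sort key names are hypothetical, and TRUNCATECOLUMNS is a standard Redshift COPY option used purely as an example of a trailing option:
# Write a DataFrame while controlling the table's distribution and sort keys.
df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option("dbtable", "events_copy") \
  .option("tempdir", "s3n://path/for/temp/data") \
  .option("diststyle", "KEY") \
  .option("distkey", "user_id") \
  .option("sortkeyspec", "SORTKEY(event_time)") \
  .option("extracopyoptions", "TRUNCATECOLUMNS") \
  .mode("error") \
  .save()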
Additional configuration options
Configuring the maximum size of string columns
When creating Redshift tables, the default behavior is to create TEXT columns for string columns. Redshift stores TEXT columns as VARCHAR(256), so these columns have a maximum size of 256 characters (source).
To support larger columns, you can use the maxlength
column metadata field to specify the
maximum length of individual string columns. This is also useful for implementing space-saving
performance optimizations by declaring columns with a smaller maximum length than the default.
Note
Due to limitations in Spark, the SQL and R language APIs do not support column metadata modification.
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.items():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Here is an example of updating multiple columns’ metadata fields using Spark’s Scala API:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Setting a custom column type
If you need to manually set a column type, you can use the redshift_type column metadata. For example, if you want to override the Spark SQL Schema -> Redshift SQL type matcher to assign a user-defined column type, you can do the following:
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for (colName, colType) in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Configuring column encoding
When creating a table, you can use the encoding column metadata field to specify a compression encoding for each column (see Amazon docs for available encodings).
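For example, a minimal Python sketch following the same metadata pattern as the maxlength examples above; the column name and the lzo encoding are chosen purely for illustration:
# Request a specific compression encoding for one column via column metadata.
metadata = {'encoding': 'lzo'}
df = df.withColumn("event_payload", df["event_payload"].alias("event_payload", metadata=metadata))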
Setting descriptions on columns
Redshift allows columns to have descriptions attached that should show
up in most query tools (using the COMMENT
command). You can set the
description
column metadata field to specify a description for
individual columns.
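A minimal sketch, using a hypothetical column name and description:
# Attach a description that the data source applies with the Redshift COMMENT command.
metadata = {'description': 'ISO 3166-1 alpha-2 country code'}
df = df.withColumn("country_code", df["country_code"].alias("country_code", metadata=metadata))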
Query pushdown into Redshift
The Spark optimizer pushes the following operators down into Redshift:
- Filter
- Project
- Sort
- Limit
- Aggregation
- Join
Within Project and Filter, it supports the following expressions:
- Most Boolean logic operators
- Comparisons
- Basic arithmetic operations
- Numeric and string casts
- Most string functions
- Scalar subqueries, if they can be pushed down entirely into Redshift.
Note
This pushdown does not support expressions operating on dates and timestamps.
Within Aggregation, it supports the following aggregation functions:
- AVG
- COUNT
- MAX
- MIN
- SUM
- STDDEV_SAMP
- STDDEV_POP
- VAR_SAMP
- VAR_POP
combined with the DISTINCT clause, where applicable.
Within Join, it supports the following types of joins:
- INNER JOIN
- LEFT OUTER JOIN
- RIGHT OUTER JOIN
- LEFT SEMI JOIN
- LEFT ANTI JOIN
- Subqueries that are rewritten into Join by the optimizer, for example WHERE EXISTS, WHERE NOT EXISTS
Note
Join pushdown does not support FULL OUTER JOIN.
The pushdown might be most beneficial in queries with LIMIT. A query such as SELECT * FROM large_redshift_table LIMIT 10 could take a very long time, as the whole table would first be UNLOADed to S3 as an intermediate result. With pushdown, the LIMIT is executed in Redshift.
In queries with aggregations, pushing the aggregation down into Redshift also helps
to reduce the amount of data that needs to be transferred.
Query pushdown into Redshift is enabled by default.
It can be disabled by setting spark.databricks.redshift.pushdown
to false
.
Even when disabled, Spark still pushes down filters and performs column elimination into Redshift.
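For example, in a notebook you can toggle the setting for the current session:
# Disable Redshift query pushdown for this Spark session.
spark.conf.set("spark.databricks.redshift.pushdown", "false")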
Transactional guarantees
This section describes the transactional guarantees of the Redshift data source for Spark.
General background on Redshift and S3 properties
For general information on Redshift transactional guarantees, see the Managing Concurrent Write Operations chapter in the Redshift documentation. In a nutshell, Redshift provides serializable isolation according to the documentation for Redshift’s BEGIN command:
[although] you can use any of the four transaction isolation levels, Amazon Redshift processes all isolation levels as serializable.
According to the Redshift documentation:
Amazon Redshift supports a default automatic commit behavior in which each separately-executed SQL command commits individually.
Thus, individual commands like COPY
and UNLOAD
are atomic and
transactional, while explicit BEGIN
and END
should only be
necessary to enforce the atomicity of multiple commands or queries.
When reading from and writing to Redshift, the data source reads and writes data in S3. Both Spark and Redshift produce partitioned output and store it in multiple files in S3. According to the Amazon S3 Data Consistency Model documentation, S3 bucket listing operations are eventually-consistent, so the data source must go to special lengths to avoid missing or incomplete data due to this source of eventual consistency.
Guarantees of the Redshift data source for Spark
Appending to an existing table
When inserting rows into Redshift, the data source uses the COPY
command and specifies manifests to guard against certain eventually-consistent S3 operations. As a result, appends to existing tables performed by the data source have the same atomic and transactional properties as regular Redshift COPY commands.
Creating a new table (SaveMode.CreateIfNotExists)
Creating a new table is a two-step process, consisting of a CREATE TABLE
command followed by a
COPY command to append the initial set of rows.
Both operations are performed in the same transaction.
Overwriting an existing table
By default, the data source uses transactions to perform overwrites, which are implemented by deleting the destination table, creating a new empty table, and appending rows to it.
If the deprecated usestagingtable
setting is set to false
,
the data source commits the DELETE TABLE
command before appending
rows to the new table, sacrificing the atomicity of the overwrite
operation but reducing the amount of staging space that Redshift needs
during the overwrite.
Querying Redshift tables
Queries use Redshift’s UNLOAD command to execute a query and save its results to S3 and use manifests to guard against certain eventually-consistent S3 operations. As a result, queries from Redshift data source for Spark should have the same consistency properties as regular Redshift queries.
Common problems and solutions
S3 bucket and Redshift cluster are in different AWS regions
By default, S3 <-> Redshift copies do not work if the S3 bucket and Redshift cluster are in different AWS regions.
If you attempt to read a Redshift table when the S3 bucket is in a different region, you may see an error such as:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Similarly, attempting to write to Redshift using a S3 bucket in a different region may cause the following error:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Writes: Redshift’s COPY command supports explicit specification of the S3 bucket’s region, so you can make writes to Redshift work properly in these cases by adding region 'the-region-name' to the extracopyoptions setting. For example, with a bucket in the US East (Virginia) region and the Scala API, use:
.option("extracopyoptions", "region 'us-east-1'")
For Databricks Runtime 6.2 and above, you can alternatively use the awsregion setting instead:
.option("awsregion", "us-east-1")
Reads: Redshift’s UNLOAD command also supports explicit specification of the S3 bucket’s region. When using Databricks Runtime 6.2 and above, you can make reads work properly by adding the region to the awsregion setting:
.option("awsregion", "us-east-1")
For Databricks Runtime 6.1 and lower, this data source does not support writing to a bucket in a different region. The only workaround is to use a new bucket in the same region as your Redshift cluster.
Unexpected S3ServiceException credentials error when you use instance profiles to authenticate to S3
If you are using instance profiles to authenticate to S3 and receive an unexpected
S3ServiceException
error, check whether AWS access keys are specified in the
tempdir
S3 URI, in Hadoop configurations, or in any of the sources checked by the
DefaultAWSCredentialsProviderChain: those sources take precedence over instance profile credentials.
Here is a sample error message that can be a symptom of keys accidentally taking precedence over instance profiles:
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;
Long-running Spark query hangs indefinitely even though the corresponding Redshift operation is done
If you are reading or writing large amounts of data from/to Redshift, it may sometimes happen that
your Spark query hangs indefinitely, even though the AWS Redshift Monitoring page shows that the
corresponding LOAD / UNLOAD operation has completed and that the cluster is idle. This is caused by
the connection between Redshift and Spark timing out. To avoid this, make sure the tcpKeepAlive
JDBC flag is enabled and set TCPKeepAliveMinutes
to a low value (for example, 1).
For additional information, see Amazon Redshift JDBC Driver Configuration.
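For example, a hedged sketch of a JDBC URL that enables keep-alive probes; the parameter names come from the Redshift JDBC driver configuration referenced above:
# Keep the JDBC connection alive during long-running COPY/UNLOAD operations.
jdbcURL = ("jdbc:redshift://redshifthost:5439/database"
           "?user=username&password=pass"
           "&tcpKeepAlive=true&TCPKeepAliveMinutes=1")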
Timestamp with timezone semantics
When reading data, both Redshift TIMESTAMP
and TIMESTAMPTZ
data types are mapped to Spark
TimestampType
, and a value is converted to Coordinated Universal Time (UTC) and is stored as
the UTC timestamp. For a Redshift TIMESTAMP
, the local timezone is assumed as the value
does not have any timezone information. When writing data to a Redshift table, a Spark
TimestampType
is mapped to the Redshift TIMESTAMP
data type.
Migration Guide
The data source now requires you to explicitly set forward_spark_s3_credentials
before Spark S3 credentials are forwarded to Redshift. This change has no impact if you use the
aws_iam_role
or temporary_aws_*
authentication mechanisms. However, if you relied on the old default behavior you must now explicitly set forward_spark_s3_credentials
to true
to
continue using your previous Redshift to S3 authentication mechanism. For a discussion of the
three authentication mechanisms and their security trade-offs, see the
Authenticating to S3 and Redshift section of this document.
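For example, a read that keeps the old behavior of forwarding Spark’s S3 credentials looks like this:
# Opt back in to forwarding Spark's S3 credentials to Redshift.
df = spark.read \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
    .option("dbtable", "my_table") \
    .option("tempdir", "s3n://path/for/temp/data") \
    .option("forward_spark_s3_credentials", "true") \
    .load()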