Amazon Redshift

Note

Do not create a Redshift cluster inside the Databricks managed VPC. This may lead to permissions issues due to the security model in the Databricks VPC. Instead, create your own VPC and then perform VPC peering to connect Databricks to your Redshift instance.

A library to load data into Spark SQL DataFrames from Amazon Redshift, and write them back to Redshift tables. This data source uses Amazon S3 to efficiently transfer data in and out of Redshift, and uses JDBC to automatically trigger the appropriate COPY and UNLOAD commands on Redshift.

This library is more suited to ETL than interactive queries, since each query execution may extract large amounts of data to S3. If you plan to perform several queries against the same Redshift data then we recommend saving the extracted data in a format such as Parquet.
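As a rough illustration of that recommendation, the sketch below reads a Redshift table once and saves it as Parquet, so that later queries read the Parquet copy instead of extracting the data again. The function name and the parquet_path argument are hypothetical, spark is assumed to be an existing SparkSession, and the overwrite save mode is only one possible choice.

```python
def snapshot_redshift_table(spark, jdbc_url, table, tempdir, parquet_path):
    """Read a Redshift table once and persist it as Parquet.

    All argument names are illustrative; `spark` is assumed to be an
    existing SparkSession with the Redshift data source available.
    """
    df = (spark.read
          .format("com.databricks.spark.redshift")
          .option("url", jdbc_url)
          .option("dbtable", table)
          .option("tempdir", tempdir)
          .load())
    # Save the extracted rows so later queries do not go back to Redshift.
    df.write.mode("overwrite").parquet(parquet_path)
    # Subsequent queries can work against the Parquet copy.
    return spark.read.parquet(parquet_path)
```

Repeated queries can then be run against the returned DataFrame.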

Installation

The installation steps vary depending on your Spark cluster’s cluster image version:

In addition, you must configure a Redshift-compatible JDBC driver. There are two options:

  1. Use the bundled PostgreSQL JDBC driver: Databricks cluster images automatically include the PostgreSQL JDBC driver, so you may specify a JDBC URL of the form jdbc:postgresql://... in order to use that driver.

  2. Download and install the official Redshift JDBC driver: download the official Amazon Redshift JDBC driver, upload it to Databricks, and attach the library to your cluster. Then, use JDBC URLs of the form jdbc:redshift://....

    Databricks purposely does not bundle the Redshift driver in order to avoid dependency conflicts: an automatically-bundled Redshift JDBC driver would interfere with users’ ability to choose between the JDBC 4.0, 4.1, or 4.2 drivers when working directly with Redshift over JDBC and would also adversely impact PostgreSQL users because the Redshift driver registers itself as a handler of both the postgresql and redshift JDBC subprotocols (source).
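The two driver options differ only in the subprotocol of the JDBC URL. A minimal sketch of a helper that builds either form is shown below; build_jdbc_url is a hypothetical name, not part of the library, and the host, port, and database values are the placeholders used elsewhere in this document.

```python
def build_jdbc_url(subprotocol, host, port, database):
    """Return a JDBC URL for the chosen driver.

    `subprotocol` must be "postgresql" (the bundled driver) or "redshift"
    (the separately installed Amazon driver).
    """
    if subprotocol not in ("postgresql", "redshift"):
        raise ValueError("subprotocol must be 'postgresql' or 'redshift'")
    return "jdbc:{}://{}:{}/{}".format(subprotocol, host, port, database)


# Placeholder connection details, for illustration only.
bundled_url = build_jdbc_url("postgresql", "redshifthost", 5439, "database")
amazon_url = build_jdbc_url("redshift", "redshifthost", 5439, "database")
```

Credentials are left out of the URL here; they can be appended as user and password query parameters as in the usage examples.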

Usage

Once you have configured your AWS credentials, you can use this library via the Data Sources API in Scala, Python or SQL, as follows:

Scala

import org.apache.spark.sql.DataFrame

// Get some data from a Redshift table
val df: DataFrame = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table")
  .option("tempdir", "s3n://path/for/temp/data")
  .load()

// Can also load data from a Redshift query
val df: DataFrame = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("query", "select x, count(*) from my_table group by x")
  .option("tempdir", "s3n://path/for/temp/data")
  .load()

// Apply some transformations to the data as per normal, then you can use the
// Data Source API to write the data back to another table

df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("tempdir", "s3n://path/for/temp/data")
  .mode("error")
  .save()

// Using IAM Role based authentication
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .option("tempdir", "s3n://path/for/temp/data")
  .mode("error")
  .save()

Python

# Read data from a table
df = spark.read \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option("dbtable", "my_table") \
  .option("tempdir", "s3n://path/for/temp/data") \
  .load()

# Read data from a query
df = spark.read \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option("query", "select x, count(*) from my_table group by x") \
  .option("tempdir", "s3n://path/for/temp/data") \
  .load()

# Write back to a table
df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option("dbtable", "my_table_copy") \
  .option("tempdir", "s3n://path/for/temp/data") \
  .mode("error") \
  .save()

# Using IAM Role based authentication
df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
  .option("dbtable", "my_table_copy") \
  .option("tempdir", "s3n://path/for/temp/data") \
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role") \
  .mode("error") \
  .save()

SQL

Reading data using SQL:

CREATE TABLE my_table
USING com.databricks.spark.redshift
OPTIONS (
  dbtable 'my_table',
  tempdir 's3n://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
);

Writing data using SQL:

-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE my_table
USING com.databricks.spark.redshift
OPTIONS (
  dbtable 'my_table',
  tempdir 's3n://path/for/temp/data',
  url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
)
AS SELECT * FROM tabletosave;

Note that the SQL API only supports the creation of new tables and not overwriting or appending; this corresponds to the default save mode of the other language APIs.

R

Reading data using R:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3n://path/for/temp/data",
   dbtable = "my_table",
   url = "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")

Configuration

Authenticating to S3 and Redshift

The use of this library involves several network connections, illustrated in the following diagram:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │   Spark   │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>│ Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
                password                    Spark
        (SSL enabled by default)

This library reads and writes data to S3 when transferring data to/from Redshift. As a result, it requires AWS credentials with read and write access to an S3 bucket (specified using the tempdir configuration parameter).

Note

This library does not clean up the temporary files that it creates in S3. As a result, we recommend that you use a dedicated temporary S3 bucket with an object lifecycle configuration to ensure that temporary files are automatically deleted after a specified expiration period. See the Encryption section of this document for a discussion of how to encrypt these files.
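As an illustration of such a lifecycle configuration, the sketch below builds a rule that expires objects under a prefix after a fixed number of days. The function name, rule ID, prefix, and retention period are all hypothetical values chosen for the example; the dictionary follows the shape used by the S3 lifecycle API.

```python
def temp_data_lifecycle(prefix, expire_after_days):
    """Build an S3 lifecycle configuration that expires temporary files.

    The rule ID, prefix, and retention period are illustrative values.
    """
    return {
        "Rules": [
            {
                "ID": "expire-redshift-temp-data",
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": expire_after_days},
            }
        ]
    }


# Expire everything under temp/ after one week (an arbitrary choice).
config = temp_data_lifecycle("temp/", 7)
```

With boto3, a dictionary of this shape can be passed as the LifecycleConfiguration argument of the S3 client's put_bucket_lifecycle_configuration call.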

The following describes each connection’s authentication configuration options:

  • Spark driver to Redshift: The Spark driver connects to Redshift via JDBC using a username and password. Redshift does not support the use of IAM roles to authenticate this connection. By default, this connection uses SSL encryption; for more details, see the Encryption section below.

  • Spark to S3: S3 acts as an intermediary to store bulk data when reading from or writing to Redshift. Spark connects to S3 using both the Hadoop FileSystem interfaces and directly using the Amazon Java SDK’s S3 client.

    This connection supports either AWS keys or IAM roles (DBFS mountpoints are not currently supported, so users who do not want to rely on AWS keys should use cluster IAM roles instead).

    There are multiple ways of providing these credentials:

    1. Default Credential Provider Chain (best option for most users): AWS credentials are automatically retrieved through the DefaultAWSCredentialsProviderChain.

      If you use IAM roles to authenticate to S3 then you should probably use this method.

      Other methods of providing credentials (methods 2 and 3 below) take precedence over this default.

    2. Set keys in Hadoop conf: You can specify AWS keys via Hadoop configuration properties. For example, if your tempdir configuration points to a s3n:// filesystem then you can set the fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey properties in a Hadoop XML configuration file or call sc.hadoopConfiguration.set() to mutate Spark’s global Hadoop configuration.

      For example, if you are using the s3n filesystem then add

      sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
      sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
      

      and for the s3a filesystem add

      sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_KEY_ID")
      sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_ACCESS_KEY")
      

      Python users must use a slightly different method to modify the hadoopConfiguration, since this field is not exposed in all versions of PySpark. Although the following command relies on some Spark internals, it should work with all PySpark versions and is unlikely to break or change in the future:

      sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
      sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
      
    3. Encode keys in tempdir URI: For example, the URI s3n://ACCESSKEY:SECRETKEY@bucket/path/to/temp/dir encodes the key pair (ACCESSKEY, SECRETKEY).

      Due to Hadoop limitations, this approach will not work for secret keys which contain forward slash (/) characters, even if those characters are URL-encoded.

  • Redshift to S3: Redshift also connects to S3 during COPY and UNLOAD queries. There are three methods of authenticating this connection:

    1. Have Redshift assume an IAM role (most secure): You can grant Redshift permission to assume an IAM role during COPY or UNLOAD operations and then configure this library to instruct Redshift to use that role:
      1. Create an IAM role granting appropriate S3 permissions to your bucket.
      2. Follow the guide Authorizing Amazon Redshift to Access Other AWS Services On Your Behalf to configure this role’s trust policy in order to allow Redshift to assume this role.
      3. Follow the steps in the Authorizing COPY and UNLOAD Operations Using IAM Roles guide to associate that IAM role with your Redshift cluster.
      4. Set this library’s aws_iam_role option to the role’s ARN.
    2. Forward Spark’s S3 credentials to Redshift: If the forward_spark_s3_credentials option is set to true then this library will automatically discover the credentials that Spark is using to connect to S3 and will forward those credentials to Redshift over JDBC. If Spark is authenticating to S3 using an IAM instance role then a set of temporary STS credentials will be forwarded to Redshift; otherwise, AWS keys will be forwarded. The JDBC query embeds these credentials, so it is strongly recommended to enable SSL encryption of the JDBC connection when using this authentication method.
    3. Use Security Token Service (STS) credentials: You may configure the temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token configuration properties to point to temporary keys created via the AWS Security Token Service. The JDBC query embeds these credentials, so it is strongly recommended to enable SSL encryption of the JDBC connection when using this authentication method. If you choose this option, be aware of the risk that the credentials expire before the read / write operation succeeds.

    These three options are mutually exclusive and you must explicitly choose which one to use.
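Because exactly one Redshift to S3 mechanism must be chosen, it can help to build the relevant options in one place. The sketch below is a hypothetical helper (not the library's own validation) that returns the option map for a single mechanism and rejects any other combination; temporary_credentials is an assumed (key ID, secret key, session token) tuple.

```python
def redshift_s3_auth_options(aws_iam_role=None,
                             forward_spark_s3_credentials=False,
                             temporary_credentials=None):
    """Return the options for exactly one Redshift-to-S3 auth mechanism.

    `temporary_credentials` is a hypothetical
    (access_key_id, secret_access_key, session_token) tuple.
    """
    chosen = [aws_iam_role is not None,
              bool(forward_spark_s3_credentials),
              temporary_credentials is not None]
    if sum(chosen) != 1:
        raise ValueError(
            "choose exactly one Redshift-to-S3 authentication mechanism")
    if aws_iam_role is not None:
        return {"aws_iam_role": aws_iam_role}
    if forward_spark_s3_credentials:
        return {"forward_spark_s3_credentials": "true"}
    key_id, secret_key, token = temporary_credentials
    return {
        "temporary_aws_access_key_id": key_id,
        "temporary_aws_secret_access_key": secret_key,
        "temporary_aws_session_token": token,
    }


auth_options = redshift_s3_auth_options(
    aws_iam_role="arn:aws:iam::123456789000:role/redshift_iam_role")
```

The resulting dictionary can be merged with the url, dbtable, and tempdir options before they are passed to the reader or writer.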

Encryption

  • Securing JDBC: Unless any SSL-related settings are present in the JDBC URL, the library will, by default, enable SSL encryption and also verify that the Redshift server is trustworthy (i.e. sslmode=verify-full). For that, a server certificate will be automatically downloaded from the Amazon servers the first time it is needed. In case that fails, a pre-bundled certificate file will be used as a fallback. This holds for both the Redshift and the PostgreSQL JDBC drivers. Automatic SSL configuration was introduced in 2.1.0-db3 Cluster Image; earlier releases do not automatically configure SSL and will use the default JDBC driver configuration (SSL disabled).

    In case there are any issues with this feature, or you simply want to disable SSL, you may call .option("autoenablessl", "false") on your DataFrameReader/Writer.

    If you want to specify custom SSL-related settings, you may follow the instructions in the Redshift documentation: Using SSL and Server Certificates in Java and JDBC Driver Configuration Options. Any SSL-related options present in the JDBC URL used with this library will take precedence (i.e. the auto-configuration will not trigger).

  • Encrypting UNLOAD data stored in S3 (data stored when reading from Redshift): According to the Redshift documentation on Unloading Data to S3, “UNLOAD automatically encrypts data files using Amazon S3 server-side encryption (SSE-S3).”

    Redshift also supports client-side encryption with a custom key (see: Unloading Encrypted Data Files) but this library currently lacks the capability to specify the required symmetric key.

  • Encrypting COPY data stored in S3 (data stored when writing to Redshift): According to the Redshift documentation on Loading Encrypted Data Files from Amazon S3:

    You can use the COPY command to load data files that were uploaded to Amazon S3 using server-side encryption with AWS-managed encryption keys (SSE-S3 or SSE-KMS), client-side encryption, or both. COPY does not support Amazon S3 server-side encryption with a customer-supplied key (SSE-C).

    To use this capability, you should configure your Hadoop S3 FileSystem to use encryption by setting the appropriate configuration properties (which will vary depending on whether you are using s3a, s3n, etc.). Note that this will not encrypt the MANIFEST file (a list of all files written).

Parameters

The parameter map or OPTIONS provided in Spark SQL supports the following settings:

Parameter | Required | Default | Notes
dbtable | Yes, unless query is specified | No default | The table to create or read from in Redshift. This parameter is required when saving data back to Redshift.
query | Yes, unless dbtable is specified | No default | The query to read from in Redshift.
user | No | No default | The Redshift username. Must be used in tandem with the password option. May only be used if the user and password are not passed in the URL; passing both will result in an error.
password | No | No default | The Redshift password. Must be used in tandem with the user option. May only be used if the user and password are not passed in the URL; passing both will result in an error.
url | Yes | No default | A JDBC URL of the format jdbc:subprotocol://host:port/database?user=username&password=password. subprotocol can be postgresql or redshift, depending on which JDBC driver you have loaded. Note, however, that a Redshift-compatible driver must be on the classpath and match this URL. host and port should point to the Redshift master node, so security groups and/or the VPC must be configured to allow access from your driver application. database identifies a Redshift database name. user and password are credentials to access the database, which must be embedded in this URL for JDBC, and your user account should have the necessary privileges for the table being referenced.
search_path | No | No default | Sets the schema search path in Redshift. Will be set using the SET search_path TO command. Should be a comma-separated list of schema names to search for tables in. See the Redshift documentation of search_path.
aws_iam_role | Only if using IAM roles to authorize | No default | Fully specified ARN of the IAM role for Redshift COPY/UNLOAD operations that is attached to the Redshift cluster, for example arn:aws:iam::123456789000:role/redshift_iam_role.
forward_spark_s3_credentials | No | false | If true then this library will automatically discover the credentials that Spark is using to connect to S3 and will forward those credentials to Redshift over JDBC. These credentials are sent as part of the JDBC query, so it is strongly recommended to enable SSL encryption of the JDBC connection when using this option.
temporary_aws_access_key_id | No | No default | AWS access key; must have write permissions to the S3 bucket.
temporary_aws_secret_access_key | No | No default | AWS secret access key corresponding to the provided access key.
temporary_aws_session_token | No | No default | AWS session token corresponding to the provided access key.
tempdir | Yes | No default | A writeable location in Amazon S3, to be used for unloaded data when reading and for Avro data to be loaded into Redshift when writing. If you’re using the Redshift data source for Spark as part of a regular ETL pipeline, it can be useful to set a Lifecycle Policy on a bucket and use that as a temp location for this data.
jdbcdriver | No | Determined by the JDBC URL’s subprotocol | The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver class name should automatically be determined by the JDBC URL’s subprotocol.
diststyle | No | EVEN | The Redshift Distribution Style to be used when creating a table. Can be one of EVEN, KEY or ALL (see Redshift docs). When using KEY, you must also set a distribution key with the distkey option.
distkey | No, unless using DISTSTYLE KEY | No default | The name of a column in the table to use as the distribution key when creating a table.
sortkeyspec | No | No default | A full Redshift Sort Key definition. Examples include: SORTKEY(my_sort_column); COMPOUND SORTKEY(sort_col_1, sort_col_2); INTERLEAVED SORTKEY(sort_col_1, sort_col_2).
usestagingtable (Deprecated) | No | true | Setting this deprecated option to false will cause an overwrite operation’s destination table to be dropped immediately at the beginning of the write, making the overwrite operation non-atomic and reducing the availability of the destination table. This may reduce the temporary disk space requirements for overwrites. Because setting usestagingtable=false risks data loss / unavailability, we have chosen to deprecate it in favor of requiring users to manually drop the destination table themselves.
description | No | No default | A description for the table. Will be set using the SQL COMMENT command, and should show up in most query tools. See also the description metadata to set descriptions on individual columns.
preactions | No | No default | A ;-separated list of SQL commands to be executed before the COPY command when loading data. It may be useful to have some DELETE commands or similar run here before loading new data. If a command contains %s, the table name will be formatted in before execution (in case you’re using a staging table). Be warned that if these commands fail, it is treated as an error and you’ll get an exception. If using a staging table, the changes will be reverted and the backup table restored if pre actions fail.
postactions | No | No default | A ;-separated list of SQL commands to be executed after a successful COPY when loading data. It may be useful to have some GRANT commands or similar run here when loading new data. If a command contains %s, the table name will be formatted in before execution (in case you’re using a staging table). Be warned that if these commands fail, it is treated as an error and you’ll get an exception. If using a staging table, the changes will be reverted and the backup table restored if post actions fail.
extracopyoptions | No | No default | A list of extra options to append to the Redshift COPY command when loading data, e.g. TRUNCATECOLUMNS or MAXERROR n (see the Redshift docs for other options). Note that since these options are appended to the end of the COPY command, only options that make sense at the end of the command can be used, but that should cover most possible use cases.
tempformat (Experimental) | No | AVRO | The format in which to save temporary files in S3 when writing to Redshift. Defaults to “AVRO”; the other allowed values are “CSV” and “CSV GZIP” for CSV and gzipped CSV, respectively. Redshift is significantly faster when loading CSV than when loading Avro files, so using that tempformat may provide a large performance boost when writing to Redshift.
csvnullstring (Experimental) | No | @NULL@ | The String value to write for nulls when using the CSV tempformat. This should be a value which does not appear in your actual data.
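To show how several of these parameters fit together for a write, here is a minimal sketch that assembles an option map and checks the diststyle / distkey rule described above. build_write_options is a hypothetical helper, and the column name, SQL statements, and group name are made up for the example.

```python
def build_write_options(url, dbtable, tempdir, diststyle="EVEN", distkey=None,
                        preactions=(), postactions=()):
    """Assemble write options; names and checks here are illustrative."""
    if diststyle not in ("EVEN", "KEY", "ALL"):
        raise ValueError("diststyle must be EVEN, KEY or ALL")
    if diststyle == "KEY" and distkey is None:
        raise ValueError("diststyle KEY requires the distkey option")
    options = {"url": url, "dbtable": dbtable, "tempdir": tempdir,
               "diststyle": diststyle}
    if distkey is not None:
        options["distkey"] = distkey
    # preactions and postactions are ;-separated lists of SQL commands.
    if preactions:
        options["preactions"] = ";".join(preactions)
    if postactions:
        options["postactions"] = ";".join(postactions)
    return options


options = build_write_options(
    url="jdbc:redshift://redshifthost:5439/database?user=username&password=pass",
    dbtable="my_table_copy",
    tempdir="s3n://path/for/temp/data",
    diststyle="KEY",
    distkey="country_code",
    # %s is replaced with the table name before execution.
    preactions=["DELETE FROM %s WHERE country_code = 'XX'"],
    postactions=["GRANT SELECT ON %s TO GROUP analysts"],
)

# With an existing DataFrame df, the options could then be applied as:
# df.write.format("com.databricks.spark.redshift").options(**options).mode("append").save()
```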

Additional configuration options

Configuring the maximum size of string columns

When creating Redshift tables, this library’s default behavior is to create TEXT columns for string columns. Redshift stores TEXT columns as VARCHAR(256), so these columns have a maximum size of 256 characters (source).

To support larger columns, you can use the maxlength column metadata field to specify the maximum length of individual string columns. This is also useful for implementing space-saving performance optimizations by declaring columns with a smaller maximum length than the default.

Note

Due to limitations in Spark, the SQL and R language APIs do not support column metadata modification. Python support for column metadata modification requires Databricks Runtime 3.0+.

Scala

Here is an example of updating multiple columns’ metadata fields using Spark’s Scala API:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
  .save()

Python

New in version runtime-3.0.

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.items():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Setting a custom column type

If you need to manually set a column type, you can use the redshift_type column metadata. For example, if you desire to override the Spark SQL Schema -> Redshift SQL type matcher to assign a user-defined column type, you can do the following:

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Python

New in version runtime-3.0.

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for (colName, colType) in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Configuring column encoding

When creating a table, you can use the encoding column metadata field to specify a compression encoding for each column (see Amazon docs for available encodings).

Setting descriptions on columns

Redshift allows columns to have descriptions attached that should show up in most query tools (using the COMMENT command). You can set the description column metadata field to specify a description for individual columns.
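The encoding and description fields are set the same way as maxlength. As a sketch, the hypothetical helper below merges per-column settings into one metadata dictionary per column; the column names, encodings, and description text are example values only.

```python
def build_column_metadata(maxlengths=None, encodings=None, descriptions=None):
    """Merge per-column settings into one metadata dict per column."""
    metadata = {}
    fields = (("maxlength", maxlengths),
              ("encoding", encodings),
              ("description", descriptions))
    for field, values in fields:
        for col_name, value in (values or {}).items():
            metadata.setdefault(col_name, {})[field] = value
    return metadata


column_metadata = build_column_metadata(
    maxlengths={"url": 2083},
    encodings={"url": "LZO", "country_code": "BYTEDICT"},
    descriptions={"country_code": "ISO 3166-1 alpha-2 country code"},
)

# With an existing DataFrame df, each entry could then be applied as in the
# maxlength example:
# for col_name, metadata in column_metadata.items():
#     df = df.withColumn(col_name, df[col_name].alias(col_name, metadata=metadata))
```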

Query pushdown into Redshift

New in version 2.1.0-db3.

The Spark optimizer will try to push the following operators down into Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation

Within Project and Filter, it supports the following expressions:

  • most boolean logic operators
  • comparisons
  • basic arithmetic operations
  • numeric and string casts
  • most string functions
  • scalar subqueries, if they can be pushed down entirely into Redshift.

Note

This pushdown does not currently support expressions operating on dates and timestamps.

Within Aggregation, it supports the following aggregation functions:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

combined with the DISTINCT clause, where applicable.

The pushdown might be most beneficial in queries with LIMIT. Without pushdown, a query such as SELECT * FROM large_redshift_table LIMIT 10 could take a very long time, as the whole table would first be UNLOADed to S3 as an intermediate result. With pushdown, the LIMIT will be executed in Redshift. In queries with aggregations, pushing the aggregation down into Redshift also helps to reduce the amount of data that needs to be transferred.

Query pushdown into Redshift is enabled by default. It can be disabled by setting spark.databricks.redshift.pushdown to false. Even when disabled, Spark will still push down filters and perform column elimination into Redshift.
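A minimal sketch of toggling this setting is shown below. It assumes the property can be set through the session's runtime configuration (spark.conf), which this document does not state explicitly; set_redshift_pushdown is a hypothetical helper name and spark is assumed to be an existing SparkSession.

```python
def set_redshift_pushdown(spark, enabled):
    """Enable or disable Redshift query pushdown for this session.

    Assumes the property is honored when set via spark.conf.
    """
    value = "true" if enabled else "false"
    spark.conf.set("spark.databricks.redshift.pushdown", value)
    return value
```

For example, set_redshift_pushdown(spark, False) would turn pushdown off before running a query whose Redshift-side execution you want to avoid.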

Transactional Guarantees

This section describes the transactional guarantees of the Redshift data source for Spark.

General background on Redshift and S3’s properties

For general information on Redshift’s transactional guarantees, see the Managing Concurrent Write Operations chapter in the Redshift documentation. In a nutshell, Redshift provides serializable isolation (according to the documentation for Redshift’s BEGIN command, “[although] you can use any of the four transaction isolation levels, Amazon Redshift processes all isolation levels as serializable”). According to its documentation, “Amazon Redshift supports a default automatic commit behavior in which each separately-executed SQL command commits individually.” Thus, individual commands like COPY and UNLOAD are atomic and transactional, while explicit BEGIN and END should only be necessary to enforce the atomicity of multiple commands / queries.

When reading from / writing to Redshift, this library reads and writes data in S3. Both Spark and Redshift produce partitioned output and store it in multiple files in S3. According to the Amazon S3 Data Consistency Model documentation, S3 bucket listing operations are eventually consistent, so the library must go to special lengths to avoid missing / incomplete data due to this source of eventual consistency.

Guarantees of the Redshift data source for Spark

Appending to an existing table: When inserting rows into Redshift, this library uses the COPY command and specifies manifests to guard against certain eventually-consistent S3 operations. As a result, spark-redshift appends to existing tables have the same atomic and transactional properties as regular Redshift COPY commands.

Creating a new table (SaveMode.CreateIfNotExists): Creating a new table is a two-step process, consisting of a CREATE TABLE command followed by a COPY command to append the initial set of rows. Both operations are performed in the same transaction.

Overwriting an existing table: By default, this library uses transactions to perform overwrites, which are implemented by deleting the destination table, creating a new empty table, and appending rows to it.

If the deprecated usestagingtable setting is set to false then this library will commit the DELETE TABLE command before appending rows to the new table, sacrificing the atomicity of the overwrite operation but reducing the amount of staging space that Redshift needs during the overwrite.

Querying Redshift tables: Queries use Redshift’s UNLOAD command to execute a query and save its results to S3 and use manifests to guard against certain eventually-consistent S3 operations. As a result, queries from the Redshift data source for Spark should have the same consistency properties as regular Redshift queries.

Common problems and solutions

S3 bucket and Redshift cluster are in different AWS regions

By default, S3 <-> Redshift copies will not work if the S3 bucket and Redshift cluster are in different AWS regions.

If you attempt to perform a read of a Redshift table when the S3 bucket is in a different region then you may see a confusing error, such as

java.sql.SQLException: [Amazon](500310) Invalid operation: S3ServiceException:The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.

Similarly, attempting to write to Redshift using an S3 bucket in a different region may cause the following error:

error:  Problem reading manifest file - S3ServiceException:The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.,Status 301,Error PermanentRedirect

For writes: Redshift’s COPY command supports explicit specification of the S3 bucket’s region, so you can make writes to Redshift work properly in these cases by adding

region 'the-region-name'

to the extracopyoptions setting. For example, with a bucket in the US East (Virginia) region and the Scala API, use

.option("extracopyoptions", "region 'us-east-1'")
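In Python, the same setting can be built with a small helper when the region is combined with other COPY options. This is a sketch only; extracopyoptions_with_region is a hypothetical name, and TRUNCATECOLUMNS is used purely as an example of an additional option.

```python
def extracopyoptions_with_region(region, other_options=()):
    """Build an extracopyoptions value that names the S3 bucket's region."""
    if "'" in region:
        raise ValueError("region must not contain quote characters")
    parts = list(other_options) + ["region '{}'".format(region)]
    return " ".join(parts)


copy_options = extracopyoptions_with_region("us-east-1", ["TRUNCATECOLUMNS"])

# With an existing DataFrameWriter, this could be passed as:
# .option("extracopyoptions", copy_options)
```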

For reads: According to its documentation, the Redshift UNLOAD command does not support writing to a bucket in a different region:

The Amazon S3 bucket where Amazon Redshift will write the output files must reside in the same region as your cluster.

As a result, this use-case is not supported by this library. The only workaround is to use a new bucket in the same region as your Redshift cluster.

Migration Guide

  • This library now requires forward_spark_s3_credentials to be explicitly set before Spark S3 credentials are forwarded to Redshift. This change does not affect users who use the aws_iam_role or temporary_aws_* authentication mechanisms. Users who relied on the old default behavior will now need to explicitly set forward_spark_s3_credentials to true to continue using their previous Redshift to S3 authentication mechanism. For a discussion of the three authentication mechanisms and their security trade-offs, see the Authenticating to S3 and Redshift section of this document.