Query Amazon Redshift using Databricks

You can read and write tables from Amazon Redshift with Databricks.

Experimental

The configurations described in this article are Experimental. Experimental features are provided as-is and are not supported by Databricks through customer technical support. To get full query federation support, you should instead use Lakehouse Federation, which enables your Databricks users to take advantage of Unity Catalog syntax and data governance tools.

The Databricks Redshift data source uses Amazon S3 to efficiently transfer data in and out of Redshift and uses JDBC to automatically trigger the appropriate COPY and UNLOAD commands on Redshift.

Note

In Databricks Runtime 11.3 LTS and above, Databricks Runtime includes the Redshift JDBC driver, accessible using the redshift keyword for the format option. See Databricks Runtime release notes versions and compatibility for driver versions included in each Databricks Runtime. User-provided drivers are still supported and take precedence over the bundled JDBC driver.

In Databricks Runtime 10.4 LTS and below, manual installation of the Redshift JDBC driver is required, and queries should use the driver (com.databricks.spark.redshift) for the format. See Redshift driver installation.

Usage

The following examples demonstrate connecting with the Redshift driver. Replace the url parameter values if you’re using the PostgreSQL JDBC driver.

Once you have configured your AWS credentials, you can use the data source with the Spark data source API in Python, SQL, R, or Scala.

Important

External locations defined in Unity Catalog are not supported as tempdir locations.

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) from <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

Read data using SQL on Databricks Runtime 10.4 LTS and below:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Read data using SQL on Databricks Runtime 11.3 LTS and above:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. */
  user '<username>',
  password '<password>',
  database '<database-name>',
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Write data using SQL:

DROP TABLE IF EXISTS redshift_table_new;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

The SQL API supports only the creation of new tables and not overwriting or appending.
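
To append to an existing table, the DataFrame writer can be used instead of SQL. The following is a minimal sketch, not a definitive implementation: the helper name append_to_redshift is hypothetical, and redshift_options is assumed to be a dict holding the same options as the write examples above (dbtable, tempdir, url, and so on).

```python
def append_to_redshift(df, redshift_options):
    """Append the rows of df to an existing Redshift table.

    Hypothetical helper: the name and signature are illustrative only.
    redshift_options is a dict of data source options such as dbtable,
    tempdir, url, user, and password.
    """
    (df.write
       .format("redshift")
       .options(**redshift_options)
       .mode("append")  # use "overwrite" to replace the table instead
       .save())
```

In a notebook this could be called as append_to_redshift(df, {"dbtable": table_name, "tempdir": "s3a://<bucket>/<directory-path>", ...}).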

Read data using R on Databricks Runtime 10.4 LTS and below:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Read data using R on Databricks Runtime 11.3 LTS and above:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true")

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) from <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", true)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Recommendations for working with Redshift

Query execution may extract large amounts of data to S3. If you plan to perform several queries against the same data in Redshift, Databricks recommends saving the extracted data using Delta Lake.
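
The recommendation above could be sketched as follows. This is a minimal illustration under stated assumptions: the helper name cache_redshift_table and the target_table argument are hypothetical, spark is the notebook's SparkSession, and redshift_options is a dict of the same options used in the read examples.

```python
def cache_redshift_table(spark, redshift_options, target_table):
    """Read from Redshift once and persist the result as a Delta table.

    Hypothetical helper: the name and signature are illustrative only.
    redshift_options is a dict of data source options (dbtable or query,
    tempdir, url, and so on).
    """
    df = (spark.read
          .format("redshift")
          .options(**redshift_options)
          .load())
    # Later queries can read the Delta table instead of triggering
    # another extract from Redshift to S3.
    (df.write
       .format("delta")
       .mode("overwrite")
       .saveAsTable(target_table))
    return spark.table(target_table)
```

Subsequent queries then run against the returned Delta-backed DataFrame rather than against Redshift.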

Note

You should not create a Redshift cluster inside the Databricks managed VPC as it can lead to permissions issues due to the security model in the Databricks VPC. You should create your own VPC and then perform VPC peering to connect Databricks to your Redshift instance.

Configuration

Authenticating to S3 and Redshift

The data source involves several network connections, illustrated in the following diagram:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

The data source reads and writes data to S3 when transferring data to/from Redshift. As a result, it requires AWS credentials with read and write access to an S3 bucket (specified using the tempdir configuration parameter).

Note

The data source does not clean up the temporary files that it creates in S3. As a result, we recommend that you use a dedicated temporary S3 bucket with an object lifecycle configuration to ensure that temporary files are automatically deleted after a specified expiration period. See the Encryption section of this document for a discussion of how to encrypt these files. You cannot use an External location defined in Unity Catalog as a tempdir location.
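
As a rough sketch of the lifecycle recommendation above, the following builds an expiration rule for the temporary prefix. The function name, the rule ID, the prefix, and the seven-day period are all assumptions chosen for illustration; the dict follows the shape that boto3's put_bucket_lifecycle_configuration accepts.

```python
def tempdir_lifecycle_configuration(prefix, expire_after_days):
    """Build an S3 lifecycle configuration that expires temporary files.

    Hypothetical helper. The returned dict follows the shape accepted by
    boto3's put_bucket_lifecycle_configuration call.
    """
    if expire_after_days < 1:
        raise ValueError("expire_after_days must be at least 1")
    return {
        "Rules": [
            {
                "ID": "expire-redshift-tempdir",  # made-up rule name
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": expire_after_days},
            }
        ]
    }


# Example values only: expire objects under redshift-temp/ after 7 days.
config = tempdir_lifecycle_configuration("redshift-temp/", 7)

# With boto3 installed and credentials configured, this could be applied with:
#   boto3.client("s3").put_bucket_lifecycle_configuration(
#       Bucket="<bucket>", LifecycleConfiguration=config)
```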

The following sections describe each connection’s authentication configuration options:

Spark driver to Redshift

The Spark driver connects to Redshift via JDBC using a username and password. Redshift does not support the use of IAM roles to authenticate this connection. By default, this connection uses SSL encryption; for more details, see Encryption.
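
One way to assemble the connection settings is sketched below. The helper name and the example host, database, and credentials are hypothetical. It keeps the credentials out of the JDBC URL and passes them through the user and password options, which avoids escaping special characters.

```python
def redshift_connection_options(host, database, user, password, port=5439):
    """Return url, user, and password options for the Redshift data source.

    Hypothetical helper. Credentials are passed as separate options rather
    than embedded in the URL, so special characters need no escaping.
    """
    url = f"jdbc:redshift://{host}:{port}/{database}"
    return {"url": url, "user": user, "password": password}


# Example values only; replace with your own cluster details.
connection_options = redshift_connection_options(
    host="example-cluster.example.com",
    database="dev",
    user="analyst",
    password="p@ss/word&1",
)
```

The resulting dict can be merged with tempdir, dbtable, and the other options before being passed to the reader or writer.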

Spark to S3

S3 acts as an intermediary to store bulk data when reading from or writing to Redshift. Spark connects to S3 using both the Hadoop FileSystem interfaces and directly using the Amazon Java SDK’s S3 client.

Note

You cannot use DBFS mounts to configure access to S3 for Redshift.

  • Default Credential Provider Chain (best option for most users): AWS credentials are automatically retrieved through the DefaultAWSCredentialsProviderChain. If you use instance profiles to authenticate to S3 then you should probably use this method.

    The following methods of providing credentials take precedence over this default.

  • By assuming an IAM role: You can use an IAM role that the instance profile can assume. To specify the role ARN, you must attach an instance profile to the cluster, and provide the following configuration keys:

    sc.hadoopConfiguration.set("fs.s3a.credentialsType", "AssumeRole")
    sc.hadoopConfiguration.set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)
    // An optional duration, expressed as a quantity and a unit of
    // time, such as "15m" or "1h"
    sc.hadoopConfiguration.set("fs.s3a.assumed.role.session.duration", <duration>)
    
    sc._jsc.hadoopConfiguration().set("fs.s3a.credentialsType", "AssumeRole")
    sc._jsc.hadoopConfiguration().set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)
    # An optional duration, expressed as a quantity and a unit of
    # time, such as "15m" or "1h"
    sc._jsc.hadoopConfiguration().set("fs.s3a.assumed.role.session.duration", <duration>)
    
  • Set keys in Hadoop conf: You can specify AWS keys using Hadoop configuration properties. If your tempdir configuration points to an s3a:// filesystem, you can set the fs.s3a.access.key and fs.s3a.secret.key properties in a Hadoop XML configuration file or call sc.hadoopConfiguration.set() to configure Spark’s global Hadoop configuration. If you use an s3n:// filesystem, you can provide the legacy configuration keys as shown in the following example.

    For example, if you are using the s3a filesystem, add:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    For the legacy s3n filesystem, add:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    

    The following command relies on some Spark internals, but should work with all PySpark versions and is unlikely to change in the future:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
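
The choice between the s3a and legacy s3n property names can be derived from the tempdir scheme. The following sketch assumes a hypothetical helper named s3_key_settings; only the property names come from the examples above.

```python
from urllib.parse import urlparse


def s3_key_settings(tempdir, access_key_id, secret_key):
    """Return the Hadoop properties that carry AWS keys for a tempdir URI.

    Hypothetical helper: picks the s3a or legacy s3n property names based
    on the URI scheme of tempdir.
    """
    scheme = urlparse(tempdir).scheme
    if scheme == "s3a":
        return {
            "fs.s3a.access.key": access_key_id,
            "fs.s3a.secret.key": secret_key,
        }
    if scheme == "s3n":
        return {
            "fs.s3n.awsAccessKeyId": access_key_id,
            "fs.s3n.awsSecretAccessKey": secret_key,
        }
    raise ValueError(f"Unsupported tempdir scheme: {scheme!r}")


# In a notebook, the settings could then be applied with:
#   for key, value in s3_key_settings(tempdir, key_id, secret).items():
#       sc._jsc.hadoopConfiguration().set(key, value)
```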
    

Redshift to S3

Redshift also connects to S3 during COPY and UNLOAD queries. There are three methods of authenticating this connection:

  • Have Redshift assume an IAM role (most secure): You can grant Redshift permission to assume an IAM role during COPY or UNLOAD operations and then configure the data source to instruct Redshift to use that role:

    1. Create an IAM role granting appropriate S3 permissions to your bucket.

    2. Follow the guide Authorizing Amazon Redshift to Access Other AWS Services On Your Behalf to configure this role’s trust policy in order to allow Redshift to assume this role.

    3. Follow the steps in the Authorizing COPY and UNLOAD Operations Using IAM Roles guide to associate that IAM role with your Redshift cluster.

    4. Set the data source’s aws_iam_role option to the role’s ARN.

  • Forward Spark’s S3 credentials to Redshift: If the forward_spark_s3_credentials option is set to true, the data source automatically discovers the credentials that Spark is using to connect to S3 and forwards those credentials to Redshift over JDBC. If Spark is authenticating to S3 using an instance profile, a set of temporary STS credentials is forwarded to Redshift; otherwise, AWS keys are forwarded. The JDBC query embeds these credentials, so Databricks strongly recommends that you enable SSL encryption of the JDBC connection when using this authentication method.

  • Use Security Token Service (STS) credentials: You can configure the temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token configuration properties to point to temporary keys created via the AWS Security Token Service. The JDBC query embeds these credentials, so Databricks strongly recommends that you enable SSL encryption of the JDBC connection when using this authentication method. If you choose this option, be aware of the risk that the credentials expire before the read or write operation succeeds.

These three options are mutually exclusive and you must explicitly choose which one to use.
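
The mutual-exclusivity rule can be enforced before the options reach the data source. The sketch below uses a hypothetical helper, redshift_to_s3_auth_options; only the option names are taken from this article.

```python
def redshift_to_s3_auth_options(aws_iam_role=None,
                                forward_spark_s3_credentials=False,
                                temporary_credentials=None):
    """Return data source options for exactly one Redshift-to-S3 auth method.

    Hypothetical helper. temporary_credentials, if given, is a tuple of
    (access_key_id, secret_access_key, session_token).
    """
    chosen = [
        aws_iam_role is not None,
        bool(forward_spark_s3_credentials),
        temporary_credentials is not None,
    ]
    if sum(chosen) != 1:
        raise ValueError(
            "Choose exactly one Redshift-to-S3 authentication method")
    if aws_iam_role is not None:
        return {"aws_iam_role": aws_iam_role}
    if forward_spark_s3_credentials:
        return {"forward_spark_s3_credentials": "true"}
    key_id, secret, token = temporary_credentials
    return {
        "temporary_aws_access_key_id": key_id,
        "temporary_aws_secret_access_key": secret,
        "temporary_aws_session_token": token,
    }
```

Passing no method, or more than one, raises a ValueError instead of leaving the choice implicit.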

Encryption

  • Securing JDBC: Unless any SSL-related settings are present in the JDBC URL, the data source by default enables SSL encryption and also verifies that the Redshift server is trustworthy (that is, sslmode=verify-full). For that, a server certificate is automatically downloaded from the Amazon servers the first time it is needed. In case that fails, a pre-bundled certificate file is used as a fallback. This holds for both the Redshift and the PostgreSQL JDBC drivers.

    In case there are any issues with this feature, or you simply want to disable SSL, you can call .option("autoenablessl", "false") on your DataFrameReader or DataFrameWriter.

    If you want to specify custom SSL-related settings, you can follow the instructions in the Redshift documentation: Using SSL and Server Certificates in Java and JDBC Driver Configuration Options. Any SSL-related options present in the JDBC url used with the data source take precedence (that is, the auto-configuration will not trigger).

  • Encrypting UNLOAD data stored in S3 (data stored when reading from Redshift): According to the Redshift documentation on Unloading Data to S3, “UNLOAD automatically encrypts data files using Amazon S3 server-side encryption (SSE-S3).”

    Redshift also supports client-side encryption with a custom key (see: Unloading Encrypted Data Files) but the data source lacks the capability to specify the required symmetric key.

  • Encrypting COPY data stored in S3 (data stored when writing to Redshift): According to the Redshift documentation on Loading Encrypted Data Files from Amazon S3:

You can use the COPY command to load data files that were uploaded to Amazon S3 using server-side encryption with AWS-managed encryption keys (SSE-S3 or SSE-KMS), client-side encryption, or both. COPY does not support Amazon S3 server-side encryption with a customer-supplied key (SSE-C).

To use this capability, configure your Hadoop S3 filesystem to use Amazon S3 encryption. This will not encrypt the MANIFEST file that contains a list of all files written.

Parameters

The parameter map or OPTIONS provided in Spark SQL support the following settings:

Parameter

Required

Default

Description

dbtable

Yes, unless query is specified.

None

The table to create or read from in Redshift. This parameter is required when saving data back to Redshift.

query

Yes, unless dbtable is specified.

None

The query to read from in Redshift.

user

No

None

The Redshift username. Must be used in tandem with the password option. Can be used only if the user and password are not passed in the URL; passing both will result in an error. Use this parameter when the username contains special characters that need to be escaped.

password

No

None

The Redshift password. Must be used in tandem with user option. Can be used only if the user and password are not passed in the URL; passing both will result in an error. Use this parameter when the password contains special characters that need to be escaped.

url

Yes

None

A JDBC URL, of the format

jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>

subprotocol can be postgresql or redshift, depending on which JDBC driver you have loaded. One Redshift-compatible driver must be on the classpath and match this URL. host and port should point to the Redshift leader node, so security groups and/or VPC must be configured to allow access from your driver application. database identifies a Redshift database name. user and password are credentials to access the database, which must be embedded in this URL for JDBC, and your user account should have necessary privileges for the table being referenced.

search_path

No

None

Set schema search path in Redshift. Will be set using the SET search_path to command. Should be a comma separated list of schema names to search for tables in. See Redshift documentation of search_path.

aws_iam_role

Only if using IAM roles to authorize.

None

Fully specified ARN of the IAM role for Redshift COPY/UNLOAD operations that is attached to the Redshift cluster. For example, arn:aws:iam::123456789000:role/<redshift-iam-role>.

forward_spark_s3_credentials

No

false

If true, the data source automatically discovers the credentials that Spark is using to connect to S3 and forwards those credentials to Redshift over JDBC. These credentials are sent as part of the JDBC query, so it is strongly recommended to enable SSL encryption of the JDBC connection when using this option.

temporary_aws_access_key_id

No

None

AWS access key, must have write permissions to the S3 bucket.

temporary_aws_secret_access_key

No

None

AWS secret access key corresponding to provided access key.

temporary_aws_session_token

No

None

AWS session token corresponding to provided access key.

tempdir

Yes

None

A writable location in Amazon S3, to be used for unloaded data when reading and Avro data to be loaded into Redshift when writing. If you’re using Redshift data source for Spark as part of a regular ETL pipeline, it can be useful to set a Lifecycle Policy on a bucket and use that as a temp location for this data.

You cannot use External locations defined in Unity Catalog as tempdir locations.

jdbcdriver

No

Determined by the JDBC URL’s subprotocol.

The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver class name should automatically be determined by the JDBC URL’s subprotocol.

diststyle

No

EVEN

The Redshift Distribution Style to be used when creating a table. Can be one of EVEN, KEY or ALL (see Redshift docs). When using KEY, you must also set a distribution key with the distkey option.

distkey

No, unless using DISTSTYLE KEY

None

The name of a column in the table to use as the distribution key when creating a table.

sortkeyspec

No

None

A full Redshift Sort Key definition. Examples include:

  • SORTKEY(my_sort_column)

  • COMPOUND SORTKEY(sort_col_1, sort_col_2)

  • INTERLEAVED SORTKEY(sort_col_1, sort_col_2)

usestagingtable (Deprecated)

No

true

Setting this deprecated option to false causes an overwrite operation’s destination table to be dropped immediately at the beginning of the write, making the overwrite operation non-atomic and reducing the availability of the destination table. This may reduce the temporary disk space requirements for overwrites.

Since setting usestagingtable=false risks data loss or unavailability, it is deprecated in favor of requiring you to manually drop the destination table.

description

No

None

A description for the table. Will be set using the SQL COMMENT command, and should show up in most query tools. See also the description metadata to set descriptions on individual columns.

preactions

No

None

A ; separated list of SQL commands to be executed before the COPY command when loading data. It may be useful to have some DELETE commands or similar run here before loading new data. If the command contains %s, the table name is formatted in before execution (in case you’re using a staging table).

Be warned that if these commands fail, it is treated as an error and an exception is thrown. If using a staging table, the changes are reverted and the backup table restored if pre actions fail.

postactions

No

None

A ; separated list of SQL commands to be executed after a successful COPY when loading data. It may be useful to have some GRANT commands or similar run here when loading new data. If the command contains %s, the table name is formatted in before execution (in case you’re using a staging table).

Be warned that if these commands fail, it is treated as an error and an exception is thrown. If using a staging table, the changes are reverted and the backup table restored if post actions fail.

extracopyoptions

No

None

A list of extra options to append to the Redshift COPY command when loading data, for example, TRUNCATECOLUMNS or MAXERROR n (see the Redshift docs for other options).

Since these options are appended to the end of the COPY command, only options that make sense at the end of the command can be used, but that should cover most possible use cases.

tempformat

No

AVRO

The format in which to save temporary files in S3 when writing to Redshift. Defaults to AVRO; the other allowed values are CSV and CSV GZIP for CSV and gzipped CSV, respectively.

Redshift is significantly faster when loading CSV than when loading Avro files, so using that tempformat may provide a large performance boost when writing to Redshift.

csvnullstring

No

@NULL@

The String value to write for nulls when using the CSV tempformat. This should be a value that does not appear in your actual data.

csvseparator

No

,

Separator to use when writing temporary files with tempformat set to CSV or CSV GZIP. This must be a valid ASCII character, for example, “,” or “|”.

csvignoreleadingwhitespace

No

true

When set to true, removes leading whitespace from values during writes when tempformat is set to CSV or CSV GZIP. Otherwise, whitespace is retained.

csvignoretrailingwhitespace

No

true

When set to true, removes trailing whitespace from values during writes when tempformat is set to CSV or CSV GZIP. Otherwise, the whitespace is retained.

infer_timestamp_ntz_type

No

false

If true, values of type Redshift TIMESTAMP are interpreted as TimestampNTZType (timestamp without time zone) during reads. Otherwise, all timestamps are interpreted as TimestampType regardless of the type in the underlying Redshift table.
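
Several rules in the parameter table can be checked before a read or write is attempted. The following sketch assumes a hypothetical helper named check_redshift_options and covers only rules stated in the table; it does not check url, because the Databricks Runtime 11.3 LTS examples pass host and database instead.

```python
ALLOWED_TEMPFORMATS = {"AVRO", "CSV", "CSV GZIP"}
ALLOWED_DISTSTYLES = {"EVEN", "KEY", "ALL"}


def check_redshift_options(options):
    """Return a list of problems found in a dict of data source options.

    Hypothetical helper: only rules stated in the parameter table are
    checked, and an empty list means no problem was found.
    """
    problems = []
    if "tempdir" not in options:
        problems.append("tempdir is required")
    if "dbtable" not in options and "query" not in options:
        problems.append("either dbtable or query is required")
    if ("user" in options) != ("password" in options):
        problems.append("user and password must be set together")
    diststyle = options.get("diststyle", "EVEN")
    if diststyle not in ALLOWED_DISTSTYLES:
        problems.append("diststyle must be EVEN, KEY, or ALL")
    elif diststyle == "KEY" and "distkey" not in options:
        problems.append("diststyle KEY requires distkey")
    tempformat = options.get("tempformat", "AVRO")
    if tempformat not in ALLOWED_TEMPFORMATS:
        problems.append("tempformat must be AVRO, CSV, or CSV GZIP")
    return problems
```

For example, an options dict that sets diststyle to KEY without a distkey yields a single problem message, while a complete dict yields an empty list.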

Additional configuration options

Configuring the maximum size of string columns

When creating Redshift tables, the default behavior is to create TEXT columns for string columns. Redshift stores TEXT columns as VARCHAR(256), so these columns have a maximum size of 256 characters (source).

To support larger columns, you can use the maxlength column metadata field to specify the maximum length of individual string columns. This is also useful for implementing space-saving performance optimizations by declaring columns with a smaller maximum length than the default.

Note

Due to limitations in Spark, the SQL and R language APIs do not support column metadata modification.

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for colName, length in columnLengthMap.items():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Here is an example of updating multiple columns’ metadata fields using Spark’s Scala API:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
  .save()

Set a custom column type

If you need to manually set a column type, you can use the redshift_type column metadata. For example, if you desire to override the Spark SQL Schema -> Redshift SQL type matcher to assign a user-defined column type, you can do the following:

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Configure column encoding

When creating a table, use the encoding column metadata field to specify a compression encoding for each column (see Amazon docs for available encodings).

Setting descriptions on columns

Redshift allows columns to have descriptions attached that should show up in most query tools (using the COMMENT command). You can set the description column metadata field to specify a description for individual columns.
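
The encoding and description fields can be set in the same way as maxlength and redshift_type. The sketch below is illustrative only: the helper name build_column_metadata, the column names, the description text, and the chosen encodings are assumptions, not recommendations.

```python
def build_column_metadata(encodings=None, descriptions=None):
    """Merge per-column settings into one metadata dict per column.

    Hypothetical helper. encodings maps column name to a Redshift
    compression encoding; descriptions maps column name to description text.
    """
    metadata = {}
    for col_name, encoding in (encodings or {}).items():
        metadata.setdefault(col_name, {})["encoding"] = encoding
    for col_name, text in (descriptions or {}).items():
        metadata.setdefault(col_name, {})["description"] = text
    return metadata


# Example values only.
column_metadata = build_column_metadata(
    encodings={"language_code": "BYTEDICT", "url": "LZO"},
    descriptions={"url": "Landing page URL"},
)

# In a notebook, the metadata could then be attached to a DataFrame:
#   for col_name, meta in column_metadata.items():
#       df = df.withColumn(col_name, df[col_name].alias(col_name, metadata=meta))
```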

Query pushdown into Redshift

The Spark optimizer pushes the following operators down into Redshift:

  • Filter

  • Project

  • Sort

  • Limit

  • Aggregation

  • Join

Within Project and Filter, it supports the following expressions:

  • Most Boolean logic operators

  • Comparisons

  • Basic arithmetic operations

  • Numeric and string casts

  • Most string functions

  • Scalar subqueries, if they can be pushed down entirely into Redshift.

Note

This pushdown does not support expressions operating on dates and timestamps.

Within Aggregation, it supports the following aggregation functions:

  • AVG

  • COUNT

  • MAX

  • MIN

  • SUM

  • STDDEV_SAMP

  • STDDEV_POP

  • VAR_SAMP

  • VAR_POP

combined with the DISTINCT clause, where applicable.

Within Join, it supports the following types of joins:

  • INNER JOIN

  • LEFT OUTER JOIN

  • RIGHT OUTER JOIN

  • LEFT SEMI JOIN

  • LEFT ANTI JOIN

  • Subqueries that are rewritten into Join by the optimizer, for example, WHERE EXISTS and WHERE NOT EXISTS

Note

Join pushdown does not support FULL OUTER JOIN.

The pushdown might be most beneficial in queries with LIMIT. A query such as SELECT * FROM large_redshift_table LIMIT 10 could take very long, as the whole table would first be UNLOADed to S3 as an intermediate result. With pushdown, the LIMIT is executed in Redshift. In queries with aggregations, pushing the aggregation down into Redshift also helps to reduce the amount of data that needs to be transferred.

Query pushdown into Redshift is enabled by default. It can be disabled by setting spark.databricks.redshift.pushdown to false. Even when disabled, Spark still pushes down filters and performs column elimination into Redshift.
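
Toggling the setting from a notebook might look like the following sketch. The helper name set_redshift_pushdown is hypothetical; the configuration key is the one named above, and spark is assumed to be the notebook's SparkSession.

```python
PUSHDOWN_KEY = "spark.databricks.redshift.pushdown"


def set_redshift_pushdown(spark, enabled):
    """Enable or disable Redshift query pushdown for the current session.

    Hypothetical helper: it only wraps spark.conf.set for the key above.
    """
    spark.conf.set(PUSHDOWN_KEY, "true" if enabled else "false")


# In a notebook, where spark is already defined:
#   set_redshift_pushdown(spark, False)
```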

Redshift driver installation

The Redshift data source also requires a Redshift-compatible JDBC driver. Because Redshift is based on the PostgreSQL database system, you can use the PostgreSQL JDBC driver included with Databricks Runtime or the Amazon recommended Redshift JDBC driver. No installation is required to use the PostgreSQL JDBC driver. The version of the PostgreSQL JDBC driver included in each Databricks Runtime release is listed in the Databricks Runtime release notes.

To manually install the Redshift JDBC driver:

  1. Download the driver from Amazon.

  2. Upload the driver to your Databricks workspace. See Libraries.

  3. Install the library on your cluster.

Note

Databricks recommends using the latest version of the Redshift JDBC driver. Versions of the Redshift JDBC driver below 1.2.41 have the following limitations:

  • Version 1.2.16 of the driver returns empty data when using a where clause in an SQL query.

  • Versions of the driver below 1.2.41 may return invalid results because a column’s nullability is incorrectly reported as “Not Nullable” instead of “Unknown”.

Transactional guarantees

This section describes the transactional guarantees of the Redshift data source for Spark.

General background on Redshift and S3 properties

For general information on Redshift transactional guarantees, see the Managing Concurrent Write Operations chapter in the Redshift documentation. In a nutshell, Redshift provides serializable isolation according to the documentation for the Redshift BEGIN command:

[although] you can use any of the four transaction isolation levels, Amazon Redshift processes all isolation levels as serializable.

According to the Redshift documentation:

Amazon Redshift supports a default automatic commit behavior in which each separately-executed SQL command commits individually.

Thus, individual commands like COPY and UNLOAD are atomic and transactional, while explicit BEGIN and END should only be necessary to enforce the atomicity of multiple commands or queries.

When reading from and writing to Redshift, the data source reads and writes data in S3. Both Spark and Redshift produce partitioned output and store it in multiple files in S3. According to the Amazon S3 Data Consistency Model documentation, S3 bucket listing operations are eventually consistent, so the data source must go to special lengths to avoid missing or incomplete data caused by this eventual consistency.

Guarantees of the Redshift data source for Spark

Append to an existing table

When inserting rows into Redshift, the data source uses the COPY command and specifies manifests to guard against certain eventually-consistent S3 operations. As a result, spark-redshift appends to existing tables have the same atomic and transactional properties as regular Redshift COPY commands.

Create a new table (SaveMode.CreateIfNotExists)

Creating a new table is a two-step process, consisting of a CREATE TABLE command followed by a COPY command to append the initial set of rows. Both operations are performed in the same transaction.

Overwrite an existing table

By default, the data source uses transactions to perform overwrites, which are implemented by deleting the destination table, creating a new empty table, and appending rows to it.

If the deprecated usestagingtable setting is set to false, the data source commits the DROP TABLE command before appending rows to the new table. This sacrifices the atomicity of the overwrite operation but reduces the amount of staging space that Redshift needs during the overwrite.
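
The write paths described in this section correspond to Spark save modes. The following sketch assumes a DataFrame df and a dictionary of connection options such as url and tempdir; write_to_redshift is a hypothetical helper, not part of the data source.

```python
def write_to_redshift(df, options: dict, table_name: str, mode: str) -> None:
    """Write `df` to a Redshift table using the given Spark save mode.

    mode="append" adds rows to an existing table.
    mode="overwrite" replaces the table, transactionally by default.
    """
    (df.write
       .format("redshift")
       .options(**options)            # connection settings supplied by the caller
       .option("dbtable", table_name)
       .mode(mode)
       .save())
```

For example, write_to_redshift(df, options, "my_table", "append") would append the rows of df to a table named my_table (a placeholder name).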

Query Redshift table

Queries use the Redshift UNLOAD command to execute a query and save its results to S3, and they use manifests to guard against certain eventually-consistent S3 operations. As a result, queries from the Redshift data source for Spark should have the same consistency properties as regular Redshift queries.

Common problems and solutions

S3 bucket and Redshift cluster are in different AWS regions

By default, copies between S3 and Redshift do not work if the S3 bucket and Redshift cluster are in different AWS regions.

If you attempt to read a Redshift table when the S3 bucket is in a different region, you may see an error such as:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Similarly, attempting to write to Redshift using an S3 bucket in a different region may cause the following error:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect

  • Writes: The Redshift COPY command supports explicit specification of the S3 bucket region, so you can make writes to Redshift work properly in these cases by adding region 'the-region-name' to the extracopyoptions setting. For example, with a bucket in the US East (Virginia) region and the Scala API, use:

    .option("extracopyoptions", "region 'us-east-1'")
    

    You can alternatively use the awsregion setting:

    .option("awsregion", "us-east-1")
    
  • Reads: The Redshift UNLOAD command also supports explicit specification of the S3 bucket region. You can make reads work properly by adding the region to the awsregion setting:

    .option("awsregion", "us-east-1")
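
The same awsregion option can be applied from Python for both reads and writes. This is a sketch under assumptions: with_bucket_region is a hypothetical helper, the option values are placeholders, and spark and df are assumed to exist in the notebook.

```python
def with_bucket_region(options: dict, bucket_region: str) -> dict:
    """Return a copy of `options` with the region of the tempdir bucket set."""
    updated = dict(options)
    updated["awsregion"] = bucket_region
    return updated


# Placeholder connection settings; replace them with your own values.
base_options = {
    "url": "jdbc:redshift://<database-host-url>",
    "tempdir": "s3a://<bucket>/<directory-path>",
}
options = with_bucket_region(base_options, "us-east-1")

# In a Databricks notebook, where `spark` and `df` exist:
# df = spark.read.format("redshift").options(**options).option("dbtable", "my_table").load()
# df.write.format("redshift").options(**options).option("dbtable", "my_table").mode("append").save()
```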
    

Unexpected S3ServiceException credentials error when you use instance profiles to authenticate to S3

If you are using instance profiles to authenticate to S3 and receive an unexpected S3ServiceException error, check whether AWS access keys are specified in the tempdir S3 URI, in Hadoop configurations, or in any of the sources checked by the DefaultAWSCredentialsProviderChain: those sources take precedence over instance profile credentials.

Here is a sample error message that can be a symptom of keys accidentally taking precedence over instance profiles:

com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;
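
A quick way to look for two of these sources is sketched below. The function names and the bucket name are hypothetical, the check on the URI is a heuristic, and the environment check only inspects the Python process, which is assumed to reflect the environment seen by the credentials provider chain. Hadoop configurations are not inspected here.

```python
import os
from urllib.parse import urlparse


def tempdir_has_embedded_keys(tempdir: str) -> bool:
    """Return True if the S3 URI carries credentials, as in s3a://KEY:SECRET@bucket/path.

    Heuristic: a secret containing an unencoded '/' would not be detected.
    """
    return urlparse(tempdir).username is not None


def environment_has_static_keys(environ=None) -> bool:
    """Return True if the standard AWS access key environment variables are set."""
    environ = os.environ if environ is None else environ
    return any(name in environ for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"))


print(tempdir_has_embedded_keys("s3a://my-bucket/tmp"))  # False
```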

Authentication error when using a password with special characters in the JDBC URL

If you are providing the username and password as part of the JDBC URL and the password contains special characters such as ;, ?, or &, you might see the following exception:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

This is caused by special characters in the username or password not being escaped correctly by the JDBC driver. Make sure to specify the username and password using the corresponding DataFrame options user and password. For more information, see Parameters.
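
A minimal sketch of this approach follows. The helper redshift_auth_options is hypothetical, and its check for credentials in the URL is a simple substring heuristic rather than a full parser.

```python
def redshift_auth_options(jdbc_url: str, user: str, password: str) -> dict:
    """Build options that pass credentials separately from the JDBC URL."""
    lowered = jdbc_url.lower()
    # Heuristic guard: reject URLs that appear to embed a password.
    if "password=" in lowered or "pwd=" in lowered:
        raise ValueError("Pass credentials with the user and password options, not in the URL")
    return {"url": jdbc_url, "user": user, "password": password}


# The password is passed verbatim, so characters such as ; ? & need no escaping.
options = redshift_auth_options("jdbc:redshift://<database-host-url>", "xyz", "p;a?ss&word")

# In a Databricks notebook: spark.read.format("redshift").options(**options)...
```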

Long-running Spark query hangs indefinitely even though the corresponding Redshift operation is done

If you are reading large amounts of data from Redshift or writing large amounts of data to it, your Spark query may hang indefinitely, even though the AWS Redshift Monitoring page shows that the corresponding LOAD or UNLOAD operation has completed and that the cluster is idle. This is caused by the connection between Redshift and Spark timing out. To avoid this, make sure the tcpKeepAlive JDBC flag is enabled and TCPKeepAliveMinutes is set to a low value (for example, 1).

For additional information, see Amazon Redshift JDBC Driver Configuration.
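
One way to set these flags is to append them to the JDBC URL. The sketch below assumes the semicolon-separated property syntax of the Redshift JDBC driver; if your driver version expects a different separator, adjust accordingly. The helper name with_tcp_keepalive is hypothetical.

```python
def with_tcp_keepalive(jdbc_url: str, minutes: int = 1) -> str:
    """Append keepalive settings to a Redshift JDBC URL.

    Assumes properties are appended as ';name=value' pairs.
    """
    separator = "" if jdbc_url.endswith(";") else ";"
    return f"{jdbc_url}{separator}tcpKeepAlive=true;TCPKeepAliveMinutes={minutes}"


url = with_tcp_keepalive("jdbc:redshift://<database-host-url>")
print(url)
# jdbc:redshift://<database-host-url>;tcpKeepAlive=true;TCPKeepAliveMinutes=1
```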

Timestamp with timezone semantics

When reading data, both Redshift TIMESTAMP and TIMESTAMPTZ data types are mapped to Spark TimestampType, and each value is converted to Coordinated Universal Time (UTC) and stored as a UTC timestamp. Because a Redshift TIMESTAMP value carries no timezone information, the local timezone is assumed. When writing data to a Redshift table, a Spark TimestampType is mapped to the Redshift TIMESTAMP data type.
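
The conversion can be illustrated with plain Python datetime arithmetic. This models the semantics only and does not call the data source; the local timezone of UTC-08:00 and the sample values are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical local timezone (UTC-08:00), used only for this illustration.
LOCAL_TZ = timezone(timedelta(hours=-8))


def timestamp_to_utc(value: str) -> datetime:
    """Model a TIMESTAMP value: no zone information, so the local timezone is assumed."""
    naive = datetime.fromisoformat(value)
    return naive.replace(tzinfo=LOCAL_TZ).astimezone(timezone.utc)


def timestamptz_to_utc(value: str) -> datetime:
    """Model a TIMESTAMPTZ value: the stored offset determines the UTC instant."""
    return datetime.fromisoformat(value).astimezone(timezone.utc)


print(timestamp_to_utc("2024-01-15 12:00:00"))          # 2024-01-15 20:00:00+00:00
print(timestamptz_to_utc("2024-01-15 12:00:00+01:00"))  # 2024-01-15 11:00:00+00:00
```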

Migration guide

The data source now requires you to explicitly set forward_spark_s3_credentials before Spark S3 credentials are forwarded to Redshift. This change has no impact if you use the aws_iam_role or temporary_aws_* authentication mechanisms. However, if you relied on the old default behavior, you must now explicitly set forward_spark_s3_credentials to true to continue using your previous Redshift-to-S3 authentication mechanism. For a discussion of the three authentication mechanisms and their security trade-offs, see the Authenticating to S3 and Redshift section of this document.
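
For code that builds its options in one place, the migration can be sketched as a small shim. The helper preserve_legacy_s3_auth is hypothetical; it adds forward_spark_s3_credentials only when neither of the other two mechanisms is configured.

```python
def preserve_legacy_s3_auth(options: dict) -> dict:
    """Return a copy of `options` that keeps the old credential-forwarding behavior."""
    updated = dict(options)
    uses_other_mechanism = "aws_iam_role" in updated or any(
        key.startswith("temporary_aws_") for key in updated
    )
    if not uses_other_mechanism:
        # Credential forwarding must now be requested explicitly.
        updated.setdefault("forward_spark_s3_credentials", "true")
    return updated


legacy = {"url": "jdbc:redshift://<database-host-url>", "tempdir": "s3a://<bucket>/<directory-path>"}
print(preserve_legacy_s3_auth(legacy)["forward_spark_s3_credentials"])  # true
```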