DatabricksによるAmazon Redshift へのクエリー
Databricks を使用して、Amazon Redshift からテーブルを読み書きできます。
実験段階
この記事で説明する構成は 実験的です。 実験的な機能は現状のまま提供され、Databricks による顧客テクニカル サポートを通じてサポートされることはありません。 完全なクエリ フェデレーション サポートを取得するには、代わりにレイクハウス フェデレーションを使用する必要があります。これにより、 DatabricksユーザーはUnity Catalog構文とデータガバナンス ツールを活用できるようになります。
Databricks Redshift データソースは、Amazon S3 を使用して Redshift との間でデータを効率的に転送し、JDBC を使用して Redshift で適切な COPY
コマンドと UNLOAD
コマンドを自動的にトリガーします。
注:
Databricks Runtime 11.3 LTS 以降では、Databricks Runtime には Redshift JDBC ドライバーが含まれており、format オプションの redshift
キーワードを使用してアクセスできます。 「Databricks Runtime リリースノートのバージョンと、各 Databricks Runtime に含まれるドライバー バージョンの互換性」を参照してください。ユーザー提供のドライバーは引き続きサポートされ、バンドルされている JDBC ドライバーよりも優先されます。
Databricks Runtime 10.4 LTS 以下では、Redshift JDBC ドライバーを手動でインストールする必要があり、クエリーでは形式にドライバー (com.databricks.spark.redshift
) を使用する必要があります。 「 Redshift ドライバーのインストール」を参照してください。
使用方法
次の例は、Redshift ドライバーとの接続を示しています。 PostgreSQL JDBC ドライバーを使用している場合は、 url
パラメーター値を置き換えます。
AWS 認証情報を設定したら、Python、SQL、R、または Scala の Spark データソース API でデータソースを使用できます。
重要
Unity Catalogで定義されている外部ロケーションは、tempdir
ロケーションとしてサポートされていません。
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
Databricks Runtime 10.4 LTS 以下で SQL を使用してデータを読み取ります。
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Databricks Runtime 11.3 LTS 以降で SQL を使用してデータを読み取る:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
SQLを使用してデータを書き込みます。
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
SQL API では、新しいテーブルの作成のみがサポートされ、上書きや追加はサポートされません。
Databricks Runtime 10.4 LTS 以下で R を使用してデータを読み取ります。
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Databricks Runtime 11.3 LTS 以降で R を使用してデータを読み取る:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Redshiftを使用するための推奨事項
クエリーの実行により、大量のデータが S3 に抽出される場合があります。 Redshift の同じデータに対して複数のクエリーを実行する予定の場合、Databricks では、 Delta Lake を使用して抽出されたデータを保存することをお勧めします。
注:
Databricks VPC のセキュリティ モデルによるアクセス許可の問題につながる可能性があるため、Databricks マネージド VPC 内に Redshift クラスターを作成しないでください。 独自の VPC を作成し、 VPC ピアリング を実行して Databricks を Redshift インスタンスに接続する必要があります。
設定
S3 と Redshiftへの認証
データソースには、次の図に示すように、複数のネットワーク接続が含まれます。
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
データソースは、Redshift との間でデータを転送するときに、S3 に対してデータの読み取りと書き込みを行います。 その結果、S3 バケットへの読み取りおよび書き込みアクセス権を持つ AWS 認証情報 ( tempdir
設定パラメーターを使用して指定) が必要です。
注:
データソースは、S3 で作成した一時ファイルをクリーンアップしません。 そのため、 オブジェクトライフサイクル設定 を持つ専用の一時 S3 バケットを使用して、指定した有効期限後に一時ファイルが自動的に削除されるようにすることをお勧めします。 これらのファイルを暗号化する方法については、このドキュメントの 「暗号化 」セクションを参照してください。 Unity Catalogで定義された外部ロケーションをtempdir
ロケーションとして使用することはできません。
次のセクションでは、各接続の認証構成オプションについて説明します。
Spark ドライバーから Redshift へ
Sparkドライバーは、ユーザー名とパスワードを使用してJDBC経由でRedshiftに接続します。 Redshift は、この接続を認証するための IAMロールの使用をサポートしていません。 デフォルトでは、この接続はSSL暗号化を使用します。詳細については、「 暗号化」を参照してください。
Spark から S3 へ
S3は、Redshiftからの読み取りまたはRedshiftへの書き込み時に大量のデータを保存する仲介役として機能します。 Spark は、Hadoop FileSystem インターフェイスと Amazon Java SDK の S3 クライアントの両方を使用して S3 に接続します。
注:
DBFS マウントを使用して Redshift の S3 へのアクセスを設定することはできません。
デフォルト 資格情報プロバイダー チェーン (ほとんどのユーザーに最適なオプション): AWS 認証情報は、 DefaultAWSCredentialsProviderChain を通じて自動的に取得されます。 インスタンスプロファイルを使用して S3 への認証を行う場合は、この方法を使用する必要があります。
資格情報を提供する次の方法は、このデフォルトよりも優先されます。
IAMロールを想定する: インスタンスプロファイルが想定できる IAMロールを使用できます。 ロール ARN を指定するには、 インスタンスプロファイルをクラスターにアタッチし、以下の設定キーを指定する必要があります。
sc.hadoopConfiguration.set("fs.s3a.credentialsType", "AssumeRole") sc.hadoopConfiguration.set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>) // An optional duration, expressed as a quantity and a unit of // time, such as "15m" or "1h" sc.hadoopConfiguration.set("fs.s3a.assumed.role.session.duration", <duration>)
sc._jsc.hadoopConfiguration().set("fs.s3a.credentialsType", "AssumeRole") sc._jsc.hadoopConfiguration().set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>) # An optional duration, expressed as a quantity and a unit of # time, such as "15m" or "1h" sc._jsc.hadoopConfiguration().set("fs.s3a.assumed.role.session.duration", <duration>)
Hadoop confでキーを設定します。 AWS キーは、 Hadoop 設定プロパティを使用して指定できます。
tempdir
構成がs3a://
ファイルシステムを指している場合は、Hadoop XML構成ファイルでfs.s3a.access.key
およびfs.s3a.secret.key
プロパティを設定するか、sc.hadoopConfiguration.set()
を呼び出してSparkのグローバルHadoop構成を構成できます。s3n://
ファイルシステムを使用する場合は、次の例に示すように、従来の構成キーを指定できます。たとえば、
s3a
ファイルシステムを使用している場合は、次のように追加します。sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
レガシー
s3n
ファイルシステムの場合は、以下を追加します。sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
次のコマンドは、一部の Spark 内部に依存していますが、すべての PySpark バージョンで動作するはずであり、将来変更される可能性は低いです。
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift から S3 へ
Redshift は、 COPY
および UNLOAD
クエリー中にも S3 に接続します。 この接続を認証するには、次の 3 つの方法があります。
Redshift に IAMロール (最も安全) を引き受けさせる:
COPY
またはUNLOAD
操作中に IAMロールを引き受けるアクセス許可を Redshift に付与し、そのロールを使用するように Redshift に指示するようにデータソースを設定できます。バケットに適切な S3 アクセス許可を付与する IAMロールを作成します。
「 Amazon Redshift がユーザーに代わって他の AWS のサービスにアクセスすることを承認する 」のガイドに従って、Redshift がこのロールを引き受けられるように、このロールの信頼ポリシーを設定します。
「IAMロールを使用した COPY および UNLOAD 操作の承認」ガイドのステップに従って、その IAMロールを Redshift クラスターに関連付けます。
データソースの
aws_iam_role
オプションをロールの ARN に設定します。
Spark の S3 認証情報を Redshift に転送する:
forward_spark_s3_credentials
オプションがtrue
に設定されている場合、データソースは Spark が S3 への接続に使用している認証情報を自動的に検出し、それらの認証情報を JDBC 経由で Redshift に転送します。Spark がインスタンスプロファイルを使用して S3 に対して認証を行っている場合、一時的な STS 認証情報のセットが Redshift に転送されます。それ以外の場合は、AWS キーが転送されます。 JDBC クエリーにはこれらの資格情報が埋め込まれるため、Databricks では、この認証方法を使用する場合は、JDBC 接続の SSL 暗号化を有効にすることを強くお勧めします。セキュリティトークンサービス (STS) 認証情報を使用する:
temporary_aws_access_key_id
、temporary_aws_secret_access_key
、およびtemporary_aws_session_token
設定プロパティを設定して、AWS セキュリティトークンサービスを介して作成された一時キーを指すことができます。 JDBCクエリーにはこれらの認証情報が埋め込まれているため、この認証方法を使用する場合は、JDBC接続のSSL暗号化を有効にする ことを強くお勧めします 。 このオプションを選択する場合は、読み取り/書き込み操作が成功する前に資格情報が期限切れになるリスクに注意してください。
これら 3 つのオプションは相互に排他的であり、どちらを使用するかを明示的に選択する必要があります。
暗号化
JDBC の保護: JDBC URL に SSL 関連の設定が存在しない限り、データソースはデフォルトで SSL 暗号化を有効にし、Redshift サーバーが信頼できる (つまり、
sslmode=verify-full
) であることも検証します。 そのために、サーバー証明書は、最初に必要になったときにAmazonサーバーから自動的にダウンロードされます。 それが失敗した場合は、事前にバンドルされた証明書ファイルがフォールバックとして使用されます。 これは、RedshiftとPostgreSQLの両方のJDBCドライバーに当てはまります。この機能に問題がある場合、または単にSSLを無効にしたい場合は、
DataFrameReader
またはDataFrameWriter
で.option("autoenablessl", "false")
を呼び出すことができます。カスタムSSL関連設定を指定する場合は、Redshiftのドキュメントの指示に従うことができます。 Java および JDBC ドライバ構成オプションでの SSL 証明書とサーバー証明書の使用 JDBC に存在する SSL 関連のオプション
url
データソースで使用されます (つまり、自動構成はトリガーされません)。S3に保存されているUNLOADデータ(Redshiftからの読み取り時に保存されるデータ)の暗号化:S3へのデータのアンロードに関するRedshiftのドキュメントによると、「UNLOADはAmazon S3サーバー側の暗号化(SSE-S3)を使用してデータファイルを自動的に暗号化します」。
Redshiftはカスタムキーを使用したクライアント側の暗号化もサポートしていますが( 「暗号化されたデータファイルのアンロード」を参照)、データソースには必要な対称キーを指定する機能がありません。
S3に保存されたCOPYデータ(Redshiftへの書き込み時に保存されるデータ)の暗号化: Amazon S3からの暗号化されたデータファイルのロードに関するRedshiftのドキュメントによると、
COPY
コマンドを使用して、AWS が管理する暗号化キー (SSE-S3 または SSE-KMS) によるサーバー側の暗号化、クライアント側の暗号化、またはその両方を使用して Amazon S3 にアップロードされたデータファイルをロードできます。COPY は、顧客指定のキー (SSE-C) を使用した Amazon S3 サーバー側の暗号化をサポートしていません。
この機能を使用するには、Amazon S3 暗号化を使用するように Hadoop S3 ファイルシステムを設定します。 これにより、書き込まれたすべてのファイルのリストを含む MANIFEST
ファイルは暗号化されません。
パラメーター
Spark SQLで提供されるパラメーター・マップまたはOPTIONSは、次の設定をサポートしています。
パラメーター |
*必須 |
デフォルト |
説明 |
---|---|---|---|
dbtable |
はい (クエリーが指定されていない場合)。 |
なし |
Redshift で作成または読み取りを行うテーブル。 このパラメーターは、データを Redshift に保存し直すときに必要です。 |
query |
はい (dbtable が指定されていない場合)。 |
なし |
Redshiftで読み取るクエリー。 |
user |
なし |
なし |
Redshift ユーザー名。 パスワードオプションと組み合わせて使用する必要があります。 ユーザーとパスワードが URL で渡されていない場合にのみ使用でき、両方を渡すとエラーになります。 このパラメーターは、ユーザー名にエスケープする必要がある特殊文字が含まれている場合に使用します。 |
password |
なし |
なし |
Redshift のパスワード。 |
url |
あり |
なし |
JDBC URL (次の形式) jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>
|
search_path |
なし |
なし |
Redshiftでスキーマ検索パスを設定します。 |
aws_iam_role |
IAMロールを使用して承認する場合のみ。 |
なし |
Redshift クラスターにアタッチされた IAM Redshift COPY/UNLOAD オペレーションロール の完全指定の ARN (例: |
forward_spark_s3_credentials |
なし |
|
|
temporary_aws_access_key_id |
なし |
なし |
AWS アクセスキーには、S3 バケットへの書き込み権限が必要です。 |
temporary_aws_secret_access_key |
なし |
なし |
提供されたアクセスキーに対応するAWSシークレットアクセスキー。 |
temporary_aws_session_token |
なし |
なし |
提供されたアクセスキーに対応するAWSセッショントークン。 |
tempdir |
あり |
なし |
Amazon S3 の書き込み可能な場所で、読み取り時にアンロードされたデータと、書き込み時に Redshift にロードされる Avro データに使用されます。 通常の ETL パイプラインの一部として Spark の Redshift データソースを使用している場合は、バケットに ライフサイクルポリシー を設定し、それをこのデータの一時的な場所として使用すると便利です。 Unity Catalogで定義された外部ロケーションを |
jdbcdriver |
なし |
JDBC URL のサブプロトコルによって決定されます。 |
使用するJDBCドライバのクラス名。 このクラスはクラスパス上に存在する必要があります。 ほとんどの場合、適切なドライバ・クラス名はJDBC URLのサブプロトコルによって自動的に決定されるため、このオプションを指定する必要はありません。 |
diststyle |
なし |
|
テーブルの作成時に使用するRedshift 分散スタイル 。 |
distkey |
いいえ、 |
なし |
表の作成時に分散キーとして使用する表の列の名前。 |
sortkeyspec |
なし |
なし |
完全な Redshiftソートキー 定義。 たとえば、次のようになります。
|
usestagingtable (非推奨) |
なし |
|
この非推奨のオプションを
|
description |
なし |
なし |
テーブルの説明。 SQL COMMENTコマンドを使用して設定され、ほとんどのクエリーツールに表示されます。 個々の列に説明を設定するには、 |
preactions |
なし |
なし |
コマンドをロードする前に実行される SQL コマンドの これらのコマンドが失敗すると、エラーとして扱われ、例外がスローされることに注意してください。 ステージング テーブルを使用している場合、事前アクションが失敗すると、変更が元に戻され、バックアップ テーブルが復元されます。 |
postactions |
なし |
なし |
データのロード時に これらのコマンドが失敗すると、エラーとして扱われ、例外がスローされることに注意してください。 ステージング テーブルを使用している場合、ポスト アクションが失敗すると、変更が元に戻され、バックアップ テーブルが復元されます。 |
extracopyoptions |
なし |
なし |
データをロードするときに Redshift これらのオプションは |
tempformat |
なし |
|
Redshift への書き込み時に S3 に一時ファイルを保存する形式。 デフォルトは Redshift は Avro ファイルの読み込み時よりも CSV の読み込み時に大幅に高速であるため、その tempformat を使用すると、Redshift への書き込み時にパフォーマンスが大幅に向上する可能性があります。 |
csvnullstring |
なし |
|
CSV tempformat を使用するときに null に書き込む文字列値。 これは、実際のデータには表示されない値にする必要があります。 |
csvseparator |
なし |
|
tempformat を |
csvignoreleadingwhitespace |
なし |
|
true に設定すると、 |
csvignoretrailingwhitespace |
なし |
|
true に設定すると、 |
infer_timestamp_ntz_type |
なし |
|
|
その他の構成オプション
文字列 列の最大サイズの構成
Redshift テーブルを作成する場合、デフォルトの動作では、文字列の列に TEXT
列が作成されます。 Redshift は TEXT
列を VARCHAR(256)
として保存するため、これらの列の最大サイズは 256 文字です (ソース)。
より大きな列をサポートするには、 maxlength
列メタデータ フィールドを使用して、個々の文字列列の最大長を指定します。 これは、デフォルトよりも小さい最大長の列を宣言することで、スペースを節約するパフォーマンスの最適化を実装する場合にも役立ちます。
注:
Spark の制限により、SQL および R 言語 APIs では、列のメタデータの変更はサポートされていません。
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
以下は、SparkのScala APIを使用して複数の列のメタデータフィールドを更新する例です。
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
カスタム列タイプを設定する
列の種類を手動で設定する必要がある場合は、 redshift_type
列メタデータを使用できます。 たとえば、 Spark SQL Schema -> Redshift SQL
型マッチャーをオーバーライドしてユーザー定義の列型を割り当てる場合は、次の操作を実行できます。
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
列のエンコードを構成する
テーブルを作成するときは、 encoding
列メタデータフィールドを使用して、各列の圧縮エンコードを指定します (使用可能なエンコードについては、 Amazon ドキュメント を参照してください)。
列の説明の設定
Redshiftでは、ほとんどのクエリーツール( COMMENT
コマンドを使用)に表示される説明を列に添付できます。 description
列メタデータ フィールドを設定して、個々の列の説明を指定できます。
Redshiftへのクエリープッシュダウン
Sparkオプティマイザーは、次の演算子をRedshiftにプッシュダウンします。
Filter
Project
Sort
Limit
Aggregation
Join
Project
と Filter
内では、次の式がサポートされています。
ほとんどの Boolean 論理演算子
比較
基本的な算術演算
数値キャストと文字列キャスト
ほとんどの文字列関数
スカラーサブクエリ (Redshift に完全にプッシュダウンできる場合)。
注:
このプッシュダウンでは、日付とタイムスタンプを操作する式はサポートされていません。
Aggregation
内では、次の集計関数がサポートされています。
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
該当する場合は、 DISTINCT
句と組み合わせます。
Join
内では、次のタイプの結合がサポートされています。
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
オプティマイザによって
Join
に書き換えられるサブクエリ。WHERE EXISTS
、WHERE NOT EXISTS
注:
結合プッシュダウンは FULL OUTER JOIN
をサポートしていません。
プッシュダウンは、 LIMIT
を使用したクエリーで最も有益かもしれません。 SELECT * FROM large_redshift_table LIMIT 10
などのクエリーは、まずテーブル全体が中間の結果として S3 にアンロードされるため、非常に時間がかかる可能性があります。プッシュダウンでは、 LIMIT
はRedshiftで実行されます。 集計を使用したクエリーでは、集計をRedshiftにプッシュダウンすることで、転送する必要のあるデータの量を減らすこともできます。
クエリー Redshift へのプッシュダウンはデフォルトで有効になっています。 spark.databricks.redshift.pushdown
をfalse
に設定することで無効にできます。無効になっている場合でも、Sparkはフィルターをプッシュダウンし、列の削除をRedshiftに実行します。
Redshiftドライバーのインストール
Redshift データソースには、Redshift 互換の JDBC ドライバーも必要です。 Redshift は PostgreSQL データベース システムに基づいているため、Databricks Runtime に含まれる PostgreSQL JDBC ドライバーまたは Amazon 推奨の Redshift JDBC ドライバーを使用できます。 PostgreSQL JDBCドライバを使用するためにインストールする必要はありません。 各 Databricks Runtime リリースに含まれる PostgreSQL JDBC ドライバーのバージョンは、Databricks Runtime リリースノートに記載されています。
Redshift JDBCドライバーを手動でインストールするには:
Amazonからドライバーをダウンロードします。
ドライバーを Databricks ワークスペースにアップロードします。 「ライブラリ」を参照してください。
クラスターにライブラリをインストールします。
注:
Databricks では、最新バージョンの Redshift JDBC ドライバーを使用することをお勧めします。 Redshift JDBC ドライバーのバージョンが 1.2.41 より前のバージョンには、次の制限があります。
バージョン 1.2.16 のドライバは、SQL クエリーで
where
句を使用すると空のデータを返します。1.2.41 より前のバージョンのドライバーでは、列の null 値の許容が "Unknown" ではなく "Not Nullable" と誤って報告されるため、無効な結果が返される場合があります。
トランザクションの保証
このセクションでは、Spark の Redshift データソースのトランザクション保証について説明します。
RedshiftとS3のプロパティに関する一般的な背景
Redshift トランザクション保証の一般的な情報については、Redshift ドキュメントの「 並列書き込み操作の管理 」の章を参照してください。 一言で言えば、Redshiftは、Redshift BEGIN コマンドのドキュメントに従って 、シリアル化可能な分離 を提供します。
4 つのトランザクション分離レベルのいずれかを使用できますが、Amazon Redshift はすべての分離レベルをシリアル化可能として処理します。
Redshiftのドキュメントによると:
Amazon Redshift は、個別に実行される各 SQL コマンドが個別にコミットするデフォルトの 自動コミット 動作をサポートしています。
したがって、 COPY
や UNLOAD
などの個々のコマンドは原子的でトランザクション的ですが、明示的な BEGIN
や END
は、複数のコマンドやクエリーの原子性を強制するためにのみ必要です。
Redshift からの読み取りと Redshift への書き込みの場合、データソースは S3 でデータの読み取りと書き込みを行います。 Spark と Redshift はどちらも、パーティション分割された出力を生成し、S3 の複数のファイルに保存します。 Amazon S3 データ整合性モデルのドキュメントによると、 S3 バケットのリスティングオペレーションは結果整合性があるため、この結果整合性のソースによるデータの欠落や不完全さを避けるために、ファイルを特別な長さにする必要があります。
Sparkの Redshift データソースの保証
既存のテーブルに追加する
Redshift に行を挿入する場合、データソースは COPY コマンドを使用し、特定の結果整合性のある S3 オペレーションから保護するための マニフェスト を指定します。 その結果、既存のテーブルへの spark-redshift
追加は、通常の Redshift COPY
コマンドと同じアトミックおよびトランザクションプロパティを持ちます。
新しいテーブルを作成する (SaveMode.CreateIfNotExists
)
新しいテーブルの作成は 2 段階のプロセスで、 CREATE TABLE
コマンドとそれに続く COPY コマンドで行の初期セットを追加します。 どちらの操作も同じトランザクションで実行されます。
一般的な問題と解決策
S3 バケットと Redshift クラスターが異なる AWS リージョンにある
デフォルトでは、S3 バケットと Redshift クラスターが異なる AWS リージョンにある場合、S3 <-> Redshift コピーは機能しません。
S3 バケットが別のリージョンにあるときに Redshift テーブルを読み取ろうとすると、次のようなエラーが表示されることがあります。
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
同様に、別のリージョンの S3 バケットを使用して Redshift に書き込もうとすると、次のエラーが発生する可能性があります。
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
書き込みます:Redshift の COPY コマンドは S3 バケットリージョンの明示的な指定をサポートしているため、
extracopyoptions
設定にregion 'the-region-name'
を追加することで、このような場合に Redshift への書き込みを適切に機能させることができます。たとえば、米国東部 (バージニア) リージョンのバケットと Scala API では、次を使用します。.option("extracopyoptions", "region 'us-east-1'")
または、
awsregion
設定を使用することもできます。.option("awsregion", "us-east-1")
読み取り: Redshift UNLOAD コマンドは、S3 バケットリージョンの明示的な指定もサポートしています。 読み取りを正しく機能させるには、
awsregion
設定に領域を追加します。.option("awsregion", "us-east-1")
インスタンスプロファイルを使用してS3への認証を行うと、予期しないS3ServiceException認証情報エラーが発生する
インスタンスプロファイルを使用して S3 への認証を行い、予期しないS3ServiceException
エラーが発生した場合は、AWS アクセスキーが tempdir
S3 URI、Hadoop 設定、または DefaultAWSCredentialsProviderChain によってチェックされたソースのいずれかで指定されているかどうかを確認します。
以下は、キーが誤ってインスタンスプロファイルよりも優先される症状である可能性のあるエラーメッセージの例です。
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;
JDBC URLに特殊文字を含むパスワードを使用すると、認証エラーが発生する
JDBC URL の一部としてユーザー名とパスワードを指定し、パスワードに ;
、 ?
、 &
などの特殊文字が含まれている場合は、次の例外が表示されることがあります。
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
これは、ユーザー名またはパスワードの特殊文字がJDBCドライバによって正しくエスケープされていないことが原因です。 対応する DataFrame オプション user
と password
を使用して、ユーザー名とパスワードを指定してください。 詳細については、「 パラメーター」を参照してください。
実行時間の長いSparkクエリーは、対応するRedshift操作が完了しても無期限にハングする
Redshift との間で大量のデータを読み書きしている場合、AWS Redshift モニタリングページに、対応する LOAD
または UNLOAD
操作が完了し、クラスターがアイドル状態であることが示されていても、Spark クエリーが無期限にハングすることがあります。 これは、Redshift と Spark の間の接続がタイムアウトしたことが原因です。 これを回避するには、 tcpKeepAlive
JDBC フラグが有効になっていて、 TCPKeepAliveMinutes
が低い値 (1 など) に設定されていることを確認してください。
詳細については、「 Amazon Redshift JDBC ドライバーの設定」を参照してください。
移行ガイド
データソースでは、Spark S3 認証情報が Redshift に転送される前に、 forward_spark_s3_credentials
を明示的に設定する必要があります。 この変更は、 aws_iam_role
または temporary_aws_*
認証メカニズムを使用する場合には影響しません。 ただし、以前のデフォルトの動作に依存していた場合は、以前の Redshift から S3 への認証メカニズムを引き続き使用するには、 forward_spark_s3_credentials
を true
に明示的に設定する必要があります。 3 つの認証メカニズムとそのセキュリティのトレードオフについては、このドキュメントの「 S3 と Redshift への認証 」セクションを参照してください。