メインコンテンツまでスキップ

Databricks を使用した Amazon Redshift のクエリ

Databricks を使用して、Amazon Redshift からテーブルを読み書きできます。

備考

実験段階

この記事で説明する構成は 試験段階です。 試験的な機能は現状のまま提供され、 Databricks を通じて顧客のテクニカル サポートを通じてサポートされることはありません。 クエリ フェデレーションを完全にサポートするには、代わりに レイクハウスフェデレーションを使用して、 Databricks ユーザーが Unity Catalog 構文ツールとデータガバナンス ツールを利用できるようにする必要があります。

Databricks Redshift データソースは、Amazon S3 を使用して Redshift との間でデータを効率的に転送し、JDBC を使用して Redshift で適切な COPY コマンドと UNLOAD コマンドを自動的にトリガーします。

注記

Databricks Runtime 11.3 LTS 以降では、Databricks Runtime には Redshift JDBC ドライバーが含まれており、format オプションの redshift キーワードを使用してアクセスできます。 Databricks Runtimeリリースノートのバージョンと、各 に含まれるドライバーのバージョン の互換性 Databricks Runtimeを参照してください。ユーザー提供のドライバーは引き続きサポートされ、バンドルされた JDBC ドライバーよりも優先されます。

Databricks Runtime 10.4 LTS 以下では、Redshift JDBC ドライバーの手動インストールが必要であり、クエリでは形式にドライバー (com.databricks.spark.redshift) を使用する必要があります。 「Redshift ドライバーのインストール」を参照してください。

使い

次の例は、Redshift ドライバーとの接続を示しています。 PostgreSQL JDBC ドライバーを使用している場合は、 url パラメーター値を置き換えます。

AWS 認証情報を設定したら、Python、SQL、R、または Scala の Spark データソース API でデータソースを使用できます。

important

Unity Catalogで定義された外部ロケーションは、tempdirロケーションとしてサポートされていません。

Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)

# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)

# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)

Redshiftの使用に関する推奨事項

クエリを実行すると、大量のデータが S3 に抽出される場合があります。 Redshift で同じデータに対して複数のクエリを実行する予定の場合、Databricks では、抽出したデータを Delta Lake を使用して保存することをお勧めします。

注記

マネージド 内に クラスターを作成すると、 のセキュリティRedshiftDatabricks VPCモデルによるアクセス許可の問題が発生する可能性があるため、作成しないでください。DatabricksVPC独自の VPC を作成してから、 VPC ピアリング を実行して Databricks を Redshift インスタンスに接続する必要があります。

構成

S3 と Redshift への認証

データソースには、次の図に示すように、いくつかのネットワーク接続が含まれます。

                            ┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)

データソースは、Redshift との間でデータを転送するときに、S3 に対してデータの読み取りと書き込みを行います。 AWSその結果、S3 バケット (tempdir 構成パラメーターを使用して指定) への読み取りおよび書き込みアクセス権を持つ 資格情報が必要になります。

注記

データソースは、S3 で作成した一時ファイルをクリーンアップしません。 そのため、 オブジェクトライフサイクル設定 を持つ専用の一時 S3 バケットを使用して、指定した有効期限後に一時ファイルが自動的に削除されるようにすることをお勧めします。 これらのファイルを暗号化する方法については、このドキュメントの 「暗号化 」セクションを参照してください。 Unity Catalogで定義された外部ロケーションtempdirロケーションとして使用することはできません。

次のセクションでは、各接続の認証設定オプションについて説明します。

Spark ドライバーから Redshift へ

Spark ドライバーは、ユーザー名とパスワードを使用して JDBC 経由で Redshift に接続します。 Redshift では、この接続を認証するための IAMロールの使用はサポートされていません。 デフォルトでは、この接続は SSL 暗号化を使用します。詳細については、「 暗号化」を参照してください。

Spark から S3 へ

S3は、Redshiftからの読み取りまたはRedshiftへの書き込み時に大量のデータを保存するための仲介役として機能します。 Spark は、Hadoop FileSystem インターフェイスと Amazon Java SDK の S3 クライアントの両方を使用して S3 に接続します。

注記

DBFS マウントを使用して Redshift の S3 へのアクセスを設定することはできません。

  • デフォルトの資格情報プロバイダーチェーン(ほとんどのユーザーに最適なオプション): AWS 認証情報は、 DefaultAWSCredentialsProviderChain を通じて自動的に取得されます。 インスタンスプロファイルを使用して S3 への認証を行う場合は、おそらくこの方法を使用する必要があります。

    次の資格情報の提供方法は、このデフォルトよりも優先されます。

  • IAMロールを引き受ける: インスタンスプロファイルが引き受けることができるIAMロールを使用できます。ロール ARNを指定するには、 インスタンスプロファイルをクラスターにアタッチし、次の設定キーを指定する必要があります。

Scala
sc.hadoopConfiguration.set("fs.s3a.credentialsType", "AssumeRole")
sc.hadoopConfiguration.set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)
// An optional duration, expressed as a quantity and a unit of
// time, such as "15m" or "1h"
sc.hadoopConfiguration.set("fs.s3a.assumed.role.session.duration", <duration>)
  • Hadoop conf でキーを設定します。 AWS キーは、 Hadoop 設定プロパティを使用して指定できます。 tempdir構成がs3a://ファイルシステムを指している場合は、Hadoop XML構成ファイルでfs.s3a.access.keyプロパティとfs.s3a.secret.keyプロパティを設定するか、sc.hadoopConfiguration.set()を呼び出してSparkのグローバルHadoop構成を構成できます。s3n://ファイルシステムを使用する場合は、次の例に示すように、従来の設定キーを指定できます。

For example, if you are using the s3a filesystem, add:

Scala
sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")

For the legacy s3n filesystem, add:

Scala
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")

Redshift から S3 へ

Redshift は、 COPY および UNLOAD クエリ中にも S3 に接続します。 この接続を認証するには、次の 3 つの方法があります。

  • RedshiftにIAMロールを引き受けてもらう(最も安全): COPYまたはUNLOAD操作中にRedshiftにIAMロールを引き受ける権限を付与し、そのロールを使用するようにRedshiftに指示するようにデータソースを設定できます。

    1. IAMバケットに適切なS3 アクセス許可を付与する ロールを作成します。
    2. Amazon Redshift がお客様に代わって他の AWS サービスにアクセスすることを許可する 」のガイドに従って、Redshift がこのロールを引き受けることを許可するために、このロールの信頼ポリシーを設定します。
    3. IAMロールを使用した COPY および UNLOAD 操作の承認」ガイドのステップに従って、その IAMロールを Redshift クラスターに関連付けます。
    4. データソースの aws_iam_role オプションをロールの ARNに設定します。
  • SparkS3Redshiftforward_spark_s3_credentials true資格情報を に転送する: オプションが に設定されている場合、データソースは が への接続に使用している資格情報を自動的に検出し、それらの資格情報をSpark S3RedshiftoverJDBC に転送します。Spark がインスタンスプロファイルを使用して S3 に対して認証を行っている場合、一時的な STS 認証情報のセットが Redshift に転送されます。それ以外の場合、AWS キーは転送されます。 JDBC クエリにはこれらの資格情報が埋め込まれているため、Databricks では、この認証方法を使用するときに JDBC 接続の SSL 暗号化を有効にすることを強くお勧めします。

  • セキュリティ トークン サービス (STS) 資格情報を使用する : temporary_aws_access_key_idtemporary_aws_secret_access_keytemporary_aws_session_token の構成プロパティを、 AWS セキュリティ トークン サービスを介して作成された一時キーを指すように構成できます。 JDBC クエリにはこれらの資格情報が埋め込まれているため、この認証方法を使用する場合は、JDBC 接続の SSL 暗号化を有効にする ことを強くお勧めします 。 このオプションを選択する場合は、読み取り/書き込み操作が成功する前に資格情報の有効期限が切れるリスクに注意してください。

これら 3 つのオプションは相互に排他的であり、どちらを使用するかを明示的に選択する必要があります。

暗号化

  • JDBCの保護 : JDBC URLにSSL関連の設定がない限り、データソースはデフォルトでSSL暗号化を有効にし、Redshiftサーバーが信頼できる(つまり、 sslmode=verify-full)ことも確認します。 そのため、サーバー証明書は、初めて必要になったときにAmazonサーバーから自動的にダウンロードされます。 これが失敗した場合は、事前にバンドルされた証明書ファイルがフォールバックとして使用されます。 これは、Redshift と PostgreSQL JDBC ドライバーの両方に当てはまります。

    この機能に問題がある場合、または単にSSLを無効にしたい場合は、DataFrameReaderまたはDataFrameWriter.option("autoenablessl", "false")を呼び出すことができます。

    カスタムSSL関連設定を指定する場合は、Redshiftのドキュメント「 JavaでのSSLおよびサーバー証明書の使用」の指示に従うことができます JDBC ドライバ設定オプション データソースで使用される JDBC url に存在する SSL 関連のオプションが優先されます (つまり、自動設定はトリガされません)。

  • S3 に保存された UNLOAD データ (Redshift からの読み取り時に保存されたデータ) の暗号化 : Redshift の S3 へのデータのアンロードに関するドキュメントによると、「UNLOAD は Amazon S3 サーバー側暗号化 (SSE-S3) を使用してデータ ファイルを自動的に暗号化します」。

    Redshiftはカスタムキーを使用したクライアント側の暗号化もサポートしていますが( 「暗号化されたデータファイルのアンロード」を参照)、データソースには必要な対称キーを指定する機能がありません。

  • S3に保存されたCOPYデータの暗号化(Redshiftへの書き込み時に保存されたデータ): Amazon S3からの暗号化されたデータファイルのロードに関するRedshiftのドキュメントによると、次のようになっています。

COPY コマンドを使用すると、AWS が管理する暗号化キー (SSE-S3 または SSE-KMS) によるサーバー側の暗号化、クライアント側の暗号化、またはその両方を使用して Amazon S3 にアップロードされたデータファイルをロードできます。COPY は Amazon S3 顧客提供のキー (SSE-C) を使用したサーバー側の暗号化をサポートしていません。

この機能を使用するには、 Amazon S3 暗号化を使用するように Hadoop S3 ファイルシステムを設定します。 これにより、書き込まれたすべてのファイルのリストを含む MANIFEST ファイルは暗号化されません。

パラメーター

Spark SQL で提供されるパラメーター マップまたは OPTIONS は、次の設定をサポートします。

パラメーター

必須

デフォルト

説明

dbtable

はい (クエリが指定されていない場合)。

なし

Redshift で作成または読み取りを行うテーブル。 このパラメーターは、データを Redshiftに戻すときに必要です。

クエリー

はい。ただし、dbtable が指定されていない場合は、はい。

なし

Redshift で読み取るクエリ。

user

いいえ

なし

Redshift のユーザー名。 パスワードオプションと組み合わせて使用する必要があります。 URLでユーザーとパスワードが渡されていない場合にのみ使用でき、両方を渡すとエラーになります。 このパラメーターは、ユーザー名にエスケープが必要な特殊文字が含まれている場合に使用します。

password

いいえ

なし

Redshift のパスワード。 userオプションと組み合わせて使用する必要があります。ユーザーとパスワードが URL で渡されていない場合にのみ使用できます。両方を渡すとエラーになります。 このパラメーターは、エスケープが必要な特殊文字がパスワードに含まれている場合に使用します。

url

あり

なし

JDBC URL (形式)

jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>

subprotocol postgresql または redshiftは、ロードした JDBC ドライバーによって異なります。1 つの Redshift 互換ドライバーがクラスパス上にあり、この URL と一致する必要があります。 hostport はRedshiftマスターノードを指す必要があるため、セキュリティグループやVPCは、ドライバーアプリケーションからのアクセスを許可するように設定する必要があります。 databaseはRedshiftデータベース名を識別し、user``passwordデータベースにアクセスするための認証情報であり、JDBCのこのURLに埋め込む必要があり、ユーザーアカウントは参照されるテーブルに必要な権限を持っている必要があります。

search_path

いいえ

なし

Redshiftでスキーマ検索パスを設定します。 SET search_path toコマンドを使用して設定します。テーブルを検索するスキーマ名のコンマ区切りリストである必要があります。 search_pathのRedshiftドキュメントを参照してください。

aws_iam_role

IAMロールを使用して承認する場合のみ。

なし

ARNIAMRedshiftCOPY/UNLOAD 操作 の完全に指定されたRedshift クラスター アタッチされたロール (例:)。arn:aws:iam::123456789000:role/<redshift-iam-role>

forward_spark_s3_credentials

いいえ

false

trueの場合、データソースは、Spark が S3 への接続に使用している認証情報を自動的に検出し、それらの認証情報を JDBC 経由で Redshift に転送します。これらの認証情報は JDBC クエリの一部として送信されるため、このオプションを使用する場合は JDBC 接続の SSL 暗号化を有効にすることを強くお勧めします。

temporary_aws_access_key_id

いいえ

なし

AWS アクセスキーには、S3 バケットへの書き込み権限が必要です。

temporary_aws_secret_access_key

いいえ

なし

提供されたアクセスキーに対応するAWSシークレットアクセスキー。

temporary_aws_session_token

いいえ

なし

指定されたアクセスキーに対応するAWSセッショントークン。

テンプディレクトリ

あり

なし

Amazon S3 の書き込み可能な場所で、読み取り時にアンロードされたデータと、書き込み時に Redshift にロードされる Avro データに使用されます。 Spark の Redshift データソースを通常の ETL パイプラインの一部として使用している場合は、バケットに ライフサイクルポリシー を設定し、それをこのデータの一時的な場所として使用すると便利です。

Unity Catalogで定義された外部ロケーションtempdirロケーションとして使用することはできません。

jdbcdriver

いいえ

JDBC URL のサブプロトコルによって決定されます。

使用する JDBC ドライバーのクラス名。 このクラスはクラスパス上に存在する必要があります。 ほとんどの場合、適切なドライバ・クラス名はJDBC URLのサブプロトコルによって自動的に決定されるため、このオプションを指定する必要はありません。

diststyle

いいえ

EVEN

テーブルの作成時に使用するRedshift Distribution StyleEVENKEY 、または ALL のいずれかになります (Redshift のドキュメントを参照)。KEYを使用する場合は、distkey オプションを使用して分散キーも設定する必要があります。

distkey

いいえ、使用しない限り DISTSTYLE KEY

なし

テーブルの作成時に分散キーとして使用するテーブル内のカラムの名前。

sortkeyspec

いいえ

なし

完全なRedshift ソートキー の定義。 たとえば、次のようなものがあります。

  • SORTKEY(my_sort_column)
  • COMPOUND SORTKEY(sort_col_1, sort_col_2)
  • INTERLEAVED SORTKEY(sort_col_1, sort_col_2)

usestagingtable (非推奨)

いいえ

true

この非推奨のオプションを false に設定すると、上書き操作の宛先テーブルは書き込みの開始時にすぐに削除されるため、上書き操作が非アトミックになり、宛先テーブルの可用性が低下します。 これにより、上書きに必要な一時ディスク容量が削減される可能性があります。

usestagingtable=false操作を設定すると、データが失われたり使用できなくなるリスクがあるため、宛先テーブルを手動で削除する必要があるため、非推奨とされています。

description

いいえ

なし

テーブルの説明。 SQL COMMENT コマンドを使用して設定され、ほとんどのクエリツールに表示されます。 個々の列に説明を設定するには、 description メタデータも参照してください。

preactions

いいえ

なし

コマンドを読み込む前に実行する SQL コマンドの ; 区切りの一覧COPY。 新しいデータをロードする前に、ここでいくつかの DELETE コマンドなどを実行すると便利な場合があります。 コマンドに %sが含まれている場合、テーブル名は実行前に で書式設定されます (ステージング テーブルを使用している場合)。

これらのコマンドが失敗すると、エラーとして扱われ、例外がスローされることに注意してください。 ステージング・テーブルを使用している場合、事前アクションが失敗した場合は、変更が元に戻され、バックアップ・テーブルがリストアされます。

postactions

いいえ

なし

データの読み込み時にCOPYが成功した後に実行される SQL コマンドの ; 区切りのリスト。新しいデータをロードするときに、ここでいくつかの GRANT コマンドまたは同様のコマンドを実行すると便利な場合があります。 コマンドに %sが含まれている場合、テーブル名は実行前に で書式設定されます (ステージング テーブルを使用している場合)。

これらのコマンドが失敗すると、エラーとして扱われ、例外がスローされることに注意してください。 ステージング・テーブルを使用している場合、変更は元に戻され、ポスト・アクションが失敗した場合はバックアップ・テーブルがリストアされます。

extracopyoptions

いいえ

なし

データをロードするときにRedshift COPY コマンドに追加する追加オプションのリスト ( TRUNCATECOLUMNSMAXERROR n など) (その他のオプションについては、 Redshift のドキュメント を参照してください)。

これらのオプションは COPY コマンドの末尾に追加されるため、コマンドの末尾で意味のあるオプションのみを使用できますが、考えられるほとんどのユースケースをカバーする必要があります。

tempformat

いいえ

AVRO

Redshift に書き込むときに S3 に一時ファイルを保存する形式。 デフォルトは AVROです。その他の使用できる値は、CSV と gzip 圧縮された CSV でそれぞれ CSVCSV GZIP です。

Redshift は、CSV を読み込むときの方が Avro ファイルを読み込むときよりも大幅に高速であるため、その tempformat を使用すると、Redshift に書き込むときにパフォーマンスが大幅に向上しる可能性があります。

csvnullstring

いいえ

@NULL@

CSV tempformat を使用する場合に null に書き込む文字列値。 これは、実際のデータには表示されない値にする必要があります。

csvseparator

いいえ

,

tempformat を CSV または CSV GZIPに設定して一時ファイルを書き込むときに使用する区切り記号。 これは、"," や "|" などの有効な ASCII 文字である必要があります。

csvignoreleadingwhitespace

いいえ

true

true に設定すると、 tempformatCSV または CSV GZIPに設定されている場合に、書き込み中に値から先頭の空白が削除されます。 それ以外の場合、空白は保持されます。

csvignoretrailingwhitespace

いいえ

true

true に設定すると、 tempformatCSV または CSV GZIPに設定されている場合に、書き込み中に値から末尾の空白が削除されます。 それ以外の場合、空白は保持されます。

infer_timestamp_ntz_type

いいえ

false

trueの場合、Redshift TIMESTAMP 型の値は、読み取り中に (タイムゾーンなしのタイムスタンプ) TimestampNTZType として解釈されます。それ以外の場合、すべてのタイムスタンプは、基になる Redshift テーブルのタイプに関係なく、 TimestampType として解釈されます。

追加の構成オプション

文字列列の最大サイズの構成

Redshift テーブルを作成するとき、デフォルトの動作では、文字列列の TEXT 列が作成されます。 Redshift は TEXT 列を VARCHAR(256)として保存するため、これらの列の最大サイズは 256 文字です (ソース)。

より大きな列をサポートするには、 maxlength 列メタデータ フィールドを使用して、個々の文字列列の最大長を指定できます。 これは、デフォルトよりも小さい最大長の列を宣言することで、スペースを節約するパフォーマンスの最適化を実装する場合にも役立ちます。

注記

Sparkの制限により、SQL および R 言語APIsでは、列メタデータの変更はサポートされていません。

Python
df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()

カスタム列タイプを設定する

列の種類を手動で設定する必要がある場合は、 redshift_type 列メタデータを使用できます。 たとえば、 Spark SQL Schema -> Redshift SQL 型マッチャーをオーバーライドしてユーザー定義の列型を割り当てる場合は、次の操作を行います。

Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

列のエンコードを構成する

テーブルを作成するときは、 encoding 列メタデータフィールドを使用して、各列の圧縮エンコードを指定します (使用可能なエンコードについては 、Amazon ドキュメント を参照してください)。

列の説明の設定

Redshift では、ほとんどのクエリツール ( COMMENT コマンドを使用) に表示される説明を列に添付できます。 description列メタデータフィールドを設定して、次の説明を指定できます 個々の列。

Redshift へのクエリのプッシュダウン

Spark オプティマイザーは、次の演算子を Redshift にプッシュダウンします。

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

ProjectFilterでは、次の式がサポートされています。

  • ほとんどの Boolean 論理演算子
  • 比較
  • 基本的な算術演算
  • 数値キャストと文字列キャスト
  • ほとんどの文字列関数
  • スカラーサブクエリ (Redshift に完全にプッシュダウンできる場合)。
注記

このプッシュダウンは、日付とタイムスタンプを操作する式をサポートしていません。

Aggregation内では、次の集計関数がサポートされています。

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

DISTINCT条項と組み合わせます(該当する場合)。

Join内では、次のタイプの結合がサポートされています。

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • オプティマイザによって Join に書き換えられるサブクエリ (例: WHERE EXISTSWHERE NOT EXISTS
注記

ジョインプッシュダウンは FULL OUTER JOINをサポートしていません。

プッシュダウンは、 LIMIT. SELECT * FROM large_redshift_table LIMIT 10 などのクエリは、中間の結果としてテーブル全体が最初に S3 に UNLOAD されるため、非常に時間がかかる場合があります。プッシュダウンでは、 LIMIT はRedshiftで実行されます。 集計を含むクエリでは、集計を Redshift にプッシュダウンすると、転送する必要のあるデータ量を減らすのにも役立ちます。

Redshift へのクエリ プッシュダウンは、デフォルトで有効になっています。 spark.databricks.redshift.pushdownfalseに設定することで無効にできます。無効にしても、Spark はフィルターをプッシュダウンし、列の削除を Redshift に実行します。

Redshift ドライバーのインストール

Redshift データソースには、Redshift と互換性のある JDBC ドライバーも必要です。 Redshift は PostgreSQL データベース システムに基づいているため、Databricks Runtime に含まれる PostgreSQL JDBC ドライバーまたは Amazon が推奨する Redshift JDBC ドライバーを使用できます。 PostgreSQL JDBC ドライバを使用するためにインストールは必要ありません。 PostgreSQLJDBCDatabricks Runtime各Databricks Runtimeリリースに含まれる ドライバーのバージョンは、 リリースノート に記載されています。

Redshift JDBC ドライバーを手動でインストールするには:

  1. Amazonからドライバーをダウンロードします
  2. ドライバーを Databricks ワークスペースにアップロードします。 「ライブラリ」を参照してください。
  3. ライブラリをクラスターにインストールします。
注記

Databricks では、最新バージョンの Redshift JDBC ドライバーを使用することをお勧めします。 Redshift JDBC ドライバーのバージョン 1.2.41 未満には、次の制限があります。

  • バージョン 1.2.16 のドライバーでは、SQL クエリで where 句を使用すると、空のデータが返されます。
  • 1.2.41 より前のバージョンのドライバーでは、列の null 値の許容が "不明" ではなく "null 許容されない" と誤って報告されるため、無効な結果が返される場合があります。

トランザクションの保証

このセクションでは、Spark の Redshift データソースのトランザクション保証について説明します。

RedshiftおよびS3プロパティの一般的な背景

Redshiftトランザクション保証に関する一般的な情報については、並列書き込み操作の管理を参照してください Redshiftドキュメントの章を参照してください。一言で言えば、RedshiftはRedshiftBEGIN コマンドのドキュメントに従って シリアル化可能な分離 を提供します。

4 つのトランザクション分離レベルのいずれかを使用できますが、Amazon Redshift はすべての分離レベルをシリアル化可能として処理します。

Redshiftのドキュメントによると、

AmazonRedshiftは、個別に実行される各SQL コマンド が個別にコミットするデフォルト 自動コミット 動作をサポートしています。

したがって、 COPYUNLOAD などの個々のコマンドは原子的でトランザクション的ですが、明示的な BEGINEND は、複数のコマンドやクエリーの原子性を強制するためにのみ必要です。

Redshift からの読み取りと Redshift への書き込みを行う場合、データソースは S3 でデータを読み書きします。 Spark と Redshift はどちらもパーティション分割された出力を生成し、S3 の複数のファイルに保存します。 Amazon S3 Data Consistency Modelのドキュメントによると、バケットリスト操作S3結果整合性があるため、この結果整合性のソースによるデータの欠落や不完全なデータを避けるために、ファイルを特別な長さにする必要があります。

用Redshift データソースの保証Spark

既存のテーブルに追加する

Redshift に行を挿入するとき、データソースは COPY を使用します コマンドを実行し、 マニフェスト を指定して、特定の結果整合性のある S3 オペレーションから保護します。 その結果、既存のテーブルへの spark-redshift 追加は、通常のRedshift COPY コマンドと同じアトミックプロパティとトランザクションプロパティを持ちます。

新しいテーブルを作成する (SaveMode.CreateIfNotExists)

新しいテーブルの作成は 2 段階のプロセスで、 CREATE TABLE コマンドと、それに続く COPY コマンドによるローの初期セットの追加で構成されます。 両方の操作は、同じトランザクションで実行されます。

既存のテーブルを上書きする

デフォルトでは、データソースはトランザクションを使用して上書きを実行します。上書きは、宛先テーブルを削除し、新しい空のテーブルを作成し、それに行を追加することで実装されます。

非推奨の usestagingtable 設定が falseに設定されている場合、データソースは新しいテーブルに行を追加する前に DELETE TABLE コマンドをコミットし、上書き操作の原子性を犠牲にしますが、上書き中に Redshift が必要とするステージングスペースの量を減らします。

Redshift テーブルのクエリ

クエリは、Redshift UNLOAD コマンドを使用してクエリを実行し、その結果を S3 に保存し、 マニフェスト を使用して特定の結果整合性のある S3 オペレーションから保護します。 その結果、Spark の Redshift データソースからのクエリは、通常の Redshift クエリと同じ整合性プロパティを持つ必要があります。

一般的な問題とソリューション

S3 バケットと Redshift クラスターは、異なる AWS リージョンにあります

デフォルトでは、S3<->Redshift S3バケットとRedshift クラスターが異なる リージョンにある場合、AWS \ コピーは機能しません。

S3 バケットが別のリージョンにあるときに Redshift テーブルを読み取ろうとすると、次のようなエラーが表示されることがあります。

Console
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

同様に、別のリージョンのS3バケットを使用してRedshiftに書き込もうとすると、次のエラーが発生する可能性があります。

Console
error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • 書き込みます: Redshift COPY コマンドは S3 バケットリージョンの明示的な指定をサポートしているため、このような場合に Redshift への書き込みを適切に機能させるには、extracopyoptions 設定に Redshift を追加することでregion 'the-region-name'。たとえば、米国東部 (バージニア) リージョンのバケットと Scala API を使用する場合は、次を使用します。

    Scala
    .option("extracopyoptions", "region 'us-east-1'")

    または、 awsregion 設定を使用することもできます。

    Scala
    .option("awsregion", "us-east-1")
  • 読み取り: Redshift UNLOAD コマンドは、S3 バケットリージョンの明示的な指定もサポートしています。 読み取りを適切に機能させるには、 awsregion 設定に領域を追加します。

    Scala
    .option("awsregion", "us-east-1")

インスタンスプロファイルを使用して S3 に認証すると、予期しない S3ServiceException 認証情報エラーが発生します

インスタンスプロファイル S3S3ServiceExceptionAWSを使用してtempdirS3 Hadoopへの認証を行い、予期しない エラーが発生した場合は、 URI、 設定、または DefaultAWSCredentialsProviderChain によってチェックされるソースのいずれかで、アクセスキーが指定されているかどうかを確認します。

以下は、キーが誤ってインスタンスプロファイルよりも優先される兆候である可能性のあるエラーメッセージの例です。

Console
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;

JDBC URL に特殊文字を含むパスワードを使用すると認証エラーが発生する

JDBC URL の一部としてユーザ名とパスワードを指定し、パスワードに ;?&などの特殊文字が含まれている場合は、次の例外が表示されることがあります。

Console
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

これは、ユーザー名またはパスワードの特殊文字が JDBC ドライバーによって正しくエスケープされていないために発生します。 ユーザー名とパスワードは、対応する DataFrame オプション userpasswordを使用して指定してください。 詳細については、「 パラメーター」を参照してください。

長時間実行されている Spark クエリは、対応する Redshift 操作が完了しても無期限にハングします

との間で大量のデータを読み書きしている場合、Redshift SparkAWSRedshiftモニタリング ページには対応するLOAD またはUNLOAD 操作が完了し、クラスターがアイドル状態であることが示されていても、 クエリが無期限にハングする可能性があります。これは、Redshift と Spark の間の接続がタイムアウトになることが原因です。 これを回避するには、 tcpKeepAlive JDBC フラグが有効になっていて、 TCPKeepAliveMinutes が低い値 (1 など) に設定されていることを確認します。

詳細については、「 Amazon Redshift JDBC ドライバーの構成」を参照してください。

タイムゾーンセマンティクスを使用したタイムスタンプ

データを読み取るとき、Redshift TIMESTAMPTIMESTAMPTZ の両方のデータ型が Spark TimestampTypeにマッピングされ、値は協定世界時 (UTC) に変換され、UTC タイムスタンプとして保存されます。 Redshift TIMESTAMPの場合、値にはタイムゾーン情報がないため、ローカルタイムゾーンが想定されます。 Redshift テーブルにデータを書き込む場合、Spark TimestampType は Redshift TIMESTAMP データ型にマッピングされます。

移行ガイド

データソースでは、Spark S3 認証情報が Redshift に転送される前に、 forward_spark_s3_credentials を明示的に設定するようになりました。 この変更は、 aws_iam_role 認証メカニズムまたは temporary_aws_* 認証メカニズムを使用する場合に影響はありません。 ただし、以前のデフォルトの動作に依存していた場合、以前の Redshift から S3 への認証メカニズムを引き続き使用するには、 forward_spark_s3_credentialstrue に明示的に設定する必要があります。 3 つの認証メカニズムとそのセキュリティのトレードオフについては、このドキュメントの「 S3 および Redshift への認証 」セクションを参照してください。