メインコンテンツまでスキップ

SFTPサーバーからファイルを取り込む

備考

プレビュー

SFTP コネクタはパブリック プレビュー段階です。

LakeFlow Connectを使用して SFTP サーバーからファイルを取り込む方法を学習します。 SFTP コネクタは、Auto Loader 機能を拡張し、Unity Catalog ガバナンスを使用して SFTP サーバーからの安全な増分取り込みを提供します。

主な特徴

SFTP コネクタは以下を提供します。

  • 秘密鍵とパスワードベースの認証。
  • 1 回限りの保証による増分ファイルの取り込みと処理。
  • 自動スキーマ推論、進化、およびデータ救出。
  • 安全な取り込みと資格情報のための Unity Catalog ガバナンス。
  • ワイド ファイル形式のサポート: JSONCSVXMLPARQUETAVROTEXTBINARYFILE 、およびORC
  • パターンとワイルドカードのマッチングの組み込みサポートにより、データのサブセットを簡単にターゲットにできます。
  • LakeFlow Spark宣言型パイプライン、 Databricks SQL 、サーバーレス、およびDatabricks Runtime 17.3 以降のクラシックを含むすべてのコンピュート タイプで利用可能。

始める前に

接続と取り込みパイプラインを作成するには、次のものが必要です。

SFTPの設定

まず、ソース SFTP サーバーが Databricks クラスター環境にアクセスできることを確認します。

  • ワークスペースで設定された VPC でリモート サーバーが使用可能であることを確認します。
  • SSH ルールでDatabricks VPCの IP 範囲 (クラシック コンピュートを使用している場合) または安定した IP (サーバーレス コンピュートを使用している場合) が許可されていることを確認します。
  • クラシック コンピュート プレーンから、ロード バランサー、NAT ゲートウェイ、インターネット ゲートウェイ、または同等のものを使用して安定した IP アドレスを設定し、 Databricksコンピュートがデプロイされているサブネットに接続します。 これにより、コンピュート リソースは、SFTP サーバー側で許可リストに登録できる安定したパブリック IP アドレスを共有できるようになります。 ネットワーク設定を構成する手順については、 VPC ピアリングを参照してください。
  • サーバレス コンピュート プレーンから、ステップ 1: ネットワーク接続構成を作成し、安定した IP をコピーして、安定した出力 IP を取得します。

接続を作成する

SFTP 資格情報を保存するためのUnity Catalog接続を作成します。 CREATE CONNECTION権限が必要です。

コネクタは次の認証方法をサポートしています。

  • PEM秘密鍵
  • パスワードベースの認証

Databricks では、PEM 秘密キー認証の使用を推奨しています。Databricks では、ソース SFTP サーバー上で最小限の権限を持つ資格情報 (読み取り専用アクセスに制限された非ルート ユーザーなど) を使用することも推奨しています。

パイプラインを作成すると、コネクタは使用可能な接続でホストに一致するものを自動的に検索しようとします。一致する接続が複数ある場合、コネクタはホストへの接続に成功した最初の接続を選択します。DBR 18.2以降では、 databricks.connectionオプションを使用して接続を明示的に指定できます。これは推奨されるオプションであり、同じホストに対して複数の接続が存在する場合の曖昧さを回避します。

PEM秘密鍵(推奨)

  1. Databricksワークスペースで、 をクリックします。 データアイコン。カタログ

  2. クリックプラグアイコン。 接続し てから、 「接続」 をクリックします。

  3. 「接続を作成」 ボタンをクリックしてください。

  4. 接続のセットアップ ウィザードの 「接続の基本」 ページで、一意の 接続名 を入力します。

  5. 接続タイプ には SFTP を選択します。

  6. 認証タイプ には、 PEM 秘密キー を選択します。

  7. 「次へ」 を選択します。

  8. [認証] ページの [ホスト] に、外部サーバーのホスト名を入力します。

  9. 「ユーザー」 には、外部インスタンスにアクセスするために使用するユーザー ID を入力します。

  10. 「次へ」 を選択します。

  11. 「接続の詳細」 ページで、PEM 形式で秘密キーを入力します。該当する場合は、キーのパスフレーズも入力します。

  12. ホスト キーのフィンガープリントのチェックをスキップする場合は、 「ホスト キーのフィンガープリントを強制する」を オフにします。

    これを選択すると、サーバーの公開キーが予想される SHA-256 フィンガープリントと一致する場合にのみ接続が続行されます。無効にすると、一致に関係なく接続が続行されます。これを無効にする前に、ネットワーク管理者に確認してください。

  13. 「ホスト キー フィンガープリントを強制する」を オンにした場合は、SFTP サーバーのフィンガープリントを入力します。

    フィンガープリントは、サーバー管理者から取得するか、CLI コマンドを使用して取得できます。また、 [テスト]を押して接続を作成 > [テスト] することもできます。結果のエラー メッセージには、フィンガープリントが提供されます。例えば:

    ECDSA key fingerprint is SHA256:XXX/YYY

  14. [テストと接続の作成] を選択します。

  15. 接続が成功したら、 「作成」 をクリックします。

パスワードベースの認証

  1. Databricksワークスペースで、 をクリックします。 データアイコン。カタログ

  2. クリックプラグアイコン。 接続し てから、 「接続」 をクリックします。

  3. 「接続を作成」 ボタンをクリックしてください。

  4. 接続のセットアップ ウィザードの 「接続の基本」 ページで、一意の 接続名 を入力します。

  5. 接続タイプ には SFTP を選択します。

  6. 認証タイプ には、 ユーザー名とパスワード を選択します。

  7. 「次へ」 を選択します。

  8. [認証] ページの [ホスト] に、外部サーバーのホスト名を入力します。

  9. 「ユーザー」 には、外部インスタンスにアクセスするために使用するユーザー ID を入力します。

  10. 「パスワード」 には、外部インスタンスのパスワードを入力します。

  11. 「次へ」 を選択します。

  12. ホスト キーのフィンガープリントのチェックをスキップする場合は、 「ホスト キーのフィンガープリントを強制する」を オフにします。

    これを選択すると、サーバーの公開キーが予想される SHA-256 フィンガープリントと一致する場合にのみ接続が続行されます。無効にすると、一致に関係なく接続が続行されます。これを無効にする前に、ネットワーク管理者に確認してください。

  13. 「ホスト キー フィンガープリントを強制する」を オンにした場合は、SFTP サーバーのフィンガープリントを入力します。

    フィンガープリントは、サーバー管理者から取得するか、CLI コマンドを使用して取得できます。また、 [テスト]を押して接続を作成 > [テスト] することもできます。結果のエラー メッセージには、フィンガープリントが提供されます。例えば:

    ECDSA key fingerprint is SHA256:XXX/YYY

  14. [テストと接続の作成] を選択します。

  15. 接続が成功したら、 「作成」 をクリックします。

SFTPサーバーからファイルを読み取る

次の例は、Auto Loader のストリーミング機能を使用して SFTP サーバーからファイルを読み取る方法を示しています。Auto Loader使用法の詳細については、 「一般的なデータ読み込みパターン」を参照してください。

Unity Catalog接続を認証用に指定するには、次の2つの方法のいずれかを使用できます。

  • 明示的(推奨、DBR 18.2 以降が必要): databricks.connectionオプションを使用して、接続名を指定します。接続を指定する際、URI内のホスト名とポート番号は、保存されている接続認証情報と一致している必要があります。URIのユーザー名フィールドは省略可能です。省略した場合、コネクタは接続時に指定されたユーザー名を使用します。
  • 自動: databricks.connectionオプションを使用して接続を指定しない場合、コネクタは URI 内の<host><username>を利用可能な接続と照合して接続を解決します。一致する接続が複数ある場合、コネクタは最初に正常に接続できた接続を使用します。

次の例は、自動接続解決を使用してファイルを読み込む方法を示しています。

Python
# Run the Auto Loader job to ingest all existing data in the SFTP server.
# The <username> and <host> in the URI must match the connection created in the previous step.
# The connector automatically resolves the matching Unity Catalog connection for authentication.
df = (spark.readStream.format("cloudFiles")
.option("cloudFiles.schemaLocation", "<path to store schema information>") # This is a cloud storage path
.option("cloudFiles.format", "csv") # Or other format supported by Auto Loader
# Specify the absolute path on the SFTP server starting from the root /.
# Example: /home/<username>/data/files or /uploads/csv_files
.load("sftp://<username>@<host>:<port>/<absolute_path_to_files>")
.writeStream
.format("delta")
.option("checkpointLocation", "<path to store checkpoint information>") # This is a cloud storage path.
.trigger(availableNow = True)
.table("<table name>"))
df.awaitTermination()

次の例は、 databricks.connectionオプションを使用してUnity Catalog接続を明示的に指定する方法を示しています。 これにはDBR 18.2以降が必要です。

Python
# Requires DBR 18.2 or above. Explicitly specify the Unity Catalog connection by name (recommended).
# The username is optional in the URI when databricks.connection is provided.
df = (spark.readStream.format("cloudFiles")
.option("cloudFiles.schemaLocation", "<path to store schema information>") # This is a cloud storage path
.option("cloudFiles.format", "csv") # Or other format supported by Auto Loader
.option("databricks.connection", "<connection_name>")
# Specify the absolute path on the SFTP server starting from the root /.
# Example: /uploads/csv_files
.load("sftp://<host>:<port>/<absolute_path_to_files>")
.writeStream
.format("delta")
.option("checkpointLocation", "<path to store checkpoint information>") # This is a cloud storage path.
.trigger(availableNow = True)
.table("<table name>"))
df.awaitTermination()

次の例は、 LakeFlow Spark宣言型パイプラインのAuto Loaderを使用して SFTP サーバーからファイルを読み取る方法を示しています。

Python
from pyspark import pipelines as dp

# The <username> and <host> in the URI must match the connection created in the previous step.
# The connector automatically resolves the matching Unity Catalog connection for authentication.
@dp.table
def sftp_bronze_table():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv") # Or other format supported by Auto Loader
# Specify the absolute path on the SFTP server starting from the root /.
# Example: /home/username/data/files or /uploads/csv_files
.load("sftp://<username>@<host>:<port>/<absolute_path_to_files>")))

Auto Loaderオプションを構成します。 次のオプションを除くすべてのオプションがサポートされています:

  • cloudFiles.useNotifications
  • cloudFiles.useManagedFileEvents
  • cloudFiles.cleanSource
  • クラウド固有のオプション

制限事項

  • SFTP は、 COPY INTOspark.readdbutils.lsなどの他の取り込みサーフェスではサポートされていません。
  • SFTP サーバーへの書き戻しはサポートされていません。
  • Auto Loader cleanSource (取り込み後にソースでファイルを削除またはアーカイブする) はサポートされていません。
  • FTP プロトコルはサポートされていません。

よくある質問

SFTP コネクタに関するよくある質問への回答を見つけます。

ワイルドカードまたはファイル名パターンを使用して、取り込むファイルを選択するにはどうすればよいですか?

SFTPコネクタは、標準のAuto Loaderフレームワークを基盤として、SFTPサーバーからデータを読み取ります。 これは、すべてのAuto Loaderオプションがサポートされていることを意味します。 ファイル名のパターンやワイルドカードには、 pathGlobFilterまたはfileNamePatternオプションを使用してください。See Auto Loader.

SFTP コネクタは暗号化されたファイルを取り込むことができますか?(PGP はサポートされていますか?)

コネクタは送信中に暗号化を解除しませんが、暗号化されたファイルをバイナリ ファイルとして取り込み、取り込み後に暗号化を解除することができます。

互換性のない秘密鍵形式をどのように処理すればよいですか?

PEM 形式のみがサポートされます。次のいずれかの方法で、PEM 形式の秘密鍵を生成できます。

  • (オプション 1) 標準の PEM 形式で新しい RSA キーを作成します。

    Shell
    % ssh-keygen -t rsa -m pem
  • (オプション 2) 既存の OpenSSH 形式のキーを PEM 形式に変換します。

    Shell
    % ssh-keygen -p -m pem -f /path/to/key  # This updates the key file.

その他のリソース