メインコンテンツまでスキップ

SFTPサーバーからファイルを取り込む

備考

プレビュー

SFTP コネクタはパブリック プレビュー段階です。

LakeFlow Connectを使用して SFTP サーバーからファイルを取り込む方法を学習します。 SFTP コネクタは、Auto Loader 機能を拡張し、Unity Catalog ガバナンスを使用して SFTP サーバーからの安全な増分取り込みを提供します。

主な特徴

SFTP コネクタは以下を提供します。

  • 主キーとパスワードベースの認証。
  • 1 回限りの保証による増分ファイルの取り込みと処理。
  • 自動スキーマ推論、進化、およびデータ救出。
  • 安全な取り込みと資格情報のための Unity Catalog ガバナンス。
  • ワイド ファイル形式のサポート: JSONCSVXMLPARQUETAVROTEXTBINARYFILE 、およびORC
  • パターンとワイルドカードのマッチングの組み込みサポートにより、データのサブセットを簡単にターゲットにできます。
  • LakeFlow Spark宣言型パイプライン、 Databricks SQL 、サーバーレス、およびDatabricks Runtime 17.3 以降のクラシックを含むすべてのコンピュート タイプで利用可能。

始める前に

接続と取り込みパイプラインを作成するには、次のものが必要です。

  • Unity Catalog が有効になっているワークスペース。
  • CREATE CONNECTION 特権。
  • Databricks Runtimeバージョン 17.3 以降を使用するコンピュート。

SFTPの設定

まず、ソース SFTP サーバーが Databricks クラスター環境にアクセスできることを確認します。

  • ワークスペースで設定されている VPC でリモート サーバーが使用可能であることを確認します。
  • SSH ルールで、 Databricks VPCの IP 範囲 (クラシック コンピュートを使用している場合) または安定した IP (サーバーレス コンピュートを使用している場合) が許可されていることを確認してください。
  • クラシック コンピュート プレーンから、ロード バランサー、NAT ゲートウェイ、インターネット ゲートウェイ、または同等のものを使用して安定した IP アドレスを設定し、 Databricksコンピュートがデプロイされているサブネットに接続します。 これにより、コンピュート リソースは、SFTP サーバー側で許可リストに登録できる安定したパブリック IP アドレスを共有できるようになります。 ネットワーク設定を構成する手順については、 「ワークスペースでプライベート サービス接続を有効にする」を参照してください。
  • サーバレス コンピュート プレーンから、安定した出力 IP を取得するための安定した IP の構成を参照してください。

接続を作成する

SFTP 資格情報を保存するためのUnity Catalog接続を作成します。 CREATE CONNECTION権限が必要です。

コネクタは次の認証方法をサポートしています。

  • PEM秘密鍵
  • パスワードベースの認証

Databricks では、PEM 秘密キー認証の使用を推奨しています。Databricks では、ソース SFTP サーバー上で最小限の権限を持つ資格情報 (読み取り専用アクセスに制限された非ルート ユーザーなど) を使用することも推奨しています。

パイプラインを作成すると、コネクタは使用可能なホストに一致する接続を自動的に見つけようとします。一致する接続が複数ある場合、コネクタはホストに正常に接続された最初の接続を選択します。ただし、Databricks ではユーザーを明示的に指定することをお勧めします。これにより、コネクタがホスト アクセス権を持つ別のユーザーの接続を選択しないことが保証されます。

PEM秘密鍵(推奨)

  1. Databricks ワークスペースで、 カタログ > 外部データ > 接続 をクリックします。

  2. 接続の作成 をクリックします。

  3. 接続のセットアップ ウィザードの 「接続の基本」 ページで、一意の 接続名 を入力します。

  4. 接続タイプ には SFTP を選択します。

  5. 認証タイプ には、 PEM 秘密キー を選択します。

  6. 次へ をクリックします。

  7. [認証] ページの [ホスト] に、外部サーバーのホスト名を入力します。

  8. 「ユーザー」 には、外部インスタンスにアクセスするために使用するユーザー ID を入力します。

  9. 次へ をクリックします。

  10. 「接続の詳細」 ページで、PEM 形式で秘密キーを入力します。該当する場合は、キーのパスフレーズも入力します。

  11. ホスト キーのフィンガープリントのチェックをスキップする場合は、 [ホスト キーのフィンガープリントを強制する] の 選択を解除します。

    これを選択すると、サーバーの公開キーが予想される SHA-256 フィンガープリントと一致する場合にのみ接続が続行されます。無効にすると、一致に関係なく接続が続行されます。これを無効にする前に、ネットワーク管理者に確認してください。

  12. 「ホスト キー フィンガープリントを強制する」を オンにした場合は、SFTP サーバーのフィンガープリントを入力します。

    フィンガープリントは、サーバー管理者から取得するか、CLI コマンドを使用して取得できます。また、 [テスト]を押して接続を作成 > [テスト] することもできます。結果のエラー メッセージには、フィンガープリントが提供されます。例えば:

    ECDSA key fingerprint is SHA256:XXX/YYY

  13. [テストと接続の作成] をクリックします。

  14. 接続が成功したら、 「作成」 をクリックします。

パスワードベースの認証

  1. Databricks ワークスペースで、 カタログ > 外部データ > 接続 をクリックします。

  2. 接続の作成 をクリックします。

  3. 接続のセットアップ ウィザードの 「接続の基本」 ページで、一意の 接続名 を入力します。

  4. 接続タイプ には SFTP を選択します。

  5. 認証タイプ には、 ユーザー名とパスワード を選択します。

  6. 次へ をクリックします。

  7. [認証] ページの [ホスト] に、外部サーバーのホスト名を入力します。

  8. 「ユーザー」 には、外部インスタンスにアクセスするために使用するユーザー ID を入力します。

  9. 「パスワード」 には、外部インスタンスのパスワードを入力します。

  10. 次へ をクリックします。

  11. ホスト キーのフィンガープリントのチェックをスキップする場合は、 [ホスト キーのフィンガープリントを強制する] の 選択を解除します。

    これを選択すると、サーバーの公開キーが予想される SHA-256 フィンガープリントと一致する場合にのみ接続が続行されます。無効にすると、一致に関係なく接続が続行されます。これを無効にする前に、ネットワーク管理者に確認してください。

  12. 「ホスト キー フィンガープリントを強制する」を オンにした場合は、SFTP サーバーのフィンガープリントを入力します。

    フィンガープリントは、サーバー管理者から取得するか、CLI コマンドを使用して取得できます。また、 [テスト]を押して接続を作成 > [テスト] することもできます。結果のエラー メッセージには、フィンガープリントが提供されます。例えば:

    ECDSA key fingerprint is SHA256:XXX/YYY

  13. [テストと接続の作成] をクリックします。

  14. 接続が成功したら、 「作成」 をクリックします。

SFTPサーバーからファイルを読み取る

次の例は、Auto Loader のストリーミング機能を使用して SFTP サーバーからファイルを読み取る方法を示しています。Auto Loader使用法の詳細については、 「一般的なデータ読み込みパターン」を参照してください。

Python
# Run the Auto Loader job to ingest all existing data in the SFTP server.
df = (spark.readStream.format("cloudFiles")
.option("cloudFiles.schemaLocation", "<path to store schema information>") # This is a cloud storage path
.option("cloudFiles.format", "csv") # Or other format supported by Auto Loader
# 1. $absolute_path_to_files should be the full server-side file system path
# relative to the root /, e.g. /home/$username/subdir/source/…
.load("sftp://$your_user@$your_host:$your_port/$absolute_path_to_files")
.writeStream
.format("delta")
.option("checkpointLocation", "<path to store checkpoint information>") # This is a cloud storage path.
.trigger(availableNow = True)
.table("<table name>"))
df.awaitTermination()

次の例は、 LakeFlow Spark宣言型パイプラインのAuto Loaderを使用して SFTP サーバーからファイルを読み取る方法を示しています。

Python
import dlt

@dlt.table
def sftp_bronze_table():
return (spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv") # Or other format supported by Auto Loader
# 1. $absolute_path_to_files should be the full server side file system path
# relative to the root /, for example /home/$username/subdir/source/…
.load("sftp://$your_user@$your_host:$your_port/$absolute_path_to_files"))

Auto Loaderオプションを構成します。 次のオプションを除くすべてのオプションがサポートされています:

  • cloudFiles.useNotifications
  • cloudFiles.useManagedFileEvents
  • cloudFiles.cleanSource
  • クラウド固有のオプション

制限事項

  • SFTP は、 COPY INTOspark.readdbutils.lsなどの他の取り込みサーフェスではサポートされていません。
  • SFTP サーバーへの書き戻しはサポートされていません。
  • Auto Loader cleanSource (取り込み後にソースでファイルを削除またはアーカイブする) はサポートされていません。
  • FTP プロトコルはサポートされていません。

よくある質問

SFTP コネクタに関するよくある質問への回答を見つけます。

ワイルドカードまたはファイル名パターンを使用して、取り込むファイルを選択するにはどうすればよいですか?

SFTP コネクタは、SFTP サーバーから読み取るための標準の Auto Loader フレームワーク上に構築されます。これは、すべての Auto Loader オプションがサポートされていることを意味します。ファイル名のパターンとワイルドカードの場合は、 pathGlobFilterまたはfileNamePatternオプションを使用します。Auto Loaderオプションを参照してください。

SFTP コネクタは暗号化されたファイルを取り込むことができますか?(PGP はサポートされていますか?)

コネクタは送信中に暗号化を解除しませんが、暗号化されたファイルをバイナリ ファイルとして取り込み、取り込み後に暗号化を解除することができます。

互換性のない秘密鍵形式をどのように処理すればよいですか?

PEM 形式のみがサポートされます。次のいずれかの方法で、PEM 形式の秘密鍵を生成できます。

  • (オプション 1) 標準の PEM 形式で新しい RSA キーを作成します。

    % ssh-keygen -t rsa -m pem
  • (オプション 2) 既存の OpenSSH 形式のキーを PEM 形式に変換します。

    % ssh-keygen -p -m pem -f /path/to/key  # This updates the key file.

その他のリソース