Amazon S3 からのデータのオンボード
この記事では、Amazon S3 から新しい Databricks ワークスペースにデータをオンボードする方法について説明します。Unity Catalogボリューム (推奨) またはUnity Catalog外部ロケーションに対応するクラウドオブジェクトストレージの場所にあるソースデータに安全にアクセスする方法を学習します。次に、DLTと を使用して、データをUnity Catalog マネージドテーブルに段階的に取り込む方法を学びます。Auto Loader
ノートブックではなく Databricks SQL でデータをオンボードするには、「 Databricks SQL のストリーミング テーブルを使用してデータを読み込む」を参照してください。
始める前に
あなたが管理者でない場合、この記事は管理者があなたに次の情報を提供したことを前提としています:
-
Unity Catalog が有効になっている Databricks ワークスペースへのアクセス。 詳しくは、「Unity Catalogの設定と管理」を参照してください。
-
Unity Catalog外部ボリュームに対する
READ VOLUME
権限、またはソース データを含むクラウド ストレージの場所に対応するUnity Catalog外部ロケーションに対するREAD FILES
権限。詳細については、「クラウド ストレージを Databricksに接続するための外部ロケーションを作成する」を参照してください。 -
ソース データへのパス。
ボリュームパスの例:
/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>
外部ロケーション パスの例:
s3://<bucket>/<folder>/
-
データをロードしたいスキーマの
USE SCHEMA
とCREATE TABLE
の権限。 -
クラスタリングの作成アクセス許可 、または DLT パイプライン クラスタリングを定義する クラスターポリシー へのアクセス (
cluster_type
フィールドをdlt
に設定)。ソース データへのパスがボリューム パスの場合、クラスターはDatabricks Runtime 13.3LTS 以上 実行する必要があります。
これらの前提条件について質問がある場合は、アカウント管理者にお問い合わせください。
ステップ 1: クラスターを作成する
クラスターを作成するには、次の手順を実行します。
- Databricks ワークスペースにサインインします。
- サイドバーで、「 新規 > クラスター 」をクリックします。
- クラスター UI で、クラスターの一意の名前を指定します。
- ソース データへのパスがボリューム パスの場合、 Databricksランタイム バージョン で13.2 以上を選択します。
- [クラスターの作成] をクリックします。
ステップ 2: データ探索ノートブックを作成する
このセクションでは、データパイプラインを作成する前にデータを理解できるように、データ探索ノートブックを作成する方法について説明します。
-
サイドバーで、[ +新規 ]、[ ノートブック ] の順にクリックします。
ノートブックは、最後に使用したクラスター (この場合は、「 ステップ 1: クラスターを作成する 」で作成したクラスター) に自動的にアタッチされます。
-
ノートブックの名前を入力します。
-
言語ボタンをクリックし、ドロップダウンメニューから
Python
またはSQL
を選択します。 デフォルトではPython
が選択されています。 -
S3でソース データへのデータ アクセスを確認するには、次のコードをノートブック セルに貼り付け、[
] をクリックして、[ 実行セル ] をクリックします。
- SQL
- Python
LIST '<path-to-source-data>'
%fs ls '<path-to-source-data>'
<path-to-source-data>
は、データを含むディレクトリへのパスに置き換えます。
これにより、データセットを含むディレクトリの内容が表示されます。
5. レコードのサンプルを表示して各レコードの内容と形式をよりよく理解するには、以下をノートブックのセルに貼り付け、をクリックし、 [セルの実行] をクリックします。
- SQL
- Python
SELECT * from read_files('<path-to-source-data>', format => '<file-format>') LIMIT 10
spark.read.format('<file-format>').load('<path-to-source-data>').limit(10).display()
次の値を置き換えます。
<file-format>
: サポートされているファイル形式。 「ファイル形式のオプション」を参照してください。<path to source data>
: データを含むディレクトリ内のファイルへのパス。
指定したファイルの最初の 10 個のレコードが表示されます。
ステップ 3: 生データを取り込む
生データを取り込むには、次の操作を行います。
-
サイドバーで、「 新規 ノートブック」> をクリックします。
ノートブックは、最後に使用したクラスター (この場合は、この記事で前に作成したクラスター) に自動的にアタッチされます。
-
ノートブックの名前を入力します。
-
言語ボタンをクリックし、ドロップダウンメニューから
Python
またはSQL
を選択します。 デフォルトではPython
が選択されています。 -
次のコードをノートブックのセルに貼り付けます。
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE
<table-name>
AS SELECT
*
FROM
STREAM read_files(
'<path-to-source-data>',
format => '<file-format>'
)
@dlt.table(table_properties={'quality': 'bronze'})
def <table-name>():
return (
spark.readStream.format('cloudFiles')
.option('cloudFiles.format', '<file-format>')
.load(f'{<path-to-source-data>}')
)
次の値を置き換えます。
<table-name>
: 取り込まれたレコードを格納するテーブルの名前。<path-to-source-data>
: ソース データへのパス。<file-format>
: サポートされているファイル形式。 「ファイル形式のオプション」を参照してください。
DLT は、ノートブックのセルで対話的に実行するようには設計されていません。ノートブックで DLT 構文を含むセルを実行すると、クエリが構文的に有効かどうかに関するメッセージが返されますが、クエリ ロジックは実行されません。次の手順では、作成したインジェスト ノートブックからパイプラインを作成する方法について説明します。
手順 4: パイプラインを作成して発行する
パイプラインを作成して Unity Catalog に発行するには、次の操作を行います。
- サイドバーで、[ ワークフロー ]、[ DLT ] タブ、[ パイプラインの作成 ] の順にクリックします。
- パイプラインの名前を入力します。
- パイプライン モード の場合は、 トリガー済み を選択します。
- ソース コード の場合は、パイプライン ソース コードが含まれているノートブックを選択します。
- [宛先] で [ Unity Catalog ] を選択します。
- テーブルが Unity Catalog によって管理され、親スキーマにアクセスできるすべてのユーザーがクエリを実行できるようにするには、ドロップダウン リストから [カタログ ] と [ターゲット スキーマ ] を選択します。
- クラスタリング作成権限がない場合は、DLT をサポートする クラスターポリシー をドロップダウンリストから選択します。
- [Advanced] で、 チャンネル を [Preview ] に設定します。
- 他のすべてのデフォルト値をそのまま使用し、[ 作成 ] をクリックします。
ステップ 5: パイプラインをスケジュールする
パイプラインをスケジュールするには、次の操作を行います。
- サイドバーで、「 DLT 」をクリックします。
- スケジュールするパイプラインの名前をクリックします。
- [スケジュール ] をクリックして> スケジュールを追加します 。
- [ジョブ名 ] に、ジョブの名前を入力します。
- スケジュール を スケジュール に設定します。
- 期間、開始時刻、およびタイムゾーンを指定します。
- パイプラインの開始、成功、または失敗時にアラートを受信するように、1つ以上のEメール アドレスを設定します。
- 作成 をクリックします。
次のステップ
- 新しいテーブルへのアクセス権をユーザーに付与します。 詳細については、「 Unity Catalog 特権とセキュリティ保護可能なオブジェクト」を参照してください。
- 新しいテーブルへのアクセス権を持つユーザーは、 ノートブック でテーブルに対してクエリを実行したり、 Databricks SQL エディターを使用したりできるようになりました。