メインコンテンツまでスキップ

Python 用 Databricks SQL コネクタ

Databricks SQLConnector forPython は、Python PythonSQLコードを使用してDatabricks クラスターとDatabricks SQL ウェアハウスで コマンドを実行できる ライブラリです。Databricks SQL Connector for Python は、 pyodbc などの同様の Python ライブラリよりもセットアップと使用が簡単です。 このライブラリは、 PEP 249 – Python Database API Specification v2.0 に準拠しています。

注記

Databricks SQL Connector for Python は、Databricks の SQLAlchemy 言語もサポートしていますが、これらの機能を使用するにはインストールする必要があります。 「Databricks での SQLAlchemy の使用」を参照してください

必要条件

  • Python >=3.8 および <=3.11 を実行している開発マシン。
  • Databricks では、Python に含まれている venv によって提供される環境など、Python 仮想環境を使用することをお勧めします。 仮想環境は、正しいバージョンの Python と Databricks SQL Connector for Python を一緒に使用していることを確認するのに役立ちます。 仮想環境の設定と使用については、この記事の範囲外です。 詳細については、「 仮想環境の作成」を参照してください。
  • 既存の クラスター または SQLウェアハウス

はじめに

  • Databricks SQL Connector for Python をインストールします。 PyArrow は Databricks SQL Connector for Python のオプションの依存関係であり、デフォルトによってインストールされません。 PyArrow をインストールしないと、CloudFetch やその他の Apache Arrow 機能などの機能が利用できないため、大量のデータのパフォーマンスに影響を与える可能性があります。

    • リーン コネクタを取り付けるには、 pip install databricks-sql-connectorを使用します。
    • PyArrow を含む完全なコネクタをインストールするには、pip install databricks-sql-connector[pyarrow]を使用します。
  • 使用するクラスターまたは SQLウェアハウスについて、次の情報を収集します。

認証

Python 用 Databricks SQL コネクタでは、次の Databricks 認証の種類がサポートされています。

Python 用の Databricks SQL コネクタは、次の Databricks 認証の種類をまだサポートしていません。

Databricks 個人用アクセス トークン認証

Databricks 個人用アクセス トークン認証で Python 用の Databricks SQL コネクタを使用するには、まず Databricks 個人用アクセス トークンを作成する必要があります。 これを行うには、「 ワークスペース ユーザーの Databricks 個人用アクセス トークン」の手順に従います。

Databricks SQL Connector for Python を認証するには、次のコード スニペットを使用します。 このスニペットは、次の環境変数を設定していることを前提としています。

  • DATABRICKS_SERVER_HOSTNAMEクラスターまたはウェアハウスの Server Hostname 値 SQLに設定します。
  • DATABRICKS_HTTP_PATHで、クラスターまたはSQL ウェアハウスの HTTP パス 値に設定します。
  • DATABRICKS_TOKENで、Databricks 個人用アクセス トークンに設定されます。

環境変数を設定するには、ご利用になっているオペレーティングシステムのドキュメントを参照してください。

Python
from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...

OAuth マシン間 (M2M) 認証

Databricks SQL Connector for Python バージョン 2.7.0 以降では、 OAuth マシン間 (M2M) 認証がサポートされています。 また、Databricks SDK for Python 0.18.0 以降もインストールする必要があります (たとえば、pip install databricks-sdkpython -m pip install databricks-sdkを実行します)。

OAuth M2M 認証で Databricks SQL Connector for Python を使用するには、次の操作を行う必要があります。

  1. DatabricksワークスペースにDatabricks サービスプリンシパルを作成し、そのサービスプリンシパルのOAuthシークレットを作成します。

    サービスプリンシパルとそのOAuth シークレットを作成するには、「Databricksを使用してサービスプリンシパルを使用して リソースへの無人アクセスを承認OAuth する」を参照してください。サービスプリンシパルの UUID または Application ID の値と、サービスプリンシパルの シークレットの Secret OAuth 値をメモします。

  2. そのサービスプリンシパルに、クラスターまたはウェアハウスへのアクセス権を付与します。

    サービスプリンシパルにクラスターまたはウェアハウスへのアクセス権を付与するには、「コンピュートのアクセス許可」または「SQLウェアハウスの管理」を参照してください。

Databricks SQL Connector for Python を認証するには、次のコード スニペットを使用します。 このスニペットは、次の環境変数を設定していることを前提としています。

  • DATABRICKS_SERVER_HOSTNAME クラスターまたはウェアハウスの Server Hostname 値 SQLに設定します。
  • DATABRICKS_HTTP_PATHで、クラスターまたはSQL ウェアハウスの HTTP パス 値に設定します。
  • DATABRICKS_CLIENT_IDで、サービスプリンシパルの UUID または アプリケーション ID の値に設定します。
  • DATABRICKS_CLIENT_SECRETで、サービスプリンシパルの OAuth シークレットの Secret 値に設定します。

環境変数を設定するには、ご利用になっているオペレーティングシステムのドキュメントを参照してください。

Python
from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os

server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")

def credential_provider():
config = Config(
host = f"https://{server_hostname}",
client_id = os.getenv("DATABRICKS_CLIENT_ID"),
client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
return oauth_service_principal(config)

with sql.connect(server_hostname = server_hostname,
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
credentials_provider = credential_provider) as connection:
# ...

OAuth ユーザー間 (U2M) 認証

Databricks SQL Connector for Python バージョン 3.0.3 以上 は、OAuth ユーザー間 (U2M) 認証をサポートします。 また、Databricks SDK for Python 0.19.0 以降もインストールする必要があります (たとえば、pip install databricks-sdkpython -m pip install databricks-sdkを実行します)。

OAuth U2M 認証を使用して Databricks SQL Connector for Python を認証するには、次のコード スニペットを使用します。 OAuth U2M 認証では、リアルタイムの人間によるサインインと同意を使用して、ターゲットの Databricks ユーザー アカウントを認証します。 このスニペットは、次の環境変数を設定していることを前提としています。

  • DATABRICKS_SERVER_HOSTNAME をクラスターまたはウェアハウスの [Server Hostname ] の値 SQLに設定します。
  • DATABRICKS_HTTP_PATHクラスターまたはSQL ウェアハウスの[HTTP パス] の値に を設定します。

環境変数を設定するには、ご利用になっているオペレーティングシステムのドキュメントを参照してください。

Python
from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
auth_type = "databricks-oauth") as connection:
# ...

次のコード例は、Python 用 Databricks SQL コネクタを使用して、データのクエリと挿入、メタデータのクエリ、カーソルと接続の管理、ログ記録の構成を行う方法を示しています。

注記

次のコード例は、認証に Databricks 個人用アクセス トークンを使用する方法を示しています。 代わりに使用可能な他の Databricks 認証の種類を使用するには、「 認証」を参照してください。

次のコード例では、これらの環境変数から server_hostnamehttp_path、および access_token 接続変数の値を取得します。

  • DATABRICKS_SERVER_HOSTNAMEこれは要件のサーバーホスト名の値を表します
  • DATABRICKS_HTTP_PATH、これは要件からの HTTP Path 値を表します。
  • DATABRICKS_TOKENこれは、要件からのアクセス トークンを表します。

他の方法を使用して、これらの接続変数値を取得できます。 環境変数の使用は、多くのアプローチの1つにすぎません。

User-Agent の設定

次のコード例は、使用状況の追跡のために User-Agent アプリケーションの product_name を設定する方法を示しています。

Python
from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"),
_user_agent_entry = "product_name") as connection:
with connection.cursor() as cursor:
cursor.execute("SELECT 1 + 1")
result = cursor.fetchall()

for row in result:
print(row)

データのクエリ

次のコード例は、Databricks SQL PythonSQLの Connector を呼び出して、クラスターSQL ウェアハウスまたは ウェアハウスで基本的な コマンドを実行する方法を示しています。このコマンドは、samples カタログの nyctaxi スキーマの trips テーブルから最初の 2 つのローを返します。

Python
from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:

with connection.cursor() as cursor:
cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
result = cursor.fetchall()

for row in result:
print(row)

データの挿入

次の例は、少量 (数千行) のデータを挿入する方法を示しています。

Python
from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:

with connection.cursor() as cursor:
cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")

squares = [(i, i * i) for i in range(100)]
values = ",".join([f"({x}, {y})" for (x, y) in squares])

cursor.execute(f"INSERT INTO squares VALUES {values}")

cursor.execute("SELECT * FROM squares LIMIT 10")

result = cursor.fetchall()

for row in result:
print(row)

大量のデータの場合は、まずデータをクラウドストレージにアップロードしてから、 COPY INTO コマンドを実行する必要があります。

クエリのメタデータ

メタデータを取得するための専用のメソッドがあります。 次の例では、サンプルテーブルの列に関するメタデータを取得します。

Python
from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")) as connection:

with connection.cursor() as cursor:
cursor.columns(schema_name="default", table_name="squares")
print(cursor.fetchall())

カーソルと接続の管理

使用されなくなった接続とカーソルを閉じることをお勧めします。 これにより、 Databricks クラスターと Databricks SQL ウェアハウスのリソースが解放されます。

コンテキストマネージャー (前の例で使用した with 構文) を使用してリソースを管理するか、 closeを明示的に呼び出すことができます。

Python
from databricks import sql
import os

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())

cursor.close()
connection.close()

Unity Catalog ボリューム内のファイルを管理する

Databricks SQL コネクタを使用すると、次の例に示すように、Unity Catalog ボリュームへのローカル ファイルの書き込み、ボリュームからのファイルのダウンロード、ボリュームからのファイルの削除を行うことができます。

Python
from databricks import sql
import os

# For writing local files to volumes and downloading files from volumes,
# you must set the staging_allows_local_path argument to the path to the
# local folder that contains the files to be written or downloaded.
# For deleting files in volumes, you must also specify the
# staging_allows_local_path argument, but its value is ignored,
# so in that case its value can be set for example to an empty string.
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"),
staging_allowed_local_path = "/tmp/") as connection:

with connection.cursor() as cursor:

# Write a local file to the specified path in a volume.
# Specify OVERWRITE to overwrite any existing file in that path.
cursor.execute(
"PUT '/temp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
)

# Download a file from the specified path in a volume.
cursor.execute(
"GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
)

# Delete a file from the specified path in a volume.
cursor.execute(
"REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
)

ログ記録を構成する

Databricks SQL コネクタは、Python の 標準ログ モジュールを使用します。 ログレベルは、次のように設定できます。

Python
from databricks import sql
import os, logging

logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
level = logging.DEBUG)

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")

result = cursor.fetchall()

for row in result:
logging.debug(row)

cursor.close()
connection.close()

テスティング

コードをテストするには、 pytest などの Python テスト フレームワークを使用します。 Databricks REST API エンドポイントを呼び出したり、Databricks アカウントやワークスペースの状態を変更したりせずに、シミュレートされた条件下でコードをテストするには、 unittest.mock などの Python モック ライブラリを使用できます。

たとえば、Databricks パーソナル アクセス トークンを使用して Databricks ワークスペースへの接続を返す get_connection_personal_access_token 関数と、接続を使用して samples カタログの nyctaxi スキーマの trips テーブルから指定された数のデータ行を取得する select_nyctaxi_trips 関数を含む 、helpers.py という名前の次のファイルがあるとします。

Python
# helpers.py

from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor

def get_connection_personal_access_token(
server_hostname: str,
http_path: str,
access_token: str
) -> Connection:
return sql.connect(
server_hostname = server_hostname,
http_path = http_path,
access_token = access_token
)

def select_nyctaxi_trips(
connection: Connection,
num_rows: int
) -> List[Row]:
cursor: Cursor = connection.cursor()
cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
result: List[Row] = cursor.fetchall()
return result

また、get_connection_personal_access_token 関数と select_nyctaxi_trips 関数を呼び出す main.py という名前の次のファイルがあるとします。

Python
# main.py

from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips

connection: Connection = get_connection_personal_access_token(
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
access_token = os.getenv("DATABRICKS_TOKEN")
)

rows: List[Row] = select_nyctaxi_trips(
connection = connection,
num_rows = 2
)

for row in rows:
print(row)

次の test_helpers.py という名前のファイルは、 select_nyctaxi_trips 関数が予期した応答を返すかどうかをテストします。 このテストでは、ターゲット ワークスペースへの実際の接続を作成するのではなく、 Connection オブジェクトをモックします。 また、このテストでは、実際のデータにあるスキーマと値に準拠する一部のデータをモックします。 このテストでは、モックされた接続を介してモックされたデータが返され、モックされたデータ行の値の 1 つが期待値と一致するかどうかがチェックされます。

Python
# test_helpers.py

import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec

@pytest.fixture
def mock_data() -> List[Row]:
return [
Row(
tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
trip_distance = 4.94,
fare_amount = 19.0,
pickup_zip = 10282,
dropoff_zip = 10171
),
Row(
tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
trip_distance = 0.28,
fare_amount = 3.5,
pickup_zip = 10110,
dropoff_zip = 10110
)
]

def test_select_nyctaxi_trips(mock_data: List[Row]):
# Create a mock Connection.
mock_connection = create_autospec(Connection)

# Set the mock Connection's cursor().fetchall() to the mock data.
mock_connection.cursor().fetchall.return_value = mock_data

# Call the real function with the mock Connection.
response: List[Row] = select_nyctaxi_trips(
connection = mock_connection,
num_rows = 2)

# Check the value of one of the mocked data row's columns.
assert response[1].fare_amount == 3.5

select_nyctaxi_trips 関数には SELECT ステートメントが含まれているため、trips テーブルの状態は変更されないため、この例ではモックは絶対に必要ではありません。ただし、モックを使用すると、ワークスペースとの実際の接続を待たずに、テストをすばやく実行できます。 また、モックを使用すると、テーブルの状態を変更する可能性のある関数 ( 、 、 INSERT INTOUPDATEDELETE FROMなど) のシミュレートされたテストを複数回実行できます。

API リファレンス

パッケージ

databricks-sql-connector

使い方: pip install databricks-sql-connector

Python パッケージ インデックス (PyPI) の databricks-sql-connector も参照してください。

モジュール

databricks.sql

使い方: from databricks import sql

クラス

選択したクラスには、次のものが含まれます。

クラス

Connection A session on a Databricks コンピュート リソース.

Cursor データ レコードを走査するためのメカニズム。

Row SQL クエリ結果のデータの行。

Connection クラス

Connection オブジェクトを作成するには、次のパラメーターを指定して databricks.sql.connect メソッドを呼び出します。

パラメータ

server_hostname タイプ: str クラスターのサーバ・ホスト名。 サーバーのホスト名を取得するには、この記事で前述した手順を参照してください。 このパラメーターは必須です。 例: 1234567890123456.7.gcp.databricks.com

http_path タイプ: str クラスターの HTTP パス。 HTTP パスを取得するには、この記事で前述した手順を参照してください。 このパラメーターは必須です。 sql/protocolv1/o/1234567890123456/1234-567890-test123 クラスターの場合。 /sql/1.0/warehouses/a1b234c567d8e9fa SQLウェアハウスの場合。

access_token タイプ: str クラスターのワークスペースの Databricks 個人用アクセス トークン。 トークンを作成するには、この記事で前述した手順を参照してください。 このパラメーターは必須です。 例: dapi...<the-remaining-portion-of-your-token>

session_configuration タイプ: dict[str, Any] Spark セッション構成パラメーターのディクショナリ。 構成の設定は、 SET key=val SQL コマンドを使用することと同じです。 SQL コマンド SET -v を実行して、使用可能な構成の完全な一覧を取得します。 デフォルトは Noneです。 このパラメーターはオプションです。 例: {"spark.sql.variable.substitute": True}

http_headers タイプ: List[Tuple[str, str]]] 追加の (キーと値のペアは、クライアントが行うすべての RPC 要求の HTTP ヘッダーに設定します。 一般的な使用法では、追加のHTTPヘッダーは設定されません。 デフォルトは Noneです。 このパラメーターはオプションです。 バージョン 2.0 から

catalog タイプ: str 接続に使用する初期カタログ。 デフォルトから None (この場合、デフォルト カタログ (通常は hive_metastore ) が使用されます)。 このパラメーターはオプションです。 バージョン 2.0 から

schema タイプ: str 接続に使用する初期スキーマ。 デフォルトから None (この場合、デフォルト スキーマ スキーマ default が使用されます)。 このパラメーターはオプションです。 バージョン 2.0 から

use_cloud_fetch タイプ: bool True を使用して、フェッチ要求をクラウド・オブジェクト・ストアに直接送信し、データのチャンクをダウンロードします。 False (デフォルト) は、フェッチ要求を直接 Databricksに送信します。 use_cloud_fetchTrue に設定されているのにネットワークアクセスがブロックされている場合、フェッチリクエストは失敗します。 バージョン 2.8 から

選択された Connection 方法は次のとおりです。

メソッド

close データベースへの接続を閉じ、サーバー上のすべての関連リソースを解放します。 この接続に対して追加の呼び出しを行うと、 Error. パラメーターはありません。 戻り値はありません。

cursor データベース内のレコードを走査できるようにする新しい Cursor オブジェクトを返します。 パラメーターはありません。

Cursor クラス

Cursor オブジェクトを作成するには、Connection クラスの cursor メソッドを呼び出します。

選択した Cursor 属性には次のものが含まれます。

属性

arraysize fetchmanyメソッドと共に使用すると、内部バッファサイズが指定され、これは一度にサーバーから実際にフェッチされる行数でもあります。デフォルト値は 10000です。 狭い結果 (各行に大量のデータが含まれない結果) の場合は、パフォーマンスを向上させるためにこの値を大きくする必要があります。 読み取り/書き込みアクセス。

description tuple 個のオブジェクトの Python listが含まれています。これらの tuple オブジェクトにはそれぞれ 7 つの値が含まれ、各 tuple オブジェクトの最初の 2 つのアイテムには、次のように 1 つの結果列を記述する情報が含まれています。 - name: 列の名前。 - type_code: 列のタイプを表す文字列。 たとえば、整数列の型コードは intになります。 各7項目の tuple オブジェクトの残りの5項目は実装されておらず、その値も定義されていません。 それらは通常4として返されます None 値の後に 1 つの True 値が続きます。 読み取り専用アクセス。

選択された Cursor 方法は次のとおりです。

メソッド

cancel カーソルが開始したデータベース・クエリまたはコマンドの実行を中断します。 サーバー上の関連リソースを解放するには、コマンドを呼び出します。 close メソッドを呼び出した後、メソッド cancel 。 パラメーターはありません。 戻り値はありません。

close カーソルを閉じ、サーバー上の関連リソースを解放します。 既に閉じているカーソルを閉じると、エラーがスローされる可能性があります。 パラメーターはありません。 戻り値はありません。

execute データベース・クエリまたはコマンドを準備して実行します。 戻り値はありません。 パラメーター: operation タイプ: str 準備して実行するクエリまたはコマンド。 このパラメーターは必須です。 parameters パラメーターを使用しない例: cursor.execute( 'SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip="10019" LIMIT 2' ) parameters パラメーターを使用した例: cursor.execute( 'SELECT * FROM samples.nyctaxi.trips WHERE zip=%(pickup_zip)s LIMIT 2', { 'pickup_zip': '10019' } ) parameters タイプ:辞書 operation パラメーターと共に使用するパラメーターのシーケンス。 このパラメーターはオプションです。 デフォルトはNoneです。

executemany データベース・クエリまたはコマンドを準備し、 seq_of_parameters 引数のすべてのパラメーター・シーケンスを使用して実行します。 最終結果セットのみが保持されます。 戻り値はありません。 パラメーター: operation タイプ: str 準備して実行するクエリまたはコマンド。 このパラメーターは必須です。 seq_of_parameters タイプ:list dict とともに使用するパラメーター値の多くのセットのシーケンス operation パラメーター。 このパラメーターは必須です。

catalogs カタログに関するメタデータ クエリを実行します。 実際の結果は、 fetchmany または fetchallを使用して取得する必要があります。 結果セットの重要なフィールドは次のとおりです。 - フィールド名: TABLE_CAT. タイプ: str。 カタログの名前。 パラメーターはありません。 戻り値はありません。 バージョン1.0以降

schemas スキーマに関するメタデータクエリを実行します。 実際の結果は、 fetchmany または fetchallを使用して取得する必要があります。 結果セットの重要なフィールドは次のとおりです。 - フィールド名: TABLE_SCHEM. タイプ: str。 スキーマの名前。 - フィールド名: TABLE_CATALOG. タイプ: str。 スキーマが属するカタログ。 戻り値はありません。 バージョン1.0以降 パラメーター: catalog_name タイプ: str 情報を取得するカタログ名。 %文字はワイルドカードとして解釈されます。 このパラメーターはオプションです。 schema_name タイプ: str 情報を取得するスキーマ名。 %文字はワイルドカードとして解釈されます。 このパラメーターはオプションです。

tables テーブルとビューに関するメタデータクエリを実行します。 実際の結果は、 fetchmany または fetchallを使用して取得する必要があります。 結果セットの重要なフィールドは次のとおりです。 - フィールド名: TABLE_CAT. タイプ: str。 テーブルが属するカタログ。 - フィールド名: TABLE_SCHEM. タイプ: str。 テーブルが属するスキーマ。 - フィールド名: TABLE_NAME. タイプ: str。 テーブルの名前。 - フィールド名: TABLE_TYPE. タイプ: strVIEWTABLE などの種類のリレーション (Databricks Runtime 10.4 LTS 以降に適用され、以前のバージョンの Databricks Runtime は空の文字列を返します)。 戻り値はありません。 バージョン1.0以降 パラメータ catalog_name タイプ: str 情報を取得するカタログ名。 %文字はワイルドカードとして解釈されます。 このパラメーターはオプションです。 schema_name タイプ: str 情報を取得するスキーマ名。 %文字はワイルドカードとして解釈されます。 このパラメーターはオプションです。 table_name タイプ: str 情報を取得するテーブル名。 %文字はワイルドカードとして解釈されます。 このパラメーターはオプションです。 table_types タイプ: List[str] 一致するテーブルの種類の一覧 ( TABLEVIEWなど)。 このパラメーターはオプションです。

columns 列に関するメタデータクエリを実行します。 実際の結果は、 fetchmany または fetchallを使用して取得する必要があります。 結果セットの重要なフィールドは次のとおりです。 - フィールド名: TABLE_CAT. タイプ: str。 列が属するカタログ。 - フィールド名: TABLE_SCHEM. タイプ: str。 列が属するスキーマ。 - フィールド名: TABLE_NAME. タイプ: str。 列が属するテーブルの名前。 - フィールド名: COLUMN_NAME. タイプ: str。 列の名前。 戻り値はありません。 バージョン1.0以降 パラメーター: catalog_name タイプ: str 情報を取得するカタログ名。 %文字はワイルドカードとして解釈されます。 このパラメーターはオプションです。 schema_name タイプ: str 情報を取得するスキーマ名。 %文字はワイルドカードとして解釈されます。 このパラメーターはオプションです。 table_name タイプ: str 情報を取得するテーブル名。 %文字はワイルドカードとして解釈されます。 このパラメーターはオプションです。 column_name タイプ: str 情報を取得する列名。 %文字はワイルドカードとして解釈されます。 このパラメーターはオプションです。

fetchall クエリのすべての (または残りのすべての) 行を取得します。 パラメーターはありません。 クエリのすべての (または残りのすべての) 行を Python list として返します。 Row オブジェクト。 前回の execute メソッドの呼び出しでデータが返されなかった場合、または execute 呼び出しがまだ行われていない場合は、Error を投げます。

fetchmany クエリの次の行を取得します。 クエリの次の行の最大 size (または、sizeが指定されていない場合は arraysize 属性) を、Row オブジェクトの Python listとして返します。 フェッチする行が size 行未満の場合、残りのすべての行が返されます。 前回の execute メソッドの呼び出しでデータが返されなかった場合、または execute 呼び出しがまだ行われていない場合は、Error を投げます。 パラメーター: size タイプ: int 取得する次の行の数。 このパラメーターはオプションです。 指定しない場合は、 arraysize 属性の値が使用されます。 例: cursor.fetchmany(10)

fetchone データセットの次の行を取得します。 パラメーターはありません。 データセットの次の行を Python として 1 つのシーケンスとして返します tuple object を返すか、使用可能なデータがこれ以上ない場合は None を返します。 前回の execute メソッドの呼び出しでデータが返されなかった場合、または execute 呼び出しがまだ行われていない場合は、Error を投げます。

fetchall_arrow クエリのすべての (または残りのすべての) 行を PyArrow Table オブジェクトとして取得します。 非常に大量のデータを返すクエリでは、メモリ消費量を減らすために、代わりに fetchmany_arrow を使用する必要があります。 パラメーターはありません。 クエリの全行(または残りの全行)をPyArrowテーブルとして返します。 前回の execute メソッドの呼び出しでデータが返されなかった場合、または execute 呼び出しがまだ行われていない場合は、Error を投げます。 バージョン 2.0 から

fetchmany_arrow クエリの次の行を PyArrow Table オブジェクトとして取得します。 クエリの次の行の size 引数 (sizeが指定されていない場合は arraysize 属性) まで Python PyArrow として返します Table オブジェクト。 前回の execute メソッドの呼び出しでデータが返されなかった場合、または execute 呼び出しがまだ行われていない場合は、Error を投げます。 バージョン 2.0 から パラメーター: size タイプ: int 取得する次の行の数。 このパラメーターはオプションです。 指定しない場合は、 arraysize 属性の値が使用されます。 例: cursor.fetchmany_arrow(10)

Row クラス

行クラスは、個々の結果行を表すタプルのようなデータ構造です。 行に "my_column" という名前の列が含まれている場合は、次の方法で row"my_column" フィールドにアクセスできます row.my_columnrow[0] のように、数値インデックスを使ってフィールドにアクセスすることもできます。 列名を属性メソッド名として使用できない場合(たとえば、数字で始まる)、 その場合は、row["1_my_column"] としてフィールドにアクセスできます。

バージョン1.0以降

選択された Row 方法は次のとおりです。

| asDict

フィールド名で索引付けされた行の辞書表現を返します。 フィールド名が重複している場合、重複したフィールドの1つ(ただし、1つのみ)が辞書に返されます。 どの重複フィールドが返されるかは定義されていません。

パラメーターはありません。

dictフィールドを返します。|

型変換

次の表は、Apache Spark SQL データ型を Python データ型と同等の Python データ型に対応付けたものです。

Apache Spark SQL データ型

Python データ型

array

numpy.ndarray

bigint

int

binary

bytearray

boolean

bool

date

datetime.date

decimal

decimal.Decimal

double

float

int

int

map

str

null

NoneType

smallint

int

string

str

struct

str

timestamp

datetime.datetime

tinyint

int

トラブルシューティング

tokenAuthWrapperInvalidAccessToken: Invalid access token メッセージ

問題 : コードを実行すると、次のようなメッセージが表示されます Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access token

考えられる原因 : access_token に渡された値は、有効な Databricks 個人用アクセス トークンではありません。

推奨される修正: access_token に渡された値が正しいことを確認し、もう一度やり直してください。

gaierror(8, 'nodename nor servname provided, or not known') メッセージ

問題 : コードを実行すると、次のようなメッセージが表示されます Error during request to server: gaierror(8, 'nodename nor servname provided, or not known')

考えられる原因 : server_hostname に渡された値が正しいホスト名ではありません。

推奨される修正: server_hostname に渡された値が正しいことを確認し、もう一度やり直してください。

サーバーのホスト名の検索の詳細については、「Databricks コンピュート リソースの接続の詳細を取得する」を参照してください。

IpAclError メッセージ

問題 : コードを実行すると Error during request to server: IpAclValidation 、 Databricks ノートブック上のコネクタ。

考えられる原因 : Databricks ワークスペースで IP 許可リストが有効になっている可能性があります。 IP 許可リスト、接続 Spark クラスターからコントロールプレーンに戻ることは、デフォルトでは許可されていません。

推奨される修正: 管理者に依頼して、コンピュート プレーン サブネットを IP 許可リストに追加してください。

追加のリソース

詳細については、以下を参照してください。