Databricks Connect for Python の高度な使用法
この記事では、Databricks Runtime 14.0 以降の Databricks Connect について説明します。
この記事では、Databricks Connect の基本的なセットアップ以外のトピックについて説明します。
Spark Connect 接続文字列を構成する
「クラスターへの接続を構成する」で説明されているオプションを使用してクラスターに接続するだけでなく、より高度なオプションは Spark Connect 接続文字列を使用して接続することです。文字列を remote
関数に渡すか、 SPARK_REMOTE
環境変数を設定できます。
Databricks 個人用アクセス トークン認証は、Spark Connect 接続文字列を使用して接続するためにのみ使用できます。
remote
関数を使用して接続文字列を設定するには:
# Set the Spark Connect connection string in DatabricksSession.builder.remote.
from databricks.connect import DatabricksSession
workspace_instance_name = retrieve_workspace_instance_name()
token = retrieve_token()
cluster_id = retrieve_cluster_id()
spark = DatabricksSession.builder.remote(
f"sc://{workspace_instance_name}:443/;token={token};x-databricks-cluster-id={cluster_id}"
).getOrCreate()
または、 SPARK_REMOTE
環境変数を設定します。
sc://<workspace-instance-name>:443/;token=<access-token-value>;x-databricks-cluster-id=<cluster-id>
次に、次のように DatabricksSession
クラスを初期化します。
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
PySpark シェル
Databricks Connect for Python には、Databricks Connect を使用するように構成された PySpark REPL (Spark シェル) である pyspark
バイナリが付属しています。
追加のパラメーターを指定せずに開始すると、シェルは環境 (DATABRICKS_
環境変数や DEFAULT
構成プロファイルなど) からデフォルトの資格情報を取得して、 Databricks クラスターに接続します。 接続の設定に関する情報については、Databricks Connectのコンピュート設定を参照してください。
-
Spark シェルを起動し、稼働中のクラスターに接続するには、アクティブ化された仮想環境から次のいずれかのコマンドPython実行します。
Bashpyspark
次のように、 Spark シェルが表示されます。
OutputPython 3.10 ...
[Clang ...] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 13.x.dev0
/_/
Using Python version 3.10 ...
Client connected to the Spark Connect server at sc://...:.../;token=...;x-databricks-cluster-id=...
SparkSession available as 'spark'.
>>>シェルが起動すると、
spark
オブジェクトはApache Spark Databricksクラスターで コマンドを実行できるようになります。単純な PySpark コマンド(spark.range(1,10).show()
など)を実行します。 エラーがなければ、正常に接続されています。 -
コンピュートで Sparkシェルを してコマンドを実行する方法については、「 Sparkシェルを使用したインタラクティブ解析 」を参照してください。Python
組み込み
spark
変数を使用して、稼働中のクラスターのSparkSession
を表します。>>> df = spark.read.table("samples.nyctaxi.trips")
>>> df.show(5)
+--------------------+---------------------+-------------+-----------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
+--------------------+---------------------+-------------+-----------+----------+-----------+
| 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171|
| 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110|
| 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023|
| 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017|
| 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282|
+--------------------+---------------------+-------------+-----------+----------+-----------+
only showing top 5 rowsすべてのPython コードはローカルで実行されますが、リモートPySpark DataFrameワークスペースのクラスターで実行されるDatabricks 操作と実行応答を含むすべての コードは、ローカルの呼び出し元に送り返されます。
-
Spark シェルを停止するには、
Ctrl + d
またはCtrl + z
を押すか、コマンドquit()
またはexit()
を実行します。
追加の HTTP ヘッダー
Databricks Connect は、gRPC over HTTP/2 を介して Databricks クラスターと通信します。
一部の上級ユーザーは、クライアントと Databricks クラスターの間にプロキシサービスをインストールして、クライアントからの要求をより適切に制御することを選択できます。
プロキシは、場合によっては、HTTP 要求にカスタム ヘッダーを必要とすることがあります。
headers()
メソッドを使用して、HTTPリクエストにカスタムヘッダーを追加できます。
spark = DatabricksSession.builder.header('x-custom-header', 'value').getOrCreate()
証明 書
クラスターがカスタム SSL/TLS 証明書に依存して Databricks ワークスペースの完全修飾ドメイン名 (FQDN) を解決する場合は、ローカル開発マシンで環境変数 GRPC_DEFAULT_SSL_ROOTS_FILE_PATH
を設定する必要があります。 この環境変数は、クラスタにインストールされている証明書のフルパスに設定する必要があります。
たとえば、この環境変数を Python コードで次のように設定します。
import os
os.environ["GRPC_DEFAULT_SSL_ROOTS_FILE_PATH"] = "/etc/ssl/certs/ca-bundle.crt"
環境変数を設定するその他の方法については、オペレーティングシステムのドキュメントを参照してください。
ログ記録とデバッグログ
Databricks Connect for Python は、標準の Python ログを使用してログを生成します。
ログは標準エラーストリーム( stderr )に出力され、デフォルトではWARNレベル以上のログのみが出力されます。
環境変数 SPARK_CONNECT_LOG_LEVEL=debug
を設定すると、このデフォルトが変更され、すべてのログメッセージが DEBUG
レベル以上で出力されます。