Databricks Connect for Databricks Runtime 12.2 LTS 以前
注
Databricks Connect では、代わりに Databricks Runtime 13.0 以降用の Databricks Connect を使用することをお勧めします。
Databricks は、 Databricks Runtime 12.2 LTS 以下の Databricks Connect の新機能の作業を計画していません。
Databricks Connect を使用すると、Visual Studio Code や PyCharm などの一般的な IDE、ノートブック サーバー、その他のカスタム アプリケーションを Databricks クラスターに接続できます。
この記事では、Databricks Connect のしくみについて説明し、Databricks Connect の使用を開始するための手順を説明し、Databricks Connect の使用時に発生する可能性のある問題のトラブルシューティング方法、および Databricks Connect を使用して実行する場合と Databricks ノートブックで実行する場合の違いについて説明します。
概要
データブリック接続は、 Databricks Runtimeのクライアント ライブラリです。 これにより、Spark APIs を使用してジョブを作成し、ローカルの Spark セッションではなく Databricks クラスターでリモートで実行できます。
たとえば、Databricks Connect を使用して DataFrame コマンド spark.read.format(...).load(...).groupBy(...).agg(...).show()
実行すると、コマンドの論理表現が Databricks で実行されている Spark サーバーに送信され、リモート クラスターで実行されます。
Databricks Connect を使用すると、次のことができます。
大規模な Spark ジョブを任意の Python、R、Scala、または Java アプリケーションから実行します。
import pyspark
、require(SparkR)
、またはimport org.apache.spark
できる場所ならどこでも、IDEプラグインをインストールしたりSpark送信スクリプトを使用したりすることなく、アプリケーションから直接Sparkジョブを実行できるようになりました。リモートクラスターを操作する場合でも、IDE でコードをステップ実行してデバッグします。
ライブラリを開発するときにすばやく反復します。 各クライアント セッションはクラスター内で互いに分離されているため、Databricks Connect で Python または Java ライブラリの依存関係を変更した後、クラスターを再起動する必要はありません。
作業を失うことなくアイドル状態のクラスターをシャットダウンします。 クライアント アプリケーションはクラスターから切り離されているため、通常はノートブックで定義されているすべての変数、RDD、および DataFrame オブジェクトが失われる原因となるクラスターの再起動やアップグレードの影響を受けません。
注
SQL クエリーを使用した Python 開発の場合、Databricks Connect ではなく Databricks SQL Connector for Python を使用することをお勧めします。Databricks SQL Connector for Python は、Databricks Connect よりも簡単に設定できます。 また、Databricks Connect は、ジョブがローカル コンピューターで実行され、ジョブがリモート コンピュート リソースで実行されることを解析して計画します。 これにより、ランタイムエラーのデバッグが特に困難になる可能性があります。 Databricks SQL コネクタ for Python は、SQL クエリーをリモート コンピュート リソースに直接送信し、結果をフェッチします。
要件
このセクションでは、Databricks Connect の要件を示します。
次の Databricks Runtime バージョンのみがサポートされています。
Databricks Runtime 12.2 LTS ML, Databricks Runtime 12.2 LTS
Databricks Runtime 11.3 LTS ML, Databricks Runtime 11.3 LTS
Databricks Runtime 10.4 LTS ML, Databricks Runtime 10.4 LTS
Databricks Runtime 9.1 LTS ML, Databricks Runtime 9.1 LTS
Databricks Runtime 7.3 LTS
開発マシンに Python 3 をインストールする必要があり、クライアント Python インストールのマイナー バージョンは、Databricks クラスターのマイナー Python バージョンと同じである必要があります。 次の表は、各 Databricks Runtimeと共にインストールされる Python のバージョンを示しています。
Databricks Runtime バージョン
Python バージョン
12.2 LTS ML, 12.2 LTS
3.9
11.3 LTS ML, 11.3 LTS
3.9
10.4 LTS ML, 10.4 LTS
3.8
9.1 LTS ML, 9.1 LTS
3.8
7.3 LTS
3.7
Databricks では、Databricks Connect で使用する Python バージョンごとに Python 仮想環境 をアクティブ化することを強くお勧めします。 Python 仮想環境は、正しいバージョンの Python と Databricks Connect を一緒に使用していることを確認するのに役立ちます。 これにより、関連する技術的な問題の解決に費やす時間を短縮できます。
たとえば、開発マシンで venv を使用していて、クラスターで Python 3.9 を実行している場合は、そのバージョンで
venv
環境を作成する必要があります。 次のコマンド例では、Python 3.9 でvenv
環境をアクティブ化するスクリプトを生成し、これらのスクリプトを現在の作業ディレクトリ内の.venv
という名前の隠しフォルダーに配置します。# Linux and macOS python3.9 -m venv ./.venv # Windows python3.9 -m venv .\.venv
これらのスクリプトを使用してこの
venv
環境をアクティブ化するには、「 venvs のしくみ」を参照してください。別の例として、開発マシンで Conda を使用していて、クラスターで Python 3.9 を実行している場合は、そのバージョンで Conda 環境を作成する必要があります。
conda create --name dbconnect python=3.9
この環境名で Conda 環境をアクティブ化するには、
conda activate dbconnect
を実行します。Databricks Connect のメジャー パッケージ バージョンとマイナー パッケージ バージョンは、常に Databricks Runtime バージョンと一致する必要があります。 Databricks では、 Databricks Runtime バージョンと一致する最新の Databricks Connect パッケージを常に使用することをお勧めします。 たとえば、 Databricks Runtime 12.2 LTS クラスターを使用する場合は、
databricks-connect==12.2.*
パッケージも使用する必要があります。注
利用可能な Databricks Connect リリースとメンテナンス更新プログラムの一覧については、 Databricks Connect リリースノート を参照してください。
Java ランタイム環境 (JRE) 8. クライアントはOpenJDK 8 JREでテストされています。 クライアントは Java 11 をサポートしていません。
注
Windows で、Databricks Connect が winutils.exe
を見つけることができないというエラーが表示された場合は、「 Winutils が見つかりません.exe Windows で」を参照してください。
クライアントを設定する
次の手順を実行して、Databricks Connect のローカル クライアントを設定します。
注
ローカルの Databricks Connect クライアントの設定を開始する前に、Databricks Connect の 要件を満たす 必要があります。
ステップ 1: Databricks Connect クライアントをインストールする
仮想環境をアクティブにした状態で、
uninstall
コマンドを実行して、PySpark が既にインストールされている場合はアンインストールします。 これは、databricks-connect
パッケージが PySpark と競合するために必要です。 詳細については、「 競合する PySpark インストール」を参照してください。 PySpark が既にインストールされているかどうかを確認するには、show
コマンドを実行します。# Is PySpark already installed? pip3 show pyspark # Uninstall PySpark pip3 uninstall pyspark
仮想環境をアクティブ化したまま、
install
コマンドを実行して Databricks Connect クライアントをインストールします。--upgrade
オプションを使用して、既存のクライアント インストールを指定したバージョンにアップグレードします。pip3 install --upgrade "databricks-connect==12.2.*" # Or X.Y.* to match your cluster version.
注
Databricks では、最新のパッケージがインストールされていることを確認するために、
databricks-connect=X.Y
の代わりにdatabricks-connect==X.Y.*
を指定するために "ドット アスタリスク" 表記を追加することをお勧めします。
ステップ 2: 接続プロパティを構成する
次の構成プロパティを収集します。
ワークスペース Databricks URL。
Databricks の個人用アクセストークン。
クラスターの ID。 クラスター ID は URL から取得できます。 ここでは、クラスター ID は
0304-201045-hoary804
です。クラスター上で Databricks Connect が接続するポート。 デフォルトのポートは
15001
です。
接続を次のように構成します。
CLI、SQL 構成、または環境変数を使用できます。 構成方法の優先順位は、SQL 構成キー、CLI、および環境変数です。
CLI
databricks-connect
を実行します。databricks-connect configure
ライセンスには次の情報が表示されます。
Copyright (2018) Databricks, Inc. This library (the "Software") may not be used except in connection with the Licensee's use of the Databricks Platform Services pursuant to an Agreement ...
ライセンスを受け入れ、構成値を指定します。 [ Databricks ホスト ] と [Databricks トークン] に、ワークスペースの URL と、手順 1 でメモした個人用アクセストークンを入力します。
Do you accept the above agreement? [y/N] y Set new config values (leave input empty to accept default): Databricks Host [no current value, must start with https://]: <databricks-url> Databricks Token [no current value]: <databricks-token> Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id> Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id> Port [15001]: <port>
SQL 構成または環境変数。 次の表に、手順 1 でメモした構成プロパティに対応する SQL 構成キーと環境変数を示します。 SQL 構成キーを設定するには、
sql("set config=value")
を使用します。 たとえば、sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh")
です。パラメーター
SQL コンフィグ キー
環境変数名
Databricks ホスト
spark.databricks.service.address
DATABRICKS_ADDRESS
Databricks トークン
spark.databricks.サービス.トークン
DATABRICKS_API_TOKEN
クラスター ID
spark.databricks.service.clusterId
クラスター
組織 ID
spark.databricks.service.orgId
DATABRICKS_ORG_ID
港
spark.databricks.service.port
DATABRICKS_PORT
仮想環境をアクティブ化したまま、次のように Databricks への接続をテストします。
databricks-connect test
構成したクラスターが実行されていない場合、テストはクラスターを開始し、構成された自動終了時刻まで実行され続けます。 出力は次のようになります。
* PySpark is installed at /.../.../pyspark * Checking java version java version "1.8..." Java(TM) SE Runtime Environment (build 1.8...) Java HotSpot(TM) 64-Bit Server VM (build 25..., mixed mode) * Testing scala command ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab..., invalidating prev state ../../.. ..:..:.. WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2... /_/ Using Scala version 2.... (Java HotSpot(TM) 64-Bit Server VM, Java 1.8...) Type in expressions to have them evaluated. Type :help for more information. scala> spark.range(100).reduce(_ + _) Spark context Web UI available at https://... Spark context available as 'sc' (master = local[*], app id = local-...). Spark session available as 'spark'. View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi res0: Long = 4950 scala> :quit * Testing python command ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set. ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab.., invalidating prev state View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
接続関連のエラーが表示されない場合(
WARN
メッセージは正常です)、正常に接続されています。
Databricks 接続を使用する
このセクションでは、Databricks Connect のクライアントを使用するように優先 IDE またはノートブック サーバーを構成する方法について説明します。
このセクションの内容:
JupyterLab
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
Databricks Connect を JupyterLab および Python で使用するには、こちらの手順に従ってください。
Python 仮想環境をアクティブにして JupyterLab をインストールするには、ターミナルまたはコマンドプロンプトから次のコマンドを実行します。
pip3 install jupyterlab
ウェブブラウザでJupyterLabを起動するには、アクティブ化されたPython仮想環境から次のコマンドを実行します。
jupyter lab
JupyterLab が Web ブラウザーに表示されない場合は、仮想環境から
localhost
または127.0.0.1
で始まる URL をコピーし、Web ブラウザーのアドレス バーに入力します。新しいノートブックを作成する: JupyterLab で、メイン メニューの [ファイル] > [ 新しい> ノートブック ] をクリックし、[ Python 3 (ipykernel)] を選択して [ 選択] をクリックします。
ノートブックの最初のセルに、サンプル コードまたは独自の コード を入力します。 独自のコードを使用する場合は、 コード例に示すように、少なくとも
SparkSession.builder.getOrCreate()
のインスタンスをインスタンス化する必要があります。ノートブックを実行するには、[ 実行] > [すべてのセルを実行] をクリックします。
ノートブックをデバッグするには、ノートブックのツール バーの Python 3 (ipykernel) の横にあるバグ (デバッガーを有効にする) アイコンをクリックします。1 つ以上のブレークポイントを設定し、[ 実行] > [すべてのセルを実行] をクリックします。
JupyterLab をシャットダウンするには、[ ファイル] > [シャットダウン] をクリックします。 JupyterLabプロセスがまだターミナルまたはコマンドプロンプトで実行されている場合は、
Ctrl + c
を押してこのプロセスを停止し、y
を入力して確認します。
より具体的なデバッグ手順については、「 デバッガー」を参照してください。
Classic Jupyter Notebook
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
Databricks Connect の構成スクリプトは、パッケージをプロジェクト構成に自動的に追加します。 Python カーネルの使用を開始するには、次のコマンドを実行します。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
SQL クエリーを実行および視覚化するための %sql
短縮形を有効にするには、次のスニペットを使用します。
from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class
@magics_class
class DatabricksConnectMagics(Magics):
@line_cell_magic
def sql(self, line, cell=None):
if cell and line:
raise ValueError("Line must be empty for cell magic", line)
try:
from autovizwidget.widget.utils import display_dataframe
except ImportError:
print("Please run `pip install autovizwidget` to enable the visualization widget.")
display_dataframe = lambda x: x
return display_dataframe(self.get_spark().sql(cell or line).toPandas())
def get_spark(self):
user_ns = get_ipython().user_ns
if "spark" in user_ns:
return user_ns["spark"]
else:
from pyspark.sql import SparkSession
user_ns["spark"] = SparkSession.builder.getOrCreate()
return user_ns["spark"]
ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)
Visual Studio Code
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
Visual Studio Code で Databricks Connect を使用するには、次の操作を行います。
Python 拡張機能 がインストールされていることを確認します。
コマンド パレットを開きます (macOS の場合は Command + Shift + P、Windows/Linux の場合は Ctrl + Shift + P)。
Python インタプリタを選択します。 [ コード>の基本設定] >[設定] に移動し、[ Python 設定] を選択します。
databricks-connect get-jar-dir
を実行します。コマンドから返されたディレクトリを [
python.venvPath
] の下の [ユーザー設定] JSON に追加します。 これは Python 構成に追加する必要があります。リンターを無効にします。 右側の [...] をクリックし 、 JSON 設定を編集します。 変更された設定は次のとおりです。
仮想環境 (VS Code で Python 用に開発するための推奨される方法) で実行する場合は、コマンド パレットに「
select python interpreter
」と入力し、クラスターの Python バージョン に一致する 環境をポイントします。たとえば、クラスターが Python 3.9 の場合、開発環境は Python 3.9 である必要があります。
PyCharm
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
Databricks Connect の構成スクリプトは、パッケージをプロジェクト構成に自動的に追加します。
Python 3 クラスター
PyCharm プロジェクトを作成するときに、[ 既存のインタプリタ] を選択します。 ドロップダウン メニューから、作成した Conda 環境を選択します (「 要件」を参照)。
[ 実行] > [構成の編集] に移動します。
PYSPARK_PYTHON=python3
環境変数として追加します。
SparkR および RStudio Desktop
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
SparkR および RStudio Desktop で Databricks Connect を使用するには、次の手順を実行します。
オープンソースの Spark ディストリビューションを開発マシンにダウンロードして解凍します。Databricks クラスター (Hadoop 2.7) と同じバージョンを選択します。
databricks-connect get-jar-dir
を実行します。このコマンドは、/usr/local/lib/python3.5/dist-packages/pyspark/jars
のようなパスを返します。 JAR ディレクトリーのファイル・パスより 上の 1 つのディレクトリー のファイル・パス (例えば、SPARK_HOME
ディレクトリーである/usr/local/lib/python3.5/dist-packages/pyspark
など) をコピーします。Spark ライブラリ パスと Spark ホームを構成するには、R スクリプトの先頭に追加します。 ステップ 1 でオープンソース Spark パッケージを解凍したディレクトリに
<spark-lib-path>
を設定します。ステップ 2 から Databricks Connect ディレクトリに<spark-home-path>
を設定します。# Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7 library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths()))) # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark Sys.setenv(SPARK_HOME = "<spark-home-path>")
Spark セッションを開始し、SparkR コマンドの実行を開始します。
sparkR.session() df <- as.DataFrame(faithful) head(df) df1 <- dapply(df, function(x) { x }, schema(df)) collect(df1)
sparklyrとRStudioデスクトップ
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
プレビュー
この機能はパブリックプレビュー段階です。
Databricks Connect を使用してローカルで開発した sparklyr に依存するコードをコピーし、Databricks ワークスペース内の Databricks ノートブックまたはホストされている RStudio サーバーで、最小限のコード変更またはコード変更なしで実行できます。
このセクションの内容:
必要条件
sparklyr 1.2以上。
Databricks Runtime 7.3 LTS 以上と、一致するバージョンの Databricks Connect を使用します。
sparklyrのインストール、構成、および使用
RStudio Desktop で、CRAN から sparklyr 1.2 以降をインストールするか、GitHub から最新のマスター バージョンをインストールします。
# Install from CRAN install.packages("sparklyr") # Or install the latest master version from GitHub install.packages("devtools") devtools::install_github("sparklyr/sparklyr")
正しいバージョンの Databricks Connect がインストールされている状態で Python 環境をアクティブ化し、ターミナルで次のコマンドを実行して
<spark-home-path>
を取得します。databricks-connect get-spark-home
Spark セッションを開始し、sparklyr コマンドの実行を開始します。
library(sparklyr) sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>") iris_tbl <- copy_to(sc, iris, overwrite = TRUE) library(dplyr) src_tbls(sc) iris_tbl %>% count
接続を閉じます。
spark_disconnect(sc)
sparklyrとRStudioデスクトップの制限
次の機能はサポートされていません。
sparklyr ストリーミング APIs
sparklyr 機械学習 APIs
ほうき APIs
csv_file シリアル化モード
スパーク送信
IntelliJ (Scala または Java)
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
Databricks Connect を IntelliJ (Scala または Java) で使用するには、次の手順を実行します。
databricks-connect get-jar-dir
を実行します。コマンドから返されたディレクトリへの依存関係を指定します。 「ファイル>プロジェクト構造>モジュール>依存関係」に移動し>JARまたはディレクトリ>「+」記号を付けます。
競合を避けるために、クラスパスから他の Spark インストールを削除することを強くお勧めします。 これが不可能な場合は、追加する JAR がクラスパスの先頭にあることを確認してください。 特に、インストールされている他のバージョンの Spark よりも進んでいる必要があります (それ以外の場合は、他の Spark バージョンのいずれかを使用してローカルで実行するか、
ClassDefNotFoundError
をスローします)。IntelliJのブレークアウトオプションの設定を確認してください。 既定値は All で、デバッグ用のブレークポイントを設定するとネットワーク タイムアウトが発生します。 バックグラウンド ネットワーク スレッドが停止しないようにするには、[ スレッド ] に設定します。
PyDev with Eclipse
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
Eclipse で Databricks Connect と PyDev を使用するには、次の手順に従います。
エクリプスを起動します。
プロジェクトを作成する: [ファイル] > [新しい>プロジェクト] > [PyDev > PyDev プロジェクト] をクリックし、[ 次へ] をクリックします。
プロジェクト名を指定します。
[ プロジェクト コンテンツ] で、Python 仮想環境へのパスを指定します。
続行 する前に [通訳を設定してください] をクリックします。
[ 手動構成] をクリックします。
[ 新規] をクリックし> [Python/pypy exe を参照] をクリックします。
仮想環境から参照される Python インタープリターへの完全なパスを参照して選択し、[ 開く] をクリックします。
[ インタープリターの選択 ] ダイアログで、[ OK] をクリックします。
[ 選択が必要 ] ダイアログで、[ OK] をクリックします。
「環境設定 」ダイアログで、「 適用して閉じる」をクリックします。
[PyDev プロジェクト] ダイアログで、[完了] をクリックします。
「 パースペクティブを開く」をクリックします。
サンプル コードまたは独自の コード を含む Python コード (
.py
) ファイルをプロジェクトに追加します。独自のコードを使用する場合は、 コード例に示すように、少なくともSparkSession.builder.getOrCreate()
のインスタンスをインスタンス化する必要があります。Python コード ファイルを開いた状態で、実行中にコードを一時停止するブレークポイントを設定します。
[ 実行] > [実行 ] または [デバッグ>実行] をクリックします。
より具体的な実行およびデバッグ手順については、「 プログラムの実行」を参照してください。
Eclipse
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
Databricks Connect と Eclipse を使用するには、次の手順を実行します。
databricks-connect get-jar-dir
を実行します。外部 JAR 構成が、コマンドから返されたディレクトリーを指すようにします。 [プロジェクト]メニューに移動し、[プロパティ]>[Javaビルドパス>ライブラリ]>外部jarの追加>に移動します。
競合を避けるために、クラスパスから他の Spark インストールを削除することを強くお勧めします。 これが不可能な場合は、追加する JAR がクラスパスの先頭にあることを確認してください。 特に、インストールされている他のバージョンの Spark よりも進んでいる必要があります (それ以外の場合は、他の Spark バージョンのいずれかを使用してローカルで実行するか、
ClassDefNotFoundError
をスローします)。
SBT
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
SBT で Databricks Connect を使用するには、通常の Spark ライブラリの依存関係ではなく、 Databricks Connect JAR に対してリンクするように build.sbt
ファイルを構成する必要があります。 これは、次のビルドファイルの例の unmanagedBase
ディレクティブを使用して行いますが、これは com.example.Test
メインオブジェクトを持つ Scala アプリを想定しています。
Spark シェル
注
Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります 。
Spark シェルと Python または Scala で Databricks Connect を使用するには、次の手順に従います。
仮想環境をアクティブ化したら、「 クライアントのセットアップ」で
databricks-connect test
コマンドが正常に実行されたことを確認します。仮想環境をアクティブ化したら、Spark シェルを起動します。 Python の場合は、
pyspark
コマンドを実行します。 Scala の場合は、spark-shell
コマンドを実行します。# For Python: pyspark
# For Scala: spark-shell
Spark シェルが表示されます (たとえば、Python の場合)。
Python 3... (v3...) [Clang 6... (clang-6...)] on darwin Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 3.... /_/ Using Python version 3... (v3...) Spark context Web UI available at http://...:... Spark context available as 'sc' (master = local[*], app id = local-...). SparkSession available as 'spark'. >>>
Scalaの場合:
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://... Spark context available as 'sc' (master = local[*], app id = local-...). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 3... /_/ Using Scala version 2... (OpenJDK 64-Bit Server VM, Java 1.8...) Type in expressions to have them evaluated. Type :help for more information. scala>
Python または Scala で Spark シェルを使用してクラスターでコマンドを実行する方法については、 Spark シェルを使用した対話型分析 を参照してください。
組み込みの
spark
変数を使用して、稼働中のクラスターのSparkSession
を表します (Python の場合など)。>>> df = spark.read.table("samples.nyctaxi.trips") >>> df.show(5) +--------------------+---------------------+-------------+-----------+----------+-----------+ |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip| +--------------------+---------------------+-------------+-----------+----------+-----------+ | 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171| | 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110| | 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023| | 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017| | 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282| +--------------------+---------------------+-------------+-----------+----------+-----------+ only showing top 5 rows
Scalaの場合:
>>> val df = spark.read.table("samples.nyctaxi.trips") >>> df.show(5) +--------------------+---------------------+-------------+-----------+----------+-----------+ |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip| +--------------------+---------------------+-------------+-----------+----------+-----------+ | 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171| | 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110| | 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023| | 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017| | 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282| +--------------------+---------------------+-------------+-----------+----------+-----------+ only showing top 5 rows
Spark シェルを停止するには、
Ctrl + d
またはCtrl + z
キーを押すか、Python の場合はコマンドquit()
またはexit()
、Scala の場合は:q
または:quit
コマンドを実行します。
コード例
この簡単なコード例では、指定したテーブルにクエリーを付け、指定したテーブルの最初の 5 行を表示します。 別のテーブルを使用するには、呼び出しを spark.read.table
に調整します。
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
この長いコード例では、次の処理を行います。
メモリ内 DataFrameを作成します。
default
スキーマ内にzzz_demo_temps_table
名前のテーブルを作成します。この名前のテーブルが既に存在する場合は、最初にテーブルが削除されます。 別のスキーマまたはテーブルを使用するには、呼び出しをspark.sql
、temps.write.saveAsTable
、またはその両方に調整します。DataFrameの内容をテーブルに保存します。
テーブルの内容に対して
SELECT
クエリーを実行します。クエリーの結果を表示します。
テーブルを削除します。
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date
spark = SparkSession.builder.appName('temps-demo').getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date
object Demo {
def main(args: Array[String]) {
val spark = SparkSession.builder.master("local").getOrCreate()
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
val schema = StructType(Array(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
))
val data = List(
Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
)
val rdd = spark.sparkContext.makeRDD(data)
val temps = spark.createDataFrame(rdd, schema)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
temps.write.saveAsTable("zzz_demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table")
}
}
import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;
public class App {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession
.builder()
.appName("Temps Demo")
.config("spark.master", "local")
.getOrCreate();
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
StructType schema = new StructType(new StructField[] {
new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
});
List<Row> dataList = new ArrayList<Row>();
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));
Dataset<Row> temps = spark.createDataFrame(dataList, schema);
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default");
spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table");
temps.write().saveAsTable("zzz_demo_temps_table");
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
Dataset<Row> df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC");
df_temps.show();
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE zzz_demo_temps_table");
}
}
依存関係を操作する
通常、メインクラスまたはPythonファイルには、他の依存関係JARとファイルがあります。 このような依存関係 JAR およびファイルを追加するには、 sparkContext.addJar("path-to-the-jar")
または sparkContext.addPyFile("path-to-the-file")
を呼び出します。 addPyFile()
インターフェイスを使用してEggファイルとzipファイルを追加することもできます。IDE でコードを実行するたびに、依存関係 JAR とファイルがクラスターにインストールされます。
from lib import Foo
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
#sc.setLogLevel("INFO")
print("Testing simple count")
print(spark.range(100).count())
print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())
class Foo(object):
def __init__(self, x):
self.x = x
Python + Java UDF
from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column
## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
# val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}
spark = SparkSession.builder \
.config("spark.jars", "/path/to/udf.jar") \
.getOrCreate()
sc = spark.sparkContext
def plus_one_udf(col):
f = sc._jvm.com.example.Test.plusOne()
return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()
package com.example
import org.apache.spark.sql.SparkSession
case class Foo(x: String)
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
...
.getOrCreate();
spark.sparkContext.setLogLevel("INFO")
println("Running simple show query...")
spark.read.format("parquet").load("/tmp/x").show()
println("Running simple UDF query...")
spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
spark.udf.register("f", (x: Int) => x + 1)
spark.range(10).selectExpr("f(id)").show()
println("Running custom objects query...")
val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
println(objs.toSeq)
}
}
ユーティリティDatabricksアクセス
このセクションでは、Databricks Connect を使用して Databricks ユーティリティにアクセスする方法について説明します。
Databricks Utilities (dbutils) リファレンス モジュールの dbutils.fs
ユーティリティと dbutils.secrets
ユーティリティを使用できます。サポートされているコマンドは、 dbutils.fs.cp
、 dbutils.fs.head
、 dbutils.fs.ls
、 dbutils.fs.mkdirs
、 dbutils.fs.mv
、 dbutils.fs.put
、 dbutils.fs.rm
、 dbutils.secrets.get
、 dbutils.secrets.getBytes
、 dbutils.secrets.list
、 dbutils.secrets.listScopes
です。 「ファイル システム ユーティリティ (dbutils.fs)」を参照してください。 または、 dbutils.fs.help()
and Secrets ユーティリティ (dbutils.secrets) を実行します。 または、 dbutils.secrets.help()
を実行します。
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())
Databricks Runtime 7.3 LTS 以降を使用する場合、ローカルと Databricks クラスターの両方で動作する方法で DBUtils モジュールにアクセスするには、次の get_dbutils()
を使用します。
def get_dbutils(spark):
from pyspark.dbutils import DBUtils
return DBUtils(spark)
それ以外の場合は、次の get_dbutils()
を使用します。
def get_dbutils(spark):
if spark.conf.get("spark.databricks.service.client.enabled") == "true":
from pyspark.dbutils import DBUtils
return DBUtils(spark)
else:
import IPython
return IPython.get_ipython().user_ns["dbutils"]
val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())
ローカルファイルシステムとリモートファイルシステム間でのファイルのコピー
dbutils.fs
を使用して、クライアントとリモートファイルシステム間でファイルをコピーできます。スキーム file:/
は、クライアント上のローカルファイルシステムを参照します。
from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)
dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')
この方法で転送できる最大ファイルサイズは250MBです。
Hadoop 構成を設定する
クライアントでは、SQL および DataFrame 操作に適用される spark.conf.set
API を使用して Hadoop 構成を設定できます。 sparkContext
に設定されている Hadoop 構成は、クラスター構成またはノートブックを使用して設定する必要があります。これは、 sparkContext
に設定された設定がユーザーセッションに関連付けられておらず、クラスタ全体に適用されるためです。
トラブルシューティング
databricks-connect test
を実行して、接続の問題を確認します。このセクションでは、Databricks Connect で発生する可能性があるいくつかの一般的な問題とその解決方法について説明します。
このセクションの内容:
Python のバージョンが一致しません
ローカルで使用している Python のバージョンに、クラスター上のバージョンと少なくとも同じマイナー リリースがあることを確認します (たとえば、 3.9.16
対 3.9.15
は問題ありませんが、 3.9
対 3.8
は問題ありません)。
複数の Python バージョンがローカルにインストールされている場合は、 PYSPARK_PYTHON
環境変数 ( PYSPARK_PYTHON=python3
など) を設定して、Databricks Connect が正しいバージョンを使用していることを確認します。
サーバーが有効になっていない
クラスターで Spark サーバーが spark.databricks.service.server.enabled true
で有効になっていることを確認します。 次の場合は、ドライバー ログに次の行が表示されます。
../../.. ..:..:.. INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
../../.. ..:..:.. INFO SparkContext: Loading Spark Service RPC Server
../../.. ..:..:.. INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
../../.. ..:..:.. INFO Server: jetty-9...
../../.. ..:..:.. INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
../../.. ..:..:.. INFO Server: Started @5879ms
競合する PySpark のインストール
databricks-connect
パッケージが PySpark と競合しています。両方をインストールすると、Python で Spark コンテキストを初期化するときにエラーが発生します。 これは、「ストリームが破損している」エラーや「クラスが見つかりません」エラーなど、いくつかの方法で発生する可能性があります。 Python 環境に PySpark がインストールされている場合は、databricks-connect をインストールする前にアンインストールされていることを確認してください。 PySpark をアンインストールした後、Databricks Connect パッケージを完全に再インストールしてください。
pip3 uninstall pyspark
pip3 uninstall databricks-connect
pip3 install --upgrade "databricks-connect==12.2.*" # or X.Y.* to match your specific cluster version.
競合する SPARK_HOME
以前にコンピューターで Spark を使用したことがある場合は、Databricks Connect Spark ではなく、他のバージョンの Spark のいずれかを使用するように IDE が構成されている可能性があります。 これは、「ストリームが破損している」エラーや「クラスが見つかりません」エラーなど、いくつかの方法で発生する可能性があります。 使用されている Spark のバージョンを確認するには、 SPARK_HOME
環境変数の値を確認します。
import os
print(os.environ['SPARK_HOME'])
println(sys.env.get("SPARK_HOME"))
System.out.println(System.getenv("SPARK_HOME"));
解決
SPARK_HOME
がクライアント内のバージョン以外のバージョンの Spark に設定されている場合は、 SPARK_HOME
変数の設定を解除して再試行する必要があります。
IDE 環境変数の設定、 .bashrc
、 .zshrc
、または .bash_profile
ファイル、およびその他の環境変数が設定される可能性のある場所を確認します。 ほとんどの場合、古い状態をパージするにはIDEを終了して再起動する必要があり、問題が解決しない場合は新しいプロジェクトを作成する必要があるかもしれません。
SPARK_HOME
を新しい値に設定する必要はありません。設定を解除するだけで十分です。
バイナリの PATH
エントリの競合または欠落
spark-shell
のようなコマンドが、Databricks Connect で提供されるバイナリではなく、以前にインストールされた他のバイナリを実行するように PATH が構成されている可能性があります。これにより、 databricks-connect test
が失敗する可能性があります。 Databricks Connect バイナリが優先されることを確認するか、以前にインストールしたバイナリを削除する必要があります。
spark-shell
のようなコマンドを実行できない場合は、PATHが pip3 install
によって自動的に設定されていない可能性があり、インストール bin
ディレクトリをPATHに手動で追加する必要があります。これが設定されていない場合でも、IDE で Databricks Connect を使用することは可能です。 ただし、 databricks-connect test
コマンドは機能しません。
クラスターでのシリアル化設定の競合
databricks-connect test
の実行時に "ストリームが破損しています" というエラーが表示される場合は、互換性のないクラスターのシリアル化構成が原因である可能性があります。たとえば、 spark.io.compression.codec
構成を設定すると、この問題が発生する可能性があります。 この問題を解決するには、クラスター設定からこれらの構成を削除するか、Databricks Connect クライアントで構成を設定することを検討してください。
Windowsで winutils.exe
が見つかりません
Windows で Databricks Connect を使用していて、以下を参照してください。
ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
指示に従って 、Windows で Hadoop パスを構成します。
ファイル名、ディレクトリ名、またはボリューム ラベルの構文が Windows で正しくない
Windows および Databricks Connect を使用している場合は、以下を参照してください。
The filename, directory name, or volume label syntax is incorrect.
Java または Databricks Connect のいずれかが、パスにスペースを含むディレクトリ にインストールされました。 これを回避するには、スペースを含まないディレクトリパスにインストールするか、 短い名前の形式を使用してパスを構成します。
制限
構造化ストリーミング。
リモート クラスターで Spark ジョブの一部ではない任意のコードを実行する。
Delta テーブル操作 (
DeltaTable.forPath
など) のネイティブ Scala、Python、および R APIs はサポートされていません。 ただし、 Delta Lake 操作を使用する SQL API (spark.sql(...)
) と、 Delta テーブルに対する Spark API (spark.read.load
など) はどちらもサポートされています。にコピーします。
サーバーのカタログの一部であるSQL関数、PythonまたはScala UDFを使用します。 ただし、ローカルで導入された Scala と Python UDF は機能します。
Apache Zeppelin 0.7.x 以下。
テーブルアクセスコントロールを使用したクラスターへの接続。
プロセス分離を有効にしたクラスターへの接続 (つまり、
spark.databricks.pyspark.enableProcessIsolation
がtrue
に設定されている場合)。Delta
CLONE
SQL コマンドを使用します。グローバル一時ビュー。
Koalas と
pyspark.pandas
。CREATE TABLE table AS SELECT ...
SQL コマンドは常に機能するとは限りません。 代わりに、spark.sql("SELECT ...").write.saveAsTable("table")
を使用します。