Databricks Connect for Databricks Runtime 12.2 LTS 以前

Databricks Connect では、代わりに Databricks Runtime 13.0 以降用の Databricks Connect を使用することをお勧めします。

Databricks は、 Databricks Runtime 12.2 LTS 以下の Databricks Connect の新機能の作業を計画していません。

Databricks Connect を使用すると、Visual Studio Code や PyCharm などの一般的な IDE、ノートブック サーバー、その他のカスタム アプリケーションを Databricks クラスターに接続できます。

この記事では、Databricks Connect のしくみについて説明し、Databricks Connect の使用を開始するための手順を説明し、Databricks Connect の使用時に発生する可能性のある問題のトラブルシューティング方法、および Databricks Connect を使用して実行する場合と Databricks ノートブックで実行する場合の違いについて説明します。

概要

データブリック接続は、 Databricks Runtimeのクライアント ライブラリです。 これにより、Spark APIs を使用してジョブを作成し、ローカルの Spark セッションではなく Databricks クラスターでリモートで実行できます。

たとえば、Databricks Connect を使用して DataFrame コマンド spark.read.format(...).load(...).groupBy(...).agg(...).show() 実行すると、コマンドの論理表現が Databricks で実行されている Spark サーバーに送信され、リモート クラスターで実行されます。

Databricks Connect を使用すると、次のことができます。

  • 大規模な Spark ジョブを任意の Python、R、Scala、または Java アプリケーションから実行します。 import pysparkrequire(SparkR) 、または import org.apache.sparkできる場所ならどこでも、IDEプラグインをインストールしたりSpark送信スクリプトを使用したりすることなく、アプリケーションから直接Sparkジョブを実行できるようになりました。

  • リモートクラスターを操作する場合でも、IDE でコードをステップ実行してデバッグします。

  • ライブラリを開発するときにすばやく反復します。 各クライアント セッションはクラスター内で互いに分離されているため、Databricks Connect で Python または Java ライブラリの依存関係を変更した後、クラスターを再起動する必要はありません。

  • 作業を失うことなくアイドル状態のクラスターをシャットダウンします。 クライアント アプリケーションはクラスターから切り離されているため、通常はノートブックで定義されているすべての変数、RDD、および DataFrame オブジェクトが失われる原因となるクラスターの再起動やアップグレードの影響を受けません。

SQL クエリーを使用した Python 開発の場合、Databricks Connect ではなく Databricks SQL Connector for Python を使用することをお勧めします。Databricks SQL Connector for Python は、Databricks Connect よりも簡単に設定できます。 また、Databricks Connect は、ジョブがローカル コンピューターで実行され、ジョブがリモート コンピュート リソースで実行されることを解析して計画します。 これにより、ランタイムエラーのデバッグが特に困難になる可能性があります。 Databricks SQL コネクタ for Python は、SQL クエリーをリモート コンピュート リソースに直接送信し、結果をフェッチします。

要件

このセクションでは、Databricks Connect の要件を示します。

  • 次の Databricks Runtime バージョンのみがサポートされています。

    • Databricks Runtime 12.2 LTS ML, Databricks Runtime 12.2 LTS

    • Databricks Runtime 11.3 LTS ML, Databricks Runtime 11.3 LTS

    • Databricks Runtime 10.4 LTS ML, Databricks Runtime 10.4 LTS

    • Databricks Runtime 9.1 LTS ML, Databricks Runtime 9.1 LTS

    • Databricks Runtime 7.3 LTS

  • 開発マシンに Python 3 をインストールする必要があり、クライアント Python インストールのマイナー バージョンは、Databricks クラスターのマイナー Python バージョンと同じである必要があります。 次の表は、各 Databricks Runtimeと共にインストールされる Python のバージョンを示しています。

    Databricks Runtime バージョン

    Python バージョン

    12.2 LTS ML, 12.2 LTS

    3.9

    11.3 LTS ML, 11.3 LTS

    3.9

    10.4 LTS ML, 10.4 LTS

    3.8

    9.1 LTS ML, 9.1 LTS

    3.8

    7.3 LTS

    3.7

    Databricks では、Databricks Connect で使用する Python バージョンごとに Python 仮想環境 をアクティブ化することを強くお勧めします。 Python 仮想環境は、正しいバージョンの Python と Databricks Connect を一緒に使用していることを確認するのに役立ちます。 これにより、関連する技術的な問題の解決に費やす時間を短縮できます。

    たとえば、開発マシンで venv を使用していて、クラスターで Python 3.9 を実行している場合は、そのバージョンで venv 環境を作成する必要があります。 次のコマンド例では、Python 3.9 で venv 環境をアクティブ化するスクリプトを生成し、これらのスクリプトを現在の作業ディレクトリ内の .venv という名前の隠しフォルダーに配置します。

    # Linux and macOS
    python3.9 -m venv ./.venv
    
    # Windows
    python3.9 -m venv .\.venv
    

    これらのスクリプトを使用してこの venv 環境をアクティブ化するには、「 venvs のしくみ」を参照してください。

    別の例として、開発マシンで Conda を使用していて、クラスターで Python 3.9 を実行している場合は、そのバージョンで Conda 環境を作成する必要があります。

    conda create --name dbconnect python=3.9
    

    この環境名で Conda 環境をアクティブ化するには、 conda activate dbconnectを実行します。

  • Databricks Connect のメジャー パッケージ バージョンとマイナー パッケージ バージョンは、常に Databricks Runtime バージョンと一致する必要があります。 Databricks では、 Databricks Runtime バージョンと一致する最新の Databricks Connect パッケージを常に使用することをお勧めします。 たとえば、 Databricks Runtime 12.2 LTS クラスターを使用する場合は、 databricks-connect==12.2.* パッケージも使用する必要があります。

    利用可能な Databricks Connect リリースとメンテナンス更新プログラムの一覧については、 Databricks Connect リリースノート を参照してください。

  • Java ランタイム環境 (JRE) 8. クライアントはOpenJDK 8 JREでテストされています。 クライアントは Java 11 をサポートしていません。

Windows で、Databricks Connect が winutils.exeを見つけることができないというエラーが表示された場合は、「 Winutils が見つかりません.exe Windows で」を参照してください。

クライアントを設定する

次の手順を実行して、Databricks Connect のローカル クライアントを設定します。

ローカルの Databricks Connect クライアントの設定を開始する前に、Databricks Connect の 要件を満たす 必要があります。

ステップ 1: Databricks Connect クライアントをインストールする

  1. 仮想環境をアクティブにした状態で、 uninstall コマンドを実行して、PySpark が既にインストールされている場合はアンインストールします。 これは、 databricks-connect パッケージが PySpark と競合するために必要です。 詳細については、「 競合する PySpark インストール」を参照してください。 PySpark が既にインストールされているかどうかを確認するには、 show コマンドを実行します。

    # Is PySpark already installed?
    pip3 show pyspark
    
    # Uninstall PySpark
    pip3 uninstall pyspark
    
  2. 仮想環境をアクティブ化したまま、 install コマンドを実行して Databricks Connect クライアントをインストールします。 --upgrade オプションを使用して、既存のクライアント インストールを指定したバージョンにアップグレードします。

    pip3 install --upgrade "databricks-connect==12.2.*"  # Or X.Y.* to match your cluster version.
    

    Databricks では、最新のパッケージがインストールされていることを確認するために、 databricks-connect=X.Yの代わりに databricks-connect==X.Y.* を指定するために "ドット アスタリスク" 表記を追加することをお勧めします。

ステップ 2: 接続プロパティを構成する

  1. 次の構成プロパティを収集します。

    • ワークスペース Databricks URL

    • Databricks の個人用アクセストークン

    • クラスターの ID。 クラスター ID は URL から取得できます。 ここでは、クラスター ID は 0304-201045-hoary804です。

      クラスター ID 2
    • クラスター上で Databricks Connect が接続するポート。 デフォルトのポートは 15001です。

  2. 接続を次のように構成します。

    CLI、SQL 構成、または環境変数を使用できます。 構成方法の優先順位は、SQL 構成キー、CLI、および環境変数です。

    • CLI

      1. databricks-connectを実行します。

        databricks-connect configure
        

        ライセンスには次の情報が表示されます。

        Copyright (2018) Databricks, Inc.
        
        This library (the "Software") may not be used except in connection with the
        Licensee's use of the Databricks Platform Services pursuant to an Agreement
          ...
        
      2. ライセンスを受け入れ、構成値を指定します。 [ Databricks ホスト ] と [Databricks トークン] に、ワークスペースの URL と、手順 1 でメモした個人用アクセストークンを入力します。

        Do you accept the above agreement? [y/N] y
        Set new config values (leave input empty to accept default):
        Databricks Host [no current value, must start with https://]: <databricks-url>
        Databricks Token [no current value]: <databricks-token>
        Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
        Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
        Port [15001]: <port>
        
    • SQL 構成または環境変数。 次の表に、手順 1 でメモした構成プロパティに対応する SQL 構成キーと環境変数を示します。 SQL 構成キーを設定するには、 sql("set config=value")を使用します。 たとえば、 sql("set spark.databricks.service.clusterId=0304-201045-abcdefgh")です。

      パラメーター

      SQL コンフィグ キー

      環境変数名

      Databricks ホスト

      spark.databricks.service.address

      DATABRICKS_ADDRESS

      Databricks トークン

      spark.databricks.サービス.トークン

      DATABRICKS_API_TOKEN

      クラスター ID

      spark.databricks.service.clusterId

      クラスター

      組織 ID

      spark.databricks.service.orgId

      DATABRICKS_ORG_ID

      spark.databricks.service.port

      DATABRICKS_PORT

  3. 仮想環境をアクティブ化したまま、次のように Databricks への接続をテストします。

    databricks-connect test
    

    構成したクラスターが実行されていない場合、テストはクラスターを開始し、構成された自動終了時刻まで実行され続けます。 出力は次のようになります。

    * PySpark is installed at /.../.../pyspark
    * Checking java version
    java version "1.8..."
    Java(TM) SE Runtime Environment (build 1.8...)
    Java HotSpot(TM) 64-Bit Server VM (build 25..., mixed mode)
    * Testing scala command
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab..., invalidating prev state
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2...
          /_/
    
    Using Scala version 2.... (Java HotSpot(TM) 64-Bit Server VM, Java 1.8...)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.range(100).reduce(_ + _)
    Spark context Web UI available at https://...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    Spark session available as 'spark'.
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
    res0: Long = 4950
    
    scala> :quit
    
    * Testing python command
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    ../../.. ..:..:.. WARN SparkServiceRPCClient: Now tracking server state for 5ab.., invalidating prev state
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    
  4. 接続関連のエラーが表示されない場合(WARN メッセージは正常です)、正常に接続されています。

Databricks 接続を使用する

このセクションでは、Databricks Connect のクライアントを使用するように優先 IDE またはノートブック サーバーを構成する方法について説明します。

JupyterLab

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

Databricks Connect を JupyterLab および Python で使用するには、こちらの手順に従ってください。

  1. Python 仮想環境をアクティブにして JupyterLab をインストールするには、ターミナルまたはコマンドプロンプトから次のコマンドを実行します。

    pip3 install jupyterlab
    
  2. ウェブブラウザでJupyterLabを起動するには、アクティブ化されたPython仮想環境から次のコマンドを実行します。

    jupyter lab
    

    JupyterLab が Web ブラウザーに表示されない場合は、仮想環境から localhost または 127.0.0.1 で始まる URL をコピーし、Web ブラウザーのアドレス バーに入力します。

  3. 新しいノートブックを作成する: JupyterLab で、メイン メニューの [ファイル] > [ 新しい> ノートブック ] をクリックし、[ Python 3 (ipykernel)] を選択して [ 選択] をクリックします。

  4. ノートブックの最初のセルに、サンプル コードまたは独自の コード を入力します。 独自のコードを使用する場合は、 コード例に示すように、少なくとも SparkSession.builder.getOrCreate()のインスタンスをインスタンス化する必要があります。

  5. ノートブックを実行するには、[ 実行] > [すべてのセルを実行] をクリックします。

  6. ノートブックをデバッグするには、ノートブックのツール バーの Python 3 (ipykernel) の横にあるバグ (デバッガーを有効にする) アイコンをクリックします。1 つ以上のブレークポイントを設定し、[ 実行] > [すべてのセルを実行] をクリックします。

  7. JupyterLab をシャットダウンするには、[ ファイル] > [シャットダウン] をクリックします。 JupyterLabプロセスがまだターミナルまたはコマンドプロンプトで実行されている場合は、 Ctrl + c を押してこのプロセスを停止し、 y を入力して確認します。

より具体的なデバッグ手順については、「 デバッガー」を参照してください。

Classic Jupyter Notebook

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

Databricks Connect の構成スクリプトは、パッケージをプロジェクト構成に自動的に追加します。 Python カーネルの使用を開始するには、次のコマンドを実行します。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

SQL クエリーを実行および視覚化するための %sql 短縮形を有効にするには、次のスニペットを使用します。

from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class

@magics_class
class DatabricksConnectMagics(Magics):

   @line_cell_magic
   def sql(self, line, cell=None):
       if cell and line:
           raise ValueError("Line must be empty for cell magic", line)
       try:
           from autovizwidget.widget.utils import display_dataframe
       except ImportError:
           print("Please run `pip install autovizwidget` to enable the visualization widget.")
           display_dataframe = lambda x: x
       return display_dataframe(self.get_spark().sql(cell or line).toPandas())

   def get_spark(self):
       user_ns = get_ipython().user_ns
       if "spark" in user_ns:
           return user_ns["spark"]
       else:
           from pyspark.sql import SparkSession
           user_ns["spark"] = SparkSession.builder.getOrCreate()
           return user_ns["spark"]

ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)

Visual Studio Code

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

Visual Studio Code で Databricks Connect を使用するには、次の操作を行います。

  1. Python 拡張機能 がインストールされていることを確認します。

  2. コマンド パレットを開きます (macOS の場合は Command + Shift + P、Windows/Linux の場合は Ctrl + Shift + P)。

  3. Python インタプリタを選択します。 [ コード>の基本設定] >[設定] に移動し、[ Python 設定] を選択します。

  4. databricks-connect get-jar-dirを実行します。

  5. コマンドから返されたディレクトリを [ python.venvPath] の下の [ユーザー設定] JSON に追加します。 これは Python 構成に追加する必要があります。

  6. リンターを無効にします。 右側の [...] をクリックし 、 JSON 設定を編集します。 変更された設定は次のとおりです。

    VS コードの構成
  7. 仮想環境 (VS Code で Python 用に開発するための推奨される方法) で実行する場合は、コマンド パレットに「 select python interpreter 」と入力し、クラスターの Python バージョン に一致する 環境をポイントします。

    Python インタプリタを選択

    たとえば、クラスターが Python 3.9 の場合、開発環境は Python 3.9 である必要があります。

    Python バージョン

PyCharm

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

Databricks Connect の構成スクリプトは、パッケージをプロジェクト構成に自動的に追加します。

Python 3 クラスター

  1. PyCharm プロジェクトを作成するときに、[ 既存のインタプリタ] を選択します。 ドロップダウン メニューから、作成した Conda 環境を選択します (「 要件」を参照)。

    通訳者を選択
  2. [ 実行] > [構成の編集] に移動します。

  3. PYSPARK_PYTHON=python3 環境変数として追加します。

    Python 3 クラスター構成

SparkR および RStudio Desktop

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

SparkR および RStudio Desktop で Databricks Connect を使用するには、次の手順を実行します。

  1. オープンソースの Spark ディストリビューションを開発マシンにダウンロードして解凍します。Databricks クラスター (Hadoop 2.7) と同じバージョンを選択します。

  2. databricks-connect get-jar-dirを実行します。このコマンドは、 /usr/local/lib/python3.5/dist-packages/pyspark/jarsのようなパスを返します。 JAR ディレクトリーのファイル・パスより 上の 1 つのディレクトリー のファイル・パス (例えば、 SPARK_HOME ディレクトリーである /usr/local/lib/python3.5/dist-packages/pysparkなど) をコピーします。

  3. Spark ライブラリ パスと Spark ホームを構成するには、R スクリプトの先頭に追加します。 ステップ 1 でオープンソース Spark パッケージを解凍したディレクトリに <spark-lib-path> を設定します。ステップ 2 から Databricks Connect ディレクトリに <spark-home-path> を設定します。

    # Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7
    library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths())))
    
    # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark
    Sys.setenv(SPARK_HOME = "<spark-home-path>")
    
  4. Spark セッションを開始し、SparkR コマンドの実行を開始します。

    sparkR.session()
    
    df <- as.DataFrame(faithful)
    head(df)
    
    df1 <- dapply(df, function(x) { x }, schema(df))
    collect(df1)
    

sparklyrとRStudioデスクトップ

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

プレビュー

この機能はパブリックプレビュー段階です。

Databricks Connect を使用してローカルで開発した sparklyr に依存するコードをコピーし、Databricks ワークスペース内の Databricks ノートブックまたはホストされている RStudio サーバーで、最小限のコード変更またはコード変更なしで実行できます。

必要条件

  • sparklyr 1.2以上。

  • Databricks Runtime 7.3 LTS 以上と、一致するバージョンの Databricks Connect を使用します。

sparklyrのインストール、構成、および使用

  1. RStudio Desktop で、CRAN から sparklyr 1.2 以降をインストールするか、GitHub から最新のマスター バージョンをインストールします。

    # Install from CRAN
    install.packages("sparklyr")
    
    # Or install the latest master version from GitHub
    install.packages("devtools")
    devtools::install_github("sparklyr/sparklyr")
    
  2. 正しいバージョンの Databricks Connect がインストールされている状態で Python 環境をアクティブ化し、ターミナルで次のコマンドを実行して <spark-home-path>を取得します。

    databricks-connect get-spark-home
    
  3. Spark セッションを開始し、sparklyr コマンドの実行を開始します。

    library(sparklyr)
    sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>")
    
    iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
    
    library(dplyr)
    src_tbls(sc)
    
    iris_tbl %>% count
    
  4. 接続を閉じます。

    spark_disconnect(sc)
    

リソース

詳細については、sparklyr GitHub README を参照してください。

コード例については、「 sparklyr」を参照してください。

sparklyrとRStudioデスクトップの制限

次の機能はサポートされていません。

  • sparklyr ストリーミング APIs

  • sparklyr 機械学習 APIs

  • ほうき APIs

  • csv_file シリアル化モード

  • スパーク送信

IntelliJ (Scala または Java)

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

Databricks Connect を IntelliJ (Scala または Java) で使用するには、次の手順を実行します。

  1. databricks-connect get-jar-dirを実行します。

  2. コマンドから返されたディレクトリへの依存関係を指定します。 「ファイル>プロジェクト構造>モジュール>依存関係」に移動し>JARまたはディレクトリ>「+」記号を付けます。

    インテリJ JAR

    競合を避けるために、クラスパスから他の Spark インストールを削除することを強くお勧めします。 これが不可能な場合は、追加する JAR がクラスパスの先頭にあることを確認してください。 特に、インストールされている他のバージョンの Spark よりも進んでいる必要があります (それ以外の場合は、他の Spark バージョンのいずれかを使用してローカルで実行するか、 ClassDefNotFoundErrorをスローします)。

  3. IntelliJのブレークアウトオプションの設定を確認してください。 既定値は All で、デバッグ用のブレークポイントを設定するとネットワーク タイムアウトが発生します。 バックグラウンド ネットワーク スレッドが停止しないようにするには、[ スレッド ] に設定します。

    インテリJスレッド

PyDev with Eclipse

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

Eclipse で Databricks Connect と PyDev を使用するには、次の手順に従います。

  1. エクリプスを起動します。

  2. プロジェクトを作成する: [ファイル] > [新しい>プロジェクト] > [PyDev > PyDev プロジェクト] をクリックし、[ 次へ] をクリックします。

  3. プロジェクト名を指定します。

  4. [ プロジェクト コンテンツ] で、Python 仮想環境へのパスを指定します。

  5. 続行 する前に [通訳を設定してください] をクリックします。

  6. [ 手動構成] をクリックします。

  7. [ 新規] をクリックし> [Python/pypy exe を参照] をクリックします

  8. 仮想環境から参照される Python インタープリターへの完全なパスを参照して選択し、[ 開く] をクリックします。

  9. [ インタープリターの選択 ] ダイアログで、[ OK] をクリックします。

  10. [ 選択が必要 ] ダイアログで、[ OK] をクリックします。

  11. 「環境設定 」ダイアログで、「 適用して閉じる」をクリックします。

  12. [PyDev プロジェクト] ダイアログで、[完了] をクリックします。

  13. パースペクティブを開く」をクリックします。

  14. サンプル コードまたは独自の コード を含む Python コード (.py) ファイルをプロジェクトに追加します。独自のコードを使用する場合は、 コード例に示すように、少なくとも SparkSession.builder.getOrCreate()のインスタンスをインスタンス化する必要があります。

  15. Python コード ファイルを開いた状態で、実行中にコードを一時停止するブレークポイントを設定します。

  16. [ 実行] > [実行 ] または [デバッグ>実行] をクリックします。

より具体的な実行およびデバッグ手順については、「 プログラムの実行」を参照してください。

Eclipse

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

Databricks Connect と Eclipse を使用するには、次の手順を実行します。

  1. databricks-connect get-jar-dirを実行します。

  2. 外部 JAR 構成が、コマンドから返されたディレクトリーを指すようにします。 [プロジェクト]メニューに移動し、[プロパティ]>[Javaビルドパス>ライブラリ]>外部jarの追加>に移動します。

    外部 JAR 構成の

    競合を避けるために、クラスパスから他の Spark インストールを削除することを強くお勧めします。 これが不可能な場合は、追加する JAR がクラスパスの先頭にあることを確認してください。 特に、インストールされている他のバージョンの Spark よりも進んでいる必要があります (それ以外の場合は、他の Spark バージョンのいずれかを使用してローカルで実行するか、 ClassDefNotFoundErrorをスローします)。

    エクリプス Spark 構成

SBT

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

SBT で Databricks Connect を使用するには、通常の Spark ライブラリの依存関係ではなく、 Databricks Connect JAR に対してリンクするように build.sbt ファイルを構成する必要があります。 これは、次のビルドファイルの例の unmanagedBase ディレクティブを使用して行いますが、これは com.example.Test メインオブジェクトを持つ Scala アプリを想定しています。

build.sbt

name := "hello-world"
version := "1.0"
scalaVersion := "2.11.6"
// this should be set to the path returned by ``databricks-connect get-jar-dir``
unmanagedBase := new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")
mainClass := Some("com.example.Test")

Spark シェル

Databricks Connect の使用を開始する前に、 要件を満たし 、Databricks Connect の クライアントを設定する必要があります

Spark シェルと Python または Scala で Databricks Connect を使用するには、次の手順に従います。

  1. 仮想環境をアクティブ化したら、「 クライアントのセットアップ」databricks-connect test コマンドが正常に実行されたことを確認します。

  2. 仮想環境をアクティブ化したら、Spark シェルを起動します。 Python の場合は、 pyspark コマンドを実行します。 Scala の場合は、 spark-shell コマンドを実行します。

    # For Python:
    pyspark
    
    # For Scala:
    spark-shell
    
  3. Spark シェルが表示されます (たとえば、Python の場合)。

    Python 3... (v3...)
    [Clang 6... (clang-6...)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Welcome to
           ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3....
          /_/
    
    Using Python version 3... (v3...)
    Spark context Web UI available at http://...:...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    SparkSession available as 'spark'.
    >>>
    

    Scalaの場合:

    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    ../../.. ..:..:.. WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context Web UI available at http://...
    Spark context available as 'sc' (master = local[*], app id = local-...).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 3...
          /_/
    
    Using Scala version 2... (OpenJDK 64-Bit Server VM, Java 1.8...)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala>
    
  4. Python または Scala で Spark シェルを使用してクラスターでコマンドを実行する方法については、 Spark シェルを使用した対話型分析 を参照してください。

    組み込みの spark 変数を使用して、稼働中のクラスターの SparkSession を表します (Python の場合など)。

    >>> df = spark.read.table("samples.nyctaxi.trips")
    >>> df.show(5)
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    | 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
    | 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
    | 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
    | 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
    | 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    only showing top 5 rows
    

    Scalaの場合:

    >>> val df = spark.read.table("samples.nyctaxi.trips")
    >>> df.show(5)
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    | 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
    | 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
    | 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
    | 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
    | 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
    +--------------------+---------------------+-------------+-----------+----------+-----------+
    only showing top 5 rows
    
  5. Spark シェルを停止するには、 Ctrl + d または Ctrl + zキーを押すか、Python の場合はコマンド quit() または exit() 、Scala の場合は :q または :quit コマンドを実行します。

コード例

この簡単なコード例では、指定したテーブルにクエリーを付け、指定したテーブルの最初の 5 行を表示します。 別のテーブルを使用するには、呼び出しを spark.read.tableに調整します。

from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.table("samples.nyctaxi.trips")
df.show(5)

この長いコード例では、次の処理を行います。

  1. メモリ内 DataFrameを作成します。

  2. default スキーマ内に zzz_demo_temps_table 名前のテーブルを作成します。この名前のテーブルが既に存在する場合は、最初にテーブルが削除されます。 別のスキーマまたはテーブルを使用するには、呼び出しを spark.sqltemps.write.saveAsTable、またはその両方に調整します。

  3. DataFrameの内容をテーブルに保存します。

  4. テーブルの内容に対して SELECT クエリーを実行します。

  5. クエリーの結果を表示します。

  6. テーブルを削除します。

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date

spark = SparkSession.builder.appName('temps-demo').getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
    StructField('AirportCode', StringType(), False),
    StructField('Date', DateType(), False),
    StructField('TempHighF', IntegerType(), False),
    StructField('TempLowF', IntegerType(), False)
])

data = [
    [ 'BLI', date(2021, 4, 3), 52, 43],
    [ 'BLI', date(2021, 4, 2), 50, 38],
    [ 'BLI', date(2021, 4, 1), 52, 41],
    [ 'PDX', date(2021, 4, 3), 64, 45],
    [ 'PDX', date(2021, 4, 2), 61, 41],
    [ 'PDX', date(2021, 4, 1), 66, 39],
    [ 'SEA', date(2021, 4, 3), 57, 43],
    [ 'SEA', date(2021, 4, 2), 54, 39],
    [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
    "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
    "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
    "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date

object Demo {
  def main(args: Array[String]) {
      val spark = SparkSession.builder.master("local").getOrCreate()

      // Create a Spark DataFrame consisting of high and low temperatures
      // by airport code and date.
      val schema = StructType(Array(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      ))

      val data = List(
        Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
        Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
        Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
        Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
        Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
        Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
        Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
        Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
        Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
      )

      val rdd = spark.sparkContext.makeRDD(data)
      val temps = spark.createDataFrame(rdd, schema)

      // Create a table on the Databricks cluster and then fill
      // the table with the DataFrame's contents.
      // If the table already exists from a previous run,
      // delete it first.
      spark.sql("USE default")
      spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
      temps.write.saveAsTable("zzz_demo_temps_table")

      // Query the table on the Databricks cluster, returning rows
      // where the airport code is not BLI and the date is later
      // than 2021-04-01. Group the results and order by high
      // temperature in descending order.
      val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
        "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
        "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
        "ORDER BY TempHighF DESC")
      df_temps.show()

      // Results:
      //
      // +-----------+----------+---------+--------+
      // |AirportCode|      Date|TempHighF|TempLowF|
      // +-----------+----------+---------+--------+
      // |        PDX|2021-04-03|       64|      45|
      // |        PDX|2021-04-02|       61|      41|
      // |        SEA|2021-04-03|       57|      43|
      // |        SEA|2021-04-02|       54|      39|
      // +-----------+----------+---------+--------+

      // Clean up by deleting the table from the Databricks cluster.
      spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}
import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;

public class App {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .appName("Temps Demo")
            .config("spark.master", "local")
            .getOrCreate();

        // Create a Spark DataFrame consisting of high and low temperatures
        // by airport code and date.
        StructType schema = new StructType(new StructField[] {
            new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
            new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
            new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
        });

        List<Row> dataList = new ArrayList<Row>();
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
        dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
        dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
        dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));

        Dataset<Row> temps = spark.createDataFrame(dataList, schema);

        // Create a table on the Databricks cluster and then fill
        // the table with the DataFrame's contents.
        // If the table already exists from a previous run,
        // delete it first.
        spark.sql("USE default");
        spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table");
        temps.write().saveAsTable("zzz_demo_temps_table");

        // Query the table on the Databricks cluster, returning rows
        // where the airport code is not BLI and the date is later
        // than 2021-04-01. Group the results and order by high
        // temperature in descending order.
        Dataset<Row> df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
            "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
            "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
            "ORDER BY TempHighF DESC");
        df_temps.show();

        // Results:
        //
        // +-----------+----------+---------+--------+
        // |AirportCode|      Date|TempHighF|TempLowF|
        // +-----------+----------+---------+--------+
        // |        PDX|2021-04-03|       64|      45|
        // |        PDX|2021-04-02|       61|      41|
        // |        SEA|2021-04-03|       57|      43|
        // |        SEA|2021-04-02|       54|      39|
        // +-----------+----------+---------+--------+

        // Clean up by deleting the table from the Databricks cluster.
        spark.sql("DROP TABLE zzz_demo_temps_table");
    }
}

依存関係を操作する

通常、メインクラスまたはPythonファイルには、他の依存関係JARとファイルがあります。 このような依存関係 JAR およびファイルを追加するには、 sparkContext.addJar("path-to-the-jar") または sparkContext.addPyFile("path-to-the-file")を呼び出します。 addPyFile() インターフェイスを使用してEggファイルとzipファイルを追加することもできます。IDE でコードを実行するたびに、依存関係 JAR とファイルがクラスターにインストールされます。

from lib import Foo
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
#sc.setLogLevel("INFO")

print("Testing simple count")
print(spark.range(100).count())

print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())

class Foo(object):
  def __init__(self, x):
    self.x = x

Python + Java UDF

from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column

## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
#  val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}

spark = SparkSession.builder \
  .config("spark.jars", "/path/to/udf.jar") \
  .getOrCreate()
sc = spark.sparkContext

def plus_one_udf(col):
  f = sc._jvm.com.example.Test.plusOne()
  return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()
package com.example

import org.apache.spark.sql.SparkSession

case class Foo(x: String)

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      ...
      .getOrCreate();
    spark.sparkContext.setLogLevel("INFO")

    println("Running simple show query...")
    spark.read.format("parquet").load("/tmp/x").show()

    println("Running simple UDF query...")
    spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
    spark.udf.register("f", (x: Int) => x + 1)
    spark.range(10).selectExpr("f(id)").show()

    println("Running custom objects query...")
    val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
    println(objs.toSeq)
  }
}

ユーティリティDatabricksアクセス

このセクションでは、Databricks Connect を使用して Databricks ユーティリティにアクセスする方法について説明します。

Databricks Utilities (dbutils) リファレンス モジュールの dbutils.fs ユーティリティと dbutils.secrets ユーティリティを使用できます。サポートされているコマンドは、 dbutils.fs.cpdbutils.fs.headdbutils.fs.lsdbutils.fs.mkdirsdbutils.fs.mvdbutils.fs.putdbutils.fs.rmdbutils.secrets.getdbutils.secrets.getBytesdbutils.secrets.listdbutils.secrets.listScopesです。 「ファイル システム ユーティリティ (dbutils.fs)」を参照してください。 または、 dbutils.fs.help() and Secrets ユーティリティ (dbutils.secrets) を実行します。 または、 dbutils.secrets.help()を実行します。

from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

spark = SparkSession.builder.getOrCreate()

dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())

Databricks Runtime 7.3 LTS 以降を使用する場合、ローカルと Databricks クラスターの両方で動作する方法で DBUtils モジュールにアクセスするには、次の get_dbutils()を使用します。

def get_dbutils(spark):
  from pyspark.dbutils import DBUtils
  return DBUtils(spark)

それ以外の場合は、次の get_dbutils()を使用します。

def get_dbutils(spark):
  if spark.conf.get("spark.databricks.service.client.enabled") == "true":
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
  else:
    import IPython
    return IPython.get_ipython().user_ns["dbutils"]
val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())

ローカルファイルシステムとリモートファイルシステム間でのファイルのコピー

dbutils.fs を使用して、クライアントとリモートファイルシステム間でファイルをコピーできます。スキーム file:/ は、クライアント上のローカルファイルシステムを参照します。

from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)

dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')

この方法で転送できる最大ファイルサイズは250MBです。

dbutils.secrets.getを有効にする

セキュリティ上の制限により、 dbutils.secrets.get を呼び出す機能は Default によって無効になっています。 Databricks サポートに連絡して、ワークスペースでこの機能を有効にします。

Hadoop 構成を設定する

クライアントでは、SQL および DataFrame 操作に適用される spark.conf.set API を使用して Hadoop 構成を設定できます。 sparkContext に設定されている Hadoop 構成は、クラスター構成またはノートブックを使用して設定する必要があります。これは、 sparkContext に設定された設定がユーザーセッションに関連付けられておらず、クラスタ全体に適用されるためです。

トラブルシューティング

databricks-connect test を実行して、接続の問題を確認します。このセクションでは、Databricks Connect で発生する可能性があるいくつかの一般的な問題とその解決方法について説明します。

Python のバージョンが一致しません

ローカルで使用している Python のバージョンに、クラスター上のバージョンと少なくとも同じマイナー リリースがあることを確認します (たとえば、 3.9.163.9.15 は問題ありませんが、 3.93.8 は問題ありません)。

複数の Python バージョンがローカルにインストールされている場合は、 PYSPARK_PYTHON 環境変数 ( PYSPARK_PYTHON=python3など) を設定して、Databricks Connect が正しいバージョンを使用していることを確認します。

サーバーが有効になっていない

クラスターで Spark サーバーが spark.databricks.service.server.enabled trueで有効になっていることを確認します。 次の場合は、ドライバー ログに次の行が表示されます。

../../.. ..:..:.. INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
../../.. ..:..:.. INFO SparkContext: Loading Spark Service RPC Server
../../.. ..:..:.. INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
../../.. ..:..:.. INFO Server: jetty-9...
../../.. ..:..:.. INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
../../.. ..:..:.. INFO Server: Started @5879ms

競合する PySpark のインストール

databricks-connect パッケージが PySpark と競合しています。両方をインストールすると、Python で Spark コンテキストを初期化するときにエラーが発生します。 これは、「ストリームが破損している」エラーや「クラスが見つかりません」エラーなど、いくつかの方法で発生する可能性があります。 Python 環境に PySpark がインストールされている場合は、databricks-connect をインストールする前にアンインストールされていることを確認してください。 PySpark をアンインストールした後、Databricks Connect パッケージを完全に再インストールしてください。

pip3 uninstall pyspark
pip3 uninstall databricks-connect
pip3 install --upgrade "databricks-connect==12.2.*"  # or X.Y.* to match your specific cluster version.

競合する SPARK_HOME

以前にコンピューターで Spark を使用したことがある場合は、Databricks Connect Spark ではなく、他のバージョンの Spark のいずれかを使用するように IDE が構成されている可能性があります。 これは、「ストリームが破損している」エラーや「クラスが見つかりません」エラーなど、いくつかの方法で発生する可能性があります。 使用されている Spark のバージョンを確認するには、 SPARK_HOME 環境変数の値を確認します。

import os
print(os.environ['SPARK_HOME'])
println(sys.env.get("SPARK_HOME"))
System.out.println(System.getenv("SPARK_HOME"));

解決

SPARK_HOME がクライアント内のバージョン以外のバージョンの Spark に設定されている場合は、 SPARK_HOME 変数の設定を解除して再試行する必要があります。

IDE 環境変数の設定、 .bashrc.zshrc、または .bash_profile ファイル、およびその他の環境変数が設定される可能性のある場所を確認します。 ほとんどの場合、古い状態をパージするにはIDEを終了して再起動する必要があり、問題が解決しない場合は新しいプロジェクトを作成する必要があるかもしれません。

SPARK_HOME を新しい値に設定する必要はありません。設定を解除するだけで十分です。

バイナリの PATH エントリの競合または欠落

spark-shell のようなコマンドが、Databricks Connect で提供されるバイナリではなく、以前にインストールされた他のバイナリを実行するように PATH が構成されている可能性があります。これにより、 databricks-connect test が失敗する可能性があります。 Databricks Connect バイナリが優先されることを確認するか、以前にインストールしたバイナリを削除する必要があります。

spark-shellのようなコマンドを実行できない場合は、PATHが pip3 install によって自動的に設定されていない可能性があり、インストール bin ディレクトリをPATHに手動で追加する必要があります。これが設定されていない場合でも、IDE で Databricks Connect を使用することは可能です。 ただし、 databricks-connect test コマンドは機能しません。

クラスターでのシリアル化設定の競合

databricks-connect testの実行時に "ストリームが破損しています" というエラーが表示される場合は、互換性のないクラスターのシリアル化構成が原因である可能性があります。たとえば、 spark.io.compression.codec 構成を設定すると、この問題が発生する可能性があります。 この問題を解決するには、クラスター設定からこれらの構成を削除するか、Databricks Connect クライアントで構成を設定することを検討してください。

Windowsで winutils.exe が見つかりません

Windows で Databricks Connect を使用していて、以下を参照してください。

ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

指示に従って 、Windows で Hadoop パスを構成します

ファイル名、ディレクトリ名、またはボリューム ラベルの構文が Windows で正しくない

Windows および Databricks Connect を使用している場合は、以下を参照してください。

The filename, directory name, or volume label syntax is incorrect.

Java または Databricks Connect のいずれかが、パスにスペースを含むディレクトリ インストールされました。 これを回避するには、スペースを含まないディレクトリパスにインストールするか、 短い名前の形式を使用してパスを構成します。

制限

  • 構造化ストリーミング。

  • リモート クラスターで Spark ジョブの一部ではない任意のコードを実行する。

  • Delta テーブル操作 ( DeltaTable.forPath など) のネイティブ Scala、Python、および R APIs はサポートされていません。 ただし、 Delta Lake 操作を使用する SQL API (spark.sql(...)) と、 Delta テーブルに対する Spark API ( spark.read.load など) はどちらもサポートされています。

  • にコピーします。

  • サーバーのカタログの一部であるSQL関数、PythonまたはScala UDFを使用します。 ただし、ローカルで導入された Scala と Python UDF は機能します。

  • Apache Zeppelin 0.7.x 以下。

  • テーブルアクセスコントロールを使用したクラスターへの接続。

  • プロセス分離を有効にしたクラスターへの接続 (つまり、 spark.databricks.pyspark.enableProcessIsolationtrueに設定されている場合)。

  • Delta CLONE SQL コマンドを使用します。

  • グローバル一時ビュー。

  • Koalaspyspark.pandas

  • CREATE TABLE table AS SELECT ... SQL コマンドは常に機能するとは限りません。 代わりに、 spark.sql("SELECT ...").write.saveAsTable("table")を使用します。