Databricks Connect for Python のコード例
注
この記事では、 Databricks Runtime 13.0 以降の Databricks Connect について説明します。
この記事では、Databricks Connect for Python を使用するコード例を示します。 Databricks Connect を使用すると、一般的な IDE、ノートブック サーバー、およびカスタム アプリケーションを Databricks クラスターに接続できます。 「Databricks Connect とは」を参照してください。この記事の Scala バージョンについては、「 Databricks Connect for Scala のコード例」を参照してください。
注
Databricks Connectの使用を開始する前に、Databricks Connect クライアントをセットアップする必要があります。
Databricks には、Databricks Connect の使用方法を示す追加のサンプル アプリケーションがいくつか用意されています。 具体的には、GitHub の Databricks Connect リポジトリのサンプル アプリケーション を参照してください。
次の簡単なコード例を使用して、Databricks Connect でエクスペリメントすることもできます。 これらの例では、Databricks Connect クライアントのセットアップに 既定の認証 を使用していることを前提としています。
この簡単なコード例では、指定したテーブルをクエリーし、指定したテーブルの最初の 5 行を表示します。 別のテーブルを使用するには、コールを spark.read.table
に調整します。
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
df = spark.read.table("samples.nyctaxi.trips")
df.show(5)
この長いコード例では、次の処理を行います。
メモリ内 DataFrameを作成します。
default
スキーマ内にzzz_demo_temps_table
という名前のテーブルを作成します。この名前のテーブルが既に存在する場合は、最初にテーブルが削除されます。 別のスキーマまたはテーブルを使用するには、呼び出しをspark.sql
、temps.write.saveAsTable
、またはその両方に調整します。DataFrameの内容をテーブルに保存します。
テーブルの内容に対して
SELECT
クエリーを実行します。クエリーの結果を表示します。
テーブルを削除します。
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from datetime import date
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS zzz_demo_temps_table')
temps.write.saveAsTable('zzz_demo_temps_table')
# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE zzz_demo_temps_table')
注
次の例では、 DatabricksSession
クラスが使用できない環境で Databricks Connect for Databricks Runtime 13.3 LTS 以降の間で移植可能なコードを作成する方法を説明します。
次の例では、DatabricksSession
クラスを使用するか、DatabricksSession
クラスが使用できない場合は SparkSession
クラスを使用して、指定したテーブルをクエリし、最初の 5 行を返します。この例では、認証にSPARK_REMOTE
環境変数を使用します。
from pyspark.sql import SparkSession, DataFrame
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
get_taxis(get_spark()).show(5)