Databricks Connect for Python のテスト
注:
この記事では、Databricks Runtime 13.3 LTS 以降の Databricks Connect について説明します。
この記事では、Databricks Runtime 13.3 LTS 以降の Databricks Connect でpytest
を使用してテストを実行する方法について説明します。 Databricks Connectの詳細については、 Databricks Connect for Pythonを参照してください。
この情報は、Databricks Connect for Python が既にインストールされていることを前提としています。 「Databricks Connect for Python のインストール」を参照してください。
リモート Databricks ワークスペース内のクラスターへの接続を必要としないローカル コードでpytestを実行できます。 たとえば、 pytest
を使用して、ローカル メモリ内の PySpark DataFrame
オブジェクトを受け入れて返す関数をテストできます。 pytest
を使い始めてローカルで実行するには、 pytest
ドキュメントの「はじめに」を参照してください。
たとえば、SparkSession
インスタンスを返す get_spark
関数と、samples
カタログのnyctaxi
スキーマ内のtrips
テーブルを表すDataFrame
を返す get_nyctaxi_trips
関数を含む nyctaxi_functions.py
という名前の次のファイルがあるとします。
nyctaxi_functions.py
:
from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame, SparkSession
def get_spark() -> SparkSession:
spark = DatabricksSession.builder.getOrCreate()
return spark
def get_nyctaxi_trips() -> DataFrame:
spark = get_spark()
df = spark.read.table("samples.nyctaxi.trips")
return df
そして、これらのget_spark
関数とget_nyctaxi_trips
関数を呼び出すmain.py
という名前の次のファイルがあるとします。
main.py
:
from nyctaxi_functions import *
df = get_nyctaxi_trips()
df.show(5)
次の test_nyctaxi_functions.py
という名前のファイルは、 get_spark
関数が SparkSession
インスタンスを返すかどうか、および get_nyctaxi_trips
関数が少なくとも 1 行のデータを含む DataFrame
を返すかどうかをテストします。
test_nyctaxi_functions.py
:
import pyspark.sql.connect.session
from nyctaxi_functions import *
def test_get_spark():
spark = get_spark()
assert isinstance(spark, pyspark.sql.connect.session.SparkSession)
def test_get_nyctaxi_trips():
df = get_nyctaxi_trips()
assert df.count() > 0
これらのテストを実行するには、コード プロジェクトのルートからpytest
コマンドを実行します。これにより、次のようなテスト結果が生成されます。
$ pytest
=================== test session starts ====================
platform darwin -- Python 3.11.7, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 2 items
test_nyctaxi_functions.py .. [100%]
======================== 2 passed ==========================