Databricks Connect for Scala のユーザー定義関数
この記事では、Databricks Runtime 14.1 以降の Databricks Connect について説明します。
この記事では、Databricks Connect for Scala でユーザー定義関数を実行する方法について説明します。 Databricks Connect を使用すると、一般的な IDEs、ノートブック サーバー、およびカスタム アプリケーションを Databricks クラスターに接続できます。 この記事の Python バージョンについては、「 Databricks Connect for Python のユーザー定義関数」を参照してください。
Databricks Connect の使用を開始する前に、 Databricks Connect クライアントを設定する必要があります。
Databricks Runtime 14.1 以降の場合、Databricks Connect for Scala はユーザー定義関数 (UDF) の実行をサポートします。
UDFを実行するには、UDFに必要なコンパイル済みクラスとJARをクラスターにアップロードする必要があります。addCompiledArtifacts()
API を使用して、アップロードする必要があるコンパイル済みクラスと JAR ファイルを指定できます。
ScalaScalaDatabricksクライアントが使用する は、 クラスターの バージョンと一致する必要があります。クラスターの バージョンを確認するには、ScalaDatabricks Runtime Databricks Runtimeリリースノートのバージョンと互換性の クラスターの バージョンの「システム環境」セクションを参照してください。
次の Scala プログラムは、列の値を 2 乗する単純な UDF を設定します。
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
前の例では、UDF が Main
内に完全に含まれているため、 Main
のコンパイル済みアーティファクトのみが追加されます。UDF が他のクラスにまたがっている場合、または外部ライブラリ (つまり JAR) を使用している場合、これらのライブラリもすべて
含ま。
Spark セッションがすでに初期化されている場合は、
spark.addArtifact()
API。
JAR をアップロードするときは、すべての推移的な依存関係 JAR をアップロードに含める必要があります。 APIsは、推移的な依存関係の自動検出を実行しません。
Typed データセット APIs
前のセクションで UDF について説明したのと同じメカニズムが、型付きデータセット APIsにも適用されます。
型付きデータセット APIs 、結果のデータセットに対してマップ、フィルター、集計などの変換を実行できます。 これらもまた Databricks クラスターの UDF と同様に実行されます。
次の Scala アプリケーションは、 map()
API を使用して、result カラムの数値をプレフィックス付きの文字列に変更します。
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
この例では map()
APIを使用していますが、これは filter()
、mapPartitions()
などの他の型付きデータセット APIs にも適用されます。