メインコンテンツまでスキップ

ユーザー定義のスカラー関数 - Scala

この記事には、Scala ユーザー定義関数 (UDF) の例が含まれています。 UDF を登録する方法、UDF を呼び出す方法、および Spark SQL での部分式の評価順序に関する注意事項を示します。 詳細については、 外部ユーザー定義スカラー関数 (UDF) を参照してください。

要件

  • 標準アクセス モードの Unity カタログ対応コンピュート リソース上のScala UDF には、 Databricks Runtime 14.2 以降が必要です。

  • Unity Catalog 対応クラスター上の Scala UDF の ARM インスタンス サポートには、Databricks Runtime 15.2 以上が必要です。

関数を UDF として登録する

Scala
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)

Spark SQL で UDF を呼び出す

Scala
spark.range(1, 20).createOrReplaceTempView("test")
SQL
%sql select id, square(id) as id_squared from test

UDF と データフレーム の併用

Scala
import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))

評価順序と null チェック

Spark SQL ( SQL 、 データフレーム 、データセット APIを含む)は、部分式の評価順序を保証するものではありません。 特に、演算子または関数の入力は、必ずしも左から右に評価されたり、その他の固定された順序で評価されるわけではありません。 たとえば、論理 AND 式と OR 式には、左から右への "短絡" セマンティクスはありません。

したがって、 Boolean 式の副作用や評価順序、 WHERE 句と HAVING 句の順序に依存することは危険です。なぜなら、そのような式や句はクエリの最適化や計画中に並べ替えられる可能性があるためです。 具体的には、UDF が null チェックのために SQL のショートサーキット セマンティクスに依存している場合、UDF を呼び出す前に null チェックが行われる保証はありません。例えば

Scala
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee

この WHERE 句は、null をフィルターで除外した後に strlen UDF が呼び出されることを保証するものではありません。

適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。

  • UDF 自体を null 対応にし、UDF 自体の内部で null チェックを行います
  • IF式またはCASE WHEN式を使用してヌルチェックを行い、条件分岐でUDFを呼び出します
Scala
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok

型付きデータセット API

注記

この機能は、 Databricks Runtime 15.4 以降の標準アクセス モードを使用した Unity Catalog 対応クラスターでサポートされています。

型付きデータセット API を使用すると、ユーザー定義関数を使用して、結果のデータセットに対してマップ、フィルター、集約などの変換を実行できます。

たとえば、次の Scala アプリケーションは map() API を使用して、result 列の数値をプレフィックス付きの文字列に変更します。

Scala
spark.range(3).map(f => s"row-$f").show()

この例では map() APIを使用していますが、これは filter()mapPartitions()foreach()foreachPartition()reduce()flatMap() などの他の型付きデータセット APIにも適用されます。

Scala UDF 機能と Databricks Runtime の互換性

次のScala 機能は、標準Databricks Runtime Unity Catalog(共有) アクセス モードで有効なクラスタリングで使用する場合 最小 バージョンが必要です。

機能

Databricks Runtime の最小バージョン

スカラー UDF

Databricks Runtime 14.2

Dataset.mapDataset.mapPartitionsDataset.filterDataset.reduceDataset.flatMap

Databricks Runtime 15.4

KeyValueGroupedDataset.flatMapGroups, KeyValueGroupedDataset.mapGroups

Databricks Runtime 15.4

(ストリーミング) foreachWriter Sink

Databricks Runtime 15.4

(ストリーミング) foreachBatch

Databricks Runtime 16.1

(ストリーミング) KeyValueGroupedDataset.flatMapGroupsWithState

Databricks Runtime 16.2