Databricks ジョブで JAR を使用する
Javaアーカイブまたは JAR ファイル形式は、一般的なZIPファイル形式に基づいており、多くのJavaまたはScalaファイルを1つに集約するために使用されます。 JAR タスクを使用すると、Databricks ジョブに Java または Scala コードを迅速かつ確実にインストールできます。 この記事では、JAR と、JAR にパッケージ化されたアプリケーションを実行するジョブを作成する例を示します。 この例では、次のことを行います。
- サンプル・アプリケーションを定義するJARプロジェクトを作成します。
- サンプルファイルをJARにバンドルします。
- JAR を実行するジョブを作成します。
- ジョブを実行し、結果を表示する。
始める前に
この例を完了するには、次のものが必要です。
- Java JAR の場合は、Java Development Kit (JDK)。
- Scala JAR の場合、JDK と sbt.
ステップ 1: 例のローカル ディレクトリを作成する
サンプルコードと生成されたアーティファクトを保持するローカルディレクトリを作成します (例: . databricks_jar_test
.
ステップ2:JARを作成する
Java または Scala を使用して JAR を作成するには、次の手順を実行します。
Java JAR の作成
-
databricks_jar_test
フォルダから、次の内容のPrintArgs.java
という名前のファイルを作成します。Javaimport java.util.Arrays;
public class PrintArgs {
public static void main(String[] args) {
System.out.println(Arrays.toString(args));
}
} -
PrintArgs.java
ファイルをコンパイルすると、次のファイルが作成されますPrintArgs.class
Bashjavac PrintArgs.java
-
(オプション)コンパイルされたプログラムを実行します。
Bashjava PrintArgs Hello World!
# [Hello, World!] -
PrintArgs.java
ファイルやPrintArgs.class
ファイルと同じフォルダに、META-INF
という名前のフォルダを作成します。 -
META-INF
フォルダーに、次の内容でMANIFEST.MF
という名前のファイルを作成します。このファイルの最後に改行を追加してください。Main-Class: PrintArgs
-
databricks_jar_test
フォルダのルートから、PrintArgs.jar
という名前の JAR を作成します。Bashjar cvfm PrintArgs.jar META-INF/MANIFEST.MF *.class
-
(オプション)これをテストするには、
databricks_jar_test
フォルダーのルートから JAR を実行します。Bashjava -jar PrintArgs.jar Hello World!
# [Hello, World!]
エラー no main manifest attribute, in PrintArgs.jar
が表示された場合は、 MANIFEST.MF
ファイルの末尾に改行を追加してから、JARの作成と実行を再試行してください。
PrintArgs.jar
ボリュームにアップロードします。「Unity Catalog ボリュームへのファイルのアップロード」を参照してください。
Scala JAR の作成
-
databricks_jar_test
フォルダから、次の内容を含むbuild.sbt
という名前の空のファイルを作成します。ThisBuild / scalaVersion := "2.12.14"
ThisBuild / organization := "com.example"
lazy val PrintArgs = (project in file("."))
.settings(
name := "PrintArgs"
) -
databricks_jar_test
フォルダーから、フォルダー構造src/main/scala/example
を作成します。 -
example
フォルダーに、次の内容のPrintArgs.scala
という名前のファイルを作成します。Scalapackage example
object PrintArgs {
def main(args: Array[String]): Unit = {
println(args.mkString(", "))
}
} -
プログラムをコンパイルします。
Bashsbt compile
-
(オプション)コンパイルされたプログラムを実行します。
Bashsbt "run Hello World\!"
# Hello, World! -
databricks_jar_test/project
フォルダーに、次の内容のassembly.sbt
という名前のファイルを作成します。addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
-
databricks_jar_test
フォルダーのルートからassembly
コマンドを実行すると、target
フォルダーの下に JAR が生成されます。Bashsbt assembly
-
(オプション)これをテストするには、
databricks_jar_test
フォルダーのルートから JAR を実行します。Bashjava -jar target/scala-2.12/PrintArgs-assembly-0.1.0-SNAPSHOT.jar Hello World!
# Hello, World! -
PrintArgs-assembly-0.1.0-SNAPSHOT.jar
ボリュームにアップロードします。「Unity Catalog ボリュームへのファイルのアップロード」を参照してください。
ステップ3.JAR を実行する Databricks ジョブを作成する
-
Databricks ランディングページに移動し、次のいずれかの操作を行います。
- サイドバーで、「
ワークフロー 」をクリックし、「
」をクリックします。
- サイドバーで、[
新規]をクリックし、メニューから[ジョブ]を選択します。
- サイドバーで、「
-
タスク タブに表示されるタスク ダイアログ ボックスで、[ ジョブの名前を追加 ] をジョブ名 (
JAR example
など) に置き換えます。 -
[タスク名 ] に、タスクの名前を入力します (例:
java_jar_task
の場合は Java、scala_jar_task
は Scalaです)。 -
「タイプ 」で「 JAR」 を選択します。
-
この例では、Main クラス に Javaに
PrintArgs
を、Scalaにexample.PrintArgs
を入力します。 -
クラスター の場合は、互換性のあるクラスターを選択します。Java と Scala のライブラリのサポートを参照してください。
-
[Dependent Library ] で [ + Add ] をクリックします。
-
「 依存ライブラリの追加 」ダイアログで、「 ボリューム 」を選択した状態で、前の手順で JAR をアップロードした場所 (
PrintArgs.jar
またはPrintArgs-assembly-0.1.0-SNAPSHOT.jar
) を 「ボリューム・ファイル・パス 」に入力するか、JAR をフィルタリングまたは参照して見つけます。 それを選択します。 -
[ 追加 ] をクリックします。
-
この例では、[パラメーター ] に「
["Hello", "World!"]
」と入力します。 -
[ 追加 ] をクリックします。
ステップ 4: ジョブを実行し、ジョブの実行詳細を表示する
[ ] をクリックしてワークフローを実行します。 実行の詳細 を表示するには、「 トリガーされた実行 」ポップアップで「 実行の表示 」をクリックするか、 ジョブ実行 ビューで実行の「 開始時刻 」列のリンクをクリックします。
実行が完了すると、タスクに渡された引数を含む出力が [出力 ] パネルに表示されます。
JAR ジョブの出力サイズ制限
ジョブ出力 (stdout に出力されるログ出力など) には、20MB のサイズ制限があります。 合計出力のサイズが大きい場合、実行はキャンセルされ、失敗としてマークされます。
この制限が発生しないようにするには、 spark.databricks.driver.disableScalaOutput
Spark 設定を true
に設定することで、ドライバーから Databricks に stdout が返されないようにすることができます。 デフォルトでは、フラグ値は false
です。 このフラグは、Scala JAR ジョブと Scala ノートブックのセル出力を制御します。 フラグが有効になっている場合、Spark はジョブの実行結果をクライアントに返しません。 このフラグは、クラスターのログ・ファイルに書き込まれるデータには影響しません。 Databricks では、このフラグを設定するとノートブックの結果が無効になるため JAR ジョブのジョブ クラスターにのみ設定することをお勧めします。
推奨事項: 共有 SparkContext
Databricks はマネージドサービスであるため、Apache Spark ジョブを正しく実行するために、一部のコード変更が必要になる場合があります。JAR ジョブ・プログラムは、共有 SparkContext
API を使用して SparkContext
を取得する必要があります。 Databricks は SparkContext
を初期化するため、 new SparkContext()
を呼び出すプログラムは失敗します。 SparkContext
を取得するには、Databricks によって作成された共有SparkContext
のみを使用します。
val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()
共有 SparkContext
を使用する際に避けるべき方法もいくつかあります。
SparkContext.stop()
に電話しないでください。Main
プログラムの最後にSystem.exit(0)
またはsc.stop()
に電話しないでください。これにより、未定義の動作が発生する可能性があります。
推奨事項: ジョブのクリーンアップには try-finally
ブロックを使用します
次の 2 つの部分で構成される JAR について考えてみます。
jobBody()
これには、ジョブの主要部分が含まれています。jobCleanup()
これは、その関数が成功したか例外を返したかに関係なく、jobBody()
後に実行する必要があります。
たとえば、jobBody()
はテーブルを作成し、jobCleanup()
はそれらのテーブルを削除します。
clean-up メソッドが呼び出されるようにする安全な方法は、コードに try-finally
ブロックを配置することです。
try {
jobBody()
} finally {
jobCleanup()
}
sys.addShutdownHook(jobCleanup)
または次のコードを使用してクリーンアップを試み ないでください 。
val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)
Spark コンテナーの有効期間が Databricks で管理される方法のため、シャットダウン フックは確実に実行されません。
JAR ジョブ・パラメーターの構成
JARジョブにパラメーターを渡JSON文字列配列を使用します。ジョブ API の Create a new job 操作 (POST /jobs/create
) に渡される要求本文の spark_jar_task
オブジェクトを参照してください。これらのパラメーターにアクセスするには、main
関数に渡された String
配列を調べます。
ライブラリの依存関係の管理
Spark ドライバーには、オーバーライドできない特定のライブラリ依存関係があります。 ジョブで競合するライブラリが追加された場合は、 Spark driver ライブラリの依存関係が優先されます。
ドライバー ライブラリの依存関係の完全な一覧を取得するには、同じ Spark バージョン (または調査するドライバーを含むクラスター) で構成されたクラスターにアタッチされたノートブックで次のコマンドを実行します。
%sh
ls /databricks/jars
JAR のライブラリ依存関係を定義する場合、Databricks では Spark と Hadoop を provided
依存関係としてリストすることをお勧めします。 Maven で、指定された依存関係として Spark と Hadoop を追加します。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
<scope>provided</scope>
</dependency>
sbt
で、指定された依存関係として Spark と Hadoop を追加します。
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"
実行しているバージョンに基づいて、依存関係の正しい Scala バージョンを指定します。
次のステップ
Databricks ジョブの作成と実行の詳細については、「 Databricks ジョブを使用したオーケストレーション」を参照してください。