メインコンテンツまでスキップ

Databricks ジョブで JAR を使用する

Javaアーカイブまたは JAR ファイル形式は、一般的なZIPファイル形式に基づいており、多くのJavaまたはScalaファイルを1つに集約するために使用されます。 JAR タスクを使用すると、Databricks ジョブに Java または Scala コードを迅速かつ確実にインストールできます。 この記事では、JAR と、JAR にパッケージ化されたアプリケーションを実行するジョブを作成する例を示します。 この例では、次のことを行います。

  • サンプル・アプリケーションを定義するJARプロジェクトを作成します。
  • サンプルファイルをJARにバンドルします。
  • JAR を実行するジョブを作成します。
  • ジョブを実行し、結果を表示する。

始める前に

この例を完了するには、次のものが必要です。

  • Java JAR の場合は、Java Development Kit (JDK)。
  • Scala JAR の場合、JDK と sbt.

ステップ 1: 例のローカル ディレクトリを作成する

サンプルコードと生成されたアーティファクトを保持するローカルディレクトリを作成します (例: . databricks_jar_test.

ステップ2:JARを作成する

Java または Scala を使用して JAR を作成するには、次の手順を実行します。

Java JAR の作成

  1. databricks_jar_test フォルダから、次の内容の PrintArgs.java という名前のファイルを作成します。

    Java
    import java.util.Arrays;

    public class PrintArgs {
    public static void main(String[] args) {
    System.out.println(Arrays.toString(args));
    }
    }
  2. PrintArgs.javaファイルをコンパイルすると、次のファイルが作成されますPrintArgs.class

    Bash
    javac PrintArgs.java
  3. (オプション)コンパイルされたプログラムを実行します。

    Bash
    java PrintArgs Hello World!

    # [Hello, World!]
  4. PrintArgs.java ファイルや PrintArgs.class ファイルと同じフォルダに、META-INFという名前のフォルダを作成します。

  5. META-INF フォルダーに、次の内容で MANIFEST.MF という名前のファイルを作成します。このファイルの最後に改行を追加してください。

    Main-Class: PrintArgs
  6. databricks_jar_test フォルダのルートから、PrintArgs.jarという名前の JAR を作成します。

    Bash
    jar cvfm PrintArgs.jar META-INF/MANIFEST.MF *.class
  7. (オプション)これをテストするには、 databricks_jar_test フォルダーのルートから JAR を実行します。

    Bash
    java -jar PrintArgs.jar Hello World!

    # [Hello, World!]
注記

エラー no main manifest attribute, in PrintArgs.jarが表示された場合は、 MANIFEST.MF ファイルの末尾に改行を追加してから、JARの作成と実行を再試行してください。

  1. PrintArgs.jarボリュームにアップロードします。「Unity Catalog ボリュームへのファイルのアップロード」を参照してください。

Scala JAR の作成

  1. databricks_jar_test フォルダから、次の内容を含む build.sbt という名前の空のファイルを作成します。

    ThisBuild / scalaVersion := "2.12.14"
    ThisBuild / organization := "com.example"

    lazy val PrintArgs = (project in file("."))
    .settings(
    name := "PrintArgs"
    )
  2. databricks_jar_test フォルダーから、フォルダー構造 src/main/scala/example を作成します。

  3. example フォルダーに、次の内容の PrintArgs.scala という名前のファイルを作成します。

    Scala
    package example

    object PrintArgs {
    def main(args: Array[String]): Unit = {
    println(args.mkString(", "))
    }
    }
  4. プログラムをコンパイルします。

    Bash
    sbt compile
  5. (オプション)コンパイルされたプログラムを実行します。

    Bash
    sbt "run Hello World\!"

    # Hello, World!
  6. databricks_jar_test/project フォルダーに、次の内容の assembly.sbt という名前のファイルを作成します。

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
  7. databricks_jar_test フォルダーのルートから assembly コマンドを実行すると、target フォルダーの下に JAR が生成されます。

    Bash
    sbt assembly
  8. (オプション)これをテストするには、 databricks_jar_test フォルダーのルートから JAR を実行します。

    Bash
    java -jar target/scala-2.12/PrintArgs-assembly-0.1.0-SNAPSHOT.jar Hello World!

    # Hello, World!
  9. PrintArgs-assembly-0.1.0-SNAPSHOT.jarボリュームにアップロードします。「Unity Catalog ボリュームへのファイルのアップロード」を参照してください。

ステップ3.JAR を実行する Databricks ジョブを作成する

  1. Databricks ランディングページに移動し、次のいずれかの操作を行います。

    • サイドバーで、「ワークフローアイコン ワークフロー 」をクリックし、「 「ジョブを作成」ボタン」をクリックします。
    • サイドバーで、[新しいアイコン新規]をクリックし、メニューから[ジョブ]を選択します。
  2. タスク タブに表示されるタスク ダイアログ ボックスで、[ ジョブの名前を追加 ] をジョブ名 (JAR exampleなど) に置き換えます。

  3. [タスク名 ] に、タスクの名前を入力します (例: java_jar_task の場合は Java、scala_jar_task は Scalaです)。

  4. 「タイプ 」で「 JAR」 を選択します。

  5. この例では、Main クラス に Javaに PrintArgs を、Scalaに example.PrintArgs を入力します。

  6. クラスター の場合は、互換性のあるクラスターを選択します。Java と Scala のライブラリのサポートを参照してください。

  7. [Dependent Library ] で [ + Add ] をクリックします。

  8. 依存ライブラリの追加 」ダイアログで、「 ボリューム 」を選択した状態で、前の手順で JAR をアップロードした場所 (PrintArgs.jar または PrintArgs-assembly-0.1.0-SNAPSHOT.jar) を 「ボリューム・ファイル・パス 」に入力するか、JAR をフィルタリングまたは参照して見つけます。 それを選択します。

  9. [ 追加 ] をクリックします。

  10. この例では、[パラメーター ] に「["Hello", "World!"]」と入力します。

  11. [ 追加 ] をクリックします。

ステップ 4: ジョブを実行し、ジョブの実行詳細を表示する

[ 「今すぐ実行」ボタン ] をクリックしてワークフローを実行します。 実行の詳細 を表示するには、「 トリガーされた実行 」ポップアップで「 実行の表示 」をクリックするか、 ジョブ実行 ビューで実行の「 開始時刻 」列のリンクをクリックします。

実行が完了すると、タスクに渡された引数を含む出力が [出力 ] パネルに表示されます。

JAR ジョブの出力サイズ制限

ジョブ出力 (stdout に出力されるログ出力など) には、20MB のサイズ制限があります。 合計出力のサイズが大きい場合、実行はキャンセルされ、失敗としてマークされます。

この制限が発生しないようにするには、 spark.databricks.driver.disableScalaOutput Spark 設定を trueに設定することで、ドライバーから Databricks に stdout が返されないようにすることができます。 デフォルトでは、フラグ値は falseです。 このフラグは、Scala JAR ジョブと Scala ノートブックのセル出力を制御します。 フラグが有効になっている場合、Spark はジョブの実行結果をクライアントに返しません。 このフラグは、クラスターのログ・ファイルに書き込まれるデータには影響しません。 Databricks では、このフラグを設定するとノートブックの結果が無効になるため JAR ジョブのジョブ クラスターにのみ設定することをお勧めします。

推奨事項: 共有 SparkContext

Databricks はマネージドサービスであるため、Apache Spark ジョブを正しく実行するために、一部のコード変更が必要になる場合があります。JAR ジョブ・プログラムは、共有 SparkContext API を使用して SparkContextを取得する必要があります。 Databricks は SparkContextを初期化するため、 new SparkContext() を呼び出すプログラムは失敗します。 SparkContextを取得するには、Databricks によって作成された共有SparkContextのみを使用します。

Scala
val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()

共有 SparkContextを使用する際に避けるべき方法もいくつかあります。

  • SparkContext.stop()に電話しないでください。
  • Mainプログラムの最後にSystem.exit(0)またはsc.stop()に電話しないでください。これにより、未定義の動作が発生する可能性があります。

推奨事項: ジョブのクリーンアップには try-finally ブロックを使用します

次の 2 つの部分で構成される JAR について考えてみます。

  • jobBody() これには、ジョブの主要部分が含まれています。
  • jobCleanup() これは、その関数が成功したか例外を返したかに関係なく、 jobBody()後に実行する必要があります。

たとえば、jobBody() はテーブルを作成し、jobCleanup() はそれらのテーブルを削除します。

clean-up メソッドが呼び出されるようにする安全な方法は、コードに try-finally ブロックを配置することです。

Scala
try {
jobBody()
} finally {
jobCleanup()
}

sys.addShutdownHook(jobCleanup) または次のコードを使用してクリーンアップを試み ないでください

Scala
val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)

Spark コンテナーの有効期間が Databricks で管理される方法のため、シャットダウン フックは確実に実行されません。

JAR ジョブ・パラメーターの構成

JARジョブにパラメーターを渡JSON文字列配列を使用します。ジョブ API の Create a new job 操作 (POST /jobs/create) に渡される要求本文の spark_jar_task オブジェクトを参照してください。これらのパラメーターにアクセスするには、main 関数に渡された String 配列を調べます。

ライブラリの依存関係の管理

Spark ドライバーには、オーバーライドできない特定のライブラリ依存関係があります。 ジョブで競合するライブラリが追加された場合は、 Spark driver ライブラリの依存関係が優先されます。

ドライバー ライブラリの依存関係の完全な一覧を取得するには、同じ Spark バージョン (または調査するドライバーを含むクラスター) で構成されたクラスターにアタッチされたノートブックで次のコマンドを実行します。

Bash
%sh
ls /databricks/jars

JAR のライブラリ依存関係を定義する場合、Databricks では Spark と Hadoop を provided 依存関係としてリストすることをお勧めします。 Maven で、指定された依存関係として Spark と Hadoop を追加します。

XML
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
<scope>provided</scope>
</dependency>

sbtで、指定された依存関係として Spark と Hadoop を追加します。

Scala
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"
ヒント

実行しているバージョンに基づいて、依存関係の正しい Scala バージョンを指定します。

次のステップ

Databricks ジョブの作成と実行の詳細については、「 Databricks ジョブを使用したオーケストレーション」を参照してください。