LakeflowジョブでJARを使用する
Javaアーカイブまたは JAR ファイル形式は、一般的なZIPファイル形式に基づいており、多くのJavaまたはScalaファイルを1つに集約するために使用されます。JARタスクを使用すると、お使いのLakeflowジョブにJavaやScalaコードを迅速かつ確実にインストールできます。この記事では、JAR と、JAR にパッケージ化されたアプリケーションを実行するジョブを作成する例を示します。この例では、次のことを行います。
- サンプル・アプリケーションを定義するJARプロジェクトを作成します。
- サンプルファイルをJARにバンドルします。
- JAR を実行するジョブを作成します。
- ジョブを実行し、結果を表示する。
始める前に
この例を完了するには、次のものが必要です。
- Java JAR の場合は、Java Development Kit (JDK)。
- Scala JAR の場合、JDK と sbt。
ステップ 1: 例のローカル ディレクトリを作成する
サンプルコードと生成されたアーティファクトを保持するローカルディレクトリを作成します (例: . databricks_jar_test.
ステップ2:JARを作成する
Java または Scala を使用して JAR を作成するには、次の手順を実行します。
Java JAR の作成
- 
databricks_jar_testフォルダから、次の内容のPrintArgs.javaという名前のファイルを作成します。Javaimport java.util.Arrays;
 public class PrintArgs {
 public static void main(String[] args) {
 System.out.println(Arrays.toString(args));
 }
 }
- 
PrintArgs.javaファイルをコンパイルすると、次のファイルが作成されますPrintArgs.classBashjavac PrintArgs.java
- 
(オプション)コンパイルされたプログラムを実行します。 Bashjava PrintArgs Hello World!
 # [Hello, World!]
- 
PrintArgs.javaファイルやPrintArgs.classファイルと同じフォルダに、META-INFという名前のフォルダを作成します。
- 
META-INFフォルダーに、次の内容でMANIFEST.MFという名前のファイルを作成します。このファイルの最後に改行を追加してください。Main-Class: PrintArgs
- 
databricks_jar_testフォルダのルートから、PrintArgs.jarという名前の JAR を作成します。Bashjar cvfm PrintArgs.jar META-INF/MANIFEST.MF *.class
- 
(オプション)これをテストするには、 databricks_jar_testフォルダーのルートから JAR を実行します。Bashjava -jar PrintArgs.jar Hello World!
 # [Hello, World!]
エラー no main manifest attribute, in PrintArgs.jarが表示された場合は、 MANIFEST.MF ファイルの末尾に改行を追加してから、JARの作成と実行を再試行してください。
- PrintArgs.jarボリュームにアップロードします。Unity Catalog ボリュームへのファイルのアップロードを参照してください。
Scala JAR の作成
- 
databricks_jar_testフォルダから、次の内容を含むbuild.sbtという名前の空のファイルを作成します。ThisBuild / scalaVersion := "2.12.14"
 ThisBuild / organization := "com.example"
 lazy val PrintArgs = (project in file("."))
 .settings(
 name := "PrintArgs"
 )
- 
databricks_jar_testフォルダーから、フォルダー構造src/main/scala/exampleを作成します。
- 
exampleフォルダーに、次の内容のPrintArgs.scalaという名前のファイルを作成します。Scalapackage example
 object PrintArgs {
 def main(args: Array[String]): Unit = {
 println(args.mkString(", "))
 }
 }
- 
プログラムをコンパイルします。 Bashsbt compile
- 
(オプション)コンパイルされたプログラムを実行します。 Bashsbt "run Hello World\!"
 # Hello, World!
- 
databricks_jar_test/projectフォルダーに、次の内容のassembly.sbtという名前のファイルを作成します。addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
- 
databricks_jar_testフォルダーのルートからassemblyコマンドを実行すると、targetフォルダーの下に JAR が生成されます。Bashsbt assembly
- 
(オプション)これをテストするには、 databricks_jar_testフォルダーのルートから JAR を実行します。Bashjava -jar target/scala-2.12/PrintArgs-assembly-0.1.0-SNAPSHOT.jar Hello World!
 # Hello, World!
- 
PrintArgs-assembly-0.1.0-SNAPSHOT.jarボリュームにアップロードします。Unity Catalog ボリュームへのファイルのアップロードを参照してください。
ステップ3.JAR を実行するジョブを作成する
- 
ワークスペースで、サイドバーの ジョブ & パイプライン をクリックします。 
- 
作成 をクリックし、 ジョブ をクリックします。 タスク タブは、空のタスクペインとともに表示されます。 
LakeFlowジョブUI が オン の場合は、 JAR タイルをクリックして最初のタスクを構成します。 JAR タイルが使用できない場合は、「 別のタスク・タイプの追加 」をクリックして「 JAR」 を検索します。
- 
オプションで、ジョブの名前 (デフォルトは New Job <date-time>) をジョブ名に置き換えます。
- 
タスク 名 に、タスクの名前を入力します (例: JAR_example)。
- 
必要に応じて、「 タイプ 」ドロップダウン・メニューから「 JAR 」を選択します。 
- 
この例では、Main クラス に Javaに PrintArgsを、Scalaにexample.PrintArgsを入力します。
- 
コンピュート の場合は、互換性のあるクラスターを選択します。Java および Scala ライブラリのサポートを参照してください。 
- 
依存ライブラリ で + 追加 をクリックします。 
- 
依存ライブラリの追加 ダイアログで、 ボリューム を選択した状態で、前の手順で JAR をアップロードした場所 ( PrintArgs.jarまたはPrintArgs-assembly-0.1.0-SNAPSHOT.jar) を ボリューム・ファイル・パス に入力するか、JAR をフィルタリングまたは参照して見つけます。 それを選択します。
- 
[ 追加 ] をクリックします。 
- 
この例では、 パラメーター に ["Hello", "World!"]と入力します。
- 
「 タスクを作成 」をクリックします。 
ステップ 4: ジョブを実行し、ジョブの実行詳細を表示する
をクリックしてワークフローを実行します。 実行の詳細 を表示するには、 トリガーされた実行 ポップアップで 実行の表示 をクリックするか、ジョブの実行ビューで実行の 開始時刻 列のリンクをクリックします。
実行が完了すると、タスクに渡された引数を含む出力が 出力 パネルに表示されます。
JAR ジョブの出力サイズ制限
ジョブ出力 (stdout に出力されるログ出力など) には、20MB のサイズ制限があります。 合計出力のサイズが大きい場合、実行はキャンセルされ、失敗としてマークされます。
この制限が発生しないようにするには、 spark.databricks.driver.disableScalaOutput Spark 設定を trueに設定することで、ドライバーから Databricks に stdout が返されないようにすることができます。デフォルトでは、フラグ値は falseです。このフラグは、Scala JAR ジョブと Scala ノートブックのセル出力を制御します。フラグが有効になっている場合、Spark はジョブの実行結果をクライアントに返しません。このフラグは、クラスターのログ・ファイルに書き込まれるデータには影響しません。 Databricks では、このフラグを設定するとノートブックの結果が無効になるため JAR ジョブのジョブ クラスターにのみ設定することをお勧めします。
推奨事項: 共有 SparkContext
Databricks はマネージドサービスであるため、Apache Spark ジョブを正しく実行するために、一部のコード変更が必要になる場合があります。JAR ジョブ・プログラムは、共有 SparkContext API を使用して SparkContextを取得する必要があります。 Databricks は SparkContextを初期化するため、 new SparkContext() を呼び出すプログラムは失敗します。 SparkContextを取得するには、Databricks によって作成された共有SparkContextのみを使用します。
val goodSparkContext = SparkContext.getOrCreate()
val goodSparkSession = SparkSession.builder().getOrCreate()
共有 SparkContextを使用する際に避けるべき方法もいくつかあります。
- SparkContext.stop()に電話しないでください。
- Mainプログラムの最後に- System.exit(0)または- sc.stop()に電話しないでください。これにより、未定義の動作が発生する可能性があります。
推奨事項: ジョブのクリーンアップには try-finally ブロックを使用します
次の 2 つの部分で構成される JAR について考えてみます。
- jobBody()これには、ジョブの主要部分が含まれています。
- jobCleanup()これは、その関数が成功したか例外を返したかに関係なく、- jobBody()後に実行する必要があります。
たとえば、jobBody() はテーブルを作成し、jobCleanup() はそれらのテーブルを削除します。
クリーンアップメソッドが呼び出されるようにする安全な方法は、コードに try-finally ブロックを配置することです。
try {
  jobBody()
} finally {
  jobCleanup()
}
sys.addShutdownHook(jobCleanup) または次のコードを使用してクリーンアップを試み ないでください 。
val cleanupThread = new Thread { override def run = jobCleanup() }
Runtime.getRuntime.addShutdownHook(cleanupThread)
Spark コンテナーの有効期間が Databricks で管理される方法のため、シャットダウン フックは確実に実行されません。
JAR ジョブ・パラメーターの構成
JARジョブにパラメーターを渡JSON文字列配列を使用します。ジョブ API の Create a new job 操作 (POST /jobs/create) に渡される要求本文の spark_jar_task オブジェクトを参照してください。これらのパラメーターにアクセスするには、main 関数に渡された String 配列を調べます。
ライブラリの依存関係の管理
Spark ドライバーには、オーバーライドできない特定のライブラリ依存関係があります。 ジョブで競合するライブラリが追加された場合は、 Spark ドライバー ライブラリの依存関係が優先されます。
ドライバー ライブラリの依存関係の完全な一覧を取得するには、同じ Spark バージョン (または調査するドライバーを含むクラスター) で構成されたクラスターにアタッチされたノートブックで次のコマンドを実行します。
%sh
ls /databricks/jars
JAR のライブラリ依存関係を定義する場合、Databricks では Spark と Hadoop を provided 依存関係としてリストすることをお勧めします。 Maven で、指定された依存関係として Spark と Hadoop を追加します。
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.3.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>
sbtで、指定された依存関係として Spark と Hadoop を追加します。
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0" % "provided"
libraryDependencies += "org.apache.hadoop" %% "hadoop-core" % "1.2.1" % "provided"
実行しているバージョンに基づいて、依存関係の正しい Scala バージョンを指定します。
次のステップ
ジョブの作成と実行の詳細については、Lakeflowジョブを参照してください。