メインコンテンツまでスキップ

Spark Submit タスクの廃止通知と移行ガイド

警告

Spark Submit タスクは非推奨であり、削除が保留中です。このタスク タイプの使用は、新しいユース ケースでは許可されておらず、既存の顧客に対しては強く推奨されません。このタスク タイプの元のドキュメントについては、「Spark Submit (レガシー)」を参照してください。移行手順については、読み続けてください。

Spark Submit が廃止されるのはなぜですか?

Spark Submit タスク タイプは、技術的な制限と、 JARノートブック、またはPythonスクリプトタスクにはない機能ギャップのため、非推奨になります。 これらのタスクにより、Databricks 機能との統合が向上し、パフォーマンスと信頼性が向上します。

廃止措置

Databricks は廃止に関連して以下の対策を実施しています。

  • 作成の制限 : 2025 年 11 月以降、前月に Spark Submit タスクを使用したユーザーのみが新しい Spark Submit タスクを作成できます。例外が必要な場合は、アカウント サポートにお問い合わせください。
  • DBR バージョンの制限 : Spark Submit の 使用は、既存の DBR バージョンとメンテナンス リリースに制限されます。 Spark Submit を 備えた既存の DBR バージョンでは、機能が完全にシャットダウンされるまで、セキュリティおよびバグ修正のメンテナンス リリースが引き続き提供されます。DBR 17.3+ および 18.x+ ではこのタスク タイプはサポートされません。
  • UI 警告 : Spark Submit タスクが使用されている Databricks UI 全体に警告が表示され、既存のユーザーのアカウントのワークスペース管理者に通信が送信されます。

JVM ワークロードを JAR タスクに移行する

JVM ワークロードの場合、 Spark Submit タスクを JAR タスクに移行します。JAR タスクは、より優れた機能サポートと Databricks との統合を提供します。

移行するには、次のステップに従ってください。

  1. ジョブに新しい JAR タスクを作成します。
  2. Spark Submit タスクのパラメーターから、最初の 3 つの引数を特定します。 一般的には次のパターンに従います。 ["--class", "org.apache.spark.mainClassName", "dbfs:/path/to/jar_file.jar"]
  3. --classパラメーターを削除します。
  4. メインクラス名 (たとえば、 org.apache.spark.mainClassName ) を JAR タスクの メインクラス として設定します。
  5. JAR タスク構成で JAR ファイルへのパス (例: dbfs:/path/to/jar_file.jar ) を指定します。
  6. Spark Submit タスクの残りの引数を JAR タスク パラメーターにコピーします。
  7. JAR タスクを実行し、期待どおりに動作することを確認します。

JARタスクの構成の詳細については、 JARタスク」を参照してください。

Rワークロードの移行

Spark Submit タスクから R スクリプトを直接起動する場合は、複数の移行パスが利用できます。

オプションA: ノートブックタスクを使用する

R スクリプトを Databricks ノートブックに移行します。ノートブックタスクは、クラスターオートスケールを含む完全な機能セットをサポートし、 Databricksプラットフォームとの統合を強化します。

オプションB: ノートブックタスクからRスクリプトをブートストラップする

ノートブックタスクを使用して R スクリプトをブートストラップします。次のコードでノートブックを作成し、R ファイルをジョブパラメーターとして参照します。 必要に応じて、R スクリプトで使用されるパラメーターを追加するように変更します。

R
dbutils.widgets.text("script_path", "", "Path to script")
script_path <- dbutils.widgets.get("script_path")
source(script_path)

Spark Submit タスクを使用するジョブを検索する

次の Python スクリプトを使用して、ワークスペース内の Spark Submit タスクを含むジョブを識別できます。有効な個人アクセス トークンまたはその他のトークンが必要となり、ワークスペース URL を使用する必要があります。

オプション A: 高速スキャン (最初にこれを実行、永続ジョブのみ)

このスクリプトは永続ジョブ( /jobs/createまたは Web インターフェース経由で作成されたもの)のみをスキャンし、 /runs/submit経由で作成された一時ジョブはスキャンしません。これは、はるかに高速であるため、Spark Submit の使用状況を識別するための第一線の方法として推奨されます。

Python
#!/usr/bin/env python3
"""
Requirements:
databricks-sdk>=0.20.0

Usage:
export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
export DATABRICKS_TOKEN="your-token"
python3 list_spark_submit_jobs.py

Output:
CSV format with columns: Job ID, Owner ID/Email, Job Name

Incorrect:
export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com/?o=12345678910"
"""

import csv
import os
import sys
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied


def main():
# Get credentials from environment
workspace_url = os.environ.get("DATABRICKS_HOST")
token = os.environ.get("DATABRICKS_TOKEN")

if not workspace_url or not token:
print(
"Error: Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables",
file=sys.stderr,
)
sys.exit(1)

# Initialize client
client = WorkspaceClient(host=workspace_url, token=token)

# Scan workspace for persistent jobs with Spark Submit tasks
# Using list() to scan only persistent jobs (faster than list_runs())
print(
"Scanning workspace for persistent jobs with Spark Submit tasks...",
file=sys.stderr,
)
jobs_with_spark_submit = []
total_jobs = 0

# Iterate through all jobs (pagination is handled automatically by the SDK)
skipped_jobs = 0
for job in client.jobs.list(expand_tasks=True, limit=25):
try:
total_jobs += 1
if total_jobs % 1000 == 0:
print(f"Scanned {total_jobs} jobs total", file=sys.stderr)

# Check if job has any Spark Submit tasks
if job.settings and job.settings.tasks:
has_spark_submit = any(
task.spark_submit_task is not None for task in job.settings.tasks
)

if has_spark_submit:
# Extract job information
job_id = job.job_id
owner_email = job.creator_user_name or "Unknown"
job_name = job.settings.name or f"Job {job_id}"

jobs_with_spark_submit.append(
{"job_id": job_id, "owner_email": owner_email, "job_name": job_name}
)
except PermissionDenied:
# Skip jobs that the user doesn't have permission to access
skipped_jobs += 1
continue

# Print summary to stderr
print(f"Scanned {total_jobs} jobs total", file=sys.stderr)
if skipped_jobs > 0:
print(
f"Skipped {skipped_jobs} jobs due to insufficient permissions",
file=sys.stderr,
)
print(
f"Found {len(jobs_with_spark_submit)} jobs with Spark Submit tasks",
file=sys.stderr,
)
print("", file=sys.stderr)

# Output CSV to stdout
if jobs_with_spark_submit:
writer = csv.DictWriter(
sys.stdout,
fieldnames=["job_id", "owner_email", "job_name"],
quoting=csv.QUOTE_MINIMAL,
)
writer.writeheader()
writer.writerows(jobs_with_spark_submit)
else:
print("No jobs with Spark Submit tasks found.", file=sys.stderr)


if __name__ == "__main__":
main()

オプション B: 包括的なスキャン (遅い、過去 30 日間の一時ジョブを含む)

/runs/submit経由で作成された一時ジョブを識別する必要がある場合は、このより包括的なスクリプトを使用します。このスクリプトは、永続的なジョブ ( /jobs/createによって作成された) と一時的なジョブの両方を含む、ワークスペース内の過去 30 日間のすべてのジョブ実行をスキャンします。 このスクリプトは、大規模なワークスペースでは実行に数時間かかる場合があります。

Python
#!/usr/bin/env python3
"""
Requirements:
databricks-sdk>=0.20.0

Usage:
export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
export DATABRICKS_TOKEN="your-token"
python3 list_spark_submit_runs.py

Output:
CSV format with columns: Job ID, Run ID, Owner ID/Email, Job/Run Name

Incorrect:
export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com/?o=12345678910"
"""

import csv
import os
import sys
import time
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied


def main():
# Get credentials from environment
workspace_url = os.environ.get("DATABRICKS_HOST")
token = os.environ.get("DATABRICKS_TOKEN")

if not workspace_url or not token:
print(
"Error: Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables",
file=sys.stderr,
)
sys.exit(1)

# Initialize client
client = WorkspaceClient(host=workspace_url, token=token)

thirty_days_ago_ms = int((time.time() - 30 * 24 * 60 * 60) * 1000)

# Scan workspace for runs with Spark Submit tasks
# Using list_runs() instead of list() to include ephemeral jobs created via /runs/submit
print(
"Scanning workspace for runs with Spark Submit tasks from the last 30 days... (this will take more than an hour in large workspaces)",
file=sys.stderr,
)
runs_with_spark_submit = []
total_runs = 0
seen_job_ids = set()

# Iterate through all runs (pagination is handled automatically by the SDK)
skipped_runs = 0
for run in client.jobs.list_runs(
expand_tasks=True,
limit=25,
completed_only=True,
start_time_from=thirty_days_ago_ms,
):
try:
total_runs += 1
if total_runs % 1000 == 0:
print(f"Scanned {total_runs} runs total", file=sys.stderr)

# Check if run has any Spark Submit tasks
if run.tasks:
has_spark_submit = any(
task.spark_submit_task is not None for task in run.tasks
)

if has_spark_submit:
# Extract job information from the run
job_id = run.job_id if run.job_id else "N/A"
run_id = run.run_id if run.run_id else "N/A"
owner_email = run.creator_user_name or "Unknown"
# Use run name if available, otherwise try to construct a name
run_name = run.run_name or (
f"Run {run_id}" if run_id != "N/A" else "Unnamed Run"
)

# Track unique job IDs to avoid duplicates for persistent jobs
# (ephemeral jobs may have the same job_id across multiple runs)
key = (job_id, run_id)
if key not in seen_job_ids:
seen_job_ids.add(key)
runs_with_spark_submit.append(
{
"job_id": job_id,
"run_id": run_id,
"owner_email": owner_email,
"job_name": run_name,
}
)
except PermissionDenied:
# Skip runs that the user doesn't have permission to access
skipped_runs += 1
continue

# Print summary to stderr
print(f"Scanned {total_runs} runs total", file=sys.stderr)
if skipped_runs > 0:
print(
f"Skipped {skipped_runs} runs due to insufficient permissions",
file=sys.stderr,
)
print(
f"Found {len(runs_with_spark_submit)} runs with Spark Submit tasks",
file=sys.stderr,
)
print("", file=sys.stderr)

# Output CSV to stdout
if runs_with_spark_submit:
writer = csv.DictWriter(
sys.stdout,
fieldnames=["job_id", "run_id", "owner_email", "job_name"],
quoting=csv.QUOTE_MINIMAL,
)
writer.writeheader()
writer.writerows(runs_with_spark_submit)
else:
print("No runs with Spark Submit tasks found.", file=sys.stderr)


if __name__ == "__main__":
main()

ヘルプが必要ですか?

さらにサポートが必要な場合は、アカウント サポートにお問い合わせください。