メインコンテンツまでスキップ

クラシック コンピュートからサーバレス コンピュートへの移行

ワークロードをクラシック コンピュートからサーバレス コンピュートに移行します。 サーバレス コンピュートは、プロビジョニング、スケーリング、ランタイム アップグレード、最適化を自動的に処理します。

ほとんどの従来型ワークロードは、最小限のコード変更、あるいはコード変更なしで移行できます。このページでは、そうしたワークロードに焦点を当てています。df.cacheなどの一部の機能は、 レスではまだサポートされていませんが、利用可能になった際にはコードの変更は必要ありません。 R またはScalaノートブックに依存する特定のワークロードはクラシック コンピュートを必要とするため、サーバーレスに移行できません。 現在の制限の完全なリストについては、 「サーバレス コンピュートの制限」を参照してください。

移行ステップ

ワークロードをクラシック コンピュートからサーバレス コンピュートに移行するには、次のステップに従います。

  1. 前提条件を確認してください :ワークスペース、ネットワーク、クラウドストレージへのアクセスが要件を満たしていることを確認してください。始める前にご覧ください。
  2. コードの更新 :必要なコードおよび設定の変更を行います。コードの更新を参照してください。
  3. ワークロードをテストする :切り替え前に互換性と正確性を検証する。ワークロードのテストを参照してください。
  4. パフォーマンスモードを選択してください :ワークロードの要件に最適なパフォーマンスモードを選択してください。「パフォーマンスモードの選択」を参照してください。
  5. 段階的に移行する :展開 新規かつリスクの低いワークロードから始めて、段階的に展開します。 段階的な移行を参照してください。
  6. コストの監視 : サーバレスDBU消費を追跡し、アラートを設定します。 監視コストを参照してください。

始める前に

移行を開始する前に、ワークスペース内の既存の設定をいくつか更新する必要がある場合があります。

前提条件

操作

詳細

ワークスペースがUnity Catalogに対して有効になっています

必要に応じてHive metastoreから移行する

DatabricksワークスペースをUnity Catalogにアップグレードする

ネットワーク設定済み

VPCピアリングをNCC、プライベートリンク、またはファイアウォールルールに置き換える

サーバーレスコンピュートプレーンネットワーキング

クラウドストレージへのアクセス

従来のデータ アクセス パターンをUnity Catalog外部ロケーションに置き換える

Unity Catalogを使用したクラウドオブジェクトストレージへの接続

コードを更新してください

以下のセクションでは、ワークロードを レスと互換性を持たせるために必要なコードと構成の変更を一覧表示します。

データアクセス

従来のデータアクセスパターンは、 ではサポートされていません。 コードを更新して、代わりにUnity Catalog使用するようにしてください。

クラシックなパターン

サーバレス交換

詳細

DBFSパス( dbfs:/...

Unity Catalogボリューム

Unity Catalogのボリュームとは何ですか?

Hive metastoreテーブル

Unity Catalogテーブル(またはHMS Federation)

DatabricksワークスペースをUnity Catalogにアップグレードする

ストレージアカウントの認証情報

Unity Catalog外部位置

Unity Catalogを使用したクラウドオブジェクトストレージへの接続

カスタムJDBC JAR

レイクハウスフェデレーション

クエリフェデレーションとは何ですか?

警告

DBFSアクセスはサーバーレスで制限されています。 移行前に、 Unity Catalogボリュームへのすべてのdbfs:/パスを更新してください。 詳細については、 DBFSに保存されているファイルの移行」を参照してください。

例: DBFSパスとHive metastore参照を置き換える

Python
# Classic
df = spark.read.csv("dbfs:/mnt/datalake/data.csv", header=True)
df.write.parquet("dbfs:/mnt/output/results")
df = spark.table("my_database.my_table")

# Serverless
df = spark.read.csv("/Volumes/main/sales/raw_data/data.csv", header=True)
df.write.parquet("/Volumes/main/analytics/output/results")
df = spark.table("main.my_database.my_table") # three-level namespace

APIsとコード

特定のAPIsとコードパターンは、 レスではサポートされていません。 コードの更新が必要かどうかを確認するには、この表を参照してください。

クラシックなパターン

サーバレス交換

詳細

RDD APIs ( sc.parallelizerdd.map )

DataFrame APIs

Spark ConnectとSpark Classicを比較する

df.cache(), df.persist()

キャッシュ呼び出しを削除する

サーバレスコンピュートの制限

spark.sparkContext, sqlContext

spark (SparkSession) を直接使用する

Spark ConnectとSpark Classicを比較する

Hive変数( ${var}

SQL DECLARE VARIABLEまたは Python f 文字列

DECLARE VARIABLE

サポートされていない Spark 設定

サポートされていない設定を削除します。サーバーレスはほとんどの設定を自動調整します。

サーバレス ノートブックとジョブのSparkプロパティを構成する

例: RDD操作をDataFramesに置き換える

Python
from pyspark.sql import functions as F

# sc.parallelize + rdd.map
# Classic: rdd = sc.parallelize([1, 2, 3]); rdd.map(lambda x: x * 2).collect()
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
result = df.select((F.col("value") * 2).alias("value")).collect()

# rdd.flatMap
# Classic: sc.parallelize(["hello world"]).flatMap(lambda l: l.split(" ")).collect()
df = spark.createDataFrame([("hello world",)], ["line"])
words = df.select(F.explode(F.split("line", " ")).alias("word")).collect()

# rdd.groupByKey
# Classic: rdd.groupByKey().mapValues(list).collect()
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])
grouped = df.groupBy("key").agg(F.collect_list("value").alias("values")).collect()

# rdd.mapPartitions → applyInPandas
import pandas as pd
def process_group(pdf: pd.DataFrame) -> pd.DataFrame:
return pd.DataFrame({"total": [pdf["id"].sum()]})
result = (spark.range(100).repartition(4)
.groupBy(F.spark_partition_id())
.applyInPandas(process_group, schema="total long").collect())

# sc.textFile → spark.read.text
df = spark.read.text("/Volumes/catalog/schema/volume/file.txt")

例:SparkContextとキャッシングの置き換え

Python
from pyspark.sql.functions import broadcast

# sc.broadcast → broadcast join
result = main_df.join(broadcast(lookup_df), "key")

# sc.accumulator → DataFrame aggregation
total = df.agg(F.sum("amount")).collect()[0][0]

# sqlContext.sql → spark.sql
result = spark.sql("SELECT * FROM main.db.table")

# df.cache() → remove caching calls
# Materialize expensive intermediate results to Delta as a workaround:
df = spark.read.parquet(path)
result = df.filter("status = 'active'")
expensive_df.write.format("delta").mode("overwrite").saveAsTable("main.scratch.temp")
result = spark.table("main.scratch.temp")

ライブラリと環境

基本環境を使用してワークスペース レベルでライブラリと環境を管理し、ノートブックのサーバレス環境を使用してノートブック レベルでライブラリと環境を管理できます。

クラシックなパターン

サーバレス交換

詳細

initスクリプト

サーバーレス環境

サーバレス環境を設定する

クラスタースコープのライブラリ

ノートブックスコープまたは環境ライブラリ

サーバレス環境を設定する

Maven / JARライブラリ

JARタスクのサポート。ノートブック用のPyPI

JARタスク

Dockerコンテナ

ライブラリのニーズに対応したサーバーレス環境

サーバレス環境を設定する

再現可能な環境用のrequirements.txtのピン留めPythonパッケージ。 「サーバレス コンピュートのベスト プラクティス」を参照してください。

ストリーミング

ストリーミング ワークロードはサーバレスでサポートされていますが、特定のトリガーはサポートされていません。 サポートされているトリガーを使用するようにコードを更新してください。

Sparkトリガー

サポート対象

Trigger.AvailableNow()

はい

推奨

Trigger.Once()

はい

これは非推奨です。代わりにTrigger.AvailableNow()を使用してください。

Trigger.ProcessingTime(interval)

No

戻り値 INFINITE_STREAMING_TRIGGER_NOT_SUPPORTED

Trigger.Continuous(interval)

No

代わりにLakeFlow Spark宣言型パイプライン連続モードを使用してください

デフォルト( .trigger()を設定しない)

No

.trigger()を省略してProcessingTime("0 seconds")にすると、サーバレスではサポートされません。 常に.trigger(availableNow=True)明示的に設定してください。

連続ストリーミングの場合は、連続モードでSpark宣言型パイプラインに移行するか、 AvailableNow連続スケジュール ジョブを使用します。 大きなソースの場合は、メモリ不足エラーを防ぐためにmaxFilesPerTriggerまたはmaxBytesPerTriggerを設定してください。

例:ストリーミングトリガーを修正する

Python
# Classic (not supported on serverless — default trigger is ProcessingTime)
query = df.writeStream.format("delta").outputMode("append").start()

# Serverless (explicit AvailableNow trigger)
query = (df.writeStream.format("delta").outputMode("append")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.start(output_path))
query.awaitTermination()

# With OOM prevention for large sources
query = (spark.readStream.format("delta")
.option("maxFilesPerTrigger", 100)
.option("maxBytesPerTrigger", "10g")
.load(input_path)
.writeStream.format("delta")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.start(output_path))

ワークロードをテストする

  1. クイック互換性テスト : 標準 アクセス モードおよびDatabricks Runtime 14.3 以降を使用して、クラシック コンピュートでワークロードを実行します。 実行が成功すると、コードを変更せずにワークロードをサーバレスに移行できます。
  2. A/B 比較 (本番運用に推奨): クラシック (コントロール) とサーバレス (エクスペリメント) で同じワークロードを実行します。 出力テーブルを比較し、正当性を検証する。出力が一致するまで繰り返す。
  3. 一時的な設定 :テスト中に、サポートされているSparkの設定を一時的に設定できます。安定したら取り外してください。

パフォーマンスモードを選択してください

サーバレス ジョブとパイプラインは、標準とパフォーマンス最適化の 2 つのパフォーマンス モードをサポートします。 選択するパフォーマンスモードは、ワークロードの要件によって異なります。

モード

可用性

起動する

どのようなタスクにベストなのか

Standard

Jobs、 LakeFlow Spark宣言型パイプライン

4~6分

コスト重視のバッチ

パフォーマンス最適化済み

ノートブック、ジョブズ、 LakeFlow Spark宣言型パイプライン

インタラクティブで、レイテンシに敏感

段階的に移行する

  1. 新しいワークロード : すべての新しいノートブックとジョブをサーバレスで開始します。
  2. 低リスクワークロード :標準アクセスモードおよびDatabricks Runtime 14.3以降で既に稼働しているPySpark/SQLワークロードを移行します。
  3. 複雑なワークロード :コード変更が必要なワークロード(RDDの書き換え、DBFSの更新、トリガーの修正など)を移行します。
  4. 残りのワークロード :機能拡張に伴い、定期的に見直してください。

コストを監視する

サーバーレスの請求は、クラスターの稼働時間ではなく、 DBU消費量に基づいて行われます。 大規模移行を行う前に、代表的なワークロードを用いてコスト予測を検証してください。

その他のリソース

さらに詳しい情報については、以下のブログ記事もご参照ください。