クラシック コンピュートからサーバレス コンピュートへの移行
ワークロードをクラシック コンピュートからサーバレス コンピュートに移行します。 サーバレス コンピュートは、プロビジョニング、スケーリング、ランタイム アップグレード、最適化を自動的に処理します。
ほとんどの従来型ワークロードは、最小限のコード変更、あるいはコード変更なしで移行できます。このページでは、そうしたワークロードに焦点を当てています。df.cacheなどの一部の機能は、 レスではまだサポートされていませんが、利用可能になった際にはコードの変更は必要ありません。 R またはScalaノートブックに依存する特定のワークロードはクラシック コンピュートを必要とするため、サーバーレスに移行できません。 現在の制限の完全なリストについては、 「サーバレス コンピュートの制限」を参照してください。
移行ステップ
ワークロードをクラシック コンピュートからサーバレス コンピュートに移行するには、次のステップに従います。
- 前提条件を確認してください :ワークスペース、ネットワーク、クラウドストレージへのアクセスが要件を満たしていることを確認してください。始める前にご覧ください。
- コードの更新 :必要なコードおよび設定の変更を行います。コードの更新を参照してください。
- ワークロードをテストする :切り替え前に互換性と正確性を検証する。ワークロードのテストを参照してください。
- パフォーマンスモードを選択してください :ワークロードの要件に最適なパフォーマンスモードを選択してください。「パフォーマンスモードの選択」を参照してください。
- 段階的に移行する :展開 新規かつリスクの低いワークロードから始めて、段階的に展開します。 段階的な移行を参照してください。
- コストの監視 : サーバレスDBU消費を追跡し、アラートを設定します。 監視コストを参照してください。
始める前に
移行を開始する前に、ワークスペース内の既存の設定をいくつか更新する必要がある場合があります。
前提条件 | 操作 | 詳細 |
|---|---|---|
ワークスペースがUnity Catalogに対して有効になっています | 必要に応じてHive metastoreから移行する | |
ネットワーク設定済み | VPCピアリングをNCC、プライベートリンク、またはファイアウォールルールに置き換える | |
クラウドストレージへのアクセス | 従来のデータ アクセス パターンをUnity Catalog外部ロケーションに置き換える |
コードを更新してください
以下のセクションでは、ワークロードを レスと互換性を持たせるために必要なコードと構成の変更を一覧表示します。
データアクセス
従来のデータアクセスパターンは、 ではサポートされていません。 コードを更新して、代わりにUnity Catalog使用するようにしてください。
クラシックなパターン | サーバレス交換 | 詳細 |
|---|---|---|
DBFSパス( | Unity Catalogボリューム | |
Hive metastoreテーブル | Unity Catalogテーブル(またはHMS Federation) | |
ストレージアカウントの認証情報 | Unity Catalog外部位置 | |
カスタムJDBC JAR | レイクハウスフェデレーション |
DBFSアクセスはサーバーレスで制限されています。 移行前に、 Unity Catalogボリュームへのすべてのdbfs:/パスを更新してください。 詳細については、 DBFSに保存されているファイルの移行」を参照してください。
例: DBFSパスとHive metastore参照を置き換える
# Classic
df = spark.read.csv("dbfs:/mnt/datalake/data.csv", header=True)
df.write.parquet("dbfs:/mnt/output/results")
df = spark.table("my_database.my_table")
# Serverless
df = spark.read.csv("/Volumes/main/sales/raw_data/data.csv", header=True)
df.write.parquet("/Volumes/main/analytics/output/results")
df = spark.table("main.my_database.my_table") # three-level namespace
APIsとコード
特定のAPIsとコードパターンは、 レスではサポートされていません。 コードの更新が必要かどうかを確認するには、この表を参照してください。
クラシックなパターン | サーバレス交換 | 詳細 |
|---|---|---|
RDD APIs ( | DataFrame APIs | |
| キャッシュ呼び出しを削除する | |
|
| |
Hive変数( | SQL | |
サポートされていない Spark 設定 | サポートされていない設定を削除します。サーバーレスはほとんどの設定を自動調整します。 |
例: RDD操作をDataFramesに置き換える
from pyspark.sql import functions as F
# sc.parallelize + rdd.map
# Classic: rdd = sc.parallelize([1, 2, 3]); rdd.map(lambda x: x * 2).collect()
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
result = df.select((F.col("value") * 2).alias("value")).collect()
# rdd.flatMap
# Classic: sc.parallelize(["hello world"]).flatMap(lambda l: l.split(" ")).collect()
df = spark.createDataFrame([("hello world",)], ["line"])
words = df.select(F.explode(F.split("line", " ")).alias("word")).collect()
# rdd.groupByKey
# Classic: rdd.groupByKey().mapValues(list).collect()
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])
grouped = df.groupBy("key").agg(F.collect_list("value").alias("values")).collect()
# rdd.mapPartitions → applyInPandas
import pandas as pd
def process_group(pdf: pd.DataFrame) -> pd.DataFrame:
return pd.DataFrame({"total": [pdf["id"].sum()]})
result = (spark.range(100).repartition(4)
.groupBy(F.spark_partition_id())
.applyInPandas(process_group, schema="total long").collect())
# sc.textFile → spark.read.text
df = spark.read.text("/Volumes/catalog/schema/volume/file.txt")
例:SparkContextとキャッシングの置き換え
from pyspark.sql.functions import broadcast
# sc.broadcast → broadcast join
result = main_df.join(broadcast(lookup_df), "key")
# sc.accumulator → DataFrame aggregation
total = df.agg(F.sum("amount")).collect()[0][0]
# sqlContext.sql → spark.sql
result = spark.sql("SELECT * FROM main.db.table")
# df.cache() → remove caching calls
# Materialize expensive intermediate results to Delta as a workaround:
df = spark.read.parquet(path)
result = df.filter("status = 'active'")
expensive_df.write.format("delta").mode("overwrite").saveAsTable("main.scratch.temp")
result = spark.table("main.scratch.temp")
ライブラリと環境
基本環境を使用してワークスペース レベルでライブラリと環境を管理し、ノートブックのサーバレス環境を使用してノートブック レベルでライブラリと環境を管理できます。
クラシックなパターン | サーバレス交換 | 詳細 |
|---|---|---|
initスクリプト | サーバーレス環境 | |
クラスタースコープのライブラリ | ノートブックスコープまたは環境ライブラリ | |
Maven / JARライブラリ | JARタスクのサポート。ノートブック用のPyPI | |
Dockerコンテナ | ライブラリのニーズに対応したサーバーレス環境 |
再現可能な環境用のrequirements.txtのピン留めPythonパッケージ。 「サーバレス コンピュートのベスト プラクティス」を参照してください。
ストリーミング
ストリーミング ワークロードはサーバレスでサポートされていますが、特定のトリガーはサポートされていません。 サポートされているトリガーを使用するようにコードを更新してください。
Sparkトリガー | サポート対象 | 注 |
|---|---|---|
| はい | 推奨 |
| はい | これは非推奨です。代わりに |
| No | 戻り値 |
| No | 代わりにLakeFlow Spark宣言型パイプライン連続モードを使用してください |
デフォルト( | No |
|
連続ストリーミングの場合は、連続モードでSpark宣言型パイプラインに移行するか、 AvailableNowで連続スケジュール ジョブを使用します。 大きなソースの場合は、メモリ不足エラーを防ぐためにmaxFilesPerTriggerまたはmaxBytesPerTriggerを設定してください。
例:ストリーミングトリガーを修正する
# Classic (not supported on serverless — default trigger is ProcessingTime)
query = df.writeStream.format("delta").outputMode("append").start()
# Serverless (explicit AvailableNow trigger)
query = (df.writeStream.format("delta").outputMode("append")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.start(output_path))
query.awaitTermination()
# With OOM prevention for large sources
query = (spark.readStream.format("delta")
.option("maxFilesPerTrigger", 100)
.option("maxBytesPerTrigger", "10g")
.load(input_path)
.writeStream.format("delta")
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.start(output_path))
ワークロードをテストする
- クイック互換性テスト : 標準 アクセス モードおよびDatabricks Runtime 14.3 以降を使用して、クラシック コンピュートでワークロードを実行します。 実行が成功すると、コードを変更せずにワークロードをサーバレスに移行できます。
- A/B 比較 (本番運用に推奨): クラシック (コントロール) とサーバレス (エクスペリメント) で同じワークロードを実行します。 出力テーブルを比較し、正当性を検証する。出力が一致するまで繰り返す。
- 一時的な設定 :テスト中に、サポートされているSparkの設定を一時的に設定できます。安定したら取り外してください。
パフォーマンスモードを選択してください
サーバレス ジョブとパイプラインは、標準とパフォーマンス最適化の 2 つのパフォーマンス モードをサポートします。 選択するパフォーマンスモードは、ワークロードの要件によって異なります。
モード | 可用性 | 起動する | どのようなタスクにベストなのか |
|---|---|---|---|
Standard | Jobs、 LakeFlow Spark宣言型パイプライン | 4~6分 | コスト重視のバッチ |
パフォーマンス最適化済み | ノートブック、ジョブズ、 LakeFlow Spark宣言型パイプライン | 秒 | インタラクティブで、レイテンシに敏感 |
段階的に移行する
- 新しいワークロード : すべての新しいノートブックとジョブをサーバレスで開始します。
- 低リスクワークロード :標準アクセスモードおよびDatabricks Runtime 14.3以降で既に稼働しているPySpark/SQLワークロードを移行します。
- 複雑なワークロード :コード変更が必要なワークロード(RDDの書き換え、DBFSの更新、トリガーの修正など)を移行します。
- 残りのワークロード :機能拡張に伴い、定期的に見直してください。
コストを監視する
サーバーレスの請求は、クラスターの稼働時間ではなく、 DBU消費量に基づいて行われます。 大規模移行を行う前に、代表的なワークロードを用いてコスト予測を検証してください。
- サーバーレス使用のコスト帰属に関するポリシー
- ダッシュボードとアラート用のシステムテーブル
- アカウント予算アラート
- 事前設定された使用状況ダッシュボードを使用して、支出の概要を確認してください。
その他のリソース
- サーバレス コンピュートのベスト プラクティス: サーバレス ワークロードの最適化のヒント
- サーバーレス コンピュートの制限事項: 現在の制限事項とサポートされていない機能の完全なリスト
- サーバーレス環境の構成: ライブラリと依存関係の管理
- サポートされているSpark構成: Spark構成は、 レス
- Spark Connectと従来のSpark :動作の違い レスアーキテクチャ
- サーバレス ネットワーク セキュリティ: NCC、プライベート リンク、ファイアウォール設定
- サーバーレス コンピュート リリース ノート: 新機能の出荷時に追跡する
- Unity Catalogアップグレードガイド: Hive metastoreからUnity Catalogへの移行
さらに詳しい情報については、以下のブログ記事もご参照ください。
- サーバレスコンピューティングとは何ですか? : サーバレス機能の概要と顧客実績
- データエンジニアリングの進化: サーバレス コンピュートがノートブックとLakeFlow Jobどのように変革するか: サーバレスがLakeFlow Jobs とパイプラインをどのように強化するか