メインコンテンツまでスキップ

従来のワークスペースを Unity Catalog にアップグレードするときにジョブを更新する

従来のワークスペースをUnity Catalogにアップグレードする場合、既存のジョブを更新して、アップグレードされたテーブルとファイルパスを参照するようにする必要がある場合があります。このページにあるメインの表には、ジョブを更新するための典型的なシナリオと提案が記載されています。コード例が必要なシナリオについては、 「詳細シナリオ」セクションへのリンクを参照してください。

ジョブをUnity Catalogに更新するデモについては、「ジョブをUnity Catalogにアップグレードする」を参照してください。

シナリオの概要

シナリオ

ソリューション

ジョブは、init スクリプトまたはクラスター定義のライブラリを通じてカスタム ライブラリを使用します。

カスタム ライブラリは、コードに埋め込まれた Apache Spark または SQL の読み取りまたは書き込み操作を実行する、公開されていない pip パッケージまたは jar として定義されます。

カスタムライブラリを変更して、次のことを確認します。

  • データベース名は 3 レベルの名前空間です。
  • マウント ポイントはコードでは使用されません。

ジョブは、 Hive metastoreテーブルに対して読み取りまたは書き込みを行います。

  • ジョブ クラスタリング Spark config でのデフォルト カタログの設定を評価します。 spark.databricks.sql.initial.catalog.name my_catalog
  • ワークスペースのデフォルトカタログをhive_metastore以外に設定して、ジョブコードを変更する必要がないかどうかを評価します。
  • それ以外の場合は、ジョブ・コードを変更して、2 レベルの名前空間の名前を適切なテーブルの 3 レベルの名前空間に変更します。
  • ジョブが純粋 SQL を使用している場合は、 USE CATALOG ステートメントを追加することを検討してください。

ジョブは、テーブルのサブフォルダーであるパスから読み書きします(Unity Catalogではサポートされていません)。

  • パーティション列の述語を持つテーブルから読み取るようにコードを変更します。
  • overwriteByPartitionまたはその他の適切なオプションを使用してテーブルに書き込むようにコードを変更します。

Unity Catalogテーブルであるマウントパスから読み書きします。

  • 正しい 3 レベルの名前空間テーブルを参照するようにコードを変更します。
  • テーブルが登録されていない、または登録されない場合でも、マウントパスではなくボリュームパスに書き込むようにコードを変更する必要があります。

このジョブは、マウントパスを使用してファイル(テーブルではない)を読み書きします。

代わりに、ボリュームの場所に書き込むようにコードを変更します。

ジョブは、 applyInPandasWithState.

現在サポートされていません。可能であれば書き直すことを検討するか、サポートが提供されるまでこのジョブのリファクタリングを試みないでください。

ジョブは、連続処理モードを使用するストリーミング ジョブです。

連続処理モードは Spark で試験段階であり、Unity Catalog ではサポートされていません。構造化ストリーミングを使用するようにジョブをリファクタリングします。これが不可能な場合は、ジョブを Hive metastoreに対して実行し続けることを検討してください。

ジョブは、チェックポイント・ディレクトリーを使用するストリーミング ジョブです。

  • チェックポイント・ディレクトリをボリュームに移動します。
  • ノートブックのコードを変更して、ボリューム パスを使用します。
  • ジョブ所有者は、そのパスで読み取り/書き込みを持っている必要があります。
  • ジョブを停止します。
  • チェックポイントを新しいボリュームの場所に移動します。
  • ジョブを再起動します。

ジョブのクラスタリング定義は 11.3 より Databricks Runtime 未満です。

  • ジョブ クラスタリングの定義を Databricks Runtime 11.3 以降に変更します。
  • ジョブ クラスタリング定義を変更して、指定アクセス モードまたは標準アクセス モードを使用します。

ジョブには、ストレージまたはテーブルと対話するノートブックがあります。

ジョブが実行されていたサービスプリンシパルには、ボリューム、テーブル、外部ロケーションなど、 Unity Catalog内の必要なリソースに対する読み取りおよび書き込みアクセスを提供する必要があります。

ジョブはLakeflow Spark宣言型パイプラインです。

  • ジョブのクラスタリングを Databricks Runtime 13.1 以降に変更します。
  • Lakeflow Spark宣言型パイプライン ジョブを停止します。
  • データをUnity Catalogマネージドテーブルに移動します。
  • 新しいUnity Catalogマネージドテーブルを使用するように、 Lakeflow Spark宣言型パイプライン ジョブ定義を変更します。
  • Lakeflow Spark宣言型パイプライン ジョブを再起動します。

このジョブは、認証のためにインスタンスプロファイルを使用して、ストレージ以外のクラウドサービス(AWS Kinesisなど)を利用します。

SDK で使用できる一時的な資格情報を生成することで、非ストレージ クラウド サービスと対話できる資格情報を制御する Unity Catalog サービス資格情報を使用するようにコードを変更します。

このジョブはScalaを使用しています。

  • Databricks Runtime 13.3未満の場合は、専用のコンピュートで実行してください。
  • Standard クラスタリングは、 Databricks Runtime 13.3 以降でサポートされています。

このジョブには、Scala UDFを使用するノートブックが含まれています。

  • Databricks Runtime 13.3未満の場合は、専用のコンピュートで実行してください。
  • Standard クラスタリングは、 Databricks Runtime 14.2 でサポートされています。

このジョブにはMLRを使用するタスクが含まれています。

専用のコンピュートで実行.

ジョブには、グローバルinitスクリプトに依存するクラスタリング構成があります。

  • 完全なサポートを受けるには、Databricks Runtime 13.3 以降を使用してください。
  • クラスタースコープの init スクリプトまたはクラスターポリシーを使用するように変更します。 スクリプト、ファイル、パッケージは、実行するためにUnity Catalogボリュームにインストールする必要があります。

このジョブは、jarファイル/Maven、Spark拡張機能、またはカスタムデータソース(Spark由来)を使用します。

  • Databricks Runtime 13.3 以降を使用します。
  • クラスターポリシーを使用してライブラリをインストールします。

ジョブには、PySpark UDF を含むノートブックがあります。

Databricks Runtime 13.2 以降を使用します。

そのジョブには、ネットワーク呼び出しを行うPythonコードを含むノートブックがあります。

Databricks Runtime 12.2 以降を使用します。

ジョブには、Pandas UDF(スカラー)を含むノートブックがあります。

Databricks Runtime 13.2 以降を使用します。

ジョブは Unity Catalog ボリュームを使用します。

Databricks Runtime 13.3 以降を使用します。

ジョブは共有クラスター上でspark.catalog.X ( tableExistslistTablessetDefaultCatalog ) を使用します。

ジョブノートブックが共有クラスター上で spark.catalog.X を使用する方法については、こちらをご覧ください。

ジョブは共有クラスターでdbutils...getContext().toJson()を使用します。

コマンド コンテキストにアクセスする場合 (たとえば、ジョブ ID を取得する場合)、 .toJson()ではなく.safeToJson()使用します。 これは、共有クラスター上で安全に共有できる情報の一部を提供するものです。

Databricks Runtime 13.3 LTS+ が必要です

ジョブは共有クラスターでspark.udf.registerJavaFunctionを使用します。

  • Databricks Runtime 14.3 LTS以降を使用してください。
  • ノートブックとジョブの場合は、 %scalaセルを使用して、 spark.udf.registerを使用してScala UDFを登録します。 PythonとScalaは実行コンテキストを共有するため、ScalaのUDFはPythonからも利用可能です。
  • IDEs ( Databricks Connect v2を使用)を使用している顧客の場合、唯一の選択肢はUDF Unity Catalog Python UDFとして書き換えることです。 Databricks Unity Catalog UDFのサポートをScalaにも拡張する予定です。

ジョブは共有クラスターでsc.parallelizespark.read.json()を使用します。

ジョブ ノートブックの sc.Parallelize と Spark の使用を参照してください。共有クラスター上のread.json ()

ジョブはsc.emptyRDD()を使用して、共有クラスター上に空のDataFramesを作成します。

共有クラスター上で sc.emptyRDD() を使用して空のDataFramesを作成するジョブ ノートブックを参照してください。

ジョブは共有クラスター上でRDD mapPartitionsを使用します。

Unity Catalogの共有クラスターは、PythonおよびScalaプログラムとSparkサーバー間の通信にSpark Connectを使用するため、RDDにアクセスできなくなります。

RDDの典型的な使用例としては、コストのかかる初期化ロジックを一度だけ実行し、その後は外部サービスの呼び出しや暗号化ロジックの初期化など、行ごとにコストの低い操作を実行することが挙げられます。

DataFrame APIとPySparkネイティブのArrow UDFを使用してRDD操作を書き換えます。

ジョブは共有クラスター上で SparkContext ( sc ) とsqlContextを使用します。

sc Unity Catalog共有 アーキテクチャと SparkConnect のため、 sqlContextは設計上利用できません。 SparkSessionインスタンスとやり取りするには、 spark変数を使用します。

Spark JVMには、 PythonまたはScala REPLから直接アクセスすることはできません。Spark Sparkを介してのみアクセスできます。 sc._jvmコマンドは設計上失敗します。

以下のscコマンドはサポートされていません: emptyRDDrangeinit_batched_serializerparallelizepickleFiletextFilewholeTextFilesbinaryFilesbinaryRecordssequenceFilenewAPIHadoopFilenewAPIHadoopRDDhadoopFilehadoopRDDunionrunJobsetSystemPropertyuiWebUrlsetLocalPropertystopsetJobGroupgetConf

ジョブは共有クラスターでsparkContext.getConfを使用します。

sparkContext``df.sparkContextsc.sparkContext 、および同様のAPIs 、設計上利用できません。 代わりにspark.confを使用してください。

ジョブは共有クラスターでsc.setJobDescription()を使用します。

sc.setJobDescription("String") Unity Catalog共有 アーキテクチャと SparkConnect のため、設計上利用できません。

タグを付けるにはspark.addTag()使用し、タグの有無に応じて動作させるにはgetTags()interruptTag(tag)使用します。

Databricks Runtime 14.1+ が必要です

ジョブは共有クラスターでsc.setLogLevel()を使用します。

共有クラスターでは、ログレベルを直接設定するためにSparkコンテキストにアクセスすることはできません。Databricks Runtime 14以降では、Spark Contextは利用できなくなりました。

クラスター設定で、Spark構成値としてspark.log.level DEBUGWARNINFO 、またはERRORに設定します。

ジョブは、共有クラスター上で深くネストされた式またはクエリを使用します。

PySpark DataFrame APIを使用して再帰的に作成された深くネストされたDataFramesと式は、以下を生成する可能性があります。

  • RecursionError: maximum recursion depth exceeded
  • SparkConnectGrpcException: Protobuf maximum nesting level exceeded

深くネストされたコードパスを特定し、線形式、サブクエリ、または一時ビューを使用して書き換えます。例えば、 df.withColumn再帰的に呼び出す代わりに、 df.withColumns(dict)を使用します。

ジョブは共有クラスターでinput_file_name()を使用します。

ジョブ ノートブックの共有クラスターでの input_file_name() の使用を参照してください。

ジョブは、共有クラスター上のDBFSでデータ操作を実行します。

「ジョブ ノートブックが共有クラスター上のDBFSでデータ操作を実行する」を参照してください。

詳細なシナリオ

以下のシナリオでは、コード例が必要です。

ジョブ ノートブックは共有クラスターでspark.catalog.Xを使用します

Databricks Runtime 14.2 以降を使用します。

Databricks Runtimeのアップグレードが不可能な場合は、以下の回避策を使用してください。

tableExistsの代わりに以下を使用してください。

Python
# SQL workaround
def tableExistsSql(tablename):
try:
spark.sql(f"DESCRIBE TABLE {tablename};")
except Exception as e:
return False
return True
tableExistsSql("jakob.jakob.my_table")

listTablesの代わりにSHOW TABLESを使用してください(データベースまたはパターンマッチングによる制限もサポートしています)。

Python
spark.sql("SHOW TABLES")

setDefaultCatalogの場合、以下を実行します。

Python
spark.sql("USE CATALOG <catalog_name>")

ジョブ ノートブックは共有クラスターでsc.parallelizespark.read.json()を使用します

代わりにjson.loads使用してください。

以前は:

Python
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)

後:

Python
from pyspark.sql import Row
import json
# Sample JSON data as a list of dictionaries (similar to JSON objects)
json_data_str = response.text
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display()

ジョブ ノートブックは共有クラスター上でsc.emptyRDD()を使用して空のDataFramesを作成します

以前は:

Scala
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)

後:

Scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)
Python
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema)

ジョブ ノートブックは共有クラスターでinput_file_name()を使用します

input_file_name() 共有クラスターのUnity Catalogではサポートされていません。

ファイル名を取得するには:

Python
.withColumn("RECORD_FILE_NAME", col("_metadata.file_name"))

完全なファイルパスを取得するには:

Python
.withColumn("RECORD_FILE_PATH", col("_metadata.file_path"))

どちらのオプションもspark.readで動作します。

ジョブノートブックは共有クラスター上のDBFSでデータ操作を実行します

FUSE サービスを介して共有クラスターでDBFS使用すると、クラスターはファイルシステムに到達できず、ファイルが見つからないエラーが生成されます。

次の例は、共有クラスターで失敗します。

Bash
with open('/dbfs/test/sample_file.csv', 'r') as file:
ls -ltr /dbfs/test
cat /dbfs/test/sample_file.csv

次のソリューションのいずれかを使用します。

  • DBFSの代わりにDatabricks Unity Catalogボリュームを使用する(推奨)。
  • コードを更新して、 dbutilsまたはsparkを使用するようにします。これらはストレージへの直接アクセスパスを使用し、共有からDBFSへのアクセスが許可されます。