メインコンテンツまでスキップ

従来のワークスペースを Unity Catalog にアップグレードするときにジョブを更新する

従来のワークスペースを Unity Catalog にアップグレードする場合、アップグレードされたテーブルとファイルパスを参照するように既存のジョブを更新する必要がある場合があります。次の表に、ジョブの更新に関する一般的なシナリオと推奨事項を示します。

シナリオ

ソリューション

ジョブは、initスクリプトまたはクラスタリング定義のライブラリを介してカスタムライブラリへの参照を持つノートブックを使用しています。

カスタム ライブラリは、コードに埋め込まれた Apache Spark または SQL の読み取りまたは書き込み操作を実行する、公開されていない pip パッケージまたは jar として定義されます。

カスタムライブラリを変更して、次のことを確認します。

  • データベース名は 3 レベルの名前空間です。
  • マウント ポイントはコードでは使用されません。

ジョブは、 Hive metastore テーブルから読み取ったり、テーブルに対して書き込んだりするノートブックを使用しています。

  • ジョブ クラスタリング Spark config でのデフォルト カタログの設定を評価します。 spark.databricks.sql.initial.catalog.name my_catalog
  • ワークスペースのデフォルトカタログをhive_metastore以外に設定して、ジョブコードを変更する必要がないかどうかを評価します。
  • それ以外の場合は、ジョブ・コードを変更して、2 レベルの名前空間の名前を適切なテーブルの 3 レベルの名前空間に変更します。
  • ジョブが純粋 SQL を使用している場合は、 USE CATALOG ステートメントを追加することを検討してください。

ジョブは、テーブルのサブフォルダーであるパスに対して読み取りまたは書き込みを行うノートブックを使用しています。これは Unity Catalog では不可能です。

  • パーティション列の述語を持つテーブルから読み取るようにコードを変更します。
  • overwriteByPartitionまたはその他の適切なオプションを使用してテーブルに書き込むようにコードを変更します。

ジョブは、Unity Catalog に登録されているテーブルであるマウント パスから読み取ったり、マウント パスに書き込んだりするノートブックを使用しています

  • 正しい 3 レベルの名前空間テーブルを参照するようにコードを変更します。
  • テーブルが登録されていない、または登録されない場合でも、マウントパスではなくボリュームパスに書き込むようにコードを変更する必要があります。

ジョブは、マウントパスを介してテーブルではなくファイルを読み書きするノートブックを使用しています。

代わりに、ボリュームの場所に書き込むようにコードを変更します。

ジョブは、 applyInPandasWithState.

現在サポートされていません。可能であれば書き直すことを検討するか、サポートが提供されるまでこのジョブのリファクタリングを試みないでください。

ジョブは、連続処理モードを使用するストリーミング ジョブです。

連続処理モードは Spark で試験段階であり、Unity Catalog ではサポートされていません。構造化ストリーミングを使用するようにジョブをリファクタリングします。これが不可能な場合は、ジョブを Hive metastoreに対して実行し続けることを検討してください。

ジョブは、チェックポイント・ディレクトリーを使用するストリーミング ジョブです。

  • チェックポイント・ディレクトリをボリュームに移動します。
  • ノートブックのコードを変更して、ボリューム パスを使用します。
  • ジョブ所有者は、そのパスで読み取り/書き込みを持っている必要があります。
  • ジョブを停止します。
  • チェックポイントを新しいボリュームの場所に移動します。
  • ジョブを再起動します。

ジョブのクラスタリング定義は 11.3 より Databricks Runtime 未満です。

  • ジョブ クラスタリングの定義を Databricks Runtime 11.3 以降に変更します。
  • ジョブ クラスタリング定義を変更して、指定アクセス モードまたは標準アクセス モードを使用します。

ジョブには、ストレージまたはテーブルと対話するノートブックがあります。

ジョブが実行されていたサービスプリンシパルには、ボリューム、テーブル、外部ロケーションなど、 Unity Catalog内の必要なリソースに対する読み取りおよび書き込みアクセスを提供する必要があります。

ジョブは Lakeflow 宣言型パイプラインです。

  • ジョブのクラスタリングを Databricks Runtime 13.1 以降に変更します。
  • Lakeflow Declarative パイプライン ジョブを停止します。
  • データを Unity Catalog マネージドテーブルに移動します。
  • Lakeflow新しい Declarative パイプライン ジョブ定義を変更して、新しい Unity Catalog マネージドテーブルを使用します。
  • Lakeflow Declarative パイプライン ジョブを再起動します。

ジョブには、AWSKinesis などの非ストレージクラウドサービスを使用するノートブックがあり、接続に使用される設定ではインスタンスプロファイルが使用されます。

  • SDK で使用できる一時的な資格情報を生成することで、非ストレージ クラウド サービスと対話できる資格情報を制御する Unity Catalog サービス資格情報を使用するようにコードを変更します。

ジョブは Scala を使用

  • Databricks Runtime 13.3未満の場合は、専用のコンピュートで実行してください。
  • Standard クラスタリングは、 Databricks Runtime 13.3 以降でサポートされています。

ジョブには、Scala UDFsを使用するノートブックがあります

  • Databricks Runtime 13.3未満の場合は、専用のコンピュートで実行してください。
  • Standard クラスタリングは、 Databricks Runtime 14.2 でサポートされています。

ジョブに MLR を使用するタスクがあります

専用のコンピュートで実行.

ジョブには、グローバルinitスクリプトに依存するクラスタリング構成があります。

  • 完全なサポートを受けるには、Databricks Runtime 13.3 以降を使用してください。
  • クラスタリング-scoped initスクリプトを使用するか、クラスターポリシーを使用するように変更します。 スクリプト、ファイル、パッケージを実行するには、Unity Catalog ボリュームにインストールする必要があります。

ジョブには、jars/Maven、 Spark 拡張機能、またはカスタム データソース ( Sparkから) を使用するクラスタリング構成またはノートブックがあります。

  • Databricks Runtime 13.3 以降を使用します。
  • クラスターポリシーを使用してライブラリをインストールします。

ジョブには、PySpark UDFsを使用するノートブックがあります。

Databricks Runtime 13.2 以降を使用します。

ジョブには、ネットワーク呼び出しを行うPythonコードを含むノートブックがあります。

Databricks Runtime 12.2 以降を使用します。

ジョブには、Pandas UDF (スカラー) を使用するノートブックがあります。

Databricks Runtime 13.2 以降を使用します。

ジョブは Unity Catalog ボリュームを使用します。

Databricks Runtime 13.3 以降を使用します。

ジョブには、 spark.catalog.X (tableExistslistTablessetDefaultCatalog) を使用し、共有クラスタリングを使用して実行するノートブックがあります

  • Databricks Runtime 14.2 以降を使用します。

  • Databricks Runtime のアップグレードが不可能な場合は、次の手順を使用します。

tableExistsの代わりに、次のコードを使用します。

# SQL workaround
def tableExistsSql(tablename):
try:
spark.sql(f"DESCRIBE TABLE {tablename};")
except Exception as e:
return False
return True
tableExistsSql("jakob.jakob.my_table")

listTablesの代わりに SHOW TABLES を使用します (データベースまたはパターンの一致も制限できます)。

spark.sql("SHOW TABLES")

setDefaultCatalog の実行

spark.sql("USE CATALOG ")

ジョブには、内部 DButils を使用する ノートブック API: コマンド コンテキストと、共有クラスタリングを使用した実行があります。

コマンドを使用してコマンドアクセスにアクセスしようとするワークロード (たとえば、ジョブ ID を取得するために、

dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()

.toJson()の代わりに .safeToJson()を使用します。これにより、共有クラスタリングで安全に共有できるすべてのコマンド コンテキスト情報のサブセットが提供されます。

Databricks Runtime 13.3 LTS+ が必要です

ジョブには、spark.udf.registerJavaFunctionという PySparkを使用するノートブックがあり、共有クラスタリングを使用して実行されています

  • Databricks Runtime 14.3 LTS 以降を使用する
  • ノートブックとジョブの場合は、%scala セルを使用して spark.udf.registration Scala UDF 登録する PythonとScalaは実行コンテキストを共有するため、Scala UDFはPythonからも使用できます。
  • IDEs(Databricks Connect v2を使用)を利用しているお客様にとっては、UDFをUnity Catalog Python UDFとして書き換えるしか選択肢がありません。将来的には、Unity Catalog UDFs のサポートを Scala に拡張する予定です。

ジョブには RDD を使用するノートブックがあります: sc.parallelize & spark.read.json() を使用して JSON オブジェクトを DF に変換し、共有クラスタリングを使用して実行します。

  • 代わりに JSON.loads を使用してください

例-

以前は:

json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)

後:

from pyspark.sql import Row
import json
# Sample JSON data as a list of dictionaries (similar to JSON objects)
json_data_str = response.text
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display()

ジョブにはRDDを使用するノートブックがあります:sc.emptyRDD()を介した空のデータフレームと共有クラスタリングを使用して実行

例-

以前は:

val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)

後:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema)

ジョブには、RDD を使用するノートブックがあります: mapPartitions (高価な初期化ロジック + 行あたりの操作が安価) と共有クラスタリングを使用して実行されます

  • 理由-

/Unity Catalog プログラムとSpark PythonScalaSparkサーバー間の通信に Connectを使用してクラスタリングを共有し、RDDにアクセスできなくなりました。

以前は:

RDD の一般的な使用例は、コストのかかる初期化ロジックを 1 回だけ実行し、その後、行ごとにコストのかかる操作を実行することです。このようなユースケースは、外部サービスの呼び出しや暗号化ロジックの初期化などです。

後:

Dataframe API を使用し、PySpark ネイティブの Arrow UDF を使用して RDD 操作を書き換えます。

ジョブには、SparkContext (sc) と sqlContext を使用し、共有クラスタリングを使用して実行するノートブックがあります

  • 理由-

Spark Context (sc) と sqlContext は、共有クラスタリング アーキテクチャと SparkConnect Unity Catalog 設計上は使用できません。

解決方法:

spark 変数を使用して SparkSession インスタンスと対話する

制限:

Spark JVM には、Python / Scala REPL から直接アクセスすることはできず、Sparkコマンドを介してのみアクセスできます。つまりsc._jvmコマンドは設計上失敗します。

次の sc コマンドはサポートされていません: emptyRDD、range、init_batched_serializer、parallelize、pickleFile、textFile、wholeTextFiles、binaryFiles、binaryRecords、sequenceFile、newAPIHadoopFile、newAPIHadoopRDD、hadoopFile、hadoopRDD、union、runJob、setSystemProperty、uiWebUrl、stop、setJobGroup、setLocalProperty、getConf

ジョブには、 Spark Conf - sparkContext.getConf を使用し、共有クラスタリングを使用して実行するノートブックがあります

  • 理由-

sparkContext、df.sparkContext、sc.sparkContext などの APIs は、設計上は使用できません。

解決方法:

代わりに spark.conf を使用してください

ジョブには、SparkContext - SetJobDescription() を使用し、共有クラスタリングを使用して実行するノートブックがあります

  • 理由-

sc.setJobDescription("文字列")は、共有クラスタリングアーキテクチャとSparkConnect Unity Catalog ため、設計上は使用できません。

解決方法:

可能であれば、代わりにタグを使用してください [PySpark ドキュメント]

spark.addTag() はタグをアタッチでき、 getTags() と interruptTag(tag) を使用してタグの有無を操作できます

Databricks Runtime 14.1+ が必要です

ジョブには、sc.setLogLevel("INFO") などのコマンドを使用して Spark Log Levels を設定するノートブックと、共有クラスタリングを使用して実行するノートブックがあります

  • 理由-

シングルユーザーおよび分離クラスタリングなしでは、 Spark コンテキストにアクセスして、ドライバーとエグゼキューター間のログレベルを直接動的に設定できます。 共有クラスタリングでは、この方法は Spark コンテキストからアクセスできず、 Databricks Runtime 14+ では Spark コンテキストは使用できなくなりました。

解決方法:

log4j.confを提供せずにログレベルを制御するために、クラスタリング設定で Spark 設定値を使用できるようになりました。 クラスタリング設定のSpark設定値として spark.log.level を DEBUG、WARN、INFO、ERROR に設定して、Sparkログレベルを使用します。

ジョブには、深くネストされた式/クエリを使用し、共有クラスタリングを使用して実行するノートブックがあります

  • 理由-

RecursionError / Protobuf の最大ネスト レベルを超えました (深くネストされた式 / クエリの場合)

を使用して深くネストされたDataFrames PySparkDataFrameAPIと式を再帰的に作成する場合、場合によっては次のいずれかが発生する可能性があります。

  • Pythonの例外:RecursionError:最大再帰の深さを超えました
  • SparkConnectGprcException: Protobuf の最大ネスト レベルを超えました

解決方法:

この問題を回避するには、深くネストされたコードパスを特定し、線形式/サブクエリまたは一時的なビューを使用してそれらを書き換えます。

たとえば、 df.withColumn を再帰的に呼び出す代わりに、 df.withColumns(dict) を呼び出します。

ジョブには、コード内で input_file_name() を使用し、共有クラスタリングを使用して実行するノートブックがあります

  • 理由-

input_file_name() は、共有クラスタリングの Unity Catalog ではサポートされていません。

解決方法:

ファイル名を取得するには

.withColumn("RECORD_FILE_NAME", col("_metadata.file_name"))

spark.readで動作します

ファイルパス全体を取得するには

.withColumn("RECORD_FILE_PATH", col("_metadata.file_path"))

spark.readで動作します

ジョブには、 DBFS ファイル・システムに対してデータ操作を実行し、共有クラスタリングを使用して実行するノートブックがあります

  • 理由-

FUSEサービスを使用して共有クラスタリングで DBFS を使用すると、ファイルシステムに到達できず、ファイルが見つからないというエラーが発生します

例:

共有クラスタリング アクセスを使用して DBFS が失敗する場合の例を次に示します

with open('/dbfs/test/sample_file.csv', 'r') as file:
ls -ltr /dbfs/test
cat /dbfs/test/sample_file.csv

解決方法:

どちらかを使用します -

  • Databricks Unity Catalog DBFS を使用する代わりにボリューム (推奨)
  • dbutils または spark を使用してコードを更新し、この spark はストレージへの直接アクセス・パスを経由し、共有クラスタリングから DBFS へのアクセスが許可されます