従来のワークスペースを Unity Catalog にアップグレードするときにジョブを更新する
従来のワークスペースをUnity Catalogにアップグレードする場合、既存のジョブを更新して、アップグレードされたテーブルとファイルパスを参照するようにする必要がある場合があります。このページにあるメインの表には、ジョブを更新するための典型的なシナリオと提案が記載されています。コード例が必要なシナリオについては、 「詳細シナリオ」セクションへのリンクを参照してください。
ジョブをUnity Catalogに更新するデモについては、「ジョブをUnity Catalogにアップグレードする」を参照してください。
シナリオの概要
シナリオ | ソリューション |
|---|---|
ジョブは、init スクリプトまたはクラスター定義のライブラリを通じてカスタム ライブラリを使用します。 カスタム ライブラリは、コードに埋め込まれた Apache Spark または SQL の読み取りまたは書き込み操作を実行する、公開されていない | カスタムライブラリを変更して、次のことを確認します。
|
ジョブは、 Hive metastoreテーブルに対して読み取りまたは書き込みを行います。 |
|
ジョブは、テーブルのサブフォルダーであるパスから読み書きします(Unity Catalogではサポートされていません)。 |
|
Unity Catalogテーブルであるマウントパスから読み書きします。 |
|
このジョブは、マウントパスを使用してファイル(テーブルではない)を読み書きします。 | 代わりに、ボリュームの場所に書き込むようにコードを変更します。 |
ジョブは、 | 現在サポートされていません。可能であれば書き直すことを検討するか、サポートが提供されるまでこのジョブのリファクタリングを試みないでください。 |
ジョブは、連続処理モードを使用するストリーミング ジョブです。 | 連続処理モードは Spark で試験段階であり、Unity Catalog ではサポートされていません。構造化ストリーミングを使用するようにジョブをリファクタリングします。これが不可能な場合は、ジョブを Hive metastoreに対して実行し続けることを検討してください。 |
ジョブは、チェックポイント・ディレクトリーを使用するストリーミング ジョブです。 |
|
ジョブのクラスタリング定義は 11.3 より Databricks Runtime 未満です。 |
|
ジョブには、ストレージまたはテーブルと対話するノートブックがあります。 | ジョブが実行されていたサービスプリンシパルには、ボリューム、テーブル、外部ロケーションなど、 Unity Catalog内の必要なリソースに対する読み取りおよび書き込みアクセスを提供する必要があります。 |
ジョブはLakeflow Spark宣言型パイプラインです。 |
|
このジョブは、認証のためにインスタンスプロファイルを使用して、ストレージ以外のクラウドサービス(AWS Kinesisなど)を利用します。 | SDK で使用できる一時的な資格情報を生成することで、非ストレージ クラウド サービスと対話できる資格情報を制御する Unity Catalog サービス資格情報を使用するようにコードを変更します。 |
このジョブはScalaを使用しています。 |
|
このジョブには、Scala UDFを使用するノートブックが含まれています。 |
|
このジョブにはMLRを使用するタスクが含まれています。 | 専用のコンピュートで実行. |
ジョブには、グローバルinitスクリプトに依存するクラスタリング構成があります。 |
|
このジョブは、jarファイル/Maven、Spark拡張機能、またはカスタムデータソース(Spark由来)を使用します。 |
|
ジョブには、PySpark UDF を含むノートブックがあります。 | Databricks Runtime 13.2 以降を使用します。 |
そのジョブには、ネットワーク呼び出しを行うPythonコードを含むノートブックがあります。 | Databricks Runtime 12.2 以降を使用します。 |
ジョブには、Pandas UDF(スカラー)を含むノートブックがあります。 | Databricks Runtime 13.2 以降を使用します。 |
ジョブは Unity Catalog ボリュームを使用します。 | Databricks Runtime 13.3 以降を使用します。 |
ジョブは共有クラスター上で | ジョブノートブックが共有クラスター上で spark.catalog.X を使用する方法については、こちらをご覧ください。 |
ジョブは共有クラスターで | コマンド コンテキストにアクセスする場合 (たとえば、ジョブ ID を取得する場合)、 Databricks Runtime 13.3 LTS+ が必要です |
ジョブは共有クラスターで |
|
ジョブは共有クラスターで | ジョブ ノートブックの sc.Parallelize と Spark の使用を参照してください。共有クラスター上のread.json () 。 |
ジョブは | 共有クラスター上で sc.emptyRDD() を使用して空のDataFramesを作成するジョブ ノートブックを参照してください。 |
ジョブは共有クラスター上でRDD | Unity Catalogの共有クラスターは、PythonおよびScalaプログラムとSparkサーバー間の通信にSpark Connectを使用するため、RDDにアクセスできなくなります。 RDDの典型的な使用例としては、コストのかかる初期化ロジックを一度だけ実行し、その後は外部サービスの呼び出しや暗号化ロジックの初期化など、行ごとにコストの低い操作を実行することが挙げられます。 DataFrame APIとPySparkネイティブのArrow UDFを使用してRDD操作を書き換えます。 |
ジョブは共有クラスター上で SparkContext ( |
Spark JVMには、 PythonまたはScala REPLから直接アクセスすることはできません。Spark Sparkを介してのみアクセスできます。 以下の |
ジョブは共有クラスターで |
|
ジョブは共有クラスターで |
タグを付けるには Databricks Runtime 14.1+ が必要です |
ジョブは共有クラスターで | 共有クラスターでは、ログレベルを直接設定するためにSparkコンテキストにアクセスすることはできません。Databricks Runtime 14以降では、Spark Contextは利用できなくなりました。 クラスター設定で、Spark構成値として |
ジョブは、共有クラスター上で深くネストされた式またはクエリを使用します。 | PySpark DataFrame APIを使用して再帰的に作成された深くネストされたDataFramesと式は、以下を生成する可能性があります。
深くネストされたコードパスを特定し、線形式、サブクエリ、または一時ビューを使用して書き換えます。例えば、 |
ジョブは共有クラスターで | |
ジョブは、共有クラスター上のDBFSでデータ操作を実行します。 |
詳細なシナリオ
以下のシナリオでは、コード例が必要です。
ジョブ ノートブックは共有クラスターでspark.catalog.Xを使用します
Databricks Runtime 14.2 以降を使用します。
Databricks Runtimeのアップグレードが不可能な場合は、以下の回避策を使用してください。
tableExistsの代わりに以下を使用してください。
# SQL workaround
def tableExistsSql(tablename):
try:
spark.sql(f"DESCRIBE TABLE {tablename};")
except Exception as e:
return False
return True
tableExistsSql("jakob.jakob.my_table")
listTablesの代わりにSHOW TABLESを使用してください(データベースまたはパターンマッチングによる制限もサポートしています)。
spark.sql("SHOW TABLES")
setDefaultCatalogの場合、以下を実行します。
spark.sql("USE CATALOG <catalog_name>")
ジョブ ノートブックは共有クラスターでsc.parallelizeとspark.read.json()を使用します
代わりにjson.loads使用してください。
以前は:
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)
後:
from pyspark.sql import Row
import json
# Sample JSON data as a list of dictionaries (similar to JSON objects)
json_data_str = response.text
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display()
ジョブ ノートブックは共有クラスター上でsc.emptyRDD()を使用して空のDataFramesを作成します
以前は:
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)
後:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema)
ジョブ ノートブックは共有クラスターでinput_file_name()を使用します
input_file_name() 共有クラスターのUnity Catalogではサポートされていません。
ファイル名を取得するには:
.withColumn("RECORD_FILE_NAME", col("_metadata.file_name"))
完全なファイルパスを取得するには:
.withColumn("RECORD_FILE_PATH", col("_metadata.file_path"))
どちらのオプションもspark.readで動作します。
ジョブノートブックは共有クラスター上のDBFSでデータ操作を実行します
FUSE サービスを介して共有クラスターでDBFS使用すると、クラスターはファイルシステムに到達できず、ファイルが見つからないエラーが生成されます。
次の例は、共有クラスターで失敗します。
with open('/dbfs/test/sample_file.csv', 'r') as file:
ls -ltr /dbfs/test
cat /dbfs/test/sample_file.csv
次のソリューションのいずれかを使用します。
- DBFSの代わりにDatabricks Unity Catalogボリュームを使用する(推奨)。
- コードを更新して、
dbutilsまたはsparkを使用するようにします。これらはストレージへの直接アクセスパスを使用し、共有からDBFSへのアクセスが許可されます。