Query history システムテーブル reference
プレビュー
このシステムテーブルは パブリック プレビュー段階です。 テーブルにアクセスするには、 system
カタログでスキーマを有効にする必要があります。 詳細については、「 システムテーブル スキーマを有効にする」を参照してください。
この記事には、テーブルのスキーマの概要など、クエリ履歴システムテーブルに関する情報が含まれています。
クエリ履歴システムテーブルにアクセスするには、 query
スキーマを有効にする必要があります。 システムスキーマを有効にする手順については、「 システムテーブルスキーマを有効にする」を参照してください。
テーブルパス :このシステムテーブルは system.query.history
にあります。
クエリ履歴テーブルの使用
クエリ履歴テーブルには、 SQLウェアハウスまたはサーバレス コンピュート for ノートブック と ジョブを使用して実行されたクエリのレコードが含まれます。 このテーブルには、テーブルへのアクセス元と同じリージョン内のすべてのワークスペースのアカウント全体のレコードが含まれます。
デフォルトでは、管理者のみがシステムテーブルにアクセスできます。 テーブルのデータをユーザーまたはグループと共有する場合、Databricks では、ユーザーまたはグループごとに動的ビューを作成することをお勧めします。 「動的ビューの作成」を参照してください。
Query history システムテーブル schema
クエリ履歴テーブルでは、次のスキーマが使用されます。
列名 | データ型 | 説明 | 例 |
---|---|---|---|
| string | アカウントの ID。 |
|
| string | クエリが実行されたワークスペースの ID。 |
|
| string | 文の実行を一意に識別する ID。 この ID を使用して、 クエリー履歴 UI でステートメントの実行を見つけることができます。 |
|
| string | Spark セッション ID。 |
|
| string | 文の終了状態。 可能な値は次のとおりです。 |
|
| 構造体 | ステートメントの実行に使用されるコンピュート リソースのタイプと、該当する場合はリソースの ID を表す構造体。 |
|
| string | ステートメントを実行したユーザーの ID。 |
|
| string | 文を実行したユーザーのEメールアドレスまたはユーザー名。 |
|
| string | SQL ステートメントのテキスト。 顧客管理のキーを設定している場合、 |
|
| string | ステートメントのタイプ。 たとえば、 |
|
| string | エラー状態を説明するメッセージ。 顧客管理のキーを設定している場合、 |
|
| string | 文を実行したクライアント・アプリケーション。 たとえば、Databricks SQL エディター、Tableau、Power BI などです。 このフィールドは、クライアント アプリケーションによって提供される情報から取得されます。 値は時間の経過とともに静的なままであることが予想されますが、これは保証できません。 |
|
| string | ステートメントを実行するために Databricks に接続するために使用されるコネクタ。 たとえば、Go 用 Databricks SQL ドライバー、Databricks ODBC ドライバー、Databricks JDBC ドライバーなどです。 |
|
| bigint | ステートメントの合計実行時間 (ミリ秒単位) (結果のフェッチ時間を除く)。 |
|
| bigint | コンピュート リソースがプロビジョニングされるのを待つのに費やされた時間 (ミリ秒単位)。 |
|
| bigint | 使用可能なコンピュート容量のキューで待機するのに費やされた時間 (ミリ秒単位)。 |
|
| bigint | ステートメントの実行に費やされた時間 (ミリ秒単位)。 |
|
| bigint | メタデータの読み込みとステートメントの最適化に費やされた時間 (ミリ秒単位)。 |
|
| bigint | すべてのタスク期間の合計 (ミリ秒単位)。 この時間は、すべてのノードのすべてのコアでクエリを実行するのにかかった合計時間を表します。 複数のタスクが並行して実行される場合、ウォールクロックの期間よりも大幅に長くなる可能性があります。 タスクが使用可能なノードを待機する場合は、ウォールクロックの期間よりも短くなることがあります。 |
|
| bigint | 実行の終了後に文の結果をフェッチするのに費やされた時間 (ミリ秒単位)。 |
|
| タイムスタンプ | Databricks が要求を受信した時刻。 タイムゾーン情報は、 |
|
| タイムスタンプ | 文の実行が終了した時刻 (結果のフェッチ時刻を除く)。 タイムゾーン情報は、 |
|
| タイムスタンプ | ステートメントが進行状況の更新を最後に受信した時刻。 タイムゾーン情報は、 |
|
| bigint | プルーニング後に読み取られたパーティションの数。 |
|
| bigint | プルーニングされたファイルの数。 |
|
| bigint | プルーニング後に読み取られたファイルの数。 |
|
| bigint | 文によって読み取られたローの合計数。 |
|
| bigint | 文によって返されたローの合計数。 |
|
| bigint | 文によって読み取られたデータの合計サイズ (バイト単位)。 |
|
| int | IO キャッシュから読み取られた永続データのバイト数の割合。 |
|
| ブーリアン |
|
|
| bigint | 文の実行中に一時的にディスクに書き込まれるデータのサイズ (バイト単位)。 |
|
| bigint | クラウド・オブジェクト・ストレージに書き込まれる永続データのサイズ (バイト単位)。 |
|
| bigint | ネットワーク経由で送信されたデータの合計量 (バイト単位)。 |
|
| 構造体 | このステートメントの実行に関与した 1 つ以上の Databricks エンティティ (ジョブ、ノートブック、ダッシュボードなど) を表すキーと値のペアを含む構造体。 このフィールドは、Databricks エンティティのみを記録します。 |
|
| string | 文の実行に権限が使用されたユーザーまたはサービスプリンシパルの名前。 |
|
| string | ステートメントの実行に特権が使用されたユーザーまたはサービスプリンシパルの ID。 |
|
レコードのクエリ プロファイルを表示する
クエリ履歴テーブル内のレコードに基づくクエリのクエリプロファイルに移動するには、次の操作を行います。
- 目的のレコードを特定し、レコードの
statement_id
をコピーします。 - レコードの
workspace_id
を参照して、レコードと同じワークスペースにログインしていることを確認します。 - ワークスペース
サイドバーで「 クエリー履歴 」をクリックします。
- [ステートメント ID ] フィールドに、レコードの
statement_id
を貼り付けます。 - クエリの名前をクリックします。 クエリメトリクスの概要が表示されます。
- [ クエリ プロファイルの表示 ] をクリックします。
メタストアからのクエリ履歴を具体化します
次のコードを使用して、メタストアからのクエリ履歴を具体化するために、毎時、毎日、または毎週実行されるジョブを作成できます。 それに応じて、 HISTORY_TABLE_PATH
変数と LOOKUP_PERIOD_DAYS
変数を調整します。
from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
HISTORY_TABLE_PATH = "jacek.default.history"
# Adjust the lookup period according to your job schedule
LOOKUP_PERIOD_DAYS = 1
def table_exists(table_name):
try:
spark.sql(f"describe table {table_name}")
return True
except Exception:
return False
def save_as_table(table_path, df, schema, pk_columns):
deltaTable = (
DeltaTable.createIfNotExists(spark)
.tableName(table_path)
.addColumns(schema)
.execute()
)
merge_statement = " AND ".join([f"logs.{col}=newLogs.{col}" for col in pk_columns])
result = (
deltaTable.alias("logs")
.merge(
df.alias("newLogs"),
f"{merge_statement}",
)
.whenNotMatchedInsertAll()
.whenMatchedUpdateAll()
.execute()
)
result.show()
def main():
df = spark.read.table("system.query.history")
if table_exists(HISTORY_TABLE_PATH):
df = df.filter(f"update_time >= CURRENT_DATE() - INTERVAL {LOOKUP_PERIOD_DAYS} days")
else:
print(f"Table {HISTORY_TABLE_PATH} does not exist. Proceeding to copy the whole source table.")
save_as_table(
HISTORY_TABLE_PATH,
df,
df.schema,
["workspace_id", "statement_id"]
)
main()