共用方式為


查詢記錄系統 table 參考編號

重要

此系統 table 處於公開預覽 。 若要存取 table,您必須在 systemcatalog中啟用 schema。 如需詳細資訊,請參閱 啟用系統 table 架構

本文包含查詢記錄系統 table的相關信息,包括 tableschema的大綱。

重要

若要存取查詢記錄系統 table,您必須啟用 queryschema。 如需啟用系統架構的指示,請參閱 啟用系統 table 架構

Table 路徑:此系統 table 位於 system.query.history

使用查詢記錄:table

查詢歷程記錄 table 包含使用 SQL 倉儲執行的查詢記錄,或 筆記本的無伺服器計算,作業。 table 包含從您存取 table的相同區域中所有工作區的整體帳戶記錄。

根據預設,只有系統管理員可以存取系統 table。 如果您想要與使用者或群組共用 table的數據,Databricks 建議為每個使用者或群組建立動態檢視。 請參閱建立動態檢視

查詢記錄系統 tableschema

查詢歷程記錄 table 使用下列 schema:

Column 名稱 資料類型 描述 範例
account_id 字串 帳戶的 ID。 11e22ba4-87b9-4cc2

-9770-d10b894b7118
workspace_id 字串 執行查詢的工作區 where 的標識碼。 1234567890123456
statement_id 字串 可唯一識別陳述式執行的 ID。 可以使用此 ID 來尋找 [查詢歷史記錄] UI 中的陳述式執行。 7a99b43c-b46c-432b

-b0a7-814217701909
session_id 字串 Spark 工作階段 ID。 01234567-cr06-a2mp

-t0nd-a14ecfb5a9c2
execution_status 字串 陳述式終止狀態。 可能的 values 包括:

- FINISHED:執行成功
- FAILED:執行失敗,原因如隨附的錯誤訊息中所述
- CANCELED:執行已取消
FINISHED
compute struct 結構,表示用來執行語句的計算資源類型,以及適用的資源標識碼 where。 type 值爲 WAREHOUSESERVERLESS_COMPUTE {

type: WAREHOUSE,

cluster_id: NULL,

warehouse_id: ec58ee3772e8d305

}
executed_by_user_id 字串 執行陳述式之使用者的 ID。 2967555311742259
executed_by 字串 執行陳述式之使用者的電子郵件地址或使用者名稱。 example@databricks.com
statement_text 字串 SQL 陳述式的文字。 如果您已設定客戶自控金鑰,則 statement_text 為空白。 SELECT 1
statement_type 字串 陳述式類型。 例如,ALTERCOPYINSERT SELECT
error_message 字串 描述錯誤狀況的訊息。 如果您已設定客戶自控金鑰,則 error_message 為空白。 [INSUFFICIENT_PERMISSIONS]

Insufficient privileges:

User does not have

permission SELECT on table

'default.nyctaxi_trips'.
client_application 字串 執行陳述式的用戶端應用程式。 例如:Databricks SQL 編輯器、Tableau 和 Power BI。 此欄位衍生自用戶端應用程式提供的資訊。 雖然 values 預期會隨著時間保持靜態,但無法保證這一點。 Databricks SQL Editor
client_driver 字串 用來連線到 Azure Databricks 以執行陳述式的連接器。 例如:Databricks SQL Driver for Go、Databricks ODBC Driver、Databricks JDBC Driver。 Databricks JDBC Driver
total_duration_ms bigint 陳述式的總執行時間以毫秒為單位 (不包括結果擷取時間)。 1
waiting_for_compute_duration_ms bigint 等待佈建計算資源所花費的時間 (毫秒)。 1
waiting_at_capacity_duration_ms bigint 在佇列中等待可用計算容量所花費的時間 (毫秒)。 1
execution_duration_ms bigint 執行陳述式所花費的時間 (毫秒)。 1
compilation_duration_ms bigint 載入中繼資料並優化陳述式所花費的時間 (毫秒)。 1
total_task_duration_ms bigint 所有任務持續時間的總和 (毫秒)。 此時間代表跨所有節點的所有核心執行查詢所花費的總時間。 如果並行執行多個任務,可能會比掛鐘持續時間長得多。 如果任務等待可用節點,則可能會比掛鐘持續時間短。 1
result_fetch_duration_ms bigint 執行完成後擷取陳述式結果所花費的時間 (毫秒)。 1
start_time timestamp Databricks 收到要求的時間。 Timezone 信息會記錄在值結尾,+00:00 代表 UTC。 2022-12-05T00:00:00.000+0000
end_time timestamp 陳述式執行結束的時間,不包括結果擷取時間。 Timezone 信息會記錄在值結尾,+00:00 代表 UTC。 2022-12-05T00:00:00.000+00:00
update_time timestamp 聲明上次收到進度的時間 update。 Timezone 信息會記錄在值結尾,+00:00 代表 UTC。 2022-12-05T00:00:00.000+00:00
read_partitions bigint 剪除之後分割區讀取的數目。 1
pruned_files bigint 剪除的檔案數目。 1
read_files bigint 剪除之後檔案讀取的數目。 1
read_rows bigint 陳述式所讀取的資料列總數。 1
produced_rows bigint 陳述式所傳回的資料列總數。 1
read_bytes bigint 陳述式讀取的資料大小總計 (位元組)。 1
read_io_cache_percent int 從 IO 快取讀取之永續性資料的位元組百分比。 50
from_result_cache boolean TRUE 表示從快取中擷取陳述式結果。 TRUE
spilled_local_bytes bigint 執行陳述式時暫時寫入磁碟的資料大小 (位元組)。 1
written_bytes bigint 寫入雲端物件儲存體之永續性資料的大小 (位元組)。 1
shuffle_read_bytes bigint 透過網路傳送的資料總量 (位元組)。 1
query_source struct 包含索引鍵/值組的結構,代表一或多個參與執行此陳述式的 Databricks 實體,例如作業、筆記本或儀表板。 此欄位只會記錄 Databricks 實體。 {
job_info: {
job_id: 64361233243479
job_run_id: 887406461287882
job_task_key: “job_task_1”
job_task_run_id: 110378410199121
}
executed_as 字串 用來執行陳述式之權限的使用者或服務主體名稱。 example@databricks.com
executed_as_user_id 字串 用來執行陳述式之權限的使用者或服務主體識別碼。 2967555311742259

檢視記錄的查詢設定檔

若要根據查詢歷史table中的記錄導航至查詢的查詢設定檔,請執行以下步驟:

  1. 確定感興趣的記錄,然後複製記錄的 statement_id
  2. 參考記錄的 workspace_id,以確保您已登入與記錄相同的工作區。
  3. 按一下工作區側邊欄中的 歷程記錄圖示 [查詢歷史記錄]。
  4. 在 [陳述式 ID] 欄位中,在記錄中貼上 statement_id
  5. 按一下查詢的名稱。 查詢計量的概觀隨即出現。
  6. 按一下請參閱查詢設定檔

從中繼存放區具體化查詢歷程記錄

下列程式代碼可用來建立每小時、每日或每周執行的作業,以具體化中繼存放區的查詢歷程記錄。 根據需要調整 HISTORY_TABLE_PATHLOOKUP_PERIOD_DAYS 變數。

from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

HISTORY_TABLE_PATH = "jacek.default.history"
# Adjust the lookup period according to your job schedule
LOOKUP_PERIOD_DAYS = 1

def table_exists(table_name):
    try:
        spark.sql(f"describe table {table_name}")
        return True
    except Exception:
        return False

def save_as_table(table_path, df, schema, pk_columns):
    deltaTable = (
        DeltaTable.createIfNotExists(spark)
        .tableName(table_path)
        .addColumns(schema)
        .execute()
    )

    merge_statement = " AND ".join([f"logs.{col}=newLogs.{col}" for col in pk_columns])

    result = (
        deltaTable.alias("logs")
        .merge(
            df.alias("newLogs"),
            f"{merge_statement}",
        )
        .whenNotMatchedInsertAll()
        .whenMatchedUpdateAll()
        .execute()
    )
    result.show()

def main():
    df = spark.read.table("system.query.history")
    if table_exists(HISTORY_TABLE_PATH):
        df = df.filter(f"update_time >= CURRENT_DATE() - INTERVAL {LOOKUP_PERIOD_DAYS} days")
    else:
        print(f"Table {HISTORY_TABLE_PATH} does not exist. Proceeding to copy the whole source table.")

    save_as_table(
        HISTORY_TABLE_PATH,
        df,
        df.schema,
        ["workspace_id", "statement_id"]
    )

main()