共用方式為


使用 Azure 入口網站中的變更追蹤,以累加方式將資料從 Azure SQL Database 複製到 Blob 儲存體

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用

在資料整合解決方案中,我們經常會見到在初次載入資料之後,再以累加方式載入資料的情形。 您可以輕鬆地對來源資料存放區中某段期間的已變更資料進行分割 (例如 LastModifyTimeCreationTime)。 但也有某些情況是,您並無法明確地識別自上次處理資料後所產生的差異資料。 您可以使用 Azure SQL Database 和 SQL Server 等資料存放區所支援的變更追蹤技術來識別差異資料。

本教學課程說明如何搭配變更追蹤使用 Azure Data Factory,以累加方式將差異資料從 Azure SQL Database 載入到 Azure Blob 儲存體。 如需變更追蹤的詳細資訊,請參閱 SQL Server 中的變更追蹤

您會在本教學課程中執行下列步驟:

  • 準備來源資料存放區。
  • 建立資料處理站。
  • 建立連結的服務。
  • 建立來源、接收和變更追蹤資料集。
  • 建立、執行和監視完整複製管線。
  • 新增或更新來源資料表中的資料。
  • 建立、執行和監視累加複製管線。

高階解決方案

在本教學課程中,您會建立兩個管線以執行下列作業。

注意

本教學課程會使用 Azure SQL Database 作為來源資料存放區。 您也可以使用 SQL Server。

  1. 初始載入歷程記錄資料:您會建立具有複製活動的管線,以將整個資料從來源資料存放區 (Azure SQL Database) 複製到目的地資料存放區 (Azure Blob 儲存體):

    1. 在 Azure SQL Database 的來源資料庫中啟用變更追蹤技術。
    2. 取得資料庫中的 SYS_CHANGE_VERSION 初始值作為基準,以擷取變更的資料。
    3. 將完整資料從來源資料庫載入到 Azure Blob 儲存體。

    顯示完整載入數據的圖表。

  2. 依排程累加載入差異資料:您會建立具有下列活動的管線,並定期執行:

    1. 建立「兩個查閱活動」以從 Azure SQL Database 取得 SYS_CHANGE_VERSION 的新舊值。

    2. 建立「一個複製活動」,將兩個 SYS_CHANGE_VERSION 值之間所插入、更新或刪除的資料 (差異資料),從 Azure SQL Database 複製到 Azure Blob 儲存體。

      您可以藉由將 sys.change_tracking_tables 中已變更資料列 (在兩個 SYS_CHANGE_VERSION 值之間) 的主索引鍵聯結到來源資料表中的資料來載入差異資料,然後將差異資料移至目的地。

    3. 建立「一個預存程序活動」,以更新下一次管線執行的 SYS_CHANGE_VERSION 值。

    顯示數據累加式載入的圖表。

必要條件

  • Azure 訂用帳戶。 如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶
  • Azure SQL Database。 您會使用 Azure SQL Database 中的資料庫作為「來源」資料存放區。 如果您沒有此資料庫,請參閱在 Azure SQL Database 中建立資料庫,按照步驟建立一個。
  • Azure 儲存體帳戶。 您會使用 Blob 儲存體作為「接收」資料存放區。 如果您沒有 Azure 儲存體帳戶,請參閱建立儲存體帳戶,按照步驟建立一個。 建立名為 adftutorial 的容器。

注意

建議您使用 Azure Az PowerShell 模組來與 Azure 互動。 若要開始使用,請參閱安裝 Azure PowerShell (部分機器翻譯)。 若要了解如何移轉至 Az PowerShell 模組,請參閱將 Azure PowerShell 從 AzureRM 移轉至 Az

在 Azure SQL Database 中建立資料來源資料表

  1. 開啟 SQL Server Management Studio,然後連線到 SQL Database。

  2. 在伺服器總管中,以滑鼠右鍵按一下您的資料庫,然後選取 [新增查詢]

  3. 對資料庫執行下列 SQL 命令,以建立名為 data_source_table 的資料表作為來源資料存放區:

    create table data_source_table
    (
        PersonID int NOT NULL,
        Name varchar(255),
        Age int
        PRIMARY KEY (PersonID)
    );
    INSERT INTO data_source_table
        (PersonID, Name, Age)
    VALUES
        (1, 'aaaa', 21),
        (2, 'bbbb', 24),
        (3, 'cccc', 20),
        (4, 'dddd', 26),
        (5, 'eeee', 22);
    
  4. 執行下列 SQL 查詢,在您的資料庫和來源資料表 (data_source_table) 啟用變更追蹤。

    注意

    • <your database name> 取代為 Azure SQL Database 中具有 data_source_table 的資料庫名稱。
    • 在目前的範例中,有變更的資料會保留兩天。 如果您每隔三天以上才會載入變更的資料,則裡面可能不會包含某些已變更的資料。 您必須將 CHANGE_RETENTION 的值變更為更大的數字,或確保您用來載入已變更資料的期間是在兩天內。 如需詳細資訊,請參閱啟用資料庫的變更追蹤
    ALTER DATABASE <your database name>
    SET CHANGE_TRACKING = ON  
    (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)  
    ALTER TABLE data_source_table
    ENABLE CHANGE_TRACKING  
    WITH (TRACK_COLUMNS_UPDATED = ON)
    
  5. 執行下列查詢,建立新資料表並使用預設值儲存 ChangeTracking_version

    create table table_store_ChangeTracking_version
    (
        TableName varchar(255),
        SYS_CHANGE_VERSION BIGINT,
    );
    DECLARE @ChangeTracking_version BIGINT
    SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION();  
    INSERT INTO table_store_ChangeTracking_version
    VALUES ('data_source_table', @ChangeTracking_version)
    

    注意

    如果在您啟用 SQL Database 的變更追蹤後,資料並未變更,變更追蹤版本的值會是 0

  6. 執行下列查詢,在您的資料庫中建立預存程序。 該管線會叫用此預存程序,以更新您在上一個步驟中建立的資料表變更追蹤版本。

    CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50)
    AS
    BEGIN
    UPDATE table_store_ChangeTracking_version
    SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion
    WHERE [TableName] = @TableName
    END    
    

建立資料處理站

  1. 開啟 Microsoft Edge 或 Google Chrome 網頁瀏覽器。 目前,只有這些瀏覽器支援 Data Factory 使用者介面 (UI)。

  2. Azure 入口網站的左側功能表上,選取 [建立資源]

  3. 選取 [整合]>[Data Factory]

    顯示建立資源時選取數據處理站的螢幕快照。

  4. 在 [新增資料處理站] 頁面上,輸入 ADFTutorialDataFactory 作為名稱。

    資料處理站的名稱必須是全域唯一名稱。 如果您收到錯誤,指出您選擇的名稱無法使用,請變更名稱 (例如變更為 yournameADFTutorialDataFactory),然後嘗試再次建立資料處理站。 如需詳細資訊,請參閱 Azure Data Factory 命名規則

  5. 選取您要在其中建立資料處理站的 Azure 訂用帳戶。

  6. 針對 [資源群組],採取下列其中一個步驟︰

    • 選取 [使用現有的],然後從下拉式清單中選取現有資源群組。
    • 選取 [建立新的],然後輸入資源群組的名稱。

    若要了解資源群組,請參閱 使用資源群組管理您的 Azure 資源

  7. 針對 [版本],選取 [V2]

  8. 針對 [區域],選取資料處理站的區域。

    下拉式清單只會顯示支援的位置。 資料處理站所使用的資料存放區 (例如 Azure 儲存體和 Azure SQL Database) 和計算 (例如 Azure HDInsight) 可位於其他區域。

  9. 選取 [下一步:Git 設定]。 依照設定方法 4:在建立處理站期間中的指示來設定存放庫,或選取 [稍後設定 Git] 核取方塊。 顯示建立數據處理站中 Git 設定選項的螢幕快照。

  10. 選取 [檢閱 + 建立]。

  11. 選取 建立

    儀表板上的 [部署資料處理站] 磚會顯示狀態。

    顯示部署數據處理站狀態的磚螢幕快照。

  12. 建立完成之後,[Data Factory] 頁面隨即出現。 選取 [啟動工作室] 磚,以在不同的索引標籤上開啟 Azure Data Factory UI。

建立連結服務

您在資料處理站中建立的連結服務會將您的資料存放區和計算服務連結到資料處理站。 在本節中,您會建立 Azure 儲存體帳戶和 Azure SQL Database 中資料庫的連結服務。

建立 Azure 儲存體連結服務

若要將您的 Azure 儲存體帳戶連結至資料處理站:

  1. 在 Data Factory UI 中,於 [管理] 索引標籤的 [連線] 下,選取 [連結服務]。 然後選取 [+ 新增] 或 [建立連結服務] 按鈕。 顯示建立連結服務的選取項目的螢幕快照。
  2. 在 [新增連結服務] 視窗中,選取 [Azure Blob 儲存體],然後選取 [繼續]
  3. 輸入下列資訊:
    1. 針對 [名稱],輸入 AzureStorageLinkedService
    2. 針對 [透過整合執行階段連線],選取整合執行階段。
    3. 針對 [驗證類型],選取驗證方法。
    4. 針對 [儲存體帳戶名稱],選取您的 Azure 儲存體帳戶。
  4. 選取 建立

建立 Azure SQL Database 連結服務

若要將資料庫連結至資料處理站:

  1. 在 Data Factory UI 中,於 [管理] 索引標籤的 [連線] 下,選取 [連結服務]。 然後選取 [+ 新增]

  2. 在 [新增連結服務] 視窗中,選取 [Azure SQL Database],然後選取 [繼續]

  3. 輸入下列資訊:

    1. 針對 [名稱],輸入 AzureSqlDatabaseLinkedService
    2. 針對 [伺服器名稱],選取您的伺服器。
    3. 針對 [資料庫名稱],選取您的資料庫。
    4. 針對 [驗證類型],選取驗證方法。 本教學課程使用 SQL 驗證進行示範。
    5. 針對 [使用者名稱],輸入使用者的名稱。
    6. 針對 [密碼],輸入使用者的密碼。 或者,為 Azure Key Vault 的 [AKV 連結服務]、[祕密名稱] 和 [祕密版本] 提供資訊。
  4. 選取 [測試連線] 以測試連線。

  5. 選取 [建立] 以建立連結服務。

    顯示 Azure SQL 資料庫 連結服務的設定的螢幕快照。

建立資料集

在本節中,您會建立資料集來代表資料來源和資料目的地,以及要儲存 SYS_CHANGE_VERSION 值的位置。

建立資料集來代表來源資料

  1. 在 Data Factory UI 的 [作者] 索引標籤上,選取加號 (+)。 然後選取 [資料集],或選取資料集動作的省略符號。

    顯示開始建立數據集之選取項目的螢幕快照。

  2. 選取 [Azure SQL Database],然後選取 [繼續]

  3. 在 [設定屬性] 視窗中,執行下列步驟:

    1. 針對 [名稱],輸入 SourceDataset
    2. 針對 [連結服務],選取 [AzureSqlDatabaseLinkedService]
    3. 針對 [資料表名稱],選取 [dbo.data_source_table]
    4. 針對 [匯入結構描述],選取 [來自連線/存放區] 選項。
    5. 選取 [確定]。

    顯示來源數據集屬性設定的螢幕快照。

建立資料集來代表要複製到接收資料存放區的資料

在下列程序中,您會建立資料集來代表從來源資料存放區複製的資料。 您已在 Azure Blob 儲存體中建立 adftutorial 容器,以滿足其中一項先決條件。 建立容器 (若不存在),或設為現有容器的名稱。 在本教學課程中,會從 @CONCAT('Incremental-', pipeline().RunId, '.txt') 運算式動態產生輸出檔案名稱。

  1. 在 Data Factory UI 的 [作者] 索引標籤上,選取 +。 然後選取 [資料集],或選取資料集動作的省略符號。

    顯示開始建立數據集之選取項目的螢幕快照。

  2. 選取 [Azure Blob 儲存體],然後選取 [繼續]

  3. 選取資料類型的格式 [DelimitedText],然後選取 [繼續]

  4. 在 [設定屬性] 視窗中,執行下列步驟:

    1. 針對 [名稱],輸入 SinkDataset
    2. 針對 [連結服務],選取 [AzureBlobStorageLinkedService]
    3. 針對 [檔案路徑],輸入 adftutorial/incchgtracking
    4. 選取 [確定]。
  5. 資料集出現在樹狀檢視之後,請前往 [連線] 索引標籤,然後選取 [檔案名稱] 文字方塊。 當 [新增動態內容] 選項出現時,請選取該選項。

    顯示設定接收數據集動態檔案路徑選項的螢幕快照。

  6. [轉銷售案源運算式產生器] 視窗隨即出現。 在文字方塊中貼上 @concat('Incremental-',pipeline().RunId,'.csv')

  7. 選取 [確定]。

建立資料集來代表變更追蹤資料

在下列程序中,您會建立資料集來儲存變更追蹤版本。 您已建立 table_store_ChangeTracking_version 資料表,以滿足其中一項先決條件。

  1. 在 Data Factory UI 的 [作者] 索引標籤上,選取 +,然後選取 [資料集]
  2. 選取 [Azure SQL Database],然後選取 [繼續]
  3. 在 [設定屬性] 視窗中,執行下列步驟:
    1. 針對 [名稱],輸入 ChangeTrackingDataset
    2. 針對 [連結服務],選取 [AzureSqlDatabaseLinkedService]
    3. 針對 [資料表名稱],選取 [dbo.table_store_ChangeTracking_version]
    4. 針對 [匯入結構描述],選取 [來自連線/存放區] 選項。
    5. 選取 [確定]。

建立完整複本的管線

在下列程序中,您會建立具有複製活動的管線,以將整個資料從來源資料存放區 (Azure SQL Database) 複製到目的地資料存放區 (Azure Blob 儲存體):

  1. 在 Data Factory UI 的 [作者] 索引標籤上,選取 +,然後選取 [管線] > [管線]

    顯示開始建立數據處理站管線之選取項目的螢幕快照。

  2. 新的索引標籤隨即出現以供設定管線。 該管線也會出現在樹狀檢視中。 在 [屬性] 視窗中,將管線的名稱變更為 FullCopyPipeline

  3. 在 [活動] 工具箱中,展開 [移動和轉換]。 採取下列其中一個步驟:

    • 將複製活動拖曳至管線設計工具介面。
    • 在 [活動] 下的搜尋列上,搜尋複製資料活動,然後將名稱設定為 FullCopyActivity
  4. 切換至 [來源] 索引標籤。針對 [來源資料集],選取 [SourceDataset]

  5. 切換至 [接收] 索引標籤。針對 [接收資料集],選取 [SinkDataset]

  6. 若要驗證管線定義,請在工具列上選取 [驗證]。 確認沒有任何驗證錯誤。 關閉管線驗證輸出。

  7. 若要發佈實體 (連結服務、資料集和管線),請選取 [全部發佈]。 請靜待 [發佈成功] 訊息顯示。

    顯示發佈成功的訊息螢幕快照。

  8. 若要查看通知,請選取 [顯示通知] 按鈕。

執行完整的複製管線

  1. 在 Data Factory UI 的管線工具列上,選取 [新增觸發程序],然後選取 [立即觸發]

    顯示立即觸發完整復本選項的螢幕快照。

  2. 在 [管線執行] 視窗中,選取 [確定]

    顯示具有參數檢查之管線執行確認的螢幕快照。

監視完整的複製管線

  1. 在 Data Factory UI 中,選取 [監視] 索引標籤。管線執行及其狀態隨即出現在清單中。 若要重新整理清單,請選取 [重新整理]。 暫留在管線執行上,以取得 [重新執行] 或 [取用] 選項。

    顯示管線執行和狀態的螢幕快照。

  2. 若要檢視與管線執行建立關聯的活動執行,請從 [管線名稱] 資料行中選取管線名稱。 管線中只有一個活動,因此清單中只有一個項目。 若要切換回管線執行的檢視,請選取頂端的 [所有管線執行] 連結。

檢閱結果

adftutorial 容器的 incchgtracking 資料夾包含名為 incremental-<GUID>.csv 的檔案。

完整複本輸出檔案的螢幕快照。

檔案應該有您資料庫中的資料:


PersonID,Name,Age
1,"aaaa",21
2,"bbbb",24
3,"cccc",20
4,"dddd",26
5,"eeee",22

5,eeee,PersonID,Name,Age
1,"aaaa",21
2,"bbbb",24
3,"cccc",20
4,"dddd",26
5,"eeee",22

新增更多資料至來源資料表

對您的資料庫執行下列查詢,以新增資料列並加以更新:

INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new','50');


UPDATE data_source_table
SET [Age] = '10', [name]='update' where [PersonID] = 1

建立差異複本的管線

在下列程序中,您會建立具有多項活動的管線,並定期執行。 當您執行管線時:

  • 「查閱活動」會從 Azure SQL Database 取得 SYS_CHANGE_VERSION 的新舊值,並將其傳遞給複製活動。
  • 「複製活動」會將兩個 SYS_CHANGE_VERSION 值之間所插入、更新或刪除的資料,從 Azure SQL Database 複製到 Azure Blob 儲存體。
  • 「預存程序活動」會更新下一次管線執行的 SYS_CHANGE_VERSION 值。
  1. 在 Data Factory UI 中,切換至 [作者] 索引標籤。選取 +,然後選取 [管線]>[管線]

    顯示如何在數據處理站中建立管線的螢幕快照。

  2. 新的索引標籤隨即出現以供設定管線。 該管線也會出現在樹狀檢視中。 在 [屬性] 視窗中,將管線的名稱變更為 IncrementalCopyPipeline

  3. 在 [活動] 工具箱中,展開 [一般]。 將查閱活動拖曳至管線設計工具介面,或在 [搜尋活動] 方塊中搜尋。 將活動的名稱設定為 LookupLastChangeTrackingVersionActivity。 此活動會取得在上次複製作業中使用的變更追蹤版本,這項資訊儲存在 table_store_ChangeTracking_version 資料表中。

  4. 在 [屬性] 視窗中切換至 [設定] 索引標籤。 針對 [來源資料集],選取 [ChangeTrackingDataset]

  5. 將 [活動] 工具箱中的查閱活動拖曳至管線設計工具介面。 將活動的名稱設定為 LookupCurrentChangeTrackingVersionActivity。 此活動會取得目前的變更追蹤版本。

  6. 切換至 [屬性] 視窗中的 [設定] 索引標籤,然後執行下列步驟:

    1. 針對 [來源資料集],選取 [SourceDataset]

    2. 針對 [使用查詢],選取 [查詢]

    3. 針對 [查詢],輸入下列 SQL 查詢:

      SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion
      

    顯示已新增至 [設定] 索引標籤中 屬性視窗 查詢的螢幕快照。

  7. 在 [活動] 工具箱中,展開 [移動和轉換]。 將複製資料活動拖曳至管線設計工具介面。 將活動的名稱設定為 IncrementalCopyActivity。 此活動會將上一個變更追蹤版本與目前變更追蹤版本之間的資料,複製到目的地資料存放區。

  8. 切換至 [屬性] 視窗中的 [來源] 索引標籤,然後執行下列步驟:

    1. 針對 [來源資料集],選取 [SourceDataset]

    2. 針對 [使用查詢],選取 [查詢]

    3. 針對 [查詢],輸入下列 SQL 查詢:

      SELECT data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) AS CT ON data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}
      

    顯示已新增至 屬性視窗 中 [來源] 索引標籤之查詢的螢幕快照。

  9. 切換至 [接收] 索引標籤。針對 [接收資料集],選取 [SinkDataset]

  10. 逐一將兩個查閱活動連線到複製活動。 將連結到查閱活動的綠色按鈕拖曳至複製活動。

  11. 將 [活動] 工具箱中的預存程序活動拖曳至管線設計工具介面。 將活動的名稱設定為 StoredProceduretoUpdateChangeTrackingActivity。 此活動會更新 table_store_ChangeTracking_version 資料表中的變更追蹤版本。

  12. 切換至 [設定] 索引標籤,然後執行下列步驟:

    1. 針對 [連結服務],選取 [AzureSqlDatabaseLinkedService]
    2. 針對 [預存程序名稱],選取 Update_ChangeTracking_Version
    3. 選取匯入
    4. 在 [預存程序參數] 區段中,指定參數的下列值:
    名稱 類型
    CurrentTrackingVersion Int64 @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}
    TableName String @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}

    顯示預存程式活動的設定參數的螢幕快照。

  13. 將複製活動連線到預存程序活動。 將連結到複製活動的綠色按鈕拖曳至預存程序活動。

  14. 在工具列上選取 [驗證]。 確認沒有任何驗證錯誤。 關閉 [管線驗證報告] 視窗。

  15. 選取 [全部發佈] 按鈕,將實體 (連結服務、資料集和管線) 發佈至 Data Factory 服務。 請等候 [發佈成功] 訊息出現。

    顯示發佈數據處理站所有實體之按鈕的螢幕快照。

執行累加複製管線

  1. 在管線的工具列上選取 [新增觸發程序],然後選取 [立即觸發]

    顯示立即觸發累加複製選項的螢幕快照。

  2. 在 [管線執行] 視窗中,選取 [確定]

監視累加複製管線

  1. 選取 [ 監視 ] 索引標籤。管線執行及其狀態隨即出現在清單中。 若要重新整理清單,請選取 [重新整理]

    顯示數據處理站管線執行的螢幕快照。

  2. 若要檢視與管線執行建立關聯的活動執行,請選取 [管線名稱] 資料行中的 [IncrementalCopyPipeline] 連結。 活動執行隨即出現在清單中。

    顯示數據處理站活動執行的螢幕快照。

檢閱結果

第二個檔案會出現在 adftutorial 容器的 incchgtracking 資料夾中。

顯示增量複本輸出檔案的螢幕快照。

此檔案應該只會有您資料庫中的差異資料。 具有 U 的記錄是資料庫中更新的資料列,具有 I 的記錄則是一個新增的資料列。

PersonID,Name,Age,SYS_CHANGE_VERSION,SYS_CHANGE_OPERATION
1,update,10,2,U
6,new,50,1,I

前三個資料行是 data_source_table 中已變更的資料。 最後兩個資料行是資料表中變更追蹤系統的中繼資料。 第四個資料行是每個已變更資料列的 SYS_CHANGE_VERSION。 第五個資料行是作業:U = 更新,I = 插入。 如需變更追蹤資訊的詳細資料,請參閱 CHANGETABLE

==================================================================
PersonID Name    Age    SYS_CHANGE_VERSION    SYS_CHANGE_OPERATION
==================================================================
1        update  10            2                                 U
6        new     50	       1                            	 I

繼續進行下列教學課程,以了解如何根據 LastModifiedDate 只複製新的和已變更的檔案: