使用異動資料擷取 (CDC),以累加方式從 Azure SQL 受控執行個體將資料載入 Azure 儲存體
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用!
在本教學課程中,您會建立 Azure data factory 與管道,以根據來源 Azure SQL 受控執行個體資料庫中的異動資料擷取 (CDC) 資訊,將差異資料載入至 Azure blob 儲存體。
您會在本教學課程中執行下列步驟:
- 準備來源資料存放區
- 建立資料處理站。
- 建立連結的服務。
- 建立來源和接收資料集。
- 建立、偵錯及執行管道以檢查變更的資料
- 修改來源資料表中的資料
- 完成、執行和監視完整的累加複製管道
概觀
資料存放區 (例如 Azure SQL 受控執行個體 (MI) 和 SQL Server) 支援的異動資料擷取技術,可用於識別變更的資料。 本教學課程說明如何使用 Azure Data Factory 搭配 SQL 異動資料擷取技術,以累加方式將差異資料從 Azure SQL 受控執行個體載入 Azure Blob 儲存體。 如需有關 SQL 異動資料擷取技術更具的體資訊,請參閱 SQL Server 中的異動資料擷取。
端對端工作流程
以下是使用異動資料擷取技術,以累加方式載入資料的一般端對端工作流程步驟。
注意
Azure SQL MI 和 SQL Server 都支援異動資料擷取技術。 本教學課程使用 Azure SQL 受控執行個體做為來源資料存放區。 但您也可以使用內部部署的 SQL Server。
高階解決方案
在本教學課程中,您會建立管道來執行下列作業:
- 建立查閱活動以計算 SQL Database CDC 資料表中已變更記錄的數目,並將其傳遞給 IF 條件活動。
- 建立 If 條件以檢查是否有變更的記錄,如果有,則叫用複製活動。
- 建立複製活動,將 CDC 資料表之間所插入/更新/刪除的資料複製到 Azure Blob 儲存體。
如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶。
必要條件
- Azure SQL 受控執行個體。 您需要使用資料庫作為來源資料存放區。 如果您沒有 Azure SQL 受控執行個體,請參閱 建立 Azure SQL Database 受控執行個體一文,以了解建立的步驟。
- Azure 儲存體帳戶。 您需要使用 Blob 儲存體作為接收資料存放區。 如果您沒有 Azure 儲存體帳戶,請參閱建立儲存體帳戶一文,按照步驟來建立帳戶。 建立名為 raw 的容器。
在 Azure SQL Database 中建立資料來源資料表
啟動 SQL Server Management Studio,並連線到您的 Azure SQL 受控執行個體伺服器。
在伺服器總管中,以滑鼠右鍵按一下您的資料庫,然後選擇 [新增查詢]。
針對您的 Azure SQL 受控執行個體資料庫執行下列 SQL 命令,藉此建立名為
customers
的資料表做為資料來源存放區。create table customers ( customer_id int, first_name varchar(50), last_name varchar(50), email varchar(100), city varchar(50), CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id") );
藉由執行下列 SQL 查詢,在您的資料庫和來源資料表 (customers)上啟用異動資料擷取機制:
注意
- 以具有 customers 資料表之 Azure SQL MI 的結構描述取代 <您的來源結構描述名稱>。
- 異動資料擷取不會在變更要追蹤之資料表的交易中執行任何動作。 而是會將插入、更新和刪除作業寫入交易記錄中。 如果您沒有定期且有系統地清除儲放在變更資料表中的資料,這項資料將無限制地成長。 如需詳細資訊,請參閱 啟用資料庫的異動資料擷取
EXEC sys.sp_cdc_enable_db EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 1
執行下列命令,將資料插入 customers 資料表:
insert into customers (customer_id, first_name, last_name, email, city) values (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'), (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'), (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');
注意
在啟用異動資料擷取之前,不會先擷取資料表的任何歷程記錄變更。
建立資料處理站
要是您還沒有使用資料處理站,請遵循快速入門:使用 Azure 入口網站建立資料處理站一文中的步驟,建立資料處理站。
建立連結服務
您在資料處理站中建立的連結服務會將您的資料存放區和計算服務連結到資料處理站。 在本節中,您會建立 Azure 儲存體帳戶和 Azure SQL MI 的連結服務。
建立 Azure 儲存體連結服務。
在此步驟中,您會將您的 Azure 儲存體帳戶連結到 Data Factory。
按一下 [連線],然後按一下 [+ 新增]。
在 [新增連結服務] 視窗中,選取 [Azure Blob 儲存體],然後按一下 [繼續]。
在 [新增連結服務] 視窗中,執行下列步驟:
- 輸入 AzureStorageLinkedService 作為 [名稱]。
- 為 [儲存體帳戶名稱] 選取 Azure 儲存體帳戶。
- 按一下 [檔案] 。
建立 Azure SQL MI Database 連結服務。
在此步驟中,您會將 Azure SQL MI Database 連結至資料處理站。
注意
對於 SQL MI 的使用者,請參閱此處內容,瞭解透過公用與私人端點存取的相關資訊 如果是私人端點的使用者,則必須使用自我裝載整合執行階段來執行此管道。 這同樣適用於在 VM 或 VNet 使用案例中執行 SQL Server 內部部署的使用者。
按一下 [連線],然後按一下 [+ 新增]。
在 [新增連結服務] 視窗中,選取 [Azure SQL Database 受控執行個體],然後按一下 [繼續]。
在 [新增連結服務] 視窗中,執行下列步驟:
- 在 [名稱] 欄位中,輸入 AzureSqlMI1。
- 在 [伺服器名稱] 欄位中選取您的 SQL server。
- 在 [資料庫名稱] 欄位中選取您的 SQL 資料庫。
- 在 [使用者名稱] 欄位輸入使用者的名稱。
- 在 [密碼] 欄位輸入使用者的密碼。
- 按一下 [測試連線] 以測試連線。
- 按一下 [儲存] 以儲存連結服務。
建立資料集
在此步驟中,您會建立資料集來代表資料來源和資料目的地。
建立資料集來代表來源資料
在此步驟中,您會建立資料集來代表來源資料。
在樹狀檢視中,按一下 [+] (加號),然後按一下 [資料集]。
選取 Azure SQL Database 受控執行個體,然後按一下 [繼續]。
在 [設定屬性] 索引標籤中,設定資料集名稱和連線資訊:
- 針對連結服務選取 [AzureSqlMI1]。
- 選取 [dbo].[dbo_customers_CT] 做為資料表名稱。 注意:當 [customers] 資料表上啟用 CDC 時,會自動建立此資料表。 變更的資料永遠不會直接從這個資料表查詢,而是透過 CDC 函式 進行解壓縮。
建立資料集來表示要複製到接收資料存放區的資料。
在此步驟中,您會建立資料集來代表從來源資料存放區複製的資料。 您已在 Azure Blob 儲存體中建立資料湖容器,做為必要條件的一部分。 建立容器 (若不存在),或設為現有容器的名稱。 在此教學課程中,會使用觸發時間以動態方式產生輸出檔案名稱,稍後將會設定此觸發時間。
在樹狀檢視中,按一下 [+] (加號),然後按一下 [資料集]。
選取 [Azure Blob 儲存體],然後按一下 [繼續]。
選取 [DelimitedText],然後按一下 [繼續]。
在 [設定屬性] 索引標籤中,設定資料集名稱和連線資訊:
- 選取 [AzureStorageLinkedService] 作為 [連結服務]。
- 針對 filePath 的容器部分,輸入原始資料。
- 啟用以第一個資料列做為標題
- 按一下 [確定]。
建立管道以複製變更的資料
在此步驟中,您會建立管道,而且管道會先使用查閱活動,檢查變更資料表中變更的記錄數目。 IF 條件活動會檢查已變更記錄的數目是否大於零,並執行複製活動,將插入/更新/刪除的資料從 Azure SQL Database 複製到 Azure Blob 儲存體。 最後,會設定輪轉視窗觸發程序,而且開始和結束時間會傳遞至活動做為開始和結束視窗參數。
在 [Data Factory] 使用者介面中,切換至 [編輯] 索引標籤。按一下左窗格中的 [+] (加號),然後按一下 [管線]。
您會看到用於設定管線的新索引標籤。 你也會在樹狀檢視中看到該管線。 在 [屬性] 視窗中,將管線的名稱變更為 IncrementalCopyPipeline。
在 [活動] 工具箱中展開 [一般],並將 [查閱] 活動拖放至管線設計工具介面。 將活動的名稱設定為 GetChangeCount。 此活動會指定的時間範圍內取得變更資料表中的記錄數目。
切換至 [屬性] 視窗中的 [設定]:
針對 [來源資料集] 欄位,指定 SQL MI 資料集名稱。
選取 [查詢] 選項,並在查詢方塊中輸入下列內容:
DECLARE @from_lsn binary(10), @to_lsn binary(10); SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers'); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE()); SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
- 啟用 [僅第一個資料列]
按一下 [預覽資料] 按鈕,確認查閱活動可以取得有效的輸出
在 [活動] 工具箱中展開 [反覆項目和條件式],然後將 If 條件活動拖放至管道設計工具介面。 將活動的名稱設定為 HasChangedRows。
切換至 [屬性] 視窗中的 [活動]:
- 輸入下列運算式
@greater(int(activity('GetChangeCount').output.firstRow.changecount),0)
- 按一下鉛筆圖示以編輯 True 條件。
- 在 [活動] 工具箱中展開 [一般],然後將 [等候] 活動拖放至管道設計工具介面。 這是一個暫時的活動,可用於偵錯 If 條件,稍後將在本教學課程中變更。
- 按一下 IncrementalCopyPipeline 階層連結以返回主要管道。
在偵錯模式中執行管道以確認管道執行成功。
接下來,返回 True 條件步驟,並刪除等候活動。 在 [活動] 工具箱中,展開 [移動與轉換],然後將 [複製] 活動拖放至管道設計工具介面。 將活動的名稱設定為 IncrementalCopyActivity。
在 [屬性] 視窗中切換至 [來源] 索引標籤,並執行下列步驟:
針對 [來源資料集] 欄位,指定 SQL MI 資料集名稱。
為 [使用查詢] 選取 [查詢]。
針對查詢輸入下列程式。
DECLARE @from_lsn binary(10), @to_lsn binary(10); SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers'); SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE()); SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
按一下 [預覽] 以確認查詢正確地傳回變更後的資料列。
切換至 [接收] 索引標籤,並針對 [接收資料集] 欄位指定 Azure 儲存體資料集。
按一下以回到主要管道畫布,然後將 [查閱] 活動逐一連線至 [If 條件] 活動。 將附加至 [查閱] 活動的 [綠色] 按鈕拖曳至 [If 條件] 活動。
按一下工具列上的 [驗證]。 確認沒有任何驗證錯誤。 按一下 >> 關閉 [管線驗證報告] 視窗。
按一下 [偵錯] 以測試管道,並確認已在儲存位置產生檔案。
按一下 [全部發佈] 按鈕,將實體 (連結的服務、資料集和管道) 發佈至 Data Factory 服務。 請靜待 [發佈成功] 訊息顯示。
設定輪轉視窗觸發程序和 CDC 視窗參數
在此步驟中,您會建立輪轉視窗觸發程序,以頻繁的排程執行作業。 您將使用輪轉視窗觸發程序的 WindowStart 和 WindowEnd 系統變數,並將它們當做參數傳遞至管道,以便用於 CDC 查詢中。
瀏覽至 IncrementalCopyPipeline 管道的 [參數] 索引標籤,然後使用 [+ 新增] 按鈕,將兩個參數 (triggerStartTime 和 triggerEndTime) 新增至管道,這將代表輪轉視窗的開始和結束時間。 為了進行偵錯,請以 YYYY-MM-DD HH24:MI:SS.FFF 的格式來新增預設值,但請確認資料表上的 triggerStartTime 並未在 CDC 之前啟用,否則會導致錯誤發生。
按一下 [查閱] 活動的 [設定] 索引標籤,並將查詢設定為使用開始和結束參數。 將下列內容複製到查詢中:
@concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = ''',pipeline().parameters.triggerStartTime,'''; SET @end_time = ''',pipeline().parameters.triggerEndTime,'''; SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time); SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
在 If 條件活動為 True 的情況下,瀏覽至複製活動,然後按一下 [來源] 索引標籤。將下列內容複製到查詢中:
@concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = ''',pipeline().parameters.triggerStartTime,'''; SET @end_time = ''',pipeline().parameters.triggerEndTime,'''; SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time); SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
按一下複製活動的 [接收] 索引標籤,然後按一下 [開啟] 來編輯資料集屬性。 按一下 [參數] 索引標籤,然後新增名為 triggerStart 的參數
接下來,設定資料集屬性,將資料儲存在具有日期型分割區的 customers/incremental 子目錄中。
按一下資料集屬性的 [連線] 索引標籤,並針對 [目錄] 和 [檔案] 區段新增動態內容。
按一下文字方塊下的動態內容連結,在 [目錄] 區段中輸入下列運算式:
@concat('customers/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))
在 [檔案] 區段中輸入下列運算式。 這會根據觸發程序開始日期和時間來建立檔案名稱,並以 csv 附檔名做為尾碼:
@concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')
按一下 [IncrementalCopyPipeline] 索引標籤,在 [複製] 活動中,瀏覽返回至 [接收] 設定。
展開資料集屬性,並在 triggerStart 參數值中輸入具有下列運算式的動態內容:
@pipeline().parameters.triggerStartTime
按一下 [偵錯] 以測試管道,並確定已如預期的產生了資料夾結構和輸出檔案。 下載並開啟檔案以驗證內容。
藉由檢閱管道執行的輸入參數,確保將參數插入查詢中。
按一下 [全部發佈] 按鈕,將實體 (連結的服務、資料集和管道) 發佈至 Data Factory 服務。 請靜待 [發佈成功] 訊息顯示。
最後,設定輪轉視窗觸發程序以定期執行管道,並設定開始和結束時間參數。
- 按一下 [新增觸發程序] 按鈕,然後選取 [新增/編輯]
- 輸入觸發程序名稱並指定開始時間,此時間就形同偵錯視窗的結束時間。
在下一個畫面中,分別為 start 和 end 參數指定下列值。
@formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff') @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')
注意
觸發程序只會在發佈後執行。 此外,輪轉視窗的預期行為是從開始日期到目前為止,執行所有歷程記錄間隔。 如需有關輪轉視窗觸發程序的詳細資訊,請參閱這裡。
使用 SQL Server Management Studio 藉由執行下列 SQL,對 customer 資料表進行一些額外的變更:
insert into customers (customer_id, first_name, last_name, email, city) values (4, 'Farlie', 'Hadigate', 'fhadigate3@zdnet.com', 'Reading'); insert into customers (customer_id, first_name, last_name, email, city) values (5, 'Anet', 'MacColm', 'amaccolm4@yellowbook.com', 'Portsmouth'); insert into customers (customer_id, first_name, last_name, email, city) values (6, 'Elonore', 'Bearham', 'ebearham5@ebay.co.uk', 'Portsmouth'); update customers set first_name='Elon' where customer_id=6; delete from customers where customer_id=5;
按一下 [全部發佈] 按鈕。 請靜待 [發佈成功] 訊息顯示。
數分鐘後,管道就會觸發,並將新的檔案載入 Azure 儲存體
監視累加複製管線
按一下左側的 [監視] 索引標籤。 您會在清單中看到管線執行和其狀態。 若要重新整理清單,按一下 [重新整理]。 將滑鼠停留在管道名稱附近,即可存取重新執行動作和使用情況報告。
若要檢視與管道執行相關聯的活動執行情況,請按一下管道名稱。 如果偵測到變更的資料,則會有三個活動 (包括複製活動),否則清單中只會有兩個項目。 若要切換回管道執行情況檢視,請按一下頂端的 [所有管道] 連結。
檢閱結果
您會在 raw
容器的 customers/incremental/YYYY/MM/DD
資料夾中看到第二個檔案。
相關內容
進入下列教學課程,深入了解如何只根據其 LastModifiedDate 複製全新和變更檔案: