以累加方式將資料從資料倉儲載入 Lakehouse
在本教學課程中,您將了解如何以累加方式將資料從資料倉儲載入 Lakehouse。
概觀
以下是高階解決方案圖表:
以下是建立此解決方案的重要步驟:
選取水位線資料行。 選取來源資料資料表中的一個資料行,可用於切割每次執行時新增或更新的記錄。 一般來說,當建立或更新資料列時,這個選取的資料行 (例如,last_modify_time 或 ID) 中的資料會持續增加。 此資料行中的最大值就作為水位線。
準備資料表,以將最後一個水位線值儲存在您的資料倉儲中。
使用下列工作流程建立管線:
此解決方案中的管道有下列活動:
- 建立兩個查閱活動。 使用第一個查閱活動來取出最後一個水位線值。 使用第二個查閱活動來擷取新的水位線值。 這些水位線值會傳遞給複製活動。
- 建立複製活動,以複製來源資料資料表的資料列,而這些資料列的水位線資料行值大於舊水位線值,且小於新水位線值。 然後,其會將資料從資料倉儲複製到 Lakehouse 作為新檔案。
- 建立預存程序活動,以更新下次管線執行的最後一個管線水位線值。
必要條件
- 資料倉儲。 您需要使用資料倉儲作為來源資料存放區。 如果您沒有此資料庫,請參閱建立資料倉儲,按照步驟建立一個。
- Lakehouse。 您會使用 Lakehouse 作為目的地資料存放區。 如果您沒有此資料庫,請參閱建立 Lakehouse,按照步驟建立一個。 建立名為 IncrementalCopy 的資料夾來儲存複製的資料。
準備來源
以下是設定累加複製管線之前,您需要在來源資料倉儲中準備的一些資料表和預存程序。
1. 在資料倉儲中建立資料來源資料表
在資料倉儲中執行下列 SQL 命令,以建立名為 data_source_table 的資料表,作為資料來源資料表。 在本教學課程中,您會使用它作為執行增量複製的範例資料。
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
資料來源資料表中的資料如下所示:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
在本教學課程中,您會使用 LastModifytime 作為水位線資料行。
2. 在您的資料倉儲中建立另一個資料表來儲存最後一個水位線值
對資料倉儲執行下列 SQL 命令,以建立名為 watermarktable 的資料表來儲存最後一個水位線值:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
使用來源資料資料表的資料表名稱來設定最後一個水位線的預設值。 在本教學課程中,資料表名稱是 data_source_table,而預設值為
1/1/2010 12:00:00 AM
。INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
檢閱資料表 watermarktable 中的資料。
Select * from watermarktable
輸出:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. 在您的資料倉儲中建立預存程序
執行下列命令,在您的資料倉儲中建立預存程序。 此預存程序可用來協助更新上次管線執行之後的最後一個水位線值。
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
設定累加複製的管線
步驟 1:建立管線
瀏覽至 Power BI。
選取畫面左下方的 Power BI 圖示,然後選取 "Data Factory" 以開啟 Data Factory 的首頁。
瀏覽至 Microsoft Fabric 工作區。
選取 [資料管線],然後輸入管線名稱以建立新的管線。
步驟 2:新增最後一個水位線的查閱活動
在此步驟中,您會建立查閱活動,以取得最後一個水位線值。 取得先前設定的預設值 1/1/2010 12:00:00 AM
。
選取 [新增管線活動],然後從下拉式清單中選取 [查閱]。
在 [一般] 索引標籤下,將此活動重新命名為 LookupOldWaterMarkActivity。
在 [設定] 索引標籤下,執行下列設定:
- 資料存放區類型:選取 [工作區]。
- 工作區資料存放區類型:選取 [資料倉儲]。
- 資料倉儲:選取您的資料倉儲。
- 使用查詢:選擇 [資料表]。
- 資料表:選擇 "dbo.watermarktable"。
- 僅限第一個資料列:已選取。
步驟 3:新增新水位線的查閱活動
在此步驟中,您會建立查閱活動,以取得新水位線值。 您可以使用查詢,從源數據數據表取得新的浮水印。 在 data_source_table 中的 LastModifytime 欄位中獲得最大值。
在頂端列上,選取 [活動] 索引標籤下的 [查閱],以新增第二個查閱活動。
在 [一般] 索引標籤下,將此活動重新命名為 LookupNewWaterMarkActivity。
在 [設定] 索引標籤下,執行下列設定:
資料存放區類型:選取 [工作區]。
工作區資料存放區類型:選取 [資料倉儲]。
資料倉儲:選取您的資料倉儲。
使用查詢:選擇 [查詢]。
查詢:輸入下列查詢,以挑選上次修改時間上限作為新的水位線:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
僅限第一個資料列:已選取。
步驟 4:新增複製活動以複製累加資料
在此步驟中,您會新增複製活動,以將最後一個水位線與新水位線之間的累加資料從資料倉儲複製到 Lakehouse。
選取頂端列上的 [活動],然後選取 [複製資料] -> [新增至畫布] 以取得複製活動。
在 [一般] 索引標籤下,將此活動重新命名為 IncrementalCopyActivity。
透過將 [查閱] 活動所附加的綠色按鈕 (成功時) 拖移至 [複製] 活動,即可將兩個 [查閱] 活動同時連線至 [複製] 活動。 當您看到 [複製] 活動的框線顏色變為綠色時即鬆開滑鼠按鈕。
在 [來源] 索引標籤下,執行下列設定:
資料存放區類型:選取 [工作區]。
工作區資料存放區類型:選取 [資料倉儲]。
資料倉儲:選取您的資料倉儲。
使用查詢:選擇 [查詢]。
查詢:輸入下列查詢,以在最後一個水位線和新水位線之間複製累加資料。
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
在 [目的地] 索引標籤下,提供下列設定:
- 資料存放區類型:選取 [工作區]。
- 工作區資料存放區類型:選取 "Lakehouse"。
- Lakehouse:選取您的 Lakehouse。
- 根資料夾:選擇 [檔案]。
-
檔案路徑:指定您要儲存複製的資料的資料夾。 選取 [瀏覽] 以選取您的資料夾。 針對檔案名稱,開啟 [新增動態內容],然後在開啟的視窗中輸入
@CONCAT('Incremental-', pipeline().RunId, '.txt')
,為 Lakehouse 中複製的資料檔案建立檔案名稱。 - 檔案格式:選取資料的格式類型。
步驟 5:新增預存程序活動
在此步驟中,您新增預存程序活動,以更新下次管線執行的最後一個管線水位線值。
選取頂端列上的 [活動],然後選取 [預存程序] 以新增預存程序活動。
在 [一般] 索引標籤下,將此活動重新命名為 StoredProceduretoWriteWatermarkActivity。
將 [複製] 活動的綠色 (成功時) 輸出連線至 [預存程序] 活動。
在 [設定] 索引標籤下,執行下列設定:
資料存放區類型:選取 [工作區]。
資料倉儲:選取您的資料倉儲。
預存程序名稱:指定您在資料倉儲中建立的預存程序:[dbo].[usp_write_watermark]。
展開 [預存程序參數]。 若要指定預存程序參數的值,請選取 [匯入],然後輸入參數的下列值:
名稱 類型 值 LastModifiedtime Datetime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
步驟 6:執行管線並監視結果
在頂端列上,選取 [首頁] 索引標籤下的 [執行]。然後選取 [儲存並執行]。 管線開始執行,您可以在 [輸出] 索引標籤下監視管線。
移至您的 Lakehouse,您會發現資料檔案位於您指定的資料夾底下,而且您可以選取檔案來預覽複製的資料。
新增更多資料,以查看累加複製結果
在您完成第一次管線執行之後,讓我們嘗試在資料倉儲來源資料表中新增更多資料,以查看此管線是否可以複製累加資料。
步驟 1:將更多資料新增至來源
執行下列查詢,將新資料插入資料倉儲中:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
data_source_table 的更新資料為:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
步驟 2:觸發另一個管線執行並監視結果
返回您的管線頁面。 在頂端列上,再次選取 [首頁] 索引標籤下的 [執行]。 管線開始執行,您可以在 [輸出] 下監視管線。
移至您的 Lakehouse,您會發現新複製的資料檔案位於您指定的資料夾底下,而且您可以選取檔案來預覽複製的資料。 您會看到累加資料顯示在此檔案中。
相關內容
接下來,請繼續進行以深入了解從 Azure Blob 儲存體複製到 Lakehouse。