使用 Azure 入口網站以累加方式將資料從 Azure SQL Database 載入至 Azure Blob 儲存體
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用!
在本教學課程中,您會建立 Azure Data Factory 與管線,以將差異資料從 Azure SQL Database 中的資料表載入至 Azure Blob 儲存體。
您會在本教學課程中執行下列步驟:
- 準備資料存放區來儲存水位線值。
- 建立資料處理站。
- 建立連結的服務。
- 建立來源、接收及水位線資料集。
- 建立管線。
- 執行管線。
- 監視管道執行。
- 檢閱結果
- 將更多資料新增至來源。
- 再次執行管線。
- 監視第二次管線執行
- 檢閱第二次執行的結果
概觀
高階解決方案圖表如下:
以下是建立此解決方案的重要步驟:
選取水位線資料行。 選取來源資料存放區中的一個資料行,可用於切割每次執行時新增或更新的記錄。 一般來說,當建立或更新資料列時,這個選取的資料行 (例如,last_modify_time 或 ID) 中的資料會持續增加。 此資料行中的最大值就作為水位線。
準備資料存放區來儲存水位線值。 在本教學課程中,您會將水位線值儲存在 SQL 資料庫中。
使用下列工作流程建立管線:
此解決方案中的管道有下列活動:
- 建立兩個查閱活動。 使用第一個查閱活動來取出最後一個水位線值。 使用第二個查閱活動來取出新的水位線值。 這些水位線值會傳遞給複製活動。
- 建立複製活動,以複製來源資料存放區的資料列,而這些資料列的水位線資料行值大於舊水位線值,且小於新水位線值。 然後,它會將來源資料存放區的差異資料複製到 Blob 儲存體作為新檔案。
- 建立 StoredProcedure 活動,以更新下次執行的管線水位線值。
如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶。
必要條件
- Azure SQL Database。 您需要使用資料庫作為來源資料存放區。 如果您在 Azure SQL Database 中沒有資料庫,請參閱在 Azure SQL Database 中建立資料庫,按照步驟建立資料庫。
- Azure 儲存體。 您需要使用 Blob 儲存體作為接收資料存放區。 如果您沒有儲存體帳戶,請參閱建立儲存體帳戶,按照步驟來建立儲存體帳戶。 建立名為 adftutorial 的容器。
在 SQL 資料庫中建立資料來源資料表
開啟 SQL Server Management Studio。 在 [伺服器總管] 中,以滑鼠右鍵按一下資料庫,然後選擇 [新增查詢]。
對 SQL 資料庫執行下列 SQL 命令,以建立名為
data_source_table
的資料表作為資料來源存放區:create table data_source_table ( PersonID int, Name varchar(255), LastModifytime datetime ); INSERT INTO data_source_table (PersonID, Name, LastModifytime) VALUES (1, 'aaaa','9/1/2017 12:56:00 AM'), (2, 'bbbb','9/2/2017 5:23:00 AM'), (3, 'cccc','9/3/2017 2:36:00 AM'), (4, 'dddd','9/4/2017 3:21:00 AM'), (5, 'eeee','9/5/2017 8:06:00 AM');
在本教學課程中,您會使用 LastModifytime 作為水位線資料行。 下表顯示資料來源存放區中的資料:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000
在 SQL 資料庫中建立另一個資料表來儲存高水位線值
對 SQL 資料庫執行下列 SQL 命令,以建立名為
watermarktable
的資料表來儲存水位線值:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );
使用來源資料存放區的資料表名稱來設定高水位線的預設值。 在本教學課程中,資料表名稱是 data_source_table。
INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
檢閱資料表
watermarktable
中的資料。Select * from watermarktable
輸出:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
在 SQL 資料庫中建立預存程序
執行下列命令,在您的 SQL 資料庫中建立預存程序:
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
建立資料處理站
啟動 Microsoft Edge 或 Google Chrome 網頁瀏覽器。 目前,只有 Microsoft Edge 和 Google Chrome 網頁瀏覽器支援 Data Factory UI。
在左側功能表上,選取 [建立資源]>[整合]>[Data Factory]:
在 [新增 Data Factory] 頁面中,輸入 [ADFTutorialOnPremDF] 作為 [名稱]。
Azure Data Factory 的名稱必須是「全域唯一的」。 如果您看到有以下錯誤的紅色驚嘆號,請變更 Data Factory 名稱 (例如 yournameADFTutorialDataFactory),然後試著重新建立。 請參閱 Data Factory - 命名規則一文,以了解 Data Factory 成品的命名規則。
Data factory 名稱 "ADFIncCopyTutorialDF" 無法使用
選取您要在其中建立資料處理站的 Azure 訂用帳戶。
針對 [資源群組],請執行下列其中一個步驟︰
選取 [使用現有的] ,然後從下拉式清單選取現有的資源群組。
選取 [建立新的] ,然後輸入資源群組的名稱。
若要了解資源群組,請參閱 使用資源群組管理您的 Azure 資源。
針對 [版本] 選取 [V2]。
選取 Data Factory 的 [位置] 。 只有受到支援的位置會顯示在下拉式清單中。 資料處理站所使用的資料存放區 (Azure 儲存體、Azure SQL Database、Azure SQL 受控執行個體等) 和計算 (HDInsight 等) 可位於其他區域。
按一下 [建立]。
建立完成之後,您會看到如圖中所示的 [Data Factory] 頁面。
若要在另一個索引標籤中啟動 Azure Data Factory 使用者介面 (UI),請在 [開啟 Azure Data Factory Studio] 圖格上選取 [開啟]。
建立新管線
在本教學課程中,您會建立具有兩個查閱活動、一個複製活動和一個 StoredProcedure 活動的管線,這些活動都在一個管線中鏈結。
在 Data Factory UI 的首頁上,按一下 [協調] 圖格。
在 [屬性] 下的 [一般] 面板中,為 [名稱] 指定 IncrementalCopyPipeline。 然後按一下右上角的 [屬性] 圖示來摺疊面板。
讓我們新增第一個查閱活動來取得舊的浮水印值。 在 [活動] 工具箱中展開 [一般],並將 [查閱] 活動拖放至管線設計工具介面。 將活動名稱變更為 LookupOldWaterMarkActivity。
切換至 [設定] 索引標籤,然後按一下 [+ 新增] 以新增來源資料集。 在此步驟中,您會建立資料集來代表浮水印資料表中的資料。 此資料表包含先前複製作業中所使用的舊浮水印。
在 [新增資料集] 視窗中選取 [Azure SQL Database],然後按一下 [繼續]。 您會看到系統為該資料集開啟新視窗。
在該資料集的 [設定屬性] 視窗中,輸入 WatermarkDataset 作為 [名稱]。
在 [已連結的服務] 視窗中,選取 [新增],然後執行下列步驟:
輸入 AzureSqlDatabaseLinkedService 作為 [名稱]。
選取要供伺服器名稱使用的伺服器。
從下拉式清單中選取您的資料庫名稱。
輸入您的使用者名稱與密碼。
若要測試您的 SQL 資料庫連線,請按一下 [測試連線]。
按一下完成。
確認已為 [已連結的服務] 選取 [AzureSqlDatabaseLinkedService]。
選取 [完成]。
在 [連線] 索引標籤中,為 [資料表] 選取 [[dbo].[watermarktable]]。 如果您想要預覽資料表中的資料,請按一下 [預覽資料]。
按一下頂端的 [管線] 索引標籤或左側樹狀檢視中的管線名稱,即可切換到管線編輯器。 在 [查閱] 活動的 [屬性] 視窗中,確認已為 [來源資料集] 欄位選取 WatermarkDataset。
在 [活動] 工具箱中展開 [一般],並將另一個 [查閱] 活動拖放至管線設計工具介面,然後在 [屬性] 視窗的 [一般] 索引標籤中,將名稱設為 LookupNewWaterMarkActivity。 此查閱活動會從資料表取得新浮水印值,該資料表具備要複製到目的地的來源資料。
在第二個 [查閱] 活動的 [屬性] 視窗中,切換到 [設定] 索引標籤,然後按一下 [新增]。 您建立的資料集會指向來源資料表,其中包含新浮水印值 (LastModifyTime 最大值)。
在 [新增資料集] 視窗中選取 [Azure SQL Database],然後按一下 [繼續]。
在 [設定屬性] 視窗中,輸入 [SourceDataset] 作為 [名稱]。 選取 [AzureSqlDatabaseLinkedService] 作為 [連結服務]。
選取 [dbo].[data_source_table] 作為 [資料表]。 您稍後可在本教學課程中指定對此資料集的查詢。 查詢會優先於您在此步驟中指定的資料表。
選取 [完成]。
按一下頂端的 [管線] 索引標籤或左側樹狀檢視中的管線名稱,即可切換到管線編輯器。 在 [查閱] 活動的 [屬性] 視窗中,確認已為 [來源資料集] 欄位選取 [SourceDataset]。
為[使用查詢] 欄位選取 [查詢],並輸入下列查詢:您從 data_source_table 中選取的只有 LastModifytime 的最大值。 請確定您也已勾選 [僅限第一列]。
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
在 [活動] 工具箱中,展開 [移動和轉換],並從 [活動] 工具箱中拖放 [複製] 活動,以及將名稱設定為 IncrementalCopyActivity。
透過將 [查閱] 活動所附加的綠色按鈕拖曳至 [複製] 活動,即可將兩個 [查閱] 活動同時連線至 [複製] 活動。 當您看到 [複製] 活動的框線顏色變為藍色時即鬆開滑鼠按鈕。
選取 [複製] 活動並確認您在 [屬性] 視窗中看到活動的屬性。
在 [屬性] 視窗中切換至 [來源] 索引標籤,並執行下列步驟:
為 [來源資料集] 欄位選取 [SourceDataset]。
為 [使用查詢] 欄位選取 [查詢]。
為 [查詢] 欄位輸入下列 SQL 查詢。
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
切換至 [接收] 索引標籤,然後按一下 [接收資料集] 欄位的 [+ 新增]。
在本教學課程中,接收資料存放區是 Azure Blob 儲存體類型。 因此,選取 [Azure Blob 儲存體],然後按一下 [新增資料集] 視窗中的 [繼續]。
在 [選取格式] 視窗中,選取您資料的格式類型,然後按一下 [繼續]。
在 [設定屬性] 視窗中,輸入 SinkDataset 作為 [名稱]。 為 [已連結的服務] 選取 [+ 新增]。 在此步驟中,您將建立與 Azure Blob 儲存體的連線 (連結的服務)。
在 [新增連結服務 (Azure Blob 儲存體)] 視窗中,執行下列步驟:
- 輸入 AzureStorageLinkedService 作為 [名稱]。
- 為 [儲存體帳戶名稱] 選取 Azure 儲存體帳戶。
- 測試連線,然後按一下 [完成]。
在 [設定屬性] 視窗中,確認已為 [已連結的服務] 選取 [AzureStorageLinkedService]。 然後選取 [完成]。
移至 SinkDataset 的 [連線] 索引標籤,然後執行下列步驟:
- 在 [檔案路徑] 欄位中,輸入 adftutorial/incrementalcopy。 adftutorial 是 blob 容器名稱而 incrementalcopy 是資料夾名稱。 此程式碼片段假設您在 Blob 儲存體中有一個名為 adftutorial 的 Blob 容器。 建立容器 (若不存在),或設為現有容器的名稱。 如果輸出資料夾 incrementalcopy 不存在,Azure Data Factory 將會自動建立。 您也可以對檔案路徑使用 [瀏覽] 按鈕來瀏覽至 blob 容器中的資料夾。
- 為 [檔案路徑] 欄位的 [檔案] 部分選取 [新增動態內容 [Alt+P]],然後在開啟的視窗中輸入
@CONCAT('Incremental-', pipeline().RunId, '.txt')
。 然後選取 [完成]。 系統會使用運算式來動態產生此檔案名稱。 每個管線執行都有唯一的識別碼。 複製活動會使用執行識別碼來產生檔案名稱。
按一下頂端的 [管線] 索引標籤或左側樹狀檢視中的管線名稱,即可切換到管線編輯器。
在 [活動] 工具箱中展開 [一般],並將 [預存程序] 活動從 [活動] 工具箱拖放至管線設計工具介面。 將 [複製] 活動的綠色 (成功) 輸出連線至 [預存程序] 活動。
選取管線設計工具中的 [預存程序活動],將其名稱變更為 StoredProceduretoWriteWatermarkActivity。
切換至 [SQL 帳戶] 索引標籤,然後選取 [AzureSqlDatabaseLinkedService] 作為 [已連結的服務]。
切換至 [預存程序] 索引標籤,然後執行下列步驟:
針對 [預存程序名稱],選取 usp_write_watermark。
若要指定預存程序參數的值,請按一下 [匯入參數],然後輸入參數的下列值:
名稱 類型 值 LastModifiedtime Datetime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
若要驗證管線設定,請按一下工具列上的 [驗證]。 確認沒有任何驗證錯誤。 若要關閉 [管線驗證報告] 視窗,請按一下 >>。
選取 [全部發佈] 按鈕,將實體 (連結的服務、資料集和管線) 發佈至 Azure Data Factory 服務。 請等候直至您看見成功發佈的訊息。
觸發管線執行
按一下工具列上的 [新增觸發程序],然後按一下 [立即觸發]。
在 [管線執行] 視窗中,選取 [完成]。
監視管道執行
切換至左側的 [監視] 索引標籤。 您可查看手動觸發程序所觸發的管線執行狀態。 您可以使用 [管線名稱] 資料行下的連結來檢視執行詳細資料,以及重新執行管線。
若要查看與管線執行相關聯的活動執行,請選取 [管線名稱] 資料行下的連結。 如需有關活動執行的詳細資料,請選取 [活動名稱] 資料行下的 [詳細資料] 連結 (眼鏡圖示)。 選取頂端的 [所有管線執行] 以回到管線執行檢視。 若要重新整理檢視,請選取 [重新整理]。
檢閱結果
透過使用 Azure 儲存體總管等工具來連線到您的 Azure 儲存體帳戶。 確認輸出檔案已建立於 adftutorial 容器的 incrementalcopy 資料夾中。
開啟輸出檔,請注意,所有的資料都會從 data_source_table 複製到 blob 檔案。
1,aaaa,2017-09-01 00:56:00.0000000 2,bbbb,2017-09-02 05:23:00.0000000 3,cccc,2017-09-03 02:36:00.0000000 4,dddd,2017-09-04 03:21:00.0000000 5,eeee,2017-09-05 08:06:00.0000000
檢查
watermarktable
中的最新值。 您會看到水位線值已更新。Select * from watermarktable
輸出如下:
| TableName | WatermarkValue | | --------- | -------------- | | data_source_table | 2017-09-05 8:06:00.000 |
將更多資料新增至來源
將新資料插入您的資料庫 (資料來源存放區) 中。
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
您的資料庫中更新的資料如下:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
觸發另一個管線執行
切換至 [編輯] 索引標籤。如果管線沒有在設計工具中開啟,請在樹狀檢視中按一下它。
按一下工具列上的 [新增觸發程序],然後按一下 [立即觸發]。
監視第二次管線執行
切換至左側的 [監視] 索引標籤。 您可查看手動觸發程序所觸發的管線執行狀態。 您可以使用 [管線名稱] 資料行下的連結來檢視活動詳細資料,以及重新執行管線。
若要查看與管線執行相關聯的活動執行,請選取 [管線名稱] 資料行下的連結。 如需有關活動執行的詳細資料,請選取 [活動名稱] 資料行下的 [詳細資料] 連結 (眼鏡圖示)。 選取頂端的 [所有管線執行] 以回到管線執行檢視。 若要重新整理檢視,請選取 [重新整理]。
確認第二個輸出
在 blob 儲存體中,您會看到已建立另一個檔案。 在本教學課程中,新的檔案名稱是
Incremental-<GUID>.txt
。 開啟該檔案,您會在其中看到兩列記錄。6,newdata,2017-09-06 02:23:00.0000000 7,newdata,2017-09-07 09:01:00.0000000
檢查
watermarktable
中的最新值。 您會看到水位線值再次更新。Select * from watermarktable
範例輸出:
| TableName | WatermarkValue | | --------- | -------------- | | data_source_table | 2017-09-07 09:01:00.000 |
相關內容
在本教學課程中,您已執行下列步驟:
- 準備資料存放區來儲存水位線值。
- 建立資料處理站。
- 建立連結的服務。
- 建立來源、接收及水位線資料集。
- 建立管線。
- 執行管線。
- 監視管道執行。
- 檢閱結果
- 將更多資料新增至來源。
- 再次執行管線。
- 監視第二次管線執行
- 檢閱第二次執行的結果
在本教學課程中,管線已從 SQL Database 中的單一資料表將資料複製到 Blob 儲存體。 請前進到下列教學課程,了解如何將資料從 SQL Server 資料庫中的多個資料表複製到 SQL 資料庫。