在 Azure SQL Edge 中建立資料串流工作
重要
Azure SQL Edge 將於 2025 年 9 月 30 日淘汰。 如需詳細資訊和移轉選項,請參閱淘汰通知。
注意
Azure SQL Edge 不再支援 ARM64 平台。
本文說明如何在 Azure SQL Edge 中建立 T-SQL 串流工作。 您可以建立外部串流輸入和輸出物件,然後將串流工作查詢定義為串流工作建立的一部分。
設定外部串流的輸入和輸出物件
T-SQL 串流會使用 SQL Server 的外部資料來源功能,來定義與外部串流的串流工作輸入和輸出相關聯的資料來源。 使用下列 T-SQL 命令來建立外部串流的輸入或輸出物件:
- CREATE EXTERNAL FILE FORMAT (Transact-SQL)
- CREATE EXTERNAL DATA SOURCE (Transact-SQL)
- CREATE EXTERNAL STREAM (Transact-SQL)
此外,如果將 Azure SQL Edge、SQL Server 或 Azure SQL Database 用作輸出串流,您需要 CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL)。 此 T-SQL 命令會定義存取資料庫的認證。
所支援的輸入和輸出串流資料來源
Azure SQL Edge 目前僅支援使用下列資料來源作為串流的輸入和輸出。
資料來源類型 | 輸入 | 輸出 | 描述 |
---|---|---|---|
Azure IoT Edge 中樞 | Y | Y | 用來讀取和寫入串流資料至 Azure IoT Edge 中樞的資料來源。 如需詳細資訊,請參閱 IoT Edge 中樞。 |
SQL Database | 否 | 是 | 用來將串流資料寫入到 SQL Database 的資料來源連線。 資料庫可以是 Azure SQL Edge 中的本機資料庫,也可以是 SQL Server 或 Azure SQL Database 中的遠端資料庫。 |
Kafka | 是 | 否 | 用來從 Kafka 主題讀取串流資料的資料來源。 |
範例:為 Azure IoT Edge 中樞建立外部串流的輸入/輸出物件
下列範例會為 Azure IoT Edge 中樞建立外部串流物件。 若要為 Azure IoT Edge 中樞建立外部串流的輸入/輸出資料來源,您必須先為要讀取/寫入資料的配置建立外部檔案格式。
建立 JSON 類型的外部檔案格式。
CREATE EXTERNAL FILE format InputFileFormat WITH (FORMAT_TYPE = JSON); GO
建立 Azure IoT Edge 中樞的外部資料來源。 下列 T-SQL 指令碼會建立對 IoT Edge 中樞 (在與 Azure SQL Edge 相同的Docker 主機上執行) 的資料來源連線。
CREATE EXTERNAL DATA SOURCE EdgeHubInput WITH (LOCATION = 'edgehub://'); GO
建立 Azure IoT Edge 中樞的外部串流物件。 下列 T-SQL 指令碼會為 IoT Edge 中樞建立串流物件。 如果是 IoT Edge 中樞串流物件,LOCATION 參數會是所要讀取或寫入的 IoT Edge 中樞主題或通道名稱。
CREATE EXTERNAL STREAM MyTempSensors WITH ( DATA_SOURCE = EdgeHubInput, FILE_FORMAT = InputFileFormat, LOCATION = N'TemperatureSensors', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' ); GO
範例:建立對 Azure SQL Database 的外部串流物件
下列範例會在 Azure SQL Edge 中建立本機資料庫的外部串流物件。
在資料庫上建立主要金鑰。 這是加密認證祕密的必要項目。
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
建立資料庫範圍認證以存取 SQL Server 來源。 下列範例會針對 IDENTITY = 'username' 和 SECRET = 'password' 的外部資料來源建立認證。
CREATE DATABASE SCOPED CREDENTIAL SQLCredential WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'; GO
使用 CREATE EXTERNAL DATA SOURCE 建立外部資料來源。 下列範例將:
- 建立名為 LocalSQLOutput 的外部資料來源。
- 指定外部資料來源 (
LOCATION = '<vendor>://<server>[:<port>]'
)。 在此範例中,其會指向 Azure SQL Edge 的本機執行個體。 - 使用先前建立的認證。
CREATE EXTERNAL DATA SOURCE LocalSQLOutput WITH ( LOCATION = 'sqlserver://tcp:.,1433', CREDENTIAL = SQLCredential ); GO
建立外部串流物件。 下列範例會建立指向資料庫 MySQLDatabase 中資料表 dbo.TemperatureMeasurements 的外部串流物件。
CREATE EXTERNAL STREAM TemperatureMeasurements WITH ( DATA_SOURCE = LocalSQLOutput, LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' );
範例:建立 Kafka 的外部串流物件
下列範例會在 Azure SQL Edge 中建立本機資料庫的外部串流物件。 此範例假設 kafka 伺服器已針對匿名存取設定。
使用 CREATE EXTERNAL DATA SOURCE 建立外部資料來源。 下列範例將:
CREATE EXTERNAL DATA SOURCE [KafkaInput] WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'); GO
建立 Kafka 輸入的外部檔案格式。 下列範例會使用 GZipped 壓縮建立 JSON 檔案格式。
CREATE EXTERNAL FILE FORMAT JsonGzipped WITH ( FORMAT_TYPE = JSON, DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' ); GO
建立外部串流物件。 下列範例會建立指向 Kafka 主題
TemperatureMeasurement
的外部串流物件。CREATE EXTERNAL STREAM TemperatureMeasurement WITH ( DATA_SOURCE = KafkaInput, FILE_FORMAT = JsonGzipped, LOCATION = 'TemperatureMeasurement', INPUT_OPTIONS = 'PARTITIONS: 10' ); GO
建立串流工作和串流查詢
使用 sys.sp_create_streaming_job
系統預存程序來定義串流查詢,並建立串流作業。 sp_create_streaming_job
預存程序會採用下列參數:
@job_name
:串流工作的名稱。 串流作業名稱在整個執行個體中是唯一的。@statement
:以串流分析查詢語言為基礎的串流查詢陳述式。
下列範例會建立具有一個串流查詢的簡易串流工作。 此查詢會從 IoT Edge 中樞讀取輸入,並寫入到資料庫中的 dbo.TemperatureMeasurements
。
EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
@statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'
下列範例會建立具有多個不同查詢的較複雜的串流工作。 這些查詢包含一個使用內建 AnomalyDetection_ChangePoint
函式來識別溫度資料異常的查詢。
EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
@statement = N'
SELECT *
INTO TemperatureMeasurements1
FROM MyEdgeHubInput1
SELECT *
INTO TemperatureMeasurements2
FROM MyEdgeHubInput2
SELECT *
INTO TemperatureMeasurements3
FROM MyEdgeHubInput3
SELECT timestamp AS [Time],
[Temperature] AS [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
INTO TemperatureAnomalies
FROM MyEdgeHubInput2;
';
GO
開始、停止、捨棄和監視串流工作
若要在 Azure SQL Edge 中開始串流工作,請執行 sys.sp_start_streaming_job
預存程序。 預存程序需要要開始的串流工作的名稱 (作為輸入)。
EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO
若要停止串流工作,請執行 sys.sp_stop_streaming_job
預存程序。 預存程序需要要停止的串流工作的名稱 (作為輸入)。
EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO
若要捨棄 (或刪除) 串流工作,請執行 sys.sp_drop_streaming_job
預存程序。 預存程序需要要捨棄的串流工作的名稱 (作為輸入)。
EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO
若要取得串流工作的目前狀態,請執行 sys.sp_get_streaming_job
預存程序。 預存程序需要要捨棄的串流工作的名稱 (作為輸入)。 其會輸出串流工作的名稱和目前狀態。
EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
(
name NVARCHAR(256),
status NVARCHAR(256),
error NVARCHAR(256)
)
);
GO
串流工作可以具有下列任何一種狀態:
狀態 | 描述 |
---|---|
已建立 | 已建立串流工作,但尚未開始。 |
啟動中 | 串流作業處於正在啟動的階段。 |
閒置 | 串流工作正在執行,但沒有任何要處理的輸入。 |
正在處理 | 串流作業正在執行,且正在處理輸入。 此狀態表示串流作業處於健全狀態。 |
已降級 | 串流工作正在執行,但在輸入處理期間有一些非嚴重錯誤。 輸入作業會繼續執行,但會捨棄發生錯誤的輸入。 |
已停止 | 串流作業已停止。 |
失敗 | 串流工作失敗。 這通常表示處理期間發生嚴重錯誤。 |
注意
由於串流工作是非同步執行,因此工作可能會在執行階段遇到錯誤。 為針對串流工作失敗進行疑難排解,請使用 sys.sp_get_streaming_job
預存程序,或檢閱來自 Azure SQL Edge 容器的 Docker 記錄,其可提供串流工作的錯誤詳細資料。