자습서: Azure Stream Analytics 작업에서 Azure Functions 실행
이 자습서에서는 Azure Event Hubs에서 이벤트를 읽고, 이벤트 데이터에 대한 쿼리를 실행한 다음, Azure Cache for Redis 인스턴스에 쓰는 Azure 함수를 호출하는 Azure Stream Analytics 작업을 만듭니다.
참고 항목
- Functions를 Stream Analytics 작업에 대한 싱크(출력) 중 하나로 구성하면 Azure Stream Analytics에서 Azure Functions를 실행할 수 있습니다. Functions는 Azure 또는 타사 서비스에서 발생하는 이벤트로 트리거되는 코드를 구현할 수 있게 해주는 이벤트 중심의 컴퓨팅 온 디맨드 환경입니다. 트리거에 응답하는 이러한 Functions 기능 때문에 Stream Analytics 작업에 대한 출력이 자연스럽게 제공됩니다.
- Stream Analytics는 HTTP 트리거를 통해 Functions를 호출합니다. Functions 출력 어댑터를 통해 사용자는 Functions를 Stream Analytics에 연결할 수 있으므로, Stream Analytics 쿼리를 기준으로 그러한 이벤트를 트리거할 수 있습니다.
- 다중 테넌트 클러스터에서 실행 중인 Stream Analytics 작업에서 VNet(가상 네트워크) 내의 Azure Functions에 대한 연결은 지원되지 않습니다.
이 자습서에서는 다음을 하는 방법을 알아볼 수 있습니다.
- Azure Event Hubs 인스턴스 만들기
- Azure Cache for Redis 인스턴스 만들기
- Azure Function 만들기
- Stream Analytics 작업 만들기
- 이벤트 허브를 입력으로 구성하고 출력으로 작동
- Stream Analytics 작업 실행
- Azure Cache for Redis에서 결과 확인
Azure 구독이 아직 없는 경우 시작하기 전에 무료 계정을 만듭니다.
필수 조건
시작하기 전에 다음 단계를 완료해야 합니다.
- Azure 구독이 아직 없는 경우 무료 계정을 만듭니다.
- Microsoft 다운로드 센터에서 전화 통화 이벤트 생성기 앱 TelcoGenerator.zip을 다운로드하거나 GitHub에서 소스 코드를 가져옵니다.
Azure에 로그인
Azure Portal에 로그인합니다.
이벤트 허브 만들기
Stream Analytics에서 사기성 호출 데이터 스트림을 분석하려면 몇 가지 샘플 데이터를 이벤트 허브로 보내야 합니다. 이 자습서에서는 Azure Event Hubs를 사용하여 Azure로 데이터를 보냅니다.
다음 단계에 따라 이벤트 허브를 만들고 해당 이벤트 허브로 호출 데이터를 전송합니다.
Azure Portal에 로그인합니다.
왼쪽 메뉴에서 모든 서비스를 선택하고, 사물 인터넷을 선택하고, Event Hubs 위로 마우스를 누른 다음, +(추가) 단추를 선택합니다.
네임스페이스 만들기 페이지에서 다음 단계를 수행합니다.
이벤트 허브를 만들 Azure 구독을 선택합니다.
리소스 그룹의 경우 새로 만들기를 선택하고 리소스 그룹의 이름을 입력합니다. Event Hubs 네임스페이스는 이 리소스 그룹에 만들어집니다.
네임스페이스 이름에 Event Hubs 네임스페이스의 고유한 이름을 입력합니다.
위치에서 네임스페이스를 만들려는 지역을 선택합니다.
가격 책정 계층으로 표준을 선택합니다.
페이지 아래쪽에서 검토 + 만들기를 선택합니다.
네임스페이스 만들기 마법사의 검토 + 만들기 페이지에서 모든 설정을 검토한 후 페이지 하단의 만들기를 선택합니다.
네임스페이스가 성공적으로 배포되면 리소스로 이동을 선택하여 Event Hubs 네임스페이스 페이지로 이동합니다.
Event Hubs 네임스페이스 페이지의 명령 모음에서 +이벤트 허브를 선택합니다.
이벤트 허브 만들기 페이지에서 이벤트 허브의 이름을 입력합니다. 파티션 수를 2로 설정합니다. 나머지 설정에서 기본 옵션을 사용하고 검토 + 만들기를 선택합니다.
검토 + 만들기 페이지의 맨 아래에서 만들기를 선택합니다. 배포가 성공할 때까지 기다립니다.
이벤트 허브에 대한 액세스 부여 및 연결 문자열 가져오기
애플리케이션에서 Event Hubs로 데이터를 보낼 수 있으려면 이벤트 허브에 액세스 권한을 허용하는 정책이 있어야 합니다. 액세스 정책은 권한 부여 정보를 포함하는 연결 문자열을 생성합니다.
Event Hubs 네임스페이스 페이지의 왼쪽 메뉴에서 공유 액세스 정책을 선택합니다.
정책 목록에서 RootManageSharedAccessKey를 선택합니다.
그런 다음 연결 문자열 - 기본 키 옆에 있는 복사 단추를 선택합니다.
연결 문자열을 텍스트 편집기에 붙여넣습니다. 그 다음 섹션에서 이 연결 문자열이 필요합니다.
연결 문자열은 다음과 비슷합니다.
Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>
연결 문자열 세미콜론으로 구분된 여러 키-값 쌍(엔드포인트, SharedAccessKeyName 및 SharedAccessKey)을 포함합니다.
이벤트 생성기 애플리케이션 시작
TelcoGenerator 앱을 시작하기 전에, 앞에서 만든 Azure Event Hubs로 데이터를 전송하도록 앱을 구성해야 합니다.
TelcoGenerator.zip 파일의 콘텐츠를 추출합니다.
원하는 텍스트 편집기에서
TelcoGenerator\TelcoGenerator\telcodatagen.exe.config
파일을 엽니다..config
파일이 여러 개 있으므로 올바른 파일을 열어야 합니다.구성 파일의
<appSettings>
요소를 다음 세부 정보로 업데이트합니다.- EventHubName 키 값을 연결 문자열 끝에 있는 EntityPath 값으로 설정합니다.
- Microsoft.ServiceBus.ConnectionString 키의 값을 네임스페이스에 대한 연결 문자열 설정합니다. 네임스페이스가 아닌 이벤트 허브에 대한 연결 문자열 사용하는 경우 마지막에 값(
;EntityPath=myeventhub
)을 제거EntityPath
합니다. EntityPath 값 앞에 오는 세미콜론은 제거해야 합니다.
파일을 저장합니다.
명령 창을 열고 TelcoGenerator 애플리케이션 압축을 푼 폴더로 변경합니다. 다음 명령을 입력합니다.
.\telcodatagen.exe 1000 0.2 2
이 명령은 다음 매개 변수를 사용합니다.
- 시간당 호출 데이터 레코드 수.
- 사기 확률 - 즉, 앱에서 사기성 호출을 시뮬레이션해야 하는 빈도. 값 0.2는 호출 레코드의 약 20%가 사기성으로 나타남을 의미합니다.
- 기간(시간) - 앱을 실행해야 하는 시간. 명령줄에서 프로세스(Ctrl+C)를 종료하여 언제든지 앱을 중지할 수도 있습니다.
몇 초 후 앱에서 이벤트 허브로 데이터를 전송함에 따라 화면에 전화 통화 레코드가 표시되기 시작합니다. 이 전화 통화 데이터에는 다음 필드가 포함되어 있습니다.
녹음 정의 CallrecTime 호출 시작 시간에 대한 타임스탬프 SwitchNum 호출 연결에 사용되는 전화 스위치. 이 예에서는 스위치는 발신 국가/지역(미국, 중국, 영국, 독일 또는 오스트레일리아)를 나타내는 문자열입니다. CallingNum 호출자의 전화번호. CallingIMSI 국제 모바일 구독자 ID(IMSI) 호출자의 고유 식별자. CalledNum 호출 수신자의 전화번호. CalledIMSI 국제 모바일 구독자 ID(IMSI) 호출 수신자의 고유 식별자.
Stream Analytics 작업 만들기
이제 호출 이벤트 스트림이 생겼으니, 이벤트 허브에서 데이터를 읽는 Stream Analytics 작업을 만들 수 있습니다.
- Stream Analytics 작업을 만들려면 Azure Portal로 이동합니다.
- 리소스 만들기를 선택하고 Stream Analytics 작업을 검색합니다. Stream Analytics 작업 타일을 선택하고 만들기를 선택합니다.
- 새 Stream Analytics 작업 페이지에서 다음 단계를 수행합니다.
구독에서 Event Hubs 네임스페이스가 포함된 구독을 선택합니다.
리소스 그룹에서 이전에 만든 리소스 그룹을 선택합니다.
인스턴스 세부 정보 섹션의 이름에 Stream Analytics 작업의 고유한 이름을 입력합니다.
지역에서 Stream Analytics 작업을 만들려는 지역을 선택합니다. 최상의 성능을 위해 동일한 지역에 작업과 이벤트 허브를 배치하여 지역 간 데이터 전송 비용을 지불하지 않는 것이 좋습니다.
호스팅 환경<에서 아직 선택하지 않은 경우 클라우드를 선택합니다. Stream Analytics 작업은 클라우드 또는 에지에 배포할 수 있습니다. 클라우드를 사용하면 Azure 클라우드에 배포할 수 있고, 에지를 사용하면 IoT Edge 디바이스에 배포할 수 있습니다.
스트리밍 단위의 경우 1을 선택합니다. 스트리밍 단위는 작업을 실행하는 데 필요한 컴퓨팅 리소스를 나타냅니다. 기본적으로 이 값은 1로 설정됩니다. 스트리밍 단위 크기를 조정하는 방법에 대한 자세한 내용은 스트리밍 단위의 이해 및 크기 조정 문서를 참조하세요.
페이지 아래쪽에서 검토 + 만들기를 선택합니다.
- 검토 + 만들기 페이지에서 설정을 검토한 다음 만들기를 선택하여 Stream Analytics 작업을 만듭니다.
- 작업이 배포된 후 리소스로 이동을 선택하여 Stream Analytics 작업 페이지로 이동합니다.
작업 입력 구성
다음 단계는 이전 섹션에서 만든 이벤트 허브를 사용하여 작업이 데이터를 읽을 입력 원본을 정의하는 것입니다.
Stream Analytics 작업 페이지의 왼쪽 메뉴에 있는 작업 토폴로지 섹션에서 입력을 선택합니다.
입력 페이지에서 + 입력 및 이벤트 허브 추가를 선택합니다.
이벤트 허브 페이지에서 다음 단계를 따릅니다.
입력 별칭에 CallStream을 입력합니다. 입력 별칭은 입력을 식별하기 위한 식별 이름입니다. 입력 별칭은 영숫자 문자, 하이픈, 밑줄만 사용할 수 있으며 길이가 3자에서 63자 사이여야 합니다.
구독에서 이벤트 허브를 만든 Azure 구독을 선택합니다. 이벤트 허브는 Stream Analytics 작업과 같은 구독일 수도 있고 다른 구독일 수도 있습니다.
Event Hubs 네임스페이스에서 이전 섹션에서 만든 Event Hubs 네임스페이스를 선택합니다. 현재 구독에서 사용할 수 있는 모든 네임스페이스가 드롭다운에 나열됩니다.
이벤트 허브 이름에서 이전 섹션에서 만든 이벤트 허브를 선택합니다. 선택한 네임스페이스에서 사용할 수 있는 모든 이벤트 허브가 드롭다운에 나열됩니다.
이벤트 허브 소비자 그룹의 경우 이벤트 허브에 새 소비자 그룹이 만들어지도록 새로 만들기 옵션을 선택한 상태로 유지합니다. 각 Stream Analytics 작업마다 고유한 소비자 그룹을 사용하는 것이 좋습니다. 소비자 그룹이 지정되지 않은 경우 Stream Analytics 작업은
$Default
소비자 그룹을 사용합니다. 작업에 셀프 조인이 포함되어 있거나 입력이 여러 개인 경우 나중에 둘 이상의 읽기 권한자가 일부 입력을 읽을 수 있습니다. 이러한 상황은 단일 소비자 그룹의 읽기 권한자 수에 영향을 줍니다.인증 모드에 대해 연결 문자열을 선택합니다. 이 옵션을 사용하면 자습서를 테스트하기가 더 쉽습니다.
이벤트 허브 정책 이름에서 기존 항목 사용을 선택한 다음 이전에 만든 정책을 선택합니다.
페이지 맨 아래에서 저장을 선택합니다.
Azure Cache for Redis 인스턴스 만들기
Azure Cache for Redis 인스턴스 만들기에 설명된 단계를 사용하여 Azure Cache for Redis에서 캐시를 만듭니다.
캐시를 만든 다음 설정 아래에서 액세스 키를 선택합니다. 기본 연결 문자열을 기록해 둡니다.
Azure Functions에서 데이터를 Azure Cache for Redis에 쓸 수 있는 함수 만들기
Functions 설명서의 함수 앱 만들기 섹션을 참조하세요. 이 샘플은 다음을 기반으로 작성되었습니다.
이 자습서에 따라 Visual Studio Code에서 기본 HttpTrigger 함수 앱을 만듭니다. 다음 정보가 사용됩니다. 언어:
C#
, 런타임:.NET 6
(함수 v4 아래), 템플릿:HTTP trigger
.프로젝트 폴더에 있는 터미널에서 다음 명령을 실행하여 Redis 클라이언트 라이브러리를 설치합니다.
dotnet add package StackExchange.Redis --version 2.2.88
대상 서버의 연결 문자열을 채우는
local.settings.json
의Values
섹션에RedisConnectionString
및RedisDatabaseIndex
항목을 추가합니다.{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "", "FUNCTIONS_WORKER_RUNTIME": "dotnet", "RedisConnectionString": "Your Redis Connection String", "RedisDatabaseIndex":"0" } }
Redis 데이터베이스 인덱스는 인스턴스에서 데이터베이스를 식별하는 0에서 15까지의 숫자입니다.
전체 함수(프로젝트의.cs 파일)를 다음 코드 조각으로 바꿉니다. 네임스페이스, 클래스 이름 및 함수 이름을 사용자 고유의 이름으로 업데이트합니다.
using System; using System.IO; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using StackExchange.Redis; namespace Company.Function { public static class HttpTrigger1{ [FunctionName("HttpTrigger1")] public static async Task<IActionResult> Run( [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req, ILogger log) { // Extract the body from the request string requestBody = await new StreamReader(req.Body).ReadToEndAsync(); if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check dynamic data = JsonConvert.DeserializeObject(requestBody); // Reject if too large, as per the doc if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString"); int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex")); using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString)) { // Connection refers to a property that returns a ConnectionMultiplexer IDatabase db = connection.GetDatabase(RedisDatabaseIndex); // Parse items and send to binding for (var i = 0; i < data.Count; i++) { string key = data[i].Time + " - " + data[i].CallingNum1; db.StringSet(key, data[i].ToString()); log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}"); // Simple get of data types from the cache string value = db.StringGet(key); log.LogInformation($"Database got: {key} => {value}"); } } return new OkResult(); // 200 } } }
Stream Analytics가 함수에서 "HTTP 요청 엔터티가 너무 큼" 예외를 수신할 경우 함수로 보내는 일괄 처리 크기를 줄입니다. 다음 코드는 Stream Analytics가 너무 큰 일괄 처리를 보내지 않도록 합니다. 함수에 사용되는 최대 일괄 처리 수 및 크기 값이 Stream Analytics 포털에 입력한 값과 일치하는지 확인합니다.
이제 함수를 Azure에 게시할 수 있습니다.
Azure Portal에서 함수를 열고
RedisConnectionString
및RedisDatabaseIndex
에 대한 애플리케이션 설정을 지정합니다.
출력으로 사용할 함수로 Stream Analytics 작업 업데이트
Azure 포털에서 Stream Analytics 작업을 엽니다.
함수를 찾아서 개요>출력>추가를 선택합니다. 새 출력을 추가하려면 싱크 옵션에 대해 Azure Function을 선택합니다. Functions 출력 어댑터에는 다음과 같은 속성이 있습니다.
속성 이름 설명 출력 별칭 작업 쿼리에서 출력을 참조하기 위해 사용하는 친숙한 이름입니다. 가져오기 옵션 현재 구독에서 함수를 사용하거나 함수가 다른 구독에 있는 경우 설정을 수동으로 제공할 수 있습니다. 함수 앱 Functions 앱의 이름입니다. 함수 Functions 앱의 이름(run.csx 함수 이름)입니다. 최대 일괄 처리 크기 함수로 전송되는 각 출력 일괄 처리의 최대 크기(바이트)를 설정합니다. 기본적으로 이 값은 256KB(262,144바이트)로 설정됩니다. 최대 일괄 처리 수 함수로 전송되는 각 일괄 처리에서 최대 이벤트 수를 지정합니다. 기본값은 100입니다. 이 속성은 선택 사항입니다. 키 다른 구독의 함수를 사용할 수 있습니다. 함수에 액세스하기 위한 키 값을 제공합니다. 이 속성은 선택 사항입니다. 출력 별칭의 이름을 제공합니다. 이 자습서에서는 이름이 saop1로 지정되어 있으며 원하는 이름을 사용할 수 있습니다. 기타 세부 정보를 채웁니다.
Stream Analytics 작업을 열고 쿼리를 다음과 같이 업데이트합니다.
Important
다음 샘플 스크립트에서는 입력 이름에 CallStream을 사용하고 출력 이름에 saop1을 사용했다고 가정합니다. 다른 이름을 사용한 경우 쿼리를 업데이트하는 것을 잊지 마세요.
SELECT System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1, CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2 INTO saop1 FROM CallStream CS1 TIMESTAMP BY CallRecTime JOIN CallStream CS2 TIMESTAMP BY CallRecTime ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5 WHERE CS1.SwitchNum != CS2.SwitchNum
명령줄에서 다음 명령을 실행하여 telcodatagen.exe 애플리케이션을 시작합니다. 명령은
telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours]
형식을 사용합니다.telcodatagen.exe 1000 0.2 2
Stream Analytics 작업을 시작합니다.
Azure 함수의 모니터 페이지에서 함수가 호출되었음을 확인할 수 있습니다.
Azure Cache for Redis 페이지에서 캐시 왼쪽 메뉴의 메트릭을 선택하고 캐시 쓰기 메트릭을 추가한 다음 기간을 지난 1시간으로 설정합니다. 다음 이미지와 유사한 차트가 표시됩니다.
Azure Cache for Redis에서 결과 확인
Azure Functions 로그에서 키 가져오기
먼저 Azure Cache for Redis에 삽입된 레코드의 키를 가져옵니다. 코드에서 키는 다음 코드 조각과 같이 Azure 함수에서 계산됩니다.
string key = data[i].Time + " - " + data[i].CallingNum1;
db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
Azure Portal로 이동하여 Azure Functions 앱을 찾습니다.
왼쪽 메뉴에서 함수를 선택합니다.
함수 목록에서 HTTPTrigger1을 선택합니다.
왼쪽 메뉴에서 모니터를 선택합니다.
로그 탭으로 전환합니다.
다음 스크린샷과 같이 정보 메시지의 키를 기록해 둡니다. 이 키를 사용하여 Azure Cache for Redis에서 값을 찾습니다.
키를 사용하여 Azure Cache for Redis에서 레코드 찾기
Azure Portal로 이동하여 Azure Cache for Redis를 찾습니다. 콘솔을 선택합니다.
Azure Cache for Redis 명령을 사용하여 데이터가 Azure Cache for Redis에 있는지 확인합니다. (이 명령은 Get {key} 형식을 사용합니다.) Azure 함수에 대한 모니터 로그에서 복사한 키를 사용합니다(이전 섹션).
Get "KEY-FROM-THE-PREVIOUS-SECTION"
이 명령은 지정된 키에 대해 값을 인쇄합니다.
오류 처리 및 재시도
이벤트를 Azure Functions에 전송하는 동안 오류가 발생하면 Stream Analytics는 대부분의 작업을 다시 시도합니다. http 오류 413(엔터티가 너무 큼)을 제외하고 모든 http 예외는 성공할 때까지 다시 시도됩니다. 엔터티가 너무 큰 오류는 정책 다시 시도 또는 삭제가 적용되는 데이터 오류로 처리됩니다.
참고 항목
Stream Analytics에서 Azure Functions로의 HTTP 요청에 대한 시간 제한은 100초로 설정됩니다. Azure Functions 앱이 일괄 처리를 처리하는 데 100초 이상 걸리는 경우 Stream Analytics가 오류를 발생시키고 일괄 처리에 대해 다시 시도합니다.
시간 제한을 다시 시도하면 출력 싱크에 중복 이벤트가 기록될 수 있습니다. Stream Analytics는 실패한 일괄 처리를 다시 시도할 때 일괄 처리의 모든 이벤트를 다시 시도합니다. 예를 들어 Stream Analytics에서 Azure Functions로 전송되는 20개의 이벤트를 일괄 처리할 수 있습니다. Azure Functions가 해당 일괄 처리에서 처음 10개의 이벤트를 처리하는 데 100초가 걸린다고 가정합니다. 100초 후 Stream Analytics는 Azure Functions에서 긍정적인 응답을 받지 못했기 때문에 요청을 일시 중단하고 동일한 일괄 처리에 대해 다른 요청을 보냅니다. 일괄 처리의 처음 10개 이벤트가 Azure Functions로 다시 처리되며, 이로 인해 중복이 발생합니다.
알려진 문제
Azure 포털에서 최대 일괄 처리 크기/최대 일괄 처리 수 값을 빈 값(기본값)으로 재설정하려고 시도하면, 저장할 때 값이 이전에 입력된 값으로 다시 변경됩니다. 이 경우 이러한 필드에 대해 기본값을 수동으로 입력하십시오.
Azure Functions에서 HTTP 라우팅을 사용하는 것은 현재 Stream Analytics에서 지원되지 않습니다.
가상 네트워크에서 호스트되는 Azure Functions에 연결하는 지원이 사용하도록 설정되지 않습니다.
리소스 정리
더 이상 필요하지 않으면 리소스 그룹, 스트리밍 작업 및 모든 관련 리소스를 삭제합니다. 작업을 삭제하면 작업에서 사용된 스트리밍 단위에 대한 청구를 방지합니다. 작업을 나중에 사용하려는 경우 중지하고 나중에 필요할 때 다시 시작할 수 있습니다. 이 작업을 계속 사용하지 않으려면 다음 단계를 사용하여 이 빠른 시작에서 만든 모든 리소스를 삭제합니다.
- Azure Portal의 왼쪽 메뉴에서 리소스 그룹을 선택한 다음, 만든 리소스의 이름을 선택합니다.
- 리소스 그룹 페이지에서 삭제를 선택하고 텍스트 상자에서 삭제할 리소스의 이름을 입력한 다음, 삭제를 선택합니다.
다음 단계
이 자습서에서는 Azure 함수를 실행하는 간단한 Stream Analytics 작업을 만들었습니다. Stream Analytics 작업에 대해 자세히 알아보려면 그 다음 자습서를 계속 진행하세요.