다음을 통해 공유


detect_anomalous_new_entity_fl()

적용 대상: ✅Microsoft Fabric✅Azure Data ExplorerAzure MonitorMicrosoft Sentinel

타임스탬프가 지정된 데이터에서 비정상적인 새 엔터티의 모양을 검색합니다.

이 함수 detect_anomalous_new_entity_fl()트래픽 로그와 같은 타임스탬프가 지정된 데이터에서 IP 주소 또는 사용자와 같은 비정상적인 새 엔터티의 모양을 감지하는 UDF(사용자 정의 함수) 입니다. 사이버 보안 컨텍스트에서 이러한 이벤트는 의심스럽고 잠재적인 공격 또는 타협을 나타낼 수 있습니다.

변칙 모델은 각 범위에 대한 시간 bin(예: 일)에 나타나는 새 엔터티의 수를 나타내는 포아송 분포를 기반으로 합니다. 포아송 분포 매개 변수는 학습 기간에 새 엔터티의 모양 속도에 따라 추정되며, 최근 모양이 이전 엔터티보다 더 중요하다는 사실을 반영하는 감쇠 요인이 추가되었습니다. 따라서 구독 또는 계정과 같은 일부 범위당 정의된 검색 기간에 새 엔터티가 발생할 확률을 계산합니다. 모델 출력은 변칙, 감쇠 비율 매개 변수 등에 대한 최소 임계값과 같은 몇 가지 선택적 매개 변수에 의해 제어됩니다.

모델의 직접 출력은 새 엔터티를 발견할 예상 확률의 역을 기반으로 하는 변칙 점수입니다. 점수는 [0, 1]의 범위에서 단조롭고 1은 비정상적인 항목을 나타냅니다. 변칙 점수 외에도 검색된 변칙(최소 임계값 매개 변수로 제어됨) 및 기타 설명 필드에 대한 이진 플래그가 있습니다.

구문

detect_anomalous_new_entity_fl(entityColumnName, scopeColumnName, timeColumnName, startTraining, startDetection, endDetection, [maxEntitiesThresh], [minTrainingDaysThresh], [decayParam], [anomalyScoreThresh])

구문 규칙에 대해 자세히 알아봅니다.

매개 변수

이름 Type 필수 설명
entityColumnName string ✔️ 변칙 모델이 계산되는 엔터티의 이름 또는 ID를 포함하는 입력 테이블 열의 이름입니다.
scopeColumnName string ✔️ 각 범위에 대해 다른 변칙 모델을 빌드할 수 있도록 파티션 또는 범위를 포함하는 입력 테이블 열의 이름입니다.
timeColumnName string ✔️ 학습 및 검색 기간을 정의하는 데 사용되는 타임스탬프를 포함하는 입력 테이블 열의 이름입니다.
startTraining datetime ✔️ 변칙 모델에 대한 학습 기간의 시작입니다. 해당 끝은 검색 기간의 시작 부분에 의해 정의됩니다.
startDetection datetime ✔️ 변칙 검색에 대한 검색 기간의 시작입니다.
endDetection datetime ✔️ 변칙 검색에 대한 검색 기간의 끝입니다.
maxEntitiesThresh int 변칙을 계산할 범위 내의 기존 엔터티의 최대 수입니다. 엔터티 수가 임계값을 초과하면 범위가 너무 시끄러운 것으로 간주되고 변칙이 계산되지 않습니다. 기본값은 60입니다.
minTrainingDaysThresh int 변칙을 계산하기 위해 범위가 존재하는 학습 기간의 최소 일 수입니다. 임계값 미만이면 범위가 너무 새롭고 알 수 없는 것으로 간주되므로 변칙이 계산되지 않습니다. 기본값은 14입니다.
decayParam real 변칙 모델의 감쇠 비율 매개 변수로, 범위(0,1])의 숫자입니다. 값이 낮을수록 감소 속도가 빨라지므로 학습 기간의 이후 모양에 더 많은 중요도가 부여됩니다. 값이 1이면 감쇠가 없으므로 포아송 분포 매개 변수 추정에 간단한 평균이 사용됩니다. 기본값은 0.95입니다.
anomalyScoreThresh real 변칙이 검색되는 변칙 점수의 최소값이며 범위 [0, 1]의 숫자입니다. 값이 높을수록 더 중요한 사례만 비정상으로 간주되므로 감지되는 변칙이 적습니다(정밀도가 높고 회수율이 낮습니다). 기본값은 0.9입니다.

함수 정의

다음과 같이 해당 코드를 쿼리 정의 함수로 포함하거나 데이터베이스에 저장된 함수로 만들어 함수를 정의할 수 있습니다.

다음 let 문을 사용하여 함수를 정의합니다. 사용 권한이 필요 없습니다.

Important

let 문자체적으로 실행할 수 없습니다. 그 뒤에 테이블 형식 식 문이 있어야 합니다. 작업 예제 detect_anomalous_new_entity_fl()를 실행하려면 예제를 참조 하세요.

let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
                                        , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                        , maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
    processedData
    | summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet) 
        by scope, entity
    | extend firstSeenSet = dataSet
    | project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
    entityData
    | summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
        , firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0) 
            by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
    entityData
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | where firstSeenSet == 'trainSet'
    | summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
        by scope, firstSeenSet, firstSeenEntity
    | extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
// adding exponentially decaying weights to counts
    | extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
    | extend decayingValue = countAddedEntities * decayingWeight
    | summarize   newEntityProbability = round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
                , countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntity), slicesOnScope = max(slicesInTrainingScope)///for explainability
        by scope, firstSeenSet
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
    | extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
    | extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (modelData) on scope
	| project-away scope1
    | where isAnomalousNewEntity == 1
    | summarize arg_min(sliceTime, *) by scope, entity
    | extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ',  slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
        , ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
    | join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
    | project-away scope1
);
resultsData
};
// Write your query to use the function here.

예시

다음 예제에서는 호출 연산자를 사용하여 함수를 실행합니다.

쿼리 정의 함수를 사용하려면 포함된 함수 정의 후에 호출합니다.

let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
                                        , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                        , maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
    processedData
    | summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet) 
        by scope, entity
    | extend firstSeenSet = dataSet
    | project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
    entityData
    | summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
        , firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0) 
            by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
    entityData
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | where firstSeenSet == 'trainSet'
    | summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
        by scope, firstSeenSet, firstSeenEntity
    | extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
// adding exponentially decaying weights to counts
    | extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
    | extend decayingValue = countAddedEntities * decayingWeight
    | summarize   newEntityProbability =  round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
                , countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntity), slicesOnScope = max(slicesInTrainingScope)///for explainability
        by scope, firstSeenSet
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
    | extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
    | extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (modelData) on scope
    | project-away scope1
    | where isAnomalousNewEntity == 1
    | summarize arg_min(sliceTime, *) by scope, entity
    | extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ',  slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
        , ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
    | join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
    | project-away scope1
);
resultsData
};
// synthetic data generation
let detectPeriodStart   = datetime(2022-04-30 05:00:00.0000000);
let trainPeriodStart    = datetime(2022-03-01 05:00);
let names               = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames          = array_length(names);
let testData            = range t from 1 to 24*60 step 1
    | extend timeSlice      = trainPeriodStart + 1h * t
    | extend countEvents    = round(2*rand() + iff((t/24)%7>=5, 10.0, 15.0) - (((t%24)/10)*((t%24)/10)), 2) * 100 // generate a series with weekly seasonality
    | extend userName       = tostring(names[toint(rand(countNames))])
    | extend deviceId       = hash_md5(rand())
    | extend accountName    = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
    | extend userName       = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
    | extend deviceId       = iff(timeSlice == detectPeriodStart, 'abcdefghijklmnoprtuvwxyz012345678', deviceId)
    | sort by timeSlice desc
;
testData
| invoke detect_anomalous_new_entity_fl(entityColumnName    = 'userName'  //principalName for positive, deviceId for negative
                                , scopeColumnName           = 'accountName'
                                , timeColumnName            = 'timeSlice'
                                , startTraining             = trainPeriodStart
                                , startDetection            = detectPeriodStart
                                , endDetection              = detectPeriodStart
                            )

출력

scope 엔터티 sliceTime t timeSlice countEvents userName deviceId accountName dataSet firstSeenSet newEntityProbability countKnownEntities lastNewEntityTimestamp slicesOnScope newEntityAnomalyScore isAnomalousNewEntity anomalyType anomalyExplainability anomalyState
prodEnvironment H4ck3r 2022-04-30 05:00:00.0000000 1440 2022-04-30 05:00:00.0000000 1,687 H4ck3r abcdefghijklmnoprtuvwxyz012345678 prodEnvironment detectSet trainSet 0.0031 4 2022-03-01 09:00:00.0000000 60 0.9969 1 newEntity_userName userName H4ck3r는 지난 60일 동안 accountName prodEnvironment에 표시되지 않았습니다. 이전에는 4개의 엔터티가 보였고, 그 중 마지막 엔터티는 2022-03-01 09:00에 나타났습니다. ["IT 지원: 2022-03-01 07:00", "Admin : 2022-03-01 08:00", "Dev2 : 2022-03-01 09:00", "Dev1 : 2022-03-01 14:00"]

함수를 실행하는 출력은 범위당 각 엔터티에 대한 테스트 데이터 세트의 첫 번째 행으로, 비정상으로 태그가 지정된 새 엔터티(학습 기간 동안 나타나지 않음을 의미)에 대해 필터링됩니다(즉, 엔터티 변칙 점수가 anomalyScoreThresh를 초과했음을 의미). 명확성을 위해 일부 다른 필드가 추가됩니다.

  • dataSet: 현재 데이터 세트(항상 detectSet)입니다.
  • firstSeenSet: 범위가 처음 표시된 데이터 세트입니다('trainSet'이어야 합니다).
  • newEntityProbability: 포아송 모델 추정을 기반으로 새 엔터티를 볼 확률입니다.
  • countKnownEntities: 범위의 기존 엔터티입니다.
  • lastNewEntityTimestamp: 마지막으로 새 엔터티가 비정상적인 엔터티 앞에 표시되었습니다.
  • slicesOnScope: 범위당 조각 수입니다.
  • newEntityAnomalyScore: 변칙 점수는 [0, 1] 범위의 새 엔터티로, 더 높은 값은 더 많은 변칙을 의미합니다.
  • isAnomalousNewEntity: 비정상적인 새 엔터티에 대한 이진 플래그
  • anomalyType: 변칙 유형을 보여 줍니다(여러 변칙 검색 논리를 함께 실행할 때 유용함).
  • anomalyExplainability: 생성된 변칙 및 해당 설명에 대한 텍스트 래퍼입니다.
  • anomalyState: 범위를 처음 본 기존 엔터티의 모음입니다.

기본 매개 변수를 사용하여 계정당 사용자당 이 함수를 실행하면 변칙 점수가 0.9969인 이전에 보이지 않고 비정상적인 사용자('H4ck3r')가 발생합니다. 즉, 학습 기간에 기존 사용자가 적기 때문에 예상치 못한 것입니다.

deviceId에서 기본 매개 변수를 엔터티로 사용하여 함수를 실행하는 경우 예상되는 기존 디바이스 수가 많기 때문에 변칙이 표시되지 않습니다. 그러나 anomalyScoreThresh 매개 변수를 0.0001로 낮추고 매개 변수를 maxEntitiesThresh에서 10000으로 올리면 회수를 위해 정밀도를 효과적으로 줄이고 디바이스 'abcdefghijklmnoprtuvwxyz012345678'에서 변칙(낮은 변칙 점수 포함)을 감지합니다.

출력은 표준화된 형식의 설명 필드와 함께 비정상적인 엔터티를 보여 줍니다. 이러한 필드는 변칙을 조사하고 여러 엔터티에서 비정상적인 엔터티 검색을 실행하거나 다른 알고리즘을 함께 실행하는 데 유용합니다.

사이버 보안 컨텍스트에서 제안된 사용은 계정의 구독과 같은 의미 있는 범위에 따라 사용자 이름 또는 IP 주소와 같은 의미 있는 엔터티에서 함수를 실행하고 있습니다. 검색된 비정상적인 새 엔터티는 해당 모양이 범위에서 예상되지 않으며 의심스러울 수 있음을 의미합니다.