다음을 통해 공유


detect_anomalous_spike_fl()

적용 대상: ✅Microsoft FabricAzure Data ExplorerAzure MonitorMicrosoft Sentinel

타임스탬프가 지정된 데이터의 숫자 변수에서 비정상적인 급증의 모양을 검색합니다.

detect_anomalous_spike_fl() 함수는 UDF(사용자 정의 함수), 트래픽 로그와 같은 타임스탬프가 있는 데이터에서 유출된 데이터 양 또는 실패한 로그인 시도와 같은 숫자 변수에서 비정상적인 급증의 모양을 감지합니다. 사이버 보안 컨텍스트에서 이러한 이벤트는 의심스럽고 잠재적인 공격 또는 타협을 나타낼 수 있습니다.

변칙 모델은 Z 점수(평균 이상의 표준 편차 수) 및 Q 점수(높은 분위수 이상의 시퀀티 범위 수)의 조합을 기반으로 합니다. Z 점수는 간단하고 일반적인 이상값 메트릭입니다. Q 점수는 Tukey의 울타리를 기반으로 하지만 더 많은 제어를 위해 정의를 모든 분위수로 확장합니다. 다른 분위수(기본적으로 95번째 및 25번째 분위수 사용)를 선택하면 더 중요한 이상값을 감지하여 정밀도를 향상시킬 수 있습니다. 모델은 일부 숫자 변수를 기반으로 빌드되며 구독 또는 계정과 같은 범위 및 엔터티당(예: 사용자 또는 디바이스)에 따라 계산됩니다.

단일 분산 숫자 데이터 요소에 대한 점수를 계산하고 다른 요구 사항을 확인한 후(예: 범위에 대한 학습 기간의 활성 일 수가 미리 정의된 임계값을 초과함) 각 점수가 미리 정의된 임계값을 초과하는지 확인합니다. 이 경우 급증이 감지되고 데이터 포인트가 비정상으로 플래그가 지정됩니다. 두 가지 모델이 빌드됩니다. 하나는 엔터티 수준(entityColumnName 매개 변수로 정의됨)(예: 범위당 사용자 또는 디바이스(scopeColumnName 매개 변수로 정의됨)에 대한 모델(예: 계정 또는 구독)입니다. 두 번째 모델은 전체 범위에 대해 빌드됩니다. 변칙 검색 논리는 각 모델에 대해 실행되며 그 중 하나에서 변칙이 검색되면 표시됩니다. 기본적으로 상승 급증이 감지됩니다. 하향 급증('딥')은 일부 컨텍스트에서도 흥미로울 수 있으며 논리를 조정하여 검색할 수 있습니다.

모델의 직접 출력은 점수를 기반으로 하는 변칙 점수입니다. 점수는 [0, 1]의 범위에서 단조롭고 1은 비정상적인 항목을 나타냅니다. 변칙 점수 외에도 검색된 변칙(최소 임계값 매개 변수로 제어됨) 및 기타 설명 필드에 대한 이진 플래그가 있습니다.

함수는 변수의 임시 구조를 무시합니다(주로 확장성 및 설명성을 위해). 변수에 추세 및 계절성과 같은 중요한 임시 구성 요소가 있는 경우 series_decompose_anomalies() 함수를 고려하거나 series_decompose() 사용하여 잔차를 계산하고 그 위에 detect_anomalous_spike_fl() 실행하는 것이 좋습니다.

통사론

detect_anomalous_spike_fl( numericColumnName, entityColumnName, scopeColumnName, timeColumnName, startTraining, startDetection, endDetection, [minTrainingDaysThresh], [lowPercentileForQscore], [highPercentileForQscore], [minSlicesPerEntity], [zScoreThreshEntity], [qScoreThreshEntity], [minNumValueThreshEntity], [minSlicesPerScope], [zScoreThreshScope], [qScoreThreshScope], [minNumValueThreshScope])

구문 규칙에 대해 자세히 알아봅니다.

매개 변수

이름 필수 묘사
numericColumnName string ✔️ 변칙 모델이 계산되는 숫자 변수를 포함하는 입력 테이블 열의 이름입니다.
entityColumnName string ✔️ 변칙 모델이 계산되는 엔터티의 이름 또는 ID를 포함하는 입력 테이블 열의 이름입니다.
scopeColumnName string ✔️ 각 범위에 대해 다른 변칙 모델을 빌드할 수 있도록 파티션 또는 범위를 포함하는 입력 테이블 열의 이름입니다.
timeColumnName string ✔️ 학습 및 검색 기간을 정의하는 데 사용되는 타임스탬프를 포함하는 입력 테이블 열의 이름입니다.
startTraining datetime ✔️ 변칙 모델에 대한 학습 기간의 시작입니다. 해당 끝은 검색 기간의 시작 부분에 의해 정의됩니다.
startDetection datetime ✔️ 변칙 검색에 대한 검색 기간의 시작입니다.
endDetection datetime ✔️ 변칙 검색에 대한 검색 기간의 끝입니다.
minTrainingDaysThresh int 변칙을 계산하기 위해 범위가 존재하는 학습 기간의 최소 일 수입니다. 임계값 미만이면 범위가 너무 새롭고 알 수 없는 것으로 간주되므로 변칙이 계산되지 않습니다. 기본값은 14입니다.
lowPercentileForQscore real Q 점수에 대해 낮은 제한으로 계산할 백분위수를 나타내는 범위 [0.0,1.0]의 숫자입니다. Tukey의 울타리에서 0.25가 사용됩니다. 기본값은 0.25입니다. 더 낮은 백분위수는 더 중요한 변칙이 검색되면 정밀도를 향상시킵니다.
highPercentileForQscore real Q 점수에 대해 높은 제한으로 계산할 백분위수를 나타내는 범위 [0.0,1.0]의 숫자입니다. Tukey의 울타리에서 0.75가 사용됩니다. 기본값은 0.9입니다. 더 높은 백분위수 선택에서는 더 중요한 변칙이 검색되면 정밀도가 향상됩니다.
minSlicesPerEntity int 변칙 모델을 빌드하기 전에 엔터티에 존재할 'slices'(예: 일)의 최소 임계값입니다. 숫자가 임계값보다 낮으면 엔터티가 너무 신규적이고 불안정한 것으로 간주됩니다. 기본값은 20입니다.
zScoreThreshEntity real 엔터티 수준 Z 점수(평균 이상의 표준 편차 수)에 대한 최소 임계값은 변칙으로 플래그가 지정됩니다. 더 높은 값을 선택하면 더 중요한 변칙만 검색됩니다. 기본값은 3.0입니다.
qScoreThreshEntity real 변칙으로 플래그가 지정될 엔터티 수준 Q 점수(높은 분위수 이상의 시퀀티 범위 수)에 대한 최소 임계값입니다. 더 높은 값을 선택하면 더 중요한 변칙만 검색됩니다. 기본값은 2.0입니다.
minNumValueThreshEntity long 엔터티에 대한 변칙으로 플래그가 지정되는 숫자 변수의 최소 임계값입니다. 이는 값이 비정상적으로(높은 Z 점수 및 Q 점수) 대/소문자를 필터링하는 데 유용하지만 값 자체가 너무 작아 흥미로울 수 없습니다. 기본값은 0입니다.
minSlicesPerScope int 변칙 모델을 빌드하기 전에 범위에 존재하는 '조각'(예: 일)의 최소 임계값입니다. 숫자가 임계값보다 낮으면 범위가 너무 새롭고 불안정한 것으로 간주됩니다. 기본값은 20입니다.
zScoreThreshScope real 변칙으로 플래그가 지정될 범위 수준 Z 점수(평균 이상의 표준 편차 수)에 대한 최소 임계값입니다. 더 높은 값을 선택하면 더 중요한 변칙만 검색됩니다. 기본값은 3.0입니다.
qScoreThreshScope real 변칙으로 플래그가 지정될 범위 수준 Q 점수(높은 분위수 이상의 시퀀티 범위 수)에 대한 최소 임계값입니다. 더 높은 값을 선택하면 더 중요한 변칙만 검색됩니다. 기본값은 2.0입니다.
minNumValueThreshScope long 범위의 변칙으로 플래그를 지정하는 숫자 변수의 최소 임계값입니다. 이는 값이 비정상적으로(높은 Z 점수 및 Q 점수) 대/소문자를 필터링하는 데 유용하지만 값 자체가 너무 작아 흥미로울 수 없습니다. 기본값은 0입니다.

함수 정의

다음과 같이 해당 코드를 쿼리 정의 함수로 포함하거나 데이터베이스에 저장된 함수로 만들어 함수를 정의할 수 있습니다.

  • 쿼리 정의
  • 저장된

다음 let 문사용하여 함수를 정의합니다. 권한이 필요하지 않습니다.

중요하다

let 문 자체적으로 실행할 수 없습니다. 테이블 형식 식 문 뒤에합니다. detect_anomalous_spike_fl()작업 예제를 실행하려면 예제참조하세요.

let detect_anomalous_spike_fl = (T:(*), numericColumnName:string, entityColumnName:string, scopeColumnName:string
                            , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime, minTrainingDaysThresh:int = 14
                            , lowPercentileForQscore:real = 0.25, highPercentileForQscore:real = 0.9
                            , minSlicesPerEntity:int = 20, zScoreThreshEntity:real = 3.0, qScoreThreshEntity:real = 2.0, minNumValueThreshEntity:long = 0
                            , minSlicesPerScope:int = 20, zScoreThreshScope:real = 3.0, qScoreThreshScope:real = 2.0, minNumValueThreshScope:long = 0)
{
// pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend numVec     = tolong(column_ifexists(numericColumnName, 0))
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
let aggregatedCandidateScopeData = (
    processedData
    | summarize firstSeenScope = min(sliceTime), lastSeenScope = max(sliceTime) by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection
);
let entityModelData = (
    processedData
    | join kind = inner (aggregatedCandidateScopeData) on scope
    | where dataSet == 'trainSet'
    | summarize countSlicesEntity = dcount(sliceTime), avgNumEntity = avg(numVec), sdNumEntity = stdev(numVec)
            , lowPrcNumEntity = percentile(numVec, lowPercentileForQscore), highPrcNumEntity = percentile(numVec, highPercentileForQscore)
            , firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime)
        by scope, entity
    | extend slicesInTrainingEntity = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
);
let scopeModelData = (
    processedData
    | join kind = inner (aggregatedCandidateScopeData) on scope
    | where dataSet == 'trainSet'
    | summarize countSlicesScope = dcount(sliceTime), avgNumScope = avg(numVec), sdNumScope = stdev(numVec)
            , lowPrcNumScope = percentile(numVec, lowPercentileForQscore), highPrcNumScope = percentile(numVec, highPercentileForQscore)
        by scope
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | join kind = leftouter (entityModelData) on scope, entity 
    | join kind = leftouter (scopeModelData) on scope
    | extend zScoreEntity       = iff(countSlicesEntity >= minSlicesPerEntity, round((toreal(numVec) - avgNumEntity)/(sdNumEntity + 1), 2), 0.0)
            , qScoreEntity      = iff(countSlicesEntity >= minSlicesPerEntity, round((toreal(numVec) - highPrcNumEntity)/(highPrcNumEntity - lowPrcNumEntity + 1), 2), 0.0)
            , zScoreScope       = iff(countSlicesScope >= minSlicesPerScope, round((toreal(numVec) - avgNumScope)/(sdNumScope + 1), 2), 0.0)
            , qScoreScope       = iff(countSlicesScope >= minSlicesPerScope, round((toreal(numVec) - highPrcNumScope)/(highPrcNumScope - lowPrcNumScope + 1), 2), 0.0)
    | extend isSpikeOnEntity    = iff((slicesInTrainingEntity >= minTrainingDaysThresh and zScoreEntity > zScoreThreshEntity and qScoreEntity > qScoreThreshEntity and numVec >= minNumValueThreshEntity), 1, 0)
            , entityHighBaseline= round(max_of((avgNumEntity + sdNumEntity), highPrcNumEntity), 2)
            , isSpikeOnScope    = iff((countSlicesScope >= minTrainingDaysThresh and zScoreScope > zScoreThreshScope and qScoreScope > qScoreThreshScope and numVec >= minNumValueThreshScope), 1, 0)
            , scopeHighBaseline = round(max_of((avgNumEntity + 2 * sdNumEntity), highPrcNumScope), 2)
    | extend entitySpikeAnomalyScore = iff(isSpikeOnEntity  == 1, round(1.0 - 0.25/(max_of(zScoreEntity, qScoreEntity)),4), 0.00)
            , scopeSpikeAnomalyScore = iff(isSpikeOnScope == 1, round(1.0 - 0.25/(max_of(zScoreScope, qScoreScope)), 4), 0.00)
    | where isSpikeOnEntity == 1 or isSpikeOnScope == 1
    | extend avgNumEntity   = round(avgNumEntity, 2), sdNumEntity = round(sdNumEntity, 2)
            , avgNumScope   = round(avgNumScope, 2), sdNumScope = round(sdNumScope, 2)
   | project-away entity1, scope1, scope2, scope3
   | extend anomalyType = iff(isSpikeOnEntity == 1, strcat('spike_', entityColumnName), strcat('spike_', scopeColumnName)), anomalyScore = max_of(entitySpikeAnomalyScore, scopeSpikeAnomalyScore)
   | extend anomalyExplainability = iff(isSpikeOnEntity == 1
        , strcat('The value of numeric variable ', numericColumnName, ' for ', entityColumnName, ' ', entity, ' is ', numVec, ', which is abnormally high for this '
            , entityColumnName, ' at this ', scopeColumnName
            , '. Based on observations from last ' , slicesInTrainingEntity, ' ', timePeriodBinSize, 's, the expected baseline value is below ', entityHighBaseline, '.')
        , strcat('The value of numeric variable ', numericColumnName, ' on ', scopeColumnName, ' ', scope, ' is ', numVec, ', which is abnormally high for this '
            , scopeColumnName, '. Based on observations from last ' , slicesInTrainingScope, ' ', timePeriodBinSize, 's, the expected baseline value is below ', scopeHighBaseline, '.'))
   | extend anomalyState = iff(isSpikeOnEntity == 1
        , bag_pack('avg', avgNumEntity, 'stdev', sdNumEntity, strcat('percentile_', lowPercentileForQscore), lowPrcNumEntity, strcat('percentile_', highPercentileForQscore), highPrcNumEntity)
        , bag_pack('avg', avgNumScope, 'stdev', sdNumScope, strcat('percentile_', lowPercentileForQscore), lowPrcNumScope, strcat('percentile_', highPercentileForQscore), highPrcNumScope))
   | project-away lowPrcNumEntity, highPrcNumEntity, lowPrcNumScope, highPrcNumScope
);
resultsData
};
// Write your query to use the function here.

본보기

다음 예제에서는 호출 연산자 사용하여 함수를 실행합니다.

  • 쿼리 정의
  • 저장된

쿼리 정의 함수를 사용하려면 포함된 함수 정의 후에 호출합니다.

쿼리 실행

let detect_anomalous_spike_fl = (T:(*), numericColumnName:string, entityColumnName:string, scopeColumnName:string
                            , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime, minTrainingDaysThresh:int = 14
                            , lowPercentileForQscore:real = 0.25, highPercentileForQscore:real = 0.9
                            , minSlicesPerEntity:int = 20, zScoreThreshEntity:real = 3.0, qScoreThreshEntity:real = 2.0, minNumValueThreshEntity:long = 0
                            , minSlicesPerScope:int = 20, zScoreThreshScope:real = 3.0, qScoreThreshScope:real = 2.0, minNumValueThreshScope:long = 0)
{
// pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend numVec     = tolong(column_ifexists(numericColumnName, 0))
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
let aggregatedCandidateScopeData = (
    processedData
    | summarize firstSeenScope = min(sliceTime), lastSeenScope = max(sliceTime) by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection
);
let entityModelData = (
    processedData
    | join kind = inner (aggregatedCandidateScopeData) on scope
    | where dataSet == 'trainSet'
    | summarize countSlicesEntity = dcount(sliceTime), avgNumEntity = avg(numVec), sdNumEntity = stdev(numVec)
            , lowPrcNumEntity = percentile(numVec, lowPercentileForQscore), highPrcNumEntity = percentile(numVec, highPercentileForQscore)
            , firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime)
        by scope, entity
    | extend slicesInTrainingEntity = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
);
let scopeModelData = (
    processedData
    | join kind = inner (aggregatedCandidateScopeData) on scope
    | where dataSet == 'trainSet'
    | summarize countSlicesScope = dcount(sliceTime), avgNumScope = avg(numVec), sdNumScope = stdev(numVec)
            , lowPrcNumScope = percentile(numVec, lowPercentileForQscore), highPrcNumScope = percentile(numVec, highPercentileForQscore)
        by scope
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | join kind = leftouter (entityModelData) on scope, entity 
    | join kind = leftouter (scopeModelData) on scope
    | extend zScoreEntity       = iff(countSlicesEntity >= minSlicesPerEntity, round((toreal(numVec) - avgNumEntity)/(sdNumEntity + 1), 2), 0.0)
            , qScoreEntity      = iff(countSlicesEntity >= minSlicesPerEntity, round((toreal(numVec) - highPrcNumEntity)/(highPrcNumEntity - lowPrcNumEntity + 1), 2), 0.0)
            , zScoreScope       = iff(countSlicesScope >= minSlicesPerScope, round((toreal(numVec) - avgNumScope)/(sdNumScope + 1), 2), 0.0)
            , qScoreScope       = iff(countSlicesScope >= minSlicesPerScope, round((toreal(numVec) - highPrcNumScope)/(highPrcNumScope - lowPrcNumScope + 1), 2), 0.0)
    | extend isSpikeOnEntity    = iff((slicesInTrainingEntity >= minTrainingDaysThresh and zScoreEntity > zScoreThreshEntity and qScoreEntity > qScoreThreshEntity and numVec >= minNumValueThreshEntity), 1, 0)
            , entityHighBaseline= round(max_of((avgNumEntity + sdNumEntity), highPrcNumEntity), 2)
            , isSpikeOnScope    = iff((countSlicesScope >= minTrainingDaysThresh and zScoreScope > zScoreThreshScope and qScoreScope > qScoreThreshScope and numVec >= minNumValueThreshScope), 1, 0)
            , scopeHighBaseline = round(max_of((avgNumEntity + 2 * sdNumEntity), highPrcNumScope), 2)
    | extend entitySpikeAnomalyScore = iff(isSpikeOnEntity  == 1, round(1.0 - 0.25/(max_of(zScoreEntity, qScoreEntity)),4), 0.00)
            , scopeSpikeAnomalyScore = iff(isSpikeOnScope == 1, round(1.0 - 0.25/(max_of(zScoreScope, qScoreScope)), 4), 0.00)
    | where isSpikeOnEntity == 1 or isSpikeOnScope == 1
    | extend avgNumEntity   = round(avgNumEntity, 2), sdNumEntity = round(sdNumEntity, 2)
            , avgNumScope   = round(avgNumScope, 2), sdNumScope = round(sdNumScope, 2)
   | project-away entity1, scope1, scope2, scope3
   | extend anomalyType = iff(isSpikeOnEntity == 1, strcat('spike_', entityColumnName), strcat('spike_', scopeColumnName)), anomalyScore = max_of(entitySpikeAnomalyScore, scopeSpikeAnomalyScore)
   | extend anomalyExplainability = iff(isSpikeOnEntity == 1
        , strcat('The value of numeric variable ', numericColumnName, ' for ', entityColumnName, ' ', entity, ' is ', numVec, ', which is abnormally high for this '
            , entityColumnName, ' at this ', scopeColumnName
            , '. Based on observations from last ' , slicesInTrainingEntity, ' ', timePeriodBinSize, 's, the expected baseline value is below ', entityHighBaseline, '.')
        , strcat('The value of numeric variable ', numericColumnName, ' on ', scopeColumnName, ' ', scope, ' is ', numVec, ', which is abnormally high for this '
            , scopeColumnName, '. Based on observations from last ' , slicesInTrainingScope, ' ', timePeriodBinSize, 's, the expected baseline value is below ', scopeHighBaseline, '.'))
   | extend anomalyState = iff(isSpikeOnEntity == 1
        , bag_pack('avg', avgNumEntity, 'stdev', sdNumEntity, strcat('percentile_', lowPercentileForQscore), lowPrcNumEntity, strcat('percentile_', highPercentileForQscore), highPrcNumEntity)
        , bag_pack('avg', avgNumScope, 'stdev', sdNumScope, strcat('percentile_', lowPercentileForQscore), lowPrcNumScope, strcat('percentile_', highPercentileForQscore), highPrcNumScope))
   | project-away lowPrcNumEntity, highPrcNumEntity, lowPrcNumScope, highPrcNumScope
);
resultsData
};
let detectPeriodStart   	= datetime(2022-04-30 05:00:00.0000000);
let trainPeriodStart    	= datetime(2022-03-01 05:00);
let names               	= pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames          	= array_length(names);
let testData            	= range t from 1 to 24*60 step 1
    | extend timeSlice      = trainPeriodStart + 1h * t
    | extend countEvents    = round(2*rand() + iff((t/24)%7>=5, 10.0, 15.0) - (((t%24)/10)*((t%24)/10)), 2) * 100
    | extend userName       = tostring(names[toint(rand(countNames))])
    | extend deviceId       = hash_md5(rand())
    | extend accountName    = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
    | extend userName       = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
    | extend countEvents 	= iff(timeSlice == detectPeriodStart, 3*countEvents, countEvents)
    | sort by timeSlice desc
;    
testData
| invoke detect_anomalous_spike_fl(numericColumnName        = 'countEvents'
                                , entityColumnName          = 'userName'
                                , scopeColumnName           = 'accountName'
                                , timeColumnName            = 'timeSlice'
                                , startTraining             = trainPeriodStart
                                , startDetection            = detectPeriodStart
                                , endDetection              = detectPeriodStart
                            )

출력

t timeSlice countEvents userName deviceId accountName 범위 실체 numVec sliceTime dataSet firstSeenScope lastSeenScope slicesInTrainingScope countSlicesEntity avgNumEntity sdNumEntity firstSeenEntity lastSeenEntity slicesInTrainingEntity countSlicesScope avgNumScope sdNumScope zScoreEntity qScoreEntity zScoreScope qScoreScope isSpikeOnEntity entityHighBaseline isSpikeOnScope scopeHighBaseline entitySpikeAnomalyScore scopeSpikeAnomalyScore anomalyType anomalyScore anomalyExplainability anomalyState
1440 2022-04-30 05:00:00.0000000 5079 H4ck3r 9e8e151aced5a64938b93ee0c13fe940 prodEnvironment prodEnvironment H4ck3r 5079 2022-04-30 05:00:00.0000000 detectSet 2022-03-01 08:00:00.0000000 2022-04-30 05:00:00.0000000 60 1155 1363.22 267.51 0 0 13.84 185.46 0 1 628 0 0.9987 spike_accountName 0.9987 accountName prodEnvironment의 숫자 변수 countEvents 값은 5079이며 이 accountName의 경우 비정상적으로 높습니다. 지난 60일 간의 관찰에 따라 예상 기준 값은 628.0 미만입니다. {"avg": 1363.22,"stdev": 267.51,"percentile_0.25": 605,"percentile_0.9": 628}

함수를 실행하는 출력은 범위 또는 엔터티 수준에서 비정상적인 급증으로 태그가 지정된 검색 데이터 세트의 행입니다. 명확성을 위해 일부 다른 필드가 추가됩니다.

  • dataSet: 현재 데이터 세트(항상 detectSet)입니다.
  • firstSeenScope: 범위가 처음 표시되었을 때 타임스탬프입니다.
  • lastSeenScope: 범위를 마지막으로 본 타임스탬프입니다.
  • slicesInTrainingScope: 범위가 학습 데이터 세트에 있는 조각 수(예: 일)입니다.
  • countSlicesEntity: 엔터티가 범위에 존재하는 조각 수(예: 일)입니다.
  • avgNumEntity: 범위의 엔터티당 학습 집합에 있는 숫자 변수의 평균입니다.
  • sdNumEntity: 범위의 엔터티당 학습 집합에 있는 숫자 변수의 표준 편차입니다.
  • firstSeenEntity: 범위에서 엔터티가 처음 표시되었을 때의 타임스탬프입니다.
  • lastSeenEntity: 엔터티가 범위에서 마지막으로 표시된 타임스탬프입니다.
  • slicesInTrainingEntity: 엔터티가 학습 데이터 세트의 범위에 존재하는 조각 수(예: 일)입니다.
  • countSlicesScope: 범위가 존재하는 조각 수(예: 일)입니다.
  • avgNumScope: 범위당 학습 집합의 숫자 변수 평균입니다.
  • sdNumScope: 범위당 학습 집합에서 숫자 변수의 표준 편차입니다.
  • zScoreEntity: 엔터티 모델을 기반으로 하는 숫자 변수의 현재 값에 대한 Z 점수입니다.
  • qScoreEntity: 엔터티 모델을 기반으로 하는 숫자 변수의 현재 값에 대한 Q 점수입니다.
  • zScoreScope: 범위 모델을 기반으로 하는 숫자 변수의 현재 값에 대한 Z 점수입니다.
  • qScoreScope: 범위 모델을 기반으로 하는 숫자 변수의 현재 값에 대한 Q 점수입니다.
  • isSpikeOnEntity: 엔터티 모델을 기반으로 비정상적인 급증에 대한 이진 플래그입니다.
  • entityHighBaseline: 엔터티 모델을 기반으로 하는 숫자 변수 값에 대한 높은 기준선이 필요합니다.
  • isSpikeOnScope: 범위 모델을 기반으로 비정상적인 급증에 대한 이진 플래그입니다.
  • scopeHighBaseline: 범위 모델을 기반으로 하는 숫자 변수 값에 대한 높은 기준선이 필요합니다.
  • entitySpikeAnomalyScore: 엔터티 모델을 기반으로 하는 급증에 대한 변칙 점수입니다. 범위 [0,1]의 숫자, 더 높은 값은 더 많은 변칙을 의미합니다.
  • scopeSpikeAnomalyScore: 범위 모델을 기반으로 하는 급증에 대한 변칙 점수입니다. 범위 [0,1]의 숫자, 더 높은 값은 더 많은 변칙을 의미합니다.
  • anomalyType: 변칙 유형을 보여 줍니다(여러 변칙 검색 논리를 함께 실행할 때 유용함).
  • anomalyScore: 선택한 모델에 따라 급증에 대한 변칙 점수입니다.
  • anomalyExplainability: 생성된 변칙 및 해당 설명에 대한 텍스트 래퍼입니다.
  • anomalyState: 모델을 설명하는 선택한 모델(평균, 표준 편차 및 백분위수)의 메트릭 모음입니다.

위의 예제에서 사용자를 엔터티로 사용하고 기본 매개 변수가 있는 범위로 계정을 사용하여 countEvents 변수에서 이 함수를 실행하면 범위 수준에서 급증이 감지됩니다. 사용자 'H4ck3r'에는 학습 기간에 충분한 데이터가 없으므로 엔터티 수준에 대해 변칙이 계산되지 않으며 모든 관련 필드는 비어 있습니다. 범위 수준 변칙의 변칙 점수는 0.998입니다. 즉, 이 급증은 범위에 대해 비정상입니다.

최소 임계값을 충분히 높게 올리면 요구 사항이 너무 높기 때문에 변칙이 검색되지 않습니다.

출력은 표준화된 형식의 설명 필드와 함께 비정상적인 급증이 있는 행을 보여 줍니다. 이러한 필드는 변칙을 조사하고 여러 숫자 변수에서 비정상적인 스파이크 검색을 실행하거나 다른 알고리즘을 함께 실행하는 데 유용합니다.

사이버 보안 컨텍스트에서 제안된 사용법은 의미 있는 범위(예: 계정의 구독) 및 엔터티(예: 사용자 또는 디바이스)에 따라 의미 있는 숫자 변수(다운로드된 데이터 양, 업로드된 파일 수 또는 로그인 시도 실패)에 대한 함수를 실행합니다. 검색된 비정상적인 급증은 숫자 값이 해당 범위 또는 엔터티에서 예상한 것보다 높고 의심스럽다는 것을 의미합니다.