graph_path_discovery_fl()
적용 대상: ✅Microsoft Fabric✅Azure Data Explorer✅Azure Monitor✅Microsoft Sentinel
그래프 데이터(에지 및 노드)를 통해 관련 엔드포인트(원본 및 대상) 간에 유효한 경로를 검색합니다.
graph_path_discovery_fl()
함수는 그래프 데이터를 통해 관련 엔드포인트 간에 유효한 경로를 검색할 수 있는 UDF(사용자 정의 함수). 그래프 데이터는 노드(예: 리소스, 애플리케이션 또는 사용자) 및 에지(예: 기존 액세스 권한)로 구성됩니다. 사이버 보안 컨텍스트에서 이러한 경로는 잠재적 공격자가 활용할 수 있는 가능한 횡적 이동 경로를 나타낼 수 있습니다. 일부 기준(예: 중요한 대상에 연결된 노출된 원본)에 의해 관련되는 것으로 정의된 엔드포인트를 연결하는 경로에 관심이 있습니다. 함수의 구성에 따라 다른 보안 시나리오에 적합한 다른 유형의 경로를 검색할 수 있습니다.
이 함수에 대한 입력으로 사용할 수 있는 데이터는 'SourceId, EdgeId, TargetId' 형식의 에지 테이블과 유효한 경로를 정의하는 데 사용할 수 있는 선택적 노드 속성이 있는 노드 목록입니다. 또는 다른 유형의 데이터에서 그래프 입력을 추출할 수 있습니다. 예를 들어 '사용자 A가 리소스 B에 로그인한 사용자'의 항목이 있는 트래픽 로그는 '(User A)-[로그인 대상]->(리소스 B)' 형식의 에지로 모델링할 수 있으며, 고유한 사용자 및 리소스 목록은 노드로 모델링할 수 있습니다.
다음과 같은 몇 가지 가정이 있습니다.
- 모든 에지는 경로 검색에 유효합니다. 관련이 없는 에지는 경로 검색을 실행하기 전에 필터링해야 합니다.
- 에지는 비중이 같고 독립적이며 무조건적입니다. 즉, 모든 에지가 동일한 확률을 가지며 B에서 C로 이동하는 것은 A에서 B로의 이전 이동에 종속되지 않습니다.
- 검색하려는 경로는 A->B->C 형식의 주기가 없는 간단한 방향 경로입니다. 함수에서 그래프 일치 연산자의 내부 구문을 변경하여 더 복잡한 정의를 만들 수 있습니다.
이러한 가정은 함수의 내부 논리를 변경하여 필요에 따라 조정할 수 있습니다.
이 함수는 경로 길이 제한, 최대 출력 크기 등과 같은 선택적 제약 조건 하에서 유효한 원본 간에 유효한 대상에 대해 가능한 모든 경로를 검색합니다. 출력은 원본 및 대상 ID가 있는 검색된 경로 목록과 연결 에지 및 노드 목록입니다. 함수는 노드 ID 및 에지 ID와 같은 필수 필드만 사용합니다. 형식, 속성 목록, 보안 관련 점수 또는 외부 신호와 같은 다른 관련 필드를 입력 데이터에서 사용할 수 있는 경우 함수 정의를 변경하여 논리 및 출력에 추가할 수 있습니다.
통사론
graph_path_discovery_fl(
edgesTableName, , nodesTableName, scopeColumnName, isValidPathStartColumnName, isValidPathEndColumnName, nodeIdColumnName, edgeIdColumnName, sourceIdColumnName, targetIdColumnName, [minPathLength], [maxPathLength], [resultCountLimit])
매개 변수
이름 | 형 | 필수 | 묘사 |
---|---|---|---|
edgesTableName | string |
✔️ | 그래프의 가장자리를 포함하는 입력 테이블의 이름입니다. |
nodesTableName | string |
✔️ | 그래프의 노드를 포함하는 입력 테이블의 이름입니다. |
scopeColumnName | string |
✔️ | 각 범위에 대해 다른 변칙 모델을 빌드할 수 있도록 파티션 또는 범위(예: 구독 또는 계정)를 포함하는 노드 및 에지 테이블의 열 이름입니다. |
isValidPathStartColumnName | string |
✔️ | 노드에 대한 부울 플래그가 포함된 노드 테이블의 열 이름입니다. |
isValidPathEndColumnName | string |
✔️ | 노드에 대한 부울 플래그가 포함된 노드 테이블의 열 이름입니다. |
nodeIdColumnName |
string |
✔️ | 노드 ID를 포함하는 노드 테이블의 열 이름입니다. |
edgeIdColumnName |
string |
✔️ | 에지 ID를 포함하는 에지 테이블의 열 이름입니다. |
sourceIdColumnName | string |
✔️ | 에지의 원본 노드 ID를 포함하는 에지 테이블의 열 이름입니다. |
targetIdColumnName |
string |
✔️ | 에지의 대상 노드 ID를 포함하는 에지 테이블의 열 이름입니다. |
minPathLength | long |
경로의 최소 단계 수(가장자리)입니다. 기본값은 1입니다. | |
maxPathLength |
long |
경로의 최대 단계 수(가장자리)입니다. 기본값은 8입니다. | |
resultCountLimit | long |
출력에 대해 반환되는 최대 경로 수입니다. 기본값은 100000입니다. |
함수 정의
다음과 같이 해당 코드를 쿼리 정의 함수로 포함하거나 데이터베이스에 저장된 함수로 만들어 함수를 정의할 수 있습니다.
-
쿼리 정의
-
저장된
다음 let 문사용하여 함수를 정의합니다. 권한이 필요하지 않습니다.
중요하다
let 문 자체적으로 실행할 수 없습니다.
테이블 형식 식 문 뒤에합니다.
graph_path_discovery_fl()
작업 예제를 실행하려면 예제참조하세요.
let graph_path_discovery_fl = ( edgesTableName:string, nodesTableName:string, scopeColumnName:string
, isValidPathStartColumnName:string, isValidPathEndColumnName:string
, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000)
{
let edges = (
table(edgesTableName)
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend edgeId = column_ifexists(edgeIdColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let nodes = (
table(nodesTableName)
| extend nodeId = column_ifexists(nodeIdColumnName, '')
| extend isValidPathStart = column_ifexists(isValidPathStartColumnName, '')
| extend isValidPathEnd = column_ifexists(isValidPathEndColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let paths = (
edges
// Build graph object partitioned by scope, so that no connections are allowed between scopes.
// In case no scopes are relevant, partitioning should be removed for better performance.
| make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
// Look for existing paths between source nodes and target nodes with less than predefined number of hops
// Current configurations looks for directed paths without any cycles; this can be changed if needed
graph-match cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
// Filter only by paths with that connect valid endpoints
where ((s.isValidPathStart) and (t.isValidPathEnd))
project sourceId = s.nodeId
, isSourceValidPathStart = s.isValidPathStart
, targetId = t.nodeId
, isTargetValidPathEnd = t.isValidPathEnd
, scope = s.scope
, edgeIds = e.edgeId
, edgeAllTargetIds = e.targetId
| limit resultCountLimit
)
| extend pathLength = array_length(edgeIds)
, pathId = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
, pathAllNodeIds = array_concat(pack_array(sourceId), edgeAllTargetIds)
| project-away edgeAllTargetIds
| mv-apply with_itemindex = SortIndex nodesInPath = pathAllNodeIds to typeof(string), edgesInPath = edgeIds to typeof(string) on (
extend step = strcat(
iff(isnotempty(nodesInPath), strcat('(', nodesInPath, ')'), '')
, iff(isnotempty(edgesInPath), strcat('-[', edgesInPath, ']->'), ''))
| summarize fullPath = array_strcat(make_list(step), '')
)
);
paths
};
// Write your query to use the function here.
본보기
다음 예제에서는 호출 연산자 사용하여 함수를 실행합니다.
-
쿼리 정의
-
저장된
쿼리 정의 함수를 사용하려면 포함된 함수 정의 후에 호출합니다.
쿼리 실행
let graph_path_discovery_fl = ( edgesTableName:string, nodesTableName:string, scopeColumnName:string
, isValidPathStartColumnName:string, isValidPathEndColumnName:string
, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000)
{
let edges = (
table(edgesTableName)
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend edgeId = column_ifexists(edgeIdColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let nodes = (
table(nodesTableName)
| extend nodeId = column_ifexists(nodeIdColumnName, '')
| extend isValidPathStart = column_ifexists(isValidPathStartColumnName, '')
| extend isValidPathEnd = column_ifexists(isValidPathEndColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let paths = (
edges
// Build graph object partitioned by scope, so that no connections are allowed between scopes.
// In case no scopes are relevant, partitioning should be removed for better performance.
| make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
// Look for existing paths between source nodes and target nodes with less than predefined number of hops
// Current configurations looks for directed paths without any cycles; this can be changed if needed
graph-match cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
// Filter only by paths with that connect valid endpoints
where ((s.isValidPathStart) and (t.isValidPathEnd))
project sourceId = s.nodeId
, isSourceValidPathStart = s.isValidPathStart
, targetId = t.nodeId
, isTargetValidPathEnd = t.isValidPathEnd
, scope = s.scope
, edgeIds = e.edgeId
, edgeAllTargetIds = e.targetId
| limit resultCountLimit
)
| extend pathLength = array_length(edgeIds)
, pathId = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
, pathAllNodeIds = array_concat(pack_array(sourceId), edgeAllTargetIds)
| project-away edgeAllTargetIds
| mv-apply with_itemindex = SortIndex nodesInPath = pathAllNodeIds to typeof(string), edgesInPath = edgeIds to typeof(string) on (
extend step = strcat(
iff(isnotempty(nodesInPath), strcat('(', nodesInPath, ')'), '')
, iff(isnotempty(edgesInPath), strcat('-[', edgesInPath, ']->'), ''))
| summarize fullPath = array_strcat(make_list(step), '')
)
);
paths
};
let edges = datatable (SourceNodeName:string, EdgeName:string, EdgeType:string, TargetNodeName:string, Region:string)[
'vm-work-1', 'e1', 'can use', 'webapp-prd', 'US',
'vm-custom', 'e2', 'can use', 'webapp-prd', 'US',
'webapp-prd', 'e3', 'can access', 'vm-custom', 'US',
'webapp-prd', 'e4', 'can access', 'test-machine', 'US',
'vm-custom', 'e5', 'can access', 'server-0126', 'US',
'vm-custom', 'e6', 'can access', 'hub_router', 'US',
'webapp-prd', 'e7', 'can access', 'hub_router', 'US',
'test-machine', 'e8', 'can access', 'vm-custom', 'US',
'test-machine', 'e9', 'can access', 'hub_router', 'US',
'hub_router', 'e10', 'routes traffic to', 'remote_DT', 'US',
'vm-work-1', 'e11', 'can access', 'storage_main_backup', 'US',
'hub_router', 'e12', 'routes traffic to', 'vm-work-2', 'US',
'vm-work-2', 'e13', 'can access', 'backup_prc', 'US',
'remote_DT', 'e14', 'can access', 'backup_prc', 'US',
'backup_prc', 'e15', 'moves data to', 'storage_main_backup', 'US',
'backup_prc', 'e16', 'moves data to', 'storage_DevBox', 'US',
'device_A1', 'e17', 'is connected to', 'sevice_B2', 'EU',
'sevice_B2', 'e18', 'is connected to', 'device_A1', 'EU'
];
let nodes = datatable (NodeName:string, NodeType:string, NodeEnvironment:string, Region:string) [
'vm-work-1', 'Virtual Machine', 'Production', 'US',
'vm-custom', 'Virtual Machine', 'Production', 'US',
'webapp-prd', 'Application', 'None', 'US',
'test-machine', 'Virtual Machine', 'Test', 'US',
'hub_router', 'Traffic Router', 'None', 'US',
'vm-work-2', 'Virtual Machine', 'Production', 'US',
'remote_DT', 'Virtual Machine', 'Production', 'US',
'backup_prc', 'Service', 'Production', 'US',
'server-0126', 'Server', 'Production', 'US',
'storage_main_backup', 'Cloud Storage', 'Production', 'US',
'storage_DevBox', 'Cloud Storage', 'Test', 'US',
'device_A1', 'Device', 'Backend', 'EU',
'device_B2', 'Device', 'Backend', 'EU'
];
let nodesEnriched = (
nodes
| extend IsValidStart = (NodeType == 'Virtual Machine'), IsValidEnd = (NodeType == 'Cloud Storage') // option 1
//| extend IsValidStart = (NodeName in('vm-work-1', 'vm-work-2')), IsValidEnd = (NodeName in('storage_main_backup')) // option 2
//| extend IsValidStart = (NodeEnvironment == 'Test'), IsValidEnd = (NodeEnvironment == 'Production') // option 3
);
graph_path_discovery_fl(edgesTableName = 'edges'
, nodesTableName = 'nodesEnriched'
, scopeColumnName = 'Region'
, nodeIdColumnName = 'NodeName'
, edgeIdColumnName = 'EdgeName'
, sourceIdColumnName = 'SourceNodeName'
, targetIdColumnName = 'TargetNodeName'
, isValidPathStartColumnName = 'IsValidStart'
, isValidPathEndColumnName = 'IsValidEnd'
)
출력
sourceId | isSourceValidPathStart | targetId | isTargetValidPathEnd | 범위 | edgeIds | pathLength | pathId | pathAllNodeIds | fullPath |
---|---|---|---|---|---|---|---|---|---|
test-machine | 참 | storage_DevBox | 참 | 우리 | ["e9","e10","e14","e16"] | 4 | 00605d35b6e1d28024fd846f217b43ac | ["test-machine","hub_router","remote_DT","backup_prc","storage_DevBox"] | (test-machine)-[e9]->(hub_router)-[e10]->(remote_DT)-[e14]->(backup_prc)-[e16]->(storage_DevBox) |
함수를 실행하면 유효한 시작점으로 플래그가 지정된 원본 노드(isSourceValidPathStart == True) 간에 유효한 엔드포인트로 플래그가 지정된 모든 대상(isTargetValidPathEnd == True) 간에 연결하는 입력 가장자리를 사용하여 모든 경로를 찾습니다. 출력은 각 행이 단일 경로(resultCountLimit 매개 변수에 의해 최대 행 수로 제한됨)를 설명하는 테이블입니다. 각 행에는 다음 필드가 포함됩니다.
-
sourceId
: 원본의 nodeId - 경로의 첫 번째 노드입니다. -
isSourceValidPathStart
: 원본 노드가 유효한 경로 시작인 부울 플래그입니다. 는 True와 같아야 합니다. -
targetId
: 대상의 nodeId - 경로의 마지막 노드입니다. -
isTargetValidPathEnd
: 유효한 경로 끝인 대상 노드에 대한 부울 플래그입니다. 는 항상 True와 같아야 합니다. -
scope
: 경로를 포함하는 범위입니다. -
edgeIds
: 경로의 정렬된 가장자리 목록입니다. -
pathLength
: 경로의 가장자리(홉) 수입니다. -
pathId
: 경로의 엔드포인트 및 단계 해시를 경로의 고유 식별자로 사용할 수 있습니다. -
pathAllNodeIds
: 경로에 있는 노드의 순서가 지정된 목록입니다. -
fullPath
: 전체 경로를 형식(원본 노드)-[에지 1]->(node2)-.....->(대상 노드)로 나타내는 문자열입니다.
위의 예제에서는 노드 테이블을 전처리하고 가능한 엔드포인트 정의의 몇 가지 옵션을 추가합니다. 다른 옵션을 주석 처리/주석 처리 해제하면 다음과 같은 여러 시나리오를 검색할 수 있습니다.
- 옵션 1: Virtual Machines에서 Cloud Storage 리소스로의 경로를 찾습니다. 노드 형식 간의 연결 패턴을 탐색하는 데 유용합니다.
- 옵션 2: 특정 노드(vm-work-1, vm-work-2)에서 특정 노드(storage_main_backup) 사이의 경로를 찾습니다. 알려진 손상된 자산에서 알려진 중요한 자산으로의 경로와 같은 알려진 사례를 조사하는 데 유용합니다.
- 옵션 3: 다른 환경의 노드와 같은 노드 그룹 간의 경로를 찾습니다. 테스트 환경과 프로덕션 환경 간의 경로와 같은 안전하지 않은 경로를 모니터링하는 데 유용합니다.
위의 예제에서는 첫 번째 옵션을 사용하여 저장된 데이터에 액세스하려는 잠재적 공격자가 사용할 수 있는 클라우드 스토리지 리소스에 대한 VM 간의 모든 경로를 찾습니다. 예를 들어 알려진 취약성이 있는 VM을 중요한 데이터가 포함된 스토리지 계정에 연결하는 등 유효한 엔드포인트에 더 많은 필터를 추가하여 이 시나리오를 강화할 수 있습니다.
이 함수 graph_path_discovery_fl()
사이버 보안 도메인에서 그래프로 모델링된 데이터에 대한 횡적 이동 경로와 같은 흥미로운 경로를 검색하는 데 사용할 수 있습니다.