다음을 통해 공유


graph_path_discovery_fl()

적용 대상: ✅Microsoft FabricAzure Data ExplorerAzure MonitorMicrosoft Sentinel

그래프 데이터(에지 및 노드)를 통해 관련 엔드포인트(원본 및 대상) 간에 유효한 경로를 검색합니다.

graph_path_discovery_fl() 함수는 그래프 데이터를 통해 관련 엔드포인트 간에 유효한 경로를 검색할 수 있는 UDF(사용자 정의 함수). 그래프 데이터는 노드(예: 리소스, 애플리케이션 또는 사용자) 및 에지(예: 기존 액세스 권한)로 구성됩니다. 사이버 보안 컨텍스트에서 이러한 경로는 잠재적 공격자가 활용할 수 있는 가능한 횡적 이동 경로를 나타낼 수 있습니다. 일부 기준(예: 중요한 대상에 연결된 노출된 원본)에 의해 관련되는 것으로 정의된 엔드포인트를 연결하는 경로에 관심이 있습니다. 함수의 구성에 따라 다른 보안 시나리오에 적합한 다른 유형의 경로를 검색할 수 있습니다.

이 함수에 대한 입력으로 사용할 수 있는 데이터는 'SourceId, EdgeId, TargetId' 형식의 에지 테이블과 유효한 경로를 정의하는 데 사용할 수 있는 선택적 노드 속성이 있는 노드 목록입니다. 또는 다른 유형의 데이터에서 그래프 입력을 추출할 수 있습니다. 예를 들어 '사용자 A가 리소스 B에 로그인한 사용자'의 항목이 있는 트래픽 로그는 '(User A)-[로그인 대상]->(리소스 B)' 형식의 에지로 모델링할 수 있으며, 고유한 사용자 및 리소스 목록은 노드로 모델링할 수 있습니다.

다음과 같은 몇 가지 가정이 있습니다.

  • 모든 에지는 경로 검색에 유효합니다. 관련이 없는 에지는 경로 검색을 실행하기 전에 필터링해야 합니다.
  • 에지는 비중이 같고 독립적이며 무조건적입니다. 즉, 모든 에지가 동일한 확률을 가지며 B에서 C로 이동하는 것은 A에서 B로의 이전 이동에 종속되지 않습니다.
  • 검색하려는 경로는 A->B->C 형식의 주기가 없는 간단한 방향 경로입니다. 함수에서 그래프 일치 연산자의 내부 구문을 변경하여 더 복잡한 정의를 만들 수 있습니다.

이러한 가정은 함수의 내부 논리를 변경하여 필요에 따라 조정할 수 있습니다.

이 함수는 경로 길이 제한, 최대 출력 크기 등과 같은 선택적 제약 조건 하에서 유효한 원본 간에 유효한 대상에 대해 가능한 모든 경로를 검색합니다. 출력은 원본 및 대상 ID가 있는 검색된 경로 목록과 연결 에지 및 노드 목록입니다. 함수는 노드 ID 및 에지 ID와 같은 필수 필드만 사용합니다. 형식, 속성 목록, 보안 관련 점수 또는 외부 신호와 같은 다른 관련 필드를 입력 데이터에서 사용할 수 있는 경우 함수 정의를 변경하여 논리 및 출력에 추가할 수 있습니다.

통사론

graph_path_discovery_fl( edgesTableName, , nodesTableName, scopeColumnName, isValidPathStartColumnName, isValidPathEndColumnName, nodeIdColumnName, edgeIdColumnName, sourceIdColumnName, targetIdColumnName, [minPathLength], [maxPathLength], [resultCountLimit])

구문 규칙에 대해 자세히 알아봅니다.

매개 변수

이름 필수 묘사
edgesTableName string ✔️ 그래프의 가장자리를 포함하는 입력 테이블의 이름입니다.
nodesTableName string ✔️ 그래프의 노드를 포함하는 입력 테이블의 이름입니다.
scopeColumnName string ✔️ 각 범위에 대해 다른 변칙 모델을 빌드할 수 있도록 파티션 또는 범위(예: 구독 또는 계정)를 포함하는 노드 및 에지 테이블의 열 이름입니다.
isValidPathStartColumnName string ✔️ 노드에 대한 부울 플래그가 포함된 노드 테이블의 열 이름입니다. True 이는 노드가 경로에 대한 유효한 시작점이고 유효한 시작점이 아니라 False 의미합니다.
isValidPathEndColumnName string ✔️ 노드에 대한 부울 플래그가 포함된 노드 테이블의 열 이름입니다. True 이는 노드가 경로에 대한 유효한 끝점이고 유효한 엔드포인트가 아니라 false 의미합니다.
nodeIdColumnName string ✔️ 노드 ID를 포함하는 노드 테이블의 열 이름입니다.
edgeIdColumnName string ✔️ 에지 ID를 포함하는 에지 테이블의 열 이름입니다.
sourceIdColumnName string ✔️ 에지의 원본 노드 ID를 포함하는 에지 테이블의 열 이름입니다.
targetIdColumnName string ✔️ 에지의 대상 노드 ID를 포함하는 에지 테이블의 열 이름입니다.
minPathLength long 경로의 최소 단계 수(가장자리)입니다. 기본값은 1입니다.
maxPathLength long 경로의 최대 단계 수(가장자리)입니다. 기본값은 8입니다.
resultCountLimit long 출력에 대해 반환되는 최대 경로 수입니다. 기본값은 100000입니다.

함수 정의

다음과 같이 해당 코드를 쿼리 정의 함수로 포함하거나 데이터베이스에 저장된 함수로 만들어 함수를 정의할 수 있습니다.

  • 쿼리 정의
  • 저장된

다음 let 문사용하여 함수를 정의합니다. 권한이 필요하지 않습니다.

중요하다

let 문 자체적으로 실행할 수 없습니다. 테이블 형식 식 문 뒤에합니다. graph_path_discovery_fl()작업 예제를 실행하려면 예제참조하세요.

let graph_path_discovery_fl = (   edgesTableName:string, nodesTableName:string, scopeColumnName:string
								, isValidPathStartColumnName:string, isValidPathEndColumnName:string
								, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
								, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000) 
{
let edges = (
    table(edgesTableName)
    | extend sourceId           = column_ifexists(sourceIdColumnName, '')
    | extend targetId           = column_ifexists(targetIdColumnName, '')
    | extend edgeId             = column_ifexists(edgeIdColumnName, '')
    | extend scope              = column_ifexists(scopeColumnName, '')
    );
let nodes = (
    table(nodesTableName)
    | extend nodeId             = column_ifexists(nodeIdColumnName, '')
    | extend isValidPathStart   = column_ifexists(isValidPathStartColumnName, '')
    | extend isValidPathEnd     = column_ifexists(isValidPathEndColumnName, '')
    | extend scope              = column_ifexists(scopeColumnName, '')
);
let paths = (
    edges
    // Build graph object partitioned by scope, so that no connections are allowed between scopes.
    // In case no scopes are relevant, partitioning should be removed for better performance.
    | make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
    // Look for existing paths between source nodes and target nodes with less than predefined number of hops
    // Current configurations looks for directed paths without any cycles; this can be changed if needed
      graph-match cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
        // Filter only by paths with that connect valid endpoints
        where ((s.isValidPathStart) and (t.isValidPathEnd))
        project   sourceId                  = s.nodeId
                , isSourceValidPathStart    = s.isValidPathStart
                , targetId                  = t.nodeId
                , isTargetValidPathEnd      = t.isValidPathEnd
                , scope                     = s.scope
                , edgeIds                   = e.edgeId
                , edgeAllTargetIds          = e.targetId
    | limit resultCountLimit
    )
    | extend  pathLength                    = array_length(edgeIds)
            , pathId                        = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
            , pathAllNodeIds                = array_concat(pack_array(sourceId), edgeAllTargetIds)
    | project-away edgeAllTargetIds
    | mv-apply with_itemindex = SortIndex nodesInPath = pathAllNodeIds to typeof(string), edgesInPath = edgeIds to typeof(string) on (
        extend step = strcat(
              iff(isnotempty(nodesInPath), strcat('(', nodesInPath, ')'), '')
            , iff(isnotempty(edgesInPath), strcat('-[',  edgesInPath, ']->'), ''))
       | summarize fullPath = array_strcat(make_list(step), '')
    )
);
paths
};
// Write your query to use the function here.

본보기

다음 예제에서는 호출 연산자 사용하여 함수를 실행합니다.

  • 쿼리 정의
  • 저장된

쿼리 정의 함수를 사용하려면 포함된 함수 정의 후에 호출합니다.

쿼리 실행

let graph_path_discovery_fl = (   edgesTableName:string, nodesTableName:string, scopeColumnName:string
								, isValidPathStartColumnName:string, isValidPathEndColumnName:string
								, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
								, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000) 
{
let edges = (
    table(edgesTableName)
    | extend sourceId           = column_ifexists(sourceIdColumnName, '')
    | extend targetId           = column_ifexists(targetIdColumnName, '')
    | extend edgeId             = column_ifexists(edgeIdColumnName, '')
    | extend scope              = column_ifexists(scopeColumnName, '')
    );
let nodes = (
    table(nodesTableName)
    | extend nodeId             = column_ifexists(nodeIdColumnName, '')
    | extend isValidPathStart   = column_ifexists(isValidPathStartColumnName, '')
    | extend isValidPathEnd     = column_ifexists(isValidPathEndColumnName, '')
    | extend scope              = column_ifexists(scopeColumnName, '')
);
let paths = (
    edges
    // Build graph object partitioned by scope, so that no connections are allowed between scopes.
    // In case no scopes are relevant, partitioning should be removed for better performance.
    | make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
    // Look for existing paths between source nodes and target nodes with less than predefined number of hops
    // Current configurations looks for directed paths without any cycles; this can be changed if needed
      graph-match cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
        // Filter only by paths with that connect valid endpoints
        where ((s.isValidPathStart) and (t.isValidPathEnd))
        project   sourceId                  = s.nodeId
                , isSourceValidPathStart    = s.isValidPathStart
                , targetId                  = t.nodeId
                , isTargetValidPathEnd      = t.isValidPathEnd
                , scope                     = s.scope
                , edgeIds                   = e.edgeId
                , edgeAllTargetIds          = e.targetId
    | limit resultCountLimit
    )
    | extend  pathLength                    = array_length(edgeIds)
            , pathId                        = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
            , pathAllNodeIds                = array_concat(pack_array(sourceId), edgeAllTargetIds)
    | project-away edgeAllTargetIds
    | mv-apply with_itemindex = SortIndex nodesInPath = pathAllNodeIds to typeof(string), edgesInPath = edgeIds to typeof(string) on (
        extend step = strcat(
              iff(isnotempty(nodesInPath), strcat('(', nodesInPath, ')'), '')
            , iff(isnotempty(edgesInPath), strcat('-[',  edgesInPath, ']->'), ''))
       | summarize fullPath = array_strcat(make_list(step), '')
    )
);
paths
};
let edges = datatable (SourceNodeName:string, EdgeName:string, EdgeType:string, TargetNodeName:string, Region:string)[						
    'vm-work-1',            'e1',           'can use',	            'webapp-prd', 	          'US',
    'vm-custom',        	'e2',           'can use',	            'webapp-prd', 	          'US',
    'webapp-prd',           'e3',           'can access',	        'vm-custom', 	          'US',
    'webapp-prd',       	'e4',           'can access',	        'test-machine', 	      'US',
    'vm-custom',        	'e5',           'can access',	        'server-0126', 	          'US',
    'vm-custom',        	'e6',	        'can access',	        'hub_router', 	          'US',
    'webapp-prd',       	'e7',	        'can access',	        'hub_router', 	          'US',
    'test-machine',       	'e8',	        'can access',	        'vm-custom',              'US',
    'test-machine',        	'e9',	        'can access',	        'hub_router', 	          'US',
    'hub_router',           'e10',	        'routes traffic to',	'remote_DT', 	          'US',
    'vm-work-1',            'e11',	        'can access',	        'storage_main_backup', 	  'US',
    'hub_router',           'e12',	        'routes traffic to',	'vm-work-2', 	          'US',
    'vm-work-2',        	'e13',          'can access',	        'backup_prc', 	          'US',
    'remote_DT',            'e14',	        'can access',	        'backup_prc', 	          'US',
    'backup_prc',           'e15',	        'moves data to',        'storage_main_backup', 	  'US',
    'backup_prc',           'e16',	        'moves data to',        'storage_DevBox', 	      'US',
    'device_A1',            'e17',	        'is connected to',      'sevice_B2', 	          'EU',
    'sevice_B2',            'e18',	        'is connected to',      'device_A1', 	          'EU'
];
let nodes = datatable (NodeName:string, NodeType:string, NodeEnvironment:string, Region:string) [
        'vm-work-1',                'Virtual Machine',      'Production',       'US',
        'vm-custom',                'Virtual Machine',      'Production',       'US',
        'webapp-prd',               'Application',          'None',             'US',
        'test-machine',             'Virtual Machine',      'Test',             'US',
        'hub_router',               'Traffic Router',       'None',             'US',
        'vm-work-2',                'Virtual Machine',      'Production',       'US',
        'remote_DT',                'Virtual Machine',      'Production',       'US',
        'backup_prc',               'Service',              'Production',       'US',
        'server-0126',              'Server',               'Production',       'US',
        'storage_main_backup',      'Cloud Storage',        'Production',       'US',
        'storage_DevBox',           'Cloud Storage',        'Test',             'US',
        'device_A1',                'Device',               'Backend',          'EU',
        'device_B2',                'Device',               'Backend',          'EU'
];
let nodesEnriched = (
    nodes
    | extend IsValidStart = (NodeType == 'Virtual Machine'),             IsValidEnd = (NodeType == 'Cloud Storage')              // option 1
    //| extend IsValidStart = (NodeName in('vm-work-1', 'vm-work-2')),     IsValidEnd = (NodeName in('storage_main_backup'))       // option 2
    //| extend IsValidStart = (NodeEnvironment == 'Test'),                 IsValidEnd = (NodeEnvironment == 'Production')          // option 3
);
graph_path_discovery_fl(edgesTableName                = 'edges'
                , nodesTableName                = 'nodesEnriched'
                , scopeColumnName               = 'Region'
                , nodeIdColumnName              = 'NodeName'
                , edgeIdColumnName              = 'EdgeName'
                , sourceIdColumnName            = 'SourceNodeName'
                , targetIdColumnName            = 'TargetNodeName'
                , isValidPathStartColumnName    = 'IsValidStart'
                , isValidPathEndColumnName      = 'IsValidEnd'
)

출력

sourceId isSourceValidPathStart targetId isTargetValidPathEnd 범위 edgeIds pathLength pathId pathAllNodeIds fullPath
test-machine storage_DevBox 우리 ["e9","e10","e14","e16"] 4 00605d35b6e1d28024fd846f217b43ac ["test-machine","hub_router","remote_DT","backup_prc","storage_DevBox"] (test-machine)-[e9]->(hub_router)-[e10]->(remote_DT)-[e14]->(backup_prc)-[e16]->(storage_DevBox)

함수를 실행하면 유효한 시작점으로 플래그가 지정된 원본 노드(isSourceValidPathStart == True) 간에 유효한 엔드포인트로 플래그가 지정된 모든 대상(isTargetValidPathEnd == True) 간에 연결하는 입력 가장자리를 사용하여 모든 경로를 찾습니다. 출력은 각 행이 단일 경로(resultCountLimit 매개 변수에 의해 최대 행 수로 제한됨)를 설명하는 테이블입니다. 각 행에는 다음 필드가 포함됩니다.

  • sourceId: 원본의 nodeId - 경로의 첫 번째 노드입니다.
  • isSourceValidPathStart: 원본 노드가 유효한 경로 시작인 부울 플래그입니다. 는 True와 같아야 합니다.
  • targetId: 대상의 nodeId - 경로의 마지막 노드입니다.
  • isTargetValidPathEnd: 유효한 경로 끝인 대상 노드에 대한 부울 플래그입니다. 는 항상 True와 같아야 합니다.
  • scope: 경로를 포함하는 범위입니다.
  • edgeIds: 경로의 정렬된 가장자리 목록입니다.
  • pathLength: 경로의 가장자리(홉) 수입니다.
  • pathId: 경로의 엔드포인트 및 단계 해시를 경로의 고유 식별자로 사용할 수 있습니다.
  • pathAllNodeIds: 경로에 있는 노드의 순서가 지정된 목록입니다.
  • fullPath: 전체 경로를 형식(원본 노드)-[에지 1]->(node2)-.....->(대상 노드)로 나타내는 문자열입니다.

위의 예제에서는 노드 테이블을 전처리하고 가능한 엔드포인트 정의의 몇 가지 옵션을 추가합니다. 다른 옵션을 주석 처리/주석 처리 해제하면 다음과 같은 여러 시나리오를 검색할 수 있습니다.

  • 옵션 1: Virtual Machines에서 Cloud Storage 리소스로의 경로를 찾습니다. 노드 형식 간의 연결 패턴을 탐색하는 데 유용합니다.
  • 옵션 2: 특정 노드(vm-work-1, vm-work-2)에서 특정 노드(storage_main_backup) 사이의 경로를 찾습니다. 알려진 손상된 자산에서 알려진 중요한 자산으로의 경로와 같은 알려진 사례를 조사하는 데 유용합니다.
  • 옵션 3: 다른 환경의 노드와 같은 노드 그룹 간의 경로를 찾습니다. 테스트 환경과 프로덕션 환경 간의 경로와 같은 안전하지 않은 경로를 모니터링하는 데 유용합니다.

위의 예제에서는 첫 번째 옵션을 사용하여 저장된 데이터에 액세스하려는 잠재적 공격자가 사용할 수 있는 클라우드 스토리지 리소스에 대한 VM 간의 모든 경로를 찾습니다. 예를 들어 알려진 취약성이 있는 VM을 중요한 데이터가 포함된 스토리지 계정에 연결하는 등 유효한 엔드포인트에 더 많은 필터를 추가하여 이 시나리오를 강화할 수 있습니다.

이 함수 graph_path_discovery_fl() 사이버 보안 도메인에서 그래프로 모델링된 데이터에 대한 횡적 이동 경로와 같은 흥미로운 경로를 검색하는 데 사용할 수 있습니다.