attack chain analysis - osok/hawkeye GitHub Wiki
The HawkEye Attack Chain Analysis system provides sophisticated multi-step attack scenario modeling, identifying how individual MCP tools can be combined to create complex attack chains. The system models attack path discovery, lateral movement, privilege escalation, and multi-objective attack sequences to provide comprehensive threat intelligence.
classDiagram
class AttackChainAnalyzer {
+capability_relationships: Dict
+attack_patterns: Dict
+chain_cache: Dict
+stats: Dict
+identify_attack_chains(tools, context) List[AttackChain]
+identify_advanced_attack_chains(tools, context) List[AttackChain]
+discover_attack_chains(tools, context) Dict[str, Any]
+find_lateral_movement_chains(tools, context) List[AttackChain]
+find_data_exfiltration_chains(tools, context) List[AttackChain]
+assess_chain_feasibility(chain, environment) ChainFeasibilityScore
}
class AttackVectorGenerator {
+attack_templates: Dict
+generate_attack_vectors(capabilities, context) List[Dict]
+_get_function_risk_indicators(name) List[str]
+_generate_vectors_for_indicator(func, indicator, context) List[Dict]
+_prioritize_attack_vectors(vectors, context) List[Dict]
}
class ScenarioBuilder {
+threat_actors: Dict[ThreatActorType, ThreatActorProfile]
+scenario_templates: Dict
+generate_scenarios(capabilities, context) List[Dict]
+build_detailed_scenarios(capabilities, actor_type) List[Dict]
+_generate_threat_actor_scenarios(profile, capabilities) List[Dict]
}
class ThreatModeler {
+threat_actor_templates: Dict
+attack_patterns: Dict
+generate_threat_model(servers, context) Dict[str, Any]
+analyze_stride_threats(capabilities) Dict[str, List]
+generate_attack_trees(threats) Dict[str, Any]
+model_threat_scenarios(actors, capabilities) List[Dict]
}
class LikelihoodAssessor {
+threat_actor_profiles: Dict
+_attack_patterns: Dict
+assess_attack_likelihood(scenario, context) float
+_identify_scenario_threat_actors(scenario) List[ThreatActorType]
+_generate_scenario_attack_vectors(scenario, capabilities) List[AttackVector]
}
AttackChainAnalyzer --> AttackVectorGenerator : uses
AttackChainAnalyzer --> ScenarioBuilder : uses
AttackChainAnalyzer --> ThreatModeler : uses
AttackChainAnalyzer --> LikelihoodAssessor : uses
classDiagram
class AttackChain {
+chain_id: str
+chain_name: str
+description: str
+threat_actor: ThreatActorType
+attack_objective: str
+chain_links: List[ChainLink]
+overall_difficulty: DifficultyLevel
+overall_impact: BusinessImpact
+total_time_estimate: int
+detection_difficulty: DifficultyLevel
+mitigation_strategies: List[str]
+prerequisites: List[str]
+success_probability: float
}
class ChainLink {
+step_number: int
+tool_name: str
+capability_used: str
+action_description: str
+input_requirements: List[str]
+output_produced: List[str]
+risk_level: ThreatLevel
+difficulty: DifficultyLevel
+time_estimate: int
}
class ChainFeasibilityScore {
+technical_feasibility: float
+resource_requirements: float
+detection_evasion: float
+environmental_suitability: float
+overall_score: float
+confidence_level: float
}
class AttackVector {
+vector_id: str
+name: str
+description: str
+required_capabilities: List[CapabilityCategory]
+complexity: AttackComplexity
+required_access: AccessLevel
+detection_difficulty: DetectionDifficulty
+time_to_execute: float
+success_probability: float
+stealth_level: float
+prerequisites: List[str]
+indicators: List[str]
}
class ThreatActorProfile {
+name: str
+type: ThreatActorType
+skill_level: str
+resources: str
+motivation: str
+typical_objectives: List[str]
+common_techniques: List[str]
+time_horizon: str
+stealth_requirement: str
}
AttackChain --> ChainLink : contains
AttackChain --> ChainFeasibilityScore : assessed_by
AttackChain --> AttackVector : utilizes
AttackChain --> ThreatActorProfile : executed_by
flowchart TD
A[Detected MCP Tools] --> B[Extract Tool Capabilities]
B --> C[Build Capability Graph]
C --> D[Identify Pivot Points]
D --> E[Generate Chain Candidates]
E --> F[Calculate Feasibility Scores]
F --> G{Score >= Threshold}
G -->|Yes| H[Generate Attack Narrative]
G -->|No| I[Discard Chain]
H --> J[Assess Overall Risk]
J --> K[Rank Attack Chains]
K --> L[Final Chain Results]
subgraph "Path Discovery"
C
D
E
end
subgraph "Feasibility Assessment"
F
G
end
subgraph "Chain Construction"
H
J
K
end
def identify_attack_chains(self,
detected_tools: List[MCPServerInfo],
environment_context: EnvironmentContext,
max_chain_length: int = 5,
min_feasibility_score: float = 0.3) -> List[AttackChain]:
"""
Identify potential attack chains through multi-tool combinations.
Steps:
1. Extract tool capabilities from detected servers
2. Build capability dependency graph
3. Find attack paths using depth-first search
4. Convert paths to structured attack chains
5. Assess feasibility and filter viable chains
6. Rank chains by risk and complexity
"""def identify_advanced_attack_chains(self,
detected_tools: List[MCPServerInfo],
environment_context: EnvironmentContext,
threat_actor_types: List[ThreatActorType] = None,
include_time_based_chains: bool = True,
include_conditional_chains: bool = True) -> List[AttackChain]:
"""
Advanced attack chain analysis including:
- Multi-objective attack paths
- Time-based attack sequences
- Conditional attack paths
- Threat actor-specific chains
"""The system builds a directed graph representing how tool capabilities can be chained together:
# Capability relationships example
capability_relationships = {
"file_read": ["data_analysis", "credential_extraction", "configuration_discovery"],
"network_access": ["service_enumeration", "lateral_movement", "data_exfiltration"],
"code_execution": ["privilege_escalation", "persistence", "defense_evasion"],
"authentication": ["credential_theft", "session_hijacking", "impersonation"]
}def _dfs_find_paths(self,
graph: Dict[str, List[str]],
current: str,
path: List[str],
all_paths: List[List[str]],
max_length: int) -> None:
"""
Depth-first search to find all possible attack paths.
- Explores all possible tool combinations
- Avoids cycles to prevent infinite loops
- Respects maximum chain length constraints
- Identifies multi-tool attack sequences
"""def find_lateral_movement_chains(self,
detected_tools: List[MCPServerInfo],
environment_context: EnvironmentContext) -> List[AttackChain]:
"""
Specialized analysis for lateral movement scenarios.
Focus Areas:
- Network access capabilities
- System information gathering
- File system access
- Code execution opportunities
"""lateral_movement_categories = [
CapabilityCategory.NETWORK_ACCESS, # Network discovery and access
CapabilityCategory.SYSTEM_INFORMATION, # Host enumeration
CapabilityCategory.FILE_SYSTEM, # Credential hunting
CapabilityCategory.CODE_EXECUTION # Remote execution
]privilege_escalation_vector = AttackVector(
vector_id="priv_escalation",
name="Privilege Escalation",
description="Gaining higher-level system permissions",
required_capabilities=[CapabilityCategory.CODE_EXECUTION],
complexity=AttackComplexity.HIGH,
required_access=AccessLevel.USER,
detection_difficulty=DetectionDifficulty.HARD,
time_to_execute=8.0,
success_probability=0.6,
stealth_level=0.7,
prerequisites=["initial_access", "vulnerability_research"],
indicators=["unusual_process_elevation", "permission_changes"]
)The system identifies tools that can be combined for privilege escalation:
- Initial Access: Tools providing basic system access
- Information Gathering: System enumeration capabilities
- Vulnerability Exploitation: Code execution or configuration abuse
- Privilege Validation: Verification of elevated access
threat_actor_profiles = {
ThreatActorType.INSIDER_THREAT: ThreatActorProfile(
name="Malicious Insider",
skill_level="intermediate",
resources="moderate",
motivation="Financial gain, revenge, espionage",
typical_objectives=["Data theft", "Sabotage", "Fraud"],
common_techniques=["Privilege abuse", "Data exfiltration"],
time_horizon="days",
stealth_requirement="high"
),
ThreatActorType.NATION_STATE: ThreatActorProfile(
name="Advanced Persistent Threat",
skill_level="expert",
resources="unlimited",
motivation="Espionage, disruption, intelligence gathering",
typical_objectives=["Long-term access", "Data collection"],
common_techniques=["Advanced techniques", "Zero-day exploits"],
time_horizon="months",
stealth_requirement="critical"
)
}def generate_scenarios(self,
tool_capabilities: ToolCapabilities,
environment_context: EnvironmentContext,
threat_actor_type: ThreatActorType = None,
max_scenarios: int = 5) -> List[Dict[str, Any]]:
"""
Generate realistic attack scenarios based on:
- Available tool capabilities
- Environment characteristics
- Threat actor motivations and skills
- Business context and assets
"""Pattern: Information Gathering → Target Analysis → Attack Vector Selection
Example Chain:
- System Information Tool: Gather host details, running processes
- Network Access Tool: Discover network topology and services
- File System Tool: Search for configuration files and credentials
- Analysis Tool: Process gathered information for attack planning
Pattern: Initial Compromise → Persistence Establishment → Defense Evasion
Example Chain:
- Authentication Tool: Exploit weak credentials or authentication bypass
- Code Execution Tool: Execute initial payload or commands
- File System Tool: Install backdoors or persistence mechanisms
- System Configuration Tool: Modify system settings for stealth
Pattern: Privilege Escalation → Credential Harvesting → Lateral Movement
Example Chain:
- Code Execution Tool: Exploit local privilege escalation vulnerability
- System Information Tool: Enumerate elevated privileges and access
- File System Tool: Access credential stores and configuration files
- Network Access Tool: Use credentials for lateral movement
Pattern: Data Discovery → Collection → Exfiltration → Impact
Example Chain:
- File System Tool: Search for and identify sensitive data
- Data Processing Tool: Collect and package target data
- Network Access Tool: Establish exfiltration channel
- Encryption Tool: Encrypt or destroy data for impact
def _assess_technical_feasibility(self, chain: AttackChain,
environment: EnvironmentContext) -> float:
"""
Assess technical feasibility based on:
- Tool compatibility and data flow
- Required technical skills
- System dependencies
- Environmental constraints
"""Factors Considered:
- Tool Compatibility: Can tools actually chain together?
- Data Flow Validation: Does output of Tool A work as input for Tool B?
- Skill Requirements: Does threat actor have required capabilities?
- System Dependencies: Are required system components available?
def _assess_environmental_suitability(self, chain: AttackChain,
environment: EnvironmentContext) -> float:
"""
Assess how well the chain fits the target environment:
- Deployment type compatibility
- Security posture alignment
- Network exposure considerations
- Monitoring and detection capabilities
"""Assessment Criteria:
- Deployment Type: Cloud vs on-premise compatibility
- Security Posture: Alignment with security controls
- Network Exposure: Internet-facing vs internal systems
- Monitoring Coverage: Detection system capabilities
def _assess_resource_requirements(self, chain: AttackChain,
environment: EnvironmentContext) -> float:
"""
Evaluate resource requirements:
- Time investment needed
- Technical skill levels required
- Tool availability and setup
- Infrastructure requirements
"""def _find_time_based_attack_sequences(self,
capability_graph: Dict,
tool_capabilities: Dict,
environment_context: EnvironmentContext) -> List[Dict]:
"""
Identify attack sequences that exploit timing dependencies:
- Maintenance windows
- Backup schedules
- User activity patterns
- System update cycles
"""def _find_conditional_attack_paths(self,
capability_graph: Dict,
tool_capabilities: Dict,
environment_context: EnvironmentContext) -> List[Dict]:
"""
Find attack paths with conditional branching:
- Success/failure scenarios
- Alternative attack methods
- Adaptive attack strategies
- Fallback mechanisms
"""def _find_multi_objective_paths(self,
capability_graph: Dict,
tool_capabilities: Dict,
environment_context: EnvironmentContext) -> List[Dict]:
"""
Identify attack chains targeting multiple objectives:
- Data theft + system disruption
- Reconnaissance + persistence
- Privilege escalation + lateral movement
- Collection + exfiltration
"""def _generate_attack_progression_narrative(self,
chain: Dict,
environment_context: EnvironmentContext) -> str:
"""
Generate detailed narrative describing:
- Step-by-step attack progression
- Tool usage and outputs
- Decision points and alternatives
- Timeline and duration estimates
- Detection opportunities
"""The system generates business-focused narratives explaining:
- Initial Compromise: How attackers gain access
- Progression Timeline: Step-by-step attack evolution
- Business Consequences: Impact on operations and data
- Detection Points: Where attacks might be caught
- Mitigation Opportunities: How to break the attack chain
Small Environments (< 10 tools):
- Analysis time: 30-60 seconds
- Memory usage: 10-20MB
- Chains discovered: 5-15 viable chains
Medium Environments (10-50 tools):
- Analysis time: 2-5 minutes
- Memory usage: 50-100MB
- Chains discovered: 15-50 viable chains
Large Environments (> 50 tools):
- Analysis time: 5-15 minutes
- Memory usage: 100-500MB
- Chains discovered: 50-200 viable chains
Graph Pruning:
- Remove low-probability paths early
- Filter based on environmental constraints
- Limit chain depth for performance
Caching:
- Cache feasibility assessments
- Store capability relationship mappings
- Reuse threat actor profiles
Parallel Processing:
- Concurrent path exploration
- Parallel feasibility assessment
- Distributed chain generation
# Attack chain analysis in threat analysis pipeline
chain_analyzer = AttackChainAnalyzer()
attack_chains = chain_analyzer.discover_attack_chains(
detected_tools=mcp_servers,
environment_context=environment,
max_chain_depth=5,
min_chain_score=0.4
)
# Generate narratives for each chain
for chain in attack_chains:
narrative = chain_analyzer._generate_attack_progression_narrative(
chain, environment
)
chain['detailed_narrative'] = narrativeAttack chain analysis integrates with report generation to provide:
- Executive Summary: High-level chain risks and priorities
- Technical Details: Step-by-step attack progression
- Mitigation Guidance: Specific countermeasures for each chain
- Risk Assessment: Quantitative risk scoring for chains
The system evaluates attack chain complexity across multiple dimensions:
- Technical Complexity: Skill level required for execution
- Operational Security: Stealth and detection evasion requirements
- Resource Requirements: Time, tools, and infrastructure needed
- Success Probability: Likelihood of successful chain execution
For each identified attack chain, the system generates:
- Prevention Controls: Security measures to prevent chain initiation
- Detection Strategies: Monitoring approaches to identify chain progression
- Response Procedures: Incident response actions to break attack chains
- Recovery Plans: Business continuity measures post-compromise
Planned Capabilities:
- Pattern Recognition: ML-based attack pattern identification
- Probability Modeling: Statistical models for chain success rates
- Adaptive Analysis: Learning from historical attack data
- Behavioral Analysis: User and system behavior modeling
Advanced Features:
- Quantum-Resistant Analysis: Post-quantum cryptography considerations
- IoT Chain Analysis: Internet of Things device attack chains
- Cloud-Native Patterns: Container and serverless attack chains
- Supply Chain Integration: Third-party dependency attack chains
This comprehensive attack chain analysis system provides security teams with detailed understanding of how individual MCP tools can be combined into sophisticated multi-step attacks, enabling proactive defense strategy development and risk mitigation planning.