class diagram - osok/hawkeye GitHub Wiki
The HawkEye AI threat analysis module implements a sophisticated architecture for AI-powered security threat analysis of MCP servers. The system uses multiple AI providers, advanced prompt engineering, capability analysis, and threat modeling to generate comprehensive security assessments. This document provides detailed class diagrams and architectural analysis of the AI threat analysis components.
The AI threat analysis module follows a layered architecture with clear separation between analysis orchestration, AI provider abstraction, data processing, and result generation:
- Orchestration Layer: Main analyzer and coordinator components
- AI Provider Layer: Abstract provider interface with multiple implementations
- Analysis Layer: Specialized analyzers for capabilities, threats, and attack chains
- Data Layer: Models, caches, and data structures
- Support Layer: Utilities, monitoring, and optimization components
classDiagram
class AIThreatAnalyzer {
-capability_analyzer: MCPCapabilityAnalyzer
-prompt_engine: ThreatAnalysisPrompts
-ai_provider: AIProvider
-fallback_provider: AIProvider
-cache: ThreatIntelligenceCache
-memory_optimizer: MemoryOptimizer
-response_monitor: ResponseTimeMonitor
-stats: Dict[str, Any]
+__init__(config)
+analyze_threats(server, context, type) ThreatAnalysis
+assess_capabilities(server) ToolCapabilities
+generate_threat_report(analysis) str
+get_statistics() Dict[str, Any]
+clear_cache() None
+shutdown() None
}
class AIProvider {
<<abstract>>
#config: Dict[str, Any]
#prompt_engine: ThreatAnalysisPrompts
#response_parser: ResponseParser
#retry_handler: AdvancedRetryHandler
#usage_stats: Dict[str, Any]
+__init__(config)
+generate_threat_analysis(request)* AnalysisResponse
+assess_risk_level(capabilities)* Tuple[ThreatLevel, float]
+estimate_cost(request)* float
+get_usage_stats() Dict[str, Any]
#_update_stats(success, cost, tokens) None
#_execute_with_retry(operation, provider, name) Any
}
class MCPCapabilityAnalyzer {
-risk_categories: Dict[str, List[str]]
-dependency_patterns: Dict[str, Pattern]
-security_analyzers: List[SecurityAnalyzer]
+__init__()
+analyze_capabilities(server) ToolCapabilities
+analyze_tool_functions(tools) List[ToolFunction]
+assess_risk_surface(capabilities) RiskSurface
+build_environment_context(system_info) EnvironmentContext
+categorize_capabilities(functions) Dict[CapabilityCategory, List[str]]
}
class ThreatModeler {
-stride_analyzer: STRIDEAnalyzer
-attack_tree_builder: AttackTreeBuilder
-threat_patterns: List[ThreatPattern]
+__init__()
+model_threats(capabilities, context) ThreatModelingResult
+build_attack_tree(threat) AttackTree
+analyze_stride_threats(capabilities) List[STRIDEThreat]
+generate_threat_scenarios(analysis) List[ThreatScenario]
}
class AttackChainAnalyzer {
-chain_patterns: List[ChainPattern]
-feasibility_calculator: FeasibilityCalculator
-impact_assessor: ImpactAssessor
+__init__()
+analyze_attack_chains(threats) List[AttackChain]
+calculate_chain_feasibility(chain) ChainFeasibilityScore
+assess_chain_impact(chain) ImpactScore
+generate_attack_paths(start, end) List[AttackPath]
}
AIThreatAnalyzer --> AIProvider : uses
AIThreatAnalyzer --> MCPCapabilityAnalyzer : uses
AIThreatAnalyzer --> ThreatModeler : uses
AIThreatAnalyzer --> AttackChainAnalyzer : uses
classDiagram
class AIProvider {
<<abstract>>
#config: Dict[str, Any]
#prompt_engine: ThreatAnalysisPrompts
#response_parser: ResponseParser
#retry_handler: AdvancedRetryHandler
#usage_stats: Dict[str, Any]
+__init__(config: Dict[str, Any])
+generate_threat_analysis(request: AnalysisRequest)* AnalysisResponse
+assess_risk_level(capabilities: List[str])* Tuple[ThreatLevel, float]
+estimate_cost(request: AnalysisRequest)* float
+get_usage_stats() Dict[str, Any]
+_update_stats(success: bool, cost: float, tokens: int) None
+_execute_with_retry(operation: Callable, provider: str, name: str) Any
}
class OpenAIProvider {
-client: OpenAI
-model: str
-temperature: float
-max_tokens: int
-cost_per_token: Dict[str, float]
+__init__(config: Dict[str, Any])
+generate_threat_analysis(request: AnalysisRequest) AnalysisResponse
+assess_risk_level(capabilities: List[str]) Tuple[ThreatLevel, float]
+estimate_cost(request: AnalysisRequest) float
+_make_api_call(messages: List[Dict]) Dict[str, Any]
+_calculate_cost(usage: Dict) float
+_handle_rate_limit(retry_after: int) None
}
class AnthropicProvider {
-client: Anthropic
-model: str
-max_tokens: int
-temperature: float
-cost_per_token: Dict[str, float]
+__init__(config: Dict[str, Any])
+generate_threat_analysis(request: AnalysisRequest) AnalysisResponse
+assess_risk_level(capabilities: List[str]) Tuple[ThreatLevel, float]
+estimate_cost(request: AnalysisRequest) float
+_make_api_call(messages: List[Dict]) Dict[str, Any]
+_calculate_cost(usage: Dict) float
+_handle_anthropic_errors(error: Exception) None
}
class LocalLLMProvider {
-model_path: str
-model: Any
-tokenizer: Any
-device: str
-generation_config: Dict[str, Any]
+__init__(config: Dict[str, Any])
+generate_threat_analysis(request: AnalysisRequest) AnalysisResponse
+assess_risk_level(capabilities: List[str]) Tuple[ThreatLevel, float]
+estimate_cost(request: AnalysisRequest) float
+_load_model() None
+_generate_local_response(prompt: str) str
+_optimize_for_hardware() None
}
AIProvider <|-- OpenAIProvider
AIProvider <|-- AnthropicProvider
AIProvider <|-- LocalLLMProvider
classDiagram
class AdvancedRetryHandler {
-retry_config: RetryConfig
-circuit_breaker_config: CircuitBreakerConfig
-error_classifier: ErrorClassifier
-provider_health: Dict[str, ProviderHealth]
-_lock: threading.Lock
+__init__(retry_config, circuit_breaker_config)
+execute_with_retry(operation, provider) Any
+is_circuit_breaker_open(provider) bool
+record_success(provider, response_time) None
+record_failure(provider, error) None
+get_health_statistics() Dict[str, Any]
+reset_circuit_breaker(provider) None
}
class ResponseParser {
-parsing_stats: Dict[str, int]
-confidence_threshold: float
-validation_rules: List[ValidationRule]
+__init__(confidence_threshold)
+parse_threat_analysis(response, analysis_type, capabilities, context) ThreatAnalysis
+validate_response_structure(data) bool
+extract_threat_components(data) Dict[str, Any]
+calculate_confidence_score(analysis) float
+get_parsing_statistics() Dict[str, Any]
}
class ErrorClassifier {
-error_patterns: Dict[ErrorType, List[str]]
+__init__()
+classify_error(error_message) ErrorType
+is_retryable_error(error_type) bool
+get_retry_delay(error_type, attempt) float
}
class CircuitBreaker {
-state: CircuitBreakerState
-failure_count: int
-success_count: int
-last_failure_time: datetime
-config: CircuitBreakerConfig
+__init__(config)
+call(operation) Any
+record_success() None
+record_failure() None
+is_open() bool
+is_half_open() bool
}
AIProvider --> AdvancedRetryHandler : uses
AIProvider --> ResponseParser : uses
AdvancedRetryHandler --> ErrorClassifier : uses
AdvancedRetryHandler --> CircuitBreaker : uses
classDiagram
class MCPCapabilityAnalyzer {
-risk_categories: Dict[str, List[str]]
-dependency_patterns: Dict[str, Pattern]
-security_analyzers: List[SecurityAnalyzer]
-system_profiler: SystemProfiler
+__init__()
+analyze_capabilities(server: MCPServerInfo) ToolCapabilities
+analyze_tool_functions(tools: List[MCPTool]) List[ToolFunction]
+assess_risk_surface(capabilities: ToolCapabilities) RiskSurface
+build_environment_context(system_info: SystemInfo) EnvironmentContext
+categorize_capabilities(functions: List[ToolFunction]) Dict[CapabilityCategory, List[str]]
+detect_external_dependencies(tools: List[MCPTool]) List[ExternalDependency]
+assess_access_requirements(tools: List[MCPTool]) AccessRequirements
+profile_security_posture() SecurityPosture
}
class ToolFunction {
+name: str
+description: str
+parameters: Dict[str, Any]
+return_type: str
+risk_score: float
+category: CapabilityCategory
+security_implications: List[str]
+required_permissions: List[str]
+external_dependencies: List[str]
+__init__(name, description, parameters)
+calculate_risk_score() float
+get_security_implications() List[str]
+requires_elevated_privileges() bool
}
class RiskSurface {
+total_functions: int
+high_risk_functions: int
+external_network_access: bool
+file_system_access: bool
+system_command_execution: bool
+database_access: bool
+api_integrations: List[str]
+privilege_requirements: List[AccessLevel]
+data_sensitivity_levels: List[DataSensitivity]
+calculate_overall_risk_score() float
+get_critical_attack_vectors() List[str]
+assess_containment_feasibility() float
}
class SystemProfiler {
+gather_system_info() SystemInfo
+detect_containerization() bool
+assess_network_exposure() NetworkExposure
+determine_user_privileges() UserPrivileges
+identify_security_controls() List[str]
+analyze_deployment_context() DeploymentType
}
MCPCapabilityAnalyzer --> ToolFunction : creates
MCPCapabilityAnalyzer --> RiskSurface : creates
MCPCapabilityAnalyzer --> SystemProfiler : uses
classDiagram
class ThreatModeler {
-stride_analyzer: STRIDEAnalyzer
-attack_tree_builder: AttackTreeBuilder
-threat_patterns: List[ThreatPattern]
-mitigation_database: MitigationDatabase
+__init__()
+model_threats(capabilities: ToolCapabilities, context: EnvironmentContext) ThreatModelingResult
+build_attack_tree(threat: ThreatScenario) AttackTree
+analyze_stride_threats(capabilities: ToolCapabilities) List[STRIDEThreat]
+generate_threat_scenarios(analysis: ThreatAnalysis) List[ThreatScenario]
+map_mitigations(threats: List[ThreatScenario]) List[MitigationStrategy]
+assess_threat_likelihood(scenario: ThreatScenario, context: EnvironmentContext) float
}
class STRIDEAnalyzer {
+analyze_spoofing_threats(capabilities) List[ThreatScenario]
+analyze_tampering_threats(capabilities) List[ThreatScenario]
+analyze_repudiation_threats(capabilities) List[ThreatScenario]
+analyze_information_disclosure_threats(capabilities) List[ThreatScenario]
+analyze_denial_of_service_threats(capabilities) List[ThreatScenario]
+analyze_elevation_of_privilege_threats(capabilities) List[ThreatScenario]
}
class AttackTreeBuilder {
+build_tree(root_threat: ThreatScenario) AttackTree
+add_attack_vector(tree: AttackTree, vector: AttackVector) None
+calculate_attack_probability(tree: AttackTree) float
+identify_critical_paths(tree: AttackTree) List[AttackPath]
+optimize_tree_structure(tree: AttackTree) AttackTree
}
class AttackTree {
+root_node: AttackTreeNode
+nodes: List[AttackTreeNode]
+edges: List[AttackTreeEdge]
+metadata: Dict[str, Any]
+add_node(node: AttackTreeNode) None
+add_edge(source: str, target: str, relationship: str) None
+calculate_overall_probability() float
+get_attack_paths() List[AttackPath]
+visualize() str
}
ThreatModeler --> STRIDEAnalyzer : uses
ThreatModeler --> AttackTreeBuilder : uses
AttackTreeBuilder --> AttackTree : creates
classDiagram
class AttackChainAnalyzer {
-chain_patterns: List[ChainPattern]
-feasibility_calculator: FeasibilityCalculator
-impact_assessor: ImpactAssessor
-chain_optimizer: ChainOptimizer
+__init__()
+analyze_attack_chains(threats: List[ThreatScenario]) List[AttackChain]
+calculate_chain_feasibility(chain: AttackChain) ChainFeasibilityScore
+assess_chain_impact(chain: AttackChain) ImpactScore
+generate_attack_paths(start: str, end: str) List[AttackPath]
+optimize_attack_chains(chains: List[AttackChain]) List[AttackChain]
+validate_chain_logic(chain: AttackChain) bool
}
class AttackChain {
+chain_id: str
+name: str
+description: str
+links: List[ChainLink]
+overall_feasibility: float
+impact_score: float
+estimated_time: str
+required_skills: List[str]
+detection_difficulty: DifficultyLevel
+add_link(link: ChainLink) None
+calculate_feasibility() float
+get_critical_dependencies() List[str]
+estimate_execution_time() str
}
class ChainLink {
+link_id: str
+attack_vector: AttackVector
+prerequisites: List[str]
+outcomes: List[str]
+success_probability: float
+detection_probability: float
+mitigation_effectiveness: float
+validate_prerequisites(context: Dict) bool
+calculate_success_rate(environment: EnvironmentContext) float
+get_detection_indicators() List[DetectionIndicator]
}
class FeasibilityCalculator {
+calculate_technical_feasibility(chain: AttackChain) float
+calculate_resource_requirements(chain: AttackChain) Dict[str, Any]
+assess_skill_requirements(chain: AttackChain) List[str]
+estimate_execution_complexity(chain: AttackChain) ComplexityLevel
}
AttackChainAnalyzer --> AttackChain : creates
AttackChain --> ChainLink : contains
AttackChainAnalyzer --> FeasibilityCalculator : uses
classDiagram
class ThreatAnalysis {
+server_id: str
+analysis_timestamp: datetime
+tool_capabilities: ToolCapabilities
+environment_context: EnvironmentContext
+threat_level: ThreatLevel
+attack_vectors: List[AttackVector]
+abuse_scenarios: List[AbuseScenario]
+mitigation_strategies: List[MitigationStrategy]
+detection_indicators: List[DetectionIndicator]
+compliance_impact: ComplianceImpact
+business_impact: BusinessImpact
+confidence_score: float
+analysis_metadata: AnalysisMetadata
+risk_score: float
+severity_level: SeverityLevel
+from_dict(data: Dict) ThreatAnalysis
+to_dict() Dict[str, Any]
+calculate_overall_risk() float
+get_critical_findings() List[str]
+validate_analysis() bool
}
class AttackVector {
+vector_id: str
+name: str
+severity: SeverityLevel
+description: str
+attack_steps: List[AttackStep]
+example_code: str
+prerequisites: List[str]
+impact: str
+likelihood: float
+mitigations: List[str]
+cvss_score: float
+cwe_mapping: List[str]
+calculate_risk_score() float
+get_attack_complexity() ComplexityLevel
+estimate_exploitation_time() str
}
class AbuseScenario {
+scenario_id: str
+scenario_name: str
+threat_actor: ThreatActorType
+motivation: str
+attack_flow: List[AttackStep]
+required_access: AccessLevel
+detection_difficulty: DifficultyLevel
+business_impact: BusinessImpact
+timeline: str
+success_indicators: List[str]
+validate_scenario() bool
+calculate_likelihood() float
+get_impact_assessment() Dict[str, Any]
}
class MitigationStrategy {
+strategy_id: str
+name: str
+description: str
+implementation_steps: List[str]
+effectiveness_score: float
+cost_estimate: str
+implementation_time: str
+requires_configuration: bool
+affects_functionality: bool
+compliance_alignment: List[ComplianceFramework]
+assess_implementation_complexity() ComplexityLevel
+calculate_cost_benefit_ratio() float
+validate_strategy() bool
}
ThreatAnalysis --> AttackVector : contains
ThreatAnalysis --> AbuseScenario : contains
ThreatAnalysis --> MitigationStrategy : contains
classDiagram
class EnvironmentContext {
+deployment_type: DeploymentType
+security_posture: SecurityPosture
+network_exposure: NetworkExposure
+data_sensitivity: DataSensitivity
+user_privileges: UserPrivileges
+compliance_requirements: List[ComplianceFramework]
+system_hardening: Dict[str, bool]
+monitoring_capabilities: List[str]
+incident_response_maturity: int
+assess_overall_security_posture() float
+get_risk_modifiers() Dict[str, float]
+validate_context() bool
}
class ToolCapabilities {
+total_functions: int
+function_categories: Dict[CapabilityCategory, int]
+high_risk_functions: List[ToolFunction]
+external_dependencies: List[ExternalDependency]
+access_requirements: AccessRequirements
+data_access_patterns: List[str]
+network_access_patterns: List[str]
+privilege_escalation_potential: float
+calculate_capability_score() float
+get_attack_surface() Dict[str, Any]
+assess_containment_difficulty() float
}
class SystemInfo {
+platform: str
+python_version: str
+memory_total: int
+cpu_count: int
+disk_usage: Dict[str, Any]
+network_interfaces: List[Dict[str, Any]]
+running_processes: List[str]
+environment_variables: Dict[str, str]
+installed_packages: List[str]
+user_groups: List[str]
+system_uptime: float
+security_features: Dict[str, bool]
+gather_system_info() SystemInfo
+assess_security_baseline() Dict[str, Any]
+detect_anomalies() List[str]
}
EnvironmentContext --> SystemInfo : uses
ThreatAnalysis --> EnvironmentContext : uses
ThreatAnalysis --> ToolCapabilities : uses
classDiagram
class ThreatIntelligenceCache {
-cache_data: Dict[str, CacheEntry]
-ttl: int
-max_size: int
-hit_count: int
-miss_count: int
-_lock: threading.Lock
+__init__(ttl: int, max_size: int)
+get(key: str) Optional[Any]
+set(key: str, value: Any, ttl: Optional[int]) None
+invalidate(key: str) None
+clear() None
+get_statistics() Dict[str, Any]
+cleanup_expired() None
}
class MemoryOptimizer {
-optimization_level: MemoryOptimizationLevel
-config: MemoryConfig
-cleanup_threshold: float
-monitoring_enabled: bool
+__init__(level: MemoryOptimizationLevel, config: MemoryConfig)
+start_optimization() None
+stop_optimization() None
+force_cleanup() None
+get_memory_usage() Dict[str, Any]
+optimize_data_structures() None
+monitor_memory_usage() None
}
class ResponseTimeMonitor {
-response_times: deque
-window_size: int
-warning_threshold: float
-alert_threshold: float
-statistics: Dict[str, float]
+__init__(window_size: int)
+record_response_time(duration: float) None
+get_average_response_time() float
+get_percentile(percentile: float) float
+is_performance_degraded() bool
+get_statistics() Dict[str, Any]
}
AIThreatAnalyzer --> ThreatIntelligenceCache : uses
AIThreatAnalyzer --> MemoryOptimizer : uses
AIThreatAnalyzer --> ResponseTimeMonitor : uses
classDiagram
class EnhancedProviderSelector {
-selection_criteria: SelectionCriteria
-provider_metrics: Dict[str, ProviderMetrics]
-load_balancer: ProviderLoadBalancer
+__init__(criteria: SelectionCriteria)
+select_provider(request: AnalysisRequest) ProviderSelection
+update_provider_metrics(provider: str, metrics: Dict) None
+get_optimal_provider_mix() List[str]
+assess_provider_health() Dict[str, bool]
}
class RiskPrioritizationAlgorithm {
-technical_weight: float
-business_weight: float
-likelihood_weight: float
-environmental_modifiers: EnvironmentalModifiers
+__init__(weights: Dict[str, float])
+prioritize_threats(threats: List[ThreatAnalysis]) List[PrioritizedThreat]
+calculate_priority_score(threat: ThreatAnalysis) float
+apply_environmental_modifiers(score: float, context: EnvironmentContext) float
+generate_priority_matrix() PriorityMatrix
}
class ThreatIntelligenceDB {
-threat_patterns: List[ThreatPattern]
-similarity_engine: SimilarityEngine
-learning_metrics: LearningMetrics
-pattern_cache: Dict[str, Any]
+__init__()
+find_similar_threats(analysis: ThreatAnalysis) List[SimilarityMatch]
+update_threat_patterns(new_pattern: ThreatPattern) None
+learn_from_analysis(analysis: ThreatAnalysis) None
+get_threat_statistics() Dict[str, Any]
}
class DynamicExampleGenerator {
-example_templates: Dict[ExampleType, List[str]]
-code_generators: Dict[ProgrammingLanguage, CodeGenerator]
-context_analyzer: ContextAnalyzer
+__init__()
+generate_examples(threat: AttackVector, context: ExampleGenerationContext) List[GeneratedExample]
+create_code_snippet(vector: AttackVector, language: ProgrammingLanguage) CodeSnippet
+adapt_example_to_context(example: GeneratedExample, context: Dict) GeneratedExample
}
AIThreatAnalyzer --> EnhancedProviderSelector : uses
AIThreatAnalyzer --> RiskPrioritizationAlgorithm : uses
AIThreatAnalyzer --> ThreatIntelligenceDB : uses
AIThreatAnalyzer --> DynamicExampleGenerator : uses
sequenceDiagram
participant CLI as CLI Command
participant Analyzer as AIThreatAnalyzer
participant CapAnalyzer as MCPCapabilityAnalyzer
participant Provider as AIProvider
participant Parser as ResponseParser
participant Modeler as ThreatModeler
participant ChainAnalyzer as AttackChainAnalyzer
CLI->>Analyzer: analyze_threats(server, context)
Analyzer->>CapAnalyzer: analyze_capabilities(server)
CapAnalyzer->>Analyzer: ToolCapabilities
Analyzer->>Analyzer: build_analysis_request(capabilities, context)
Analyzer->>Provider: generate_threat_analysis(request)
Provider->>Provider: _execute_with_retry(api_call)
Provider->>Parser: parse_threat_analysis(response)
Parser->>Provider: ThreatAnalysis
Provider->>Analyzer: AnalysisResponse
Analyzer->>Modeler: model_threats(capabilities, context)
Modeler->>Analyzer: ThreatModelingResult
Analyzer->>ChainAnalyzer: analyze_attack_chains(threats)
ChainAnalyzer->>Analyzer: List[AttackChain]
Analyzer->>Analyzer: aggregate_analysis_results()
Analyzer->>CLI: ThreatAnalysis
stateDiagram-v2
[*] --> ProviderSelection
ProviderSelection --> PrimaryProvider : Provider Available
ProviderSelection --> FallbackProvider : Primary Unavailable
PrimaryProvider --> ExecuteRequest : Circuit Closed
PrimaryProvider --> FallbackProvider : Circuit Open
ExecuteRequest --> Success : Request Successful
ExecuteRequest --> RetryLogic : Request Failed
RetryLogic --> ExecuteRequest : Retry Available
RetryLogic --> FallbackProvider : Max Retries Reached
RetryLogic --> Failed : All Providers Failed
FallbackProvider --> ExecuteRequest : Switch to Fallback
Success --> [*]
Failed --> [*]
flowchart TD
A[Memory Monitor Start] --> B[Monitor Memory Usage]
B --> C{Memory Above Threshold?}
C -->|No| B
C -->|Yes| D[Trigger Cleanup]
D --> E[Clear Cache Entries]
E --> F[Compress Data Structures]
F --> G[Release Unused Objects]
G --> H[Force Garbage Collection]
H --> I{Memory Still High?}
I -->|No| J[Log Success]
I -->|Yes| K[Escalate to Aggressive Cleanup]
K --> L[Clear All Caches]
L --> M[Restart Components]
M --> N[Log Warning]
J --> B
N --> B
graph TD
A[Main Analysis Thread] --> B[Provider Selection]
B --> C[Request Preparation]
C --> D[Parallel Processing]
D --> E[OpenAI Provider Thread]
D --> F[Anthropic Provider Thread]
D --> G[Local LLM Thread]
E --> H[Response Processing]
F --> H
G --> H
H --> I[Result Aggregation]
I --> J[Quality Assessment]
J --> K[Final Analysis Result]
subgraph "Memory Management"
L[Memory Monitor]
M[Cache Manager]
N[Garbage Collector]
end
subgraph "Error Handling"
O[Retry Logic]
P[Circuit Breaker]
Q[Fallback Strategy]
end
H --> L
H --> O
Performance Optimizations:
- Provider Load Balancing: Distribute requests across multiple AI providers
- Response Caching: Cache similar analysis requests to reduce API calls
- Memory Management: Proactive memory cleanup and optimization
- Circuit Breakers: Prevent cascading failures in provider chains
- Parallel Processing: Concurrent analysis of multiple components
Cost Optimizations:
- Request Batching: Combine related analysis requests
- Smart Provider Selection: Choose cost-effective providers based on request type
- Token Optimization: Minimize prompt length while maintaining quality
- Caching Strategy: Reduce redundant API calls through intelligent caching
classDiagram
class ThreatAnalysisConfig {
+ai_provider: str
+fallback_provider: str
+max_analysis_time: float
+confidence_threshold: float
+cache_enabled: bool
+cache_ttl: int
+memory_optimization_level: str
+cost_limit: float
+parallel_processing: bool
+max_workers: int
+retry_attempts: int
+circuit_breaker_threshold: int
+validate_config() bool
+get_provider_config(provider: str) Dict[str, Any]
+apply_optimization_settings() None
}
class ProviderPlugin {
<<interface>>
+provider_name: str
+supported_models: List[str]
+cost_per_token: Dict[str, float]
+initialize(config: Dict) None
+generate_analysis(request: AnalysisRequest) AnalysisResponse
+estimate_cost(request: AnalysisRequest) float
+health_check() bool
}
class AnalysisPlugin {
<<interface>>
+plugin_name: str
+supported_analysis_types: List[str]
+analyze(data: Any, context: Dict) Any
+validate_input(data: Any) bool
+get_dependencies() List[str]
}
ThreatAnalysisConfig --> ProviderPlugin : configures
ThreatAnalysisConfig --> AnalysisPlugin : configures
classDiagram
class AIThreatAnalyzerTest {
<<test>>
+test_analyze_threats_success()
+test_analyze_threats_with_fallback()
+test_capability_analysis_integration()
+test_caching_behavior()
+test_memory_optimization()
+test_error_handling()
+test_concurrent_analysis()
}
class AIProviderTest {
<<test>>
+test_openai_provider()
+test_anthropic_provider()
+test_local_llm_provider()
+test_provider_failover()
+test_circuit_breaker()
+test_retry_logic()
+test_cost_estimation()
}
class ThreatModelingTest {
<<test>>
+test_stride_analysis()
+test_attack_tree_building()
+test_threat_prioritization()
+test_mitigation_mapping()
+test_scenario_validation()
}
class MockAIProvider {
<<mock>>
+mock_successful_response()
+mock_rate_limit_error()
+mock_network_error()
+mock_invalid_response()
+simulate_high_latency()
+inject_specific_errors()
}
AIThreatAnalyzerTest --> MockAIProvider : uses
AIProviderTest --> MockAIProvider : uses
class ThreatAnalysisIntegrationTest:
"""Integration tests for AI threat analysis system."""
def test_end_to_end_analysis(self):
"""Test complete threat analysis workflow."""
# Setup test MCP server
test_server = MCPServerInfo(
server_id="test-server",
tools=[
MCPTool(name="file_read", description="Read file contents"),
MCPTool(name="execute_command", description="Execute system command"),
MCPTool(name="network_request", description="Make HTTP requests")
],
capabilities=["file_operations", "system_commands", "network_access"]
)
# Initialize analyzer with test configuration
config = {
"ai_provider": "mock",
"enable_caching": True,
"memory_optimization_level": "standard"
}
analyzer = AIThreatAnalyzer(config)
# Execute analysis
result = analyzer.analyze_threats(
test_server,
environment_context=EnvironmentContext(
deployment_type=DeploymentType.CLOUD,
security_posture=SecurityPosture.BASIC
),
analysis_type="comprehensive"
)
# Validate results
assert isinstance(result, ThreatAnalysis)
assert result.threat_level in [ThreatLevel.LOW, ThreatLevel.MEDIUM, ThreatLevel.HIGH, ThreatLevel.CRITICAL]
assert len(result.attack_vectors) > 0
assert len(result.mitigation_strategies) > 0
assert result.confidence_score >= 0.0 and result.confidence_score <= 1.0
# Validate analysis components
assert result.tool_capabilities.total_functions == 3
assert CapabilityCategory.FILE_OPERATIONS in result.tool_capabilities.function_categories
assert CapabilityCategory.SYSTEM_COMMANDS in result.tool_capabilities.function_categories
def test_provider_failover_scenario(self):
"""Test AI provider failover behavior."""
# Configure primary provider to fail
config = {
"ai_provider": "failing_provider",
"fallback_provider": "mock_provider"
}
analyzer = AIThreatAnalyzer(config)
# Simulate provider failure
with patch.object(analyzer.ai_provider, 'generate_threat_analysis') as mock_primary:
mock_primary.side_effect = Exception("Provider unavailable")
# Analysis should succeed using fallback
result = analyzer.analyze_threats(test_server)
# Verify fallback was used
assert result is not None
assert analyzer.get_statistics()['fallback_usage_count'] > 0
def test_concurrent_analysis_performance(self):
"""Test concurrent analysis performance and correctness."""
servers = [create_test_server(f"server-{i}") for i in range(10)]
analyzer = AIThreatAnalyzer({"parallel_processing": True, "max_workers": 3})
start_time = time.time()
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [
executor.submit(analyzer.analyze_threats, server)
for server in servers
]
for future in concurrent.futures.as_completed(futures):
result = future.result()
results.append(result)
duration = time.time() - start_time
# Validate results
assert len(results) == 10
assert all(isinstance(r, ThreatAnalysis) for r in results)
# Check performance improvement from concurrency
assert duration < 30.0 # Should complete within reasonable time
# Validate resource usage
stats = analyzer.get_statistics()
assert stats['concurrent_analyses'] == 10
assert stats['memory_peak_usage'] < 500 * 1024 * 1024 # < 500MBThe HawkEye AI threat analysis architecture demonstrates a sophisticated, enterprise-grade approach to AI-powered security analysis with the following key strengths:
Multi-Provider AI Architecture:
- Abstract Provider Interface: Clean abstraction supporting OpenAI, Anthropic, and local LLM providers
- Intelligent Provider Selection: Automatic failover and load balancing across providers
- Circuit Breaker Pattern: Protection against cascading failures
- Advanced Retry Logic: Exponential backoff with jitter and error classification
Comprehensive Analysis Framework:
- Capability Analysis: Deep analysis of MCP tool capabilities and risk surfaces
- Threat Modeling: STRIDE-based threat analysis with attack tree generation
- Attack Chain Analysis: Multi-step attack scenario modeling with feasibility assessment
- Risk Prioritization: Business-context-aware threat prioritization
Enterprise-Grade Features:
- Memory Optimization: Proactive memory management with configurable optimization levels
- Performance Monitoring: Response time tracking and performance degradation detection
- Intelligent Caching: Multi-layer caching with TTL and intelligent invalidation
- Cost Management: API cost estimation and budget controls
Extensibility and Integration:
- Plugin Architecture: Support for custom AI providers and analysis components
- Configuration Management: Comprehensive configuration with validation
- Testing Framework: Comprehensive unit and integration testing support
- Monitoring and Observability: Detailed statistics and health monitoring
Key Architectural Patterns:
- Abstract Factory Pattern: AI provider creation and management
- Strategy Pattern: Pluggable analysis algorithms and provider selection
- Circuit Breaker Pattern: Fault tolerance and provider protection
- Observer Pattern: Performance monitoring and health tracking
- Template Method Pattern: Analysis workflow with customization points
The architecture provides a robust foundation for AI-powered threat analysis while maintaining scalability, reliability, and extensibility for future enhancements in security analysis capabilities.