Security Standards - pascaldisse/open-sourcefy GitHub Wiki
Security Standards
Comprehensive security implementation guide for Open-Sourcefy's NSA-level security standards and practices.
Security Overview
Open-Sourcefy implements military-grade security standards throughout the entire Matrix pipeline, ensuring zero tolerance for vulnerabilities and maintaining the highest security posture.
Security Principles
- NSA-Level Standards: Military-grade security implementation
- Zero Trust Architecture: Verify every component and input
- Defense in Depth: Multiple layers of security controls
- Principle of Least Privilege: Minimal required permissions
- Secure by Design: Security built into every component
Security Compliance
- NIST Cybersecurity Framework: Complete implementation
- ISO 27001: Information security management
- SOC 2 Type II: Security, availability, and confidentiality
- Common Vulnerabilities and Exposures (CVE): Continuous monitoring
Input Validation and Sanitization
Binary File Validation
Comprehensive File Validation
class SecureBinaryValidator:
"""NSA-level binary file validation"""
def __init__(self):
self.max_file_size = 100 * 1024 * 1024 # 100MB limit
self.allowed_formats = ['PE', 'ELF', 'MACH-O']
self.signature_validator = DigitalSignatureValidator()
def validate_binary_file(self, file_path: str) -> ValidationResult:
"""Comprehensive binary validation with security checks"""
# File existence and permissions
if not self._validate_file_access(file_path):
raise SecurityError("Invalid file access or permissions")
# File size validation
if not self._validate_file_size(file_path):
raise SecurityError("File size exceeds security limits")
# Format validation
if not self._validate_file_format(file_path):
raise SecurityError("Unsupported or malicious file format")
# Malware scanning
if not self._scan_for_malware(file_path):
raise SecurityError("Potential malware detected")
# Digital signature validation
signature_result = self.signature_validator.validate(file_path)
return ValidationResult(
valid=True,
file_format=self._detect_format(file_path),
signature_status=signature_result,
security_level="HIGH"
)
def _validate_file_access(self, file_path: str) -> bool:
"""Validate file access and permissions"""
path = Path(file_path)
# Check if file exists and is readable
if not path.exists() or not path.is_file():
return False
# Check file permissions (should not be executable during analysis)
if path.stat().st_mode & 0o111: # Executable bits
self.logger.warning(f"Executable file detected: {file_path}")
# Validate path traversal attacks
if ".." in str(path) or not str(path.resolve()).startswith(str(Path.cwd())):
return False
return True
def _scan_for_malware(self, file_path: str) -> bool:
"""Basic malware detection using heuristics"""
with open(file_path, 'rb') as f:
content = f.read(1024 * 1024) # Read first 1MB
# Check for known malicious patterns
malicious_patterns = [
b'\x4d\x5a\x90\x00\x03\x00\x00\x00\x04\x00\x00\x00\xff\xff', # Suspicious PE header
b'This program cannot be run in DOS mode', # DOS stub
b'CreateProcessA', # Potentially dangerous API calls
b'VirtualAlloc',
b'WriteProcessMemory'
]
suspicious_count = sum(1 for pattern in malicious_patterns if pattern in content)
# Allow legitimate binaries but flag highly suspicious ones
return suspicious_count < 3
Path Sanitization
Secure Path Handling
class SecurePathManager:
"""Secure path validation and sanitization"""
def __init__(self, base_directory: str):
self.base_dir = Path(base_directory).resolve()
self.allowed_extensions = {'.exe', '.dll', '.so', '.dylib', '.bin'}
def sanitize_path(self, user_path: str) -> Path:
"""Sanitize and validate user-provided paths"""
# Remove dangerous characters
sanitized = re.sub(r'[<>:"|?*]', '', user_path)
# Resolve path and check it's within allowed directory
resolved_path = (self.base_dir / sanitized).resolve()
if not str(resolved_path).startswith(str(self.base_dir)):
raise SecurityError("Path traversal attack detected")
# Validate file extension
if resolved_path.suffix.lower() not in self.allowed_extensions:
raise SecurityError(f"Disallowed file extension: {resolved_path.suffix}")
return resolved_path
def create_secure_temp_directory(self) -> Path:
"""Create secure temporary directory with proper permissions"""
# Create with restricted permissions (owner only)
temp_dir = tempfile.mkdtemp(prefix="openSourcefy_", dir=self.base_dir)
temp_path = Path(temp_dir)
# Set secure permissions (700 - owner read/write/execute only)
temp_path.chmod(0o700)
return temp_path
Access Control and Authentication
Role-Based Access Control (RBAC)
User Role Management
class SecurityRole(Enum):
"""Security roles for access control"""
ADMIN = "admin"
ANALYST = "analyst"
VIEWER = "viewer"
SYSTEM = "system"
class AccessControlManager:
"""Role-based access control implementation"""
def __init__(self):
self.permissions = {
SecurityRole.ADMIN: {
'execute_pipeline', 'modify_config', 'view_logs',
'manage_users', 'access_sensitive_data'
},
SecurityRole.ANALYST: {
'execute_pipeline', 'view_logs', 'access_analysis_data'
},
SecurityRole.VIEWER: {
'view_logs', 'view_reports'
},
SecurityRole.SYSTEM: {
'execute_pipeline', 'access_system_resources'
}
}
def check_permission(self, user_role: SecurityRole, action: str) -> bool:
"""Check if user role has permission for action"""
return action in self.permissions.get(user_role, set())
def require_permission(self, user_role: SecurityRole, action: str) -> None:
"""Decorator/function to require specific permission"""
if not self.check_permission(user_role, action):
raise SecurityError(f"Insufficient permissions for action: {action}")
API Key Management
Secure API Key Handling
class SecureAPIKeyManager:
"""Secure management of API keys and secrets"""
def __init__(self):
self.encryption_key = self._get_or_create_master_key()
self.encrypted_storage = {}
def store_api_key(self, service: str, api_key: str) -> None:
"""Store API key with encryption"""
# Validate API key format
if not self._validate_api_key_format(service, api_key):
raise SecurityError("Invalid API key format")
# Encrypt API key
encrypted_key = self._encrypt_data(api_key.encode())
# Store encrypted key
self.encrypted_storage[service] = {
'encrypted_key': encrypted_key,
'created_at': datetime.utcnow().isoformat(),
'last_used': None
}
# Clear plaintext from memory
api_key = None
def get_api_key(self, service: str) -> str:
"""Retrieve and decrypt API key"""
if service not in self.encrypted_storage:
raise SecurityError(f"API key not found for service: {service}")
encrypted_data = self.encrypted_storage[service]
# Decrypt API key
decrypted_key = self._decrypt_data(encrypted_data['encrypted_key'])
# Update last used timestamp
encrypted_data['last_used'] = datetime.utcnow().isoformat()
return decrypted_key.decode()
def _validate_api_key_format(self, service: str, api_key: str) -> bool:
"""Validate API key format for specific service"""
validation_patterns = {
'anthropic': r'^sk-[a-zA-Z0-9]{40,}$',
'openai': r'^sk-[a-zA-Z0-9]{48}$',
'github': r'^ghp_[a-zA-Z0-9]{36}$'
}
pattern = validation_patterns.get(service.lower())
if pattern:
return bool(re.match(pattern, api_key))
# Generic validation for unknown services
return len(api_key) >= 20 and api_key.isalnum()
def _encrypt_data(self, data: bytes) -> bytes:
"""Encrypt data using AES-256-GCM"""
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
aesgcm = AESGCM(self.encryption_key)
nonce = os.urandom(12) # 96-bit nonce for GCM
ciphertext = aesgcm.encrypt(nonce, data, None)
return nonce + ciphertext
def _decrypt_data(self, encrypted_data: bytes) -> bytes:
"""Decrypt data using AES-256-GCM"""
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
nonce = encrypted_data[:12]
ciphertext = encrypted_data[12:]
aesgcm = AESGCM(self.encryption_key)
return aesgcm.decrypt(nonce, ciphertext, None)
Secure Communication
TLS/SSL Implementation
Secure HTTP Communications
import ssl
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
class SecureHTTPAdapter(HTTPAdapter):
"""Secure HTTP adapter with enhanced TLS configuration"""
def init_poolmanager(self, *args, **kwargs):
# Create secure SSL context
ctx = create_urllib3_context(ssl_version=ssl.PROTOCOL_TLS_CLIENT)
# Enhanced security settings
ctx.check_hostname = True
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.maximum_version = ssl.TLSVersion.TLSv1_3
# Disable weak ciphers
ctx.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
kwargs['ssl_context'] = ctx
return super().init_poolmanager(*args, **kwargs)
class SecureAPIClient:
"""Secure API client for external service communication"""
def __init__(self):
self.session = requests.Session()
self.session.mount('https://', SecureHTTPAdapter())
# Security headers
self.session.headers.update({
'User-Agent': 'Open-Sourcefy/2.0 Security-Scanner',
'Accept': 'application/json',
'Content-Type': 'application/json'
})
def make_secure_request(self, url: str, data: Dict[str, Any], api_key: str) -> Dict[str, Any]:
"""Make secure API request with full validation"""
# Validate URL
if not url.startswith('https://'):
raise SecurityError("Only HTTPS connections allowed")
# Add authentication header
headers = {
'Authorization': f'Bearer {api_key}',
'X-Request-ID': str(uuid.uuid4())
}
try:
response = self.session.post(
url,
json=data,
headers=headers,
timeout=30,
verify=True # Verify SSL certificates
)
# Validate response
response.raise_for_status()
# Check content type
if 'application/json' not in response.headers.get('content-type', ''):
raise SecurityError("Unexpected response content type")
return response.json()
except requests.exceptions.SSLError as e:
raise SecurityError(f"SSL verification failed: {e}")
except requests.exceptions.RequestException as e:
raise SecurityError(f"Request failed: {e}")
Data Protection
Encryption at Rest
Sensitive Data Encryption
class DataEncryption:
"""Encryption for sensitive data at rest"""
def __init__(self, master_key: bytes = None):
self.master_key = master_key or self._derive_master_key()
def encrypt_analysis_results(self, results: Dict[str, Any]) -> bytes:
"""Encrypt analysis results before storage"""
# Serialize data
json_data = json.dumps(results, separators=(',', ':')).encode()
# Compress before encryption
compressed_data = gzip.compress(json_data)
# Encrypt with AES-256-GCM
return self._encrypt_data(compressed_data)
def decrypt_analysis_results(self, encrypted_data: bytes) -> Dict[str, Any]:
"""Decrypt and deserialize analysis results"""
# Decrypt data
compressed_data = self._decrypt_data(encrypted_data)
# Decompress
json_data = gzip.decompress(compressed_data)
# Deserialize
return json.loads(json_data.decode())
def _derive_master_key(self) -> bytes:
"""Derive master key from system entropy"""
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
# Use system-specific information
salt = hashlib.sha256(
(platform.node() + platform.machine() + str(os.getpid())).encode()
).digest()
# Get password from environment or prompt
password = os.getenv('MATRIX_MASTER_PASSWORD', getpass.getpass("Master password: "))
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32, # 256-bit key
salt=salt,
iterations=100000, # OWASP recommended minimum
)
return kdf.derive(password.encode())
Secure Logging
Security-Aware Logging System
class SecureLogger:
"""Security-aware logging with sensitive data protection"""
def __init__(self, log_file: str):
self.log_file = log_file
self.sensitive_patterns = [
r'api[_-]?key["\']?\s*[:=]\s*["\']?([a-zA-Z0-9_-]+)',
r'password["\']?\s*[:=]\s*["\']?([^\s"\']+)',
r'token["\']?\s*[:=]\s*["\']?([a-zA-Z0-9_-]+)',
r'Bearer\s+([a-zA-Z0-9_-]+)',
r'sk-[a-zA-Z0-9]{40,}'
]
def log_secure(self, level: str, message: str, context: Dict[str, Any] = None) -> None:
"""Log message with sensitive data sanitization"""
# Sanitize message
sanitized_message = self._sanitize_sensitive_data(message)
# Sanitize context
sanitized_context = self._sanitize_context(context or {})
# Create log entry
log_entry = {
'timestamp': datetime.utcnow().isoformat() + 'Z',
'level': level,
'message': sanitized_message,
'context': sanitized_context,
'process_id': os.getpid(),
'thread_id': threading.get_ident()
}
# Write to log file with rotation
self._write_log_entry(log_entry)
def _sanitize_sensitive_data(self, text: str) -> str:
"""Remove or mask sensitive data from text"""
sanitized = text
for pattern in self.sensitive_patterns:
sanitized = re.sub(pattern, lambda m: m.group(0).replace(m.group(1), '*' * 8), sanitized, flags=re.IGNORECASE)
return sanitized
def _sanitize_context(self, context: Dict[str, Any]) -> Dict[str, Any]:
"""Sanitize context dictionary"""
sanitized = {}
for key, value in context.items():
if any(sensitive in key.lower() for sensitive in ['key', 'password', 'token', 'secret']):
sanitized[key] = '***REDACTED***'
elif isinstance(value, str):
sanitized[key] = self._sanitize_sensitive_data(value)
else:
sanitized[key] = value
return sanitized
Security Monitoring
Intrusion Detection
Security Event Monitoring
class SecurityMonitor:
"""Real-time security monitoring and alerting"""
def __init__(self):
self.threat_indicators = []
self.security_events = []
self.alert_threshold = 5 # Number of events before alert
def monitor_file_access(self, file_path: str, operation: str) -> None:
"""Monitor file access patterns for suspicious activity"""
event = {
'timestamp': datetime.utcnow(),
'event_type': 'file_access',
'file_path': file_path,
'operation': operation,
'process_id': os.getpid(),
'user_id': os.getuid() if hasattr(os, 'getuid') else 'unknown'
}
# Check for suspicious patterns
if self._is_suspicious_access(event):
self._record_security_event(event, severity='HIGH')
def monitor_network_activity(self, url: str, request_type: str) -> None:
"""Monitor network requests for suspicious activity"""
event = {
'timestamp': datetime.utcnow(),
'event_type': 'network_request',
'url': url,
'request_type': request_type
}
# Check for suspicious domains or patterns
if self._is_suspicious_network_activity(event):
self._record_security_event(event, severity='MEDIUM')
def _is_suspicious_access(self, event: Dict[str, Any]) -> bool:
"""Detect suspicious file access patterns"""
suspicious_patterns = [
'/etc/passwd', '/etc/shadow', # Unix password files
'C:\\Windows\\System32\\SAM', # Windows SAM database
'.ssh/', '/.aws/', '/.docker/' # Credential directories
]
file_path = event['file_path']
return any(pattern in file_path for pattern in suspicious_patterns)
def _is_suspicious_network_activity(self, event: Dict[str, Any]) -> bool:
"""Detect suspicious network activity"""
# Check against known malicious domains (simplified)
suspicious_domains = [
'malware.example.com',
'phishing.example.org',
'c2.badactor.net'
]
url = event['url']
return any(domain in url for domain in suspicious_domains)
def _record_security_event(self, event: Dict[str, Any], severity: str) -> None:
"""Record security event and trigger alerts if needed"""
security_event = {
**event,
'severity': severity,
'alert_id': str(uuid.uuid4())
}
self.security_events.append(security_event)
# Check if alert threshold reached
recent_events = [e for e in self.security_events
if (datetime.utcnow() - e['timestamp']).seconds < 300] # 5 minutes
if len(recent_events) >= self.alert_threshold:
self._trigger_security_alert(recent_events)
def _trigger_security_alert(self, events: List[Dict[str, Any]]) -> None:
"""Trigger security alert for suspicious activity"""
alert = {
'alert_id': str(uuid.uuid4()),
'timestamp': datetime.utcnow().isoformat(),
'severity': 'CRITICAL',
'message': f'Multiple security events detected: {len(events)} events',
'events': events
}
# Log security alert
self.logger.critical(f"SECURITY ALERT: {alert['message']}")
# Send alert to security team (implementation specific)
self._send_security_alert(alert)
Compliance and Auditing
Audit Trail
Comprehensive Activity Logging
class AuditLogger:
"""Comprehensive audit trail for compliance"""
def __init__(self, audit_file: str):
self.audit_file = audit_file
self.session_id = str(uuid.uuid4())
def log_pipeline_start(self, binary_path: str, user_id: str, config: Dict[str, Any]) -> None:
"""Log pipeline execution start"""
audit_entry = {
'event_id': str(uuid.uuid4()),
'session_id': self.session_id,
'timestamp': datetime.utcnow().isoformat() + 'Z',
'event_type': 'pipeline_start',
'user_id': user_id,
'binary_path': binary_path,
'configuration': self._sanitize_config(config),
'system_info': {
'hostname': platform.node(),
'platform': platform.platform(),
'python_version': platform.python_version()
}
}
self._write_audit_entry(audit_entry)
def log_agent_execution(self, agent_id: int, status: str, execution_time: float) -> None:
"""Log individual agent execution"""
audit_entry = {
'event_id': str(uuid.uuid4()),
'session_id': self.session_id,
'timestamp': datetime.utcnow().isoformat() + 'Z',
'event_type': 'agent_execution',
'agent_id': agent_id,
'status': status,
'execution_time': execution_time
}
self._write_audit_entry(audit_entry)
def log_file_access(self, file_path: str, operation: str, success: bool) -> None:
"""Log file access for audit trail"""
audit_entry = {
'event_id': str(uuid.uuid4()),
'session_id': self.session_id,
'timestamp': datetime.utcnow().isoformat() + 'Z',
'event_type': 'file_access',
'file_path': file_path,
'operation': operation,
'success': success,
'file_hash': self._calculate_file_hash(file_path) if success else None
}
self._write_audit_entry(audit_entry)
def _calculate_file_hash(self, file_path: str) -> str:
"""Calculate SHA-256 hash of file for integrity verification"""
try:
with open(file_path, 'rb') as f:
return hashlib.sha256(f.read()).hexdigest()
except Exception:
return None
Compliance Validation
Security Compliance Checker
class ComplianceValidator:
"""Validate security compliance across the pipeline"""
def __init__(self):
self.compliance_rules = self._load_compliance_rules()
def validate_nist_compliance(self) -> ComplianceReport:
"""Validate NIST Cybersecurity Framework compliance"""
checks = {
'PR.IP-1': self._check_baseline_configuration(),
'PR.IP-3': self._check_configuration_change_control(),
'PR.DS-1': self._check_data_at_rest_protection(),
'PR.DS-2': self._check_data_in_transit_protection(),
'PR.AC-1': self._check_access_control(),
'DE.CM-1': self._check_continuous_monitoring(),
'RS.RP-1': self._check_response_plan()
}
passed_checks = sum(1 for result in checks.values() if result)
compliance_score = passed_checks / len(checks)
return ComplianceReport(
framework='NIST',
score=compliance_score,
checks=checks,
compliant=(compliance_score >= 0.9)
)
def _check_baseline_configuration(self) -> bool:
"""Check if system has secure baseline configuration"""
# Verify secure configuration files exist
config_files = ['config.yaml', 'build_config.yaml', 'security_config.yaml']
return all(Path(f).exists() for f in config_files)
def _check_data_at_rest_protection(self) -> bool:
"""Check if data at rest is properly protected"""
# Verify encryption is enabled for sensitive data
return os.getenv('MATRIX_ENCRYPTION_ENABLED', 'false').lower() == 'true'
def _check_access_control(self) -> bool:
"""Check if proper access controls are in place"""
# Verify RBAC is implemented
return hasattr(self, 'access_control_manager')
Related: Configuration Guide - Security configuration options
Next: Troubleshooting - Security issue resolution