11. Security Guide - utils-bot/utils-bot-plus GitHub Wiki
Comprehensive security implementation guide for UtilsBot+ covering permissions, best practices, and security features.
- Security Architecture
- Access Control
- Input Validation
- API Security
- Data Protection
- Deployment Security
- Monitoring & Auditing
- Security Best Practices
┌─────────────────────────────────────────────────────────────┐
│                      User Input Layer                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │   Discord   │  │  Slash Cmd  │  │    Rate Limiting    │  │
│  │ Validation  │  │ Validation  │  │     & Cooldowns     │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                      Permission Layer                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │  Developer  │  │  Whitelist  │  │     Guild/User      │  │
│  │   Checks    │  │   Checks    │  │     Permissions     │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                      Application Layer                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │    Input    │  │   Output    │  │        Error        │  │
│  │Sanitization │  │  Filtering  │  │      Handling       │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                         Data Layer                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
│  │  Database   │  │   API Key   │  │       Secrets       │  │
│  │ Protection  │  │ Management  │  │     Management      │  │
│  └─────────────┘  └─────────────┘  └─────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
- Defense in Depth: Multiple security layers
- Least Privilege: Minimal required permissions
- Input Validation: All user input sanitized
- Secure by Default: Safe configuration defaults
- Fail Securely: Graceful security failures
- Audit Everything: Complete action logging
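The "Fail Securely" principle can be illustrated with a small wrapper that turns any error inside a permission check into a denial. This is a minimal sketch, not UtilsBot+ code: `fail_closed` is a hypothetical helper name, and it assumes checks are coroutines that return a truthy value to allow access.

```python
import asyncio
import functools


def fail_closed(check):
    """Wrap an async permission check so that any exception denies access."""
    @functools.wraps(check)
    async def wrapper(*args, **kwargs):
        try:
            return bool(await check(*args, **kwargs))
        except Exception:
            # Assumption: a failing check (e.g. database outage) must never grant access
            return False
    return wrapper


@fail_closed
async def example_check(user_id: int) -> bool:
    # Hypothetical check that breaks for one specific input
    if user_id < 0:
        raise RuntimeError("lookup failed")
    return True


if __name__ == "__main__":
    print(asyncio.run(example_check(1)))
    print(asyncio.run(example_check(-1)))
```

Denying on error keeps an outage in a backing service from silently widening access.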
Developer (Bot Owner)
├── Full system access
├── Can execute any command
├── Can manage whitelist
├── Can reload/sync commands
└── Bypass all restrictions
Whitelisted User (Beta Access)
├── Access to all public commands
├── Subject to rate limits
├── Cannot access dev commands
└── Database tracked usage
Public User (Open Access)
├── Limited command access
├── Subject to rate limits
├── No dev commands
└── Basic functionality only
Blacklisted User
├── No command access
├── All interactions blocked
├── Logged for monitoring
└── Cannot use any features
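The four tiers can be read as a precedence order. The sketch below is one possible reading, assuming developers are resolved first (per "Bypass all restrictions") and the blacklist is checked before the whitelist. `AccessTier`, `resolve_tier`, and its parameters are hypothetical names used for illustration only.

```python
from enum import Enum


class AccessTier(Enum):
    DEVELOPER = "developer"
    WHITELISTED = "whitelisted"
    PUBLIC = "public"
    BLACKLISTED = "blacklisted"


def resolve_tier(user_id, dev_ids, whitelisted_ids, blacklisted_ids):
    """Map a user ID to an access tier (hypothetical helper)."""
    # Assumption: developers bypass every restriction, including the blacklist
    if user_id in dev_ids:
        return AccessTier.DEVELOPER
    # A blacklist entry overrides a whitelist entry
    if user_id in blacklisted_ids:
        return AccessTier.BLACKLISTED
    if user_id in whitelisted_ids:
        return AccessTier.WHITELISTED
    return AccessTier.PUBLIC
```

Resolving the tier in one place keeps the individual command checks from disagreeing about precedence.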
Developer Check Decorator:
import discord
from discord import app_commands

def dev_only():
    """Restrict a command to developers only."""
    async def predicate(interaction: discord.Interaction) -> bool:
        # settings.dev_ids holds the Discord user IDs of the bot developers
        user_id = interaction.user.id
        return user_id in settings.dev_ids
    return app_commands.check(predicate)

# Usage (inside a cog)
@dev_only()
@app_commands.command(name="eval", description="Execute Python code")
async def eval_command(self, interaction: discord.Interaction, code: str):
    # Developer-only functionality
    pass
Whitelist Check Decorator:
from sqlalchemy import select

def requires_whitelist():
    """Check that the user is whitelisted (only enforced in closed beta)."""
    async def predicate(interaction: discord.Interaction) -> bool:
        # Developers always have access
        if interaction.user.id in settings.dev_ids:
            return True
        # If not in closed beta, allow all users
        if not settings.closed_beta:
            return True
        # Check database whitelist. session.get() only looks up primary keys,
        # so query by the discord_id column instead.
        async with bot.db.async_session() as session:
            result = await session.execute(
                select(User).where(User.discord_id == str(interaction.user.id))
            )
            user = result.scalar_one_or_none()
        # Unknown users are denied; bool() avoids returning None
        return bool(user and user.is_whitelisted and not user.is_blacklisted)
    return app_commands.check(predicate)

# Usage
@requires_whitelist()
@app_commands.command(name="ask", description="Ask AI a question")
async def ask_command(self, interaction: discord.Interaction, question: str):
    # Whitelisted functionality
    pass
Blacklist Protection:
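async def check_blacklist(user_id: int) -> bool:
    """Return True if the user is blacklisted."""
    async with bot.db.async_session() as session:
        result = await session.execute(
            select(User).where(User.discord_id == str(user_id))
        )
        user = result.scalar_one_or_none()
    return bool(user and user.is_blacklisted)

# Global check in bot core. @bot.check only applies to prefix commands, so
# slash commands are gated by overriding CommandTree.interaction_check.
# Pass the class to the bot with tree_cls=BlacklistTree.
class BlacklistTree(app_commands.CommandTree):
    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        if await check_blacklist(interaction.user.id):
            await interaction.response.send_message(
                "You are blacklisted from using this bot.",
                ephemeral=True
            )
            return False
        return True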
Command-Level Rate Limiting:
import discord
from discord import app_commands

# Per-command rate limit (the default cooldown key is the user ID)
@app_commands.command()
@app_commands.checks.cooldown(3, 60.0)  # 3 uses per 60 seconds
async def ai_command(self, interaction: discord.Interaction):
    # Rate-limited command
    pass

# Default rate limit for general commands
@app_commands.command()
@app_commands.checks.cooldown(5, 60.0)  # Default: 5 uses per minute
async def general_command(self, interaction: discord.Interaction):
    pass
API-Specific Rate Limiting:
from datetime import date

async def check_api_rate_limit(user_id: str, api_name: str) -> bool:
    """Return True if the user is still under the daily limit for this API."""
    # API-specific daily limits; unknown APIs fall back to 10 calls per day
    daily_limits = {
        'gemini': 50,
        'screenshot': 20,
        'ip-api': 100
    }
    limit = daily_limits.get(api_name, 10)
    async with bot.db.async_session() as session:
        # Assumes (user_discord_id, api_name) is the composite primary key
        usage = await session.get(APIUsage, {
            'user_discord_id': user_id,
            'api_name': api_name
        })
        if not usage:
            return True  # First time usage
        # Reset the counter when a new day has started
        if usage.daily_reset_date < date.today():
            usage.daily_usage_count = 0
            usage.daily_reset_date = date.today()
        allowed = usage.daily_usage_count < limit
        await session.commit()  # Persist the reset, if any
        return allowed
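The daily-reset logic can be exercised without a database. The following is a minimal in-memory sketch: `DailyQuota` and `try_consume` are hypothetical names standing in for the `APIUsage` table, and the limits are copied from the dictionary above.

```python
from datetime import date

DAILY_LIMITS = {"gemini": 50, "screenshot": 20, "ip-api": 100}
DEFAULT_LIMIT = 10


class DailyQuota:
    """In-memory stand-in for per-user, per-API daily counters (hypothetical)."""

    def __init__(self):
        # (user_id, api_name) -> [usage_count, reset_date]
        self._usage = {}

    def try_consume(self, user_id, api_name, today=None):
        """Record one call and return True, or return False if over the limit."""
        today = today or date.today()
        entry = self._usage.setdefault((user_id, api_name), [0, today])
        if entry[1] < today:
            # A new day has started: reset the counter
            entry[0], entry[1] = 0, today
        if entry[0] >= DAILY_LIMITS.get(api_name, DEFAULT_LIMIT):
            return False
        entry[0] += 1
        return True
```

Passing `today` explicitly makes the reset behaviour easy to test; production code would omit it and rely on the current date.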
URL Validation:
import ipaddress
import socket
from urllib.parse import urlparse

def validate_url(url: str) -> tuple[bool, str]:
    """Validate a URL; returns (is_valid, url or error message)."""
    try:
        # Length limit first, so oversized input is never parsed or resolved
        if len(url) > 2000:
            return False, "URL too long (max 2000 characters)"
        # Basic format validation
        if not url.startswith(('http://', 'https://')):
            return False, "URL must start with http:// or https://"
        parsed = urlparse(url)
        # Check for valid domain
        if not parsed.netloc or not parsed.hostname:
            return False, "Invalid domain in URL"
        # Prevent local/private IPs
        if is_private_ip(parsed.hostname):
            return False, "Private IP addresses are not allowed"
        return True, url
    except Exception as e:
        return False, f"Invalid URL format: {e}"

def is_private_ip(hostname: str) -> bool:
    """Check whether the hostname resolves to a non-public address."""
    try:
        # getaddrinfo covers IPv4 and IPv6; note that it is a blocking call
        infos = socket.getaddrinfo(hostname, None)
    except (socket.gaierror, UnicodeError):
        # Fail securely: treat unresolvable hosts as disallowed
        return True
    for info in infos:
        ip_obj = ipaddress.ip_address(info[4][0])
        if (ip_obj.is_private or ip_obj.is_loopback
                or ip_obj.is_link_local or ip_obj.is_reserved):
            return True
    return False
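The private-address check rests on the standard library `ipaddress` module, which can be tried on IP literals without any DNS lookup. The sketch below is illustrative only: `is_public_address` is a hypothetical helper, and the exact set of address classes to reject is an assumption.

```python
import ipaddress


def is_public_address(ip_text: str) -> bool:
    """Return True only for IP literals outside the special-purpose ranges."""
    try:
        ip = ipaddress.ip_address(ip_text)
    except ValueError:
        # Not an IP literal at all: reject rather than guess
        return False
    return not (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_reserved
        or ip.is_multicast
        or ip.is_unspecified
    )


if __name__ == "__main__":
    for candidate in ("8.8.8.8", "10.0.0.1", "127.0.0.1", "169.254.169.254", "::1"):
        print(candidate, is_public_address(candidate))
```

Link-local addresses such as 169.254.169.254 are worth rejecting explicitly, since several cloud providers serve instance metadata there.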
Text Input Sanitization:
import html
import re

def sanitize_text_input(text: str, max_length: int = 2000) -> str:
    """Sanitize user text input."""
    # Remove potentially dangerous patterns before escaping; once '<' and '>'
    # are escaped, the tag pattern can no longer match
    text = re.sub(r'<[^>]*>', '', text)  # Remove HTML tags
    text = re.sub(r'javascript:', '', text, flags=re.IGNORECASE)  # Remove JS
    text = re.sub(r'data:', '', text, flags=re.IGNORECASE)  # Remove data URLs
    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    # HTML escape
    text = html.escape(text)
    # Length limit, leaving room for the ellipsis
    if len(text) > max_length:
        text = text[:max_length - 3] + "..."
    return text
TOTP Secret Validation:
import base64
import re

def validate_totp_secret(secret: str) -> tuple[bool, str]:
    """Validate TOTP secret format."""
    try:
        # Remove whitespace and padding, and convert to uppercase
        secret = re.sub(r'\s', '', secret.upper()).rstrip('=')
        # Check base32 alphabet
        if not re.match(r'^[A-Z2-7]+$', secret):
            return False, "Invalid base32 format"
        # Check length (typically 16 or 32 characters)
        if len(secret) < 16 or len(secret) > 64:
            return False, "Secret length must be between 16-64 characters"
        # b32decode requires input padded to a multiple of 8 characters
        base64.b32decode(secret + '=' * (-len(secret) % 8))
        return True, secret
    except Exception as e:
        return False, f"Invalid TOTP secret: {e}"
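For reference, a secret that passes this kind of validation can be produced with the standard library alone. This is a sketch under assumptions: `generate_totp_secret` and `decode_totp_secret` are hypothetical helpers, and 20 random bytes (160 bits) is used because that is the secret length RFC 4226 recommends.

```python
import base64
import secrets


def generate_totp_secret(num_bytes: int = 20) -> str:
    """Generate a random base32 secret without padding (hypothetical helper)."""
    raw = secrets.token_bytes(num_bytes)
    return base64.b32encode(raw).decode("ascii").rstrip("=")


def decode_totp_secret(secret: str) -> bytes:
    """Decode a base32 secret, restoring any padding that was stripped."""
    normalized = "".join(secret.split()).upper().rstrip("=")
    # base32 input must be padded to a multiple of 8 characters
    padding = "=" * (-len(normalized) % 8)
    return base64.b32decode(normalized + padding)
```

Stripping and restoring padding matters because authenticator apps commonly display secrets without the trailing `=` characters.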
Slash Command Parameter Validation:
import discord
from discord import app_commands
from typing import Literal

@app_commands.command()
@app_commands.describe(
    text="Text to encode/decode (max 2000 chars)",
    operation="Choose encode or decode"
)
async def base64_command(
    self,
    interaction: discord.Interaction,
    text: app_commands.Range[str, 1, 2000],  # Length validation
    operation: Literal["encode", "decode"]  # Choice validation
):
    # Parameter validation handled by Discord
    pass

# Custom transformers for complex validation
class URLTransformer(app_commands.Transformer):
    async def transform(self, interaction: discord.Interaction, value: str) -> str:
        is_valid, result = validate_url(value)
        if not is_valid:
            # TransformerError expects (value, opt_type, transformer), so raise
            # AppCommandError to carry the validation message instead
            raise app_commands.AppCommandError(result)
        return result

@app_commands.command()
async def screenshot_command(
    self,
    interaction: discord.Interaction,
    url: app_commands.Transform[str, URLTransformer]
):
    # URL is pre-validated
    pass
Environment-Based Configuration:
# Pydantic v1 API. In Pydantic v2, BaseSettings lives in the separate
# pydantic-settings package and @validator is replaced by @field_validator.
from pydantic import BaseSettings, validator
from typing import Optional

class Settings(BaseSettings):
    # Required API keys
    bot_token: str
    gemini_api_key: str
    # Optional API keys
    screenshot_api_key: Optional[str] = None
    rapidapi_key: Optional[str] = None
    sentry_dsn: Optional[str] = None

    @validator('bot_token')
    def validate_bot_token(cls, v):
        if not v or len(v) < 50:
            raise ValueError('Invalid Discord bot token')
        return v

    @validator('gemini_api_key')
    def validate_gemini_key(cls, v):
        if not v or not v.startswith('AI'):
            raise ValueError('Invalid Gemini API key format')
        return v

    class Config:
        env_file = '.env'
        case_sensitive = False
API Key Rotation Support:
from datetime import datetime
from typing import Optional

class APIKeyManager:
    def __init__(self):
        self.keys = {}               # service name -> current key
        self.rotation_schedule = {}  # service name -> datetime of next rotation

    async def get_api_key(self, service: str) -> Optional[str]:
        """Get current API key for service, rotating it first if it is due."""
        if service in self.rotation_schedule:
            if datetime.now() > self.rotation_schedule[service]:
                await self.rotate_key(service)
        return self.keys.get(service)

    async def rotate_key(self, service: str):
        """Rotate API key for service."""
        # Implementation depends on service
        pass
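Since `rotate_key` is left open, the scheduling side can be sketched on its own. The example below assumes a fixed maximum key age and an injectable clock; `RotatingKeyStore` and its methods are hypothetical names, not part of UtilsBot+.

```python
from datetime import datetime, timedelta, timezone


class RotatingKeyStore:
    """Tracks when each service key is due for rotation (hypothetical sketch)."""

    def __init__(self, max_age: timedelta):
        self.max_age = max_age
        self._keys = {}       # service -> current key
        self._rotate_at = {}  # service -> datetime when the key is due

    def set_key(self, service, key, now=None):
        """Store a fresh key and schedule its next rotation."""
        now = now or datetime.now(timezone.utc)
        self._keys[service] = key
        self._rotate_at[service] = now + self.max_age

    def is_due(self, service, now=None):
        now = now or datetime.now(timezone.utc)
        # Unknown services count as due, so callers must provision a key first
        return service not in self._rotate_at or now >= self._rotate_at[service]

    def get_key(self, service, now=None):
        if self.is_due(service, now):
            raise LookupError(f"key for {service!r} is missing or due for rotation")
        return self._keys[service]
```

Raising instead of returning a stale key is one way to keep the "fail securely" principle; a real deployment might rotate automatically at that point.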
Secure HTTP Client Configuration:
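import aiohttp
import ssl
from typing import Optional

class SecureHTTPClient:
    def __init__(self):
        # Create secure SSL context (these are already the defaults of
        # create_default_context; set explicitly for clarity)
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.check_hostname = True
        self.ssl_context.verify_mode = ssl.CERT_REQUIRED
        # Configure timeouts
        self.timeout = aiohttp.ClientTimeout(
            total=30,
            connect=10,
            sock_read=10
        )
        # Created lazily, because a ClientSession should be built inside a
        # running event loop
        self.session: Optional[aiohttp.ClientSession] = None

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Create the session with security settings on first use."""
        if self.session is None or self.session.closed:
            connector = aiohttp.TCPConnector(
                ssl=self.ssl_context,
                limit=100,
                limit_per_host=30
            )
            self.session = aiohttp.ClientSession(
                connector=connector,
                timeout=self.timeout,
                headers={
                    'User-Agent': 'UtilsBot+/2.0 (+https://github.com/ad1107/utils-bot-plus)'
                }
            )
        return self.session

    def get(self, url: str, **kwargs):
        """Secure GET request; use as `async with client.get(url) as response:`."""
        # Return the request context manager itself, so the response is still
        # open while the caller reads it
        return self._ensure_session().get(url, **kwargs)

    async def close(self):
        """Release the underlying connections."""
        if self.session is not None and not self.session.closed:
            await self.session.close()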
Request Sanitization:
import asyncio
from typing import Optional

async def safe_api_request(url: str, params: Optional[dict] = None) -> dict:
    """Make safe API request with validation."""
    # Validate URL
    is_valid, validated_url = validate_url(url)
    if not is_valid:
        raise ValueError(f"Invalid URL: {validated_url}")
    # Sanitize parameters
    if params:
        params = {k: sanitize_text_input(str(v)) for k, v in params.items()}
    try:
        async with secure_http_client.get(validated_url, params=params) as response:
            if response.status == 200:
                return await response.json()
            raise APIError(f"API returned status {response.status}")
    except APIError:
        # Re-raise as-is so the status error is not masked by the handler below
        raise
    except asyncio.TimeoutError:
        raise APIError("Request timed out")
    except Exception as e:
        logger.error(f"API request failed: {e}")
        raise APIError("External service unavailable")
Password and Secret Handling:
import secrets
import hashlib
from cryptography.fernet import Fernet

class SecretManager:
    def __init__(self, secret_key: str):
        # secret_key must be a Fernet key (32 random bytes, URL-safe
        # base64-encoded), such as the output of Fernet.generate_key()
        self.cipher = Fernet(secret_key.encode())

    def encrypt_secret(self, secret: str) -> str:
        """Encrypt sensitive data."""
        return self.cipher.encrypt(secret.encode()).decode()

    def decrypt_secret(self, encrypted: str) -> str:
        """Decrypt sensitive data."""
        return self.cipher.decrypt(encrypted.encode()).decode()

    @staticmethod
    def generate_secure_password(length: int = 16) -> str:
        """Generate cryptographically secure password."""
        alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*"
        return ''.join(secrets.choice(alphabet) for _ in range(length))

    @staticmethod
    def hash_data(data: str) -> str:
        """Hash data with a random salt; returns 'salt:hash' in hex."""
        salt = secrets.token_hex(16)
        hash_obj = hashlib.pbkdf2_hmac('sha256', data.encode(), salt.encode(), 100000)
        return f"{salt}:{hash_obj.hex()}"
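Because `hash_data` draws a fresh salt on every call, two hashes of the same input never match directly; verification has to re-derive the hash from the stored salt. The sketch below shows one way to do that with the same `salt:hash` format. `verify_data` is a hypothetical counterpart that does not appear in the original class.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 100_000


def hash_data(data: str) -> str:
    """Hash data with a random salt; returns 'salt:hash' in hex."""
    salt = secrets.token_hex(16)
    digest = hashlib.pbkdf2_hmac("sha256", data.encode(), salt.encode(), ITERATIONS)
    return f"{salt}:{digest.hex()}"


def verify_data(data: str, stored: str) -> bool:
    """Re-derive the hash with the stored salt and compare in constant time."""
    salt, _, expected_hex = stored.partition(":")
    digest = hashlib.pbkdf2_hmac("sha256", data.encode(), salt.encode(), ITERATIONS)
    return hmac.compare_digest(digest.hex(), expected_hex)
```

`hmac.compare_digest` is used instead of `==` so that the comparison time does not depend on where the strings first differ.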
Database Security:
import json
from sqlalchemy import Column, Integer, String, Text, event
from sqlalchemy.engine import Engine

# Encrypt sensitive columns (Base is the project's declarative base)
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    discord_id = Column(String(20), unique=True, nullable=False)
    username = Column(String(100))
    # Encrypted fields
    _encrypted_preferences = Column('preferences', Text)

    @property
    def preferences(self) -> dict:
        if self._encrypted_preferences:
            return json.loads(secret_manager.decrypt_secret(self._encrypted_preferences))
        return {}

    @preferences.setter
    def preferences(self, value: dict):
        self._encrypted_preferences = secret_manager.encrypt_secret(json.dumps(value))

# Database connection security
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Set secure SQLite pragmas."""
    if 'sqlite' in str(dbapi_connection):
        cursor = dbapi_connection.cursor()
        # Enable foreign key constraints
        cursor.execute("PRAGMA foreign_keys=ON")
        # Overwrite deleted content with zeros
        cursor.execute("PRAGMA secure_delete=ON")
        cursor.close()
Data Minimization:
class PrivacyManager:
    @staticmethod
    def anonymize_user_data(user_data: dict) -> dict:
        """Anonymize user data for logging."""
        sensitive_fields = ['discord_id', 'username', 'email']
        anonymized = user_data.copy()
        for field in sensitive_fields:
            if field in anonymized:
                # str() keeps this working for integer IDs; only the last
                # four characters are kept
                anonymized[field] = f"***{str(anonymized[field])[-4:]}"
        return anonymized

    @staticmethod
    def should_log_command(command_name: str) -> bool:
        """Determine if command should be logged."""
        # Don't log sensitive commands
        sensitive_commands = ['totp', 'password']
        return command_name not in sensitive_commands
GDPR Compliance:
async def handle_data_request(user_id: str, request_type: str):
    """Handle GDPR data requests."""
    async with bot.db.async_session() as session:
        if request_type == "export":
            # Export all user data
            user_data = await get_all_user_data(session, user_id)
            return format_data_export(user_data)
        elif request_type == "delete":
            # Delete all user data and persist the deletion
            await delete_user_data(session, user_id)
            await session.commit()
            return "Data deleted successfully"
        raise ValueError(f"Unsupported request type: {request_type}")
Secure Environment Configuration:
# .env file security
chmod 600 .env  # Restrict file permissions
# Environment variables validation
export BOT_TOKEN="your_secure_token_here"
# Generate a Fernet key; SecretManager passes SECRET_KEY to Fernet, which rejects plain hex strings
export SECRET_KEY="$(python -c 'from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())')"
export DATABASE_URL="postgresql://user:pass@localhost/db"
# Production environment
export DEBUG=false
export LOG_LEVEL=WARNING
export SENTRY_DSN="your_sentry_dsn"
Docker Security:
# Dockerfile security best practices
FROM python:3.11-slim
# Create non-root user
RUN useradd --create-home --shell /bin/bash botuser
# Set working directory
WORKDIR /app
# Copy requirements first (layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Remove unnecessary packages (needs root, so it must run before USER)
RUN apt-get autoremove -y && apt-get clean
# Copy application code
COPY . .
# Change ownership and switch to non-root user
RUN chown -R botuser:botuser /app
USER botuser
# Health check (standard library only; assumes the bot serves /health on port 8000)
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)"
CMD ["python", "main.py"]
Docker Compose Security:
version: '3.8'
services:
  bot:
    build: .
    restart: unless-stopped
    read_only: true  # Read-only filesystem
    tmpfs:
      - /tmp
    cap_drop:
      - ALL  # Drop all capabilities
    cap_add:
      - CHOWN  # Only required capabilities
    security_opt:
      - no-new-privileges:true
    environment:
      - BOT_TOKEN_FILE=/run/secrets/bot_token
    secrets:
      - bot_token
    networks:
      - bot-network
secrets:
  bot_token:
    external: true
networks:
  bot-network:
    driver: bridge
    # Not marked internal: an internal network has no outbound access,
    # and the bot must be able to reach the Discord API
System Security:
# Firewall configuration (allow SSH before enabling, to avoid a lockout)
ufw default deny incoming
ufw default allow outgoing
ufw allow ssh
ufw enable
# SSH hardening (matches each directive whether or not it is commented out)
sed -i 's/^#\?PermitRootLogin.*/PermitRootLogin no/' /etc/ssh/sshd_config
sed -i 's/^#\?PasswordAuthentication.*/PasswordAuthentication no/' /etc/ssh/sshd_config
systemctl restart ssh
# Install security updates
apt update && apt upgrade -y
apt install unattended-upgrades -y
# Install fail2ban
apt install fail2ban -y
systemctl enable fail2ban
Comprehensive Audit Trail:
import discord
import structlog
from datetime import datetime, timezone

class SecurityLogger:
    def __init__(self):
        self.logger = structlog.get_logger("security")

    async def log_command_execution(self, interaction: discord.Interaction, command: str):
        """Log command execution for audit."""
        # structlog's info()/warning() are synchronous, so they are not awaited
        self.logger.info(
            "command_executed",
            user_id=interaction.user.id,
            username=interaction.user.name,
            guild_id=interaction.guild_id,  # None outside a guild
            command=command,
            timestamp=datetime.now(timezone.utc).isoformat(),
            channel_id=interaction.channel_id
        )

    async def log_security_event(self, event_type: str, details: dict):
        """Log security-related events."""
        self.logger.warning(
            "security_event",
            event_type=event_type,
            details=details,
            timestamp=datetime.now(timezone.utc).isoformat()
        )

    async def log_failed_permission_check(self, user_id: int, command: str, reason: str):
        """Log failed permission checks."""
        self.logger.warning(
            "permission_denied",
            user_id=user_id,
            command=command,
            reason=reason,
            timestamp=datetime.now(timezone.utc).isoformat()
        )
Anomaly Detection:
from datetime import datetime, timedelta, timezone

class AnomalyDetector:
    def __init__(self):
        self.user_patterns = {}

    async def check_unusual_activity(self, user_id: str, command: str) -> bool:
        """Detect unusual user activity patterns."""
        now = datetime.now(timezone.utc)
        if user_id not in self.user_patterns:
            self.user_patterns[user_id] = {
                'commands': [],
                'last_activity': now
            }
        pattern = self.user_patterns[user_id]
        pattern['commands'].append((command, now))
        pattern['last_activity'] = now
        # Keep only commands from the last hour
        hour_ago = now - timedelta(hours=1)
        pattern['commands'] = [
            (cmd, time) for cmd, time in pattern['commands']
            if time > hour_ago
        ]
        # Check for suspicious patterns
        if len(pattern['commands']) > 100:  # Too many commands
            await security_logger.log_security_event(
                "suspicious_activity",
                {"user_id": user_id, "command_count": len(pattern['commands'])}
            )
            return True
        return False
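The one-hour window can also be kept with a `deque`, which drops expired entries from the front instead of rebuilding the list on every call. This is a minimal sketch with an explicit clock argument; `SlidingWindowCounter` and its parameters are hypothetical names.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


class SlidingWindowCounter:
    """Counts events per user inside a moving time window (hypothetical sketch)."""

    def __init__(self, window: timedelta, threshold: int):
        self.window = window
        self.threshold = threshold
        self._events = defaultdict(deque)  # user_id -> timestamps, oldest first

    def record(self, user_id: str, now: datetime) -> bool:
        """Record one event; return True if the user exceeds the threshold."""
        events = self._events[user_id]
        events.append(now)
        cutoff = now - self.window
        # Timestamps arrive in order, so expired ones are always at the front
        while events and events[0] <= cutoff:
            events.popleft()
        return len(events) > self.threshold
```

With `window=timedelta(hours=1)` and `threshold=100` this matches the limits used above; taking `now` as an argument keeps the class testable.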
Sentry Integration:
import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

def setup_error_monitoring():
    """Configure error monitoring."""
    if settings.sentry_dsn:
        sentry_sdk.init(
            dsn=settings.sentry_dsn,
            integrations=[
                AioHttpIntegration(),
                SqlalchemyIntegration(),
            ],
            traces_sample_rate=0.1,
            profiles_sample_rate=0.1,
            before_send=filter_sensitive_data
        )

def filter_sensitive_data(event, hint):
    """Filter sensitive data from error reports."""
    sensitive_vars = ['BOT_TOKEN', 'SECRET_KEY', 'API_KEY']
    # event['environment'] is only the environment name (a string), so scrub
    # the key/value sections that can carry environment variables instead
    sections = [event.get('extra'), (event.get('request') or {}).get('env')]
    for section in sections:
        if isinstance(section, dict):
            for var in sensitive_vars:
                if var in section:
                    section[var] = '[REDACTED]'
    return event
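Secrets can also sit in nested structures or under keys that only contain a sensitive word. A recursive redaction pass is one way to cover those cases. The sketch below is independent of Sentry: `redact` and `SENSITIVE_MARKERS` are hypothetical names, and the marker list is an assumption to adapt.

```python
SENSITIVE_MARKERS = ("TOKEN", "SECRET", "API_KEY", "PASSWORD")


def _is_sensitive(key) -> bool:
    """Return True if the key name contains any sensitive marker."""
    return any(marker in str(key).upper() for marker in SENSITIVE_MARKERS)


def redact(value):
    """Return a copy of value with sensitive dictionary entries replaced."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if _is_sensitive(key) else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    # Scalars are returned unchanged
    return value
```

Because `redact` returns a copy, the original payload stays intact for local debugging while only the scrubbed version leaves the process.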
Secure Coding Practices:
- Input Validation: Validate all user inputs
- Output Encoding: Escape output to prevent injection
- Error Handling: Don't expose internal details
- Secrets Management: Never hardcode secrets
- Dependency Updates: Keep dependencies current
- Code Review: Review all code changes
Pre-commit Security Checks:
# .pre-commit-config.yaml
repos:
  - repo: https://github.com/PyCQA/bandit
    rev: '1.7.4'
    hooks:
      - id: bandit
        args: ['-r', '.']
  - repo: https://github.com/Yelp/detect-secrets
    rev: v1.4.0
    hooks:
      - id: detect-secrets
        args: ['--baseline', '.secrets.baseline']
Security Checklist:
- All secrets in environment variables
- SSL/TLS enabled for all connections
- Database connections encrypted
- Regular security updates applied
- Monitoring and alerting configured
- Backup and recovery tested
- Access controls properly configured
- Audit logging enabled
Regular Security Tasks:
#!/bin/bash
# Weekly security update script
apt update && apt list --upgradable
pip list --outdated
# -r skips docker rmi when there are no dangling images
docker images --filter "dangling=true" -q | xargs -r docker rmi
# Monthly security audit
bandit -r . -f json -o security-report.json
safety check --json --output safety-report.json
- Deployment Guide - Secure deployment practices
- Configuration Guide - Security configuration
- Troubleshooting - Security issue resolution
- Developer Guide - Secure development practices