# Custom Observers
Custom observers are the key to unlocking CallPyBack's full potential. This guide shows you how to build powerful, domain-specific monitoring solutions that go far beyond basic logging.
## 🎯 Why Custom Observers?
While built-in observers handle common scenarios, custom observers enable:
- Domain-Specific Logic: Business rules and specialized monitoring
- Advanced Analytics: Complex metrics and pattern detection
- Integration: Connecting to external systems and databases
- Compliance: Regulatory requirements and audit trails
- Performance Optimization: Targeted monitoring and alerting
## 🏗️ Observer Architecture

### Base Observer Pattern
All custom observers inherit from `BaseObserver`:

```python
from callpyback.observers.base import BaseObserver
from callpyback.core.context import ExecutionContext
from callpyback.core.state_machine import ExecutionState

class MyCustomObserver(BaseObserver):
    def __init__(self, custom_param="default"):
        super().__init__(priority=50, name="MyCustomObserver")
        self.custom_param = custom_param
        # Initialize your observer state

    def update(self, context: ExecutionContext) -> None:
        """Handle execution updates."""
        if context.state == ExecutionState.COMPLETED:
            # Your custom logic here
            self._process_execution(context)

    def _process_execution(self, context):
        """Custom processing logic."""
        pass
```
### Observer Lifecycle
```
┌───────────────────────────────────────────────────────────────┐
│                      Observer Lifecycle                       │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐  │
│  │ Registration  │    │   Execution   │    │    Cleanup    │  │
│  │               │    │               │    │               │  │
│  │ • __init__()  │───▶│ • update()    │───▶│ • reset()     │  │
│  │ • configure() │    │ • process()   │    │ • close()     │  │
│  │ • validate()  │    │ • store()     │    │               │  │
│  └───────────────┘    └───────────────┘    └───────────────┘  │
│                               │                               │
│                               ▼                               │
│                    ┌─────────────────────┐                    │
│                    │   State Filtering   │                    │
│                    │                     │                    │
│                    │ • PRE_EXECUTION     │                    │
│                    │ • POST_SUCCESS      │                    │
│                    │ • POST_FAILURE     │                    │
│                    │ • COMPLETED         │                    │
│                    │ • ERROR             │                    │
│                    └─────────────────────┘                    │
└───────────────────────────────────────────────────────────────┘
```
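As a sketch, an observer can touch each stage of this lifecycle. Note that `reset()` and `close()` are the hook names shown in the diagram; whether `BaseObserver` actually defines them as overridable methods is an assumption here:

```python
class LifecycleAwareObserver(BaseObserver):
    def __init__(self):
        # Registration stage: configure and validate state up front
        super().__init__(priority=50, name="LifecycleAware")
        self._seen_states = []

    def update(self, context):
        # Execution stage: filter to the states this observer cares about
        if context.state in (ExecutionState.PRE_EXECUTION, ExecutionState.COMPLETED):
            self._seen_states.append(context.state.name)

    def reset(self):
        # Cleanup stage (assumed hook): clear per-run state
        self._seen_states.clear()

    def close(self):
        # Cleanup stage (assumed hook): release external resources
        pass
```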
## 🧪 Basic Custom Observer Examples

### 1. Simple Counter Observer
```python
from collections import defaultdict

class FunctionCounterObserver(BaseObserver):
    def __init__(self):
        super().__init__(priority=50, name="FunctionCounter")
        self.call_counts = defaultdict(int)
        self.success_counts = defaultdict(int)
        self.failure_counts = defaultdict(int)

    def update(self, context):
        func_name = context.function_signature.name
        if context.state == ExecutionState.PRE_EXECUTION:
            self.call_counts[func_name] += 1
        elif context.state == ExecutionState.POST_SUCCESS:
            self.success_counts[func_name] += 1
        elif context.state == ExecutionState.POST_FAILURE:
            self.failure_counts[func_name] += 1

    def get_stats(self):
        return {
            "calls": dict(self.call_counts),
            "successes": dict(self.success_counts),
            "failures": dict(self.failure_counts)
        }
```
```python
# Usage
counter = FunctionCounterObserver()

@CallPyBack(observers=[counter])
def tracked_function():
    return "result"

tracked_function()
print(counter.get_stats())
```
### 2. Rate Limiting Observer
```python
import time
from collections import defaultdict

class RateLimitObserver(BaseObserver):
    def __init__(self, max_calls_per_minute=60):
        super().__init__(priority=100, name="RateLimit")  # High priority
        self.max_calls = max_calls_per_minute
        self.call_history = defaultdict(list)

    def update(self, context):
        if context.state == ExecutionState.PRE_EXECUTION:
            func_name = context.function_signature.name
            current_time = time.time()
            # Drop entries older than one minute
            cutoff_time = current_time - 60
            self.call_history[func_name] = [
                t for t in self.call_history[func_name]
                if t > cutoff_time
            ]
            # Check the rate limit
            if len(self.call_history[func_name]) >= self.max_calls:
                raise Exception(f"Rate limit exceeded for {func_name}")
            # Record this call
            self.call_history[func_name].append(current_time)
```
```python
# Usage
rate_limiter = RateLimitObserver(max_calls_per_minute=5)

@CallPyBack(observers=[rate_limiter])
def limited_function():
    return "allowed"
```
### 3. Cache Observer
```python
import hashlib
import json
import time

class CacheObserver(BaseObserver):
    def __init__(self, ttl_seconds=300):  # 5-minute TTL
        super().__init__(priority=90, name="Cache")
        self.cache = {}
        self.ttl = ttl_seconds

    def _get_cache_key(self, context):
        """Generate a cache key from the function name and arguments."""
        key_data = {
            'function': context.function_signature.name,
            'args': context.arguments
        }
        # default=str so non-JSON-serializable arguments can still be hashed
        key_str = json.dumps(key_data, sort_keys=True, default=str)
        return hashlib.md5(key_str.encode()).hexdigest()

    def update(self, context):
        cache_key = self._get_cache_key(context)
        current_time = time.time()
        if context.state == ExecutionState.PRE_EXECUTION:
            # Check for a cached result
            if cache_key in self.cache:
                cached_data = self.cache[cache_key]
                if current_time - cached_data['timestamp'] < self.ttl:
                    # Note: update() return values are ignored, so actually
                    # short-circuiting the call with the cached result would
                    # need integration with the decorator.
                    print(f"Cache hit for {context.function_signature.name}")
                    return cached_data['result']
                else:
                    # Expired entry; evict it
                    del self.cache[cache_key]
        elif context.state == ExecutionState.POST_SUCCESS:
            # Cache successful results
            self.cache[cache_key] = {
                'result': context.result.value,
                'timestamp': current_time
            }
            print(f"Cached result for {context.function_signature.name}")

    def get_cache_stats(self):
        return {
            'cached_items': len(self.cache),
            'cache_keys': list(self.cache.keys())[:5]  # Show first 5
        }
```
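The cache observer attaches like any other; a minimal usage sketch (as written, a cache hit is only reported, not returned to the caller):

```python
cache_observer = CacheObserver(ttl_seconds=60)

@CallPyBack(observers=[cache_observer])
def expensive_lookup(key):
    return f"value for {key}"

expensive_lookup("a")  # result is cached on POST_SUCCESS
expensive_lookup("a")  # prints a cache-hit message on PRE_EXECUTION
print(cache_observer.get_cache_stats())
```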
## 🏢 Enterprise-Grade Examples

### 1. Security Audit Observer
```python
import json
import logging
from datetime import datetime

class SecurityAuditObserver(BaseObserver):
    def __init__(self, audit_logger=None, sensitive_functions=None):
        super().__init__(priority=999, name="SecurityAudit")  # Highest priority
        self.audit_logger = audit_logger or logging.getLogger("security_audit")
        self.sensitive_functions = sensitive_functions or set()
        self.security_events = []

    def update(self, context):
        func_name = context.function_signature.name
        # Monitor all calls to sensitive functions
        if func_name in self.sensitive_functions:
            self._log_security_event(context)
        # Monitor failures for suspicious patterns
        if context.state == ExecutionState.POST_FAILURE:
            self._check_for_security_issues(context)

    def _log_security_event(self, context):
        """Log security-relevant function calls."""
        event = {
            'timestamp': datetime.utcnow().isoformat(),
            'function': context.function_signature.name,
            'user': context.arguments.get('user_id', 'unknown'),
            'action': context.arguments.get('action', 'unknown'),
            'source_ip': context.arguments.get('source_ip', 'unknown'),
            'success': context.is_successful,
            'state': context.state.name
        }
        if context.is_failed:
            event['error'] = str(context.result.exception)
        self.security_events.append(event)
        self.audit_logger.info(f"SECURITY_EVENT: {json.dumps(event)}")

    def _check_for_security_issues(self, context):
        """Check for potential security issues."""
        error_msg = str(context.result.exception).lower()
        # Check for common attack patterns
        suspicious_patterns = [
            'permission denied',
            'access denied',
            'unauthorized',
            'authentication failed',
            'sql injection',
            'xss'
        ]
        if any(pattern in error_msg for pattern in suspicious_patterns):
            alert = {
                'timestamp': datetime.utcnow().isoformat(),
                'function': context.function_signature.name,
                'error': str(context.result.exception),
                'arguments': self._sanitize_arguments(context.arguments),
                'severity': 'HIGH'
            }
            self.audit_logger.warning(f"SECURITY_ALERT: {json.dumps(alert)}")

    def _sanitize_arguments(self, arguments):
        """Remove sensitive data from arguments."""
        sanitized = {}
        sensitive_keys = {'password', 'token', 'secret', 'key', 'auth'}
        for key, value in arguments.items():
            if any(sensitive in key.lower() for sensitive in sensitive_keys):
                sanitized[key] = "***REDACTED***"
            else:
                sanitized[key] = str(value)[:100]  # Truncate long values
        return sanitized
```
```python
# Usage
security_observer = SecurityAuditObserver(
    sensitive_functions={'login', 'change_password', 'delete_user', 'admin_action'}
)

@CallPyBack(observers=[security_observer])
def login(username, password, source_ip):
    # Login logic here
    if username == "admin" and password == "wrong":
        raise PermissionError("Authentication failed")
    return {"user_id": username, "session_token": "abc123"}
```
### 2. Performance Profiling Observer
```python
import statistics
import time
from collections import defaultdict, deque

class PerformanceProfileObserver(BaseObserver):
    def __init__(self, window_size=1000, alert_threshold_ms=500):
        super().__init__(priority=80, name="PerformanceProfile")
        self.window_size = window_size
        self.alert_threshold = alert_threshold_ms / 1000  # Convert to seconds
        # Rolling window of execution times per function
        self.execution_times = defaultdict(lambda: deque(maxlen=window_size))
        self.alerts = []

    def update(self, context):
        if context.state == ExecutionState.POST_SUCCESS and context.result:
            func_name = context.function_signature.name
            exec_time = context.result.execution_time
            # Store execution time
            self.execution_times[func_name].append(exec_time)
            # Performance analysis
            self._analyze_performance(func_name, exec_time, context)

    def _analyze_performance(self, func_name, exec_time, context):
        """Analyze performance patterns and generate alerts."""
        times = list(self.execution_times[func_name])
        if len(times) < 10:  # Need a minimum number of samples
            return
        # Performance degradation detection
        if len(times) >= 100:
            recent_times = times[-10:]          # Last 10 executions
            historical_times = times[-100:-10]  # Previous 90 executions
            recent_avg = statistics.mean(recent_times)
            historical_avg = statistics.mean(historical_times)
            # Alert if recent performance is 50% worse than historical
            if recent_avg > historical_avg * 1.5:
                self._create_alert(
                    func_name,
                    "PERFORMANCE_DEGRADATION",
                    f"Recent avg: {recent_avg*1000:.1f}ms vs Historical: {historical_avg*1000:.1f}ms",
                    context
                )
        # Absolute threshold alert
        if exec_time > self.alert_threshold:
            self._create_alert(
                func_name,
                "SLOW_EXECUTION",
                f"Execution time: {exec_time*1000:.1f}ms (threshold: {self.alert_threshold*1000:.1f}ms)",
                context
            )

    def _create_alert(self, func_name, alert_type, message, context):
        """Create a performance alert."""
        alert = {
            'timestamp': time.time(),
            'function': func_name,
            'type': alert_type,
            'message': message,
            'arguments': context.arguments,
            'severity': 'HIGH' if alert_type == 'SLOW_EXECUTION' else 'MEDIUM'
        }
        self.alerts.append(alert)
        print(f"🚨 PERFORMANCE ALERT: {alert_type} - {func_name}: {message}")

    def get_performance_report(self, func_name=None):
        """Generate a performance report."""
        if func_name:
            functions = [func_name] if func_name in self.execution_times else []
        else:
            functions = list(self.execution_times.keys())
        report = {}
        for fname in functions:
            times = list(self.execution_times[fname])
            if not times:
                continue
            report[fname] = {
                'total_calls': len(times),
                'mean_time_ms': statistics.mean(times) * 1000,
                'median_time_ms': statistics.median(times) * 1000,
                'min_time_ms': min(times) * 1000,
                'max_time_ms': max(times) * 1000,
                'std_dev_ms': statistics.stdev(times) * 1000 if len(times) > 1 else 0,
                'p95_time_ms': statistics.quantiles(times, n=20)[18] * 1000 if len(times) >= 20 else None
            }
        return {
            'functions': report,
            'total_alerts': len(self.alerts),
            'recent_alerts': [a for a in self.alerts if time.time() - a['timestamp'] < 3600]  # Last hour
        }
```
```python
import json
import random

# Usage
profiler = PerformanceProfileObserver(window_size=500, alert_threshold_ms=200)

@CallPyBack(observers=[profiler])
def database_query(query, params):
    # Simulate variable execution time
    time.sleep(random.uniform(0.01, 0.3))
    return f"Query result for: {query}"

# Run multiple times to build performance history
for i in range(150):
    database_query(f"SELECT * FROM table_{i % 5}", {"id": i})

# Get a performance report
report = profiler.get_performance_report("database_query")
print(json.dumps(report, indent=2))
```
### 3. Business Metrics Observer
```python
import json
from collections import defaultdict
from datetime import datetime, timedelta

class BusinessMetricsObserver(BaseObserver):
    def __init__(self, metrics_config=None):
        super().__init__(priority=70, name="BusinessMetrics")
        self.metrics_config = metrics_config or {}
        self.metrics = defaultdict(lambda: defaultdict(float))
        self.events = []

    def update(self, context):
        if context.state == ExecutionState.COMPLETED:
            self._extract_business_metrics(context)

    def _extract_business_metrics(self, context):
        """Extract business-relevant metrics from function execution."""
        func_name = context.function_signature.name
        # Only process functions with a business metric configuration
        if func_name not in self.metrics_config:
            return
        config = self.metrics_config[func_name]
        # Extract metrics based on the configuration
        for metric_name, extraction_config in config.items():
            value = self._extract_metric_value(context, extraction_config)
            if value is not None:
                self.metrics[func_name][metric_name] += value
                # Record an event for time-series analysis
                self.events.append({
                    'timestamp': datetime.utcnow(),
                    'function': func_name,
                    'metric': metric_name,
                    'value': value,
                    'success': context.is_successful
                })

    def _extract_metric_value(self, context, config):
        """Extract a metric value based on its configuration."""
        source = config.get('source', 'result')
        path = config.get('path', [])
        default = config.get('default', 0)
        try:
            if source == 'result' and context.result:
                data = context.result.value
            elif source == 'arguments':
                data = context.arguments
            elif source == 'variables' and context.local_variables:
                data = context.local_variables
            else:
                return default
            # Navigate the path to extract the value
            for key in path:
                if isinstance(data, dict):
                    data = data.get(key)
                else:
                    return default
            if data is None:
                return default
            return float(data) if isinstance(data, (int, float)) else default
        except (ValueError, TypeError, KeyError):
            return default

    def get_business_summary(self):
        """Generate a business metrics summary."""
        summary = {
            'total_metrics': dict(self.metrics),
            'recent_events': len([e for e in self.events if datetime.utcnow() - e['timestamp'] < timedelta(hours=1)]),
            'daily_trends': self._calculate_daily_trends(),
            'function_performance': self._calculate_function_performance()
        }
        return summary

    def _calculate_daily_trends(self):
        """Calculate daily metric trends."""
        now = datetime.utcnow()
        yesterday = now - timedelta(days=1)
        daily_metrics = defaultdict(lambda: defaultdict(float))
        for event in self.events:
            if event['timestamp'] >= yesterday:
                day_key = event['timestamp'].strftime('%Y-%m-%d')
                daily_metrics[day_key][event['metric']] += event['value']
        return dict(daily_metrics)

    def _calculate_function_performance(self):
        """Calculate function-level performance metrics."""
        performance = {}
        for func_name in self.metrics:
            func_events = [e for e in self.events if e['function'] == func_name]
            if func_events:
                successful = [e for e in func_events if e['success']]
                performance[func_name] = {
                    'total_calls': len(func_events),
                    'success_rate': len(successful) / len(func_events) * 100,
                    'total_business_value': sum(e['value'] for e in successful)
                }
        return performance
```
```python
import random

# Usage
business_config = {
    'process_order': {
        'revenue': {'source': 'result', 'path': ['order_total']},
        'items_sold': {'source': 'result', 'path': ['item_count']},
        'shipping_cost': {'source': 'arguments', 'path': ['shipping_method', 'cost']}
    },
    'user_signup': {
        'new_users': {'source': 'result', 'path': [], 'default': 1},
        'conversion_value': {'source': 'result', 'path': ['referral_value'], 'default': 10}
    }
}

business_observer = BusinessMetricsObserver(business_config)

@CallPyBack(observers=[business_observer])
def process_order(customer_id, items, shipping_method):
    order_total = sum(item['price'] for item in items)
    return {
        'order_id': f"ORD_{random.randint(1000, 9999)}",
        'order_total': order_total,
        'item_count': len(items),
        'customer_id': customer_id
    }

@CallPyBack(observers=[business_observer])
def user_signup(email, referral_code=None):
    referral_value = 25 if referral_code else 10
    return {
        'user_id': f"USER_{random.randint(1000, 9999)}",
        'referral_value': referral_value
    }
```
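Driving the instrumented functions and reading back the aggregated metrics, as a sketch (the argument values below are made up):

```python
process_order("CUST_1", [{'price': 20.0}, {'price': 15.5}], {'cost': 4.99})
user_signup("new.user@example.com", referral_code="FRIEND")

# default=str handles the datetime objects stored in the events
print(json.dumps(business_observer.get_business_summary(), indent=2, default=str))
```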
## 🔧 Observer Design Patterns

### 1. Template Method Pattern
```python
class BaseAnalyticsObserver(BaseObserver):
    """Template for analytics observers."""

    def update(self, context):
        if self._should_process(context):
            data = self._extract_data(context)
            processed_data = self._process_data(data, context)
            self._store_data(processed_data)

    def _should_process(self, context):
        """Override to control when processing occurs."""
        return context.state == ExecutionState.COMPLETED

    def _extract_data(self, context):
        """Override to extract relevant data."""
        raise NotImplementedError

    def _process_data(self, data, context):
        """Override to process extracted data."""
        return data

    def _store_data(self, data):
        """Override to store processed data."""
        raise NotImplementedError

class DatabaseAnalyticsObserver(BaseAnalyticsObserver):
    def __init__(self, connection_string):
        super().__init__(priority=60, name="DatabaseAnalytics")
        self.connection_string = connection_string

    def _extract_data(self, context):
        return {
            'function_name': context.function_signature.name,
            'execution_time': getattr(context.result, 'execution_time', 0),
            'success': context.is_successful,
            'timestamp': context.timestamp
        }

    def _store_data(self, data):
        # Store to the database
        print(f"Storing to database: {data}")
```
### 2. Strategy Pattern for Data Processing
```python
class DataProcessor:
    def process(self, data, context):
        raise NotImplementedError

class JSONProcessor(DataProcessor):
    def process(self, data, context):
        return json.dumps(data)

class CSVProcessor(DataProcessor):
    def process(self, data, context):
        # Convert to CSV format
        return f"{data.get('timestamp')},{data.get('function_name')},{data.get('success')}"

class ConfigurableObserver(BaseObserver):
    def __init__(self, processor: DataProcessor):
        super().__init__(priority=50, name="Configurable")
        self.processor = processor
        self.processed_data = []

    def update(self, context):
        if context.state == ExecutionState.COMPLETED:
            data = {
                'timestamp': context.timestamp,
                'function_name': context.function_signature.name,
                'success': context.is_successful
            }
            processed = self.processor.process(data, context)
            self.processed_data.append(processed)
```
```python
# Usage
json_observer = ConfigurableObserver(JSONProcessor())
csv_observer = ConfigurableObserver(CSVProcessor())
```
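Both observers can watch the same function, each formatting records with its own strategy; a short sketch:

```python
@CallPyBack(observers=[json_observer, csv_observer])
def traced():
    return "ok"

traced()
print(json_observer.processed_data[-1])  # JSON-encoded record
print(csv_observer.processed_data[-1])   # CSV-encoded record
```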
## 🎯 Best Practices for Custom Observers

### 1. Error Handling
```python
import logging

class RobustObserver(BaseObserver):
    def update(self, context):
        try:
            self._safe_processing(context)
        except Exception as e:
            # Log the error but don't re-raise (observer isolation)
            logging.error(f"Observer {self.name} failed: {e}")
            self._handle_error(e, context)

    def _safe_processing(self, context):
        # Your observer logic here
        pass

    def _handle_error(self, error, context):
        # Error recovery logic
        pass
```
### 2. Memory Management
```python
from collections import defaultdict, deque

class MemoryEfficientObserver(BaseObserver):
    def __init__(self, max_items=10000):
        super().__init__(priority=50, name="MemoryEfficient")
        self.data = deque(maxlen=max_items)  # Automatic cleanup
        self.counters = defaultdict(int)     # Small memory footprint

    def update(self, context):
        # Only store essential data
        essential_data = {
            'timestamp': context.timestamp,
            'function': context.function_signature.name,
            'success': context.is_successful
        }
        self.data.append(essential_data)
        self.counters[context.function_signature.name] += 1
```
### 3. Performance Optimization
```python
import time

class OptimizedObserver(BaseObserver):
    def __init__(self):
        super().__init__(priority=50, name="Optimized")
        self._batch = []
        self._batch_size = 100
        self._last_flush = time.time()
        self._flush_interval = 30  # seconds

    def update(self, context):
        # Batch operations for efficiency
        self._batch.append(self._extract_minimal_data(context))
        # Flush when the batch is full or the interval has elapsed
        if (len(self._batch) >= self._batch_size or
                time.time() - self._last_flush > self._flush_interval):
            self._flush_batch()

    def _extract_minimal_data(self, context):
        # Extract only what you need
        return (
            context.timestamp,
            context.function_signature.name,
            context.is_successful
        )

    def _flush_batch(self):
        if self._batch:
            self._process_batch(self._batch)
            self._batch.clear()
            self._last_flush = time.time()

    def _process_batch(self, batch):
        # Process the batch efficiently (e.g., one bulk write)
        pass
```
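One caveat with batching: records still sitting in the buffer are lost if the process exits before the next size- or time-based flush. A minimal guard, assuming you flush explicitly at shutdown:

```python
import atexit

optimized = OptimizedObserver()
# Flush any buffered records when the interpreter exits.
atexit.register(optimized._flush_batch)
```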
## 🔌 Integration Examples

### 1. Database Integration
```python
import json
import sqlite3

class DatabaseObserver(BaseObserver):
    def __init__(self, db_path="function_audit.db"):
        super().__init__(priority=60, name="Database")
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS function_calls (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp REAL,
                    function_name TEXT,
                    success BOOLEAN,
                    execution_time REAL,
                    arguments TEXT
                )
            ''')

    def update(self, context):
        if context.state == ExecutionState.COMPLETED:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('''
                    INSERT INTO function_calls
                    (timestamp, function_name, success, execution_time, arguments)
                    VALUES (?, ?, ?, ?, ?)
                ''', (
                    context.timestamp,
                    context.function_signature.name,
                    context.is_successful,
                    getattr(context.result, 'execution_time', 0),
                    # default=str keeps non-JSON-serializable arguments loggable
                    json.dumps(context.arguments, default=str)
                ))
```
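A usage sketch: record a call, then read the audit trail back out of SQLite:

```python
db_observer = DatabaseObserver("function_audit.db")

@CallPyBack(observers=[db_observer])
def audited_task(x):
    return x * 2

audited_task(21)

with sqlite3.connect("function_audit.db") as conn:
    for row in conn.execute("SELECT function_name, success, execution_time FROM function_calls"):
        print(row)
```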
### 2. External API Integration
```python
import logging
import requests

class WebhookObserver(BaseObserver):
    def __init__(self, webhook_url, api_key=None):
        super().__init__(priority=40, name="Webhook")
        self.webhook_url = webhook_url
        self.api_key = api_key
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({'Authorization': f'Bearer {api_key}'})

    def update(self, context):
        if context.state == ExecutionState.POST_FAILURE:
            # Send alerts for failures
            self._send_alert(context)

    def _send_alert(self, context):
        payload = {
            'alert_type': 'function_failure',
            'function_name': context.function_signature.name,
            'error_message': str(context.result.exception),
            'timestamp': context.timestamp,
            'arguments': context.arguments
        }
        try:
            response = self.session.post(
                self.webhook_url,
                json=payload,
                timeout=5
            )
            response.raise_for_status()
        except requests.RequestException as e:
            logging.error(f"Failed to send webhook: {e}")
```
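A usage sketch with a hypothetical endpoint, assuming CallPyBack re-raises the original exception after notifying observers:

```python
webhook = WebhookObserver(
    "https://hooks.example.com/alerts",  # hypothetical endpoint
    api_key="YOUR_API_KEY",
)

@CallPyBack(observers=[webhook])
def flaky_task():
    raise RuntimeError("downstream timeout")

try:
    flaky_task()
except RuntimeError:
    pass  # the exception reaches the caller; the webhook alert has already fired
```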
## 🧪 Testing Custom Observers

### Unit Testing
```python
import unittest
from unittest.mock import Mock, patch

class TestCustomObserver(unittest.TestCase):
    def setUp(self):
        self.observer = MyCustomObserver()
        self.mock_context = Mock()
        self.mock_context.function_signature.name = "test_function"
        self.mock_context.state = ExecutionState.COMPLETED
        self.mock_context.is_successful = True

    def test_observer_initialization(self):
        self.assertEqual(self.observer.name, "MyCustomObserver")
        self.assertEqual(self.observer.priority, 50)

    def test_update_method(self):
        self.observer.update(self.mock_context)
        # Assert expected behavior

    # The patch target is a placeholder; point it at whatever your observer calls.
    @patch('your_module.external_api_call')
    def test_external_integration(self, mock_api):
        mock_api.return_value = {'status': 'success'}
        self.observer.update(self.mock_context)
        mock_api.assert_called_once()
```
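Run the suite directly with the standard unittest entry point:

```python
if __name__ == "__main__":
    unittest.main()
```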
## 📚 Advanced Patterns
Check out these advanced observer patterns:
- ML Workflows - Machine learning model monitoring
- Financial Auditing - Transaction compliance tracking
- Distributed Systems - Service mesh monitoring
- Security Auditing - Security event tracking
Next: Explore Performance Monitoring to see production-ready observer implementations.