Observer System - adnanhd/observer-pattern GitHub Wiki
The Observer System is the heart of CallPyBack, implementing a formal observer pattern that allows multiple components to monitor function execution without tight coupling.
The observer pattern in CallPyBack provides:
- Decoupled Monitoring: Observers operate independently without affecting function execution
- Thread-Safe Operations: Concurrent observer notification with error isolation
- Priority-Based Execution: Higher priority observers execute first
- State-Aware Filtering: Observers can subscribe to specific execution states
- Error Isolation: Observer failures don't impact function execution or other observers
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Observer Pattern β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β βββββββββββββββββββ βββββββββββββββββββββββββββββββ β
β β CallPyBack β β Observer Manager β β
β β Decorator ββββββΆβ β β
β β β β β’ Add/Remove Observers β β
β β β’ Function Exec β β β’ Priority Sorting β β
β β β’ State Changes β β β’ Thread-Safe Notification β β
β β β’ Error Handlingβ β β’ Error Isolation β β
β βββββββββββββββββββ βββββββββββββββββββββββββββββββ β
β β β β
β β ExecutionContext β notify() β
β βΌ βΌ β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β Observers β β
β β β β
β β βββββββββββββββ βββββββββββββββ βββββββββββββββ β β
β β β Observer1 β β Observer2 β β Observer3 β β β
β β β Priority:100β β Priority: 50β β Priority: 10β β β
β β β States: ALL β β States:SUCC β β States:FAIL β β β
β β βββββββββββββββ βββββββββββββββ βββββββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
All observers must implement the Observer
protocol:
from callpyback.protocols import Observer
from callpyback.core.context import ExecutionContext
class MyObserver:
def init(self, priority: int = 0, name: str = "MyObserver"):
self.priority = priority
self.name = name
def update(self, context: ExecutionContext) -> None:
"""Handle execution context update."""
print(f"Observer {self.name} received update: {context.state}")
Attribute | Type | Description |
---|---|---|
priority | int | Execution priority (higher = earlier) |
name | str | Observer identifier for debugging |
Observers can subscribe to specific execution states:
from callpyback.core.state_machine import ExecutionState
Available states
INITIALIZED # Function decoration complete
PRE_EXECUTION # Before function execution
EXECUTING # During function execution
POST_SUCCESS # After successful execution
POST_FAILURE # After failed execution
COMPLETED # Final state (always reached)
ERROR # Unexpected error state
INITIALIZED
β
βΌ
PRE_EXECUTION
β
βΌ
EXECUTING ββββββ¬βββ POST_SUCCESS βββ COMPLETED
β
ββββ POST_FAILURE ββββ COMPLETED
β
ββββ ERROR
CallPyBack provides several production-ready observers:
from callpyback.observers.builtin import MetricsObserver
metrics = MetricsObserver(priority=100)
@CallPyBack(observers=[metrics])
def monitored_function():
return "result"
Get metrics
data = metrics.get_metrics()
print(f"Total executions: {data['total_executions']}")
from callpyback.observers.builtin import LoggingObserver
import logging
logger = LoggingObserver(
logger=logging.getLogger("myapp"),
log_level=logging.INFO,
priority=50
)
@CallPyBack(observers=[logger])
def logged_function():
return "result"
from callpyback.observers.builtin import TimingObserver
timing = TimingObserver(
threshold=0.1, # 100ms threshold
priority=75
)
@CallPyBack(observers=[timing])
def timed_function():
time.sleep(0.2) # Will trigger slow execution alert
return "result"
Simple function-based observers using the factory pattern:
from callpyback import on_call, on_success, on_failure, on_completion
def before_execution(context):
print(f"Starting {context.function_signature.name}")
def after_success(result):
print(f"Success: {result.value}")
def after_failure(result):
print(f"Error: {result.exception}")
def always_execute(context):
print(f"Completed with state: {context.state}")
@CallPyBack(observers=[
on_call(before_execution),
on_success(after_success),
on_failure(after_failure),
on_completion(always_execute)
])
def example_function():
return "hello world"
Create domain-specific observers by extending BaseObserver
:
from callpyback.observers.base import BaseObserver
from callpyback.core.state_machine import ExecutionState
class DatabaseAuditObserver(BaseObserver):
def init(self):
super().init(priority=90, name="DatabaseAudit")
self.queries = []
self.connection_pool = None
def update(self, context):
if context.state == ExecutionState.COMPLETED:
query_info = {
'function': context.function_signature.name,
'timestamp': context.timestamp,
'success': context.is_successful,
'execution_time': getattr(context.result, 'execution_time', 0)
}
# Log to database
self._log_to_database(query_info)
# Keep in-memory for analysis
self.queries.append(query_info)
def _log_to_database(self, query_info):
# Implementation for database logging
pass
Usage
db_audit = DatabaseAuditObserver()
@CallPyBack(observers=[db_audit])
def database_operation():
return "query result"
Observers execute in priority order (highest first):
high_priority = MyObserver(priority=100) # Executes first
medium_priority = MyObserver(priority=50) # Executes second
low_priority = MyObserver(priority=10) # Executes last
@CallPyBack(observers=[low_priority, high_priority, medium_priority])
def prioritized_function():
return "result"
Subscribe observers to specific execution states:
# Manual state registration
decorator = CallPyBack()
Observer only for successful executions
decorator.add_observer(
success_observer,
states={ExecutionState.POST_SUCCESS}
)
Observer for both success and failure
decorator.add_observer(
completion_observer,
states={ExecutionState.POST_SUCCESS, ExecutionState.POST_FAILURE}
)
@decorator
def stateful_function():
return "result"
Observer errors are automatically isolated:
def failing_observer(context):
raise RuntimeError("Observer error")
def working_observer(context):
print("This observer still works!")
@CallPyBack(observers=[failing_observer, working_observer])
def resilient_function():
return "result" # Function succeeds despite observer failure
CallPyBack uses thread-safe observer management:
from concurrent.futures import ThreadPoolExecutor
import threading
metrics = MetricsObserver()
@CallPyBack(observers=[metrics])
def concurrent_function(worker_id):
time.sleep(0.01) # Simulate work
return f"Worker {worker_id} completed"
Execute concurrently
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(concurrent_function, i)
for i in range(100)
]
results = [f.result() for f in futures]
All executions are safely tracked
print(f"Total executions: {metrics.get_metrics()['total_executions']}")
Advanced error isolation with circuit breaker pattern:
from callpyback.management.observer_manager import ErrorIsolatingObserverManager
Create decorator with error isolation
decorator = CallPyBack(max_observer_failures=3)
Failing observer will be disabled after 3 failures
class FlakyObserver(BaseObserver):
def init(self):
super().init(priority=50, name="FlakyObserver")
self.call_count = 0
def update(self, context):
self.call_count += 1
if self.call_count % 2 == 0: # Fail every other call
raise RuntimeError("Flaky observer error")
print(f"Observer call {self.call_count} succeeded")
flaky = FlakyObserver()
decorator.add_observer(flaky)
@decorator
def protected_function():
return "result"
Execute multiple times - observer gets disabled
for i in range(10):
protected_function()
Track observer performance and health:
# Get observer statistics
decorator = CallPyBack()
metrics = decorator.get_metrics()
print(f"Active observers: {metrics['observer_count']}")
Check observer manager state
manager = decorator._observer_manager
if hasattr(manager, '_disabled_observers'):
print(f"Disabled observers: {len(manager._disabled_observers)}")
Create observers for debugging and development:
class DebugObserver(BaseObserver):
def __init__(self):
super().__init__(priority=999, name="Debug") # Highest priority
def update(self, context):
print(f"""
π DEBUG INFO:
Function: {context.function_signature.name}
State: {context.state.name}
Arguments: {context.arguments}
Success: {context.is_successful if context.result else 'N/A'}
Timestamp: {context.timestamp}
""")
Enable debug mode
debug = DebugObserver()
@CallPyBack(observers=[debug])
def debugged_function(x, y):
return x + y
- Single Responsibility: Each observer should have one clear purpose
- Error Resilience: Handle exceptions gracefully within observers
- Minimal Overhead: Keep observer logic lightweight
- Thread Safety: Use locks when modifying shared state
# Recommended priority ranges
CRITICAL_OBSERVERS = 900-999 # Security, audit logging
BUSINESS_OBSERVERS = 800-899 # Domain-specific logic
MONITORING_OBSERVERS = 100-199 # Metrics, performance
DEBUG_OBSERVERS = 1-99 # Development, testing
- Use
PRE_EXECUTION
for setup and validation - Use
POST_SUCCESS
for success-specific logic - Use
POST_FAILURE
for error handling and cleanup - Use
COMPLETED
for always-executing cleanup
# Use weak references for large observer sets
from weakref import WeakSet
class LargeScaleObserver(BaseObserver):
def init(self):
super().init(priority=50, name="LargeScale")
self.tracked_functions = WeakSet() # Won't prevent GC
def update(self, context):
self.tracked_functions.add(context.function_signature.name)
- State Machine - Understanding execution flow
- Built-in Observers - Ready-to-use observer implementations
- Custom Observers - Building domain-specific monitors
- Error Handling - Exception management strategies
- Performance Monitoring - Production monitoring patterns
Next: Learn about the State Machine that drives observer notifications.