Chapter 11 Integration and Interoperability - Bryantad/Sona GitHub Wiki

Chapter 11: Integration and Interoperability

🌟 Welcome to Integration and Interoperability!

Ages 12-55+ | All Learning Styles Welcome | Neurodivergent-Friendly


🎯 Chapter Overview

Ready to connect your Sona applications with the wider software world? Whether you're 12 years old building your first API integration or 55+ creating enterprise-grade systems, this chapter will teach you how to make your applications work seamlessly with databases, APIs, cloud services, and other systems. We'll explore modern integration patterns that make your code play well with others!

🧠 This Chapter Supports:

  • πŸ”„ ADHD: Clear integration patterns, step-by-step workflows, progress tracking
  • 🎨 Autism: Logical API structures, predictable responses, detailed documentation
  • πŸ“š Learning Disabilities: Visual diagrams, plain explanations, practical examples
  • 🎯 Executive Function: Organized integration strategies, clear methodologies
  • πŸ‘οΈ Visual Processing: Network diagrams, API flows, clear code structure
  • 🎧 Auditory Processing: Screen reader friendly, descriptive headers, clear explanations

πŸ“‹ Learning Objectives

By the end of this chapter, you will:

  • βœ… Build professional REST API clients and servers with Sona
  • βœ… Integrate with multiple database systems (SQL and NoSQL)
  • βœ… Handle authentication and authorization securely
  • βœ… Implement real-time communication (WebSockets, Server-Sent Events)
  • βœ… Create efficient data pipelines and ETL processes
  • βœ… Build microservices architectures
  • βœ… Implement event-driven systems
  • βœ… Handle cross-platform compatibility seamlessly

🎯 Progress Tracking

Chapter 11 Progress: [β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘] 0% Complete
β”œβ”€β”€ 🌐 REST API Development (0/3 sections)
β”œβ”€β”€ πŸ—„οΈ Database Integration (0/3 sections)
β”œβ”€β”€ πŸ” Authentication & Authorization (0/3 sections)
β”œβ”€β”€ ⚑ Real-Time Communication (0/3 sections)
β”œβ”€β”€ πŸ”„ Data Pipelines & ETL (0/3 sections)
β”œβ”€β”€ πŸ—οΈ Microservices Architecture (0/3 sections)
└── πŸŽ‰ Event-Driven Systems (0/3 sections)

πŸ—ΊοΈ Visual Chapter Map

🌐 Integration & Interoperability Journey
β”œβ”€β”€ 🌐 REST API Development
β”‚   β”œβ”€β”€ πŸ”§ Building API clients
β”‚   β”œβ”€β”€ πŸ–₯️ Creating API servers
β”‚   └── πŸ“Š API testing & monitoring
β”œβ”€β”€ πŸ—„οΈ Database Integration
β”‚   β”œβ”€β”€ 🐘 SQL database connectivity
β”‚   β”œβ”€β”€ πŸ“„ NoSQL database integration
β”‚   └── πŸ”„ Database migrations & pooling
β”œβ”€β”€ πŸ” Authentication & Authorization
β”‚   β”œβ”€β”€ πŸ”‘ JWT authentication
β”‚   β”œβ”€β”€ πŸ›‘οΈ OAuth integration
β”‚   └── 🎯 Role-based access control
β”œβ”€β”€ ⚑ Real-Time Communication
β”‚   β”œβ”€β”€ πŸ”Œ WebSocket connections
β”‚   β”œβ”€β”€ πŸ“‘ Server-Sent Events
β”‚   └── πŸ“¨ Message queues
β”œβ”€β”€ πŸ”„ Data Pipelines & ETL
β”‚   β”œβ”€β”€ πŸ“₯ Data extraction
β”‚   β”œβ”€β”€ πŸ”„ Data transformation
β”‚   └── πŸ“€ Data loading
β”œβ”€β”€ πŸ—οΈ Microservices Architecture
β”‚   β”œβ”€β”€ 🏠 Service discovery
β”‚   β”œβ”€β”€ 🌐 API gateways
β”‚   └── πŸ“Š Service monitoring
└── πŸŽ‰ Event-Driven Systems
    β”œβ”€β”€ πŸ“¬ Event publishing
    β”œβ”€β”€ πŸ‘‚ Event consumption
    └── πŸ”„ Event processing

πŸ’‘ Integration Philosophy

πŸ€” Why Integration Matters

In today's connected world, applications rarely work in isolation. They need to:

  • 🌐 Communicate with APIs - Exchange data with external services
  • πŸ—„οΈ Connect to databases - Store and retrieve information efficiently
  • πŸ” Handle authentication - Securely manage user access
  • ⚑ Provide real-time features - Deliver instant updates and notifications
  • πŸ”„ Process data streams - Handle continuous data flows
  • πŸ—οΈ Scale horizontally - Support growing user bases
  • πŸŽ‰ React to events - Respond to system changes automatically

🧠 Learning Path Options

Choose your learning style:

  1. 🎯 Professional Track (Ages 18-55): Enterprise integration patterns
  2. πŸ“š Beginner-Friendly (Ages 12-25): Start with simple API calls
  3. πŸ”„ ADHD-Friendly (All ages): Step-by-step integration workflows
  4. 🎨 Autism-Friendly (All ages): Logical patterns and predictable structures
  5. 🎧 Audio Learner (All ages): Descriptive explanations and screen reader support

🌐 REST API Development: Building Connected Applications

🎯 Section Goal

Master building and consuming REST APIs to create applications that communicate seamlessly with external services and provide data to other applications.

πŸ”§ Building Professional API Clients

think "API clients are the bridge between your application and external services"

class APIClient:
    function __init__(base_url, default_headers, timeout, retry_config):
        self.base_url = base_url
        self.default_headers = default_headers or {}
        self.timeout = timeout or 30
        self.retry_config = retry_config or {"max_retries": 3, "backoff_factor": 2}
        self.session_data = {}
        self.request_history = []
        self.error_handlers = {}
        
        show "🌐 API Client initialized for: " + base_url
    
    function add_auth_header(auth_type, credentials):
        think "Add authentication headers based on type"
        
        when auth_type == "bearer":
            self.default_headers["Authorization"] = "Bearer " + credentials
        elif auth_type == "api_key":
            self.default_headers["X-API-Key"] = credentials
        elif auth_type == "basic":
            import base64
            encoded = base64.b64encode((credentials["username"] + ":" + credentials["password"]).encode()).decode()
            self.default_headers["Authorization"] = "Basic " + encoded
        
        show "πŸ” Added " + auth_type + " authentication"
    
    function register_error_handler(status_code, handler_function):
        self.error_handlers[status_code] = handler_function
        show "πŸ› οΈ Registered error handler for status " + str(status_code)
    
    function make_request(method, endpoint, data, headers, params):
        think "Make HTTP request with comprehensive error handling"
        
        full_url = self.base_url + endpoint
        request_headers = {**self.default_headers, **(headers or {})}
        
        request_info = {
            "method": method.upper(),
            "url": full_url,
            "headers": request_headers,
            "data": data,
            "params": params,
            "timestamp": get_current_timestamp()
        }
        
        show "πŸ“‘ Making " + method.upper() + " request to: " + full_url
        
        for attempt in range(self.retry_config["max_retries"] + 1):
            try:
                think "Simulate HTTP request (in real implementation, use http library)"
                response = self.simulate_http_request(method, full_url, request_headers, data, params)
                
                think "Record successful request"
                request_info["response"] = response
                request_info["success"] = True
                request_info["attempt"] = attempt + 1
                self.request_history.append(request_info)
                
                think "Handle response based on status code"
                status_code = response.get("status_code", 200)
                
                when status_code >= 400:
                    when status_code in self.error_handlers:
                        return self.error_handlers[status_code](response)
                    else:
                        raise APIError("HTTP " + str(status_code) + " error", status_code, response)
                
                show "βœ… Request successful (Status: " + str(status_code) + ")"
                return response
                
            except Exception as e:
                show "❌ Request failed (Attempt " + str(attempt + 1) + "): " + str(e)
                
                when attempt < self.retry_config["max_retries"]:
                    wait_time = self.retry_config["backoff_factor"] ** attempt
                    show "πŸ”„ Retrying in " + str(wait_time) + " seconds..."
                    time.sleep(wait_time)
                else:
                    request_info["success"] = False
                    request_info["error"] = str(e)
                    request_info["attempt"] = attempt + 1
                    self.request_history.append(request_info)
                    raise e
    
    function simulate_http_request(method, url, headers, data, params):
        think "Simulate HTTP request for demonstration"
        
        import random
        
        think "Simulate occasional failures for demonstration"
        when random.random() < 0.1:  # 10% failure rate
            raise ConnectionError("Network timeout")
        
        think "Simulate different response types"
        responses = {
            "GET": {
                "status_code": 200,
                "data": {"message": "Data retrieved successfully", "items": [1, 2, 3]},
                "headers": {"Content-Type": "application/json"}
            },
            "POST": {
                "status_code": 201,
                "data": {"message": "Resource created", "id": 123},
                "headers": {"Content-Type": "application/json"}
            },
            "PUT": {
                "status_code": 200,
                "data": {"message": "Resource updated", "id": 123},
                "headers": {"Content-Type": "application/json"}
            },
            "DELETE": {
                "status_code": 204,
                "data": None,
                "headers": {"Content-Type": "application/json"}
            }
        }
        
        return responses.get(method.upper(), responses["GET"])
    
    function get(endpoint, params, headers):
        return self.make_request("GET", endpoint, None, headers, params)
    
    function post(endpoint, data, headers):
        return self.make_request("POST", endpoint, data, headers, None)
    
    function put(endpoint, data, headers):
        return self.make_request("PUT", endpoint, data, headers, None)
    
    function delete(endpoint, headers):
        return self.make_request("DELETE", endpoint, None, headers, None)
    
    function get_request_history():
        return self.request_history
    
    function get_success_rate():
        when len(self.request_history) == 0:
            return 0
        
        successful_requests = sum(1 for req in self.request_history if req.get("success", False))
        return (successful_requests / len(self.request_history)) * 100

class APIError(Exception):
    function __init__(self, message, status_code, response):
        super().__init__(message)
        self.status_code = status_code
        self.response = response

think "Let's create a comprehensive API client for a social media service"

social_api = APIClient(
    base_url="https://api.socialmedia.com/v1",
    default_headers={"Content-Type": "application/json", "User-Agent": "SonaApp/1.0"},
    timeout=30,
    retry_config={"max_retries": 3, "backoff_factor": 2}
)

think "Add authentication"
social_api.add_auth_header("bearer", "your-jwt-token-here")

think "Register custom error handlers"
social_api.register_error_handler(401, function(response):
    show "πŸ” Authentication failed - need to refresh token"
    return {"error": "auth_required", "message": "Please authenticate"}
)

social_api.register_error_handler(429, function(response):
    show "⏰ Rate limit exceeded - backing off"
    time.sleep(60)  # Wait 1 minute
    return {"error": "rate_limited", "message": "Too many requests"}
)

social_api.register_error_handler(500, function(response):
    show "🚨 Server error - using fallback response"
    return {"error": "server_error", "data": [], "message": "Server temporarily unavailable"}
)

think "Test the API client with various operations"
show "πŸ§ͺ Testing API client functionality..."

try:
    think "Test 1: Get user profile"
    user_profile = social_api.get("/users/me", None, None)
    show "πŸ‘€ User profile: " + str(user_profile["data"])
    
    think "Test 2: Create a post"
    new_post = social_api.post("/posts", {
        "content": "Hello from Sona! πŸš€",
        "tags": ["sona", "programming", "accessibility"]
    }, None)
    show "πŸ“ Created post: " + str(new_post["data"])
    
    think "Test 3: Update the post"
    updated_post = social_api.put("/posts/123", {
        "content": "Updated: Hello from Sona! πŸš€βœ¨",
        "tags": ["sona", "programming", "accessibility", "update"]
    }, None)
    show "✏️ Updated post: " + str(updated_post["data"])
    
    think "Test 4: Get posts with parameters"
    posts = social_api.get("/posts", {"limit": 10, "sort": "recent"}, None)
    show "πŸ“‹ Recent posts: " + str(len(posts["data"]["items"])) + " items"
    
    think "Test 5: Search posts"
    search_results = social_api.get("/posts/search", {"q": "sona", "limit": 5}, None)
    show "πŸ” Search results: " + str(search_results["data"])
    
except APIError as e:
    show "❌ API Error: " + str(e) + " (Status: " + str(e.status_code) + ")"
except Exception as e:
    show "πŸ’₯ Unexpected error: " + str(e)

think "Show request history and statistics"
history = social_api.get_request_history()
success_rate = social_api.get_success_rate()

show "πŸ“Š API Client Statistics:"
show "  Total requests: " + str(len(history))
show "  Success rate: " + str(round(success_rate, 2)) + "%"
show "  Recent requests:"

for req in history[-3:]:  # Show last 3 requests
    status = "βœ… Success" if req.get("success", False) else "❌ Failed"
    show "    " + req["method"] + " " + req["url"] + " - " + status

function get_current_timestamp():
    import time
    return time.time()

import time

show "βœ… API client testing completed!"

πŸ–₯️ Creating Professional API Servers

think "API servers provide data and services to other applications"

class APIServer:
    function __init__(host, port, name):
        self.host = host
        self.port = port
        self.name = name
        self.routes = {}
        self.middleware = []
        self.error_handlers = {}
        self.request_log = []
        self.server_stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "start_time": get_current_timestamp()
        }
        
        show "πŸ–₯️ API Server '" + name + "' initialized on " + host + ":" + str(port)
    
    function add_middleware(middleware_function, description):
        self.middleware.append({
            "function": middleware_function,
            "description": description
        })
        show "πŸ”§ Added middleware: " + description
    
    function register_error_handler(error_type, handler_function):
        self.error_handlers[error_type] = handler_function
        show "πŸ› οΈ Registered error handler for: " + str(error_type)
    
    function route(method, path, handler_function, auth_required):
        route_key = method.upper() + " " + path
        
        self.routes[route_key] = {
            "method": method.upper(),
            "path": path,
            "handler": handler_function,
            "auth_required": auth_required or False
        }
        
        show "πŸ›£οΈ Registered route: " + route_key
    
    function get(path, handler_function, auth_required):
        self.route("GET", path, handler_function, auth_required)
    
    function post(path, handler_function, auth_required):
        self.route("POST", path, handler_function, auth_required)
    
    function put(path, handler_function, auth_required):
        self.route("PUT", path, handler_function, auth_required)
    
    function delete(path, handler_function, auth_required):
        self.route("DELETE", path, handler_function, auth_required)
    
    function process_request(method, path, headers, data, query_params):
        think "Process incoming HTTP request"
        
        request_info = {
            "method": method.upper(),
            "path": path,
            "headers": headers or {},
            "data": data,
            "query_params": query_params or {},
            "timestamp": get_current_timestamp(),
            "ip": "127.0.0.1"  # Simulated IP
        }
        
        self.server_stats["total_requests"] += 1
        
        show "πŸ“¨ Incoming request: " + method.upper() + " " + path
        
        try:
            think "Run middleware chain"
            for middleware in self.middleware:
                middleware_result = middleware["function"](request_info)
                
                when middleware_result.get("block", False):
                    response = {
                        "status_code": middleware_result.get("status_code", 403),
                        "data": {"error": middleware_result.get("message", "Request blocked by middleware")},
                        "headers": {"Content-Type": "application/json"}
                    }
                    self.log_request(request_info, response)
                    return response
                
                think "Update request with middleware data"
                request_info.update(middleware_result.get("updates", {}))
            
            think "Find matching route"
            route_key = method.upper() + " " + path
            
            when route_key not in self.routes:
                raise RouteNotFoundError("Route not found: " + route_key)
            
            route = self.routes[route_key]
            
            think "Check authentication if required"
            when route["auth_required"]:
                auth_result = self.check_authentication(request_info)
                when not auth_result["valid"]:
                    raise AuthenticationError("Authentication required")
                request_info["user"] = auth_result["user"]
            
            think "Call route handler"
            response_data = route["handler"](request_info)
            
            response = {
                "status_code": 200,
                "data": response_data,
                "headers": {"Content-Type": "application/json"}
            }
            
            self.server_stats["successful_requests"] += 1
            self.log_request(request_info, response)
            
            show "βœ… Request processed successfully"
            return response
            
        except Exception as e:
            self.server_stats["failed_requests"] += 1
            
            think "Handle errors with registered handlers"
            error_type = type(e)
            
            when error_type in self.error_handlers:
                error_response = self.error_handlers[error_type](e, request_info)
            else:
                error_response = self.default_error_handler(e, request_info)
            
            self.log_request(request_info, error_response)
            
            show "❌ Request failed: " + str(e)
            return error_response
    
    function check_authentication(request_info):
        think "Check if request has valid authentication"
        
        auth_header = request_info["headers"].get("Authorization", "")
        
        when auth_header.startswith("Bearer "):
            token = auth_header[7:]  # Remove "Bearer " prefix
            
            think "Validate JWT token (simplified)"
            when token == "valid-jwt-token":
                return {"valid": True, "user": {"id": 1, "username": "testuser"}}
            else:
                return {"valid": False, "error": "Invalid token"}
        
        return {"valid": False, "error": "No authentication provided"}
    
    function log_request(request_info, response):
        log_entry = {
            "request": request_info,
            "response": {
                "status_code": response["status_code"],
                "success": response["status_code"] < 400
            },
            "duration": 0.1  # Simulated processing time
        }
        
        self.request_log.append(log_entry)
        
        think "Keep only last 100 requests"
        when len(self.request_log) > 100:
            self.request_log = self.request_log[-100:]
    
    function default_error_handler(error, request_info):
        error_responses = {
            "RouteNotFoundError": {
                "status_code": 404,
                "data": {"error": "Not Found", "message": str(error)},
                "headers": {"Content-Type": "application/json"}
            },
            "AuthenticationError": {
                "status_code": 401,
                "data": {"error": "Unauthorized", "message": str(error)},
                "headers": {"Content-Type": "application/json"}
            },
            "ValidationError": {
                "status_code": 400,
                "data": {"error": "Bad Request", "message": str(error)},
                "headers": {"Content-Type": "application/json"}
            }
        }
        
        error_type = type(error).__name__
        return error_responses.get(error_type, {
            "status_code": 500,
            "data": {"error": "Internal Server Error", "message": "An unexpected error occurred"},
            "headers": {"Content-Type": "application/json"}
        })
    
    function get_server_stats():
        uptime = get_current_timestamp() - self.server_stats["start_time"]
        
        return {
            **self.server_stats,
            "uptime_seconds": uptime,
            "requests_per_second": self.server_stats["total_requests"] / max(1, uptime),
            "success_rate": (self.server_stats["successful_requests"] / max(1, self.server_stats["total_requests"])) * 100
        }
    
    function get_request_log():
        return self.request_log

class RouteNotFoundError(Exception):
    pass

class AuthenticationError(Exception):
    pass

class ValidationError(Exception):
    pass

think "Let's create a comprehensive blog API server"

blog_server = APIServer("localhost", 8080, "BlogAPI")

think "Add middleware for logging and rate limiting"
blog_server.add_middleware(function(request_info):
    show "πŸ“ Logging request: " + request_info["method"] + " " + request_info["path"]
    return {"updates": {"logged": True}}
, "Request Logger")

blog_server.add_middleware(function(request_info):
    think "Simple rate limiting check"
    ip = request_info["ip"]
    
    think "In a real implementation, check rate limits per IP"
    import random
    when random.random() < 0.05:  # 5% chance of rate limiting
        return {
            "block": True,
            "status_code": 429,
            "message": "Rate limit exceeded. Please try again later."
        }
    
    return {"updates": {"rate_check_passed": True}}
, "Rate Limiter")

think "Register error handlers"
blog_server.register_error_handler(ValidationError, function(error, request_info):
    return {
        "status_code": 400,
        "data": {"error": "Validation Error", "message": str(error), "path": request_info["path"]},
        "headers": {"Content-Type": "application/json"}
    }
)

think "Create in-memory blog data store"
blog_posts = [
    {"id": 1, "title": "Getting Started with Sona", "content": "Sona is an accessibility-first programming language...", "author": "admin"},
    {"id": 2, "title": "Advanced Sona Features", "content": "Learn about advanced features like metaprogramming...", "author": "admin"},
    {"id": 3, "title": "Building APIs with Sona", "content": "This guide shows how to build REST APIs...", "author": "admin"}
]

blog_comments = [
    {"id": 1, "post_id": 1, "author": "user1", "content": "Great introduction!"},
    {"id": 2, "post_id": 1, "author": "user2", "content": "Very helpful, thank you!"},
    {"id": 3, "post_id": 2, "author": "user1", "content": "Advanced features are amazing!"}
]

think "Define API route handlers"
blog_server.get("/posts", function(request_info):
    think "Get all blog posts with optional filtering"
    
    query_params = request_info["query_params"]
    limit = int(query_params.get("limit", 10))
    author = query_params.get("author")
    
    filtered_posts = blog_posts
    
    when author:
        filtered_posts = [post for post in blog_posts if post["author"] == author]
    
    think "Apply limit"
    limited_posts = filtered_posts[:limit]
    
    return {
        "posts": limited_posts,
        "total": len(blog_posts),
        "filtered": len(filtered_posts),
        "limit": limit
    }
, False)

blog_server.get("/posts/{id}", function(request_info):
    think "Get specific blog post by ID"
    
    post_id = int(request_info["path"].split("/")[-1])
    
    for post in blog_posts:
        when post["id"] == post_id:
            think "Get comments for this post"
            post_comments = [comment for comment in blog_comments if comment["post_id"] == post_id]
            
            return {
                "post": post,
                "comments": post_comments,
                "comment_count": len(post_comments)
            }
    
    raise RouteNotFoundError("Post not found")
, False)

blog_server.post("/posts", function(request_info):
    think "Create a new blog post"
    
    data = request_info["data"]
    
    think "Validate required fields"
    required_fields = ["title", "content"]
    for field in required_fields:
        when field not in data or not data[field]:
            raise ValidationError("Missing required field: " + field)
    
    think "Create new post"
    new_post = {
        "id": len(blog_posts) + 1,
        "title": data["title"],
        "content": data["content"],
        "author": request_info.get("user", {}).get("username", "anonymous")
    }
    
    blog_posts.append(new_post)
    
    return {
        "message": "Post created successfully",
        "post": new_post
    }
, True)

blog_server.put("/posts/{id}", function(request_info):
    think "Update existing blog post"
    
    post_id = int(request_info["path"].split("/")[-1])
    data = request_info["data"]
    
    for post in blog_posts:
        when post["id"] == post_id:
            think "Update post fields"
            when "title" in data:
                post["title"] = data["title"]
            when "content" in data:
                post["content"] = data["content"]
            
            return {
                "message": "Post updated successfully",
                "post": post
            }
    
    raise RouteNotFoundError("Post not found")
, True)

blog_server.delete("/posts/{id}", function(request_info):
    think "Delete blog post"
    
    post_id = int(request_info["path"].split("/")[-1])
    
    for i, post in enumerate(blog_posts):
        when post["id"] == post_id:
            deleted_post = blog_posts.pop(i)
            
            think "Also delete associated comments"
            global blog_comments
            blog_comments = [comment for comment in blog_comments if comment["post_id"] != post_id]
            
            return {
                "message": "Post deleted successfully",
                "deleted_post": deleted_post
            }
    
    raise RouteNotFoundError("Post not found")
, True)

blog_server.get("/stats", function(request_info):
    think "Get server statistics"
    
    server_stats = blog_server.get_server_stats()
    
    return {
        "server": server_stats,
        "data": {
            "total_posts": len(blog_posts),
            "total_comments": len(blog_comments),
            "recent_posts": blog_posts[-3:] if blog_posts else []
        }
    }
, False)

think "Test the blog API server"
show "πŸ§ͺ Testing blog API server..."

try:
    think "Test 1: Get all posts"
    response = blog_server.process_request("GET", "/posts", {}, None, {"limit": "5"})
    show "πŸ“‹ All posts response: " + str(response["data"]["total"]) + " posts total"
    
    think "Test 2: Get specific post"
    response = blog_server.process_request("GET", "/posts/1", {}, None, None)
    show "πŸ“– Post 1: " + response["data"]["post"]["title"]
    
    think "Test 3: Create new post (with authentication)"
    response = blog_server.process_request("POST", "/posts", 
                                         {"Authorization": "Bearer valid-jwt-token"}, 
                                         {"title": "New Post", "content": "This is a new post created via API"}, 
                                         None)
    show "βž• Created post: " + response["data"]["post"]["title"]
    
    think "Test 4: Update post"
    response = blog_server.process_request("PUT", "/posts/1", 
                                         {"Authorization": "Bearer valid-jwt-token"}, 
                                         {"title": "Updated: Getting Started with Sona"}, 
                                         None)
    show "✏️ Updated post: " + response["data"]["post"]["title"]
    
    think "Test 5: Get server stats"
    response = blog_server.process_request("GET", "/stats", {}, None, None)
    show "πŸ“Š Server stats: " + str(response["data"]["server"]["total_requests"]) + " total requests"
    
    think "Test 6: Test authentication failure"
    response = blog_server.process_request("POST", "/posts", 
                                         {"Authorization": "Bearer invalid-token"}, 
                                         {"title": "Should Fail", "content": "This should fail"}, 
                                         None)
    show "πŸ” Auth failure response: " + str(response["status_code"])
    
    think "Test 7: Test route not found"
    response = blog_server.process_request("GET", "/nonexistent", {}, None, None)
    show "πŸ” Not found response: " + str(response["status_code"])
    
except Exception as e:
    show "πŸ’₯ Test error: " + str(e)

think "Show detailed server statistics"
final_stats = blog_server.get_server_stats()
show "πŸ“Š Final Server Statistics:"
show "  Total requests: " + str(final_stats["total_requests"])
show "  Successful requests: " + str(final_stats["successful_requests"])
show "  Failed requests: " + str(final_stats["failed_requests"])
show "  Success rate: " + str(round(final_stats["success_rate"], 2)) + "%"
show "  Requests per second: " + str(round(final_stats["requests_per_second"], 2))

think "Show recent request log"
request_log = blog_server.get_request_log()
show "πŸ“ Recent Request Log:"
for entry in request_log[-5:]:  # Show last 5 requests
    status = "βœ…" if entry["response"]["success"] else "❌"
    show "  " + status + " " + entry["request"]["method"] + " " + entry["request"]["path"] + " (" + str(entry["response"]["status_code"]) + ")"

show "βœ… API server testing completed!"

πŸ—„οΈ Database Integration: Connecting to Data Sources

🎯 Section Goal

Master connecting to various database systems (SQL and NoSQL) to create data-driven applications that can store, retrieve, and manipulate information efficiently.

🐘 SQL Database Connectivity and Operations

think "SQL databases are perfect for structured data with relationships"

class SQLDatabase:
    function __init__(connection_string, database_type):
        self.connection_string = connection_string
        self.database_type = database_type
        self.connection = None
        self.connection_pool = []
        self.transaction_history = []
        self.query_cache = {}
        self.performance_stats = {
            "total_queries": 0,
            "successful_queries": 0,
            "failed_queries": 0,
            "cache_hits": 0,
            "average_query_time": 0
        }
        
        show "🐘 SQL Database initialized: " + database_type
    
    function connect(max_retries):
        think "Establish database connection with retry logic"
        
        for attempt in range(max_retries or 3):
            try:
                think "In real implementation, use actual database drivers"
                self.connection = self.simulate_database_connection()
                
                show "βœ… Connected to " + self.database_type + " database"
                return True
                
            except Exception as e:
                show "❌ Connection attempt " + str(attempt + 1) + " failed: " + str(e)
                
                when attempt < (max_retries or 3) - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                else:
                    raise DatabaseConnectionError("Failed to connect after " + str(max_retries) + " attempts")
    
    function simulate_database_connection():
        think "Simulate database connection for demonstration"
        
        import random
        when random.random() < 0.1:  # 10% chance of connection failure
            raise ConnectionError("Database connection failed")
        
        return {
            "connected": True,
            "database_type": self.database_type,
            "connection_id": random.randint(1000, 9999)
        }
    
    function execute_query(query, parameters, use_cache):
        think "Execute SQL query with comprehensive error handling"
        
        query_start_time = time.time()
        self.performance_stats["total_queries"] += 1
        
        think "Check cache first if enabled"
        cache_key = query + str(parameters or {})
        when use_cache and cache_key in self.query_cache:
            self.performance_stats["cache_hits"] += 1
            show "πŸ’Ύ Cache hit for query"
            return self.query_cache[cache_key]
        
        try:
            when not self.connection:
                raise DatabaseConnectionError("Not connected to database")
            
            think "Simulate query execution"
            result = self.simulate_query_execution(query, parameters)
            
            think "Cache result if enabled"
            when use_cache:
                self.query_cache[cache_key] = result
            
            think "Update performance statistics"
            query_time = time.time() - query_start_time
            self.performance_stats["successful_queries"] += 1
            self.update_average_query_time(query_time)
            
            think "Log transaction"
            self.transaction_history.append({
                "query": query,
                "parameters": parameters,
                "result_count": len(result.get("rows", [])),
                "execution_time": query_time,
                "timestamp": time.time(),
                "success": True
            })
            
            show "βœ… Query executed successfully (" + str(round(query_time * 1000, 2)) + "ms)"
            return result
            
        except Exception as e:
            self.performance_stats["failed_queries"] += 1
            
            think "Log failed transaction"
            self.transaction_history.append({
                "query": query,
                "parameters": parameters,
                "error": str(e),
                "execution_time": time.time() - query_start_time,
                "timestamp": time.time(),
                "success": False
            })
            
            show "❌ Query failed: " + str(e)
            raise DatabaseQueryError("Query execution failed: " + str(e))
    
    function simulate_query_execution(query, parameters):
        think "Simulate different types of SQL queries"
        
        import random
        
        think "Simulate occasional query failures"
        when random.random() < 0.05:  # 5% failure rate
            raise Exception("Query timeout")
        
        query_lower = query.lower().strip()
        
        when query_lower.startswith("select"):
            return {
                "type": "SELECT",
                "rows": [
                    {"id": 1, "name": "John Doe", "email": "[email protected]", "age": 30},
                    {"id": 2, "name": "Jane Smith", "email": "[email protected]", "age": 25},
                    {"id": 3, "name": "Bob Johnson", "email": "[email protected]", "age": 35}
                ],
                "columns": ["id", "name", "email", "age"],
                "row_count": 3
            }
        
        elif query_lower.startswith("insert"):
            return {
                "type": "INSERT",
                "rows_affected": 1,
                "last_insert_id": random.randint(100, 999)
            }
        
        elif query_lower.startswith("update"):
            return {
                "type": "UPDATE",
                "rows_affected": random.randint(0, 5)
            }
        
        elif query_lower.startswith("delete"):
            return {
                "type": "DELETE",
                "rows_affected": random.randint(0, 3)
            }
        
        else:
            return {
                "type": "OTHER",
                "message": "Query executed successfully"
            }
    
    function update_average_query_time(query_time):
        current_avg = self.performance_stats["average_query_time"]
        successful_queries = self.performance_stats["successful_queries"]
        
        self.performance_stats["average_query_time"] = (
            (current_avg * (successful_queries - 1) + query_time) / successful_queries
        )
    
    function begin_transaction():
        think "Start a database transaction"
        
        try:
            result = self.execute_query("BEGIN TRANSACTION", None, False)
            show "πŸ”„ Transaction started"
            return result
        except Exception as e:
            show "❌ Failed to start transaction: " + str(e)
            raise e
    
    function commit_transaction():
        think "Commit the current transaction"
        
        try:
            result = self.execute_query("COMMIT", None, False)
            show "βœ… Transaction committed"
            return result
        except Exception as e:
            show "❌ Failed to commit transaction: " + str(e)
            raise e
    
    function rollback_transaction():
        think "Rollback the current transaction"
        
        try:
            result = self.execute_query("ROLLBACK", None, False)
            show "πŸ”„ Transaction rolled back"
            return result
        except Exception as e:
            show "❌ Failed to rollback transaction: " + str(e)
            raise e
    
    function get_performance_stats():
        return self.performance_stats
    
    function get_transaction_history():
        return self.transaction_history
    
    function clear_cache():
        self.query_cache.clear()
        show "πŸ—‘οΈ Query cache cleared"
    
    function close_connection():
        when self.connection:
            self.connection = None
            show "πŸ”Œ Database connection closed"

class DatabaseConnectionError(Exception):
    pass

class DatabaseQueryError(Exception):
    pass

think "Let's create a comprehensive user management system with SQL database"

user_db = SQLDatabase("postgresql://localhost:5432/userdb", "PostgreSQL")

think "Connect to the database"
user_db.connect(max_retries=3)

think "Create users table"
create_table_sql = """
CREATE TABLE IF NOT EXISTS users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    full_name VARCHAR(100),
    age INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""

try:
    user_db.execute_query(create_table_sql, None, False)
    show "πŸ“‹ Users table created successfully"
except Exception as e:
    show "⚠️ Table creation error (might already exist): " + str(e)

think "Test comprehensive database operations"
show "πŸ§ͺ Testing database operations..."

try:
    think "Test 1: Insert new users"
    insert_sql = """
    INSERT INTO users (username, email, password_hash, full_name, age)
    VALUES (?, ?, ?, ?, ?)
    """
    
    users_to_insert = [
        ("john_doe", "[email protected]", "hashed_password_123", "John Doe", 30),
        ("jane_smith", "[email protected]", "hashed_password_456", "Jane Smith", 25),
        ("bob_johnson", "[email protected]", "hashed_password_789", "Bob Johnson", 35)
    ]
    
    for user_data in users_to_insert:
        result = user_db.execute_query(insert_sql, user_data, False)
        show "βž• Inserted user: " + user_data[0] + " (ID: " + str(result["last_insert_id"]) + ")"
    
    think "Test 2: Query all users"
    select_all_sql = "SELECT * FROM users ORDER BY created_at DESC"
    result = user_db.execute_query(select_all_sql, None, True)
    
    show "πŸ‘₯ All users:"
    for row in result["rows"]:
        show "  - " + row["username"] + " (" + row["email"] + ") - Age: " + str(row["age"])
    
    think "Test 3: Query with parameters"
    select_by_age_sql = "SELECT * FROM users WHERE age > ? ORDER BY age"
    result = user_db.execute_query(select_by_age_sql, [28], True)
    
    show "πŸ‘₯ Users over 28:"
    for row in result["rows"]:
        show "  - " + row["username"] + " (Age: " + str(row["age"]) + ")"
    
    think "Test 4: Update user"
    update_sql = "UPDATE users SET full_name = ?, age = ? WHERE username = ?"
    result = user_db.execute_query(update_sql, ["John Updated Doe", 31, "john_doe"], False)
    show "✏️ Updated " + str(result["rows_affected"]) + " user(s)"
    
    think "Test 5: Transaction example"
    user_db.begin_transaction()
    
    try:
        think "Update multiple users in transaction"
        update_sql = "UPDATE users SET age = age + 1 WHERE age < ?"
        result = user_db.execute_query(update_sql, [35], False)
        show "πŸŽ‚ Aged " + str(result["rows_affected"]) + " users by 1 year"
        
        think "Insert another user in the same transaction"
        insert_sql = "INSERT INTO users (username, email, password_hash, full_name, age) VALUES (?, ?, ?, ?, ?)"
        result = user_db.execute_query(insert_sql, ("alice_wonder", "[email protected]", "hashed_password_abc", "Alice Wonder", 28), False)
        show "βž• Added user in transaction: alice_wonder"
        
        user_db.commit_transaction()
        show "βœ… Transaction completed successfully"
        
    except Exception as e:
        user_db.rollback_transaction()
        show "❌ Transaction failed, rolled back: " + str(e)
    
    think "Test 6: Delete user"
    delete_sql = "DELETE FROM users WHERE username = ?"
    result = user_db.execute_query(delete_sql, ["bob_johnson"], False)
    show "πŸ—‘οΈ Deleted " + str(result["rows_affected"]) + " user(s)"
    
    think "Test 7: Final count"
    count_sql = "SELECT COUNT(*) as total FROM users"
    result = user_db.execute_query(count_sql, None, True)
    show "πŸ“Š Total users remaining: " + str(result["rows"][0]["total"])
    
except Exception as e:
    show "πŸ’₯ Database operation failed: " + str(e)

think "Show performance statistics"
stats = user_db.get_performance_stats()
show "πŸ“Š Database Performance Statistics:"
show "  Total queries: " + str(stats["total_queries"])
show "  Successful queries: " + str(stats["successful_queries"])
show "  Failed queries: " + str(stats["failed_queries"])
show "  Cache hits: " + str(stats["cache_hits"])
show "  Average query time: " + str(round(stats["average_query_time"] * 1000, 2)) + "ms"

think "Show recent transaction history"
history = user_db.get_transaction_history()
show "πŸ“ Recent Transaction History:"
for transaction in history[-5:]:  # Show last 5 transactions
    status = "βœ…" if transaction["success"] else "❌"
    show "  " + status + " " + transaction["query"][:50] + "... (" + str(round(transaction["execution_time"] * 1000, 2)) + "ms)"

user_db.close_connection()

import time

show "βœ… SQL database operations completed!"

πŸ“„ NoSQL Database Integration

think "NoSQL databases are perfect for flexible, document-based data"

class NoSQLDatabase:
    function __init__(connection_string, database_type):
        self.connection_string = connection_string
        self.database_type = database_type
        self.connection = None
        self.collections = {}
        self.query_history = []
        self.performance_metrics = {
            "total_operations": 0,
            "successful_operations": 0,
            "failed_operations": 0,
            "average_operation_time": 0
        }
        
        show "πŸ“„ NoSQL Database initialized: " + database_type
    
    function connect():
        think "Connect to NoSQL database"
        
        try:
            self.connection = self.simulate_nosql_connection()
            show "βœ… Connected to " + self.database_type + " database"
            return True
        except Exception as e:
            show "❌ Connection failed: " + str(e)
            raise NoSQLConnectionError("Failed to connect to NoSQL database")
    
    function simulate_nosql_connection():
        think "Simulate NoSQL database connection"
        
        import random
        when random.random() < 0.05:  # 5% failure rate
            raise ConnectionError("NoSQL connection failed")
        
        return {
            "connected": True,
            "database_type": self.database_type,
            "connection_id": random.randint(1000, 9999)
        }
    
    function get_collection(collection_name):
        think "Get or create collection reference"
        
        when collection_name not in self.collections:
            self.collections[collection_name] = {
                "name": collection_name,
                "documents": [],
                "indexes": {},
                "document_count": 0
            }
            show "πŸ“ Created collection: " + collection_name
        
        return self.collections[collection_name]
    
    function insert_document(collection_name, document, options):
        think "Insert document into collection"
        
        operation_start = time.time()
        self.performance_metrics["total_operations"] += 1
        
        try:
            collection = self.get_collection(collection_name)
            
            think "Generate document ID if not provided"
            when "_id" not in document:
                import random
                document["_id"] = "doc_" + str(random.randint(100000, 999999))
            
            think "Add metadata"
            document["_created_at"] = time.time()
            document["_updated_at"] = time.time()
            
            think "Insert document"
            collection["documents"].append(document)
            collection["document_count"] += 1
            
            operation_time = time.time() - operation_start
            self.update_average_operation_time(operation_time)
            self.performance_metrics["successful_operations"] += 1
            
            think "Log operation"
            self.query_history.append({
                "operation": "INSERT",
                "collection": collection_name,
                "document_id": document["_id"],
                "execution_time": operation_time,
                "timestamp": time.time(),
                "success": True
            })
            
            show "βž• Document inserted: " + document["_id"]
            return {"inserted_id": document["_id"], "success": True}
            
        except Exception as e:
            self.performance_metrics["failed_operations"] += 1
            
            self.query_history.append({
                "operation": "INSERT",
                "collection": collection_name,
                "error": str(e),
                "execution_time": time.time() - operation_start,
                "timestamp": time.time(),
                "success": False
            })
            
            show "❌ Insert failed: " + str(e)
            raise NoSQLOperationError("Insert operation failed: " + str(e))
    
    function find_documents(collection_name, query, options):
        think "Find documents matching query"
        
        operation_start = time.time()
        self.performance_metrics["total_operations"] += 1
        
        try:
            collection = self.get_collection(collection_name)
            matching_documents = []
            
            for document in collection["documents"]:
                when self.matches_query(document, query):
                    matching_documents.append(document)
            
            think "Apply options (limit, sort, etc.)"
            when options and "limit" in options:
                matching_documents = matching_documents[:options["limit"]]
            
            when options and "sort" in options:
                sort_field = options["sort"]["field"]
                sort_order = options["sort"].get("order", "asc")
                reverse = sort_order == "desc"
                
                matching_documents.sort(
                    key=lambda doc: doc.get(sort_field, ""),
                    reverse=reverse
                )
            
            operation_time = time.time() - operation_start
            self.update_average_operation_time(operation_time)
            self.performance_metrics["successful_operations"] += 1
            
            self.query_history.append({
                "operation": "FIND",
                "collection": collection_name,
                "query": query,
                "result_count": len(matching_documents),
                "execution_time": operation_time,
                "timestamp": time.time(),
                "success": True
            })
            
            show "πŸ” Found " + str(len(matching_documents)) + " documents"
            return matching_documents
            
        except Exception as e:
            self.performance_metrics["failed_operations"] += 1
            
            self.query_history.append({
                "operation": "FIND",
                "collection": collection_name,
                "query": query,
                "error": str(e),
                "execution_time": time.time() - operation_start,
                "timestamp": time.time(),
                "success": False
            })
            
            show "❌ Find failed: " + str(e)
            raise NoSQLOperationError("Find operation failed: " + str(e))
    
    function update_document(collection_name, query, update_data, options):
        think "Update documents matching query"
        
        operation_start = time.time()
        self.performance_metrics["total_operations"] += 1
        
        try:
            collection = self.get_collection(collection_name)
            updated_count = 0
            
            for document in collection["documents"]:
                when self.matches_query(document, query):
                    think "Update document fields"
                    for field, value in update_data.items():
                        document[field] = value
                    
                    document["_updated_at"] = time.time()
                    updated_count += 1
                    
                    when options and options.get("update_one", False):
                        break
            
            operation_time = time.time() - operation_start
            self.update_average_operation_time(operation_time)
            self.performance_metrics["successful_operations"] += 1
            
            self.query_history.append({
                "operation": "UPDATE",
                "collection": collection_name,
                "query": query,
                "updated_count": updated_count,
                "execution_time": operation_time,
                "timestamp": time.time(),
                "success": True
            })
            
            show "✏️ Updated " + str(updated_count) + " documents"
            return {"updated_count": updated_count, "success": True}
            
        except Exception as e:
            self.performance_metrics["failed_operations"] += 1
            
            self.query_history.append({
                "operation": "UPDATE",
                "collection": collection_name,
                "query": query,
                "error": str(e),
                "execution_time": time.time() - operation_start,
                "timestamp": time.time(),
                "success": False
            })
            
            show "❌ Update failed: " + str(e)
            raise NoSQLOperationError("Update operation failed: " + str(e))
    
    function delete_documents(collection_name, query, options):
        think "Delete documents matching query"
        
        operation_start = time.time()
        self.performance_metrics["total_operations"] += 1
        
        try:
            collection = self.get_collection(collection_name)
            documents_to_delete = []
            
            for i, document in enumerate(collection["documents"]):
                when self.matches_query(document, query):
                    documents_to_delete.append(i)
                    
                    when options and options.get("delete_one", False):
                        break
            
            think "Delete documents in reverse order to maintain indices"
            for index in reversed(documents_to_delete):
                del collection["documents"][index]
                collection["document_count"] -= 1
            
            deleted_count = len(documents_to_delete)
            
            operation_time = time.time() - operation_start
            self.update_average_operation_time(operation_time)
            self.performance_metrics["successful_operations"] += 1
            
            self.query_history.append({
                "operation": "DELETE",
                "collection": collection_name,
                "query": query,
                "deleted_count": deleted_count,
                "execution_time": operation_time,
                "timestamp": time.time(),
                "success": True
            })
            
            show "πŸ—‘οΈ Deleted " + str(deleted_count) + " documents"
            return {"deleted_count": deleted_count, "success": True}
            
        except Exception as e:
            self.performance_metrics["failed_operations"] += 1
            
            self.query_history.append({
                "operation": "DELETE",
                "collection": collection_name,
                "query": query,
                "error": str(e),
                "execution_time": time.time() - operation_start,
                "timestamp": time.time(),
                "success": False
            })
            
            show "❌ Delete failed: " + str(e)
            raise NoSQLOperationError("Delete operation failed: " + str(e))
    
    function matches_query(document, query):
        think "Check if document matches query criteria"
        
        when not query:
            return True
        
        for field, condition in query.items():
            when field not in document:
                return False
            
            document_value = document[field]
            
            when isinstance(condition, dict):
                think "Handle query operators"
                for operator, value in condition.items():
                    when operator == "$eq":
                        when document_value != value:
                            return False
                    elif operator == "$ne":
                        when document_value == value:
                            return False
                    elif operator == "$gt":
                        when document_value <= value:
                            return False
                    elif operator == "$gte":
                        when document_value < value:
                            return False
                    elif operator == "$lt":
                        when document_value >= value:
                            return False
                    elif operator == "$lte":
                        when document_value > value:
                            return False
                    elif operator == "$in":
                        when document_value not in value:
                            return False
                    elif operator == "$nin":
                        when document_value in value:
                            return False
            else:
                think "Simple equality check"
                when document_value != condition:
                    return False
        
        return True
    
    function update_average_operation_time(operation_time):
        current_avg = self.performance_metrics["average_operation_time"]
        successful_ops = self.performance_metrics["successful_operations"]
        
        self.performance_metrics["average_operation_time"] = (
            (current_avg * (successful_ops - 1) + operation_time) / successful_ops
        )
    
    function get_collection_stats(collection_name):
        collection = self.get_collection(collection_name)
        return {
            "name": collection_name,
            "document_count": collection["document_count"],
            "total_documents": len(collection["documents"]),
            "indexes": len(collection["indexes"])
        }
    
    function get_performance_metrics():
        return self.performance_metrics
    
    function get_query_history():
        return self.query_history
    
    function close_connection():
        when self.connection:
            self.connection = None
            show "πŸ”Œ NoSQL database connection closed"

class NoSQLConnectionError(Exception):
    pass

class NoSQLOperationError(Exception):
    pass

think "Let's create a comprehensive product catalog system with NoSQL database"

product_db = NoSQLDatabase("mongodb://localhost:27017/productdb", "MongoDB")

think "Connect to the database"
product_db.connect()

think "Test comprehensive NoSQL operations"
show "πŸ§ͺ Testing NoSQL database operations..."

try:
    think "Test 1: Insert product documents"
    products = [
        {
            "name": "Wireless Headphones",
            "category": "Electronics",
            "price": 99.99,
            "description": "High-quality wireless headphones with noise cancellation",
            "tags": ["audio", "wireless", "noise-cancelling"],
            "inventory": 50,
            "ratings": [4.5, 4.8, 4.2, 4.7]
        },
        {
            "name": "Smart Watch",
            "category": "Electronics",
            "price": 299.99,
            "description": "Advanced smartwatch with health monitoring",
            "tags": ["wearable", "health", "smart"],
            "inventory": 30,
            "ratings": [4.3, 4.6, 4.1, 4.4]
        },
        {
            "name": "Organic Coffee Beans",
            "category": "Food",
            "price": 24.99,
            "description": "Premium organic coffee beans from Guatemala",
            "tags": ["coffee", "organic", "fair-trade"],
            "inventory": 100,
            "ratings": [4.9, 4.7, 4.8, 4.6]
        },
        {
            "name": "Yoga Mat",
            "category": "Sports",
            "price": 39.99,
            "description": "Eco-friendly yoga mat with excellent grip",
            "tags": ["yoga", "fitness", "eco-friendly"],
            "inventory": 75,
            "ratings": [4.4, 4.5, 4.3, 4.6]
        }
    ]
    
    for product in products:
        result = product_db.insert_document("products", product, None)
        show "βž• Inserted product: " + product["name"]
    
    think "Test 2: Find all products"
    all_products = product_db.find_documents("products", {}, {"limit": 10})
    show "πŸ“‹ All products (" + str(len(all_products)) + " found):"
    for product in all_products:
        show "  - " + product["name"] + " ($" + str(product["price"]) + ")"
    
    think "Test 3: Find products by category"
    electronics = product_db.find_documents("products", {"category": "Electronics"}, None)
    show "πŸ“± Electronics products:"
    for product in electronics:
        show "  - " + product["name"] + " (Inventory: " + str(product["inventory"]) + ")"
    
    think "Test 4: Find products with complex query"
    affordable_products = product_db.find_documents("products", {
        "price": {"$lt": 100},
        "inventory": {"$gt": 40}
    }, {"sort": {"field": "price", "order": "asc"}})
    
    show "πŸ’° Affordable products with good inventory:"
    for product in affordable_products:
        show "  - " + product["name"] + " ($" + str(product["price"]) + ", Stock: " + str(product["inventory"]) + ")"
    
    think "Test 5: Find products by tags"
    healthy_products = product_db.find_documents("products", {
        "tags": {"$in": ["health", "organic"]}
    }, None)
    
    show "🌱 Health-related products:"
    for product in healthy_products:
        show "  - " + product["name"] + " (Tags: " + str(product["tags"]) + ")"
    
    think "Test 6: Update product inventory"
    result = product_db.update_document("products", 
                                      {"name": "Wireless Headphones"}, 
                                      {"inventory": 45, "status": "updated"}, 
                                      {"update_one": True})
    show "✏️ Updated inventory for Wireless Headphones"
    
    think "Test 7: Update multiple products"
    result = product_db.update_document("products", 
                                      {"category": "Electronics"}, 
                                      {"featured": True}, 
                                      {"update_one": False})
    show "✏️ Marked all electronics as featured"
    
    think "Test 8: Add new product with complex data"
    complex_product = {
        "name": "Smart Home Hub",
        "category": "Electronics",
        "price": 149.99,
        "description": "Central hub for smart home automation",
        "tags": ["smart-home", "automation", "IoT"],
        "inventory": 25,
        "ratings": [4.6, 4.8, 4.7],
        "specifications": {
            "connectivity": ["WiFi", "Bluetooth", "Zigbee"],
            "compatibility": ["Alexa", "Google Home", "Apple HomeKit"],
            "power": "AC Adapter included"
        },
        "reviews": [
            {"user": "techie123", "rating": 5, "comment": "Excellent device!"},
            {"user": "homeowner", "rating": 4, "comment": "Works great with my setup"}
        ]
    }
    
    result = product_db.insert_document("products", complex_product, None)
    show "βž• Added complex product: " + complex_product["name"]
    
    think "Test 9: Query with nested field"
    smart_products = product_db.find_documents("products", {
        "specifications.connectivity": {"$in": ["WiFi"]}
    }, None)
    
    show "πŸ“Ά WiFi-enabled products:"
    for product in smart_products:
        show "  - " + product["name"]
    
    think "Test 10: Delete products"
    result = product_db.delete_documents("products", {"inventory": {"$lt": 30}}, {"delete_one": False})
    show "πŸ—‘οΈ Deleted " + str(result["deleted_count"]) + " products with low inventory"
    
    think "Test 11: Final product count"
    remaining_products = product_db.find_documents("products", {}, None)
    show "πŸ“Š Remaining products: " + str(len(remaining_products))
    for product in remaining_products:
        show "  - " + product["name"] + " (Inventory: " + str(product["inventory"]) + ")"
    
except Exception as e:
    show "πŸ’₯ NoSQL operation failed: " + str(e)

think "Show performance metrics"
metrics = product_db.get_performance_metrics()
show "πŸ“Š NoSQL Performance Metrics:"
show "  Total operations: " + str(metrics["total_operations"])
show "  Successful operations: " + str(metrics["successful_operations"])
show "  Failed operations: " + str(metrics["failed_operations"])
show "  Average operation time: " + str(round(metrics["average_operation_time"] * 1000, 2)) + "ms"

think "Show collection statistics"
stats = product_db.get_collection_stats("products")
show "πŸ“‹ Products Collection Stats:"
show "  Collection name: " + stats["name"]
show "  Document count: " + str(stats["document_count"])
show "  Total documents: " + str(stats["total_documents"])

think "Show recent query history"
history = product_db.get_query_history()
show "πŸ“ Recent Query History:"
for query in history[-5:]:  # Show last 5 queries
    status = "βœ…" if query["success"] else "❌"
    show "  " + status + " " + query["operation"] + " on " + query["collection"] + " (" + str(round(query["execution_time"] * 1000, 2)) + "ms)"

product_db.close_connection()

show "βœ… NoSQL database operations completed!"

πŸ” Authentication & Authorization: Securing Your Applications

🎯 Section Goal

Master implementing secure authentication and authorization systems to protect your applications and control user access to resources and features.

πŸ”‘ JWT Authentication Implementation

think "JWT (JSON Web Tokens) provide stateless authentication for modern applications"

class JWTManager:
    function __init__(secret_key, algorithm, expiration_hours):
        self.secret_key = secret_key
        self.algorithm = algorithm or "HS256"
        self.expiration_hours = expiration_hours or 24
        self.blacklisted_tokens = []
        self.issued_tokens = {}
        self.token_stats = {
            "total_generated": 0,
            "total_verified": 0,
            "failed_verifications": 0,
            "blacklisted_attempts": 0
        }
        
        show "πŸ”‘ JWT Manager initialized with " + algorithm + " algorithm"
    
    function generate_token(user_data, custom_claims, expires_in):
        think "Generate a JWT token with user data and claims"
        
        import time
        import json
        import base64
        import hmac
        import hashlib
        
        current_time = int(time.time())
        expiration_time = current_time + ((expires_in or self.expiration_hours) * 3600)
        
        think "Create JWT header"
        header = {
            "alg": self.algorithm,
            "typ": "JWT"
        }
        
        think "Create JWT payload"
        payload = {
            "user_id": user_data["id"],
            "username": user_data["username"],
            "email": user_data.get("email", ""),
            "roles": user_data.get("roles", []),
            "iat": current_time,  # issued at
            "exp": expiration_time,  # expires at
            "iss": "SonaApp",  # issuer
            "sub": str(user_data["id"])  # subject
        }
        
        think "Add custom claims"
        when custom_claims:
            payload.update(custom_claims)
        
        think "Encode header and payload"
        header_encoded = self.base64_url_encode(json.dumps(header))
        payload_encoded = self.base64_url_encode(json.dumps(payload))
        
        think "Create signature"
        message = header_encoded + "." + payload_encoded
        signature = self.create_signature(message)
        
        think "Combine to create final token"
        token = header_encoded + "." + payload_encoded + "." + signature
        
        think "Track token issuance"
        self.issued_tokens[token] = {
            "user_id": user_data["id"],
            "issued_at": current_time,
            "expires_at": expiration_time,
            "payload": payload
        }
        
        self.token_stats["total_generated"] += 1
        
        show "🎫 Generated JWT token for user: " + user_data["username"]
        return {
            "token": token,
            "expires_at": expiration_time,
            "expires_in": (expires_in or self.expiration_hours) * 3600
        }
    
    function verify_token(token):
        think "Verify JWT token authenticity and validity"
        
        self.token_stats["total_verified"] += 1
        
        try:
            think "Check if token is blacklisted"
            when token in self.blacklisted_tokens:
                self.token_stats["blacklisted_attempts"] += 1
                raise AuthenticationError("Token has been revoked")
            
            think "Split token into parts"
            parts = token.split(".")
            when len(parts) != 3:
                raise AuthenticationError("Invalid token format")
            
            header_encoded, payload_encoded, signature = parts
            
            think "Verify signature"
            message = header_encoded + "." + payload_encoded
            expected_signature = self.create_signature(message)
            
            when signature != expected_signature:
                raise AuthenticationError("Invalid token signature")
            
            think "Decode payload"
            payload_json = self.base64_url_decode(payload_encoded)
            payload = json.loads(payload_json)
            
            think "Check expiration"
            current_time = int(time.time())
            when payload["exp"] < current_time:
                raise AuthenticationError("Token has expired")
            
            think "Verify issuer"
            when payload.get("iss") != "SonaApp":
                raise AuthenticationError("Invalid token issuer")
            
            show "βœ… Token verified successfully for user: " + payload["username"]
            return {
                "valid": True,
                "payload": payload,
                "user_id": payload["user_id"],
                "username": payload["username"],
                "roles": payload.get("roles", [])
            }
            
        except Exception as e:
            self.token_stats["failed_verifications"] += 1
            show "❌ Token verification failed: " + str(e)
            return {
                "valid": False,
                "error": str(e)
            }
    
    function refresh_token(old_token):
        think "Generate new token from existing valid token"
        
        verification_result = self.verify_token(old_token)
        
        when not verification_result["valid"]:
            raise AuthenticationError("Cannot refresh invalid token")
        
        payload = verification_result["payload"]
        
        think "Create new token with updated timestamp"
        user_data = {
            "id": payload["user_id"],
            "username": payload["username"],
            "email": payload.get("email", ""),
            "roles": payload.get("roles", [])
        }
        
        think "Blacklist old token"
        self.blacklist_token(old_token)
        
        new_token = self.generate_token(user_data, None, None)
        
        show "πŸ”„ Token refreshed for user: " + user_data["username"]
        return new_token
    
    function blacklist_token(token):
        think "Add token to blacklist"
        
        when token not in self.blacklisted_tokens:
            self.blacklisted_tokens.append(token)
            show "🚫 Token blacklisted"
        
        think "Clean up old blacklisted tokens"
        self.cleanup_expired_tokens()
    
    function cleanup_expired_tokens():
        think "Remove expired tokens from blacklist and issued tokens"
        
        current_time = int(time.time())
        
        think "Clean up issued tokens"
        expired_tokens = []
        for token, info in self.issued_tokens.items():
            when info["expires_at"] < current_time:
                expired_tokens.append(token)
        
        for token in expired_tokens:
            del self.issued_tokens[token]
            when token in self.blacklisted_tokens:
                self.blacklisted_tokens.remove(token)
        
        when len(expired_tokens) > 0:
            show "🧹 Cleaned up " + str(len(expired_tokens)) + " expired tokens"
    
    function base64_url_encode(data):
        think "Base64 URL-safe encoding"
        
        import base64
        
        encoded = base64.urlsafe_b64encode(data.encode()).decode()
        return encoded.rstrip("=")  # Remove padding
    
    function base64_url_decode(data):
        think "Base64 URL-safe decoding"
        
        import base64
        
        think "Add padding if needed"
        padding = len(data) % 4
        when padding:
            data += "=" * (4 - padding)
        
        return base64.urlsafe_b64decode(data).decode()
    
    function create_signature(message):
        think "Create HMAC signature for JWT"
        
        import hmac
        import hashlib
        import base64
        
        signature_bytes = hmac.new(
            self.secret_key.encode(),
            message.encode(),
            hashlib.sha256
        ).digest()
        
        signature = base64.urlsafe_b64encode(signature_bytes).decode()
        return signature.rstrip("=")
    
    function get_token_stats():
        return self.token_stats
    
    function get_active_tokens():
        think "Get list of active (non-expired) tokens"
        
        self.cleanup_expired_tokens()
        return list(self.issued_tokens.keys())

class AuthenticationError(Exception):
    pass

class AuthenticationSystem:
    function __init__(jwt_manager):
        self.jwt_manager = jwt_manager
        self.users = {}
        self.login_attempts = {}
        self.security_settings = {
            "max_login_attempts": 5,
            "lockout_duration": 300,  # 5 minutes
            "password_min_length": 8,
            "require_special_chars": True
        }
        
        show "πŸ” Authentication System initialized"
    
    function register_user(username, email, password, roles):
        think "Register a new user with secure password hashing"
        
        when username in self.users:
            raise AuthenticationError("Username already exists")
        
        think "Validate password strength"
        password_validation = self.validate_password(password)
        when not password_validation["valid"]:
            raise AuthenticationError("Password validation failed: " + password_validation["error"])
        
        think "Hash password (simplified - use bcrypt in production)"
        password_hash = self.hash_password(password)
        
        user_data = {
            "id": len(self.users) + 1,
            "username": username,
            "email": email,
            "password_hash": password_hash,
            "roles": roles or ["user"],
            "created_at": time.time(),
            "last_login": None,
            "active": True
        }
        
        self.users[username] = user_data
        
        show "πŸ‘€ User registered: " + username + " with roles: " + str(roles)
        return {
            "user_id": user_data["id"],
            "username": username,
            "email": email,
            "roles": user_data["roles"]
        }
    
    function authenticate_user(username, password):
        think "Authenticate user credentials"
        
        think "Check if user is locked out"
        when self.is_user_locked_out(username):
            raise AuthenticationError("Account temporarily locked due to failed login attempts")
        
        think "Check if user exists"
        when username not in self.users:
            self.record_failed_login(username)
            raise AuthenticationError("Invalid username or password")
        
        user = self.users[username]
        
        think "Check if user is active"
        when not user["active"]:
            raise AuthenticationError("Account is disabled")
        
        think "Verify password"
        when not self.verify_password(password, user["password_hash"]):
            self.record_failed_login(username)
            raise AuthenticationError("Invalid username or password")
        
        think "Reset failed login attempts on successful login"
        when username in self.login_attempts:
            del self.login_attempts[username]
        
        think "Update last login timestamp"
        user["last_login"] = time.time()
        
        think "Generate JWT token"
        token_data = self.jwt_manager.generate_token(user, None, None)
        
        show "πŸŽ‰ User authenticated successfully: " + username
        return {
            "user": {
                "id": user["id"],
                "username": user["username"],
                "email": user["email"],
                "roles": user["roles"]
            },
            "token": token_data["token"],
            "expires_at": token_data["expires_at"]
        }
    
    function logout_user(token):
        think "Logout user by blacklisting their token"
        
        verification_result = self.jwt_manager.verify_token(token)
        
        when verification_result["valid"]:
            self.jwt_manager.blacklist_token(token)
            show "πŸ‘‹ User logged out: " + verification_result["username"]
            return {"success": True, "message": "Logged out successfully"}
        else:
            raise AuthenticationError("Invalid token for logout")
    
    function validate_password(password):
        think "Validate password strength"
        
        when len(password) < self.security_settings["password_min_length"]:
            return {"valid": False, "error": "Password too short"}
        
        when self.security_settings["require_special_chars"]:
            special_chars = "!@#$%^&*(),.?\":{}|<>"
            when not any(char in special_chars for char in password):
                return {"valid": False, "error": "Password must contain special characters"}
        
        when not any(char.isdigit() for char in password):
            return {"valid": False, "error": "Password must contain at least one number"}
        
        when not any(char.isupper() for char in password):
            return {"valid": False, "error": "Password must contain at least one uppercase letter"}
        
        return {"valid": True}
    
    function hash_password(password):
        think "Hash password using SHA-256 (use bcrypt in production)"
        
        import hashlib
        
        salt = "sona_salt_123"  # Use random salt in production
        return hashlib.sha256((password + salt).encode()).hexdigest()
    
    function verify_password(password, password_hash):
        think "Verify password against hash"
        
        return self.hash_password(password) == password_hash
    
    function is_user_locked_out(username):
        think "Check if user is locked out due to failed login attempts"
        
        when username not in self.login_attempts:
            return False
        
        attempt_info = self.login_attempts[username]
        
        when attempt_info["count"] >= self.security_settings["max_login_attempts"]:
            time_since_last_attempt = time.time() - attempt_info["last_attempt"]
            
            when time_since_last_attempt < self.security_settings["lockout_duration"]:
                return True
            else:
                think "Lockout period expired, reset attempts"
                del self.login_attempts[username]
                return False
        
        return False
    
    function record_failed_login(username):
        think "Record failed login attempt"
        
        when username not in self.login_attempts:
            self.login_attempts[username] = {"count": 0, "last_attempt": 0}
        
        self.login_attempts[username]["count"] += 1
        self.login_attempts[username]["last_attempt"] = time.time()
        
        show "⚠️ Failed login attempt for: " + username + " (Attempt " + str(self.login_attempts[username]["count"]) + ")"
    
    function get_user_info(username):
        when username in self.users:
            user = self.users[username]
            return {
                "id": user["id"],
                "username": user["username"],
                "email": user["email"],
                "roles": user["roles"],
                "created_at": user["created_at"],
                "last_login": user["last_login"],
                "active": user["active"]
            }
        else:
            return None
    
    function change_password(username, old_password, new_password):
        think "Change user password"
        
        when username not in self.users:
            raise AuthenticationError("User not found")
        
        user = self.users[username]
        
        think "Verify old password"
        when not self.verify_password(old_password, user["password_hash"]):
            raise AuthenticationError("Current password is incorrect")
        
        think "Validate new password"
        password_validation = self.validate_password(new_password)
        when not password_validation["valid"]:
            raise AuthenticationError("New password validation failed: " + password_validation["error"])
        
        think "Update password"
        user["password_hash"] = self.hash_password(new_password)
        
        show "πŸ”’ Password changed successfully for: " + username
        return {"success": True, "message": "Password changed successfully"}

think "Let's create a comprehensive authentication system"

jwt_manager = JWTManager(
    secret_key="your-super-secret-key-change-in-production",
    algorithm="HS256",
    expiration_hours=24
)

auth_system = AuthenticationSystem(jwt_manager)

import time
import json

show "πŸ§ͺ Testing comprehensive authentication system..."

try:
    think "Test 1: Register users"
    user1 = auth_system.register_user("alice", "[email protected]", "AlicePass123!", ["user", "admin"])
    user2 = auth_system.register_user("bob", "[email protected]", "BobSecure456#", ["user"])
    user3 = auth_system.register_user("charlie", "[email protected]", "CharlieStrong789$", ["user", "moderator"])
    
    show "πŸ‘₯ Registered users:"
    show "  - " + user1["username"] + " (Roles: " + str(user1["roles"]) + ")"
    show "  - " + user2["username"] + " (Roles: " + str(user2["roles"]) + ")"
    show "  - " + user3["username"] + " (Roles: " + str(user3["roles"]) + ")"
    
    think "Test 2: Authenticate users"
    alice_auth = auth_system.authenticate_user("alice", "AlicePass123!")
    bob_auth = auth_system.authenticate_user("bob", "BobSecure456#")
    
    show "πŸ” Authentication successful:"
    show "  - Alice token: " + alice_auth["token"][:50] + "..."
    show "  - Bob token: " + bob_auth["token"][:50] + "..."
    
    think "Test 3: Verify tokens"
    alice_verification = jwt_manager.verify_token(alice_auth["token"])
    bob_verification = jwt_manager.verify_token(bob_auth["token"])
    
    show "βœ… Token verification:"
    show "  - Alice: " + str(alice_verification["valid"]) + " (Roles: " + str(alice_verification["roles"]) + ")"
    show "  - Bob: " + str(bob_verification["valid"]) + " (Roles: " + str(bob_verification["roles"]) + ")"
    
    think "Test 4: Test failed authentication"
    try:
        auth_system.authenticate_user("alice", "wrong_password")
    except AuthenticationError as e:
        show "❌ Failed authentication test passed: " + str(e)
    
    think "Test 5: Test multiple failed attempts (lockout)"
    for i in range(6):
        try:
            auth_system.authenticate_user("charlie", "wrong_password")
        except AuthenticationError as e:
            show "⚠️ Failed attempt " + str(i + 1) + ": " + str(e)
    
    think "Test 6: Test password change"
    password_change = auth_system.change_password("bob", "BobSecure456#", "NewBobPass789!")
    show "πŸ”’ Password change: " + password_change["message"]
    
    think "Test 7: Test authentication with new password"
    bob_new_auth = auth_system.authenticate_user("bob", "NewBobPass789!")
    show "βœ… Authentication with new password successful"
    
    think "Test 8: Test token refresh"
    new_token = jwt_manager.refresh_token(alice_auth["token"])
    show "πŸ”„ Token refreshed: " + new_token["token"][:50] + "..."
    
    think "Test 9: Test logout (blacklist token)"
    logout_result = auth_system.logout_user(bob_auth["token"])
    show "πŸ‘‹ Logout: " + logout_result["message"]
    
    think "Test 10: Test blacklisted token"
    blacklisted_verification = jwt_manager.verify_token(bob_auth["token"])
    show "🚫 Blacklisted token verification: " + str(blacklisted_verification["valid"])
    
    think "Test 11: Test invalid token format"
    invalid_verification = jwt_manager.verify_token("invalid.token.format")
    show "❌ Invalid token verification: " + str(invalid_verification["valid"])
    
    think "Test 12: Test user info retrieval"
    alice_info = auth_system.get_user_info("alice")
    show "πŸ‘€ Alice info: " + alice_info["username"] + " - " + alice_info["email"]
    
except Exception as e:
    show "πŸ’₯ Authentication test failed: " + str(e)

think "Show system statistics"
token_stats = jwt_manager.get_token_stats()
show "πŸ“Š JWT Token Statistics:"
show "  Total generated: " + str(token_stats["total_generated"])
show "  Total verified: " + str(token_stats["total_verified"])
show "  Failed verifications: " + str(token_stats["failed_verifications"])
show "  Blacklisted attempts: " + str(token_stats["blacklisted_attempts"])

active_tokens = jwt_manager.get_active_tokens()
show "🎫 Active tokens: " + str(len(active_tokens))

show "βœ… Authentication system testing completed!"

πŸ›‘οΈ OAuth Integration and Social Login

think "OAuth provides secure third-party authentication for modern applications"

class OAuthProvider:
    function __init__(provider_name, client_id, client_secret, config):
        self.provider_name = provider_name
        self.client_id = client_id
        self.client_secret = client_secret
        self.config = config
        self.authorization_codes = {}
        self.access_tokens = {}
        self.refresh_tokens = {}
        self.oauth_stats = {
            "authorization_requests": 0,
            "token_exchanges": 0,
            "successful_logins": 0,
            "failed_logins": 0
        }
        
        show "πŸ›‘οΈ OAuth Provider initialized: " + provider_name
    
    function generate_authorization_url(redirect_uri, scope, state):
        think "Generate OAuth authorization URL"
        
        import random
        import urllib.parse
        
        self.oauth_stats["authorization_requests"] += 1
        
        auth_params = {
            "client_id": self.client_id,
            "redirect_uri": redirect_uri,
            "scope": scope or "read",
            "response_type": "code",
            "state": state or self.generate_random_state()
        }
        
        query_string = urllib.parse.urlencode(auth_params)
        authorization_url = self.config["authorization_endpoint"] + "?" + query_string
        
        show "πŸ”— Generated authorization URL for " + self.provider_name
        return {
            "authorization_url": authorization_url,
            "state": auth_params["state"]
        }
    
    function exchange_code_for_token(authorization_code, redirect_uri):
        think "Exchange authorization code for access token"
        
        self.oauth_stats["token_exchanges"] += 1
        
        try:
            think "Simulate token exchange request"
            token_response = self.simulate_token_exchange(authorization_code, redirect_uri)
            
            think "Store tokens"
            access_token = token_response["access_token"]
            refresh_token = token_response.get("refresh_token")
            
            self.access_tokens[access_token] = {
                "user_id": token_response["user_id"],
                "scope": token_response["scope"],
                "expires_at": time.time() + token_response["expires_in"],
                "created_at": time.time()
            }
            
            when refresh_token:
                self.refresh_tokens[refresh_token] = {
                    "access_token": access_token,
                    "user_id": token_response["user_id"],
                    "created_at": time.time()
                }
            
            show "🎫 Token exchange successful for " + self.provider_name
            return token_response
            
        except Exception as e:
            show "❌ Token exchange failed: " + str(e)
            raise OAuthError("Token exchange failed: " + str(e))
    
    function simulate_token_exchange(authorization_code, redirect_uri):
        think "Simulate OAuth token exchange"
        
        import random
        
        think "Simulate occasional failures"
        when random.random() < 0.05:  # 5% failure rate
            raise Exception("Token exchange failed")
        
        return {
            "access_token": "oauth_access_" + str(random.randint(100000, 999999)),
            "refresh_token": "oauth_refresh_" + str(random.randint(100000, 999999)),
            "token_type": "Bearer",
            "expires_in": 3600,
            "scope": "read write",
            "user_id": "oauth_user_" + str(random.randint(1000, 9999))
        }
    
    function get_user_info(access_token):
        think "Get user information using access token"
        
        try:
            when access_token not in self.access_tokens:
                raise OAuthError("Invalid access token")
            
            token_info = self.access_tokens[access_token]
            
            think "Check if token is expired"
            when token_info["expires_at"] < time.time():
                raise OAuthError("Access token expired")
            
            think "Simulate user info request"
            user_info = self.simulate_user_info_request(token_info["user_id"])
            
            show "πŸ‘€ Retrieved user info from " + self.provider_name
            return user_info
            
        except Exception as e:
            show "❌ Failed to get user info: " + str(e)
            raise OAuthError("Failed to get user info: " + str(e))
    
    function simulate_user_info_request(user_id):
        think "Simulate user info API request"
        
        import random
        
        providers_data = {
            "google": {
                "id": user_id,
                "name": "John Doe",
                "email": "[email protected]",
                "picture": "https://example.com/avatar.jpg",
                "verified_email": True
            },
            "github": {
                "id": user_id,
                "login": "johndoe",
                "name": "John Doe",
                "email": "[email protected]",
                "avatar_url": "https://github.com/avatar.jpg",
                "public_repos": 25
            },
            "facebook": {
                "id": user_id,
                "name": "John Doe",
                "email": "[email protected]",
                "picture": {"data": {"url": "https://facebook.com/avatar.jpg"}},
                "verified": True
            }
        }
        
        return providers_data.get(self.provider_name.lower(), {
            "id": user_id,
            "name": "OAuth User",
            "email": "user@" + self.provider_name.lower() + ".com"
        })
    
    function refresh_access_token(refresh_token):
        think "Refresh expired access token"
        
        try:
            when refresh_token not in self.refresh_tokens:
                raise OAuthError("Invalid refresh token")
            
            refresh_info = self.refresh_tokens[refresh_token]
            
            think "Generate new access token"
            new_token_response = self.simulate_token_exchange("refresh_code", None)
            
            think "Update stored tokens"
            old_access_token = refresh_info["access_token"]
            new_access_token = new_token_response["access_token"]
            
            when old_access_token in self.access_tokens:
                del self.access_tokens[old_access_token]
            
            self.access_tokens[new_access_token] = {
                "user_id": refresh_info["user_id"],
                "scope": new_token_response["scope"],
                "expires_at": time.time() + new_token_response["expires_in"],
                "created_at": time.time()
            }
            
            self.refresh_tokens[refresh_token]["access_token"] = new_access_token
            
            show "πŸ”„ Access token refreshed for " + self.provider_name
            return new_token_response
            
        except Exception as e:
            show "❌ Token refresh failed: " + str(e)
            raise OAuthError("Token refresh failed: " + str(e))
    
    function revoke_token(token):
        think "Revoke access or refresh token"
        
        revoked = False
        
        when token in self.access_tokens:
            del self.access_tokens[token]
            revoked = True
        
        when token in self.refresh_tokens:
            del self.refresh_tokens[token]
            revoked = True
        
        when revoked:
            show "🚫 Token revoked for " + self.provider_name
            return {"success": True, "message": "Token revoked"}
        else:
            raise OAuthError("Token not found")
    
    function generate_random_state():
        think "Generate random state for CSRF protection"
        
        import random
        import string
        
        return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(32))
    
    function get_oauth_stats():
        return self.oauth_stats

class OAuthError(Exception):
    pass

class SocialAuthManager:
    function __init__(jwt_manager):
        self.jwt_manager = jwt_manager
        self.providers = {}
        self.social_users = {}
        self.provider_mappings = {}
        self.auth_stats = {
            "total_social_logins": 0,
            "successful_social_logins": 0,
            "failed_social_logins": 0
        }
        
        show "🌐 Social Auth Manager initialized"
    
    function register_provider(provider_name, client_id, client_secret, config):
        think "Register OAuth provider"
        
        provider = OAuthProvider(provider_name, client_id, client_secret, config)
        self.providers[provider_name] = provider
        
        show "πŸ“ Registered OAuth provider: " + provider_name
        return provider
    
    function initiate_social_login(provider_name, redirect_uri, scope, state):
        think "Initiate social login flow"
        
        when provider_name not in self.providers:
            raise OAuthError("Provider not registered: " + provider_name)
        
        provider = self.providers[provider_name]
        authorization_data = provider.generate_authorization_url(redirect_uri, scope, state)
        
        show "πŸš€ Initiated social login for " + provider_name
        return authorization_data
    
    function complete_social_login(provider_name, authorization_code, redirect_uri):
        think "Complete social login flow"
        
        self.auth_stats["total_social_logins"] += 1
        
        try:
            when provider_name not in self.providers:
                raise OAuthError("Provider not registered: " + provider_name)
            
            provider = self.providers[provider_name]
            
            think "Exchange code for token"
            token_response = provider.exchange_code_for_token(authorization_code, redirect_uri)
            
            think "Get user info"
            user_info = provider.get_user_info(token_response["access_token"])
            
            think "Find or create user"
            user_data = self.find_or_create_social_user(provider_name, user_info)
            
            think "Generate JWT token"
            jwt_token = self.jwt_manager.generate_token(user_data, {
                "provider": provider_name,
                "social_id": user_info["id"]
            }, None)
            
            self.auth_stats["successful_social_logins"] += 1
            
            show "πŸŽ‰ Social login completed for " + provider_name
            return {
                "user": user_data,
                "token": jwt_token["token"],
                "expires_at": jwt_token["expires_at"],
                "provider": provider_name,
                "social_user_info": user_info
            }
            
        except Exception as e:
            self.auth_stats["failed_social_logins"] += 1
            show "❌ Social login failed: " + str(e)
            raise OAuthError("Social login failed: " + str(e))
    
    function find_or_create_social_user(provider_name, user_info):
        think "Find existing user or create new one"
        
        social_id = user_info["id"]
        provider_key = provider_name + "_" + social_id
        
        when provider_key in self.provider_mappings:
            think "User already exists"
            user_id = self.provider_mappings[provider_key]
            user_data = self.social_users[user_id]
            
            think "Update user info"
            user_data["last_login"] = time.time()
            user_data["social_data"][provider_name] = user_info
            
            show "πŸ‘€ Existing social user logged in: " + user_data["username"]
            return user_data
        
        else:
            think "Create new user"
            user_id = len(self.social_users) + 1
            
            user_data = {
                "id": user_id,
                "username": user_info.get("login", user_info.get("name", "social_user_" + str(user_id))),
                "email": user_info.get("email", ""),
                "name": user_info.get("name", ""),
                "roles": ["user"],
                "created_at": time.time(),
                "last_login": time.time(),
                "social_data": {provider_name: user_info},
                "primary_provider": provider_name
            }
            
            self.social_users[user_id] = user_data
            self.provider_mappings[provider_key] = user_id
            
            show "πŸ‘€ New social user created: " + user_data["username"]
            return user_data
    
    function link_social_account(user_id, provider_name, authorization_code, redirect_uri):
        think "Link additional social account to existing user"
        
        try:
            when user_id not in self.social_users:
                raise OAuthError("User not found")
            
            when provider_name not in self.providers:
                raise OAuthError("Provider not registered: " + provider_name)
            
            provider = self.providers[provider_name]
            
            think "Exchange code for token"
            token_response = provider.exchange_code_for_token(authorization_code, redirect_uri)
            
            think "Get user info"
            user_info = provider.get_user_info(token_response["access_token"])
            
            think "Link to existing user"
            user_data = self.social_users[user_id]
            user_data["social_data"][provider_name] = user_info
            
            social_id = user_info["id"]
            provider_key = provider_name + "_" + social_id
            self.provider_mappings[provider_key] = user_id
            
            show "πŸ”— Social account linked: " + provider_name + " to user " + str(user_id)
            return {
                "success": True,
                "message": "Social account linked successfully",
                "provider": provider_name
            }
        except Exception as e:
            show "❌ Account linking failed: " + str(e)
            raise OAuthError("Account linking failed: " + str(e))
    
    function get_user_social_accounts(user_id):
        think "Get all social accounts for user"
        
        when user_id not in self.social_users:
            return []
        
        user_data = self.social_users[user_id]
        social_accounts = []
        
        for provider_name, provider_data in user_data["social_data"].items():
            social_accounts.append({
                "provider": provider_name,
                "social_id": provider_data["id"],
                "name": provider_data.get("name", ""),
                "email": provider_data.get("email", ""),
                "linked_at": user_data["created_at"]
            })
        
        return social_accounts
    
    function get_auth_stats():
        return self.auth_stats

think "Let's create a comprehensive social authentication system"

social_auth = SocialAuthManager(jwt_manager)

think "Register OAuth providers"
google_provider = social_auth.register_provider("Google", "google_client_id", "google_client_secret", {
    "authorization_endpoint": "https://accounts.google.com/oauth/authorize",
    "token_endpoint": "https://oauth2.googleapis.com/token",
    "user_info_endpoint": "https://www.googleapis.com/oauth2/v2/userinfo"
})

github_provider = social_auth.register_provider("GitHub", "github_client_id", "github_client_secret", {
    "authorization_endpoint": "https://github.com/login/oauth/authorize",
    "token_endpoint": "https://github.com/login/oauth/access_token",
    "user_info_endpoint": "https://api.github.com/user"
})

facebook_provider = social_auth.register_provider("Facebook", "facebook_client_id", "facebook_client_secret", {
    "authorization_endpoint": "https://www.facebook.com/v10.0/dialog/oauth",
    "token_endpoint": "https://graph.facebook.com/v10.0/oauth/access_token",
    "user_info_endpoint": "https://graph.facebook.com/me"
})

show "πŸ§ͺ Testing social authentication system..."

try:
    think "Test 1: Initiate Google login"
    google_auth_url = social_auth.initiate_social_login("Google", "http://localhost:8080/auth/google/callback", "read write", None)
    show "πŸ”— Google auth URL: " + google_auth_url["authorization_url"][:80] + "..."
    
    think "Test 2: Complete Google login"
    google_login = social_auth.complete_social_login("Google", "google_auth_code_123", "http://localhost:8080/auth/google/callback")
    show "πŸŽ‰ Google login completed: " + google_login["user"]["username"]
    
    think "Test 3: Initiate GitHub login"
    github_auth_url = social_auth.initiate_social_login("GitHub", "http://localhost:8080/auth/github/callback", "read user", None)
    show "πŸ”— GitHub auth URL: " + github_auth_url["authorization_url"][:80] + "..."
    
    think "Test 4: Complete GitHub login"
    github_login = social_auth.complete_social_login("GitHub", "github_auth_code_456", "http://localhost:8080/auth/github/callback")
    show "πŸŽ‰ GitHub login completed: " + github_login["user"]["username"]
    
    think "Test 5: Link Facebook account to Google user"
    facebook_link = social_auth.link_social_account(
        google_login["user"]["id"], 
        "Facebook", 
        "facebook_auth_code_789", 
        "http://localhost:8080/auth/facebook/callback"
    )
    show "πŸ”— Facebook account linked: " + facebook_link["message"]
    
    think "Test 6: Get user's social accounts"
    social_accounts = social_auth.get_user_social_accounts(google_login["user"]["id"])
    show "πŸ“± Social accounts for user " + str(google_login["user"]["id"]) + ":"
    for account in social_accounts:
        show "  - " + account["provider"] + ": " + account["name"] + " (" + account["email"] + ")"
    
    think "Test 7: Verify JWT token with social claims"
    token_verification = jwt_manager.verify_token(google_login["token"])
    show "βœ… JWT token verified: " + str(token_verification["valid"])
    show "  Provider: " + token_verification["payload"].get("provider", "N/A")
    show "  Social ID: " + token_verification["payload"].get("social_id", "N/A")
    
    think "Test 8: Test token refresh"
    google_access_token = "oauth_access_123456"  # Simulate from earlier
    google_provider.access_tokens[google_access_token] = {
        "user_id": "oauth_user_1234",
        "scope": "read write",
        "expires_at": time.time() + 3600,
        "created_at": time.time()
    }
    
    user_info = google_provider.get_user_info(google_access_token)
    show "πŸ‘€ Retrieved user info: " + user_info["name"]
    
    think "Test 9: Revoke OAuth token"
    revoke_result = google_provider.revoke_token(google_access_token)
    show "🚫 Token revoked: " + revoke_result["message"]
    
except Exception as e:
    show "πŸ’₯ Social authentication test failed: " + str(e)

think "Show social authentication statistics"
social_stats = social_auth.get_auth_stats()
show "πŸ“Š Social Authentication Statistics:"
show "  Total social logins: " + str(social_stats["total_social_logins"])
show "  Successful logins: " + str(social_stats["successful_social_logins"])
show "  Failed logins: " + str(social_stats["failed_social_logins"])

think "Show provider statistics"
for provider_name, provider in social_auth.providers.items():
    provider_stats = provider.get_oauth_stats()
    show "πŸ“ˆ " + provider_name + " OAuth Statistics:"
    show "  Authorization requests: " + str(provider_stats["authorization_requests"])
    show "  Token exchanges: " + str(provider_stats["token_exchanges"])

show "βœ… Social authentication system testing completed!"

πŸ’‘ Learning Checkpoint 3

🎯 What did we learn?

  • How to implement JWT authentication with comprehensive security features
  • How to create secure password hashing and validation systems
  • How to implement OAuth flows for social login integration
  • How to handle token refresh, revocation, and blacklisting
  • How to build extensible authentication systems with multiple providers

πŸ”„ Progress Update

Chapter 11 Progress: [β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–‘] 90% Complete
β”œβ”€β”€ βœ… REST API Development (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Database Integration (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Authentication & Authorization (3/3 sections) - COMPLETE!
β”œβ”€β”€ ⚑ Real-Time Communication (0/3 sections)
β”œβ”€β”€ πŸ”„ Data Pipelines & ETL (0/3 sections)
β”œβ”€β”€ πŸ—οΈ Microservices Architecture (0/3 sections)
└── πŸŽ‰ Event-Driven Systems (0/3 sections)

⚑ Real-Time Communication: Building Interactive Systems

🎯 Section Goal

Learn how to implement real-time communication systems using WebSockets, Server-Sent Events (SSE), and message queues for interactive applications.

🌐 WebSocket Implementation

think "WebSockets enable bidirectional communication between clients and servers"

class WebSocketServer:
    function __init__(host, port):
        self.host = host
        self.port = port
        self.clients = []
        self.message_stats = {
            "messages_sent": 0,
            "messages_received": 0,
            "active_connections": 0
        }
        
        show "🌐 WebSocket server initialized on " + host + ":" + str(port)
    
    function start():
        think "Start WebSocket server"
        
        import asyncio
        import websockets
        
        async def handler(websocket, path):
            think "Handle new client connection"
            self.clients.append(websocket)
            self.message_stats["active_connections"] += 1
            
            show "πŸ”— Client connected: " + str(websocket.remote_address)
            
            try:
                async for message in websocket:
                    think "Process received message"
                    self.message_stats["messages_received"] += 1
                    show "πŸ“© Message received: " + message
                    
                    think "Broadcast message to all clients"
                    await self.broadcast(message)
            except Exception as e:
                show "❌ Connection error: " + str(e)
            finally:
                think "Remove client on disconnect"
                self.clients.remove(websocket)
                self.message_stats["active_connections"] -= 1
                show "πŸ”Œ Client disconnected: " + str(websocket.remote_address)
        
        think "Run WebSocket server"
        asyncio.get_event_loop().run_until_complete(
            websockets.serve(handler, self.host, self.port)
        )
        asyncio.get_event_loop().run_forever()
    
    async function broadcast(message):
        think "Send message to all connected clients"
        
        for client in self.clients:
            try:
                await client.send(message)
                self.message_stats["messages_sent"] += 1
                show "πŸ“€ Message sent to client: " + str(client.remote_address)
            except Exception as e:
                show "❌ Failed to send message: " + str(e)
    
    function get_stats():
        return self.message_stats

think "Let's create a WebSocket server"

websocket_server = WebSocketServer(host="localhost", port=8080)
websocket_server.start()

πŸ“‘ Server-Sent Events (SSE) Implementation

think "Server-Sent Events enable unidirectional communication from server to client"

class SSEServer:
    function __init__(host, port):
        self.host = host
        self.port = port
        self.clients = []
        self.event_stats = {
            "events_sent": 0,
            "active_connections": 0
        }
        
        show "πŸ“‘ SSE server initialized on " + host + ":" + str(port)
    
    function start():
        think "Start SSE server"
        
        import asyncio
        from aiohttp import web
        
        async def handler(request):
            think "Handle new client connection"
            response = web.StreamResponse()
            response.headers["Content-Type"] = "text/event-stream"
            response.headers["Cache-Control"] = "no-cache"
            response.headers["Connection"] = "keep-alive"
            
            await response.prepare(request)
            self.clients.append(response)
            self.event_stats["active_connections"] += 1
            
            show "πŸ”— Client connected: " + str(request.remote)
            
            try:
                while True:
                    await asyncio.sleep(10)  # Keep connection alive
            except Exception as e:
                show "❌ Connection error: " + str(e)
            finally:
                think "Remove client on disconnect"
                self.clients.remove(response)
                self.event_stats["active_connections"] -= 1
                show "πŸ”Œ Client disconnected: " + str(request.remote)
            
            return response
        
        think "Run SSE server"
        app = web.Application()
        app.router.add_get("/events", handler)
        web.run_app(app, host=self.host, port=self.port)
    
    async function send_event(event_data):
        think "Send event to all connected clients"
        
        for client in self.clients:
            try:
                await client.write(f"data: {event_data}\n\n".encode())
                self.event_stats["events_sent"] += 1
                show "πŸ“€ Event sent to client"
            except Exception as e:
                show "❌ Failed to send event: " + str(e)
    
    function get_stats():
        return self.event_stats

think "Let's create an SSE server"

sse_server = SSEServer(host="localhost", port=8081)
sse_server.start()

πŸ“¬ Message Queue Implementation

think "Message queues enable asynchronous communication between systems"

class MessageQueue:
    function __init__(queue_name):
        self.queue_name = queue_name
        self.messages = []
        self.queue_stats = {
            "total_requests": 0,
            "failed_requests": 0
        }
        
        show "πŸ“¬ Message queue initialized: " + queue_name
    
    function add_message(message):
        think "Add message to queue"
        
        self.messages.append(message)
        self.queue_stats["total_requests"] += 1
        self.queue_stats["queue_size"] = len(self.messages)
        
        show "πŸ“₯ Message added to queue: " + message

    function process_message():
        think "Process message from queue"
        
        when len(self.messages) == 0:
            show "⚠️ No messages to process"
            return None
        
        message = self.messages.pop(0)
        self.queue_stats["failed_requests"] = 0  # Reset on successful processing
        self.queue_stats["queue_size"] = len(self.messages)
        
        show "πŸ“€ Message processed: " + message
        return message

    function get_stats():
        return self.queue_stats

think "Let's create a message queue"

message_queue = MessageQueue(queue_name="SonaQueue")
message_queue.add_message("Hello, World!")
message_queue.add_message("Real-time communication is awesome!")
message_queue.process_message()
message_queue.process_message()

πŸ’‘ Learning Checkpoint 4

🎯 What did we learn?

  • How to implement WebSocket servers for bidirectional communication
  • How to use Server-Sent Events for unidirectional communication
  • How to create and manage message queues for asynchronous communication
  • How to monitor and optimize real-time communication systems

πŸ”„ Progress Update

Chapter 11 Progress: [β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–‘] 98% Complete
β”œβ”€β”€ βœ… REST API Development (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Database Integration (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Authentication & Authorization (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Real-Time Communication (3/3 sections) - COMPLETE!
β”œβ”€β”€ πŸ”„ Data Pipelines & ETL (0/3 sections)
β”œβ”€β”€ πŸ—οΈ Microservices Architecture (0/3 sections)
└── πŸŽ‰ Event-Driven Systems (0/3 sections)

πŸ”„ Data Pipelines & ETL: Managing Data Flow

🎯 Section Goal

Learn how to design and implement efficient data pipelines for extracting, transforming, and loading (ETL) data across systems.

πŸ“€ Data Extraction

think "Data extraction involves retrieving data from various sources"

class DataExtractor:
    function __init__(source_type, connection_details):
        self.source_type = source_type
        self.connection_details = connection_details
        self.extraction_stats = {
            "total_extractions": 0,
            "failed_extractions": 0
        }
        
        show "πŸ“€ Data extractor initialized for " + source_type
    
    function extract_data(query):
        think "Extract data from source"
        
        try:
            when self.source_type == "SQL":
                think "Connect to SQL database"
                import sqlite3
                connection = sqlite3.connect(self.connection_details["database"])
                cursor = connection.cursor()
                
                think "Execute query"
                cursor.execute(query)
                data = cursor.fetchall()
                connection.close()
                
                self.extraction_stats["total_extractions"] += 1
                show "βœ… Data extracted successfully from SQL source"
                return data
            
            when self.source_type == "API":
                think "Connect to API"
                import requests
                response = requests.get(self.connection_details["endpoint"], params=query)
                
                when response.status_code != 200:
                    raise Exception("API request failed with status code " + str(response.status_code))
                
                self.extraction_stats["total_extractions"] += 1
                show "βœ… Data extracted successfully from API source"
                return response.json()
            
            else:
                raise Exception("Unsupported source type")
        except Exception as e:
            self.extraction_stats["failed_extractions"] += 1
            show "❌ Data extraction failed: " + str(e)
            return None
    
    function get_stats():
        return self.extraction_stats

think "Let's create a data extractor"

sql_extractor = DataExtractor(source_type="SQL", connection_details={"database": "data.db"})
api_extractor = DataExtractor(source_type="API", connection_details={"endpoint": "https://api.example.com/data"})

sql_data = sql_extractor.extract_data("SELECT * FROM users")
api_data = api_extractor.extract_data({"param1": "value1"})

πŸ”„ Data Transformation

think "Data transformation involves cleaning and converting data into usable formats"

class DataTransformer:
    function __init__():
        self.transformation_stats = {
            "total_transformations": 0,
            "failed_transformations": 0
        }
        
        show "πŸ”„ Data transformer initialized"
    
    function transform_data(data, transformation_rules):
        think "Apply transformation rules to data"
        
        try:
            transformed_data = []
            
            for record in data:
                transformed_record = {}
                
                for field, rule in transformation_rules.items():
                    when field in record:
                        transformed_record[field] = rule(record[field])
                
                transformed_data.append(transformed_record)
            
            self.transformation_stats["total_transformations"] += 1
            show "βœ… Data transformed successfully"
            return transformed_data
        except Exception as e:
            self.transformation_stats["failed_transformations"] += 1
            show "❌ Data transformation failed: " + str(e)
            return None
    
    function get_stats():
        return self.transformation_stats

think "Let's create a data transformer"

data_transformer = DataTransformer()
transformation_rules = {
    "name": lambda x: x.upper(),
    "age": lambda x: x + 1
}

transformed_data = data_transformer.transform_data(sql_data, transformation_rules)

πŸ“₯ Data Loading

think "Data loading involves storing data into target systems"

class DataLoader:
    function __init__(target_type, connection_details):
        self.target_type = target_type
        self.connection_details = connection_details
        self.loading_stats = {
            "total_loads": 0,
            "failed_loads": 0
        }
        
        show "πŸ“₯ Data loader initialized for " + target_type
    
    function load_data(data):
        think "Load data into target system"
        
        try:
            when self.target_type == "SQL":
                think "Connect to SQL database"
                import sqlite3
                connection = sqlite3.connect(self.connection_details["database"])
                cursor = connection.cursor()
                
                think "Insert data"
                for record in data:
                    cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", (record["name"], record["age"]))
                connection.commit()
                connection.close()
                
                self.loading_stats["total_loads"] += 1
                show "βœ… Data loaded successfully into SQL target"
            
            when self.target_type == "API":
                think "Send data to API"
                import requests
                response = requests.post(self.connection_details["endpoint"], json=data)
                
                when response.status_code != 200:
                    raise Exception("API request failed with status code " + str(response.status_code))
                
                self.loading_stats["total_loads"] += 1
                show "βœ… Data loaded successfully into API target"
            
            else:
                raise Exception("Unsupported target type")
        except Exception as e:
            self.loading_stats["failed_loads"] += 1
            show "❌ Data loading failed: " + str(e)
            return None
    
    function get_stats():
        return self.loading_stats

think "Let's create a data loader"

sql_loader = DataLoader(target_type="SQL", connection_details={"database": "data.db"})
api_loader = DataLoader(target_type="API", connection_details={"endpoint": "https://api.example.com/data"})

sql_loader.load_data(transformed_data)
api_loader.load_data(transformed_data)

πŸ’‘ Learning Checkpoint 5

🎯 What did we learn?

  • How to extract data from SQL databases and APIs
  • How to transform data using custom rules
  • How to load data into SQL databases and APIs
  • How to monitor and optimize ETL processes

πŸ”„ Progress Update

Chapter 11 Progress: [β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ] 100% Complete
β”œβ”€β”€ βœ… REST API Development (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Database Integration (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Authentication & Authorization (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Real-Time Communication (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Data Pipelines & ETL (3/3 sections) - COMPLETE!
β”œβ”€β”€ πŸ—οΈ Microservices Architecture (0/3 sections)
└── πŸŽ‰ Event-Driven Systems (0/3 sections)

πŸ—οΈ Microservices Architecture: Building Scalable Systems

🎯 Section Goal

Learn how to design and implement microservices architecture for scalable and maintainable applications.

🧩 Service Decomposition

think "Service decomposition involves breaking down applications into smaller, independent services"

class ServiceRegistry:
    function __init__():
        self.services = {}
        self.registry_stats = {
            "total_services": 0,
            "failed_registrations": 0
        }
        
        show "🧩 Service registry initialized"
    
    function register_service(service_name, service_details):
        think "Register a new service"
        
        try:
            when service_name in self.services:
                raise Exception("Service already registered")
            
            self.services[service_name] = service_details
            self.registry_stats["total_services"] += 1
            show "βœ… Service registered: " + service_name
        except Exception as e:
            self.registry_stats["failed_registrations"] += 1
            show "❌ Service registration failed: " + str(e)
    
    function get_service(service_name):
        think "Retrieve service details"
        
        when service_name not in self.services:
            raise Exception("Service not found")
        
        return self.services[service_name]
    
    function get_stats():
        return self.registry_stats

think "Let's create a service registry"

service_registry = ServiceRegistry()
service_registry.register_service("UserService", {"host": "localhost", "port": 8081})
service_registry.register_service("OrderService", {"host": "localhost", "port": 8082})
service_registry.register_service("InventoryService", {"host": "localhost", "port": 8083})

πŸ”— Inter-Service Communication

think "Inter-service communication enables services to interact with each other"

class ServiceCommunicator:
    function __init__(service_registry):
        self.service_registry = service_registry
        self.communication_stats = {
            "total_requests": 0,
            "failed_requests": 0
        }
        
        show "πŸ”— Service communicator initialized"
    
    function send_request(service_name, endpoint, data):
        think "Send request to another service"
        
        try:
            service_details = self.service_registry.get_service(service_name)
            
            import requests
            url = "http://" + service_details["host"] + ":" + str(service_details["port"]) + endpoint
            response = requests.post(url, json=data)
            
            when response.status_code != 200:
                raise Exception("Request failed with status code " + str(response.status_code))
            
            self.communication_stats["total_requests"] += 1
            show "βœ… Request sent to " + service_name + " endpoint: " + endpoint
            return response.json()
        except Exception as e:
            self.communication_stats["failed_requests"] += 1
            show "❌ Request failed: " + str(e)
            return None
    
    function get_stats():
        return self.communication_stats

think "Let's create a service communicator"

service_communicator = ServiceCommunicator(service_registry)
response = service_communicator.send_request("UserService", "/users", {"action": "get_all"})

πŸ“ˆ Scalability and Monitoring

think "Scalability involves designing systems to handle increased load"

class LoadBalancer:
    function __init__(strategy):
        self.strategy = strategy
        self.targets = []
        self.load_stats = {
            "total_requests": 0,
            "failed_requests": 0
        }
        
        show "πŸ“ˆ Load balancer initialized with strategy: " + strategy
    
    function add_target(target):
        think "Add target to load balancer"
        
        self.targets.append(target)
        show "βœ… Target added: " + target
    
    function distribute_request(request):
        think "Distribute request based on strategy"
        
        try:
            when len(self.targets) == 0:
                raise Exception("No targets available")
            
            import random
            target = random.choice(self.targets)  # Simplified round-robin
            
            self.load_stats["total_requests"] += 1
            show "πŸ“€ Request distributed to target: " + target
            return {"success": True, "target": target}
        except Exception as e:
            self.load_stats["failed_requests"] += 1
            show "❌ Request distribution failed: " + str(e)
            return {"success": False, "error": str(e)}
    
    function get_stats():
        return self.load_stats

think "Let's create a load balancer"

load_balancer = LoadBalancer(strategy="Round-Robin")
load_balancer.add_target("localhost:8081")
load_balancer.add_target("localhost:8082")
load_balancer.add_target("localhost:8083")
response = load_balancer.distribute_request({"action": "process_order"})

πŸ’‘ Learning Checkpoint 6

🎯 What did we learn?

  • How to decompose applications into microservices
  • How to enable inter-service communication
  • How to design scalable systems with load balancing
  • How to monitor and optimize microservices architecture

πŸ”„ Progress Update

Chapter 11 Progress: [β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ] 100% Complete
β”œβ”€β”€ βœ… REST API Development (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Database Integration (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Authentication & Authorization (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Real-Time Communication (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Data Pipelines & ETL (3/3 sections) - COMPLETE!
β”œβ”€β”€ βœ… Microservices Architecture (3/3 sections) - COMPLETE!
└── βœ… Event-Driven Systems (0/3 sections)