API i Integracje - emielregis2/SmartFlowAI GitHub Wiki
API i Integracje - SmartFlowAI
Przegląd architektury
SmartFlowAI wykorzystuje architekturę opartą na zewnętrznych serwisach API, zapewniając skalowalność, niezawodność i szybkość rozwoju. Aplikacja integruje się z trzema kluczowymi serwisami zewnętrznymi.
graph TB
A[Streamlit Frontend] --> B[Supabase API]
A --> C[OpenAI API]
B --> D[PostgreSQL Database]
B --> E[Supabase Auth]
C --> F[ChatGPT-4o Model]
subgraph "External Services"
B
C
end
subgraph "Data Layer"
D
E
end
1. Supabase Integration
1.1 Konfiguracja i połączenie
Zmienne środowiskowe:
SUPABASE_URL = "https://twoj-projekt.supabase.co"
SUPABASE_ANON_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
Inicjalizacja klienta:
from supabase import create_client, Client
import streamlit as st
@st.cache_resource
def init_supabase() -> Client:
"""Initialize Supabase client with caching"""
url = st.secrets["SUPABASE_URL"]
key = st.secrets["SUPABASE_ANON_KEY"]
return create_client(url, key)
supabase = init_supabase()
1.2 Schema bazy danych
Tabela processes
:
CREATE TABLE processes (
id BIGSERIAL PRIMARY KEY,
user_email TEXT NOT NULL,
title TEXT NOT NULL,
description TEXT NOT NULL,
ai_analysis TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Row Level Security
ALTER TABLE processes ENABLE ROW LEVEL SECURITY;
CREATE POLICY "Users manage own processes" ON processes
FOR ALL USING (auth.email() = user_email);
1.3 Operacje CRUD
Tworzenie procesu:
def create_process(title: str, description: str, user_email: str) -> dict:
"""Create new process in database"""
try:
result = supabase.table("processes").insert({
"title": title,
"description": description,
"user_email": user_email,
"ai_analysis": None
}).execute()
if result.data:
st.success("✅ Proces został zapisany!")
return result.data[0]
else:
st.error("❌ Błąd podczas zapisywania procesu")
return None
except Exception as e:
st.error(f"❌ Błąd bazy danych: {str(e)}")
return None
Pobieranie procesów użytkownika:
def get_user_processes(user_email: str) -> list:
"""Get all processes for specific user"""
try:
result = supabase.table("processes").select("*").eq(
"user_email", user_email
).order("created_at", desc=True).execute()
return result.data if result.data else []
except Exception as e:
st.error(f"❌ Błąd podczas pobierania procesów: {str(e)}")
return []
Aktualizacja analizy AI:
def update_process_analysis(process_id: int, analysis: str) -> bool:
"""Update process with AI analysis"""
try:
result = supabase.table("processes").update({
"ai_analysis": analysis
}).eq("id", process_id).execute()
return bool(result.data)
except Exception as e:
st.error(f"❌ Błąd podczas aktualizacji: {str(e)}")
return False
1.4 Autentykacja Supabase
Logowanie:
def login_user(email: str, password: str) -> bool:
"""Authenticate user with Supabase Auth"""
try:
response = supabase.auth.sign_in_with_password({
"email": email,
"password": password
})
if response.user:
st.session_state.user = response.user
st.session_state.authenticated = True
return True
else:
st.error("❌ Nieprawidłowe dane logowania")
return False
except Exception as e:
st.error(f"❌ Błąd logowania: {str(e)}")
return False
Wylogowanie:
def logout_user():
"""Sign out user and clear session"""
try:
supabase.auth.sign_out()
st.session_state.authenticated = False
st.session_state.user = None
st.rerun()
except Exception as e:
st.error(f"❌ Błąd podczas wylogowania: {str(e)}")
2. OpenAI Integration
2.1 Konfiguracja klienta
import openai
import streamlit as st
# Initialize OpenAI client
openai.api_key = st.secrets["OPENAI_API_KEY"]
2.2 Prompt Engineering
Główny prompt dla analizy procesów:
ANALYSIS_PROMPT = """
Jesteś ekspertem ds. automatyzacji procesów biznesowych. Przeanalizuj następujący proces i zwróć odpowiedź w formacie:
OCENA: [1-10]
PROBLEM: [główny problem do rozwiązania]
ROZWIĄZANIE: [konkretne narzędzia automatyzacji]
OSZCZĘDNOŚCI: [szacowane oszczędności czasowe]
WDROŻENIE: [kroki implementacji]
Proces do analizy:
Nazwa: {title}
Opis: {description}
Skup się na praktycznych, dostępnych rozwiązaniach typu Zapier, n8n, Microsoft Power Automate, czy proste skrypty Python.
"""
2.3 Funkcja analizy AI
def analyze_process_with_ai(title: str, description: str) -> str:
"""Analyze business process using OpenAI ChatGPT-4o"""
prompt = ANALYSIS_PROMPT.format(
title=title,
description=description
)
try:
# Show progress
with st.spinner("🤖 AI analizuje Twój proces..."):
response = openai.ChatCompletion.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "Jesteś ekspertem automatyzacji procesów biznesowych w Polsce. Odpowiadaj po polsku, konkretnie i praktycznie."
},
{
"role": "user",
"content": prompt
}
],
max_tokens=1500,
temperature=0.7,
timeout=30
)
analysis = response.choices[0].message.content.strip()
# Log successful analysis
st.info("✅ Analiza AI została wygenerowana!")
return analysis
except openai.error.RateLimitError:
error_msg = "❌ Przekroczono limit zapytań OpenAI. Spróbuj ponownie za chwilę."
st.error(error_msg)
return error_msg
except openai.error.InvalidRequestError as e:
error_msg = f"❌ Błąd zapytania OpenAI: {str(e)}"
st.error(error_msg)
return error_msg
except openai.error.AuthenticationError:
error_msg = "❌ Błąd autoryzacji OpenAI. Sprawdź klucz API."
st.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"❌ Nieoczekiwany błąd AI: {str(e)}"
st.error(error_msg)
return error_msg
2.4 Optymalizacja kosztów OpenAI
Caching wyników:
@st.cache_data(ttl=3600) # Cache for 1 hour
def cached_ai_analysis(title: str, description: str) -> str:
"""Cached version of AI analysis to reduce API calls"""
return analyze_process_with_ai(title, description)
Monitoring użycia:
def log_openai_usage(tokens_used: int, model: str = "gpt-4o"):
"""Log OpenAI API usage for cost monitoring"""
cost_per_token = 0.00001 # Approximate cost for GPT-4o
estimated_cost = tokens_used * cost_per_token
# Log to file or external monitoring service
print(f"OpenAI Usage - Model: {model}, Tokens: {tokens_used}, Cost: ${estimated_cost:.4f}")
3. Streamlit Integration
3.1 Session State Management
def init_session_state():
"""Initialize Streamlit session state"""
if 'authenticated' not in st.session_state:
st.session_state.authenticated = False
if 'user' not in st.session_state:
st.session_state.user = None
if 'processes' not in st.session_state:
st.session_state.processes = []
3.2 UI Components
Process form component:
def render_process_form():
"""Render process creation form"""
with st.form("process_form"):
st.subheader("📝 Dodaj nowy proces")
title = st.text_input(
"Nazwa procesu",
placeholder="np. Wystawianie faktur"
)
description = st.text_area(
"Szczegółowy opis procesu",
placeholder="Opisz krok po kroku jak wygląda Twój proces...",
height=150
)
submitted = st.form_submit_button("🚀 Analizuj przez AI")
if submitted and title and description:
return title, description
return None, None
4. Bezpieczeństwo API
4.1 Ochrona kluczy API
Environment variables w Streamlit Cloud:
# .streamlit/secrets.toml (local development)
[secrets]
SUPABASE_URL = "https://your-project.supabase.co"
SUPABASE_ANON_KEY = "your-anon-key"
OPENAI_API_KEY = "sk-your-openai-key"
Walidacja konfiguracji:
def validate_api_configuration():
"""Validate that all required API keys are present"""
required_secrets = [
"SUPABASE_URL",
"SUPABASE_ANON_KEY",
"OPENAI_API_KEY"
]
missing_secrets = []
for secret in required_secrets:
if secret not in st.secrets:
missing_secrets.append(secret)
if missing_secrets:
st.error(f"❌ Brakujące zmienne: {', '.join(missing_secrets)}")
st.stop()
4.2 Rate Limiting
import time
from collections import defaultdict
class RateLimiter:
def __init__(self, max_requests=10, time_window=60):
self.max_requests = max_requests
self.time_window = time_window
self.requests = defaultdict(list)
def is_allowed(self, user_id: str) -> bool:
"""Check if user can make another request"""
now = time.time()
user_requests = self.requests[user_id]
# Remove old requests
user_requests[:] = [req_time for req_time in user_requests
if now - req_time < self.time_window]
if len(user_requests) >= self.max_requests:
return False
user_requests.append(now)
return True
# Usage
ai_rate_limiter = RateLimiter(max_requests=5, time_window=300) # 5 requests per 5 minutes
4.3 Input Sanitization
import re
def sanitize_input(text: str, max_length: int = 1000) -> str:
"""Sanitize user input for AI processing"""
if not text:
return ""
# Remove potential harmful content
text = re.sub(r'[^\w\s\-.,!?()ąćęłńóśźż]', '', text, flags=re.IGNORECASE)
# Limit length
if len(text) > max_length:
text = text[:max_length] + "..."
return text.strip()
5. Error Handling i Monitoring
5.1 Centralized Error Handling
import logging
from functools import wraps
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def handle_api_errors(func):
"""Decorator for handling API errors"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"Error in {func.__name__}: {str(e)}")
st.error(f"❌ Wystąpił błąd: {str(e)}")
return None
return wrapper
@handle_api_errors
def safe_ai_analysis(title: str, description: str) -> str:
"""AI analysis with error handling"""
return analyze_process_with_ai(title, description)
5.2 Health Checks
def check_api_health():
"""Check health of external APIs"""
health_status = {
"supabase": False,
"openai": False
}
# Check Supabase
try:
result = supabase.table("processes").select("id").limit(1).execute()
health_status["supabase"] = True
except Exception as e:
logger.error(f"Supabase health check failed: {e}")
# Check OpenAI (simple request)
try:
openai.Model.list()
health_status["openai"] = True
except Exception as e:
logger.error(f"OpenAI health check failed: {e}")
return health_status
6. Performance Optimization
6.1 Caching Strategies
# Cache Supabase client
@st.cache_resource
def get_supabase_client():
return create_client(
st.secrets["SUPABASE_URL"],
st.secrets["SUPABASE_ANON_KEY"]
)
# Cache user processes
@st.cache_data(ttl=300) # 5 minutes
def get_cached_processes(user_email: str):
return get_user_processes(user_email)
# Cache AI responses
@st.cache_data(ttl=86400) # 24 hours
def get_cached_ai_analysis(title: str, description: str):
return analyze_process_with_ai(title, description)
6.2 Async Operations
import asyncio
import aiohttp
async def async_ai_analysis(title: str, description: str) -> str:
"""Async version of AI analysis for better UX"""
async with aiohttp.ClientSession() as session:
headers = {
"Authorization": f"Bearer {st.secrets['OPENAI_API_KEY']}",
"Content-Type": "application/json"
}
payload = {
"model": "gpt-4o",
"messages": [
{"role": "user", "content": ANALYSIS_PROMPT.format(title=title, description=description)}
],
"max_tokens": 1500
}
async with session.post(
"https://api.openai.com/v1/chat/completions",
json=payload,
headers=headers
) as response:
result = await response.json()
return result["choices"][0]["message"]["content"]
7. API Troubleshooting
7.1 Częste problemy i rozwiązania
Problem: Błąd połączenia z Supabase
❌ Błąd: "Invalid API URL or key"
Rozwiązanie:
- Sprawdź poprawność
SUPABASE_URL
iSUPABASE_ANON_KEY
- Upewnij się, że projekt Supabase jest aktywny
- Sprawdź czy RLS policies są poprawnie skonfigurowane
Problem: Przekroczenie limitu OpenAI
❌ Błąd: "Rate limit exceeded"
Rozwiązanie:
- Zaimplementuj rate limiting na poziomie aplikacji
- Dodaj retry logic z exponential backoff
- Rozważ cache'owanie częstych zapytań
Problem: Timeout OpenAI API
❌ Błąd: "Request timeout"
Rozwiązanie:
- Zwiększ timeout w konfiguracji
- Zaimplementuj progress indicator dla użytkownika
- Dodaj fallback dla długich zapytań
7.2 Debugging Tools
def debug_api_call(api_name: str, params: dict, response: any):
"""Debug API calls in development"""
if st.secrets.get("DEBUG_MODE", False):
with st.expander(f"🔍 Debug: {api_name}"):
st.json({
"parameters": params,
"response": str(response)[:500] + "..." if len(str(response)) > 500 else str(response)
})
8. Metryki i Analytics
8.1 Tracking użycia
def track_api_usage(user_email: str, api_name: str, success: bool):
"""Track API usage for analytics"""
usage_data = {
"user_email": user_email,
"api_name": api_name,
"success": success,
"timestamp": datetime.now().isoformat()
}
# Log to analytics service or database
logger.info(f"API Usage: {usage_data}")
8.2 Cost Monitoring
def calculate_openai_costs():
"""Calculate estimated OpenAI API costs"""
# GPT-4o pricing (approximate)
cost_per_1k_tokens = 0.01
total_tokens = get_total_tokens_used() # Implement this function
estimated_cost = (total_tokens / 1000) * cost_per_1k_tokens
return {
"total_tokens": total_tokens,
"estimated_cost_usd": estimated_cost
}
Podsumowanie
SmartFlowAI wykorzystuje nowoczesną architekturę opartą na zewnętrznych API, zapewniając:
- 🔒 Bezpieczeństwo: RLS, environment variables, input sanitization
- ⚡ Performance: Caching, rate limiting, async operations
- 🛡️ Niezawodność: Error handling, health checks, monitoring
- 💰 Optymalizacja kosztów: Caching AI responses, usage tracking
- 📊 Observability: Logging, metrics, debugging tools
Ta architektura umożliwia łatwe skalowanie i dodawanie nowych funkcjonalności przy zachowaniu stabilności i bezpieczeństwa systemu.