Extending TMI
This guide explains how to extend TMI through addons, webhooks, and custom integrations.
Overview
TMI can be extended in several ways:
- Addons - Server-side extensions that users can invoke
- Webhooks - Receive notifications for TMI events
- Custom Integrations - Build tools that interact with TMI's API
- Client Libraries - Create language-specific SDKs
Addon Development
What are Addons?
Addons are webhook-based services that:
- Receive invocation requests from TMI users
- Process them asynchronously
- Report status back via callbacks
- Appear in the TMI UI as actions users can trigger
Addon Use Cases
- STRIDE Analysis - Automated threat categorization
- Compliance Checking - Verify against security frameworks
- Report Generation - Create PDFs or documents
- Integration - Sync with external tools (Jira, ServiceNow)
- AI Analysis - LLM-powered threat identification
Quick Start
1. Register Webhook
First, have a TMI administrator create a webhook subscription:
POST /admin/webhooks/subscriptions
Authorization: Bearer {admin_jwt}
Content-Type: application/json
{
"name": "My Addon Service",
"url": "https://my-service.example.com/webhooks/tmi",
"events": ["addon.invoked"],
"secret": "your-hmac-secret-min-16-chars"
}
The events array must contain at least one event type. For addons, use "addon.invoked".
The secret field is optional (auto-generated if omitted); if provided, it must be between 16 and 128 characters.
Save the webhook_id from the response; you need it to register the addon in the next step.
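The constraints described above (at least one event type, and an optional secret of 16 to 128 characters) can be checked before the request is sent. The following is a minimal sketch; the function name `validate_subscription` and its error messages are hypothetical and not part of TMI.

```python
from typing import List


def validate_subscription(body: dict) -> List[str]:
    """Return a list of problems found in a webhook subscription request body."""
    problems = []

    # The events array must contain at least one event type.
    events = body.get("events")
    if not isinstance(events, list) or len(events) == 0:
        problems.append("events must contain at least one event type")

    # The secret is optional (auto-generated if omitted), so only check it when present.
    secret = body.get("secret")
    if secret is not None and not (16 <= len(secret) <= 128):
        problems.append("secret must be between 16 and 128 characters")

    return problems
```

An empty list means the body satisfies the two documented constraints; the server may still apply other checks that this sketch does not cover.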
2. Register Addon
Have a TMI administrator register your addon:
POST /addons
Authorization: Bearer {admin_jwt}
Content-Type: application/json
{
"name": "STRIDE Analyzer",
"webhook_id": "{webhook_id_from_step_1}",
"description": "Automated STRIDE threat analysis",
"icon": "material-symbols:security",
"objects": ["threat_model", "asset"],
"threat_model_id": "optional-uuid-to-scope-to-a-specific-threat-model"
}
The objects field accepts: threat_model, diagram, asset, threat, document, note, repository, metadata, survey, survey_response.
The optional threat_model_id scopes the addon to a specific threat model.
The optional parameters array declares typed parameter definitions for client UI generation (max 20).
3. Implement Webhook Endpoint
Create an HTTPS endpoint that:
- Receives POST requests from TMI
- Verifies HMAC signature
- Processes asynchronously
- Calls back to update status
Webhook Invocation Flow
Step 1: Receive Invocation
Your webhook receives:
POST /webhooks/tmi
Content-Type: application/json
X-Webhook-Event: addon.invoked
X-Webhook-Delivery-Id: 550e8400-e29b-41d4-a716-446655440000
X-Webhook-Subscription-Id: 660e8400-e29b-41d4-a716-446655440001
X-Webhook-Signature: sha256=abc123...
User-Agent: TMI-Webhook/1.0
{
"event_type": "addon.invoked",
"threat_model_id": "789e0123-e45b-67c8-d901-234567890abc",
"object_type": "asset",
"object_id": "def01234-5678-90ab-cdef-1234567890ab",
"timestamp": "2025-11-08T12:00:00Z",
"data": {
"addon_id": "123e4567-e89b-12d3-a456-426614174000",
"user_data": {
"user_param_1": "value1",
"user_param_2": "value2"
}
}
}
Note: The delivery_id is sent in the X-Webhook-Delivery-Id header, not in the JSON body.
The X-Webhook-Subscription-Id header identifies the webhook subscription.
User-provided invocation data appears under data.user_data (not data.payload).
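As an illustration of where each value lives, the sketch below collects the fields from the headers and body shown above into one object. The `Invocation` dataclass and `parse_invocation` function are hypothetical names, and the sketch assumes the header mapping is looked up with the exact header casing shown above (Flask's `request.headers` is case-insensitive, a plain dict is not).

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Mapping, Optional


@dataclass
class Invocation:
    delivery_id: str
    subscription_id: str
    addon_id: Optional[str]
    threat_model_id: Optional[str]
    object_type: Optional[str]
    object_id: Optional[str]
    user_data: Dict[str, Any]


def parse_invocation(headers: Mapping[str, str], body: bytes) -> Invocation:
    """Combine the delivery headers and the JSON body of an addon.invoked request."""
    payload = json.loads(body)
    data = payload.get("data", {})
    return Invocation(
        # The delivery ID comes from the header, not from the JSON body.
        delivery_id=headers["X-Webhook-Delivery-Id"],
        subscription_id=headers["X-Webhook-Subscription-Id"],
        addon_id=data.get("addon_id"),
        threat_model_id=payload.get("threat_model_id"),
        object_type=payload.get("object_type"),
        object_id=payload.get("object_id"),
        # User-provided parameters are under data.user_data.
        user_data=data.get("user_data", {}),
    )
```

Parse the body only after the signature has been verified against the raw bytes.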
Step 2: Verify HMAC Signature
CRITICAL: Always verify the signature:
import hmac
import hashlib

def verify_signature(payload_bytes, signature_header, secret):
    # A missing header can never match, and compare_digest raises on None
    if not signature_header:
        return False
    expected = hmac.new(
        secret.encode('utf-8'),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()
    expected_sig = f"sha256={expected}"
    # Constant-time comparison
    return hmac.compare_digest(signature_header, expected_sig)

# In handler: sign the raw request bytes, not the parsed JSON
payload_bytes = request.get_data()
signature = request.headers.get('X-Webhook-Signature')
if not verify_signature(payload_bytes, signature, WEBHOOK_SECRET):
    return 'Invalid signature', 401
// Node.js
const crypto = require('crypto');

function verifySignature(payloadBody, signatureHeader, secret) {
  const hmac = crypto.createHmac('sha256', secret);
  hmac.update(payloadBody);
  const expected = Buffer.from(`sha256=${hmac.digest('hex')}`);
  const received = Buffer.from(signatureHeader || '');
  // timingSafeEqual throws if the buffers differ in length, so check first
  return received.length === expected.length &&
    crypto.timingSafeEqual(received, expected);
}
Step 3: Respond Quickly
Return 200 OK immediately and do the real work in the background:
@app.route('/webhooks/tmi', methods=['POST'])
def handle_invocation():
    # Verify signature
    if not verify_signature(...):
        return 'Invalid signature', 401

    payload = request.json
    delivery_id = request.headers.get('X-Webhook-Delivery-Id')

    # Queue for async processing
    task_queue.enqueue(process_invocation, delivery_id, payload)

    # Respond immediately
    # Return X-TMI-Callback: async header to tell TMI this will be handled asynchronously
    return '', 200, {'X-TMI-Callback': 'async'}
If you return the X-TMI-Callback: async header, TMI marks the delivery as in_progress and waits for your status callbacks. Without this header, a 2xx response marks the delivery as delivered immediately.
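The rule above can be written down as a small function, which may be handy when unit-testing a receiver. This is a sketch of the documented behavior only, not TMI's implementation; the function name is hypothetical, and non-2xx handling is left undefined because this guide does not describe it.

```python
from typing import Mapping, Optional


def delivery_status_after_response(status_code: int,
                                   headers: Mapping[str, str]) -> Optional[str]:
    """Return the delivery status TMI is documented to record for a receiver response."""
    if not 200 <= status_code < 300:
        # Behavior for non-2xx responses is not covered by this guide.
        return None

    # HTTP header names are case-insensitive, so normalize before the lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    if normalized.get("x-tmi-callback", "").strip() == "async":
        return "in_progress"  # TMI waits for status callbacks
    return "delivered"        # 2xx without the header completes the delivery
```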
Step 4: Update Status During Processing
Call back to TMI using the POST /webhook-deliveries/{delivery_id}/status endpoint.
Authentication is via HMAC signature over the request body (no JWT required):
import requests
import json

def update_status(delivery_id, status, percent, message):
    # generate_signature() is defined in the complete example below
    callback_url = f"https://tmi.example.com/webhook-deliveries/{delivery_id}/status"
    payload = json.dumps({
        "status": status,
        "status_percent": percent,
        "status_message": message
    })
    # Sign exactly the bytes that are sent as the request body
    signature = generate_signature(payload.encode(), WEBHOOK_SECRET)
    response = requests.post(
        callback_url,
        data=payload,
        headers={
            'Content-Type': 'application/json',
            'X-Webhook-Signature': signature
        },
        timeout=10  # requests has no default timeout
    )
    return response.status_code == 200

# During processing
def process_invocation(delivery_id, payload):
    # Started
    update_status(delivery_id, "in_progress", 10, "Starting analysis...")
    # Do work
    analyze_threats(payload)
    # Halfway
    update_status(delivery_id, "in_progress", 50, "Analyzing assets...")
    # More work
    generate_report(payload)
    # Completed (maps to "delivered" internally)
    update_status(delivery_id, "completed", 100, "Analysis complete")
Step 5: Handle Failures
def process_invocation(delivery_id, payload):
    try:
        analyze_threats(payload)
        update_status(delivery_id, "completed", 100, "Success")
    except ValidationError as e:
        update_status(delivery_id, "failed", 0, f"Validation error: {e}")
    except Exception as e:
        logger.exception("Processing failed")
        update_status(delivery_id, "failed", 0, f"Internal error: {e}")
Status Update API
Endpoint: POST /webhook-deliveries/{delivery_id}/status
Authentication: HMAC signature over the request body using the webhook subscription secret (no JWT required).
The signature must be sent in the X-Webhook-Signature header.
Request:
{
"status": "in_progress",
"status_percent": 75,
"status_message": "Processing..."
}
Callback Statuses (what you send):
- in_progress (0-99%)
- completed (100%) - mapped to delivered internally
- failed (0%)
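A callback body can be checked against the status and percent ranges listed above before it is signed and sent. The sketch below is one way to do that; `build_status_payload` is a hypothetical helper, and whether TMI itself rejects out-of-range percentages is an assumption not confirmed here.

```python
import json

# Percent ranges per callback status, taken from the list above.
ALLOWED_PERCENT = {
    "in_progress": range(0, 100),   # 0-99
    "completed": range(100, 101),   # exactly 100
    "failed": range(0, 1),          # exactly 0
}


def build_status_payload(status: str, percent: int, message: str = "") -> str:
    """Return the JSON request body for a status callback, or raise ValueError."""
    if status not in ALLOWED_PERCENT:
        raise ValueError(f"unknown callback status: {status!r}")
    if percent not in ALLOWED_PERCENT[status]:
        raise ValueError(f"status_percent {percent} is not valid for {status!r}")
    return json.dumps({
        "status": status,
        "status_percent": percent,
        "status_message": message,
    })
```

Compute the HMAC signature over the exact string this function returns, since the signature covers the request body bytes.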
Internal Delivery Statuses (what GET returns):
- pending - queued, not yet sent
- in_progress - acknowledged by receiver, processing
- delivered - completed successfully
- failed - permanently failed
Status Transitions:
pending → in_progress → delivered (via "completed" callback)
                      → failed
Invalid transitions (return 409 Conflict):
- delivered → * (terminal state)
- failed → * (terminal state)
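The status mapping and the terminal-state rule can be modeled locally, for example to avoid sending a callback that would be rejected. This is a sketch under stated assumptions: `apply_callback` and `InvalidTransition` are hypothetical names, and only the terminal-state rule comes from the text above; other transitions are treated as allowed here, which TMI may or may not do.

```python
# Callback status (what the addon sends) -> internal delivery status (what GET returns)
CALLBACK_TO_INTERNAL = {
    "in_progress": "in_progress",
    "completed": "delivered",
    "failed": "failed",
}

TERMINAL_STATUSES = {"delivered", "failed"}


class InvalidTransition(Exception):
    """Local stand-in for the 409 Conflict returned for a terminal delivery."""


def apply_callback(current_status: str, callback_status: str) -> str:
    """Return the new internal status after a callback, or raise if not allowed."""
    if current_status in TERMINAL_STATUSES:
        raise InvalidTransition(f"{current_status} is a terminal state")
    if callback_status not in CALLBACK_TO_INTERNAL:
        raise ValueError(f"unknown callback status: {callback_status!r}")
    return CALLBACK_TO_INTERNAL[callback_status]
```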
Checking Delivery Status: GET /webhook-deliveries/{delivery_id} supports dual authentication: JWT (for admins, subscription owners, or addon invokers) or HMAC signature over the delivery ID string.
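For the HMAC option, the signed message is the delivery ID string rather than a request body. The sketch below assumes the same `sha256=<hex>` format used for webhook signatures and assumes the result is sent in the X-Webhook-Signature header; neither detail is confirmed by this guide for the GET endpoint, and `sign_delivery_id` is a hypothetical name.

```python
import hashlib
import hmac


def sign_delivery_id(delivery_id: str, secret: str) -> str:
    """Return an HMAC-SHA256 signature over the delivery ID string."""
    mac = hmac.new(
        secret.encode("utf-8"),
        delivery_id.encode("utf-8"),
        hashlib.sha256,
    )
    # Assumed format: same "sha256=" prefix as the webhook signature header.
    return f"sha256={mac.hexdigest()}"
```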
Complete Example (Python Flask)
from flask import Flask, request
import hmac
import hashlib
import json
import requests

app = Flask(__name__)

WEBHOOK_SECRET = "your-webhook-secret"  # 16-128 characters, or auto-generated
TMI_BASE_URL = "https://tmi.example.com"

# task_queue and analyze_threats are placeholders: supply your own job queue
# and analysis function.

def verify_signature(payload_bytes, signature, secret):
    # A missing header can never match, and compare_digest raises on None
    if not signature:
        return False
    expected = hmac.new(
        secret.encode(),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)

def generate_signature(payload_bytes, secret):
    mac = hmac.new(secret.encode(), payload_bytes, hashlib.sha256)
    return f"sha256={mac.hexdigest()}"

def update_status(delivery_id, status, percent, message=""):
    payload = json.dumps({
        "status": status,
        "status_percent": percent,
        "status_message": message
    })
    # Sign exactly the bytes that are sent as the request body
    signature = generate_signature(payload.encode(), WEBHOOK_SECRET)
    requests.post(
        f"{TMI_BASE_URL}/webhook-deliveries/{delivery_id}/status",
        data=payload,
        headers={
            'Content-Type': 'application/json',
            'X-Webhook-Signature': signature
        },
        timeout=10  # requests has no default timeout
    )

@app.route('/webhooks/tmi', methods=['POST'])
def handle_invocation():
    # Verify signature against the raw request bytes
    payload_bytes = request.get_data()
    signature = request.headers.get('X-Webhook-Signature')
    if not verify_signature(payload_bytes, signature, WEBHOOK_SECRET):
        return 'Unauthorized', 401

    # Parse payload
    data = request.json
    delivery_id = request.headers.get('X-Webhook-Delivery-Id')

    # Queue for async processing
    task_queue.enqueue(process_invocation, delivery_id, data)

    # Respond immediately with async callback header
    return '', 200, {'X-TMI-Callback': 'async'}

def process_invocation(delivery_id, data):
    user_payload = data.get('data', {}).get('user_data', {})
    try:
        # Start
        update_status(delivery_id, "in_progress", 10, "Starting analysis")
        # Process
        result = analyze_threats(user_payload)
        # Progress
        update_status(delivery_id, "in_progress", 75, "Generating report")
        # Finish
        update_status(delivery_id, "completed", 100, "Analysis complete")
    except Exception as e:
        update_status(delivery_id, "failed", 0, f"Error: {e}")

if __name__ == '__main__':
    app.run(port=8000)
Best Practices
1. Idempotency
Handle duplicate invocations using the delivery ID from the header:
cache = {}

def process_invocation(delivery_id, payload):
    # Check if already processed
    if delivery_id in cache:
        logger.info(f"Duplicate invocation: {delivery_id}")
        return cache[delivery_id]
    # Process
    result = do_work(payload)
    # Cache result
    cache[delivery_id] = result
    return result
2. Progress Updates
Update regularly for long operations:
def long_operation(delivery_id):
    update_status(delivery_id, "in_progress", 0, "Starting...")
    for i, step in enumerate(steps):
        process_step(step)
        # in_progress accepts 0-99%, so cap below 100 until the final callback
        percent = min(99, int((i + 1) / len(steps) * 100))
        update_status(delivery_id, "in_progress", percent,
                      f"Step {i+1}/{len(steps)}")
    update_status(delivery_id, "completed", 100, "Done")
3. Error Handling
Provide useful error messages:
try:
    validate_payload(payload)
except ValidationError as e:
    update_status(delivery_id, "failed", 0,
                  f"Invalid input: {e}. Please check parameters.")
4. Timeouts
Set reasonable timeouts:
# timeout_decorator raises its own TimeoutError, not the builtin one
from timeout_decorator import timeout, TimeoutError

@timeout(300)  # 5 minute timeout
def process_invocation(delivery_id, payload):
    try:
        result = do_work(payload)
        update_status(delivery_id, "completed", 100, "Success")
    except TimeoutError:
        update_status(delivery_id, "failed", 0,
                      "Processing timeout after 5 minutes")
5. Security
- Always verify HMAC signatures
- Use HTTPS for all callbacks
- Don't log secrets
- Validate all input
- Use constant-time comparison for signatures
Testing Your Addon
Local Development
- Use ngrok to expose local server:
ngrok http 8000
- Register webhook with ngrok URL (admin only):
POST /admin/webhooks/subscriptions
Authorization: Bearer {admin_jwt}
{
"name": "Dev Addon",
"url": "https://abc123.ngrok.io/webhooks/tmi",
"events": ["addon.invoked"]
}
- Invoke addon and check logs
Testing HMAC
import hmac
import hashlib
import json
payload = json.dumps({"status": "completed", "status_percent": 100})
secret = "your-webhook-secret"
mac = hmac.new(secret.encode(), payload.encode(), hashlib.sha256)
signature = f"sha256={mac.hexdigest()}"
print(f"X-Webhook-Signature: {signature}")
Test Status Updates
curl -X POST https://tmi.example.com/webhook-deliveries/{id}/status \
-H "Content-Type: application/json" \
-H "X-Webhook-Signature: sha256=..." \
-d '{
"status": "completed",
"status_percent": 100,
"status_message": "Test completed"
}'
Webhook Integration
Event Webhooks
Subscribe to TMI events. Webhook subscriptions are managed by administrators.
Available Events (follows {resource}.{action} pattern):
| Resource | Events |
|---|---|
| Threat Model | threat_model.created, threat_model.updated, threat_model.deleted |
| Diagram | diagram.created, diagram.updated, diagram.deleted |
| Document | document.created, document.updated, document.deleted |
| Note | note.created, note.updated, note.deleted |
| Repository | repository.created, repository.updated, repository.deleted |
| Asset | asset.created, asset.updated, asset.deleted |
| Threat | threat.created, threat.updated, threat.deleted |
| Metadata | metadata.created, metadata.updated, metadata.deleted |
| Survey | survey.created, survey.updated, survey.deleted |
| Survey Response | survey_response.created, survey_response.updated, survey_response.deleted |
| Addon | addon.invoked |
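Because every event type follows the {resource}.{action} pattern, a receiver can split the name or route on it directly. The sketch below shows one possible dispatcher; `split_event_type`, `on`, `dispatch`, and the sample handler are hypothetical names, not part of TMI.

```python
from typing import Callable, Dict, Optional, Tuple

HANDLERS: Dict[str, Callable[[dict], str]] = {}


def split_event_type(event_type: str) -> Tuple[str, str]:
    """Split 'survey_response.created' into ('survey_response', 'created')."""
    resource, sep, action = event_type.rpartition(".")
    if not sep or not resource or not action:
        raise ValueError(f"not a resource.action event type: {event_type!r}")
    return resource, action


def on(event_type: str):
    """Decorator that registers a handler for one event type."""
    def register(func: Callable[[dict], str]) -> Callable[[dict], str]:
        HANDLERS[event_type] = func
        return func
    return register


@on("threat.created")
def handle_threat_created(payload: dict) -> str:
    return f"new threat {payload['object_id']}"


def dispatch(payload: dict) -> Optional[str]:
    """Route a verified event payload to its handler; ignore unhandled events."""
    handler = HANDLERS.get(payload["event_type"])
    return handler(payload) if handler else None
```

Returning None for unhandled events lets a receiver stay subscribed to more event types than it processes.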
Register Webhook (admin only):
POST /admin/webhooks/subscriptions
Authorization: Bearer {admin_jwt}
Content-Type: application/json
{
"name": "My Integration",
"url": "https://my-service.example.com/webhooks",
"events": ["threat_model.created", "threat.created"],
"secret": "your-hmac-secret"
}
The secret is optional (auto-generated if omitted); if provided, must be 16-128 characters.
You can optionally include threat_model_id to scope the subscription to a single threat model.
Receive Event:
POST /your-webhook-endpoint
Content-Type: application/json
X-Webhook-Event: threat_model.created
X-Webhook-Delivery-Id: 550e8400-e29b-41d4-a716-446655440000
X-Webhook-Subscription-Id: 660e8400-e29b-41d4-a716-446655440001
X-Webhook-Signature: sha256=...
User-Agent: TMI-Webhook/1.0
{
"event_type": "threat_model.created",
"threat_model_id": "789e0123-e45b-67c8-d901-234567890abc",
"object_type": "threat_model",
"object_id": "789e0123-e45b-67c8-d901-234567890abc",
"timestamp": "2025-11-08T12:00:00Z",
"data": { ... }
}
Webhook Security
Always verify HMAC signature before processing.
Custom Integrations
Issue Tracking Integration
Sync threats with Jira/GitHub Issues:
class JiraIntegration:
    def sync_threat_to_jira(self, threat):
        # Create Jira issue
        issue = self.jira_client.create_issue({
            'project': {'key': 'SEC'},
            'summary': threat['name'],
            'description': threat['description'],
            'issuetype': {'name': 'Security Threat'},
            'labels': [threat['stride']]
        })
        # Store issue key in TMI metadata
        self.tmi_client.create_metadata(
            threat['threat_model_id'],
            threat['id'],
            'jira_issue',
            issue.key
        )
        return issue.key
CI/CD Integration
Validate threat models in CI pipeline:
#!/bin/bash
# validate-threat-model.sh
TM_ID=$1
API_URL="https://api.tmi.example.com"
# Get threat model
response=$(curl -s -H "Authorization: Bearer $TOKEN" \
"$API_URL/threat_models/$TM_ID")
# Check threat count
threat_count=$(echo "$response" | jq '.threat_count')
if [ "$threat_count" -lt 5 ]; then
echo "ERROR: Insufficient threats identified (found: $threat_count, required: 5)"
exit 1
fi
# Check for high severity unmitigated threats
unmitigated=$(curl -s -H "Authorization: Bearer $TOKEN" \
"$API_URL/threat_models/$TM_ID/threats" | \
jq '[.[] | select(.severity == "high" and .status == "open")] | length')
if [ "$unmitigated" -gt 0 ]; then
echo "ERROR: $unmitigated high severity threats not mitigated"
exit 1
fi
echo "SUCCESS: Threat model validation passed"
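The same two checks can be expressed as a pure function, which is easier to unit-test than a shell script. This is a sketch: `validate_threats` is a hypothetical name, it counts the threats list instead of reading the threat model's threat_count field, and it reuses the `severity` and `status` field names from the jq filter above.

```python
from typing import Dict, List


def validate_threats(threats: List[Dict], min_threats: int = 5) -> List[str]:
    """Return validation errors for a list of threat objects; empty means pass."""
    errors = []

    # Check threat count
    if len(threats) < min_threats:
        errors.append(
            f"Insufficient threats identified (found: {len(threats)}, required: {min_threats})"
        )

    # Check for high severity unmitigated threats
    unmitigated = [
        t for t in threats
        if t.get("severity") == "high" and t.get("status") == "open"
    ]
    if unmitigated:
        errors.append(f"{len(unmitigated)} high severity threats not mitigated")

    return errors
```

A CI job could fetch the threats, call this function, print the errors, and exit with a non-zero status when the list is not empty.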
Reporting Integration
Generate custom reports:
from jinja2 import Template
from weasyprint import HTML

class ReportGenerator:
    def generate_pdf_report(self, threat_model_id):
        # Fetch data from TMI
        tm = self.tmi_client.get_threat_model(threat_model_id)
        threats = self.tmi_client.get_threats(threat_model_id)
        diagrams = self.tmi_client.get_diagrams(threat_model_id)
        # Render HTML template (the with block closes the file)
        with open('report_template.html') as f:
            template = Template(f.read())
        html_content = template.render(
            threat_model=tm,
            threats=threats,
            diagrams=diagrams
        )
        # Convert to PDF
        HTML(string=html_content).write_pdf(
            f'threat_model_{threat_model_id}.pdf'
        )
Client Libraries
Building a Client Library
Create language-specific SDKs:
Python Client
# tmi_client/__init__.py
import requests

class TMIClient:
    def __init__(self, api_url, token):
        self.api_url = api_url
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        })

    def get_threat_models(self):
        response = self.session.get(f'{self.api_url}/threat_models')
        response.raise_for_status()
        return response.json()

    def create_threat_model(self, name, description):
        response = self.session.post(
            f'{self.api_url}/threat_models',
            json={'name': name, 'description': description}
        )
        response.raise_for_status()
        return response.json()

# Usage
client = TMIClient('https://api.tmi.dev', 'your-jwt-token')
threat_models = client.get_threat_models()
JavaScript Client
// tmi-client.js
class TMIClient {
constructor(apiUrl, token) {
this.apiUrl = apiUrl;
this.token = token;
}
async getThreatModels() {
const response = await fetch(`${this.apiUrl}/threat_models`, {
headers: { Authorization: `Bearer ${this.token}` }
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
return response.json();
}
async createThreatModel(name, description) {
const response = await fetch(`${this.apiUrl}/threat_models`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.token}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({ name, description })
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
return response.json();
}
}
// Usage
const client = new TMIClient('https://api.tmi.dev', 'your-jwt-token');
const threatModels = await client.getThreatModels();
Publishing Client Libraries
- Package:
  - Python: pip install tmi-client
  - JavaScript: npm install @tmi/client
  - Go: go get github.com/ericfitz/tmi-clients
- Document: Include examples and API reference
- Test: Comprehensive test coverage
- Versioning: Semantic versioning (semver)
Examples and Resources
Example Projects
See https://github.com/ericfitz/tmi-clients for:
- Python client library
- JavaScript/TypeScript client
- Go client
- Example integrations
Documentation
- API-Integration - REST and WebSocket APIs
- Architecture-and-Design - System design
- Developer docs in /docs/developer/ of repositories
Support
- GitHub Issues: https://github.com/ericfitz/tmi/issues
- API Reference: OpenAPI spec in /docs/reference/apis/
Next Steps
- API-Integration - Learn the APIs
- Architecture-and-Design - Understand the system
- Getting-Started-with-Development - Set up your development environment