WEBRTC IMPLEMENTATION PLAN - nself-org/nchat GitHub Wiki
Version: 1.0.0 | Date: 2026-02-03 | Status: Planning | Related Tasks: TODO.md Tasks 71-77 (Phase 8)
- Executive Summary
- Architecture Overview
- Media Server Selection
- Required Components
- API Endpoints
- Database Schema
- Client Implementation
- Recording & Storage
- Live Streaming
- Scaling Considerations
- nself Plugin Architecture
- Implementation Phases
- Security Considerations
- Testing Strategy
This document outlines the implementation strategy for voice/video calls and live streaming in nchat. The solution addresses:
- 1:1 Voice/Video Calls: Peer-to-peer with fallback to media server
- Group Calls: SFU-based architecture for 2-100+ participants
- Screen Sharing: Desktop and application window sharing
- Call Recording: Server-side recording with S3-compatible storage
- Live Streaming: Broadcast to unlimited viewers via HLS/DASH
| Component | Recommendation | Rationale |
|---|---|---|
| Media Server | LiveKit | Complete framework, excellent SDKs, AI-ready |
| Signaling | Existing realtime plugin | Leverage Socket.io infrastructure |
| TURN/STUN | Coturn (self-hosted) | Cost-effective, full control |
| Live Streaming | LiveKit Egress + HLS | Unified platform, low latency |
| Recording Storage | MinIO/S3 | Existing nself infrastructure |
┌──────────────────────────────────────────────────────────────────────────┐
│                              nchat Clients                               │
│  ┌────────┐   ┌────────┐   ┌────────┐   ┌────────┐   ┌────────┐          │
│  │  Web   │   │  iOS   │   │Android │   │Desktop │   │ Tauri  │          │
│  └───┬────┘   └───┬────┘   └───┬────┘   └───┬────┘   └───┬────┘          │
│      │            │            │            │            │               │
│      └────────────┴────────────┼────────────┴────────────┘               │
│                        ┌───────▼───────┐                                 │
│                        │    LiveKit    │                                 │
│                        │  Client SDK   │                                 │
│                        └───────┬───────┘                                 │
└────────────────────────────────┼─────────────────────────────────────────┘
                                 │
                        WebRTC (DTLS-SRTP)
                                 │
┌────────────────────────────────┼─────────────────────────────────────────┐
│                          Backend Infrastructure                          │
│                                │                                         │
│  ┌─────────────────────────────▼──────────────────────────────────────┐  │
│  │                        LiveKit Media Server                        │  │
│  │  ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌───────────┐       │  │
│  │  │    SFU    │  │  Egress   │  │  Ingress  │  │   Room    │       │  │
│  │  │  Router   │  │ (Record)  │  │ (RTMP In) │  │  Service  │       │  │
│  │  └───────────┘  └───────────┘  └───────────┘  └───────────┘       │  │
│  └─────────────────────────────┬──────────────────────────────────────┘  │
│       ┌─────────────┬──────────┴────────┬────────────────────┐           │
│  ┌────▼─────┐  ┌────▼─────┐  ┌──────────▼───────────┐  ┌─────▼──────┐    │
│  │  Coturn  │  │  Redis   │  │    nself Backend     │  │   MinIO    │    │
│  │TURN/STUN │  │ (State)  │  │ ┌──────────────────┐ │  │ (Storage)  │    │
│  └──────────┘  └──────────┘  │ │  Hasura GraphQL  │ │  └────────────┘    │
│                              │ ├──────────────────┤ │                    │
│                              │ │   Realtime WS    │ │                    │
│                              │ ├──────────────────┤ │                    │
│                              │ │    PostgreSQL    │ │                    │
│                              │ └──────────────────┘ │                    │
│                              └──────────────────────┘                    │
└──────────────────────────────────────────────────────────────────────────┘
| Participants | Topology | Reason |
|---|---|---|
| 2 (1:1 calls) | P2P with TURN fallback | Lowest latency, minimal server load |
| 3-4 | SFU | P2P bandwidth becomes prohibitive |
| 5+ | SFU | Required for scalability |
| 100+ | SFU with Cascading | Multi-region support |
Recommendation: Always use SFU via LiveKit for consistency and feature parity. P2P optimization is complex and provides marginal benefit for chat applications.
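The thresholds in the topology table can be expressed as a small helper. This is a hypothetical sketch mirroring the table above (the function name and exact cutoffs are illustrative); per the recommendation, nchat would route everything through the LiveKit SFU, but the helper documents where each topology pays off if a P2P mode is ever enabled:

```typescript
// Topology selection per the participant-count table above.
// Cutoffs are illustrative, not an existing nchat API.
export type CallTopology = 'p2p' | 'sfu' | 'sfu_cascading'

export function selectTopology(participantCount: number): CallTopology {
  if (participantCount <= 2) return 'p2p' // 1:1 calls, TURN fallback
  if (participantCount <= 100) return 'sfu' // group calls
  return 'sfu_cascading' // multi-region fan-out
}
```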
┌────────┐                ┌──────────┐                ┌────────┐
│Client A│                │  Server  │                │Client B│
└───┬────┘                └────┬─────┘                └───┬────┘
    │                          │                          │
    │ 1. initiate_call         │                          │
    │─────────────────────────>│                          │
    │                          │ 2. call_incoming         │
    │                          │─────────────────────────>│
    │                          │                          │
    │                          │ 3. call_accepted         │
    │                          │<─────────────────────────│
    │ 4. call_accepted         │                          │
    │<─────────────────────────│                          │
    │                          │                          │
    │ 5. Get LiveKit token     │                          │
    │─────────────────────────>│                          │
    │ 6. LiveKit room token    │                          │
    │<─────────────────────────│                          │
    │                          │ 7. Get LiveKit token     │
    │                          │<─────────────────────────│
    │                          │ 8. LiveKit room token    │
    │                          │─────────────────────────>│
    │                          │                          │
    ├──────────────────────────┼──────────────────────────┤
    │        WebRTC Media Flow (via LiveKit SFU)          │
    ├──────────────────────────┼──────────────────────────┤
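The flow above implies a server-side state machine for each call, matching the `call_status` enum defined later in the schema. A hypothetical sketch of the legal transitions (the `transition` helper and the transition table are illustrative, not an existing API):

```typescript
// Call lifecycle transitions implied by the signaling flow and the
// call_status enum. An illegal transition (e.g. reviving an ended call)
// throws instead of silently corrupting state.
type CallStatus =
  | 'initiated'
  | 'ringing'
  | 'connecting'
  | 'connected'
  | 'reconnecting'
  | 'ended'

const TRANSITIONS: Record<CallStatus, CallStatus[]> = {
  initiated: ['ringing', 'ended'], // callee unreachable -> ended
  ringing: ['connecting', 'ended'], // accepted, or declined/timeout
  connecting: ['connected', 'ended'], // both sides joined the LiveKit room
  connected: ['reconnecting', 'ended'],
  reconnecting: ['connected', 'ended'],
  ended: [], // terminal
}

export function transition(from: CallStatus, to: CallStatus): CallStatus {
  if (!TRANSITIONS[from].includes(to)) {
    throw new Error(`Illegal call transition: ${from} -> ${to}`)
  }
  return to
}
```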
| Feature | LiveKit | mediasoup | Janus | Jitsi |
|---|---|---|---|---|
| Language | Go (Pion) | C++ / Node.js | C | Java |
| Setup Complexity | Low | High | Medium | Medium |
| SDKs | Excellent (JS, iOS, Android, Flutter, Unity) | Good (JS, Node) | Limited | Good |
| Recording | Built-in Egress | Custom | Plugin | Built-in |
| Live Streaming | Built-in HLS/RTMP | Custom | Plugin | Built-in |
| AI Integration | Agents Framework | Manual | Manual | Limited |
| Scalability | Excellent | Excellent | Good | Good |
| Documentation | Excellent | Good | Good | Good |
| Community | Very Active | Active | Active | Active |
| License | Apache 2.0 | ISC | GPL v3 | Apache 2.0 |
| Self-Hosted | Yes | Yes | Yes | Yes |
| Managed Cloud | Yes | No | No | Yes (8x8) |
Primary Reasons:
- Complete Framework: Signaling, SFU, recording, streaming in one package
- Developer Experience: Best-in-class SDKs and documentation
- AI Ready: Agents framework for voice AI, transcription, translation
- Performance: Go/Pion-based, highly optimized
- Future Proof: Active development, Media over QUIC (MoQ) roadmap
- Flexible Deployment: Self-hosted or managed cloud
Trade-offs:
- Slightly higher memory usage than pure mediasoup
- No GPL concerns: LiveKit itself is Apache 2.0 licensed
Use mediasoup if:
- Maximum control over media routing is required
- Custom SFU behavior needed
- Team has deep WebRTC expertise
- Performance is critical (2x more efficient than LiveKit in some benchmarks)
| Service | Purpose | Deployment | Port |
|---|---|---|---|
| LiveKit Server | SFU Media Server | Docker/K8s | 7880 (HTTP), 7881 (RTC) |
| Coturn | TURN/STUN Server | Docker/K8s | 3478 (UDP/TCP), 5349 (TLS) |
| Redis | LiveKit State | Existing nself | 6379 |
| MinIO | Recording Storage | Existing nself | 9000 |
| PostgreSQL | Call Metadata | Existing nself | 5432 |
# Add to backend/docker-compose.yml
services:
  livekit:
    image: livekit/livekit-server:v1.7
    container_name: nchat_livekit
    restart: unless-stopped
    ports:
      - '7880:7880' # HTTP/WS
      - '7881:7881' # RTC (UDP)
      - '7882:7882' # RTC (TCP)
    environment:
      - LIVEKIT_KEYS=${LIVEKIT_API_KEY}:${LIVEKIT_API_SECRET}
      - LIVEKIT_REDIS_ADDRESS=redis:6379
    volumes:
      - ./config/livekit.yaml:/etc/livekit.yaml
    command: --config /etc/livekit.yaml
    depends_on:
      - redis
    networks:
      - nself_network

  livekit-egress:
    image: livekit/egress:v1.8
    container_name: nchat_livekit_egress
    restart: unless-stopped
    environment:
      - EGRESS_CONFIG_FILE=/etc/egress.yaml
    volumes:
      - ./config/egress.yaml:/etc/egress.yaml
    cap_add:
      - SYS_ADMIN
    depends_on:
      - livekit
    networks:
      - nself_network

  coturn:
    image: coturn/coturn:4.6
    container_name: nchat_coturn
    restart: unless-stopped
    ports:
      - '3478:3478/udp'
      - '3478:3478/tcp'
      - '5349:5349/udp'
      - '5349:5349/tcp'
      - '49152-49200:49152-49200/udp' # Relay ports
    volumes:
      - ./config/turnserver.conf:/etc/coturn/turnserver.conf
    networks:
      - nself_network

# backend/config/livekit.yaml
port: 7880
rtc:
  port_range_start: 50000
  port_range_end: 60000
  tcp_port: 7881
  use_external_ip: true
redis:
  address: redis:6379
keys:
  # Generated via: openssl rand -hex 32
  ${LIVEKIT_API_KEY}: ${LIVEKIT_API_SECRET}
room:
  auto_create: false
  empty_timeout: 300 # 5 minutes
  max_participants: 100
turn:
  enabled: true
  domain: turn.${DOMAIN}
  tls_port: 5349
  udp_port: 3478
  external_tls: true
webhook:
  urls:
    - ${BACKEND_URL}/api/webhooks/livekit
  api_key: ${WEBHOOK_SECRET}
logging:
  level: info
  pion_level: error

# backend/config/turnserver.conf
listening-port=3478
tls-listening-port=5349
listening-ip=0.0.0.0
external-ip=${EXTERNAL_IP}
relay-ip=0.0.0.0
min-port=49152
max-port=49200
fingerprint
lt-cred-mech
use-auth-secret
static-auth-secret=${TURN_SECRET}
realm=${DOMAIN}
server-name=${DOMAIN}
# TLS certificates
cert=/etc/ssl/certs/turn.crt
pkey=/etc/ssl/private/turn.key
# Deny private IP ranges for security
denied-peer-ip=10.0.0.0-10.255.255.255
denied-peer-ip=172.16.0.0-172.31.255.255
denied-peer-ip=192.168.0.0-192.168.255.255
# Logging
log-file=/var/log/turnserver.log
verbose
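The `use-auth-secret` / `static-auth-secret` pair above enables coturn's time-limited credential mechanism (the TURN REST API memo): the client presents a username of the form `<expiry>:<user>` and a password that is the base64 HMAC-SHA1 of that username under the shared secret. A sketch of how the backend would mint these credentials (the function and field names are ours; only the credential format comes from coturn):

```typescript
import { createHmac } from 'node:crypto'

// Mint ephemeral TURN credentials for coturn's use-auth-secret mode.
// username = "<unix-expiry>:<userId>", credential = base64(HMAC-SHA1(secret, username)).
export function createTurnCredentials(
  userId: string,
  secret: string, // must match static-auth-secret (TURN_SECRET)
  ttlSeconds = 3600
): { username: string; credential: string } {
  const expiry = Math.floor(Date.now() / 1000) + ttlSeconds
  const username = `${expiry}:${userId}`
  const credential = createHmac('sha1', secret).update(username).digest('base64')
  return { username, credential }
}
```

These values slot directly into the `iceServers` array returned by the call APIs below, so clients never see the long-lived secret.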
// src/app/api/calls/route.ts
// POST /api/calls/initiate
// Start a new call
interface InitiateCallRequest {
targetUserId: string // For 1:1 calls
targetUserIds?: string[] // For group calls
channelId?: string // Optional channel context
type: 'audio' | 'video'
metadata?: Record<string, unknown>
}
interface InitiateCallResponse {
callId: string
roomName: string
token: string // LiveKit JWT token
iceServers: RTCIceServer[] // TURN/STUN config
expiresAt: string
}
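The `token` field above is a LiveKit access token: an HS256 JWT signed with the API secret, carrying the participant identity and a `video` grant. In the real backend this would come from livekit-server-sdk's `AccessToken` helper; the hand-rolled sketch below exists only to show the claim shape and is an illustration, not the recommended implementation:

```typescript
import { createHmac } from 'node:crypto'

const b64url = (buf: Buffer) => buf.toString('base64url')

// Illustrative LiveKit-style room token: HS256 JWT with iss = API key,
// sub = participant identity, and a `video` grant naming the room.
export function mintRoomToken(
  apiKey: string,
  apiSecret: string,
  identity: string,
  roomName: string,
  ttlSeconds = 3600
): string {
  const now = Math.floor(Date.now() / 1000)
  const header = { alg: 'HS256', typ: 'JWT' }
  const payload = {
    iss: apiKey, // identifies which API key signed the token
    sub: identity, // participant identity shown to other clients
    nbf: now,
    exp: now + ttlSeconds,
    video: { roomJoin: true, room: roomName },
  }
  const signingInput =
    b64url(Buffer.from(JSON.stringify(header))) +
    '.' +
    b64url(Buffer.from(JSON.stringify(payload)))
  const sig = b64url(createHmac('sha256', apiSecret).update(signingInput).digest())
  return `${signingInput}.${sig}`
}
```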
// POST /api/calls/:id/join
// Join an existing call
interface JoinCallRequest {
callId: string
}
interface JoinCallResponse {
roomName: string
token: string
participants: Participant[]
iceServers: RTCIceServer[]
}
// POST /api/calls/:id/leave
// Leave a call (does not end it)
interface LeaveCallRequest {
callId: string
reason?: 'user_left' | 'network_error' | 'kicked'
}
// POST /api/calls/:id/end
// End a call for all participants
interface EndCallRequest {
callId: string
reason?: 'host_ended' | 'timeout' | 'everyone_left'
}
// POST /api/calls/:id/mute
// Toggle mute state
interface MuteRequest {
callId: string
trackType: 'audio' | 'video' | 'screen'
muted: boolean
}
// POST /api/calls/:id/kick
// Remove a participant (moderator only)
interface KickParticipantRequest {
callId: string
participantId: string
reason?: string
}
// GET /api/calls/:id
// Get call info
interface GetCallResponse {
call: Call
participants: Participant[]
isRecording: boolean
isStreaming: boolean
}
// GET /api/calls/history
// Get call history
interface CallHistoryQuery {
limit?: number
offset?: number
channelId?: string
participantId?: string
startDate?: string
endDate?: string
}

// POST /api/calls/:id/recording/start
interface StartRecordingRequest {
callId: string
options?: {
audioOnly?: boolean
layout?: 'grid' | 'speaker' | 'single'
resolution?: '720p' | '1080p' | '4k'
customLayout?: string // Custom layout template
}
}
interface StartRecordingResponse {
recordingId: string
startedAt: string
}
// POST /api/calls/:id/recording/stop
interface StopRecordingResponse {
recordingId: string
duration: number
fileSize: number
url: string
thumbnailUrl: string
}
// GET /api/recordings/:id
interface Recording {
id: string
callId: string
channelId?: string
duration: number
fileSize: number
status: 'processing' | 'ready' | 'failed'
url: string
thumbnailUrl: string
createdAt: string
expiresAt: string // Retention policy
}
// DELETE /api/recordings/:id
// Delete a recording (owner/admin only)

// POST /api/streams/start
interface StartStreamRequest {
channelId: string
title: string
description?: string
visibility: 'public' | 'channel_members' | 'invite_only'
options?: {
enableChat: boolean
enableReactions: boolean
recordStream: boolean
rtmpOutputs?: RTMPDestination[] // Simulcast to YouTube, Twitch
}
}
interface StartStreamResponse {
streamId: string
roomName: string
hostToken: string // LiveKit token for broadcaster
rtmpIngestUrl?: string // For external encoder
hlsUrl: string // HLS playback URL
dashUrl?: string // DASH playback URL
embedCode: string // HTML embed snippet
}
// POST /api/streams/:id/stop
interface StopStreamResponse {
streamId: string
duration: number
peakViewers: number
totalViews: number
recordingUrl?: string
}
// GET /api/streams/:id
interface Stream {
id: string
channelId: string
hostId: string
title: string
description?: string
status: 'live' | 'ended' | 'scheduled'
startedAt: string
endedAt?: string
viewerCount: number
peakViewers: number
hlsUrl: string
chatEnabled: boolean
reactionsEnabled: boolean
}
// GET /api/streams/:id/viewers
interface StreamViewers {
total: number
authenticated: number
anonymous: number
viewers: Viewer[]
}
// POST /api/streams/:id/chat
// Send message to stream chat
interface StreamChatMessage {
content: string
replyTo?: string
}
// POST /api/streams/:id/react
// Send reaction to stream
interface StreamReaction {
type: 'like' | 'love' | 'fire' | 'clap' | 'wow'
}
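Reactions tend to arrive in bursts (dozens of taps per second during a live moment), so a client would coalesce them before hitting `/api/streams/:id/react`; this also keeps the `total_reactions` counter updates cheap. A hypothetical client-side batcher (class and method names are illustrative):

```typescript
// Coalesce stream reactions into per-type counts between flushes,
// e.g. drained on a 1-second timer and POSTed as one request.
export type ReactionType = 'like' | 'love' | 'fire' | 'clap' | 'wow'

export class ReactionBatcher {
  private counts = new Map<ReactionType, number>()

  add(type: ReactionType): void {
    this.counts.set(type, (this.counts.get(type) ?? 0) + 1)
  }

  /** Drain accumulated counts; returns an empty array if nothing happened. */
  flush(): Array<{ type: ReactionType; count: number }> {
    const batch = [...this.counts.entries()].map(([type, count]) => ({ type, count }))
    this.counts.clear()
    return batch
  }
}
```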
// GET /api/streams/scheduled
// Get upcoming scheduled streams

// POST /api/calls/:id/screen/start
interface StartScreenShareRequest {
callId: string
shareType: 'screen' | 'window' | 'tab'
withAudio: boolean
}
interface StartScreenShareResponse {
trackId: string
constraints: MediaTrackConstraints
}
// POST /api/calls/:id/screen/stop
interface StopScreenShareRequest {
callId: string
trackId: string
}

// POST /api/webhooks/livekit
// Receives LiveKit server events
interface LiveKitWebhook {
event:
| 'room_started'
| 'room_finished'
| 'participant_joined'
| 'participant_left'
| 'track_published'
| 'track_unpublished'
| 'egress_started'
| 'egress_updated'
| 'egress_ended'
room: {
name: string
sid: string
numParticipants: number
}
participant?: {
identity: string
sid: string
name: string
}
track?: {
sid: string
type: 'audio' | 'video'
source: 'camera' | 'microphone' | 'screen_share'
}
egressInfo?: {
egressId: string
status: string
file?: { filename: string; size: number }
}
}

// Add to backend/schema.dbml
// =============================================================================
// CALLS
// =============================================================================
Table nchat_calls {
id uuid [pk, default: `gen_random_uuid()`]
room_name varchar(255) [not null, unique]
room_sid varchar(255) [note: 'LiveKit room SID']
channel_id uuid [ref: > nchat_channels.id]
initiator_id uuid [not null, ref: > nchat_users.id]
type call_type [not null]
status call_status [not null, default: 'initiated']
scheduled_at timestamptz
started_at timestamptz
connected_at timestamptz
ended_at timestamptz
end_reason call_end_reason
max_participants int [default: 100]
metadata jsonb [default: '{}']
created_at timestamptz [not null, default: `now()`]
updated_at timestamptz [not null, default: `now()`]
indexes {
channel_id
initiator_id
status
started_at
(channel_id, status)
}
}
Enum call_type {
audio
video
}
Enum call_status {
initiated
ringing
connecting
connected
reconnecting
ended
}
Enum call_end_reason {
completed
declined
busy
timeout
cancelled
failed
no_answer
network_error
host_ended
kicked
}
// =============================================================================
// CALL PARTICIPANTS
// =============================================================================
Table nchat_call_participants {
id uuid [pk, default: `gen_random_uuid()`]
call_id uuid [not null, ref: > nchat_calls.id, delete: cascade]
user_id uuid [not null, ref: > nchat_users.id]
participant_sid varchar(255) [note: 'LiveKit participant SID']
role participant_role [not null, default: 'participant']
status participant_status [not null, default: 'invited']
invited_at timestamptz [not null, default: `now()`]
joined_at timestamptz
left_at timestamptz
leave_reason varchar(100)
is_muted boolean [not null, default: false]
is_video_enabled boolean [not null, default: false]
is_screen_sharing boolean [not null, default: false]
connection_quality connection_quality
metadata jsonb [default: '{}']
indexes {
call_id
user_id
(call_id, user_id) [unique]
status
}
}
Enum participant_role {
host
co_host
participant
viewer
}
Enum participant_status {
invited
ringing
connecting
connected
disconnected
left
kicked
}
Enum connection_quality {
excellent
good
poor
lost
}
// =============================================================================
// CALL RECORDINGS
// =============================================================================
Table nchat_call_recordings {
id uuid [pk, default: `gen_random_uuid()`]
call_id uuid [not null, ref: > nchat_calls.id, delete: cascade]
egress_id varchar(255) [not null, note: 'LiveKit egress ID']
status recording_status [not null, default: 'starting']
type recording_type [not null, default: 'composite']
layout recording_layout [default: 'grid']
resolution varchar(10) [default: '1080p']
file_path varchar(500)
file_size bigint
duration int [note: 'Duration in seconds']
storage_bucket varchar(255)
thumbnail_path varchar(500)
started_at timestamptz
ended_at timestamptz
expires_at timestamptz [note: 'Retention policy']
error_message text
metadata jsonb [default: '{}']
created_at timestamptz [not null, default: `now()`]
updated_at timestamptz [not null, default: `now()`]
indexes {
call_id
egress_id [unique]
status
expires_at
}
}
Enum recording_status {
starting
recording
processing
ready
failed
deleted
}
Enum recording_type {
composite
individual_tracks
audio_only
}
Enum recording_layout {
grid
speaker
single
custom
}
// =============================================================================
// LIVE STREAMS
// =============================================================================
Table nchat_streams {
id uuid [pk, default: `gen_random_uuid()`]
channel_id uuid [not null, ref: > nchat_channels.id]
host_id uuid [not null, ref: > nchat_users.id]
room_name varchar(255) [not null, unique]
room_sid varchar(255)
title varchar(255) [not null]
description text
visibility stream_visibility [not null, default: 'channel_members']
status stream_status [not null, default: 'scheduled']
// Playback URLs
hls_url varchar(500)
dash_url varchar(500)
rtmp_ingest_url varchar(500)
// Options
chat_enabled boolean [not null, default: true]
reactions_enabled boolean [not null, default: true]
record_stream boolean [not null, default: false]
// Analytics
viewer_count int [not null, default: 0]
peak_viewers int [not null, default: 0]
total_views int [not null, default: 0]
total_reactions int [not null, default: 0]
// Timestamps
scheduled_at timestamptz
started_at timestamptz
ended_at timestamptz
// Recording reference
recording_id uuid [ref: > nchat_call_recordings.id]
metadata jsonb [default: '{}']
created_at timestamptz [not null, default: `now()`]
updated_at timestamptz [not null, default: `now()`]
indexes {
channel_id
host_id
status
scheduled_at
(channel_id, status)
}
}
Enum stream_visibility {
public
channel_members
invite_only
}
Enum stream_status {
scheduled
starting
live
ending
ended
cancelled
}
// =============================================================================
// STREAM VIEWERS
// =============================================================================
Table nchat_stream_viewers {
id uuid [pk, default: `gen_random_uuid()`]
stream_id uuid [not null, ref: > nchat_streams.id, delete: cascade]
user_id uuid [ref: > nchat_users.id, note: 'Null for anonymous viewers']
session_id varchar(255) [not null]
joined_at timestamptz [not null, default: `now()`]
left_at timestamptz
watch_duration int [note: 'Total seconds watched']
device_type varchar(50)
connection_quality connection_quality
indexes {
stream_id
user_id
session_id
(stream_id, session_id) [unique]
}
}
// =============================================================================
// STREAM CHAT
// =============================================================================
Table nchat_stream_chat {
id uuid [pk, default: `gen_random_uuid()`]
stream_id uuid [not null, ref: > nchat_streams.id, delete: cascade]
user_id uuid [not null, ref: > nchat_users.id]
content text [not null]
reply_to_id uuid [ref: > nchat_stream_chat.id]
is_pinned boolean [not null, default: false]
is_highlighted boolean [not null, default: false]
is_deleted boolean [not null, default: false]
deleted_by uuid [ref: > nchat_users.id]
created_at timestamptz [not null, default: `now()`]
indexes {
stream_id
user_id
created_at
(stream_id, created_at)
}
}
// =============================================================================
// STREAM REACTIONS
// =============================================================================
Table nchat_stream_reactions {
id uuid [pk, default: `gen_random_uuid()`]
stream_id uuid [not null, ref: > nchat_streams.id, delete: cascade]
user_id uuid [not null, ref: > nchat_users.id]
type reaction_type [not null]
created_at timestamptz [not null, default: `now()`]
indexes {
stream_id
(stream_id, created_at)
}
}
Enum reaction_type {
like
love
fire
clap
wow
laugh
sad
}
// =============================================================================
// RTMP DESTINATIONS (Simulcast)
// =============================================================================
Table nchat_stream_destinations {
id uuid [pk, default: `gen_random_uuid()`]
stream_id uuid [not null, ref: > nchat_streams.id, delete: cascade]
platform destination_platform [not null]
rtmp_url varchar(500) [not null]
stream_key_encrypted varchar(500) [not null, note: 'Encrypted stream key']
enabled boolean [not null, default: true]
status destination_status [not null, default: 'pending']
error_message text
created_at timestamptz [not null, default: `now()`]
indexes {
stream_id
platform
(stream_id, platform) [unique]
}
}
Enum destination_platform {
youtube
twitch
facebook
custom
}
Enum destination_status {
pending
connecting
connected
disconnected
error
}
// =============================================================================
// ICE SERVERS (TURN/STUN Configuration)
// =============================================================================
Table nchat_ice_servers {
id uuid [pk, default: `gen_random_uuid()`]
urls text[] [not null]
username varchar(255)
credential_encrypted varchar(500)
region varchar(50)
priority int [not null, default: 0]
is_active boolean [not null, default: true]
last_health_check timestamptz
health_status varchar(20) [default: 'unknown']
created_at timestamptz [not null, default: `now()`]
updated_at timestamptz [not null, default: `now()`]
indexes {
is_active
region
priority
}
}
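The `nchat_ice_servers` rows feed the `iceServers` array returned by `/api/calls/initiate`. A sketch of the mapping (names are ours; credential decryption is assumed to happen upstream and is out of scope here): keep active servers, prefer higher `priority`, and emit the shape WebRTC expects.

```typescript
// Map nchat_ice_servers rows to the RTCIceServer-shaped entries clients consume.
interface IceServerRow {
  urls: string[]
  username?: string
  credential?: string // assumed already decrypted upstream
  priority: number
  is_active: boolean
}

interface IceServer {
  urls: string[]
  username?: string
  credential?: string
}

export function toIceServers(rows: IceServerRow[]): IceServer[] {
  return rows
    .filter((r) => r.is_active)
    .sort((a, b) => b.priority - a.priority) // highest priority first
    .map((r) => ({ urls: r.urls, username: r.username, credential: r.credential }))
}
```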
# Hasura metadata for call tables
# nchat_calls permissions
- table:
    schema: public
    name: nchat_calls
  select_permissions:
    - role: user
      permission:
        columns: '*'
        filter:
          _or:
            - initiator_id: { _eq: X-Hasura-User-Id }
            - call_participants:
                user_id: { _eq: X-Hasura-User-Id }
            - channel:
                channel_members:
                  user_id: { _eq: X-Hasura-User-Id }
  insert_permissions:
    - role: user
      permission:
        columns: [channel_id, type, scheduled_at, metadata]
        check:
          _or:
            - channel_id: { _is_null: true } # Direct call
            - channel:
                channel_members:
                  user_id: { _eq: X-Hasura-User-Id }
        set:
          initiator_id: X-Hasura-User-Id
          status: 'initiated'

# nchat_call_recordings - only participants can view
- table:
    schema: public
    name: nchat_call_recordings
  select_permissions:
    - role: user
      permission:
        columns: '*'
        filter:
          call:
            _or:
              - initiator_id: { _eq: X-Hasura-User-Id }
              - call_participants:
                  user_id: { _eq: X-Hasura-User-Id }

# nchat_streams - based on visibility
- table:
    schema: public
    name: nchat_streams
  select_permissions:
    - role: user
      permission:
        columns: '*'
        filter:
          _or:
            - visibility: { _eq: 'public' }
            - host_id: { _eq: X-Hasura-User-Id }
            - _and:
                - visibility: { _eq: 'channel_members' }
                - channel:
                    channel_members:
                      user_id: { _eq: X-Hasura-User-Id }

src/
├── lib/
│   └── webrtc/
│       ├── index.ts                  # Main exports
│       ├── livekit-client.ts         # LiveKit wrapper
│       ├── media-devices.ts          # Device enumeration
│       ├── media-constraints.ts      # Media constraints factory
│       ├── connection-quality.ts     # Quality monitoring
│       └── __tests__/
│           └── *.test.ts
├── hooks/
│   └── webrtc/
│       ├── use-room.ts               # Room connection hook
│       ├── use-participants.ts       # Participant management
│       ├── use-local-participant.ts  # Local user media
│       ├── use-tracks.ts             # Track management
│       ├── use-screen-share.ts       # Screen sharing
│       ├── use-recording.ts          # Recording controls
│       └── use-media-devices.ts      # Device selection
├── components/
│   └── call/
│       ├── CallProvider.tsx          # Call context provider
│       ├── CallOverlay.tsx           # In-call UI overlay
│       ├── CallControls.tsx          # Mute, camera, hang up
│       ├── ParticipantGrid.tsx       # Video grid layout
│       ├── ParticipantTile.tsx       # Single participant video
│       ├── ScreenShareView.tsx       # Screen share display
│       ├── CallTimer.tsx             # Duration display
│       ├── DeviceSelector.tsx        # Audio/video device picker
│       ├── IncomingCallDialog.tsx    # Incoming call UI
│       └── CallQualityIndicator.tsx  # Connection quality
└── stores/
    └── call-store.ts                 # Already exists, extend
// src/lib/webrtc/livekit-client.ts
import {
Room,
RoomEvent,
Track,
Participant,
LocalParticipant,
RemoteParticipant,
ConnectionState,
DisconnectReason,
RoomOptions,
} from 'livekit-client'
export interface LiveKitConfig {
url: string
token: string
iceServers?: RTCIceServer[]
}
export interface JoinRoomOptions {
autoSubscribe?: boolean
publishDefaults?: {
audioPreset?: AudioPreset
videoPreset?: VideoPreset
screenSharePreset?: ScreenSharePreset
}
}
export type AudioPreset = 'music' | 'speech' | 'speech_low_bandwidth'
export type VideoPreset = 'h1080' | 'h720' | 'h540' | 'h360'
export type ScreenSharePreset = 'h1080_15fps' | 'h720_15fps'
export class LiveKitClient {
private room: Room | null = null
private config: LiveKitConfig | null = null
/**
* Connect to a LiveKit room
*/
async connect(config: LiveKitConfig, options?: JoinRoomOptions): Promise<Room> {
this.config = config
const roomOptions: RoomOptions = {
adaptiveStream: true,
dynacast: true,
publishDefaults: {
audioPreset: options?.publishDefaults?.audioPreset,
videoEncoding: this.getVideoEncoding(options?.publishDefaults?.videoPreset),
screenShareEncoding: this.getScreenShareEncoding(
options?.publishDefaults?.screenSharePreset
),
},
}
// Add custom ICE servers if provided
if (config.iceServers) {
roomOptions.rtcConfig = {
iceServers: config.iceServers,
}
}
this.room = new Room(roomOptions)
// Set up event handlers
this.setupEventHandlers()
// Connect to room
await this.room.connect(config.url, config.token, {
autoSubscribe: options?.autoSubscribe ?? true,
})
return this.room
}
/**
* Disconnect from room
*/
async disconnect(): Promise<void> {
if (this.room) {
await this.room.disconnect()
this.room = null
}
}
/**
* Enable/disable local camera
*/
async setCameraEnabled(enabled: boolean): Promise<void> {
await this.room?.localParticipant.setCameraEnabled(enabled)
}
/**
* Enable/disable local microphone
*/
async setMicrophoneEnabled(enabled: boolean): Promise<void> {
await this.room?.localParticipant.setMicrophoneEnabled(enabled)
}
/**
* Start screen share
*/
async startScreenShare(options?: {
audio?: boolean
contentHint?: 'motion' | 'detail' | 'text'
}): Promise<void> {
await this.room?.localParticipant.setScreenShareEnabled(true, {
audio: options?.audio ?? false,
contentHint: options?.contentHint ?? 'detail',
})
}
/**
* Stop screen share
*/
async stopScreenShare(): Promise<void> {
await this.room?.localParticipant.setScreenShareEnabled(false)
}
/**
* Switch audio/video device
*/
async switchDevice(kind: MediaDeviceKind, deviceId: string): Promise<void> {
  // switchActiveDevice accepts the MediaDeviceKind directly,
  // so no per-kind branching is needed
  await this.room?.switchActiveDevice(kind, deviceId)
}
/**
* Get current room
*/
getRoom(): Room | null {
return this.room
}
/**
* Get local participant
*/
getLocalParticipant(): LocalParticipant | undefined {
return this.room?.localParticipant
}
/**
* Get remote participants
*/
getRemoteParticipants(): Map<string, RemoteParticipant> | undefined {
return this.room?.remoteParticipants
}
// Private methods
private setupEventHandlers(): void {
if (!this.room) return
this.room.on(RoomEvent.Connected, () => {
console.log('[LiveKit] Connected to room')
})
this.room.on(RoomEvent.Disconnected, (reason?: DisconnectReason) => {
console.log('[LiveKit] Disconnected:', reason)
})
this.room.on(RoomEvent.Reconnecting, () => {
console.log('[LiveKit] Reconnecting...')
})
this.room.on(RoomEvent.Reconnected, () => {
console.log('[LiveKit] Reconnected')
})
this.room.on(RoomEvent.ParticipantConnected, (participant) => {
console.log('[LiveKit] Participant joined:', participant.identity)
})
this.room.on(RoomEvent.ParticipantDisconnected, (participant) => {
console.log('[LiveKit] Participant left:', participant.identity)
})
this.room.on(RoomEvent.TrackSubscribed, (track, publication, participant) => {
console.log('[LiveKit] Track subscribed:', track.kind, participant.identity)
})
this.room.on(RoomEvent.ActiveSpeakersChanged, (speakers) => {
console.log(
'[LiveKit] Active speakers:',
speakers.map((s) => s.identity)
)
})
this.room.on(RoomEvent.ConnectionQualityChanged, (quality, participant) => {
console.log('[LiveKit] Quality changed:', quality, participant.identity)
})
}
private getVideoEncoding(preset?: VideoPreset) {
const presets: Record<VideoPreset, { maxBitrate: number; maxFramerate: number }> = {
h1080: { maxBitrate: 3_000_000, maxFramerate: 30 },
h720: { maxBitrate: 1_700_000, maxFramerate: 30 },
h540: { maxBitrate: 900_000, maxFramerate: 25 },
h360: { maxBitrate: 500_000, maxFramerate: 20 },
}
return preset ? presets[preset] : presets.h720
}
private getScreenShareEncoding(preset?: ScreenSharePreset) {
const presets: Record<ScreenSharePreset, { maxBitrate: number; maxFramerate: number }> = {
h1080_15fps: { maxBitrate: 3_000_000, maxFramerate: 15 },
h720_15fps: { maxBitrate: 1_500_000, maxFramerate: 15 },
}
return preset ? presets[preset] : presets['h1080_15fps']
}
}
// Singleton instance
export const liveKitClient = new LiveKitClient()

// src/hooks/webrtc/use-room.ts
import { useState, useEffect, useCallback } from 'react'
import {
Room,
RoomEvent,
ConnectionState,
RemoteParticipant,
LocalParticipant,
} from 'livekit-client'
import { liveKitClient, LiveKitConfig, JoinRoomOptions } from '@/lib/webrtc/livekit-client'
import { useCallStore } from '@/stores/call-store'
import { captureError } from '@/lib/sentry-utils'
export interface UseRoomOptions extends JoinRoomOptions {
onParticipantJoined?: (participant: RemoteParticipant) => void
onParticipantLeft?: (participant: RemoteParticipant) => void
onDisconnected?: (reason?: string) => void
}
export interface UseRoomReturn {
room: Room | null
connectionState: ConnectionState
localParticipant: LocalParticipant | undefined
remoteParticipants: RemoteParticipant[]
connect: (config: LiveKitConfig) => Promise<void>
disconnect: () => Promise<void>
isConnecting: boolean
error: Error | null
}
export function useRoom(options?: UseRoomOptions): UseRoomReturn {
const [room, setRoom] = useState<Room | null>(null)
const [connectionState, setConnectionState] = useState<ConnectionState>(
ConnectionState.Disconnected
)
const [remoteParticipants, setRemoteParticipants] = useState<RemoteParticipant[]>([])
const [isConnecting, setIsConnecting] = useState(false)
const [error, setError] = useState<Error | null>(null)
const { setCallState, setCallConnected, setCallReconnecting } = useCallStore()
// Update participants list
const updateParticipants = useCallback((room: Room) => {
setRemoteParticipants(Array.from(room.remoteParticipants.values()))
}, [])
// Connect to room
const connect = useCallback(
async (config: LiveKitConfig) => {
setIsConnecting(true)
setError(null)
try {
const connectedRoom = await liveKitClient.connect(config, options)
setRoom(connectedRoom)
updateParticipants(connectedRoom)
// Set up room event handlers
connectedRoom.on(RoomEvent.ConnectionStateChanged, (state) => {
setConnectionState(state)
switch (state) {
case ConnectionState.Connected:
setCallConnected()
break
case ConnectionState.Reconnecting:
setCallReconnecting()
break
case ConnectionState.Disconnected:
setCallState('ended')
break
}
})
connectedRoom.on(RoomEvent.ParticipantConnected, (participant) => {
updateParticipants(connectedRoom)
options?.onParticipantJoined?.(participant)
})
connectedRoom.on(RoomEvent.ParticipantDisconnected, (participant) => {
updateParticipants(connectedRoom)
options?.onParticipantLeft?.(participant)
})
connectedRoom.on(RoomEvent.Disconnected, (reason) => {
setRoom(null)
setRemoteParticipants([])
options?.onDisconnected?.(reason)
})
} catch (err) {
const error = err instanceof Error ? err : new Error('Failed to connect')
setError(error)
captureError(error, { tags: { feature: 'webrtc', action: 'connect' } })
} finally {
setIsConnecting(false)
}
},
[options, updateParticipants, setCallConnected, setCallReconnecting, setCallState]
)
// Disconnect from room
const disconnect = useCallback(async () => {
await liveKitClient.disconnect()
setRoom(null)
setRemoteParticipants([])
setConnectionState(ConnectionState.Disconnected)
}, [])
// Cleanup on unmount
useEffect(() => {
return () => {
disconnect()
}
}, [disconnect])
return {
room,
connectionState,
localParticipant: room?.localParticipant,
remoteParticipants,
connect,
disconnect,
isConnecting,
error,
}
}

The existing `src/lib/voip-push.ts` provides the foundation. Extend it with a native CallKit bridge:
// src/platforms/capacitor/src/native/call-kit.ts
import { registerPlugin } from '@capacitor/core'
export interface CallKitPlugin {
setup(options: { appName: string; iconTemplate?: string; ringtoneSound?: string }): Promise<void>
reportIncomingCall(options: {
uuid: string
handle: string
handleType: 'generic' | 'email' | 'phone'
hasVideo: boolean
callerDisplayName?: string
callerImageUrl?: string
}): Promise<void>
endCall(reason: CallEndReason, uuid?: string): Promise<void>
setMutedState(muted: boolean, uuid: string): Promise<void>
setHeldState(held: boolean, uuid: string): Promise<void>
addListener(
eventName: CallKitEvent,
callback: (data: any) => void
): Promise<{ remove: () => Promise<void> }>
}
export type CallEndReason =
| 'failed'
| 'remoteEnded'
| 'unanswered'
| 'answeredElsewhere'
| 'declinedElsewhere'
export type CallKitEvent = 'callStarted' | 'callAnswered' | 'callEnded' | 'callMuted' | 'callHeld'
const CallKit = registerPlugin<CallKitPlugin>('CallKit')
export class CallKitManager {
private initialized = false
async initialize(): Promise<void> {
if (this.initialized) return
await CallKit.setup({
appName: 'nchat',
iconTemplate: 'CallKitIcon',
ringtoneSound: 'ringtone.wav',
})
// Set up listeners
await this.setupListeners()
this.initialized = true
}
private async setupListeners(): Promise<void> {
await CallKit.addListener('callAnswered', async (data) => {
// User answered via CallKit
// Trigger WebRTC connection
})
await CallKit.addListener('callEnded', async (data) => {
// User ended via CallKit
// Disconnect WebRTC
})
await CallKit.addListener('callMuted', async (data) => {
// User toggled mute via CallKit
})
}
async reportIncomingCall(options: {
uuid: string
handle: string
handleType: 'generic' | 'email' | 'phone'
hasVideo: boolean
callerDisplayName?: string
callerImageUrl?: string
}): Promise<void> {
await CallKit.reportIncomingCall(options)
}
async endCall(reason: CallEndReason, uuid?: string): Promise<void> {
await CallKit.endCall(reason, uuid)
}
}
export const callKitManager = new CallKitManager()

// src/lib/webrtc/connection-quality.ts
import { ConnectionQuality as LKConnectionQuality, Participant } from 'livekit-client'
export type ConnectionQuality = 'excellent' | 'good' | 'poor' | 'lost' | 'unknown'
export interface ConnectionStats {
quality: ConnectionQuality
packetLoss: number
jitter: number
latency: number
bitrate: {
upload: number
download: number
}
}
export function mapConnectionQuality(lkQuality: LKConnectionQuality): ConnectionQuality {
switch (lkQuality) {
case LKConnectionQuality.Excellent:
return 'excellent'
case LKConnectionQuality.Good:
return 'good'
case LKConnectionQuality.Poor:
return 'poor'
case LKConnectionQuality.Lost:
return 'lost'
default:
return 'unknown'
}
}
export class ConnectionQualityMonitor {
private statsInterval: NodeJS.Timeout | null = null
private lastStats: Map<string, ConnectionStats> = new Map()
private callbacks: Set<(stats: Map<string, ConnectionStats>) => void> = new Set()
start(room: any, intervalMs = 2000): void {
this.stop()
this.statsInterval = setInterval(async () => {
await this.collectStats(room)
}, intervalMs)
}
stop(): void {
if (this.statsInterval) {
clearInterval(this.statsInterval)
this.statsInterval = null
}
}
subscribe(callback: (stats: Map<string, ConnectionStats>) => void): () => void {
this.callbacks.add(callback)
return () => this.callbacks.delete(callback)
}
private async collectStats(room: any): Promise<void> {
// Collect stats from room participants
const newStats = new Map<string, ConnectionStats>()
// Add local participant stats
const localStats = await this.getParticipantStats(room.localParticipant)
newStats.set(room.localParticipant.identity, localStats)
// Add remote participant stats
for (const [identity, participant] of room.remoteParticipants) {
const stats = await this.getParticipantStats(participant)
newStats.set(identity, stats)
}
this.lastStats = newStats
// Notify subscribers
this.callbacks.forEach((cb) => cb(newStats))
}
private async getParticipantStats(participant: Participant): Promise<ConnectionStats> {
return {
quality: mapConnectionQuality(participant.connectionQuality),
packetLoss: 0, // Would need WebRTC stats API
jitter: 0,
latency: 0,
bitrate: {
upload: 0,
download: 0,
},
}
}
}

                     +------------------+
                     |   LiveKit Room   |
                     |  (Active Call)   |
                     +--------+---------+
                              |
                     +--------v---------+
                     |  LiveKit Egress  |
                     |     Service      |
                     +--------+---------+
                              |
         +--------------------+--------------------+
         |                    |                    |
+--------v---------+ +--------v---------+ +--------v---------+
|  Room Composite  | | Track Composite  | |   Individual     |
|  (Grid Layout)   | | (Custom Layout)  | |  Track Export    |
+--------+---------+ +--------+---------+ +--------+---------+
         |                    |                    |
         +--------------------+--------------------+
                              |
                     +--------v---------+
                     |   Transcoding    |
                     |  (VP8 -> H.264)  |
                     +--------+---------+
                              |
                     +--------v---------+
                     |     MinIO/S3     |
                     |     Storage      |
                     +--------+---------+
                              |
           +------------------+------------------+
           |                  |                  |
    +------v------+    +------v------+    +------v-------+
    |    Video    |    |  Thumbnail  |    |   Metadata   |
    |    File     |    |   Images    |    |     JSON     |
    +-------------+    +-------------+    +--------------+
# backend/config/egress.yaml
log_level: info
api_key: ${LIVEKIT_API_KEY}
api_secret: ${LIVEKIT_API_SECRET}
ws_url: ws://livekit:7880
# S3-compatible storage (MinIO)
s3:
access_key: ${MINIO_ACCESS_KEY}
secret: ${MINIO_SECRET_KEY}
region: us-east-1
endpoint: http://minio:9000
bucket: nchat-recordings
force_path_style: true
# CPU/memory limits
cpu_cost:
room_composite_cpus: 3
track_composite_cpus: 2
track_cpus: 1
enable_chrome_sandbox: true

// src/services/recording-service.ts
import { EgressClient, RoomCompositeEgressRequest, EncodedFileOutput } from 'livekit-server-sdk'
export interface RecordingOptions {
callId: string
roomName: string
layout?: 'grid' | 'speaker' | 'single'
resolution?: '720p' | '1080p' | '4k'
audioOnly?: boolean
customLayoutUrl?: string
}
export interface RecordingResult {
egressId: string
status: 'starting' | 'recording' | 'complete' | 'failed'
fileUrl?: string
duration?: number
error?: string
}
export class RecordingService {
private client: EgressClient
constructor() {
this.client = new EgressClient(
process.env.LIVEKIT_URL!,
process.env.LIVEKIT_API_KEY!,
process.env.LIVEKIT_API_SECRET!
)
}
async startRecording(options: RecordingOptions): Promise<string> {
const { callId, roomName, layout, resolution, audioOnly } = options
const fileOutput: EncodedFileOutput = {
fileType: audioOnly ? 'OGG' : 'MP4',
filepath: `recordings/${callId}/{room_name}-{time}.${audioOnly ? 'ogg' : 'mp4'}`,
s3: {
accessKey: process.env.MINIO_ACCESS_KEY!,
secret: process.env.MINIO_SECRET_KEY!,
region: 'us-east-1',
endpoint: process.env.MINIO_ENDPOINT!,
bucket: 'nchat-recordings',
forcePathStyle: true,
},
}
const request: RoomCompositeEgressRequest = {
roomName,
layout: this.getLayout(layout),
videoOnly: false,
audioOnly: audioOnly ?? false,
file: fileOutput,
}
// Add custom layout if provided
if (options.customLayoutUrl) {
request.customBaseUrl = options.customLayoutUrl
}
const info = await this.client.startRoomCompositeEgress(roomName, request)
return info.egressId
}
async stopRecording(egressId: string): Promise<RecordingResult> {
const info = await this.client.stopEgress(egressId)
return {
egressId: info.egressId,
status: this.mapStatus(info.status),
fileUrl: info.file?.location,
duration: info.file?.duration,
}
}
async getRecordingStatus(egressId: string): Promise<RecordingResult> {
const infos = await this.client.listEgress({ egressId })
const info = infos[0]
if (!info) {
throw new Error('Recording not found')
}
return {
egressId: info.egressId,
status: this.mapStatus(info.status),
fileUrl: info.file?.location,
duration: info.file?.duration,
error: info.error,
}
}
private getLayout(layout?: string): string {
switch (layout) {
case 'speaker':
return 'speaker-dark'
case 'single':
return 'single-speaker'
case 'grid':
default:
return 'grid-dark'
}
}
  private mapStatus(status: number): RecordingResult['status'] {
    // LiveKit EgressStatus: 0 = STARTING, 1 = ACTIVE, 2 = ENDING,
    // 3 = COMPLETE; anything else (FAILED/ABORTED/LIMIT_REACHED) maps to failed
    switch (status) {
      case 0:
        return 'starting'
      case 1:
      case 2:
        return 'recording'
      case 3:
        return 'complete'
      default:
        return 'failed'
    }
  }
}
export const recordingService = new RecordingService()

// src/services/recording-retention.ts
export interface RetentionPolicy {
id: string
name: string
retentionDays: number
applyTo: 'all' | 'channel' | 'user'
targetId?: string
}
const DEFAULT_POLICIES: RetentionPolicy[] = [
{ id: 'free', name: 'Free Tier', retentionDays: 7, applyTo: 'all' },
{ id: 'pro', name: 'Pro Tier', retentionDays: 30, applyTo: 'all' },
{ id: 'enterprise', name: 'Enterprise', retentionDays: 365, applyTo: 'all' },
]
export class RecordingRetentionService {
/**
* Apply retention policy - delete expired recordings
*/
async enforceRetention(): Promise<void> {
// Query recordings past retention period
// Delete from storage and database
}
/**
* Get retention policy for a recording
*/
async getPolicy(userId: string, channelId?: string): Promise<RetentionPolicy> {
// Check user's subscription tier
// Return appropriate policy
return DEFAULT_POLICIES[0]
}
}

+---------------------------------------------------------------------------+
|                               BROADCASTER                                 |
|                                                                           |
|  +-----------+     +--------------+     +----------------------------+    |
|  |  Browser  | --> |   LiveKit    | --> |       LiveKit Server       |    |
|  |  (WebRTC) |     |  Client SDK  |     |     (Room: stream_123)     |    |
|  +-----------+     +--------------+     +--------------+-------------+    |
|                                                        |                  |
|  +-----------+                          +--------------v-------------+    |
|  | OBS/Wire- | -----------------------> |       LiveKit Ingress      |    |
|  |   cast    |        RTMP/WHIP         |      (RTMP/WHIP Input)     |    |
|  +-----------+                          +--------------+-------------+    |
+--------------------------------------------------------|------------------+
                                                         |
                                          +--------------v-------------+
                                          |       LiveKit Egress       |
                                          |      (HLS/DASH Output)     |
                                          +--------------+-------------+
                                                         |
          +---------------------------+------------------+-----------+
          |                           |                              |
 +--------v--------+        +--------v--------+          +-----------v--------+
 |   HLS Stream    |        |   DASH Stream   |          |   RTMP Simulcast   |
 |   (CDN Ready)   |        |   (CDN Ready)   |          |  (YouTube/Twitch)  |
 +--------+--------+        +--------+--------+          +--------------------+
          |                          |
          +------------+-------------+
                       |
+----------------------v----------------------------------------------------+
|                         VIEWERS (Unlimited Scale)                         |
|                                                                           |
|   Video.js/HLS.js    Video.js/HLS.js    Video.js/HLS.js   Video.js/HLS.js |
|        (Web)            (iOS App)        (Android App)        (Desktop)   |
|                                                                           |
|  +---------------------------------------------------------------------+  |
|  |                       Stream Chat (WebSocket)                       |  |
|  |                       Reactions (WebSocket)                         |  |
|  +---------------------------------------------------------------------+  |
+---------------------------------------------------------------------------+
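Viewers in the diagram above play the HLS playlist rather than joining the LiveKit room. A minimal sketch of the viewer-side URL resolution and playback-path check — the helper names are illustrative, and the playlist path mirrors the `streams/<streamId>/playlist.m3u8` egress output used by `StreamingService` later in this section:

```typescript
// Sketch only: helper names are illustrative, not part of the nchat codebase.
export function hlsPlaylistUrl(cdnBase: string, streamId: string): string {
  // Mirrors the egress output layout: streams/<id>/playlist.m3u8 on the CDN
  return `${cdnBase.replace(/\/+$/, '')}/streams/${streamId}/playlist.m3u8`
}

// Safari/iOS report native HLS support via canPlayType; everywhere else a
// player such as hls.js attaches MediaSource-based playback to the element.
export function canPlayHlsNatively(video: { canPlayType(type: string): string }): boolean {
  return video.canPlayType('application/vnd.apple.mpegurl') !== ''
}
```

A player component would call `hlsPlaylistUrl` with the stream's id, then either set `video.src` directly (native HLS) or hand the URL to hls.js.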
# backend/config/egress.yaml (extended for streaming)
# HLS output settings
hls:
segment_duration: 4
playlist_type: 'live'
segments_per_playlist: 5
# RTMP output for simulcast
rtmp:
- url: rtmp://a.rtmp.youtube.com/live2
stream_key: ${YOUTUBE_STREAM_KEY}
- url: rtmp://live.twitch.tv/app
stream_key: ${TWITCH_STREAM_KEY}

// src/services/streaming-service.ts
import {
  EgressClient,
  RoomCompositeEgressRequest,
  StreamOutput,
  SegmentedFileOutput,
  RoomServiceClient,
  ParticipantPermission,
} from 'livekit-server-sdk'
export interface StreamOptions {
channelId: string
title: string
description?: string
visibility: 'public' | 'channel_members' | 'invite_only'
enableChat: boolean
enableReactions: boolean
recordStream: boolean
rtmpDestinations?: Array<{
platform: 'youtube' | 'twitch' | 'facebook' | 'custom'
url: string
streamKey: string
}>
}
export interface StreamInfo {
streamId: string
roomName: string
hlsUrl: string
dashUrl?: string
rtmpIngestUrl?: string
status: 'starting' | 'live' | 'ended'
}
export class StreamingService {
private egressClient: EgressClient
private roomClient: RoomServiceClient
constructor() {
const url = process.env.LIVEKIT_URL!
const apiKey = process.env.LIVEKIT_API_KEY!
const apiSecret = process.env.LIVEKIT_API_SECRET!
this.egressClient = new EgressClient(url, apiKey, apiSecret)
this.roomClient = new RoomServiceClient(url, apiKey, apiSecret)
}
async startStream(hostId: string, options: StreamOptions): Promise<StreamInfo> {
const streamId = crypto.randomUUID()
const roomName = `stream_${streamId}`
// Create room with appropriate settings
await this.roomClient.createRoom({
name: roomName,
emptyTimeout: 3600, // 1 hour
      maxParticipants: 1, // Only the host joins the room; viewers watch via HLS
})
// Start HLS egress
const hlsOutput: SegmentedFileOutput = {
protocol: 'HLS',
filenamePrefix: `streams/${streamId}/`,
playlistName: 'playlist.m3u8',
segmentDuration: 4,
s3: {
accessKey: process.env.MINIO_ACCESS_KEY!,
secret: process.env.MINIO_SECRET_KEY!,
region: 'us-east-1',
endpoint: process.env.MINIO_ENDPOINT!,
bucket: 'nchat-streams',
forcePathStyle: true,
},
}
    // Collect RTMP simulcast URLs (HLS is written via the segments output)
    const rtmpUrls = options.rtmpDestinations?.map((dest) => `${dest.url}/${dest.streamKey}`) ?? []
    // Start egress: HLS segments always, RTMP stream outputs when configured
    await this.egressClient.startRoomCompositeEgress(roomName, {
      roomName,
      layout: 'speaker-dark',
      segments: hlsOutput,
      stream: rtmpUrls.length ? { urls: rtmpUrls } : undefined,
    })
return {
streamId,
roomName,
hlsUrl: `${process.env.CDN_URL}/streams/${streamId}/playlist.m3u8`,
status: 'starting',
}
}
async stopStream(streamId: string): Promise<void> {
const roomName = `stream_${streamId}`
// Stop all egresses for this room
const egresses = await this.egressClient.listEgress({ roomName })
for (const egress of egresses) {
await this.egressClient.stopEgress(egress.egressId)
}
// End the room
await this.roomClient.deleteRoom(roomName)
}
async getStreamToken(streamId: string, userId: string, isHost: boolean): Promise<string> {
const roomName = `stream_${streamId}`
const permissions: ParticipantPermission = isHost
? { canPublish: true, canSubscribe: true, canPublishData: true }
: { canPublish: false, canSubscribe: true, canPublishData: false }
// Generate token with appropriate permissions
const { AccessToken } = await import('livekit-server-sdk')
const token = new AccessToken(process.env.LIVEKIT_API_KEY!, process.env.LIVEKIT_API_SECRET!, {
identity: userId,
ttl: '4h',
})
token.addGrant({
room: roomName,
roomJoin: true,
...permissions,
})
return token.toJwt()
}
}
export const streamingService = new StreamingService()

// src/components/stream/StreamChat.tsx
'use client';
import { useState, useEffect, useRef, useCallback } from 'react';
import { useRealtime } from '@/contexts/realtime-context';
import { useAuth } from '@/contexts/auth-context';
import { ScrollArea } from '@/components/ui/scroll-area';
import { Input } from '@/components/ui/input';
import { Button } from '@/components/ui/button';
import { Send } from 'lucide-react';
interface StreamChatMessage {
id: string;
userId: string;
userName: string;
content: string;
createdAt: string;
isHighlighted?: boolean;
}
interface StreamChatProps {
streamId: string;
disabled?: boolean;
}
export function StreamChat({ streamId, disabled }: StreamChatProps) {
const [messages, setMessages] = useState<StreamChatMessage[]>([]);
const [input, setInput] = useState('');
const scrollRef = useRef<HTMLDivElement>(null);
const { socket } = useRealtime();
const { user } = useAuth();
// Subscribe to chat messages
useEffect(() => {
if (!socket) return;
const handleMessage = (message: StreamChatMessage) => {
setMessages(prev => [...prev, message].slice(-200)); // Keep last 200
};
socket.on(`stream:${streamId}:chat`, handleMessage);
return () => {
socket.off(`stream:${streamId}:chat`, handleMessage);
};
}, [socket, streamId]);
// Auto-scroll to bottom
useEffect(() => {
if (scrollRef.current) {
scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
}
}, [messages]);
const sendMessage = useCallback(async () => {
if (!input.trim() || !user || disabled) return;
try {
await fetch(`/api/streams/${streamId}/chat`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ content: input.trim() }),
});
setInput('');
} catch (error) {
console.error('Failed to send message:', error);
}
}, [input, user, disabled, streamId]);
return (
<div className="flex flex-col h-full">
<ScrollArea ref={scrollRef} className="flex-1 p-2">
{messages.map((msg) => (
<div
key={msg.id}
className={`mb-2 ${msg.isHighlighted ? 'bg-primary/10 p-2 rounded' : ''}`}
>
<span className="font-semibold text-primary">{msg.userName}: </span>
<span>{msg.content}</span>
</div>
))}
</ScrollArea>
<div className="p-2 border-t flex gap-2">
<Input
value={input}
onChange={(e) => setInput(e.target.value)}
onKeyDown={(e) => e.key === 'Enter' && sendMessage()}
placeholder={disabled ? 'Chat disabled' : 'Send a message...'}
disabled={disabled}
/>
<Button onClick={sendMessage} disabled={disabled || !input.trim()}>
<Send className="h-4 w-4" />
</Button>
</div>
</div>
);
}

// src/components/stream/StreamReactions.tsx
'use client';
import { useState, useEffect, useCallback } from 'react';
import { motion, AnimatePresence } from 'framer-motion';
import { useRealtime } from '@/contexts/realtime-context';
import { Button } from '@/components/ui/button';
import { Heart, Flame, Star, HandMetal, PartyPopper } from 'lucide-react';
interface FloatingReaction {
id: string;
type: string;
x: number;
}
interface StreamReactionsProps {
streamId: string;
disabled?: boolean;
}
const REACTION_ICONS: Record<string, React.ReactNode> = {
like: <Heart className="h-6 w-6 text-red-500 fill-red-500" />,
fire: <Flame className="h-6 w-6 text-orange-500 fill-orange-500" />,
wow: <Star className="h-6 w-6 text-yellow-500 fill-yellow-500" />,
clap: <HandMetal className="h-6 w-6 text-blue-500" />,
party: <PartyPopper className="h-6 w-6 text-purple-500" />,
};
export function StreamReactions({ streamId, disabled }: StreamReactionsProps) {
const [floating, setFloating] = useState<FloatingReaction[]>([]);
const { socket } = useRealtime();
// Subscribe to reactions
useEffect(() => {
if (!socket) return;
const handleReaction = (data: { type: string }) => {
const id = crypto.randomUUID();
const x = 20 + Math.random() * 60; // Random position 20-80%
setFloating(prev => [...prev, { id, type: data.type, x }]);
// Remove after animation
setTimeout(() => {
setFloating(prev => prev.filter(r => r.id !== id));
}, 2000);
};
socket.on(`stream:${streamId}:reaction`, handleReaction);
return () => {
socket.off(`stream:${streamId}:reaction`, handleReaction);
};
}, [socket, streamId]);
const sendReaction = useCallback(async (type: string) => {
if (disabled) return;
try {
await fetch(`/api/streams/${streamId}/react`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ type }),
});
} catch (error) {
console.error('Failed to send reaction:', error);
}
}, [disabled, streamId]);
return (
<div className="relative">
{/* Floating reactions */}
<div className="absolute inset-0 pointer-events-none overflow-hidden">
<AnimatePresence>
{floating.map((reaction) => (
<motion.div
key={reaction.id}
initial={{ y: '100%', x: `${reaction.x}%`, opacity: 1, scale: 1 }}
animate={{ y: '-100%', opacity: 0, scale: 1.5 }}
exit={{ opacity: 0 }}
transition={{ duration: 2, ease: 'easeOut' }}
className="absolute bottom-0"
>
{REACTION_ICONS[reaction.type]}
</motion.div>
))}
</AnimatePresence>
</div>
{/* Reaction buttons */}
<div className="flex gap-1 p-2">
{Object.entries(REACTION_ICONS).map(([type, icon]) => (
<Button
key={type}
variant="ghost"
size="sm"
onClick={() => sendReaction(type)}
disabled={disabled}
className="hover:scale-110 transition-transform"
>
{icon}
</Button>
))}
</div>
</div>
);
}

+---------------------------------------------------------------+
|                         LOAD BALANCER                         |
|                  (Geographic DNS / Anycast)                   |
+------------------------------+--------------------------------+
                               |
          +--------------------+--------------------+
          |                    |                    |
          v                    v                    v
  +---------------+    +---------------+    +---------------+
  |    US-East    |    |    EU-West    |    |     APAC      |
  |    Region     |    |    Region     |    |    Region     |
  +---------------+    +---------------+    +---------------+
  | LiveKit Srv 1 |<-->| LiveKit Srv 1 |<-->| LiveKit Srv 1 |  (room cascade)
  | LiveKit Srv 2 |    | LiveKit Srv 2 |    | LiveKit Srv 2 |
  | Coturn TURN   |    | Coturn TURN   |    | Coturn TURN   |
  | Redis Cluster |<-->| Redis Cluster |<-->| Redis Cluster |  (state sync)
  +---------------+    +---------------+    +---------------+
| Scenario | LiveKit Nodes | TURN Bandwidth | Redis | Storage |
|---|---|---|---|---|
| Small (100 concurrent calls) | 2 | 100 Mbps | 1 node | 100 GB |
| Medium (1,000 concurrent calls) | 4-6 | 1 Gbps | 3 nodes | 1 TB |
| Large (10,000 concurrent calls) | 20-30 | 10 Gbps | 6 nodes | 10 TB |
| Enterprise (100,000+) | 100+ | 100 Gbps | Cluster | 100 TB+ |
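As a sanity check on the TURN column, a rough estimate can be derived from call volume. The relay fraction and per-call bitrate below are assumptions, not measured figures — typically only calls behind symmetric NATs or strict firewalls fall back to relay:

```typescript
// Back-of-envelope TURN bandwidth estimate. Assumed inputs: ~15% of calls
// relay through TURN, and a relayed audio+video call costs ~2 Mbps total
// through the relay (~1 Mbps in each direction).
export function estimateTurnMbps(
  concurrentCalls: number,
  relayFraction = 0.15,
  mbpsPerRelayedCall = 2
): number {
  return concurrentCalls * relayFraction * mbpsPerRelayedCall
}
// estimateTurnMbps(100) -> 30 Mbps, comfortably inside the 100 Mbps small-tier budget
```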
// LiveKit simulcast configuration
const publishDefaults = {
simulcast: true,
videoSimulcastLayers: [
{ width: 1280, height: 720, encoding: { maxBitrate: 1_500_000 } },
{ width: 640, height: 360, encoding: { maxBitrate: 500_000 } },
{ width: 320, height: 180, encoding: { maxBitrate: 150_000 } },
],
screenShareSimulcastLayers: [
{ width: 1920, height: 1080, encoding: { maxBitrate: 3_000_000 } },
{ width: 1280, height: 720, encoding: { maxBitrate: 1_000_000 } },
],
}

# deploy/k8s/livekit-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: livekit-server
namespace: nchat
spec:
replicas: 3
selector:
matchLabels:
app: livekit-server
template:
metadata:
labels:
app: livekit-server
spec:
containers:
- name: livekit
image: livekit/livekit-server:v1.7
ports:
- containerPort: 7880
name: http
- containerPort: 7881
name: rtc-tcp
protocol: TCP
- containerPort: 7882
name: rtc-udp
protocol: UDP
resources:
requests:
cpu: '2'
memory: '4Gi'
limits:
cpu: '4'
memory: '8Gi'
env:
- name: LIVEKIT_KEYS
valueFrom:
secretKeyRef:
name: livekit-secrets
key: api-keys
volumeMounts:
- name: config
mountPath: /etc/livekit.yaml
subPath: livekit.yaml
volumes:
- name: config
configMap:
name: livekit-config
---
apiVersion: v1
kind: Service
metadata:
name: livekit-server
namespace: nchat
spec:
type: LoadBalancer
ports:
- port: 7880
targetPort: 7880
name: http
- port: 7881
targetPort: 7881
protocol: TCP
name: rtc-tcp
- port: 7882
targetPort: 7882
protocol: UDP
name: rtc-udp
selector:
app: livekit-server

Recommendation: Create a new webrtc nself plugin that integrates with the existing realtime plugin.
nself-plugins/
└── packages/
    └── webrtc/
        ├── package.json
        ├── README.md
        ├── src/
        │   ├── index.ts           # Main plugin entry
        │   ├── config.ts          # Plugin configuration
        │   ├── livekit/
        │   │   ├── server.ts      # LiveKit server management
        │   │   ├── tokens.ts      # Token generation
        │   │   └── webhooks.ts    # Webhook handlers
        │   ├── coturn/
        │   │   ├── server.ts      # TURN server management
        │   │   └── credentials.ts # Credential generation
        │   ├── recordings/
        │   │   ├── service.ts     # Recording management
        │   │   └── retention.ts   # Retention policies
        │   └── streaming/
        │       ├── service.ts     # Streaming management
        │       └── simulcast.ts   # RTMP destinations
        ├── migrations/
        │   └── 001_webrtc_tables.sql
        ├── docker/
        │   ├── livekit.yaml
        │   ├── egress.yaml
        │   └── turnserver.conf
        └── tests/
            └── *.test.ts
// nself-plugins/packages/webrtc/src/config.ts
export interface WebRTCPluginConfig {
// LiveKit
livekit: {
enabled: boolean
url: string
apiKey: string
apiSecret: string
webhookSecret: string
}
// TURN/STUN
turn: {
enabled: boolean
urls: string[]
domain: string
secret: string
credentialTTL: number // seconds
}
// Recording
recording: {
enabled: boolean
storage: 'minio' | 's3'
bucket: string
defaultRetentionDays: number
}
// Streaming
streaming: {
enabled: boolean
maxConcurrentStreams: number
hlsSegmentDuration: number
}
// Limits
limits: {
maxParticipantsPerCall: number
maxCallDuration: number // minutes
maxRecordingDuration: number // minutes
}
}
export const defaultConfig: WebRTCPluginConfig = {
livekit: {
enabled: true,
url: 'ws://localhost:7880',
apiKey: '',
apiSecret: '',
webhookSecret: '',
},
turn: {
enabled: true,
urls: ['turn:turn.localhost:3478'],
domain: 'localhost',
secret: '',
credentialTTL: 86400,
},
recording: {
enabled: true,
storage: 'minio',
bucket: 'nchat-recordings',
defaultRetentionDays: 30,
},
streaming: {
enabled: true,
maxConcurrentStreams: 10,
hlsSegmentDuration: 4,
},
limits: {
maxParticipantsPerCall: 100,
maxCallDuration: 480, // 8 hours
maxRecordingDuration: 480,
},
}

# Install webrtc plugin
nself plugin install webrtc
# Initialize with configuration
nself plugin webrtc init \
--livekit-api-key=<key> \
--livekit-api-secret=<secret> \
--turn-domain=turn.example.com
# Generate TURN credentials
nself plugin webrtc turn:credentials --user=<user-id>
# Verify installation
nself plugin webrtc status

The webrtc plugin should integrate with the existing realtime plugin for signaling:
// In realtime plugin, add WebRTC signaling events
export const WEBRTC_EVENTS = {
// Call lifecycle
CALL_INITIATE: 'call:initiate',
CALL_INCOMING: 'call:incoming',
CALL_ACCEPT: 'call:accept',
CALL_DECLINE: 'call:decline',
CALL_END: 'call:end',
CALL_CANCELLED: 'call:cancelled',
// Call state
CALL_STATE_CHANGED: 'call:state_changed',
CALL_PARTICIPANT_JOINED: 'call:participant_joined',
CALL_PARTICIPANT_LEFT: 'call:participant_left',
// Media state
CALL_MUTE_CHANGED: 'call:mute_changed',
CALL_VIDEO_CHANGED: 'call:video_changed',
CALL_SCREEN_SHARE_STARTED: 'call:screen_share_started',
CALL_SCREEN_SHARE_STOPPED: 'call:screen_share_stopped',
// Recording
RECORDING_STARTED: 'recording:started',
RECORDING_STOPPED: 'recording:stopped',
// Streaming
STREAM_STARTED: 'stream:started',
STREAM_ENDED: 'stream:ended',
STREAM_CHAT: 'stream:chat',
STREAM_REACTION: 'stream:reaction',
}

**Phase 1: Infrastructure**

- Set up LiveKit server in Docker
- Configure Coturn TURN server
- Create database migrations
- Implement token generation service
- Set up webhook handling
**Phase 2: Basic 1:1 Calls**

- Implement call initiation API
- Create LiveKit client wrapper
- Build call UI components
- Implement call store updates
- Add signaling events to realtime plugin
**Phase 3: Media Controls**

- Implement mute/unmute
- Add video toggle
- Implement screen sharing
- Add device selection
- Build connection quality monitoring
**Phase 4: Call Recording**

- Implement recording service
- Configure egress for S3/MinIO
- Build recording UI
- Implement retention policies
- Add recording playback
**Phase 5: Live Streaming**

- Implement streaming service
- Configure HLS output
- Build streaming UI (host)
- Build viewer experience
- Implement stream chat and reactions
**Phase 6: Mobile Integration**

- Implement CallKit (iOS)
- Implement ConnectionService (Android)
- Add VoIP push notifications
- Test on physical devices
- Optimize for battery/performance
**Phase 7: Hardening & Launch**

- Load testing (10k+ concurrent)
- Failover testing
- Security audit
- Documentation
- Final QA
// Token validation for all WebRTC APIs
export async function validateWebRTCAccess(
userId: string,
callId: string,
requiredRole?: ParticipantRole
): Promise<boolean> {
// 1. Verify user is authenticated
// 2. Check call exists and is active
// 3. Verify user is participant (or has permission)
// 4. Check role-based permissions (host, co-host, etc.)
return true
}

- Ephemeral credentials: Generate time-limited credentials per user
- IP allowlisting: Restrict relay to known IP ranges
- Rate limiting: Prevent abuse/DoS
- TLS: Always use TURNS (TLS-encrypted TURN)
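The ephemeral-credential bullet above corresponds to the scheme coturn implements with `use-auth-secret` (the "TURN REST API" convention): the username embeds an expiry timestamp and the password is an HMAC-SHA1 over it, so credentials self-expire without a shared user database. A sketch — the function name and return shape are illustrative:

```typescript
import { createHmac } from 'node:crypto'

// Ephemeral TURN credentials per the TURN REST API convention used by
// coturn's use-auth-secret mode. sharedSecret must match static-auth-secret
// in turnserver.conf.
export function turnCredentials(
  userId: string,
  sharedSecret: string,
  ttlSeconds = 86400
): { username: string; credential: string; ttl: number } {
  const expiry = Math.floor(Date.now() / 1000) + ttlSeconds
  const username = `${expiry}:${userId}`
  const credential = createHmac('sha1', sharedSecret).update(username).digest('base64')
  return { username, credential, ttl: ttlSeconds }
}
```

The client passes `username`/`credential` straight into its RTCIceServer (or LiveKit) configuration; coturn recomputes the HMAC and rejects the request once the embedded timestamp has passed.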
For Signal-grade E2EE with calls:
// Note: E2EE breaks server-side recording
// Options:
// 1. Client-side recording only (upload after)
// 2. Insertable Streams API for E2EE with SFU
// 3. SFrame encryption (LiveKit supports this)
const room = new Room({
e2ee: {
keyProvider: new ExternalKeyProvider(),
worker: new Worker('/e2ee-worker.js'),
},
})

// Validate all API inputs
import { z } from 'zod'
const initiateCallSchema = z.object({
targetUserId: z.string().uuid(),
type: z.enum(['audio', 'video']),
channelId: z.string().uuid().optional(),
})
const startRecordingSchema = z.object({
callId: z.string().uuid(),
layout: z.enum(['grid', 'speaker', 'single']).optional(),
resolution: z.enum(['720p', '1080p', '4k']).optional(),
})

// src/lib/webrtc/__tests__/livekit-client.test.ts
import { liveKitClient } from '../livekit-client'
import { Room } from 'livekit-client'
// Mock LiveKit SDK
jest.mock('livekit-client')
describe('LiveKitClient', () => {
beforeEach(() => {
jest.clearAllMocks()
})
describe('connect', () => {
it('should connect to room with token', async () => {
const mockRoom = { connected: true }
;(Room as jest.Mock).mockImplementation(() => mockRoom)
const room = await liveKitClient.connect({
url: 'ws://localhost:7880',
token: 'test-token',
})
expect(room).toBeDefined()
})
it('should apply ICE servers when provided', async () => {
// Test ICE server configuration
})
})
describe('media controls', () => {
it('should toggle camera', async () => {
// Test camera enable/disable
})
it('should toggle microphone', async () => {
// Test microphone enable/disable
})
it('should start/stop screen share', async () => {
// Test screen sharing
})
})
})

// src/services/__tests__/recording-service.integration.test.ts
import { recordingService } from '../recording-service'
describe('RecordingService Integration', () => {
// These tests require LiveKit running
it('should start and stop recording', async () => {
const egressId = await recordingService.startRecording({
callId: 'test-call',
roomName: 'test-room',
})
expect(egressId).toBeDefined()
const result = await recordingService.stopRecording(egressId)
expect(result.status).toBe('complete')
})
})

// e2e/calls.spec.ts
import { test, expect } from '@playwright/test'
test.describe('Voice/Video Calls', () => {
test('should initiate and accept a call', async ({ page, context }) => {
// User A initiates call
const pageA = await context.newPage()
await pageA.goto('/chat/user-b')
await pageA.click('[data-testid="start-call"]')
// User B receives call
const pageB = await context.newPage()
await pageB.goto('/chat/user-a')
await expect(pageB.locator('[data-testid="incoming-call"]')).toBeVisible()
// User B accepts
await pageB.click('[data-testid="accept-call"]')
// Both should be connected
await expect(pageA.locator('[data-testid="call-connected"]')).toBeVisible()
await expect(pageB.locator('[data-testid="call-connected"]')).toBeVisible()
})
test('should share screen during call', async ({ page }) => {
// Setup call first...
await page.click('[data-testid="share-screen"]')
await expect(page.locator('[data-testid="screen-share-preview"]')).toBeVisible()
})
})

// load-tests/webrtc-load.ts
import { check } from 'k6'
import { WebSocket } from 'k6/experimental/websockets'
export const options = {
stages: [
{ duration: '30s', target: 100 }, // Ramp up
{ duration: '1m', target: 1000 }, // Stay at 1000 concurrent calls
{ duration: '30s', target: 0 }, // Ramp down
],
}
export default function () {
// Simulate call signaling
const ws = new WebSocket('wss://api.nchat.local/realtime')
ws.onopen = () => {
ws.send(
JSON.stringify({
event: 'call:initiate',
data: { targetUserId: 'load-test-user', type: 'audio' },
})
)
}
ws.onmessage = (e) => {
const data = JSON.parse(e.data)
check(data, {
'call initiated': (d) => d.event === 'call:incoming',
})
}
}

| Term | Definition |
|---|---|
| SFU | Selective Forwarding Unit - routes media without transcoding |
| MCU | Multipoint Control Unit - mixes/transcodes media centrally |
| TURN | Traversal Using Relays around NAT - relay server for connectivity |
| STUN | Session Traversal Utilities for NAT - discovers public IP |
| ICE | Interactive Connectivity Establishment - finds best connection path |
| SDP | Session Description Protocol - describes media capabilities |
| Egress | Output from media server (recording, streaming) |
| Ingress | Input to media server (RTMP, WHIP) |
| HLS | HTTP Live Streaming - Apple's adaptive streaming protocol |
| DASH | Dynamic Adaptive Streaming over HTTP - MPEG standard |
| Simulcast | Publishing same content at multiple quality levels |
| MoQ | Media over QUIC - emerging low-latency streaming protocol |
Document Version: 1.0.0 Last Updated: 2026-02-03 Author: nchat Development Team