T5: Command Pipeline Design (Sequence Diagram) - congsinhv/fluxion GitHub Wiki
Issue #25: Command Pipeline Design (Sequence Diagram). Week: 5 | 21/04 – 27/04/2026
Official numbering: Section 3.7 in the Master TOC
Revision 2026-04-18: Pipeline updated after the T8b review.
- `assignAction` gains an optional `messageTemplateId` → the resolver loads the template content (plain string) → assigns it to the payload → the worker passes it through without rendering
- The resolver INSERTs `batch_actions` (IN_PROGRESS) + N `batch_device_actions` (PENDING) at enqueue time
- The `action-trigger` worker picks up each device → once the SNS publish succeeds, UPDATE `batch_device_actions.status = SUCCESS`; an invalid device or failed dispatch = FAILED + error_code. `batch_actions.status` is aggregated when all rows are done
- The batch tracks only the DISPATCH outcome (SNS publish), NOT the device execution callback. The checkin-handler does NOT update the batch tables
- Tracking: #59
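The aggregation rule in the revision notes can be sketched as a small pure function. This is a minimal sketch, not the project's code: the function name and the enum are hypothetical, and the only status values used are the ones named in this page (PENDING, SUCCESS, FAILED for rows; IN_PROGRESS, COMPLETED for the batch).

```python
from enum import Enum


class DeviceDispatchStatus(str, Enum):
    """Per-device dispatch status stored in batch_device_actions."""
    PENDING = "PENDING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


def aggregate_batch_status(rows: list[DeviceDispatchStatus]) -> str:
    """Derive the batch_actions status from its batch_device_actions rows."""
    # Any row still PENDING means dispatch has not finished for the batch.
    if any(row is DeviceDispatchStatus.PENDING for row in rows):
        return "IN_PROGRESS"
    # Every row is terminal (SUCCESS or FAILED), so the batch is finalized.
    # Device execution results are deliberately not considered here.
    return "COMPLETED"
```

Because the batch tracks only dispatch, a batch can be COMPLETED while individual devices have not yet executed (or will fail to execute) the command.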
The Command Pipeline processes distributed commands using the Choreography Saga pattern and is split into 3 layers: BE (business logic + DB), OEM (Apple OEM protocol), and Event (SNS/SQS glue).
Principle: BE knows only the DB. OEM knows only third parties (APNS + device). The two communicate through events.
Pipeline participants:
| Layer | Component | Type | Role |
|---|---|---|---|
| BE | `action-resolver` | Lambda (AppSync) | Validate + enqueue |
| BE | `action-trigger` | Lambda (SQS) | Create execution record, publish command |
| BE | `checkin-handler` | Lambda (SQS) | Process device events → update DB |
| OEM | `apple-process-action` | Lambda (SQS + API GW) | Cache + APNS (SQS trigger) / OEM protocol (API GW trigger) |
| OEM | API Gateway | HTTP API | `PUT /mdm` (device) + `POST /apns/{token}` (proxy → APNS) |
sequenceDiagram
participant User as Admin/Operator
participant API as AppSync
participant AR as action-resolver λ
participant SQS1 as action-trigger-sqs
participant AT as action-trigger λ
participant RDS as RDS PostgreSQL
participant SNS1 as command-sns
participant SQS2 as apple-process-action-sqs
participant OEM as apple-process-action λ
participant CACHE as Command Cache
participant APIGW as API Gateway
participant APNS as Apple APNS
participant Device as iOS Device
participant SNS2 as checkin-sns
participant SQS3 as checkin-handler-sqs
participant CK as checkin-handler λ
rect rgb(240, 248, 255)
Note over AR,AT: BE Layer — Step 1-2
User->>API: mutation assignAction(deviceId, actionId, messageTemplateId?)
API->>AR: resolve
AR->>RDS: SELECT device, action (validate)
opt messageTemplateId set
AR->>RDS: SELECT content, is_active FROM message_templates
AR->>AR: payload.messageContent = template.content (plain, no rendering)
end
AR->>RDS: INSERT batch_actions (status=IN_PROGRESS) + batch_device_actions (status=PENDING)
AR->>SQS1: enqueue {batchId, deviceId, actionId, payload}
AR-->>API: return { executionId, batchId, status: PENDING }
SQS1->>AT: trigger
AT->>RDS: INSERT action_executions (ACTION_PENDING, command_uuid)
AT->>RDS: UPDATE devices SET assigned_action_id
AT->>SNS1: publish {command_uuid, device_udid, push_token, push_magic, request_type, payload}
AT->>RDS: UPDATE batch_device_actions SET status=SUCCESS (dispatch OK)
Note right of AT: If publish fails / device invalid → batch_device_actions.status=FAILED + error_code
AT->>RDS: UPDATE batch_actions.status=COMPLETED (when all rows are done)
end
rect rgb(255, 248, 240)
Note over OEM,OEM: OEM Layer — Step 3-5
SNS1->>SQS2: fan-out
SQS2->>OEM: trigger (SQS)
OEM->>CACHE: PUT command (key=device_udid)
OEM->>APIGW: POST /apns/{token}
APIGW->>APNS: proxy → silent push {"mdm":"PushMagic"}
APNS-->>Device: wake up
Device->>APIGW: PUT /mdm (Status=Idle, UDID)
APIGW->>OEM: invoke (API GW trigger)
OEM->>CACHE: GET + DELETE command (key=device_udid)
OEM-->>Device: 200 {CommandUUID, Command: DeviceLock}
Device->>Device: execute DeviceLock
Device->>APIGW: PUT /mdm (Status=Acknowledged, CommandUUID)
APIGW->>OEM: trigger
OEM->>SNS2: publish {event: ACTION_COMPLETED, command_uuid, udid}
OEM-->>Device: 200 (empty body)
end
rect rgb(240, 255, 240)
Note over CK: BE Layer — Step 6
SNS2->>SQS3: fan-out
SQS3->>CK: trigger
CK->>RDS: SELECT action_executions WHERE command_uuid=...
CK->>RDS: UPDATE action_executions SET status=ACTION_COMPLETED
CK->>RDS: UPDATE devices SET current_policy_id, assigned_action_id=NULL
CK->>RDS: INSERT milestones
CK->>API: AppSync subscription: deviceStateChanged
Note right of CK: Does NOT touch batch_actions/batch_device_actions (already finalized in action-trigger)
end
API-->>User: subscription update: device locked ✓
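The resolver step in the happy path (template lookup, then enqueue) can be sketched as below. This is an illustrative sketch only: the function name and the dict shapes are hypothetical, and rejecting an inactive template is an assumption suggested by the `is_active` column being selected, not something this page states.

```python
from typing import Optional


def build_enqueue_message(batch_id: str, device_id: str, action_id: int,
                          template: Optional[dict] = None) -> dict:
    """Build the message that action-resolver puts on action-trigger-sqs."""
    payload: dict = {}
    if template is not None:
        # Assumption: an inactive template is rejected at validation time.
        if not template["is_active"]:
            raise ValueError("message template is inactive")
        # The content is copied as a plain string; no rendering happens
        # here or later in the worker (pass-through).
        payload["messageContent"] = template["content"]
    return {
        "batchId": batch_id,
        "deviceId": device_id,
        "actionId": action_id,
        "payload": payload,
    }
```

Resolving the template once in the resolver keeps the worker free of template logic, which matches the pass-through rule in the revision notes.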
sequenceDiagram
participant OEM as apple-process-action λ
participant CACHE as Command Cache
participant APNS as Apple APNS
participant Device as iOS Device
participant TH as timeout-handler λ
participant RDS as RDS PostgreSQL
OEM->>CACHE: PUT command (TTL=60s)
OEM->>APNS: silent push
APNS-->>Device: wake up (device OFFLINE)
Note over CACHE: TTL expires → command auto-deleted
Note over TH: EventBridge scheduled rule (every 1 min, the minimum EventBridge rate)
TH->>RDS: SELECT action_executions WHERE status='ACTION_PENDING' AND created_at < NOW() - INTERVAL '60s'
TH->>RDS: UPDATE action_executions SET status='ACTION_FAILED', ext_fields.failed_reason='device_timeout'
TH->>RDS: UPDATE devices SET assigned_action_id=NULL
Note over RDS: Device state is unchanged, only assigned_action_id is cleared
Timeout strategy:
- The command cache has a TTL (auto-expire) → OEM cleans up by itself
- The BE `timeout-handler` Lambda runs periodically → marks stale `ACTION_PENDING` rows as `ACTION_FAILED`
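The selection rule used by `timeout-handler` can be illustrated in memory. This sketch mirrors the `status='ACTION_PENDING' AND created_at < NOW() - INTERVAL '60s'` condition from the diagram; the function name and the dict shape of an execution are hypothetical, and the real handler would run the query against RDS instead.

```python
from datetime import datetime, timedelta, timezone

# Same window as the command cache TTL shown in the timeout diagram.
EXECUTION_TIMEOUT = timedelta(seconds=60)


def find_stale_executions(executions: list[dict], now: datetime) -> list[str]:
    """Return command_uuids that are still pending past the timeout."""
    cutoff = now - EXECUTION_TIMEOUT
    return [
        e["command_uuid"]
        for e in executions
        if e["status"] == "ACTION_PENDING" and e["created_at"] < cutoff
    ]
```

Each returned execution would then be marked `ACTION_FAILED` with `failed_reason='device_timeout'`, and the device's `assigned_action_id` cleared.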
sequenceDiagram
participant OEM as apple-process-action λ
participant APNS as Apple APNS
participant SNS2 as checkin-sns
participant CK as checkin-handler λ
participant RDS as RDS PostgreSQL
OEM->>APNS: silent push
APNS-->>OEM: 400 BadDeviceToken
OEM->>SNS2: publish {event: ACTION_FAILED, command_uuid, reason: apns_bad_token}
SNS2->>CK: via SQS
CK->>RDS: UPDATE action_executions SET status='ACTION_FAILED'
CK->>RDS: UPDATE devices SET assigned_action_id=NULL
| APNS Status | Fluxion Action |
|---|---|
| 200 | Proceed (cache command, wait for device) |
| 400 | ACTION_FAILED, log error |
| 410 | ACTION_FAILED, mark token invalid |
| 429, 500, 503 | Re-enqueue to SQS (retry) |
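The status table above maps directly onto a small classifier. This is a sketch under stated assumptions: the function name and the returned labels are hypothetical, and raising on any status the table does not list is a choice made for this example.

```python
# Statuses the table marks as retryable via SQS re-enqueue.
RETRYABLE_STATUSES = {429, 500, 503}


def classify_apns_status(status: int) -> str:
    """Map an APNS HTTP status to the Fluxion action from the table."""
    if status == 200:
        return "PROCEED"  # command stays cached, wait for the device to poll
    if status == 400:
        return "FAIL"  # ACTION_FAILED, log error
    if status == 410:
        return "FAIL_AND_INVALIDATE_TOKEN"  # ACTION_FAILED, mark token invalid
    if status in RETRYABLE_STATUSES:
        return "RETRY"  # re-enqueue to SQS
    # Assumption: anything not in the table is surfaced instead of guessed.
    raise ValueError(f"unhandled APNS status: {status}")
```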
sequenceDiagram
participant SQS as action-trigger-sqs
participant AT as action-trigger λ
participant DLQ as DLQ
participant DH as dlq-handler λ
participant RDS as RDS PostgreSQL
SQS->>AT: attempt 1 (exception)
SQS->>AT: attempt 2 (exception)
SQS->>AT: attempt 3 (exception)
SQS->>DLQ: maxReceiveCount=3 reached
DLQ->>DH: trigger
alt Recoverable
DH->>SQS: re-enqueue with delay
else Non-recoverable
DH->>RDS: UPDATE action_executions SET status='ACTION_FAILED'
DH->>RDS: UPDATE devices SET assigned_action_id=NULL
Note over DH: CloudWatch Alarm → alert ops
end
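The recoverable / non-recoverable branch in `dlq-handler` could look like the sketch below. This page does not define what counts as recoverable, so the error codes, the redrive limit, and the backoff formula are all hypothetical. The one external fact used is that SQS caps a per-message delay at 900 seconds.

```python
# Hypothetical set of error codes treated as transient.
RECOVERABLE_ERRORS = {"db_connection_timeout", "throttled"}
# Hypothetical limit on how many times the handler re-enqueues a message.
MAX_REDRIVES = 3
# SQS does not accept a per-message DelaySeconds above 15 minutes.
MAX_SQS_DELAY_SECONDS = 900


def plan_dlq_action(error_code: str, redrive_count: int) -> dict:
    """Decide what dlq-handler does with one dead-lettered message."""
    if error_code in RECOVERABLE_ERRORS and redrive_count < MAX_REDRIVES:
        # Exponential backoff starting at 30s, capped at the SQS maximum.
        delay = min(30 * 2 ** redrive_count, MAX_SQS_DELAY_SECONDS)
        return {"action": "re-enqueue", "delay_seconds": delay}
    # Non-recoverable: fail the execution and free the device.
    return {
        "action": "fail",
        "status": "ACTION_FAILED",
        "clear_assigned_action": True,
    }
```

Bounding the redrive count keeps a message from cycling between the queue and the DLQ indefinitely.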
sequenceDiagram
participant User as Admin
participant API as AppSync
participant AR as action-resolver λ
participant SQS1 as action-trigger-sqs
participant RDS as RDS PostgreSQL
User->>API: mutation assignBulkAction(deviceIds[], actionId)
API->>AR: resolve
loop for each deviceId
AR->>RDS: SELECT device (validate busy + transition)
alt Valid
AR->>SQS1: enqueue {deviceId, actionId}
else Invalid
AR->>AR: collect error
end
end
AR-->>API: { valid: [...executionIds], failed: [...errors] }
Note over SQS1: Each message → independent pipeline<br/>Runs in parallel, no bulk transaction
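The per-device validation loop in the bulk flow can be sketched as a partition into valid and failed devices. The function name, the error codes, and the use of a non-null `assigned_action_id` as the "busy" check are assumptions made for this example. The state-transition check mentioned in the diagram is omitted, and the sketch returns device IDs instead of the execution IDs the resolver returns.

```python
def partition_bulk_request(device_ids: list[str],
                           devices_by_id: dict[str, dict]) -> dict:
    """Split a bulk request into devices to enqueue and collected errors."""
    valid: list[str] = []
    failed: list[dict] = []
    for device_id in device_ids:
        device = devices_by_id.get(device_id)
        if device is None:
            failed.append({"deviceId": device_id, "error": "DEVICE_NOT_FOUND"})
        elif device.get("assigned_action_id") is not None:
            # Assumption: a device with an action in flight counts as busy.
            failed.append({"deviceId": device_id, "error": "DEVICE_BUSY"})
        else:
            valid.append(device_id)
    # One invalid device never blocks the others: no bulk transaction.
    return {"valid": valid, "failed": failed}
```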
| Queue | Consumer | Visibility Timeout | DLQ | maxReceiveCount |
|---|---|---|---|---|
| `action-trigger-sqs` | `action-trigger` λ | 30s | `action-trigger-dlq` | 3 |
| `apple-process-action-sqs` | `apple-process-action` λ | 30s | `apple-process-action-dlq` | 3 |
| `checkin-handler-sqs` | `checkin-handler` λ | 30s | - | - |
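One way to express the queue settings above is as the attribute map that the SQS `CreateQueue` API accepts, where `VisibilityTimeout` is a string and `RedrivePolicy` is a JSON string with `deadLetterTargetArn` and `maxReceiveCount`. The helper name is hypothetical and so is any ARN passed to it. This page does not say how the infrastructure is provisioned, so treat this as a sketch of the settings, not of the deployment.

```python
import json
from typing import Optional


def queue_attributes(visibility_timeout_s: int,
                     dlq_arn: Optional[str] = None,
                     max_receive_count: Optional[int] = None) -> dict:
    """Build an SQS attribute map for one pipeline queue."""
    # SQS expects attribute values as strings.
    attrs = {"VisibilityTimeout": str(visibility_timeout_s)}
    if dlq_arn is not None:
        # The redrive policy is passed as a JSON-encoded string.
        attrs["RedrivePolicy"] = json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": max_receive_count,
        })
    return attrs
```

When the consumer is a Lambda function, AWS recommends a visibility timeout of at least six times the function timeout, so the 30s value implies short-running handlers.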
| SNS Topic | Subscribers |
|---|---|
| `command-sns` | `apple-process-action-sqs` |
| `checkin-sns` | `checkin-handler-sqs` |
{
"command_uuid": "uuid",
"device_udid": "...",
"push_token": "base64...",
"push_magic": "...",
"request_type": "DeviceLock",
"command_payload": { "Message": "...", "PhoneNumber": "..." }
}

{
"event_type": "ACTION_COMPLETED | ACTION_FAILED | DEVICE_TOKEN_UPDATE | DEVICE_RELEASED",
"command_uuid": "uuid",
"udid": "...",
"payload": {}
}

| action_type_id | Action | OEM Command | Pipeline |
|---|---|---|---|
| 1 | Upload | - | DB only |
| 2 | Register | - | DB only |
| 3 | Checkin | - | Enrollment (device → OEM direct) |
| 4 | Activate | - | Enrollment complete |
| 5 | Lock | `DeviceLock` | Full pipeline (APNS → device → result) |
| 6 | Unlock | `ClearPasscode` | Full pipeline |
| 7 | Send Message | APNS push notification | APNS only (no OEM command) |
| 8 | Lock Message | `DeviceLock` + Message | Full pipeline |
| 9 | Release | `RemoveProfile` | Full pipeline |
| 10 | Deregister | - | DB only |
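Combining the action table with the command message schema, building the `command-sns` message for a full-pipeline action might look like this. The function name and the `device` dict keys are hypothetical. Only the four full-pipeline rows are mapped, and other action types are rejected because they have no OEM command.

```python
import uuid
from typing import Optional

# action_type_id -> OEM request type, taken from the full-pipeline rows.
FULL_PIPELINE_REQUEST_TYPES = {
    5: "DeviceLock",     # Lock
    6: "ClearPasscode",  # Unlock
    8: "DeviceLock",     # Lock Message (DeviceLock + Message)
    9: "RemoveProfile",  # Release
}


def build_command_message(action_type_id: int, device: dict,
                          command_payload: Optional[dict] = None) -> dict:
    """Build the message action-trigger publishes to command-sns."""
    request_type = FULL_PIPELINE_REQUEST_TYPES.get(action_type_id)
    if request_type is None:
        raise ValueError(f"action_type_id {action_type_id} has no OEM command")
    return {
        # A fresh UUID per command; it is the key used to correlate
        # the device's result with this execution.
        "command_uuid": str(uuid.uuid4()),
        "device_udid": device["udid"],
        "push_token": device["push_token"],
        "push_magic": device["push_magic"],
        "request_type": request_type,
        "command_payload": command_payload or {},
    }
```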
1. Server → APNS: {"mdm":"PushMagic"} ← silent push (wake device)
2. Device → Server: PUT /mdm Status=Idle ← device polls for command
3. Server → Device: 200 OK {CommandUUID, Command} ← serve command from cache
4. Device executes command
5. Device → Server: PUT /mdm Status=Acknowledged ← device sends result
6. Server → Device: 200 OK (empty body) ← close connection
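Steps 2 and 3 of the exchange above (device polls with `Status=Idle`, server serves the cached command) can be sketched with an in-memory dict standing in for the Command Cache. The function names and the response wrapper are hypothetical. The real protocol exchanges property lists, while this sketch uses plain dicts.

```python
from typing import Optional


def take_cached_command(cache: dict, udid: str) -> Optional[dict]:
    """GET + DELETE in one step, so a command is served at most once."""
    return cache.pop(udid, None)


def handle_idle(cache: dict, udid: str) -> dict:
    """Answer a PUT /mdm with Status=Idle for the given device."""
    command = take_cached_command(cache, udid)
    if command is None:
        # Nothing queued (or the TTL expired): 200 with an empty body.
        return {"status": 200, "body": None}
    return {
        "status": 200,
        "body": {
            "CommandUUID": command["command_uuid"],
            "Command": {"RequestType": command["request_type"]},
        },
    }
```

Removing the command when it is served means a repeated Idle poll gets an empty response instead of the same command a second time.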
| OEM Event | Fluxion Event | Data |
|---|---|---|
| TokenUpdate check-in | `DEVICE_TOKEN_UPDATE` | push_token, push_magic, unlock_token, topic |
| CheckOut check-in | `DEVICE_RELEASED` | udid |
| Status=Acknowledged | `ACTION_COMPLETED` | command_uuid, udid |
| Status=Error | `ACTION_FAILED` | command_uuid, udid, error_chain |
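The mapping in the table above can be sketched as a translation function on the OEM side. The input keys (`MessageType`, `Status`, `UDID`, `CommandUUID`, `Token`, `PushMagic`, `UnlockToken`, `Topic`, `ErrorChain`) follow Apple's MDM message fields. The function name is hypothetical, the message is assumed to be already parsed into a dict, and binary fields are passed through without the base64 encoding the schema implies.

```python
from typing import Optional


def to_fluxion_event(message: dict) -> Optional[dict]:
    """Translate one parsed device message into a checkin-sns event."""
    udid = message.get("UDID")

    def event(event_type: str, payload: dict,
              command_uuid: Optional[str] = None) -> dict:
        return {"event_type": event_type, "command_uuid": command_uuid,
                "udid": udid, "payload": payload}

    message_type = message.get("MessageType")
    if message_type == "TokenUpdate":
        return event("DEVICE_TOKEN_UPDATE", {
            "push_token": message.get("Token"),
            "push_magic": message.get("PushMagic"),
            "unlock_token": message.get("UnlockToken"),
            "topic": message.get("Topic"),
        })
    if message_type == "CheckOut":
        return event("DEVICE_RELEASED", {})

    status = message.get("Status")
    if status == "Acknowledged":
        return event("ACTION_COMPLETED", {}, message.get("CommandUUID"))
    if status == "Error":
        return event("ACTION_FAILED",
                     {"error_chain": message.get("ErrorChain", [])},
                     message.get("CommandUUID"))
    # Idle polls and anything else produce no Fluxion event.
    return None
```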
Fluxion's Command Pipeline is designed around the Choreography Saga pattern with 3 clearly separated layers (BE, OEM, Event), which removes any central orchestrator. The happy path has 6 steps, from the GraphQL mutation to the AppSync subscription update. BE and OEM communicate purely through SNS/SQS events: BE never calls OEM directly, and OEM never calls BE. This keeps business logic independent of the Apple MDM protocol and allows the OEM layer to be replaced (for example, from iOS to Android) without affecting BE.
Three failure paths compensate for the individual failure points: Device Timeout (cache TTL + periodic timeout-handler), APNS Error (error events via SNS), and DLQ (maxReceiveCount=3 with recoverable/non-recoverable branching). Every terminal failure path ends by clearing assigned_action_id on the device, so a device can never stay stuck in a "busy" state. Idempotency relies on the unique command_uuid, which lets duplicate SQS messages be handled safely.
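One way to get the idempotency described above is to apply a result event only while the execution is still pending, so a redelivered SQS message becomes a no-op. This is a sketch of that idea with an in-memory dict keyed by `command_uuid`. The function name is hypothetical, and the page does not confirm that the handler is implemented this way.

```python
def apply_result_event(executions: dict[str, dict], event: dict) -> bool:
    """Apply ACTION_COMPLETED / ACTION_FAILED once; return True if applied."""
    execution = executions.get(event["command_uuid"])
    # Unknown command_uuid, or already in a terminal state: ignore.
    if execution is None or execution["status"] != "ACTION_PENDING":
        return False
    execution["status"] = event["event_type"]
    return True
```

In SQL the same guard would be a conditional update such as `UPDATE action_executions SET status = ... WHERE command_uuid = ... AND status = 'ACTION_PENDING'`, with the follow-up writes executed only when a row was actually changed.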
The SQS configuration (visibility timeout 30s, maxReceiveCount=3, a dedicated DLQ for `action-trigger-sqs` and for `apple-process-action-sqs`) balances at-least-once delivery against duplicate processing. The bulk command flow uses independent fan-out (one message pipeline per device) with no bulk transaction, so partial success is handled gracefully instead of failing the whole batch.