Push 알림 기능 개발기 (2) ‐ SSE 구현 - YJGwon/connectruck GitHub Wiki

기본적인 동작 흐름

sequenceDiagram
    Client->>Server: 주문 알림 구독 요청
		activate Server
		loop 주문 발생할 때 마다
			Server-->>Client: 새 주문 event
			Note over Client: browser notification
    end
		Note over Server: time out
		deactivate Server
		Client->>Server: SSE 재연결 요청(브라우저에서 처리)

Loading

SSE with Spring

SSE stream 시작하기

Spring에서는 SseEmitter를 통해 Server Sent Events를 지원한다.

SseEmitter

ResponseBodyEmitter를 상속받은 class

  • ResponseBodyEmitterchunked response를 비동기적으로 전송 (이 때 비동기는 spring application 내에서 비동기를 의미하는 것이 아니라 client - server 간 비동기를 의미)
  • SseEmitterSSE specification에 맞게 event를 전송

SseEmitter를 생성하고 rest controller에서 이를 response body로 응답하면 SSE stream이 생성된다. 생성자에 parameter로 Long 값을 넘겨주어 timeout을 설정할 수도 있다.

@GetMapping(path="/events")
public SseEmitter handle() {
	SseEmitter emitter = new SseEmitter();
	return emitter;
}

이 때 생성한 SseEmitter instance는 저장해두어야 한다. stream이 종료될 때 까지 해당 객체를 통해 event를 전송해야 하기 때문이다.


nginx 응답 buffer 무효화

nginx는 proxy한 server의 응답을 buffering한다(proxy buffering). event stream이 바로바로 전송되게 하려면 이 buffering을 무효화해야 한다.

listen block에서 option을 설정하거나 X-Accel-Buffering이라는 header를 추가해서 무효화 할 수 있다. 특정 요청의 응답에 대해서만 무효화하려면 해당 응답에만 header를 추가하는 것이 적절하다.

@GetMapping("/orders/my")
public ResponseEntity<SseEmitter> subscribeOrders() {

    final SseEmitter sseEmitter = new SseEmitter();
    return ResponseEntity.ok()
            .header("X-Accel-Buffering", "no")
            .body(sseEmitter);
}

SSE event 전송하기

SSE message format

sse는 utf-8 text로만 이루어지며 event stream의 각 행은 field이름: 내용의 형식, 두 개의 줄바꿈 문자로 구분

정해진 네 가지 field name 이외에 다른 이름을 가진 field는 무시

  • event: 이벤트의 종류
  • data: 데이터
  • id: event 식별자, reconnection 요청 시 last event ID로 활용됨
  • retry: event stream의 reconnection time을 설정하는 정수값 (정수가 아니면 무시)

data를 제외한 event, id, retry field는 한 이벤트 당 한 줄만 읽음(마지막에 전송된 line으로 덮어씌워진다)


Spring에서는 SSE format의 메세지를 SseEmitterSseEventBuilder로 편리하게 만들 수 있다.SseEmitter의 static method event()로 생성할 수 있다.

try {
  final SseEventBuilder eventBuilder = SseEmitter.event()
          .id(id)
          .name(name)
          .data(data);
  sseEmitter.send(eventBuilder);
} catch (IOException e) {
  log.error("failed to send sse event - {}", id, e);
}

id와 name(=event field)은 String인 반면 여러 줄에 걸쳐 전송할 수 있는 data는 Object를 받아 HttpMessageConverter로 serialize한 후 전송된다. 만약 data field만 보낼거라면 send method에 Object를 넘겨주면 된다.

💡 Sse connection이 성립된 후 timeout 전까지 아무런 event도 전송하지 않으면 reconnection 요청할 때 error가 발생한다. 그러므로 SseEmitter를 생성한 뒤 initial event를 하나 보내두면 좋다.

SSE 예외 처리

event 전송 도중 예외가 발생하면 Spring MVC가 알아서 completeWithError를 호출한다. 대신 우리는 SseEmittercompletion과 timeout에 대해 callback을 등록할 수 있다.

final SseEmitter sseEmitter = new SseEmitter(SUBSCRIBE_TIME_OUT);
	sseEmitter.onTimeout(sseEmitter::complete);
	sseEmitter.onCompletion(() -> {
	    log.info("SSE connection complete - {}", truckId);
			sseEmitterRepository.deleteById(truckId);
});

이 때 callback이 동시에 여러 다른 thread에서 실행될 수 있으므로 동시성 문에 주의해야 한다. 위 코드의 SseEmitterRepository의 경우 내부에서 ConcurrentHashMap을 사용한다.

Reconnection

SSE 연결이 끊어지면 browser는 재연결 요청을 보낸다. 이 때 마지막으로 수신한 id field값을 Last-Event-ID header에 담아 보낸다(front에서 event-source-polyfill을 사용하면 request parameter로 보낸다). 이를 활용하면 연결이 끊긴 사이 발생한 event들을 발송해줄 수 있다.

나는 푸드트럭id_timestamp의 형식으로 id를 보내주고 새 주문이 들어올 때 마다 SseEvent 정보를 저장해두었다. front에서 lastEventId와 함께 연결 요청 보내왔을 때 해당 푸드트럭의 event 중 lastEventId가 더 큰 event가 있다면 보내주도록 했다.

Server 처리 흐름

sequenceDiagram
    Client->>Controller: 주문 알림 구독 요청
		activate Controller
		Controller->>Service: ownerId
		activate Service
		Service->>Repository: SseEmitter 저장(truckId에 mapping)
		Note over Service: SseEmitter.send(initial event)
		Service-->>Client: SseEmitter 응답
		deactivate Service
		loop 주문 발생할 때 마다
			Service->>Repository: SseEvent 저장, SseEmitter 조회
			activate Service
			Repository-->>Service: SseEmitter
			Note over Service: SseEmitter.send(새 주문 event)
			Service-->>Client: 새 주문 event
			deactivate Service
			Note over Client: browser notification
    end
		Note over Controller: time out
		deactivate Controller
		Service->>Repository: SseEmitter 삭제

Loading

재연결

sequenceDiagram
		Client->>Controller: 재연결 요청(with lastEventId)
		activate Controller
		Controller->>Service: ownerId, lastEventId
		activate Service
		Service->>Repository: 새 SseEmitter 저장
		Note over Service: SseEmitter.send(initial event)
		Service->>Repository: 해당 truck의 lastEventId 이후 event 조회
		Repository-->>Service: events
		Note over Service: SseEmitter.send(events)
		Service-->>Client: SseEmitter 응답
		deactivate Service
		deactivate Controller
Loading

문제점

SseEmitter instance를 통해 이벤트를 전달해야 하기 때문에 서버가 다중화되면 문제가 생긴다.

WAS A에서 SSE 연결이 성립되었는데 WAS B에서 해당 푸드트럭의 주문이 발생한 경우 event를 전달하려면 서버 간에 주문 발생 이벤트가 공유되어야 한다.

Redis Pub/Sub을 도입하여 해결한 이야기는 다음 편에서 계속…!

references

⚠️ **GitHub.com Fallback** ⚠️