NestJS SSE (Server-Sent Events)
sse는 서버에서 이벤트를 발행하여 웹 브라우저에서 해당 이벤트를 수신하는 기능 입니다.
웹소켓보다 훨씬 더 경량화 되어있고 단순한 이벤트를 전달 할 때 효과적이기 때문에 간단한 내용을 보낼 때 주로 사용된다고 합니다.
구현 방법은 어렵지 않습니다.
* 컨트롤러.ts
import { Controller, Sse } from '@nestjs/common';
import { interval, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
@Controller()
export class SseController {
@Sse('sse')
sse(): Observable<MessageEvent> {
return interval(5000).pipe(
map((_) => ({ data: { hello: 'world' } } as MessageEvent)),
);
}
}
위 컨트롤러는 "sse" 라는 이름의 이벤트를 클라이언트 브라우저가 구독하면 "hello : world" 라는 메시지를 5초단위로 보내게 되어 있습니다.
이를 받는 클라이언트 코드 입니다.
* index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
</head>
<style></style>
<body>
sse test
</body>
<script>
var evtSource = new EventSource('sse');
evtSource.onmessage = function (e) {
console.log(e);
};
</script>
</html>
sse 라는 이름으로 이벤트를 받아서 단순하게 출력하고 있습니다.
이를 실행해본 모습 입니다.
대량의 데이터가 아니라 간단하면서 심플한 이벤트를 브라우저에 전달하기 위해서는 sse를 사용하는 것도 나쁘지 않을 것 같습니다!
EventSource 인터페이스는 웹표준이며 아래 mdn에서 자세히 살펴볼 수 있습니다.
https://developer.mozilla.org/ko/docs/Web/API/EventSource
여기까지 Nestjs에서 SSE 구현 방법에 대해서 알아보았습니다.
궁금한점 또는 틀린 부분은 언제든 연락 주세요! 😁
# 추가 수정 : 2023-07-20
위 코드를 사용하면 브라우저가 여러개가 붙는 경우라면 가장 먼저 이벤트를 붙인 브라우저만 데이터를 수신하는 문제가 있습니다.
이를 해결하기 위해서는 브라우저가 접속하면 접속한 브라우저별 스트림을 배열로 담아두어 해당 배열에 대해서 이벤트를 전달하게 해야 합니다.
아래는 완성된 코드 입니다.
* 컨트롤러.ts
import {
Controller,
Sse,
Res,
OnModuleDestroy,
OnModuleInit,
Param,
} from '@nestjs/common';
import { Observable, ReplaySubject } from 'rxjs';
import { map } from 'rxjs/operators';
import { Response } from 'express';
@Controller()
export class SseController implements OnModuleInit, OnModuleDestroy {
private stream: { //접속한 브라우저의 커넥션을 담을 객체
id: string;
subject: ReplaySubject<unknown>;
observer: Observable<unknown>;
}[] = [];
private timer: NodeJS.Timeout; //sse를 전달할 주기
private id = 0;
public onModuleInit(): void { //모듈이 올라오면 아래 setInterval을 실행 합니다.
this.timer = setInterval(() => {
this.id += 1;
this.stream.forEach(({ subject }) => subject.next(this.id)); //구독 대상에게 이벤트 발행
}, 1000);
}
public onModuleDestroy(): void { //모듈이 파괴되면 setInterval 타이머를 제거 합니다.
clearInterval(this.timer);
}
private addStream( //브라우저가 접속할 때 해당 스트림을 담아 둡니다.
subject: ReplaySubject<unknown>,
observer: Observable<unknown>,
id: string,
): void {
this.stream.push({
id,
subject,
observer,
});
}
private removeStream(id: string): void { //브라우저가 종료될 때 대상에서 제거 합니다.
this.stream = this.stream.filter((stream) => stream.id !== id);
}
private static genStreamId(): string {
return Math.random().toString(36).substring(2, 15);
}
@Sse('sse/:id')
sse(
@Param() { id }: Record<string, string>,
@Res() response: Response,
): Observable<MessageEvent> {
console.log(id); //나중에 해당 아이디로 조건을 주어야 합니다
const genId = SseController.genStreamId(); //고유 키 값을 만들고
response.on('close', () => this.removeStream(genId)); //종료이벤트를 붙입니다.
const subject = new ReplaySubject();
const observer = subject.asObservable(); //구독 객체를 생성하여,
this.addStream(subject, observer, genId); //setInterval에 실행 할 스트림에 추가하고
return observer.pipe( // 해당 구독객체를 브라우저에게 전달하여 줍니다.
map(
(_) =>
({
data: { hello: 'workd' }, //나중에 "id"값을 조건으로 빈 데이터 또는 전달할 데이터를 주도록 합니다.
} as MessageEvent),
),
);
}
}
sse를 구독하는 브라우저에서는 본인 고유의 아이디 값을 전송하여 해당 아이디 값을 조건으로 이벤트를 받을 수 있게 수정하여 줍니다.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
</head>
<style></style>
<body>
sse test
</body>
<script>
const randomId = Math.random().toString(36).substring(2, 15);
var evtSource = new EventSource(`sse/${randomId}`); //이런식으로 고유 아이디를 전달!
evtSource.onmessage = function (e) {
console.log(e);
};
</script>
</html>
이를 실행하면 이제 브라우저가 몇개이던지간에 이벤트를 받을 수 있게 되었습니다.
* 포트는 8080이지만 nestjs 입니다.. : - )
끝!