[webflux websocket broad cast all] 웹플럭스 웹소켓 에코 말고 전체에게 보내기
웹플럭스에서 웹소켓을 활용하여 채팅서버를 만들 수 있습니다.
구글링 하다보면 다양한 예제를 만날 수 있으며, 샘플 코드 또한 훌륭 합니다.
그런데..아쉬운 점은..
바로 대부분의 설명과 예제가 단순히 자기 자신에게 돌아오는 에코(echo)로 된 코드라는 점 이였습니다.
포스팅 설명을 읽고 한번 실행하여 보면 자기한테만 메시지가 올 분 다른 클라이언트에게 메시지는 전달되지 않았습니다.
그래서 이것저것 찾아보며 전체 접속한 사용자에게 전송하는 방법에 대해서 정리하여 보았습니다!
* 2023. 05 기준 입니다.
웹플럭스에 사용한 라이브러리는 2종류 입니다.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
* 개인적으로 그레이들(gradle) 보다는 xml 구조의 메이븐(maven)형식을 더 선호해서..
웹소켓 서버에 필요한 클래스는 딱 2가지 입니다.
먼저 설정 파일 입니다.
* 파일이름 : 웹소켓컨피그.class
package 패키지명;
import java.util.HashMap;
import java.util.Map;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
@Configuration
public class 웹소켓컨피그 {
웹소켓핸들러 handler;
public 웹소켓컨피그(웹소켓핸들러 handler) {
this.handler = handler;
}
@Bean
HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/chatting", handler);
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(1);
handlerMapping.setUrlMap(map);
return handlerMapping;
}
}
어렵지 않는 코드 입니다.
나중에 만들어줄 "웹소켓핸들러" 클래스를 "chatting" 이라는 웹소켓 요청(ws)이 올 경우 사용하겠다는 내용 입니다.
예) ws://127.0.0.1/chatting
이제 핵심인 코드 입니다.
핸들러 클래스를 구현하기 위해서 먼저 "WebSocketHandler" 인터페이스를 상속 받습니다.
해당 패키지는 "org.springframework.web.reactive.socket" 로 되어 있습니다.
패키지명에 햇갈리지 않아야 합니다.
* 파일이름 : 웹소켓핸들러.class
package 패키지명;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;
@Component
public class 웹소켓핸들러 implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession 웹소켓에접속한세션아이디) {
}
}
여기 까지는 다른 포스팅과는 다른점이 없습니다.
이제 고민해야되는 것은 1:1 응답이 아니라 1:N 응답을 위한 고민 입니다.
사용자가 chatting이라는 웹소켓(ws)에 접속하면 위 클래스의 handle 이라는 메서드의 정의된 내용이 실행 됩니다.
이때 필요한 것이 이벤트를 발행(publish)하고 구독(subscribe) 하게 하는 것 입니다.
#1. 구독(subscribe)
1) 웹소켓을 통해 접속한 사용자의 데이터 수신(reseive) 행위에 대해서 서버가 무엇을 할지 정의 합니다.
2) 수신(reseive)한 메시지에 대해서 서버는 구독(subscribe)을 통해 해당 메시지의 이벤트를 계속해서 받습니다.
3) 마지막으로 구독을 하면서 조건을 통하여 다른 사용자에게 발행(emit)을 해 주어 메시지가 에코(echo)가 아니라 전달이 되게 합니다.
4) 이때 메시지를 발행(emit)하기 위해서는 이벤트를 동작시켜줄 클래스의 도움을 받습니다.
위 단계를 코드로 표현하면 아래와 같습니다.
@Component
public class 웹소켓핸들러 implements WebSocketHandler {
Sinks.Many<WebSocketMessage> 공장 = Sinks.many().multicast().directAllOrNothing(); //발행자
Flux<WebSocketMessage> 배달차량 = 공장.asFlux(); //구독을 위한 객체
Sinks.EmitFailureHandler 실패할때부를보험사 = (signalType, emitResult) -> emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED);
@Override
public Mono<Void> handle(WebSocketSession 웹소켓에접속한세션아이디) {
System.out.println("connection start : " + 웹소켓에접속한세션아이디);
웹소켓에접속한세션아이디.receive() //메시지를 수신하는 행동의 정의를 시작 합니다.
.map(arg-> arg.getPayloadAsText()) //수신한 데이터를 텍스트로 바꿔
.concatMap( arg-> { //조건을 통해 메시지를 처리합니다.
System.out.println("message in : " + arg);
//아래 코드처럼 조건을 줄 수 있습니다.
//if(arg.equals("DB에 조회하고 없으면 NULL을넣어서 메시지를 못 받게 하는..조건같은 프로세스")){
// return Mono.justOrEmpty(null); //또는 웹소켓에접속한세션아이디.close() 통해 강제종료
//}
return Mono.justOrEmpty(웹소켓에접속한세션아이디.textMessage(arg));
})
.doOnError(arg-> { //오류에 대한 정의
System.out.println("connection error : " + arg.getMessage());
})
.doFinally(arg->{ //커넥션이 끊긴 경우에 대한 정의
System.out.println("connection end : " + 웹소켓에접속한세션아이디.getId());
})
.subscribe( wsmsg->{ //서버가 사용자에게 메시지를 받으면 할 행동에 대한 정의
if(wsmsg != null)
공장.emitNext(wsmsg, 실패할때부를보험사); //메시지 전송
});
}
}
위 코드에서 "공장", "배달차량" 및 "실패할때부를보험사" 으로 포현한 객체는 이벤트를 접속한 웹소켓 클라이언트에게 데이터를 공유하고 전달할 기능 입니다.
"공장" 에서는 메시지를 보관하고 찍어내어 전달할 수 있도록 합니다.
그리고 이를 실제로 전달하는 기능은 "배달차량" 입니다.
이러한 "배달차량"이 전복되는 경우에는 "실패할때부를보험사" 의 도움을 받습니다.
정리하면, 위 클래스는 서버가 메시지를 받거나 전달할 때 사용하는 객체라 생각하면 되겠습니다.
* 참조 : https://lts0606.tistory.com/306
서버가 메시지를 수신하는 행동에 대한 정의가 끝났으므로 이제 메시지를 전달하는 행동을 정의 합니다.
#2. 발행(publish)
1) 메시지를 전달하기 위해서는 "공장" 에서 발행된 데이터를 사용해야 합니다.
2) 이러한 메시지는 직접 전달하기 힘들기 때 문에 "배달차량"의 도움을 받아서 전달 합니다.
위 단계를 코드로 표현하면 아래와 같습니다.
//생략..
@Override
public Mono<Void> handle(WebSocketSession 웹소켓에접속한세션아이디) {
//생략....
return 웹소켓에접속한세션아이디.send( //메시지를 전달하는 행동에 대한 정의를 시작 합니다.
Mono.delay(Duration.ofMillis(10)) //간격을 통해
.thenMany( //메시지를 전달하는데..
배달차량.filter( a-> a != null).map(it -> //배달차량의 도움을 받아 메시지를 가져와
웹소켓에접속한세션아이디.textMessage(it.getPayloadAsText() //웹소켓이 사용하는 메시지로 컨버팅 합니다
)
)
)
);
}
위 내용에 사용된 소스코드 전체 내용 입니다.
* 파일이름 : 웹소켓핸들러.class
import java.time.Duration;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
@Component
public class 웹소켓핸들러 implements WebSocketHandler {
Sinks.Many<WebSocketMessage> 공장 = Sinks.many().multicast().directAllOrNothing(); //발행자
Flux<WebSocketMessage> 배달차량 = 공장.asFlux(); //구독을 위한 객체
Sinks.EmitFailureHandler 실패할때부를보험사 = (signalType, emitResult) -> emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED);
@Override
public Mono<Void> handle(WebSocketSession 웹소켓에접속한세션아이디) {
System.out.println("connection start : " + 웹소켓에접속한세션아이디);
웹소켓에접속한세션아이디.receive() //메시지를 수신하는 행동의 정의를 시작 합니다.
.map(arg-> arg.getPayloadAsText())
.concatMap( arg-> { //조건을 통해 메시지를 처리합니다.
//아래 코드처럼 조건을 줄 수 있습니다.
System.out.println("message in : " + arg);
//if(arg.equals("DB에 조회하고 없으면 NULL을넣어서 메시지를 못 받게 하는..조건같은 프로세스")){
// return Mono.justOrEmpty(null); //또는 웹소켓에접속한세션아이디.close() 통해 강제종료
//}
return Mono.justOrEmpty(웹소켓에접속한세션아이디.textMessage(arg));
})
.doOnError(arg-> { //오류에 대한 정의
System.out.println("connection error : " + arg.getMessage());
})
.doFinally(arg->{ //커넥션이 끊긴 경우에 대한 정의
System.out.println("connection end : " + 웹소켓에접속한세션아이디.getId());
})
.subscribe( wsmsg->{ //서버가 사용자에게 메시지를 받으면 할 행동에 대한 정의
if(wsmsg != null)
공장.emitNext(wsmsg, 실패할때부를보험사);
});
return 웹소켓에접속한세션아이디.send( //메시지를 전달하는 행동에 대한 정의를 시작 합니다.
Mono.delay(Duration.ofMillis(10))
.thenMany( //메시지를 전달하는데..
배달차량.filter( a-> a != null).map(it -> //배달차량의 도움을 받아 메시지를 가져와
웹소켓에접속한세션아이디.textMessage(it.getPayloadAsText() //웹소켓이 사용하는 메시지로 컨버팅 합니다
)
)
)
);
}
}
전체 설정 및 코드는 아래 제 깃허브에서 확인할 수 있습니다.
https://github.com/TaeSeungRyu/sample/tree/main/웹플럭스샘플코드
어렵고 쉽지않는 웹플럭스!
웹플럭스를 활용하여 웹소켓을 통해 전체 접속한 사용자에게 메시지를 사용하는 방법에 대해서 소개하여보았습니다.
궁금한점 또는 틀린 부분은 언제든 알려주세요.😁
* 웹플럭스를 다시 공부중 입니다..^-^;