본문 바로가기

server

[Spring Boot] WebSocket & Stomp & Redis pub/sub & FCM으로 개발하는 채팅기능

 

안녕하세요, [모두의택시] 팀 서버 개발자 박성훈입니다.

이번 포스팅에서는 우리 서비스에 채팅 기능 개발,도입을 위해 겪었던 시행착오에 대해 공유드리고자 합니다.


1. 요구사항 정의

택시 동승팟 매칭 플램폼인 [모두의택시]에서는 동승매칭을 원하는 이들끼리 소통할 수 있는 "채팅기능"이 필요했습니다.

이를 위해 우리는 아래 두 가지 방법에 대해 비교하고 고민하였습니다.

 

- 첫 번째 방법 : 자체 채팅 플랫폼 구축

- 두 번째 방법 : 카카오톡(오픈채팅방 링크) 활용

 

자체 채팅 플랫폼을 구축하는 것은 빠른 개발을 위한 우리에게 다소 부담이 있는 작업이었습니다. Server, AOS, iOS 세 파트에서 모두 처리해주어야 하는 것은 당연하고, AOS를 제외한 파트 인원들은 채팅 개발의 경험이 부족했기 때문입니다.

그렇다고 오픈채팅방을 쓰자고 하기에는 유저 친화적이지 않다고 생각했습니다. 우리는 최소한의 인원관리와 서로 간 소통할 수 있는 창구를 열어줌으로써 기존 택시동승매칭 앱이나 오픈채팅과 차별점을 두고 싶었기 때문이죠.

 

이에 따라 분석한 결과 우리 서비스에는 다음과 같은 요구사항이 있었습니다.

1️⃣ 방 참가와 동시에 채팅방에 참여된다.
2️⃣ 방 참가하기 위해 요청, 수락, 대기의 Flow를 거친다.
3️⃣ 방의 정원은 4명이다.
4️⃣ 채팅 메세지 영구 저장(신고 및 유저 관리에 대비)되어야 한다.
5️⃣ 각 방마다 특정 시간대에 챗봇의 예약 메세지가 전송되어야 한다.

 

이렇게 정리한 결과 다소 복잡한 요구사항과 택시 동승매칭을 위한 채팅이라는 특수한 목적상 채팅 플랫폼을 자체 구축하기로 결정하였습니다.


2. 기술 선택

언어 및 프레임워크

백엔드 개발자 세 명 모두에게 가장 익숙한 Java와 Spring Boot로 선택하였습니다.

 

통신방법 - WebSocket vs Polling

채팅 서비스에서 메세지 처리 방법에 대해 WebSocket과 Polling 방식을 검토할 수 있었습니다. 채팅 구현 여부에 이어 구현 방법에서도 가장 중심이 되었던 것은 개발의 편의성과 퀄리티 사이의 선택이었습니다. 이에 따라 각각의 방법에 따라 나름대로 정의한 장단점은 다음과 같습니다.

 

HTTP Polling, WebSocker 장단점 비교

  장점 단점
HTTP Polling 방식 + 구현이 간편하다. - 실시간을 구현하기 위해 짧은 주기를 가져간다면 더더욱 서버에 부하가 있다.
- 요청이 없는 상태에서도 주기적으로 동작한다.
WebSocket + 실시간 양방향 통신
+ HTTP 요청/응답 오버헤드가 존재하지 않는다.
+ 지속적인 연결로 인해 데이터를 주고받을 때마다 헤더 정보를 반복적으로 전송할 필요가 없다.
- 러닝커브..!!
- 연결 유지 및 관리, 재연결 등등 신경써줘야 할 사항이 많다.

 

 

WebSocket을 선택한 이유

우리는 결과적으로 WebSocket을 선택하였습니다. Polling 방식이 비효율적이라고 생각했던 가장 큰 이유는 우리의 서비스 특성상 특정 시간대에 요청이 있을 것으로 예상되며 그 외의 시간대에는 그다지 많은 트래픽이 있을 것 같지 않았습니다. 또한 서로 연락처가 없는 타인이라는 관계 속에서 채팅은 특히나 실시간성을 보장해야하기 때문입니다. 이 외에도 자잘한 이유가 있었지만 이정도만으로도 충분히 소켓연결을 해야하는 정당한 이유가 되었습니다.

 

WebSocket을 선택하며 Stomp( Simple Text Oriented Messaging Protocol )의 사용이 자연스럽게 따라왔습니다.

Stomp는 웹소켓 위에서 메시지 전송을 효율적으로 할 수 있게 하는 프로토콜입니다. pub/sub 기반으로 동작합니다.

 

우리가 선택해야 할 것이 하나 남았습니다. Stomp를 사용하면 기본적으로 Spring boot의 내장 메세지 브로커를 활용할 수 있습니다. 이 때 발생할 수 있는 문제점들이 명확히 존재합니다.

1️⃣ 장애 시 메시지 유실

2️⃣ 서버간 채팅방 공유 불가능

3️⃣ 장애 시 메시지 유실서버 재시작 채팅방 정보 리셋

이런 문제들을 해결하기 위해 외부 브로커가 필요하며 우리의 마지막 선택의 고민거리가 되었습니다.

 

메세지 브로커 Redis vs Kafka vs RabbitMQ

  장점 단점
Redis pub/sub + 간단하고 빠르다.
+ 작은 규모의 메시징 시스템에서 뛰어난 성능
- 메시지 유실 위험
- 스케일링 제한적
Kafka + 높은 처리량과 내구성
+ 복잡한 메시징 패턴
+ 병렬처리지원
- 러닝 커브
- 레디스에 비해 높은 지연 시간
RabbitMQ + 다양한 메시징 기능과 패턴
+ 유연성, 확장성
- 성능 이슈

 

이미 공부해야 할 부분이 많기도 했고 우리의 서비스가 글로벌을 바라보고 있지는 않기 때문에 Redis로 선택하였습니다.


3. 실행 흐름

웹소켓의 가장 큰 단점은 조건이 많아질수록 분기된 상황을 모두 고려하고 작업하는 것이 어렵다는 점입니다. 개발을 해보니 테스트 툴도 빈약하여 간단하게 구현한 웹페이지(테스트용), 프론트 인원과 IP공유로 실시간 개발 등 인사이트를 얻기까지의 과정이 오래걸렸습니다.

이렇게 개발을 진행하며 우리는 한 가지를 고민했습니다.

"다 소켓으로 처리할 필요는 없잖아?"

우리 팀이 소켓 통신을 선택했던 이유는 채팅의 실시간성이었습니다. 이에 따라 채팅에 참여하기까지의 일련의 과정과 그 외의 나머지 기능들은 REST API로 구현하였습니다.

방 참여 요청, 요청 수락, 메세지 불러오기, 특수한 상황에서의 메세지 전송 등이 이에 속합니다.

[참고자료] REST API 설계 과정 일부 도식화

 

따라서 위의 예시처럼 참여요청, 수락, 수락 연결, 특수한 상황의 Fcm 전송, 수락 시 Room 정보 매핑 등 소켓 연결, 입장 가능 여부 등을 REST API로 처리하였습니다.


4. 구현

이제 구현을 위한 중심적인 코드의 일부분을 소개드리겠습니다.

 

Redis config

// pub/sub 메시지 처리를 위한 Listener 등록
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
    RedisConnectionFactory connectionFactory,
    MessageListenerAdapter messageListenerAdapter,
    ChannelTopic channelTopic
) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(messageListenerAdapter, channelTopic);
    return container;
}


@Bean
public MessageListenerAdapter messageListenerAdapter(RedisSubscriber subscriber) {
    return new MessageListenerAdapter(subscriber, "sendMessage");
}

//토픽 설정(토픽 단일화)
@Bean
public ChannelTopic channelTopic() {
    return new ChannelTopic("/sub/chat/");
}

 


Redis Topic을 여러 개 발행하면 그만큼 ChannelTopic을 생성하고 새로운 redisListener와 연동하는 작업이 있습니다.

우리는 MessageListener를 빈으로 관리합니다. 스프링 빈의 개수가 많아지면 시스템 저하를 불러올 수 있으므로 단일화된 ChannelTopic을 리스너에 등록해 관리하고 세부적인 경로(채팅방)에 메세지를 전송하는 것은 RedisSubscriber에서 처리하기로 결정했습니다.

 

WebSocketConfig

@EnableWebSocketMessageBroker
@Configuration
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    private final StompHandler stompHandler;
    private final StompExceptionHandler stompExceptionHandler;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry){
        registry.addEndpoint("/ws")
            .setAllowedOriginPatterns("*");
        registry.setErrorHandler(stompExceptionHandler);
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry){
        registry.setApplicationDestinationPrefixes("/pub");
        registry.enableSimpleBroker("/sub");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration){
        registration.interceptors(stompHandler);
    }
}

 

 

소켓연결을 진행할 엔드포인트를 연결하고, 메세지를 전송(publish)하고 구독(subscribe)할 경로를 설정합니다.

또한 소켓 통신 중 발생하는 예외를 핸들링할 StompExceptionHandler와 WebSocket 연결 및 해제에 관련된 사항을 처리할 StompHandler를 적용하여 WebSocket 통신을 구성합니다.

 

StompHandler

@Slf4j
@Component
@RequiredArgsConstructor
public class StompHandler implements ChannelInterceptor {

    private final JwtTokenProvider jwtTokenProvider;
    private final RedisChatRoomRepositoryImpl redisChatRoomRepositoryImpl;
    private final ChatService chatService;
    private final RoomRepository roomRepository;
    private final FcmService fcmService;
    private final MemberRepository memberRepository;

    private static final int FULL_MEMBER = 4;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        //웹소켓 연결 요청
        if (StompCommand.CONNECT == accessor.getCommand()) {

            String sessionId = accessor.getSessionId();
            String token = accessor.getFirstNativeHeader("token");

            try {
                String memberId = jwtTokenProvider.getMemberIdByToken(token);
                redisChatRoomRepositoryImpl.setUserInfo(sessionId, memberId);
            } catch (JwtException e) {
                throw new StompException(StompErrorCode.FAULT_JWT);
            }
        }

        //구독 요청
        else if (StompCommand.SUBSCRIBE == accessor.getCommand()) {
            log.info("구독 요청");
            String sessionId = accessor.getSessionId();
            String destination = (String) message.getHeaders().get("simpDestination");


            /*
            
	            ----- 생략(구독 시 예외사항에 따른 처리 )    ----- 
            
            */
            
            
            String nickName = member.getNickname();
            

			// 퇴장 없이 소켓 연결만 끊어진 상황이 아닌 새로운 입장&연결
            if (chatRoomMappingInfo == null) {
                chatRoomMappingInfo = new ChatRoomMappingInfo(roomId, nickName);
                redisChatRoomRepositoryImpl.setUserEnterInfo(memberId, chatRoomMappingInfo);

                room.plusCurrentHeadCount();
                ChatMessageRequestDto joinMessage = new ChatMessageRequestDto(
                        Long.valueOf(roomId), MessageType.JOIN, nickName + "님이 들어왔습니다.",
                        chatRoomMappingInfo.getNickname(), memberId, LocalDateTime.now());

                fcmService.subscribe(Long.valueOf(memberId), Long.valueOf(roomId));
                chatService.sendChatMessage(joinMessage);
            }
        } else if (StompCommand.DISCONNECT == accessor.getCommand()) {
            String sessionId = accessor.getSessionId();
            // 세션 정보 삭제
            redisChatRoomRepositoryImpl.removeUserBySessionIdEnterInfo(sessionId);
        }

        return message;
    }
}

 

 

WebSocket 연결 요청 처리 (CONNECT)

  • 클라이언트가 WebSocket 연결을 요청할 때 JWT 토큰을 검사하고, 유효한 토큰일 경우 사용자 정보를 Redis에 저장합니다.

구독 요청 처리 (SUBSCRIBE)

  • 클라이언트가 특정 채팅방을 구독하려고 할 때 해당 요청을 처리합니다.
  • 생략된 예외사항에 따른 처리 부분에서는 사용자가 이미 다른 방에 연결되어 있거나 방의 인원이 가득 찬 경우 등이 있습니다.
  • 사용자가 성공적으로 방에 참여하면, 채팅방에 참여 메시지를 전송하고 FCM 구독을 처리합니다.

연결 해제 요청 처리 (DISCONNECT)

  • 클라이언트가 WebSocket 연결을 해제할 때 Redis에서 사용자의 세션 정보를 삭제합니다. 하지만 퇴장은 REST API로 처리하기에 방 매핑 정보가 삭제되는 것은 아닙니다.

이와 같은 방식으로 StompHandler는 WebSocket 메시징을 관리하며 소켓 연결, Topic 구독, 연결 해제 과정에서의 JWT 인증, 사용자 세션 관리, 채팅방 상태 유지 및 FCM 알림 구독 등을 처리할 수 있습니다.

 

메세지 전송 (publish)

@MessageMapping("/chat")
public void sendMessage(ChatMessageRequestDto message, @Header("token") String token ) {

    String memberId = jwtTokenProvider.getMemberIdByToken(token);
    ChatRoomMappingInfo chatRoomMappingInfo = redisChatRoomRepositoryImpl.findChatInfoByMemberId(memberId);

    message.setSender(chatRoomMappingInfo.getNickname());
    message.setMemberId(memberId);
    message.setDateTime(LocalDateTime.now());


    // Websocket에 발행된 메시지를 redis로 발행(publish)
    chatService.sendChatMessage(message);
    
    --- 생략 ---
}
public void sendChatMessage(ChatMessageRequestDto chatMessageRequestDto) {
    redisTemplate.convertAndSend(channelTopic.getTopic(),
            ChatMessageMapper.toDto(chatMessageRequestDto));

    fcmService.sendChatMessage(chatMessageRequestDto);
//    채팅저장
}

 

/pub 엔드포인트로 전송된 메세지는 따로 핸들링을 거치지 않고, @MessageMapping 어노테이션이 적용된 해당 함수로 전달되어 메시지를 Redis로 발행합니다.

또한 메세지가 보내지면 Redis 토픽에 메세지를 전송하게 됩니다. 이는 단일화된 Topic에 메세지를 Publish하는 과정입니다.

 

@Service
@RequiredArgsConstructor
public class RedisSubscriber {

    private final ObjectMapper objectMapper;
    private final SimpMessageSendingOperations messageSendingOperations;

    public void sendMessage(String publishMessage) {
        try {
            ChatMessageResponse chatMessageResponse;
            chatMessageResponse = objectMapper.readValue(publishMessage, ChatMessageResponse.class);
            messageSendingOperations.convertAndSend("/sub/chat/" + chatMessageResponse.getRoomId(),
                    chatMessageResponse);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("메세지 전송에 실패하였습니다.");
        }
    }
}

 

Redis에 pub된 메시지는 RedisSubscriber를 통해 소비되며 실제 세분화된 토픽으로 메시지를 전송하고, 이 경로를 구독하고 있는 참여자들이 해당 메시지를 소비합니다. 눈치채셨을 지 모르겠지만 해당 함수는 Redis config에서 MessageListenerAdapter로 설정되어있습니다.

(코드 못찾으시는 분들을 위해)

 

 

결과적으로 메시지 전달 흐름을 간소화하면 위와 같을 것입니다. 이렇게 함으로써 우리는 비즈니스 요구사항에 맞는 채팅 서비스를 구현할 수 있었습니다.

 


5. 예상되는 문제

만약 채팅방에 연결되어있는데 앱 종료나 모종의 이유로 소켓이 끊기면 어떻게 될까요?

이 문제에 대응하기 위해 Fcm을 활용했습니다. 서버에서 클라이언트의 세션을 저장하고 또 이를 영구적으로 활용할 수 없습니다. 이에 따라 서버가 확실하게 알고 있는 정보인 Fcm Token을 이용해 Fcm으로 알림을 보내고 이를 프론트 측에서 채팅방에 그릴 지 말 지를 선택하기로 했습니다. 이에 따라 소비되지 않아 사라지는 메세지라 할 지라도 Fcm으로 전송되어 사라지지 않을 수 있습니다.

 

✅ 참여요청 수락을 하면 소켓에 바로 연결해주어야 하는 거 아닌가요?

이 또한 근본적으로 위 문제점과 동일한 방법으로 해결할 수 있습니다. 소켓도 처음 연결하는 과정에서는 HTTP와 같습니다. 클라이언트의 요청이 있어야 하기에 서버에서 클라이언트의 세션으로 연결시켜줄 수는 없다고 판단했습니다. 이에 따라 Fcm 구독을 시키고 클라이언트에서 방에 참여할 때마다 소켓연결을 하는 과정으로 진행하였습니다.

 

✅ 방에서 나가지 않았는데 세션이 새로 발급되면 다른 사용자라고 인식하나요?

우리는 세션과 방 매핑 정보를 동시에 갖고 있습니다. 주가 되는 것은 방 매핑정보이기 때문에 사용자의 세션이 바뀌는 것은 우리 서비스에서 큰 문제를 일으키지 않습니다.

 

다음 포스팅에서는 Stomp 프로토콜에서 예외처리를 하는 방법에 대해 소개하겠습니다.