공식문서를 보면 Redis Stream을 사용하였을 때, 비동기로 메시지를 처리할 수 있는 방식에 대해서 설명을 해준다. 하지만 어떻게 동작하는 방식에 대해서는 문서가 설명하는 부분이 없어서 코드를 보고 그 동작 방식에 대해서 공부를 해보았다.
우선 궁금했던 것은 비동기 처리를 어떻게 지원하는 것인가? 그리고 비동기 모델을 적용하였을 때, 비동기 스레드가 얼마나 생성이 되는가? 이었다. 이러한 점을 알기 위해서 문서에서 소개하는 StreamListener
와 SteamListenerContainer
가 어떤 역할을 하고 어떠한 컴포넌트를 가지고 있는지를 살펴보았다.
StreamListener 살펴보기
문서의 예제를 보면 StreanListener
를 구현한 다음 코드를 확인할 수 있다.
class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
}
}
onMessage
함수를 보면 직관적으로 Stream에서 읽은 메시지를 하나씩 처리하기 위한 로직을 구현하는 것임을 유추할 수 있다. 실제로 StreamListener
인터페이스를 보면 다음처럼 onMessage
를 위한 함수형 인터페이스임을 확인할 수 있었다. 하지만 StreamListner
를 보면 Redis와 관련된 어떤 구현체나 인터페이스에 대한 의존이 아무것도 없어서 높은 수준의 추상화 개념임을 짐작할 수 있었다.
StreamListenerContainer 내부 살펴보기
StreamListenerContainer
는 메시지 수신의 모든 스레딩을 담당하고 처리를 위해서 리스너로 전달한다고 문서에서 확인할 수 있었다. org.springframework.data.redis.stream.StreamMessageListenerContainer
클래스를 통해서 Redis Stream을 처리할 StreamListenerContainer
를 생성할 때에는 org.springframework.data.redis.stream .DefaultStreamMessageListenerContainer
구현체를 사용한다.
DefaultStreamMessageListenerContainer
구현체를 간단하게 도식화하면 다음처럼 그려볼 수 있다. 실질적인 스레드를 관리하고 실행하기 위해서 Executor를 가지고 있고 에러가 발생하였을 때의 처리를 위해서 ErrorHandler
를 가지고 있다. 그리고 구독이라는 특이한 컨셉을 가지고 있는데 그러한 구독을 저장하기 위한 subsriptions
라는 필드도 가지고 있었다.
구독의 개념을 구현한 Subscription
인터페이스를 보면 isActive
와 await
메서드만이 있어서 구체적인 상호작용을 단순히 인터페이스만을 보고 설명할 수는 없었다.
DefaultStreamMessageListenerContainer
에서 사용되는 Subscription
구현체는 TaskSubscription
클래스이다. 이 클래스는 DefaultStreamMessageListenerContainer
의 내부 클래스로 선언되어 있는데 중요한 점은 Task
라는 Runnable
을 구현한 객체가 포함이 되어있다. [그림 2]에서는 이러한 구조를 나타내고 있다. StreamPollTask
라는 구현체가 실행되는 작업의 단위이고 Redis Stream의 메시지를 수신하고 StreamListener
를 호출하여 메시지 수신기를 콜하는 역할을 하고 있었다.
Executor를 통한 비동기 수신처리 살펴보기
앞의 과정을 통해서 StreamMessageLintenerContainer
가 어떻게 Redis Stream에 구독하고 처리하는지에 대해서 이해할 수 있었다. 다음으로는 Executor를 통해서 비동기를 지원하는 방식을 살펴보았다.
DefaultStreamMessageListenerContainer
의 기본 Executor는 SimpleAyncTaskExecutor
이다. SimpleAyncTaskExecutor
는 Task가 실행될 때마다 스레드를 생성하는 단순한 실행 구현체이기 때문에 구독이 Task를 Executor에 넘길 때마다 스레드가 하나씩 생성되고 실행이 된다는 것을 확인할 수 있었다.
그렇다면 이런 기본 설정을 사용하였을 때, 최대로 생성이 되는 스레드의 수를 알아보기 위해서 더 코드를 살펴보았는데 결론은 구독의 개수만큼 스레드가 최대로 생성될 수 있다는 것이다. 왜냐하면 구독이 isActive 메서드에서 구독 상태가 활성화되었을 때만 Executor에 새로운 작업을 등록하는데 isActive 함수는 현재 실행하는 Task가 실행 상태이라면 false를 반환하기 때문에 실행 중인 구독은 새로운 작업을 등록할 수 가 없다. 따라서 최대로 생성할 수 있는 스레드는 구독의 수와 일치하다는 것을 알 수 있었다.
하지만 기본으로 제공되는 SimpleAyncTaskExecutor
를 사용하지 않고 다른 솔루션을 적용할 수 있을 것이다. 우선 기본 Executor는 스레드를 계속 재생성하기 때문에 스레드 생성 비용이 들어가기 때문에 이러한 점을 해결하기 위해서 스레드 풀을 지원하는 Executor를 사용하는 것도 좋은 방법이라고 생각한다.
후기
Redis Stream 비동기 처리를 어떻게 지원하는지 알아보기 위해서 시작했지만 StreamListener를 통해서 Stream에 대한 구독을 어떻게 처리하도록 구현했는지를 이해할 수 있어서 생각보다 얻은 점이 많아서 좋았다. Redis Stream 뿐만 아니라 다른 Stream에 구독하는 형태의 솔루션에도 이러한 방식을 적용해서 구현할 수 있다는 것을 느낄 수 있었다.
참고자료
'spring' 카테고리의 다른 글
Spring과 kotest에서 testContainer 사용 후기 (0) | 2024.06.12 |
---|---|
reseilence4j를 이용한 장애 대처 (0) | 2024.01.27 |
[WireMock] WireMock을 이용하여 Mock API 서버 사용하기 (0) | 2024.01.21 |
[Spring] RedisCacheManager에 대해서 (1) | 2024.01.13 |
스프링 AOP를 적용하는 방법 (1) | 2024.01.10 |