카프카 스트림즈는 토픽에 적재된 데이터를 상태기반 또는 비상태기반으로 변환하여 다른 토픽에 적재하는 라이브러리이다.
- 장점
- 카프카에서 공식적으로 지원하여 호환성이 매우 뛰어남
- 장애 허용 시스템을 가져 장애에 강함
- 테스크
스트림즈 애플리케이션을 실행하면 생기는 데이터 처리 최소 단위
- 토폴로지
2개 이상의 노드들과 선으로 이루어진 집합
- 프로세서
소스 프로세서 : 데이터를 처리하기 위한 노드
스트림 프로세서 : 다른 프로세서가 반환한 데이터를 처리하는 역할을 하는 노드
싱크 프로세서 : 데이터를 특정 카프카 토픽으로 저장하는 역할을 하는 노드
스트림즈 DSL
KStream
레코드의 흐름을 표현한 것으로 메시지 키와 메시지 값으로 구성
KTable
유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용한 스트림
GlobalKTable
KTable과 동일하게 메시지 키를 기준으로 묶어서 사용하지만 KTable로 선언된 토픽은 1개 파티션이 1개 태스크에 할당되어 사용되고
GlobalDTable로 선언된 토픽은 모든 파티션 데이터가 가 태스크에 할당되어 사용된다는 차이가 있다.
코파티셔닝
조인을 하는 2개의 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업
리파티셔닝
새로운 토픽에 새로운 메시지 키를 가지도록 재배열하는 과정
조인
KTable과 KStream을 메시지 키를 기준으로 조인할 수 있다. 하지만 코파티셔닝이 되지 않으면 TopologyException
이 발생한다.
GlobalKTable과 KStream은 코파티셔닝이 되지 않아도 조인이 가능하다.
하지만 정의된 모든 데이터를 저장하고 사용하는 GlobalKTable은 로컬 스토리지의 사용량이 증가하고 네트워크, 브로커에 부하가 생기므로 주의해야 한다.
코드 실습
프로세서 API
토폴로지를 기준으로 데이터를 처리한다는 관점에서는 스트림즈 DSL과 동일한 역할을 한다.
추가적인 상세 로직의 구현이 필요하다면 프로세서 API를 활용할 수 있다.
- processor 구현
public class FilterProcessor implements Processor<String, String, String, String> {
private ProcessorContext context;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
}
@Override
public void process(Record<String, String> record) {
if (record.value().length() > 5){
context.forward(record);
}
context.commit();
}
@Override
public void close() {
}
}
- 프로세스 사용하기
public class SimpleKafkaProcessor {
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private final static String APPLICATION_NAME = "processor-application";
private final static String STREAM_LOG = "test";
private final static String STREAM_LOG_COPY = "stream_log_copy";
public static void main(String[] args) {
var props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
var topology = new Topology();
topology.addSource("Source", STREAM_LOG)
.addProcessor("Process",
() -> new FilterProcessor(),
"Source")
.addSink("Sink",
STREAM_LOG_COPY,
"Process");
var streaming = new KafkaStreams(topology, props);
streaming.start();
}
}
'devops > kafka' 카테고리의 다른 글
4장 카프카 상세 개념 설명 (0) | 2023.10.31 |
---|---|
3장 카프카 컨슈머 (0) | 2023.10.21 |
3장 카프카 기본 개념 설명 - 브로커, 프로듀서 (1) | 2023.10.17 |
2장 카프카 빠르게 시작해보기 (카프카 커맨드 라인 ) (0) | 2023.10.16 |
2장 카프카 빠르게 시작해보기 (실습용 카프카 브로커 설치) (0) | 2023.10.15 |