Apache Kafka ‐ 카프카 스트림즈 - woojin-playground/Backend-PlayGround GitHub Wiki
KStream, KTable, GlobalKTable
KStream은 레코드의 흐름을 표현한 것으로 메시지 키와 메시지 값으로 구성되어 있다.
KStream으로 데이터를 조회하면 토픽에 존재하는 모든 레코드가 출력된다.
KStream은 컨슈머로 토픽을 구독하는 것과 동일 선상에서 사용하는 것으로 볼 수 있다.
KTable은 KStream과 다르게 메시지 키를 기준으로 묶어서 사용한다.
KStream은 토픽의 모든 레코드를 조회할 수 있지만 KTable은 유니크한 메시지 키를 기준으로 가장 최신의 레코드를 사용한다.
그러므로 KTable로 데이터를 조회하게 되면 메시지 키를 기준으로 가장 최신에 추가된 레코드의 데이터가 출력된다.
새로 데이터를 적재할 경우 만약 동일한 메시지 키가 이미 존재한다면 데이터가 업데이트 된다고 볼 수 있다.
GlobalKTable은 전체 파티션의 모든 데이터를 가져와서 사용한다.
GlobalKTable의 데이터가 비대할 경우 각각의 태스크마다 모든 데이터들이 적재되기 때문에 굉장히 큰 용량이 필요하고 애플리케이션 자체에 엄청난 부담으로 작용하게 된다.
GlobalKTable을 사용하고자한다면 데이터의 규모도 필수로 체크해서 도입하길 권장한다.
Topic: orders (4 partitions)
KStream / KTable:
Instance A → partition 0, 1
Instance B → partition 2, 3
※ 각 인스턴스는 자기 파티션 데이터만 보유
GlobalKTable:
Instance A → partition 0, 1, 2, 3 (전체)
Instance B → partition 0, 1, 2, 3 (전체)
※ 모든 인스턴스가 전체 데이터 보유
토폴로지
토폴로지란, 2개 이상의 노드들과 선으로 이루어진 집합을 말한다.
여러 종류들이 있는데 스트림즈에서 사용하는 토폴로지는 트리 형태와 유사하다.
토폴로지를 이루는 노드를 하나의 프로세서(Processor)라고 부르고 노드와 노드를 이은 선을 스트림(Stream)이라고 부른다.
소스 프로세서 : 데이터를 처리하기 위해 최초로 선언해야 하는 노드로, 하나 이상의 토픽에서 데이터를 가져온다.
스트림 프로세서 : 다른 프로세서가 반환한 데이터를 처리하는 역할을 한다.(변환, 분기 처리와 같은 로직)
싱크 프로세서 : 데이터를 특정 카프카 토픽으로 저장하는 역할을 하며 스트림즈로 처리된 데이터의 최종 종착지이다.
카프카 스트림즈 주요 옵션
필수 옵션
bootstrap.servers : 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다. 2개 이상 브로커 정보를 입력해 일부 브로커에서 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정할 수 있다.
application.id : 스트림즈 애플리케이션을 구분하기 위한 고유 아이디를 설정한다. 다른 로직을 가진 스트림즈 애플리케이션들은 서로 다른 application.id값을 가져야 한다.
선택 옵션
default.key.serde : 레코드의 메시지 키를 직렬화/역직렬화하는 클래스를 지정한다. 기본값은 바이트 직렬화/역직렬화 클래스인 Serdes.ByteArray().getClass().getName()이다.
default.value.serde : 레코드의 메시지 값을 직렬화/역직렬화하는 클래스를 지정한다. 기본값은 바이트 직렬화/역직렬화 클래스인 Serdes.ByteArray().getClass().getName()이다.
num.stream.threads : 스트림 프로세싱 실행 시 실행될 쓰레드 개수를 지정하는 것으로 기본값은 1이다.
state.dir : 상태기반 데이터 처리를 할 때, 데이터를 저장할 디렉터리를 지정하는 것으로 기본값은 /tmp/kafka-streams이다.
Streams DSL Window Processing
스트림 데이터를 분석할 때, 가장 많이 활용하는 프로세싱 중 하나는 Window 연산이다.
Window 연산은 특정 시간에 대응하여 취합 연산을 처리할 때 활용한다.
카프카 스트림즈에서 제공하는 Window Processing 4가지를 지원한다. 모든 프로세싱은 메시지 키를 기준으로 취합된다.
그러므로 해당 토픽에 동일한 파티션에는 동일한 메시지 키가 있는 레코드가 존재해야지만 정확한 취합이 가능하다.
텀블링 윈도우
서로 겹치지 않은 윈도우를 특정 간격으로 지속적으로 처리할 때 사용한다.
윈도우 최대 사이즈에 도달하면 해당 시점에 데이터를 취합하여 결과를 도출한다.
텀블링 윈도우는 단위 시간당 데이터가 필요할 경우 사용할 수 있다.
호핑 윈도우
일정 시간 간격으로 겹치는 윈도우가 존재하는 윈도우 연산을 처리할 경우 사용한다.
호핑 윈도우는 윈도우 사이즈와 윈도우 간격 2가지 변수를 가진다.
윈도우 사이즈는 연산을 수행할 최대 윈도우 사이즈를 뜻하고 윈도우 간격은 서로 다른 윈도우 간격을 뜻한다.
텀블링 윈도우와 다르게 동일한 키의 데이터가 서로 다른 윈도우에서 여러 번 연산될 수 있다.
슬라이딩 윈도우
슬라이딩 윈도우는 호핑 윈도우와 유사하지만 데이터의 정확한 시간을 바탕으로 윈도우 사이즈에 포함되는 데이터를 모두 연산에 포함시키는 특징이 있다.
세션 윈도우
세션 윈도우는 동일한 메시지 키의 데이터를 한 세션에 묶어 연산할 때 사용한다.
세션의 최대 만료시간에 따라 윈도우 사이즈가 달라진다.
세션 만료 시간이 지나게 되면 세션 윈도우가 종료되고 해당 윈도우의 모든 데이터를 취합하여 연산한다.
그렇기 때문에 세션 윈도우의 윈도우 사이즈는 가변적이다.
Streams DSL Queryable Store
카프카 스트림즈에서 KTable은 카프카 토픽의 데이터를 로컬의 RocksDB에 Mview로 만들어두고 사용하기 때문에 레코드의 메시지, 메시지 값을 기반으로 KeyValueStore로 사용할 수 있다.
특정 토픽을 KTable로 사용하고 ReadOnlyKeyValueStore로 뷰를 가져오면 메시지 키를 기반으로 토픽 데이터를 조회할 수 있다.
Processor API
Processor API는 스트림즈 DSL보다 저수준이지만 토폴로지를 기준으로 데이터를 처리한다는 관점에서 동일한 역할을 수행한다.
스트림즈 DSL은 데이터 처리, 분기, 조인을 위한 다양한 메서드들을 제공하지만 추가적인 상세 로직 구현이 필요하다면 Processor API를 활용할 수 있다.
프로세서 API에서는 스트림즈 DSL에서 사용했던 KStream, KTable, GlobalKTable 개념이 없다.
다만 스트림즈 DSL과 프로세서 API는 함께 구현하여 사용할 때는 활용할 수 있다.
Processor API를 구현하기 위해서 Processor 또는 Transformer 인터페이스로 구현한 클래스가 필요하다.
Processor 인터페이스는 일정 로직이 이루어진 뒤 다음 프로세서로 데이터가 넘어가지 않을 때 사용하고 Transformer 인터페이스는 일정 로직이 이루어진 뒤 다음 프로세서로 데이터를 넘길 때 사용한다.
/** * A processor of key-value pair records. * * @param <KIn> the type of input keys * @param <VIn> the type of input values * @param <KOut> the type of output keys * @param <VOut> the type of output values */@FunctionalInterfacepublicinterfaceProcessor<KIn, VIn, KOut, VOut> {
/** * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology * that contains it is initialized. When the framework is done with the processor, {@link #close()} will be called on it; the * framework may later re-use the processor by calling {@code #init()} again. * <p> * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to * {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator) schedule} a method to be * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s. * * @param context the context; may not be null */defaultvoidinit(finalProcessorContext<KOut, VOut> context) {}
/** * Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator. * * @param record the record to process */voidprocess(Record<KIn, VIn> record);
/** * Close this processor and clean up any resources. Be aware that {@code #close()} is called after an internal cleanup. * Thus, it is not possible to write anything to Kafka as underlying clients are already closed. The framework may * later re-use this processor by calling {@code #init()} on it again. * <p> * Note: Do not close any streams managed resources, like {@link StateStore}s here, as they are managed by the library. */defaultvoidclose() {}
}
/** * The {@code Transformer} interface is for stateful mapping of an input record to zero, one, or multiple new output * records (both key and value type can be altered arbitrarily). * This is a stateful record-by-record operation, i.e, {@link #transform(Object, Object)} is invoked individually for * each record of a stream and can access and modify a state that is available beyond a single call of * {@link #transform(Object, Object)} (cf. {@link KeyValueMapper} for stateless record transformation). * Additionally, this {@code Transformer} can {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator) schedule} * a method to be {@link Punctuator#punctuate(long) called periodically} with the provided context. * <p> * Use {@link TransformerSupplier} to provide new instances of {@code Transformer} to Kafka Stream's runtime. * <p> * If only a record's value should be modified {@link ValueTransformer} can be used. * * @param <K> key type * @param <V> value type * @param <R> {@link KeyValue} return type (both key and value type can be set * arbitrarily) * @see TransformerSupplier * @see ValueTransformer * @see KStream#map(KeyValueMapper) * @see KStream#flatMap(KeyValueMapper) * @deprecated Since 4.0. Use {@link org.apache.kafka.streams.processor.api.Processor api.Processor} instead. */@DeprecatedpublicinterfaceTransformer<K, V, R> {
/** * Initialize this transformer. * This is called once per instance when the topology gets initialized. * When the framework is done with the transformer, {@link #close()} will be called on it; the * framework may later re-use the transformer by calling {@link #init(ProcessorContext)} again. * <p> * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to * {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator) schedule} a method to be * {@link Punctuator#punctuate(long) called periodically} and to access attached {@link StateStore}s. * <p> * Note, that {@link ProcessorContext} is updated in the background with the current record's meta data. * Thus, it only contains valid record meta data when accessed within {@link #transform(Object, Object)}. * * @param context the context */voidinit(finalProcessorContextcontext);
/** * Transform the record with the given key and value. * Additionally, any {@link StateStore state} that is attached to this operator can be accessed and modified * arbitrarily (cf. {@link ProcessorContext#getStateStore(String)}). * <p> * If only one record should be forward downstream, {@code transform} can return a new {@link KeyValue}. If * more than one output record should be forwarded downstream, {@link ProcessorContext#forward(Object, Object)} * and {@link ProcessorContext#forward(Object, Object, To)} can be used. * If no record should be forwarded downstream, {@code transform} can return {@code null}. * * Note that returning a new {@link KeyValue} is merely for convenience. The same can be achieved by using * {@link ProcessorContext#forward(Object, Object)} and returning {@code null}. * * @param key the key for the record * @param value the value for the record * @return new {@link KeyValue} pair—if {@code null} no key-value pair will * be forwarded to down stream */Rtransform(finalKkey, finalVvalue);
/** * Close this transformer and clean up any resources. The framework may * later re-use this transformer by calling {@link #init(ProcessorContext)} on it again. * <p> * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and * {@link ProcessorContext#forward(Object, Object, To)} can be used. */voidclose();
}