- 데이터 입력 - Input Topic의 각 파티션에서 데이터 수신
- 스트림 처리 - StreamThread: 병렬 처리 단위
- Task 1, 2, 3이 각각 특정 파티션 담당
- 각 Task는 Input → Filter/Map/Transform → Output 처리
- 상태 관리(Stateful 처리 시)
- KTable: 상태 테이블, Upsert 방식, Key별 최신 상태
- KStream: 수정/삭제 불가, Append-Only, 이벤트 스트림, 변경 불가능한 이벤트
- State Store: 로컬에 상태 저장
- Changelog Topic: 상태 백업용 토픽
- 데이터 출력 - 처리 결과를 Output Topic의 파티션으로 전송
Kafka Streams 주요 API 및 연산자 정리 & Kafka Streams Topology
@Component
class OrderStreamsProcessor(
@Value("\${kafka.topics.orders}") private val ordersTopic: String,
@Value("\${kafka.topics.high-value-orders}") private val highValueOrdersTopic: String,
@Value("\${kafka.topics.fraud-alerts}") private val fraudAlertsTopic: String,
) {
private val logger = LoggerFactory.getLogger(OrderStreamsProcessor::class.java)
private val orderEventSerde = createJsonSerde<OrderEvent>()
private val fraudAlertSerde = createJsonSerde<FraudAlert>()
private val windowedOrderCountSerde = createJsonSerde<WindowedOrderCount>()
private val windowedSalesDataSerde = createJsonSerde<WindowedSalesData>()
private inline fun <reified T> createJsonSerde() : JsonSerde<T> {
return JsonSerde<T>().apply {
configure(mapOf(
"spring.json.trusted.packages" to "org.kafka_lecture.model",
"spring.json.add.type.headers" to false,
"spring.json.value.default.type" to T::class.java.name
), false)
}
}
@Bean
fun orderProcessingTopology(builder : StreamsBuilder) : Topology {
val orderStream : KStream<String, OrderEvent> = builder.stream(ordersTopic, Consumed.with(Serdes.String(), orderEventSerde))
highValueStream(orderStream)
fraudStream(orderStream)
orderCountStatsStream(orderStream)
salesStatsStream(orderStream)
return builder.build()
}
private fun highValueStream(orderStream : KStream<String, OrderEvent>) {
// filter: 조건에 맞는 이벤트만 통과
val highValueStream = orderStream.filter { _, orderEvent ->
logger.info("Filtering high Value Stream order: {}", orderEvent.orderId)
orderEvent.price >= BigDecimal("1000")
}
highValueStream.to(highValueOrdersTopic, Produced.with(Serdes.String(), orderEventSerde))
}
private fun fraudStream(orderStream : KStream<String, OrderEvent>) {
val fraudStream = orderStream.filter { _, orderEvent ->
orderEvent.price >= BigDecimal("5000") ||
orderEvent.quantity > 100 ||
orderEvent.price.multiply(BigDecimal.valueOf(orderEvent.quantity.toLong())) >= BigDecimal("10000")
}
// mapValues: 값 변환 (키는 유지)
.mapValues { orderEvent ->
val reason = when {
orderEvent.price >= BigDecimal("5000") -> "High single order value"
orderEvent.quantity > 100 -> "High quantity order"
else -> "High total order value"
}
val severity = when {
orderEvent.price >= BigDecimal("10000") -> FraudSeverity.CRITICAL
orderEvent.price >= BigDecimal("5000") -> FraudSeverity.HIGH
orderEvent.quantity > 100 -> FraudSeverity.MEDIUM
else -> FraudSeverity.LOW
}
FraudAlert(
orderId = orderEvent.orderId,
customerId = orderEvent.customerId,
reason = reason,
severity = severity,
)
}
fraudStream.to(fraudAlertsTopic, Produced.with(Serdes.String(), fraudAlertSerde))
}
private fun orderCountStatsStream(orderStream: KStream<String, OrderEvent>) {
orderStream
// groupByKey: 현재 키로 그룹화
.groupByKey(Grouped.with(Serdes.String(), orderEventSerde))
// windowedBy: 시간 기반 윈도우 생성
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
// aggregate: 상태 누적 계산
.aggregate(
{ WindowedOrderCount() },
{ _, _, aggregate -> aggregate.increment() },
// Materialized: 상태 저장소 설정
Materialized.`as`<String, WindowedOrderCount, WindowStore<Bytes, ByteArray>>("order-count-store")
.withValueSerde(windowedOrderCountSerde)
)
}
private fun salesStatsStream(orderStream: KStream<String, OrderEvent>) {
/*
<"customer1", OrderEvent(orderId="order1", customerId="customer1", price=100)>
<"customer2", OrderEvent(orderId="order2", customerId="customer2", price=200)>
<"customer1", OrderEvent(orderId="order3", customerId="customer1", price=150)>
customer1: [OrderEvent(order1, 100), OrderEvent(order3, 150)]
customer2: [OrderEvent(order2, 200)]
*/
orderStream
// groupBy: 새로운 키로 재그룹화
.groupBy(
{ key, orderEvent -> orderEvent.customerId },
Grouped.with(Serdes.String(), orderEventSerde)
)
.windowedBy(TimeWindows.of(Duration.ofHours(1)))
.aggregate(
{ WindowedSalesData() },
{ _, orderEvent, aggregate -> aggregate.add(orderEvent.price) },
Materialized.`as`<String, WindowedSalesData, WindowStore<Bytes, ByteArray>>("sales-stats-store")
.withValueSerde(windowedSalesDataSerde)
)
}
}
- Source (소스)
-
builder.stream(ordersTopic, ...): Kafka 토픽에서 데이터를 읽어오는 시작점
- Processor (프로세서)
-
filter(): 조건에 맞는 데이터만 통과
-
mapValues(): 데이터 변환
-
groupBy(), groupByKey(): 키별로 그룹화
-
windowedBy(): 시간 윈도우로 집계
-
aggregate(): 누적 계산
- Sink (싱크)
-
to(): 처리된 데이터를 다른 토픽으로 전송
- State Store
-
Materialized.as(): 집계 결과를 저장하는 로컬 저장
@Service
class OrderStreamsService(
private val factory : StreamsBuilderFactoryBean
) {
private val logger = LoggerFactory.getLogger(OrderStreamsService::class.java)
fun orderCountComparison() : OrderCountComparisonStats? {
return try {
val stream = factory.kafkaStreams
if (stream == null || stream.state() != KafkaStreams.State.RUNNING) {
return null
}
// Kafka Streams Topology에서 생성된 State Store에 접근
// order-count-store는 이전에 Materialized.as()로 생성한 저장
// 실시간 집계 데이터를 조회할 수 있게 함
val store : ReadOnlyWindowStore<String, WindowedOrderCount> = stream.store(
StoreQueryParameters.fromNameAndType("order-count-store", QueryableStoreTypes.windowStore())
)
val now = Instant.now()
/*
9시
8시 55분 ~ 9시 까지의 데이터
8시 50분 ~ 8시 55분 까지의 데이터
*/
val currentPeriodEnd = now
val currentPeriodStart = now.minusSeconds(300) // 5분전
val prevPeriodEnd = currentPeriodStart
val prevPeriodStart = currentPeriodStart.minusSeconds(300)
val currentCount = countForPeriod(store, currentPeriodStart, currentPeriodEnd) // 8시 55분 ~ 9시 까지의 데이터
val previousCount = countForPeriod(store, prevPeriodStart, prevPeriodEnd) // 8시 50분 ~ 8시 55분 까지의 데이터
val changeCount = currentCount - previousCount
val changePercentage = if (previousCount > 0) {
(changeCount.toDouble() / previousCount.toDouble()) * 100.0
} else if (currentCount > 0) {
100.0
} else {
0.0
}
OrderCountComparisonStats(
currentPeriod = PeriodStats(
windowStart = LocalDateTime.ofInstant(currentPeriodStart, ZoneOffset.UTC),
windowEnd = LocalDateTime.ofInstant(currentPeriodEnd, ZoneOffset.UTC),
orderCount = currentCount
),
previousPeriod = PeriodStats(
windowStart = LocalDateTime.ofInstant(prevPeriodStart, ZoneOffset.UTC),
windowEnd = LocalDateTime.ofInstant(prevPeriodEnd, ZoneOffset.UTC),
orderCount = previousCount
),
changeCount = changeCount,
changePercentage = changePercentage,
isIncreasing = changeCount > 0
)
} catch (e : Exception) {
logger.error("Failed to get streams info", e.message)
return null
}
}
private fun countForPeriod(
store : ReadOnlyWindowStore<String, WindowedOrderCount>,
startTime : Instant,
endTime : Instant
) : Long {
var totalCount =0L
// Topology에서 실시간으로 계산된 결과를 가져옴
store.fetchAll(startTime, endTime).use { iter ->
while (iter.hasNext()) {
val entry = iter.next()
totalCount += entry.value.count
}
}
return totalCount
}
}
동적 이벤트 스키마 관리 - Apache Avro
@Component
class SchemaManager(
@Value("\${schema.registry.url:http://localhost:8081}")
private val schemaRegistryUrl: String
) : ApplicationRunner {
private val logger = LoggerFactory.getLogger(SchemaManager::class.java)
private var cachedSchema: Schema? = null
private val client : SchemaRegistryClient = CachedSchemaRegistryClient(schemaRegistryUrl, 100)
fun orderEventSchema() : Schema{
return cachedSchema ?: loadSchemaFromRegistry().also {
cachedSchema = it
}
}
private fun loadSchemaFromRegistry(): Schema {
return try {
val subject = "orders-avro-value"
val last = client.getLatestSchemaMetadata(subject)
val schemaString = last.schema
Schema.Parser().parse(schemaString)
} catch (e: Exception) {
logger.warn("Faeild to load schema from register", e)
loadSchemaFromFile("avro/order-entity.avsc")
}
}
private fun loadSchemaFromFile(path: String) : Schema {
return try {
logger.info("path {}", path)
val resource = ClassPathResource(path)
val content = resource.inputStream.bufferedReader().use { it.readText() }
Schema.Parser().parse(content)
} catch (e : IOException) {
throw IllegalStateException("Failed to load schema from $path", e)
}
}
private fun registerSchemaIfNotExists(): Int? {
// topic - name - key - value
return try {
val subject = "orders-avro-value"
val existing = try {
client.getAllVersions(subject)
} catch (e: Exception) {
logger.info("subject does not exist")
emptyList<Int>()
}
if (existing.isNotEmpty()) {
// 최신 Schema를 넘겨준다
val last = client.getLatestSchemaMetadata(subject)
return last.id
}
val schema = loadSchemaFromFile("avro/order-entity.avsc")
val avroSchema = AvroSchema(schema)
val schemaId = client.register(subject, avroSchema)
return schemaId
} catch (e : Exception) {
logger.error("failed to register schema", e)
null
}
}
override fun run(args: ApplicationArguments?) {
registerSchemaIfNotExists()
}
}
- 동적 코드 생성
- Avro는 스키마를 사용하여 데이터를 직렬화하고 역직렬화하는 동적 코드 생성 기능을 제공한다.
- 런타임 시에 스키마 정보를 사용하여 코드를 생성하므로 개발자가 직접 코드를 작성할 필요가 없다.
- JSON 으로 정의한 스키마를 자바 파일로 빌드하여 사용할 수 있어 개발이 용이하다는 장점이 있다.
- 스키마 진화의 용이성
- Avro는 스키마 진화를 지원하여 데이터 구조의 변경을 쉽게 처리할 수 있다.
- 새로운 필드를 추가하거나 기존 필드를 수정이 용이하며, 이는 데이터의 유연성과 상호 운용성을 높일 수 있다.
- JSON 기반 스키마 정의
- Avro의 스키마는 JSON 형식으로 정의되어 있어 사람이 쉽게 읽고 이해할 수 있다.
- 자체 압축 기능
- Avro는 데이터를 압축하여 저장하는 기능을 제공한다.
- 데이터의 전송 및 저장 과정에서 발생하는 네트워크 대역폭과 디스크 공간을 절약할 수 있다.
@Service
class AvroOrderEventProducer(
private val avroKafkaTemplate: KafkaTemplate<String, GenericRecord>,
private val schemaManager: SchemaManager
) {
private val logger = LoggerFactory.getLogger(AvroOrderEventProducer::class.java)
fun publishOrderEvent(
orderId: String,
customerId: String,
quantity: Int,
price: BigDecimal
) {
try {
val schema = schemaManager.orderEventSchema()
val avroRecord = createAvroRecord(orderId, customerId, quantity, price, schema)
avroKafkaTemplate.send("orders-avro", orderId, avroRecord)
.whenComplete { result, ex ->
if (ex == null) {
logger.info("Avro order event published successfully: orderId={}, partition={}, offset={}",
orderId, result?.recordMetadata?.partition(), result?.recordMetadata?.offset())
} else {
logger.error("Failed to publish Avro order event: orderId={}", orderId, ex)
}
}
} catch (ex: Exception) {
logger.error("Error creating Avro record for order: orderId={}", orderId, ex)
}
}
private fun createAvroRecord(
orderId: String,
customerId: String,
quantity: Int,
price: BigDecimal,
schema: Schema
): GenericRecord {
val record = GenericData.Record(schema)
val now = LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli()
record.put("orderId", orderId)
record.put("customerId", customerId)
record.put("quantity", quantity)
record.put("price", convertPriceToBytes(price))
record.put("status", GenericData.EnumSymbol(schema.getField("status").schema(), "PENDING"))
record.put("createdAt", now)
record.put("updatedAt", now)
record.put("version", 1L)
return record
}
private fun convertPriceToBytes(price: BigDecimal): ByteBuffer {
val scaled = price.setScale(2)
val unscaledValue = scaled.unscaledValue()
return ByteBuffer.wrap(unscaledValue.toByteArray())
}
}
@Component
class AvroOrderEventConsumer {
private val logger = LoggerFactory.getLogger(AvroOrderEventConsumer::class.java)
@KafkaListener(
topics = ["orders-avro"],
groupId = "avro-order-processor-v1",
containerFactory = "avroKafkaListenerContainerFactory"
)
fun handleOrderEventV1(
@Payload avroRecord: GenericRecord,
@Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
@Header(KafkaHeaders.OFFSET) offset: Long
) {
try {
val orderData = extractOrderDataFromAvro(avroRecord)
logger.info("Processing Avro order: orderId={}, partition={}, offset={}",
orderData.orderId, partition, offset)
} catch (e: Exception) {
logger.error(e.message, e)
}
}
private fun extractOrderDataFromAvro(record: GenericRecord): OrderDataV1 {
return OrderDataV1(
orderId = record.get("orderId").toString(),
customerId = record.get("customerId").toString(),
quantity = record.get("quantity") as Int,
price = convertBytesToPrice(record.get("price") as ByteBuffer),
timestamp = convertTimestamp(record.get("createdAt") as Long),
status = record.get("status").toString(),
version = record.get("version") as Long
)
}
private fun convertBytesToPrice(byteBuffer: ByteBuffer): BigDecimal {
val bytes = ByteArray(byteBuffer.remaining())
byteBuffer.get(bytes)
val bigInt = BigInteger(bytes)
return BigDecimal(bigInt, 2)
}
private fun convertTimestamp(epochMilli: Long): LocalDateTime {
// unix epoch milliseconds
return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMilli), ZoneOffset.UTC)
}
}
data class OrderDataV1(
val orderId: String,
val customerId: String,
val quantity: Int,
val price: BigDecimal,
val timestamp: LocalDateTime,
val status: String,
val version: Long
)