KR_Kafka - somaz94/python-study GitHub Wiki
Kafka๋ ๊ณ ์ฑ๋ฅ ๋ถ์ฐ ๋ฉ์์ง ์์คํ
์ด๋ค.
import json
from typing import Any, Callable, Dict, List, Optional

from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
class KafkaConfig:
    """Connection settings shared by the Kafka helpers, plus topic administration.

    Args:
        bootstrap_servers: Broker address(es) used to bootstrap the
            connection, e.g. ``'localhost:9092'``.
        client_id: Client identifier passed to the admin client.
    """

    def __init__(
        self,
        bootstrap_servers: str = 'localhost:9092',
        client_id: str = 'python-kafka'
    ):
        self.bootstrap_servers = bootstrap_servers
        self.client_id = client_id

    def create_admin_client(self) -> KafkaAdminClient:
        """Create a new admin client for this cluster.

        The caller owns the returned client and should call ``close()`` on it
        when finished.
        """
        return KafkaAdminClient(
            bootstrap_servers=self.bootstrap_servers,
            client_id=self.client_id
        )

    def create_topic(
        self,
        topic_name: str,
        num_partitions: int = 1,
        replication_factor: int = 1
    ):
        """Create a single topic.

        Args:
            topic_name: Name of the topic to create.
            num_partitions: Number of partitions for the new topic.
            replication_factor: Number of replicas per partition. The default
                of 1 only suits a single-broker (development) cluster.

        Raises:
            Any error raised by ``KafkaAdminClient.create_topics`` is
            propagated unchanged.
        """
        admin_client = self.create_admin_client()
        try:
            topic = NewTopic(
                name=topic_name,
                num_partitions=num_partitions,
                replication_factor=replication_factor
            )
            admin_client.create_topics([topic])
        finally:
            # A fresh admin client is opened per call; close it so repeated
            # calls do not leak broker connections.
            admin_client.close()
โ
ํน์ง:
- ๋ถ์ฐ ์์คํ
- ๊ณ ์ฑ๋ฅ ์ฒ๋ฆฌ
- ํ์ฅ์ฑ
Kafka์ ๋ฉ์์ง๋ฅผ ๋ฐํํ๋ ํ๋ก๋์ ๊ตฌํ ๋ฐฉ๋ฒ์ด๋ค.
class KafkaMessageProducer:
    """Publish JSON-encoded messages to Kafka.

    Values are serialized with ``json.dumps`` and encoded as UTF-8. Keys are
    UTF-8 encoded strings, or ``None`` for "no key".
    """

    def __init__(self, config: "KafkaConfig"):
        self.producer = KafkaProducer(
            bootstrap_servers=config.bootstrap_servers,
            value_serializer=self._serialize_value,
            key_serializer=self._serialize_key
        )

    @staticmethod
    def _serialize_value(value: Any) -> bytes:
        """Encode a JSON-serializable value as UTF-8 JSON bytes."""
        return json.dumps(value).encode('utf-8')

    @staticmethod
    def _serialize_key(key: Optional[str]) -> Optional[bytes]:
        """Encode a key as UTF-8; ``None`` means the message has no key.

        Only ``None`` counts as missing. An empty-string key is sent as
        ``b''`` instead of being silently turned into a null key.
        """
        return key.encode('utf-8') if key is not None else None

    def send_message(
        self,
        topic: str,
        value: Dict,
        key: Optional[str] = None,
        partition: Optional[int] = None,
        timeout: float = 10
    ):
        """Send one message and wait for the send future to complete.

        Args:
            topic: Destination topic.
            value: JSON-serializable payload.
            key: Optional message key (UTF-8 encoded before sending).
            partition: Explicit partition, or ``None`` to let the client choose.
            timeout: Seconds to wait on the send future.

        Returns:
            Whatever the send future's ``get`` returns (record metadata in
            kafka-python).

        Raises:
            Any error raised while waiting on the future, including a timeout.
        """
        future = self.producer.send(
            topic=topic,
            value=value,
            key=key,
            partition=partition
        )
        # send() only enqueues the record; waiting on the future makes this
        # call synchronous so delivery errors surface to the caller.
        return future.get(timeout=timeout)

    def flush(self, timeout: Optional[float] = None) -> None:
        """Block until the underlying producer has sent all buffered records."""
        self.producer.flush(timeout=timeout)

    def close(self, timeout: Optional[float] = None) -> None:
        """Close the underlying KafkaProducer."""
        self.producer.close(timeout=timeout)
โ
ํน์ง:
- ๋น๋๊ธฐ ์ ์ก
- ์ง๋ ฌํ
- ํํฐ์ ๋
Kafka์์ ๋ฉ์์ง๋ฅผ ์๋นํ๋ ์ปจ์๋จธ ๊ตฌํ ๋ฐฉ๋ฒ์ด๋ค.
class KafkaMessageConsumer:
    """Consume JSON-encoded messages from one or more topics as a consumer group.

    The consumer is created with ``auto_offset_reset='earliest'`` and
    ``enable_auto_commit=True``.
    """

    def __init__(
        self,
        config: "KafkaConfig",
        topics: List[str],
        group_id: str
    ):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=config.bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            value_deserializer=self._deserialize_value
        )

    @staticmethod
    def _deserialize_value(raw: Optional[bytes]) -> Any:
        """Decode UTF-8 JSON bytes.

        A ``None`` payload (e.g. a tombstone record) is returned as ``None``
        instead of crashing on ``.decode``.
        """
        if raw is None:
            return None
        return json.loads(raw.decode('utf-8'))

    def consume_messages(self, handler: Callable[[Any], Any]) -> None:
        """Pass every received message to *handler* until iteration stops.

        The consumer is always closed when the loop ends. This includes the
        case where *handler* or the consumer raises; that exception then
        propagates to the caller.
        """
        try:
            for message in self.consumer:
                handler(message)
        finally:
            self.close()

    def close(self) -> None:
        """Close the underlying KafkaConsumer."""
        self.consumer.close()
✅ 특징:
- 그룹 관리
- 오프셋 관리
- 자동 커밋
Kafka 메시지를 일괄적으로 처리하는 배치 처리 방법이다.
from typing import List, Callable
import time
class KafkaBatchProcessor:
    """Buffer messages and send them through a producer in batches.

    A flush happens when the buffer reaches ``batch_size`` messages, or when
    at least ``flush_interval`` seconds have passed since the last flush. The
    time check only runs inside ``add_message``. Call ``flush_batch()``
    explicitly (e.g. before shutdown) to send whatever is still buffered.

    Args:
        producer: Object exposing ``send_message(topic, value)``.
        batch_size: Number of buffered messages that triggers a flush.
        flush_interval: Seconds after the last flush that trigger a flush on
            the next ``add_message``.
    """

    def __init__(
        self,
        producer: "KafkaMessageProducer",
        batch_size: int = 100,
        flush_interval: int = 30
    ):
        self.producer = producer
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.messages = []  # pending (topic, message) pairs, in arrival order
        self.last_flush = time.time()

    def add_message(self, topic: str, message: Dict):
        """Buffer one message, flushing if the size or time limit is reached."""
        self.messages.append((topic, message))
        batch_full = len(self.messages) >= self.batch_size
        interval_elapsed = time.time() - self.last_flush >= self.flush_interval
        if batch_full or interval_elapsed:
            self.flush_batch()

    def flush_batch(self) -> int:
        """Send every buffered message in order and empty the buffer.

        Returns:
            The number of messages sent.

        Raises:
            Whatever ``producer.send_message`` raises. The failed message and
            every message after it stay buffered, so a later flush retries
            them.
        """
        pending, self.messages = self.messages, []
        sent = 0
        try:
            for topic, message in pending:
                self.producer.send_message(topic, message)
                sent += 1
        except BaseException:
            # Put the unsent tail back so nothing is silently dropped.
            self.messages = pending[sent:] + self.messages
            raise
        self.last_flush = time.time()
        return sent
✅ 특징:
- 배치 처리
- 성능 최적화
- 메모리 관리
Kafka 메시지 스트림을 처리하는 방법이다.
class KafkaStreamProcessor:
    """Consume messages from one topic, transform them, and publish to another.

    Args:
        input_topic: Topic to consume from.
        output_topic: Topic the transformed values are published to.
        config: Connection settings shared by the producer and the consumer.
        group_id: Consumer group used when reading ``input_topic``.
    """

    def __init__(
        self,
        input_topic: str,
        output_topic: str,
        config: "KafkaConfig",
        group_id: str
    ):
        self.input_topic = input_topic
        # process_stream publishes to this topic, so it must be kept on the
        # instance (previously it was dropped, causing an AttributeError).
        self.output_topic = output_topic
        self.producer = KafkaMessageProducer(config)
        self.consumer = KafkaMessageConsumer(
            config,
            [input_topic],
            group_id
        )

    def process_stream(self, transform_func):
        """Apply *transform_func* to each message value and publish the result.

        Each transformed value goes to ``output_topic`` through
        ``producer.send_message`` before the next message is handled. The call
        runs until the consumer loop ends. Exceptions from *transform_func* or
        the producer propagate to the caller.
        """
        def message_handler(message):
            transformed_value = transform_func(message.value)
            self.producer.send_message(self.output_topic, transformed_value)

        self.consumer.consume_messages(message_handler)
✅ 특징:
- 실시간 처리
- 변환 파이프라인
- 스트림 연결
✅ 모범 사례:
- 파티션 전략 수립
- 메시지 순서 보장
- 오프셋 관리
- 재시도 처리
- 배치 처리 최적화
- 모니터링 구축
- 에러 처리
- 확장성 고려
- 보안 설정
- 성능 튜닝