KR_Redis - somaz94/python-study GitHub Wiki
Redisλ μΈλ©λͺ¨λ¦¬ λ°μ΄ν° ꡬ쑰 μ μ₯μλ‘, κ³ μ±λ₯ μΊμ±, λ©μμ§ λΈλ‘컀 λ° μ€μκ° μ ν리μΌμ΄μ
μ λ°μ΄ν° μ μ₯μλ‘ λ리 μ¬μ©λλ€. Pythonμμλ redis-py λΌμ΄λΈλ¬λ¦¬λ₯Ό ν΅ν΄ μ½κ² μ κ·Όνκ³ νμ©ν μ μλ€.
import redis
from typing import Optional, Any, Dict, List, Union
import json
import time
import logging
from contextlib import contextmanager
# Redis μ°κ²° ν΄λμ€
class RedisClient:
"""Redis μλ² μ°κ²° λ° κ΄λ¦¬λ₯Ό μν ν΄λΌμ΄μΈνΈ ν΄λμ€"""
def __init__(
self,
host: str = 'localhost',
port: int = 6379,
db: int = 0,
password: Optional[str] = None,
socket_timeout: float = 5.0,
socket_connect_timeout: float = 5.0,
socket_keepalive: bool = True,
retry_on_timeout: bool = True,
health_check_interval: int = 30,
decode_responses: bool = True
):
"""
Redis ν΄λΌμ΄μΈνΈ μ΄κΈ°ν
Args:
host: Redis μλ² νΈμ€νΈ
port: Redis μλ² ν¬νΈ
db: μ¬μ©ν λ°μ΄ν°λ² μ΄μ€ λ²νΈ
password: μΈμ¦ λΉλ°λ²νΈ
socket_timeout: μμΌ μμ
νμμμ(μ΄)
socket_connect_timeout: μ°κ²° νμμμ(μ΄)
socket_keepalive: μμΌ keepalive μ€μ
retry_on_timeout: νμμμ μ μ¬μλ μ¬λΆ
health_check_interval: μν νμΈ κ°κ²©(μ΄)
decode_responses: μλ΅ μλ λμ½λ© μ¬λΆ
"""
self.logger = logging.getLogger(__name__)
# μ°κ²° μ€μ
self.connection_params = {
'host': host,
'port': port,
'db': db,
'password': password,
'socket_timeout': socket_timeout,
'socket_connect_timeout': socket_connect_timeout,
'socket_keepalive': socket_keepalive,
'retry_on_timeout': retry_on_timeout,
'health_check_interval': health_check_interval,
'decode_responses': decode_responses
}
self.redis = self._create_connection()
def _create_connection(self):
"""Redis μ°κ²° μμ±"""
try:
self.logger.info(f"Redis μλ² {self.connection_params['host']}:{self.connection_params['port']} μ°κ²° μλ...")
connection = redis.Redis(**self.connection_params)
return connection
except redis.ConnectionError as e:
self.logger.error(f"Redis μ°κ²° μ€ν¨: {e}")
raise
def ping(self) -> bool:
"""
Redis μλ² μ°κ²° ν
μ€νΈ
Returns:
bool: μ°κ²° μ±κ³΅ μ¬λΆ
"""
try:
return self.redis.ping()
except redis.ConnectionError as e:
self.logger.error(f"Redis μλ² μ°κ²° ν
μ€νΈ μ€ν¨: {e}")
return False
def info(self) -> Dict:
"""
Redis μλ² μ 보 μ‘°ν
Returns:
Dict: Redis μλ² μ 보
"""
try:
return self.redis.info()
except redis.RedisError as e:
self.logger.error(f"Redis μλ² μ 보 μ‘°ν μ€ν¨: {e}")
return {}
def close(self):
"""μ°κ²° μ’
λ£"""
try:
self.redis.close()
self.logger.info("Redis μ°κ²° μ’
λ£")
except Exception as e:
self.logger.error(f"Redis μ°κ²° μ’
λ£ μ€ μ€λ₯: {e}")
@contextmanager
def pipeline_context(self, transaction: bool = True):
"""
νμ΄νλΌμΈ 컨ν
μ€νΈ λ§€λμ
μ¬μ© μ:
```
with client.pipeline_context() as pipe:
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()
```
Args:
transaction: νΈλμμ
μ¬μ© μ¬λΆ
Yields:
Pipeline: Redis νμ΄νλΌμΈ κ°μ²΄
"""
pipeline = self.redis.pipeline(transaction=transaction)
try:
yield pipeline
finally:
pipeline.execute()
def flush_db(self, force: bool = False):
"""
νμ¬ λ°μ΄ν°λ² μ΄μ€μ λͺ¨λ ν€ μμ
Args:
force: κ°μ μμ μ¬λΆ (μμ μ₯μΉ)
Returns:
bool: μ±κ³΅ μ¬λΆ
"""
if not force:
self.logger.warning("flush_db νΈμΆ μ force=True μ΅μ
μ΄ νμνλ€")
return False
try:
self.redis.flushdb()
self.logger.info(f"λ°μ΄ν°λ² μ΄μ€ {self.connection_params['db']} λΉμ")
return True
except redis.RedisError as e:
self.logger.error(f"λ°μ΄ν°λ² μ΄μ€ λΉμ°κΈ° μ€ν¨: {e}")
return False
β
νΉμ§:
- μμ μ μΈ μ°κ²° μ€μ λ° κ΄λ¦¬
- λ€μν μ°κ²° μ΅μ μ§μ
- νμ΄νλΌμΈμ ν΅ν μ±λ₯ μ΅μ ν
- μλ μλ΅ λμ½λ© μ€μ
- νμ νν μΌλ‘ μ½λ κ°λ μ± ν₯μ
- 컨ν μ€νΈ λ§€λμ λ₯Ό ν΅ν μμ κ΄λ¦¬
- μμΈν λ‘κΉ μΌλ‘ μ°κ²° λ¬Έμ μΆμ
- μλ² μν νμΈ (ping, info)
- μμ ν λ°μ΄ν°λ² μ΄μ€ μ΄κΈ°ν λ©μ»€λμ¦
Redisλ λ¬Έμμ΄, ν΄μ, 리μ€νΈ, μ§ν©, μ λ ¬ μ§ν© λ± λ€μν λ°μ΄ν° ꡬ쑰λ₯Ό μ 곡νλ€. κ° λ°μ΄ν° νμ
μ λ§λ ν¨μ¨μ μΈ μμ
λ°©λ²μ ν΅ν΄ λ€μν μ¬μ© μ¬λ‘λ₯Ό ꡬνν μ μλ€.
class RedisOperations:
"""Redis κΈ°λ³Έ λ°μ΄ν° νμ
λ³ μμ
ν΄λμ€"""
def __init__(self, client: RedisClient):
"""
Redis μμ
ν΄λμ€ μ΄κΈ°ν
Args:
client: Redis ν΄λΌμ΄μΈνΈ μΈμ€ν΄μ€
"""
self.redis = client.redis
self.logger = logging.getLogger(__name__)
# λ¬Έμμ΄ μμ
def set_value(
self,
key: str,
value: str,
expiry: Optional[int] = None,
nx: bool = False,
xx: bool = False
) -> bool:
"""
ν€-κ° μ μ₯
Args:
key: ν€
value: κ°
expiry: λ§λ£ μκ°(μ΄)
nx: ν€κ° μ‘΄μ¬νμ§ μμ λλ§ μ€μ
xx: ν€κ° μ΄λ―Έ μ‘΄μ¬ν λλ§ μ€μ
Returns:
bool: μ€μ μ±κ³΅ μ¬λΆ
"""
try:
return self.redis.set(key, value, ex=expiry, nx=nx, xx=xx)
except redis.RedisError as e:
self.logger.error(f"κ° μ€μ μ€ν¨ - ν€: {key}, μ€λ₯: {e}")
return False
def get_value(self, key: str) -> Optional[str]:
"""
κ° μ‘°ν
Args:
key: ν€
Returns:
Optional[str]: μ μ₯λ κ° λλ None
"""
try:
return self.redis.get(key)
except redis.RedisError as e:
self.logger.error(f"κ° μ‘°ν μ€ν¨ - ν€: {key}, μ€λ₯: {e}")
return None
def increment(self, key: str, amount: int = a1) -> Optional[int]:
"""
μ«μ κ° μ¦κ°
Args:
key: ν€
amount: μ¦κ°λ
Returns:
Optional[int]: μ¦κ° ν κ° λλ None
"""
try:
return self.redis.incrby(key, amount)
except redis.RedisError as e:
self.logger.error(f"μ¦λΆ μ€ν¨ - ν€: {key}, μ€λ₯: {e}")
return None
def set_with_ttl(self, key: str, value: str, ttl: int) -> bool:
"""
TTL(Time-To-Live)κ³Ό ν¨κ» κ° μ€μ
Args:
key: ν€
value: κ°
ttl: λ§λ£ μκ°(μ΄)
Returns:
bool: μ€μ μ±κ³΅ μ¬λΆ
"""
try:
return self.redis.setex(key, ttl, value)
except redis.RedisError as e:
self.logger.error(f"TTLκ³Ό ν¨κ» κ° μ€μ μ€ν¨ - ν€: {key}, μ€λ₯: {e}")
return False
def set_json(self, key: str, data: Dict, expiry: Optional[int] = None) -> bool:
"""
JSON λ°μ΄ν° μ μ₯
Args:
key: ν€
data: JSONμΌλ‘ μ§λ ¬νν λ°μ΄ν°
expiry: λ§λ£ μκ°(μ΄)
Returns:
bool: μ€μ μ±κ³΅ μ¬λΆ
"""
try:
json_data = json.dumps(data)
return self.set_value(key, json_data, expiry)
except (redis.RedisError, TypeError) as e:
self.logger.error(f"JSON λ°μ΄ν° μ μ₯ μ€ν¨ - ν€: {key}, μ€λ₯: {e}")
return False
def get_json(self, key: str) -> Optional[Dict]:
"""
JSON λ°μ΄ν° μ‘°ν
Args:
key: ν€
Returns:
Optional[Dict]: μμ§λ ¬νλ λ°μ΄ν° λλ None
"""
try:
json_data = self.get_value(key)
if json_data:
return json.loads(json_data)
return None
except (redis.RedisError, json.JSONDecodeError) as e:
self.logger.error(f"JSON λ°μ΄ν° μ‘°ν μ€ν¨ - ν€: {key}, μ€λ₯: {e}")
return None
# ν΄μ μμ
def set_hash(self, name: str, mapping: Dict) -> bool:
"""
ν΄μ μ μ₯
Args:
name: ν΄μ μ΄λ¦
mapping: νλ-κ° λ§€ν
Returns:
bool: μ€μ μ±κ³΅ μ¬λΆ
"""
try:
self.redis.hset(name, mapping=mapping)
return True
except redis.RedisError as e:
self.logger.error(f"ν΄μ μ μ₯ μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return False
def get_hash(self, name: str) -> Dict:
"""
ν΄μ μ 체 μ‘°ν
Args:
name: ν΄μ μ΄λ¦
Returns:
Dict: ν΄μμ λͺ¨λ νλ-κ° μ
"""
try:
return self.redis.hgetall(name)
except redis.RedisError as e:
self.logger.error(f"ν΄μ μ‘°ν μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return {}
def get_hash_field(self, name: str, field: str) -> Optional[str]:
"""
ν΄μ νλ κ° μ‘°ν
Args:
name: ν΄μ μ΄λ¦
field: νλ μ΄λ¦
Returns:
Optional[str]: νλ κ° λλ None
"""
try:
return self.redis.hget(name, field)
except redis.RedisError as e:
self.logger.error(f"ν΄μ νλ μ‘°ν μ€ν¨ - μ΄λ¦: {name}, νλ: {field}, μ€λ₯: {e}")
return None
def increment_hash_field(self, name: str, field: str, amount: int = 1) -> Optional[int]:
"""
ν΄μ νλ κ° μ¦κ°
Args:
name: ν΄μ μ΄λ¦
field: νλ μ΄λ¦
amount: μ¦κ°λ
Returns:
Optional[int]: μ¦κ° ν κ° λλ None
"""
try:
return self.redis.hincrby(name, field, amount)
except redis.RedisError as e:
self.logger.error(f"ν΄μ νλ μ¦λΆ μ€ν¨ - μ΄λ¦: {name}, νλ: {field}, μ€λ₯: {e}")
return None
# 리μ€νΈ μμ
def push_to_list(self, name: str, *values: str) -> Optional[int]:
"""
리μ€νΈμ κ° μΆκ° (μ€λ₯Έμͺ½)
Args:
name: 리μ€νΈ μ΄λ¦
*values: μΆκ°ν κ°λ€
Returns:
Optional[int]: μΆκ° ν 리μ€νΈ κΈΈμ΄ λλ None
"""
try:
return self.redis.rpush(name, *values)
except redis.RedisError as e:
self.logger.error(f"리μ€νΈ μΆκ° μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return None
def push_to_list_left(self, name: str, *values: str) -> Optional[int]:
"""
리μ€νΈμ κ° μΆκ° (μΌμͺ½)
Args:
name: 리μ€νΈ μ΄λ¦
*values: μΆκ°ν κ°λ€
Returns:
Optional[int]: μΆκ° ν 리μ€νΈ κΈΈμ΄ λλ None
"""
try:
return self.redis.lpush(name, *values)
except redis.RedisError as e:
self.logger.error(f"리μ€νΈ μΌμͺ½ μΆκ° μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return None
def pop_from_list(self, name: str) -> Optional[str]:
"""
리μ€νΈμμ κ° κΊΌλ΄κΈ° (μ€λ₯Έμͺ½)
Args:
name: 리μ€νΈ μ΄λ¦
Returns:
Optional[str]: κΊΌλΈ κ° λλ None
"""
try:
return self.redis.rpop(name)
except redis.RedisError as e:
self.logger.error(f"리μ€νΈ κΊΌλ΄κΈ° μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return None
def pop_from_list_left(self, name: str) -> Optional[str]:
"""
리μ€νΈμμ κ° κΊΌλ΄κΈ° (μΌμͺ½)
Args:
name: 리μ€νΈ μ΄λ¦
Returns:
Optional[str]: κΊΌλΈ κ° λλ None
"""
try:
return self.redis.lpop(name)
except redis.RedisError as e:
self.logger.error(f"리μ€νΈ μΌμͺ½ κΊΌλ΄κΈ° μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return None
def get_list(self, name: str, start: int = 0, end: int = -1) -> List[str]:
"""
리μ€νΈ λ²μ μ‘°ν
Args:
name: 리μ€νΈ μ΄λ¦
start: μμ μΈλ±μ€
end: μ’
λ£ μΈλ±μ€ (-1μ λκΉμ§)
Returns:
List[str]: 리μ€νΈ μμλ€
"""
try:
return self.redis.lrange(name, start, end)
except redis.RedisError as e:
self.logger.error(f"리μ€νΈ μ‘°ν μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return []
def get_list_length(self, name: str) -> int:
"""
리μ€νΈ κΈΈμ΄ μ‘°ν
Args:
name: 리μ€νΈ μ΄λ¦
Returns:
int: 리μ€νΈ κΈΈμ΄
"""
try:
return self.redis.llen(name)
except redis.RedisError as e:
self.logger.error(f"리μ€νΈ κΈΈμ΄ μ‘°ν μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return 0
# μ§ν© μμ
def add_to_set(self, name: str, *values: str) -> Optional[int]:
"""
μ§ν©μ κ° μΆκ°
Args:
name: μ§ν© μ΄λ¦
*values: μΆκ°ν κ°λ€
Returns:
Optional[int]: μλ‘ μΆκ°λ κ°μ μ λλ None
"""
try:
return self.redis.sadd(name, *values)
except redis.RedisError as e:
self.logger.error(f"μ§ν© μΆκ° μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return None
def get_set_members(self, name: str) -> List[str]:
"""
μ§ν© λ©€λ² μ‘°ν
Args:
name: μ§ν© μ΄λ¦
Returns:
List[str]: μ§ν© λ©€λ² λͺ©λ‘
"""
try:
return list(self.redis.smembers(name))
except redis.RedisError as e:
self.logger.error(f"μ§ν© λ©€λ² μ‘°ν μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return []
def is_member(self, name: str, value: str) -> bool:
"""
κ°μ΄ μ§ν©μ λ©€λ²μΈμ§ νμΈ
Args:
name: μ§ν© μ΄λ¦
value: νμΈν κ°
Returns:
bool: λ©€λ² μ¬λΆ
"""
try:
return self.redis.sismember(name, value)
except redis.RedisError as e:
self.logger.error(f"μ§ν© λ©€λ² νμΈ μ€ν¨ - μ΄λ¦: {name}, κ°: {value}, μ€λ₯: {e}")
return False
def remove_from_set(self, name: str, *values: str) -> Optional[int]:
"""
μ§ν©μμ κ° μ κ±°
Args:
name: μ§ν© μ΄λ¦
*values: μ κ±°ν κ°λ€
Returns:
Optional[int]: μ κ±°λ κ°μ μ λλ None
"""
try:
return self.redis.srem(name, *values)
except redis.RedisError as e:
self.logger.error(f"μ§ν© μ κ±° μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return None
# μ λ ¬λ μ§ν© μμ
def add_to_sorted_set(self, name: str, mapping: Dict[str, float]) -> Optional[int]:
"""
μ λ ¬λ μ§ν©μ κ° μΆκ°
Args:
name: μ λ ¬λ μ§ν© μ΄λ¦
mapping: κ°-μ μ λ§€ν
Returns:
Optional[int]: μλ‘ μΆκ°λ κ°μ μ λλ None
"""
try:
return self.redis.zadd(name, mapping)
except redis.RedisError as e:
self.logger.error(f"μ λ ¬λ μ§ν© μΆκ° μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return None
def get_sorted_set_range(self, name: str, start: int = 0, end: int = -1, withscores: bool = False) -> Union[List[str], List[tuple]]:
"""
μ λ ¬λ μ§ν© λ²μ μ‘°ν (μ μ μ€λ¦μ°¨μ)
Args:
name: μ λ ¬λ μ§ν© μ΄λ¦
start: μμ μΈλ±μ€
end: μ’
λ£ μΈλ±μ€ (-1μ λκΉμ§)
withscores: μ μ ν¬ν¨ μ¬λΆ
Returns:
Union[List[str], List[tuple]]: κ° λͺ©λ‘ λλ (κ°, μ μ) νν λͺ©λ‘
"""
try:
return self.redis.zrange(name, start, end, withscores=withscores)
except redis.RedisError as e:
self.logger.error(f"μ λ ¬λ μ§ν© λ²μ μ‘°ν μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return []
def get_sorted_set_rev_range(self, name: str, start: int = 0, end: int = -1, withscores: bool = False) -> Union[List[str], List[tuple]]:
"""
μ λ ¬λ μ§ν© λ²μ μ‘°ν (μ μ λ΄λ¦Όμ°¨μ)
Args:
name: μ λ ¬λ μ§ν© μ΄λ¦
start: μμ μΈλ±μ€
end: μ’
λ£ μΈλ±μ€ (-1μ λκΉμ§)
withscores: μ μ ν¬ν¨ μ¬λΆ
Returns:
Union[List[str], List[tuple]]: κ° λͺ©λ‘ λλ (κ°, μ μ) νν λͺ©λ‘
"""
try:
return self.redis.zrevrange(name, start, end, withscores=withscores)
except redis.RedisError as e:
self.logger.error(f"μ λ ¬λ μ§ν© μμ λ²μ μ‘°ν μ€ν¨ - μ΄λ¦: {name}, μ€λ₯: {e}")
return []
def increment_score(self, name: str, value: str, amount: float = 1.0) -> Optional[float]:
"""
μ λ ¬λ μ§ν© λ©€λ²μ μ μ μ¦κ°
Args:
name: μ λ ¬λ μ§ν© μ΄λ¦
value: κ°
amount: μ¦κ°λ
Returns:
Optional[float]: μ μ μ λλ None
"""
try:
return self.redis.zincrby(name, amount, value)
except redis.RedisError as e:
self.logger.error(f"μ λ ¬λ μ§ν© μ μ μ¦κ° μ€ν¨ - μ΄λ¦: {name}, κ°: {value}, μ€λ₯: {e}")
return None
# ν€ κ΄λ¦¬
def delete_key(self, *names: str) -> int:
"""
ν€ μμ
Args:
*names: μμ ν ν€ μ΄λ¦λ€
Returns:
int: μμ λ ν€μ μ
"""
try:
return self.redis.delete(*names)
except redis.RedisError as e:
self.logger.error(f"ν€ μμ μ€ν¨ - ν€: {names}, μ€λ₯: {e}")
return 0
def set_expiry(self, name: str, time_seconds: int) -> bool:
"""
ν€ λ§λ£ μκ° μ€μ
Args:
name: ν€ μ΄λ¦
time_seconds: λ§λ£ μκ°(μ΄)
Returns:
bool: μ€μ μ±κ³΅ μ¬λΆ
"""
try:
return self.redis.expire(name, time_seconds)
except redis.RedisError as e:
self.logger.error(f"λ§λ£ μκ° μ€μ μ€ν¨ - ν€: {name}, μ€λ₯: {e}")
return False
def get_ttl(self, name: str) -> int:
"""
ν€ λ§λ£ μκ° μ‘°ν
Args:
name: ν€ μ΄λ¦
Returns:
int: λ¨μ μκ°(μ΄), -1μ μꡬμ , -2λ ν€ μμ
"""
try:
return self.redis.ttl(name)
except redis.RedisError as e:
self.logger.error(f"TTL μ‘°ν μ€ν¨ - ν€: {name}, μ€λ₯: {e}")
return -2
def exists(self, *names: str) -> int:
"""
ν€ μ‘΄μ¬ μ¬λΆ νμΈ
Args:
*names: νμΈν ν€ μ΄λ¦λ€
Returns:
int: μ‘΄μ¬νλ ν€μ μ
"""
try:
return self.redis.exists(*names)
except redis.RedisError as e:
self.logger.error(f"ν€ μ‘΄μ¬ νμΈ μ€ν¨ - ν€: {names}, μ€λ₯: {e}")
return 0
β
νΉμ§:
- λͺ¨λ Redis λ°μ΄ν° νμ μ§μ (λ¬Έμμ΄, ν΄μ, 리μ€νΈ, μ§ν©, μ λ ¬ μ§ν©)
- κ° λ°μ΄ν° νμ λ³ μ μ© λ©μλ
- μΌκ΄λ μ€λ₯ μ²λ¦¬μ λ‘κΉ
- JSON λ°μ΄ν° μ§λ ¬ν/μμ§λ ¬ν μ§μ
- TTL(Time-To-Live) μ€μ λ° κ΄λ¦¬
- μμμ μ¦λΆ μ°μ° μ§μ
- λ€μν 리μ€νΈ μμ (μλ°©ν₯ μΆκ°/μ κ±°)
- μ§ν© μ°μ° μ§μ
- μ λ ¬λ μ§ν© κ΄λ¦¬ (μμ, μ μ κΈ°λ°)
- ν€ κ΄λ¦¬ κΈ°λ₯ (μμ , λ§λ£ μ€μ , μ‘΄μ¬ νμΈ)
Redisλ λΉ λ₯Έ μ κ·Ό μλμ TTL κΈ°λ₯μ ν΅ν΄ μ ν리μΌμ΄μ
μ μ±λ₯μ ν₯μμν€λ ν¨κ³Όμ μΈ μΊμ± μμ€ν
μΌλ‘ νμ©λλ€. ν¨μ κ²°κ³Όλ λ°μ΄ν°λ² μ΄μ€ 쿼리 κ²°κ³Ό λ±μ μΊμ±νμ¬ λ°λ³΅μ μΈ κ³μ°μ΄λ IO μμ
μ μ΅μνν μ μλ€.
from functools import wraps
import pickle
import hashlib
from datetime import timedelta
import inspect
import time
class RedisCache:
"""Redisλ₯Ό νμ©ν μΊμ± ꡬν ν΄λμ€"""
def __init__(self, client: RedisClient, prefix: str = "cache"):
"""
Redis μΊμ μ΄κΈ°ν
Args:
client: Redis ν΄λΌμ΄μΈνΈ μΈμ€ν΄μ€
prefix: μΊμ ν€ μ λμ¬
"""
self.redis = client.redis
self.prefix = prefix
self.logger = logging.getLogger(__name__)
def _make_cache_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
"""
ν¨μ νΈμΆμ λν μΊμ ν€ μμ±
Args:
func_name: ν¨μ μ΄λ¦
args: μμΉ μΈμ
kwargs: ν€μλ μΈμ
Returns:
str: μμ±λ μΊμ ν€
"""
# μΈμλ₯Ό μ λ ¬νμ¬ μΌκ΄λ ν€ μμ±
key_parts = [func_name, str(args)]
if kwargs:
sorted_items = sorted(kwargs.items())
key_parts.append(str(sorted_items))
# SHA-256 ν΄μ μμ±
key_str = "::".join(key_parts)
hashed = hashlib.sha256(key_str.encode()).hexdigest()
return f"{self.prefix}:{func_name}:{hashed}"
def cache_decorator(self, expiry: int = 3600):
"""
ν¨μ κ²°κ³Όλ₯Ό μΊμνλ λ°μ½λ μ΄ν°
Args:
expiry: μΊμ λ§λ£ μκ°(μ΄)
Returns:
callable: λ°μ½λ μ΄ν° ν¨μ
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# ν¨μ μ΄λ¦ κ°μ Έμ€κΈ°
func_name = func.__name__
# μΊμ ν€ μμ±
cache_key = self._make_cache_key(func_name, args, kwargs)
# μΊμλ κ²°κ³Ό νμΈ
cached_result = self.redis.get(cache_key)
if cached_result:
self.logger.debug(f"μΊμ ννΈ: {func_name}")
try:
return pickle.loads(cached_result)
except pickle.PickleError as e:
self.logger.error(f"μΊμ μμ§λ ¬ν μ€ν¨: {e}")
# μΊμ λ―Έμ€ - ν¨μ μ€ν
self.logger.debug(f"μΊμ λ―Έμ€: {func_name}")
start_time = time.time()
result = func(*args, **kwargs)
execution_time = time.time() - start_time
# κ²°κ³Ό μΊμ±
try:
serialized = pickle.dumps(result)
self.redis.setex(cache_key, expiry, serialized)
self.logger.debug(
f"μΊμ μ μ₯: {func_name}, μ€ν μκ°: {execution_time:.4f}s, ν¬κΈ°: {len(serialized)} λ°μ΄νΈ"
)
except (pickle.PickleError, redis.RedisError) as e:
self.logger.error(f"μΊμ μ μ₯ μ€ν¨: {e}")
return result
return wrapper
return decorator
def invalidate_cache(self, pattern: str = "*") -> int:
"""
ν¨ν΄μ λ§λ μΊμ 무ν¨ν
Args:
pattern: μΊμ ν€ ν¨ν΄
Returns:
int: μμ λ ν€μ μ
"""
try:
# ν¨ν΄μ λ§λ ν€ μ°ΎκΈ°
cache_pattern = f"{self.prefix}:{pattern}"
keys = self.redis.keys(cache_pattern)
if not keys:
return 0
# ν€ μμ
deleted = self.redis.delete(*keys)
self.logger.info(f"{deleted}κ°μ μΊμ νλͺ© μμ λ¨: {cache_pattern}")
return deleted
except redis.RedisError as e:
self.logger.error(f"μΊμ 무ν¨ν μ€ν¨: {e}")
return 0
def memoize(self, expiry: int = 3600):
"""
ν¨μ κ²°κ³Όλ₯Ό λ©λͺ¨μ΄μ μ΄μ
νλ λ°μ½λ μ΄ν°
(λμΌ μΈμμ λν΄ ν¨μ λ΄λΆμμ ν λ²λ§ μ€ν)
Args:
expiry: μΊμ λ§λ£ μκ°(μ΄)
Returns:
callable: λ°μ½λ μ΄ν° ν¨μ
"""
return self.cache_decorator(expiry)
def cached_property(self, expiry: int = 3600):
"""
ν΄λμ€ νλ‘νΌν° κ²°κ³Όλ₯Ό μΊμνλ λ°μ½λ μ΄ν°
Args:
expiry: μΊμ λ§λ£ μκ°(μ΄)
Returns:
callable: λ°μ½λ μ΄ν° ν¨μ
"""
def decorator(method):
@property
@wraps(method)
def wrapper(self):
# μΈμ€ν΄μ€ μ 보λ₯Ό ν¬ν¨ν μΊμ ν€ μμ±
instance_id = id(self)
method_name = method.__name__
class_name = self.__class__.__name__
cache_key = f"{self.prefix}:{class_name}:{instance_id}:{method_name}"
# μΊμ νμΈ
cached_result = self.redis.get(cache_key)
if cached_result:
try:
return pickle.loads(cached_result)
except pickle.PickleError:
pass
# λ©μλ μ€ν
result = method(self)
# κ²°κ³Ό μΊμ±
try:
self.redis.setex(cache_key, expiry, pickle.dumps(result))
except (pickle.PickleError, redis.RedisError):
pass
return result
return wrapper
return decorator
def cache_context(self, key: str, expiry: int = 3600):
"""
컨ν
μ€νΈ λ§€λμ λ₯Ό ν΅ν μΊμ±
Args:
key: μΊμ ν€
expiry: μΊμ λ§λ£ μκ°(μ΄)
Returns:
contextmanager: 컨ν
μ€νΈ λ§€λμ
"""
@contextmanager
def cache_cm():
cache_key = f"{self.prefix}:{key}"
# μΊμ νμΈ
cached_data = self.redis.get(cache_key)
if cached_data:
try:
yield pickle.loads(cached_data), True # (λ°μ΄ν°, μΊμ ννΈ)
return
except pickle.PickleError:
pass
# μΊμ λ―Έμ€
result_container = []
try:
yield result_container, False # (κ²°κ³Ό 컨ν
μ΄λ, μΊμ λ―Έμ€)
# κ²°κ³Όκ° μ»¨ν
μ΄λμ μ μ₯λμλμ§ νμΈ
if result_container:
# 첫 λ²μ§Έ νλͺ©μ μΊμ
try:
self.redis.setex(cache_key, expiry, pickle.dumps(result_container[0]))
except (pickle.PickleError, redis.RedisError) as e:
self.logger.error(f"컨ν
μ€νΈ μΊμ μ μ₯ μ€ν¨: {e}")
except Exception as e:
self.logger.error(f"μΊμ 컨ν
μ€νΈ μ€λ₯: {e}")
raise
return cache_cm()
β
νΉμ§:
- λ°μ½λ μ΄ν° ν¨ν΄μ ν΅ν κ°νΈν μΊμ±
- μμ μ μΈ μΊμ ν€ μμ± (SHA-256 ν΄μ μ¬μ©)
- μ§λ ¬νλ₯Ό ν΅ν 볡μ‘ν κ°μ²΄ μΊμ±
- ν¨ν΄ κΈ°λ° μΊμ 무ν¨ν
- λ©λͺ¨μ΄μ μ΄μ μ§μ
- νλ‘νΌν° μΊμ± κΈ°λ₯
- 컨ν μ€νΈ λ§€λμ λ₯Ό ν΅ν μ μ°ν μΊμ±
- μ€ν μκ° μΈ‘μ λ° λ‘κΉ
- μμΈ μ²λ¦¬λ₯Ό ν΅ν μμ μ± ν보
- μΌκ΄λ ν€ μ λμ¬ κ΄λ¦¬
Redisμ λΉ λ₯Έ μ²λ¦¬ μλμ TTL κΈ°λ₯μ μΉ μ ν리μΌμ΄μ
μ μΈμ
κ΄λ¦¬μ μ΄μμ μ΄λ€. μΈμ¦ μ 보μ μ¬μ©μ μνλ₯Ό μμ νκ² μ μ₯νκ³ κ΄λ¦¬ν μ μμΌλ©°, λΆμ° νκ²½μμλ μΌκ΄λ μΈμ
μ κ·Όμ΄ κ°λ₯νλ€.
import uuid
from datetime import datetime, timedelta
import json
import secrets
import time
class RedisSession:
"""Redisλ₯Ό νμ©ν μΈμ
κ΄λ¦¬ ν΄λμ€"""
def __init__(
self,
client: RedisClient,
prefix: str = "session",
default_expiry: int = 86400 # 24μκ°
):
"""
Redis μΈμ
κ΄λ¦¬μ μ΄κΈ°ν
Args:
client: Redis ν΄λΌμ΄μΈνΈ μΈμ€ν΄μ€
prefix: μΈμ
ν€ μ λμ¬
default_expiry: κΈ°λ³Έ μΈμ
λ§λ£ μκ°(μ΄)
"""
self.redis = client.redis
self.prefix = prefix
self.default_expiry = default_expiry
self.logger = logging.getLogger(__name__)
def _get_session_key(self, session_id: str) -> str:
"""
μΈμ
IDλ‘λΆν° μΈμ
ν€ μμ±
Args:
session_id: μΈμ
ID
Returns:
str: μΈμ
ν€
"""
return f"{self.prefix}:{session_id}"
def create_session(
self,
user_id: str,
data: dict,
expiry: Optional[int] = None
) -> str:
"""
μ μΈμ
μμ±
Args:
user_id: μ¬μ©μ ID
data: μΈμ
μ μ μ₯ν λ°μ΄ν°
expiry: μΈμ
λ§λ£ μκ°(μ΄), Noneμ΄λ©΄ κΈ°λ³Έκ° μ¬μ©
Returns:
str: μμ±λ μΈμ
ID
"""
# μΈμ
ID μμ± (UUID v4)
session_id = str(uuid.uuid4())
session_key = self._get_session_key(session_id)
# μΈμ
λ°μ΄ν° μ€λΉ
current_time = datetime.now().isoformat()
session_data = {
"user_id": user_id,
"created_at": current_time,
"last_accessed": current_time,
"ip_address": data.get("ip_address", "unknown"),
"user_agent": data.get("user_agent", "unknown"),
**data
}
# TTL μ€μ
ttl = expiry if expiry is not None else self.default_expiry
try:
# μΈμ
μ μ₯
self.redis.hset(session_key, mapping=session_data)
self.redis.expire(session_key, timedelta(seconds=ttl))
# μ¬μ©μ IDλ‘ μΈμ
μΈλ±μ± (ν μ¬μ©μκ° μ¬λ¬ μΈμ
μ κ°μ§ μ μμ)
user_sessions_key = f"user_sessions:{user_id}"
self.redis.sadd(user_sessions_key, session_id)
self.logger.info(f"μΈμ
μμ±: {session_id} (μ¬μ©μ: {user_id})")
return session_id
except redis.RedisError as e:
self.logger.error(f"μΈμ
μμ± μ€ν¨: {e}")
raise
def get_session(self, session_id: str, update_access: bool = True) -> Optional[dict]:
"""
μΈμ
μ 보 μ‘°ν
Args:
session_id: μΈμ
ID
update_access: μ κ·Ό μκ° μ
λ°μ΄νΈ μ¬λΆ
Returns:
Optional[dict]: μΈμ
λ°μ΄ν° λλ None
"""
if not session_id:
return None
session_key = self._get_session_key(session_id)
try:
# μΈμ
λ°μ΄ν° μ‘°ν
session_data = self.redis.hgetall(session_key)
if not session_data:
return None
# λ§μ§λ§ μ κ·Ό μκ° μ
λ°μ΄νΈ
if update_access:
self.redis.hset(session_key, "last_accessed", datetime.now().isoformat())
return session_data
except redis.RedisError as e:
self.logger.error(f"μΈμ
μ‘°ν μ€ν¨: {e}")
return None
def update_session(self, session_id: str, data: dict) -> bool:
"""
μΈμ
λ°μ΄ν° μ
λ°μ΄νΈ
Args:
session_id: μΈμ
ID
data: μ
λ°μ΄νΈν λ°μ΄ν°
Returns:
bool: μ
λ°μ΄νΈ μ±κ³΅ μ¬λΆ
"""
session_key = self._get_session_key(session_id)
try:
# μΈμ
μ‘΄μ¬ νμΈ
if not self.redis.exists(session_key):
return False
# μ
λ°μ΄νΈν λ°μ΄ν°μ λ§μ§λ§ μ κ·Ό μκ° μΆκ°
update_data = {
**data,
"last_accessed": datetime.now().isoformat()
}
# μΈμ
μ
λ°μ΄νΈ
self.redis.hset(session_key, mapping=update_data)
return True
except redis.RedisError as e:
self.logger.error(f"μΈμ
μ
λ°μ΄νΈ μ€ν¨: {e}")
return False
def delete_session(self, session_id: str) -> bool:
"""
μΈμ
μμ
Args:
session_id: μΈμ
ID
Returns:
bool: μμ μ±κ³΅ μ¬λΆ
"""
session_key = self._get_session_key(session_id)
try:
# μΈμ
λ°μ΄ν°μμ μ¬μ©μ ID κ°μ Έμ€κΈ°
user_id = self.redis.hget(session_key, "user_id")
# μΈμ
μμ
if self.redis.delete(session_key):
# μ¬μ©μ μΈμ
μΈλ±μ€μμλ μμ
if user_id:
self.redis.srem(f"user_sessions:{user_id}", session_id)
self.logger.info(f"μΈμ
μμ : {session_id}")
return True
return False
except redis.RedisError as e:
self.logger.error(f"μΈμ
μμ μ€ν¨: {e}")
return False
def extend_session(self, session_id: str, extension: int) -> bool:
"""
μΈμ
λ§λ£ μκ° μ°μ₯
Args:
session_id: μΈμ
ID
extension: μ°μ₯ν μκ°(μ΄)
Returns:
bool: μ°μ₯ μ±κ³΅ μ¬λΆ
"""
session_key = self._get_session_key(session_id)
try:
# νμ¬ λ¨μ μκ°μ μ°μ₯ μκ° μΆκ°
return self.redis.expire(session_key, timedelta(seconds=extension))
except redis.RedisError as e:
self.logger.error(f"μΈμ
μ°μ₯ μ€ν¨: {e}")
return False
def get_user_sessions(self, user_id: str) -> List[str]:
"""
μ¬μ©μμ λͺ¨λ μΈμ
ID μ‘°ν
Args:
user_id: μ¬μ©μ ID
Returns:
List[str]: μΈμ
ID λͺ©λ‘
"""
try:
user_sessions_key = f"user_sessions:{user_id}"
return list(self.redis.smembers(user_sessions_key))
except redis.RedisError as e:
self.logger.error(f"μ¬μ©μ μΈμ
μ‘°ν μ€ν¨: {e}")
return []
def invalidate_user_sessions(self, user_id: str, keep_current: Optional[str] = None) -> int:
"""
μ¬μ©μμ λͺ¨λ μΈμ
무ν¨ν
Args:
user_id: μ¬μ©μ ID
keep_current: μ μ§ν νμ¬ μΈμ
ID (μ ν μ¬ν)
Returns:
int: 무ν¨νλ μΈμ
μ
"""
try:
# μ¬μ©μμ λͺ¨λ μΈμ
ID κ°μ Έμ€κΈ°
sessions = self.get_user_sessions(user_id)
if not sessions:
return 0
# νμ¬ μΈμ
μ μ μΈν λͺ¨λ μΈμ
μμ
deleted_count = 0
for session_id in sessions:
if keep_current and session_id == keep_current:
continue
if self.delete_session(session_id):
deleted_count += 1
self.logger.info(f"μ¬μ©μ {user_id}μ μΈμ
{deleted_count}κ° λ¬΄ν¨ν")
return deleted_count
except redis.RedisError as e:
self.logger.error(f"μ¬μ©μ μΈμ
무ν¨ν μ€ν¨: {e}")
return 0
def create_csrf_token(self, session_id: str) -> Optional[str]:
"""
CSRF ν ν° μμ± λ° μΈμ
μ μ μ₯
Args:
session_id: μΈμ
ID
Returns:
Optional[str]: CSRF ν ν° λλ None
"""
try:
# λλ€ ν ν° μμ±
csrf_token = secrets.token_hex(32)
# μΈμ
μ ν ν° μ μ₯
if self.update_session(session_id, {"csrf_token": csrf_token}):
return csrf_token
return None
except Exception as e:
self.logger.error(f"CSRF ν ν° μμ± μ€ν¨: {e}")
return None
def validate_csrf_token(self, session_id: str, token: str) -> bool:
"""
CSRF ν ν° κ²μ¦
Args:
session_id: μΈμ
ID
token: κ²μ¦ν CSRF ν ν°
Returns:
bool: μ ν¨ μ¬λΆ
"""
try:
# μΈμ
μμ ν ν° κ°μ Έμ€κΈ°
session = self.get_session(session_id, update_access=False)
if not session:
return False
# ν ν° λΉκ΅ (μκ° λ
립μ λΉκ΅)
stored_token = session.get("csrf_token")
if not stored_token:
return False
return secrets.compare_digest(stored_token, token)
except Exception as e:
self.logger.error(f"CSRF ν ν° κ²μ¦ μ€ν¨: {e}")
return False
# μ¬μ© μμ
def create_user_session(redis_session, user_info):
"""μ¬μ©μ λ‘κ·ΈμΈ ν μΈμ
μμ± μμ """
user_id = user_info["id"]
ip_address = user_info.get("ip_address", "127.0.0.1")
user_agent = user_info.get("user_agent", "Unknown")
# μΈμ
λ°μ΄ν° μ€λΉ
session_data = {
"username": user_info["username"],
"email": user_info["email"],
"role": user_info["role"],
"ip_address": ip_address,
"user_agent": user_agent,
"login_time": int(time.time())
}
# μΈμ
μμ±
session_id = redis_session.create_session(user_id, session_data)
# CSRF ν ν° μμ±
csrf_token = redis_session.create_csrf_token(session_id)
return {
"session_id": session_id,
"csrf_token": csrf_token
}
β
νΉμ§:
- μμ ν μΈμ ID μμ± (UUID μ¬μ©)
- μ μ°ν μΈμ λ°μ΄ν° μ μ₯
- μλ λ§λ£ μκ° κ΄λ¦¬
- μΈμ μ κ·Ό μκ° μΆμ
- μ¬μ©μλ³ μΈμ μΈλ±μ±
- μΈμ λ°μ΄ν° μ λ°μ΄νΈ κΈ°λ₯
- μΈμ μ°μ₯ μ§μ
- μ¬μ©μλ³ μΈμ κ΄λ¦¬
- CSRF λ³΄νΈ κΈ°λ₯ λ΄μ₯
- μμ ν ν ν° λΉκ΅ (νμ΄λ° μ΄ν λ°©μ§)
Redisλ λ¨μν ν€-κ° μ μ₯μλ₯Ό λμ΄ λ€μν κ³ κΈ ν¨ν΄μ ꡬνν μ μλ€. λ©μμ§ ν, λΆμ° λ½, μλ μ ν, 리λ보λ λ±μ κΈ°λ₯μ ν¨μ¨μ μΌλ‘ ꡬννμ¬ λ³΅μ‘ν μ ν리μΌμ΄μ
μꡬμ¬νμ ν΄κ²°ν μ μλ€.
import time
import random
import uuid
from datetime import datetime
from typing import Optional, Dict, List, Tuple, Callable, Any
class RedisAdvancedPatterns:
"""Redisλ₯Ό νμ©ν κ³ κΈ ν¨ν΄ ꡬν ν΄λμ€"""
def __init__(self, client: RedisClient):
"""
Redis κ³ κΈ ν¨ν΄ μ΄κΈ°ν
Args:
client: Redis ν΄λΌμ΄μΈνΈ μΈμ€ν΄μ€
"""
self.redis = client.redis
self.logger = logging.getLogger(__name__)
# λ©μμ§ ν ν¨ν΄
def enqueue(self, queue_name: str, message: Dict[str, Any]) -> bool:
"""
λ©μμ§ νμ λ©μμ§ μΆκ°
Args:
queue_name: ν μ΄λ¦
message: λ©μμ§ λ°μ΄ν°
Returns:
bool: μ±κ³΅ μ¬λΆ
"""
try:
# λ©μμ§μ λ©νλ°μ΄ν° μΆκ°
full_message = {
"id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat(),
"data": message
}
# νμ λ©μμ§ μΆκ° (μ€λ₯Έμͺ½μ μΆκ°)
self.redis.rpush(queue_name, json.dumps(full_message))
return True
except (redis.RedisError, json.JSONDecodeError) as e:
self.logger.error(f"λ©μμ§ ν μΆκ° μ€ν¨: {e}")
return False
def dequeue(self, queue_name: str, timeout: int = 0) -> Optional[Dict]:
"""
λ©μμ§ νμμ λ©μμ§ κ°μ Έμ€κΈ° (λΈλ‘νΉ)
Args:
queue_name: ν μ΄λ¦
timeout: νμμμ(μ΄), 0μ 무ν λκΈ°
Returns:
Optional[Dict]: λ©μμ§ λλ None
"""
try:
# μΌμͺ½μμ λ©μμ§ κ°μ Έμ€κΈ° (BLPOP: λΈλ‘νΉ μΌμͺ½ ν)
result = self.redis.blpop(queue_name, timeout)
# νμμμλ κ²½μ°
if not result:
return None
# κ²°κ³Όλ (ν μ΄λ¦, λ©μμ§) νν
_, message_data = result
# JSON μμ§λ ¬ν
return json.loads(message_data)
except (redis.RedisError, json.JSONDecodeError) as e:
self.logger.error(f"λ©μμ§ ν κ°μ Έμ€κΈ° μ€ν¨: {e}")
return None
def queue_length(self, queue_name: str) -> int:
"""
ν κΈΈμ΄ μ‘°ν
Args:
queue_name: ν μ΄λ¦
Returns:
int: ν κΈΈμ΄
"""
try:
return self.redis.llen(queue_name)
except redis.RedisError as e:
self.logger.error(f"ν κΈΈμ΄ μ‘°ν μ€ν¨: {e}")
return 0
# λΆμ° λ½ ν¨ν΄
def acquire_lock(
self,
lock_name: str,
owner: str = None,
expiry: int = 10,
retry_count: int = 3,
retry_delay: float = 0.2
) -> Optional[str]:
"""
λΆμ° λ½ νλ
Args:
lock_name: λ½ μ΄λ¦
owner: λ½ μμ μ μλ³μ
expiry: λ½ λ§λ£ μκ°(μ΄)
retry_count: μ¬μλ νμ
retry_delay: μ¬μλ κ°κ²©(μ΄)
Returns:
Optional[str]: λ½ μλ³μ λλ None
"""
# λ½ ν€
lock_key = f"lock:{lock_name}"
# λ½ μμ μ μλ³μ
owner_id = owner or str(uuid.uuid4())
# μ¬μλ 루ν
for attempt in range(retry_count + 1):
# λ½ μλ (NX: ν€κ° μμ λλ§ μ€μ )
success = self.redis.set(lock_key, owner_id, ex=expiry, nx=True)
if success:
self.logger.debug(f"λ½ νλ: {lock_name} (μμ μ: {owner_id})")
return owner_id
# λ§μ§λ§ μλκ° μλλ©΄ λκΈ° ν μ¬μλ
if attempt < retry_count:
# 무μμ μ§ν° μΆκ° (κ²½ν© ν΄κ²°)
jitter = random.uniform(0, 0.1)
time.sleep(retry_delay + jitter)
self.logger.debug(f"λ½ νλ μ€ν¨: {lock_name}")
return None
def release_lock(self, lock_name: str, owner_id: str) -> bool:
"""
λΆμ° λ½ ν΄μ
Args:
lock_name: λ½ μ΄λ¦
owner_id: λ½ μμ μ μλ³μ
Returns:
bool: μ±κ³΅ μ¬λΆ
"""
lock_key = f"lock:{lock_name}"
# Lua μ€ν¬λ¦½νΈλ₯Ό μ¬μ©ν μμ ν λ½ ν΄μ
# μμ μκ° μΌμΉνλ κ²½μ°μλ§ μμ
script = """
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"""
try:
# μ€ν¬λ¦½νΈ μ€ν
result = self.redis.eval(script, 1, lock_key, owner_id)
success = result == 1
if success:
self.logger.debug(f"λ½ ν΄μ : {lock_name} (μμ μ: {owner_id})")
else:
self.logger.debug(f"λ½ ν΄μ μ€ν¨: {lock_name} (μμ μ λΆμΌμΉ λλ λ§λ£)")
return success
except redis.RedisError as e:
self.logger.error(f"λ½ ν΄μ μ€λ₯: {e}")
return False
@contextmanager
def distributed_lock(
self,
lock_name: str,
expiry: int = 10,
retry_count: int = 3,
retry_delay: float = 0.2
):
"""
λΆμ° λ½ μ»¨ν
μ€νΈ λ§€λμ
Args:
lock_name: λ½ μ΄λ¦
expiry: λ½ λ§λ£ μκ°(μ΄)
retry_count: μ¬μλ νμ
retry_delay: μ¬μλ κ°κ²©(μ΄)
Raises:
RuntimeError: λ½ νλ μ€ν¨ μ
"""
owner_id = None
try:
# λ½ νλ
owner_id = self.acquire_lock(
lock_name,
expiry=expiry,
retry_count=retry_count,
retry_delay=retry_delay
)
if not owner_id:
raise RuntimeError(f"λ½ νλ μ€ν¨: {lock_name}")
# λ½ νλ μ±κ³΅, 컨ν
μ€νΈ μ§μ
yield
finally:
# λ½ ν΄μ (μμ μκ° μλ κ²½μ°μλ§)
if owner_id:
self.release_lock(lock_name, owner_id)
# μλ μ ν ν¨ν΄
def rate_limit(
self,
key: str,
limit: int,
period: int
) -> Tuple[bool, int, int]:
"""
μλ μ ν μ²΄ν¬ (μ¬λΌμ΄λ© μλμ° μκ³ λ¦¬μ¦)
Args:
key: μ νν ν€ (μ: IP λλ μ¬μ©μ ID)
limit: κΈ°κ° λ΄ νμ© μμ² μ
period: κΈ°κ°(μ΄)
Returns:
Tuple[bool, int, int]: (νμ© μ¬λΆ, νμ¬ μμ² μ, λ¨μ μ΄)
"""
# μλ μ ν ν€ λ° νμ¬ μκ°
rate_key = f"ratelimit:{key}"
now = int(time.time())
# νμ΄νλΌμΈμ ν΅ν μμμ μ°μ°
pipe = self.redis.pipeline()
try:
# λ§λ£λ μμ² μ κ±° (νμ¬ μκ° - κΈ°κ°λ³΄λ€ μ΄μ μμ²)
pipe.zremrangebyscore(rate_key, 0, now - period)
# μ μμ² μΆκ°
pipe.zadd(rate_key, {str(now): now})
# νμ¬ μμ² μ μ‘°ν
pipe.zcard(rate_key)
# ν€ λ§λ£ μ€μ (κΈ°κ° + 1μ΄)
pipe.expire(rate_key, period + 1)
# νμ΄νλΌμΈ μ€ν
_, _, request_count, _ = pipe.execute()
# μμ² νμ© μ¬λΆ κ²°μ
allowed = request_count <= limit
# TTL νμΈ
ttl = self.redis.ttl(rate_key)
return allowed, request_count, ttl
except redis.RedisError as e:
self.logger.error(f"μλ μ ν μ²΄ν¬ μ€ν¨: {e}")
return False, 0, 0
# 리λ보λ ν¨ν΄
def add_to_leaderboard(self, leaderboard: str, user: str, score: float) -> int:
"""
리λ보λμ μ μ μΆκ°/κ°±μ
Args:
leaderboard: 리λ보λ μ΄λ¦
user: μ¬μ©μ μλ³μ
score: μ μ
Returns:
int: μλ‘ μΆκ°λ κ²½μ° 1, κ°±μ λ κ²½μ° 0
"""
try:
return self.redis.zadd(leaderboard, {user: score})
except redis.RedisError as e:
self.logger.error(f"리λ보λ μΆκ° μ€ν¨: {e}")
return 0
def get_leaderboard(
self,
leaderboard: str,
start: int = 0,
end: int = 9,
desc: bool = True
) -> List[Tuple[str, float]]:
"""
리λ보λ μμ μ‘°ν
Args:
leaderboard: 리λ보λ μ΄λ¦
start: μμ μΈλ±μ€ (0λΆν° μμ)
end: μ’
λ£ μΈλ±μ€
desc: λ΄λ¦Όμ°¨μ μ¬λΆ
Returns:
List[Tuple[str, float]]: (μ¬μ©μ, μ μ) λͺ©λ‘
"""
try:
# λ΄λ¦Όμ°¨μ λλ μ€λ¦μ°¨μ
if desc:
result = self.redis.zrevrange(leaderboard, start, end, withscores=True)
else:
result = self.redis.zrange(leaderboard, start, end, withscores=True)
return result
except redis.RedisError as e:
self.logger.error(f"리λ보λ μ‘°ν μ€ν¨: {e}")
return []
def get_rank(self, leaderboard: str, user: str, desc: bool = True) -> Optional[int]:
"""
리λ보λ μμ μ‘°ν
Args:
leaderboard: 리λ보λ μ΄λ¦
user: μ¬μ©μ μλ³μ
desc: λ΄λ¦Όμ°¨μ μ¬λΆ
Returns:
Optional[int]: 0λΆν° μμνλ μμ λλ None
"""
try:
# λ΄λ¦Όμ°¨μ λλ μ€λ¦μ°¨μ
if desc:
rank = self.redis.zrevrank(leaderboard, user)
else:
rank = self.redis.zrank(leaderboard, user)
return rank
except redis.RedisError as e:
self.logger.error(f"μμ μ‘°ν μ€ν¨: {e}")
return None
def get_score(self, leaderboard: str, user: str) -> Optional[float]:
"""
리λ보λ μ μ μ‘°ν
Args:
leaderboard: 리λ보λ μ΄λ¦
user: μ¬μ©μ μλ³μ
Returns:
Optional[float]: μ μ λλ None
"""
try:
return self.redis.zscore(leaderboard, user)
except redis.RedisError as e:
self.logger.error(f"μ μ μ‘°ν μ€ν¨: {e}")
return None
β
νΉμ§:
- λ©μμ§ ν ꡬν (μμ°μ-μλΉμ ν¨ν΄)
- λΆμ° λ½μ ν΅ν λμμ± μ μ΄
- μ¬λΌμ΄λ© μλμ° μλ μ ν
- 리λ보λ λ° μμ μμ€ν
- Lua μ€ν¬λ¦½νΈλ₯Ό ν΅ν μμμ μ°μ°
- νμ΄νλΌμΈμ νμ©ν μ±λ₯ μ΅μ ν
- 컨ν μ€νΈ λ§€λμ μ§μ
- λΆμ° νκ²½ μ§μ
- ν¨μ¨μ μΈ λ§λ£ μκ° κ΄λ¦¬
- κ²¬κ³ ν μ€λ₯ μ²λ¦¬
Redisλ λ¨μΌ μλ² κ΅¬μ±μ λμ΄ ν΄λ¬μ€ν°λ§κ³Ό 볡μ κΈ°λ₯μ ν΅ν΄ νμ₯μ±κ³Ό κ³ κ°μ©μ±μ μ 곡νλ€. λκ·λͺ¨ λ°μ΄ν°μ νΈλν½ μ²λ¦¬, μ₯μ μ‘°μΉ λ° λ°μ΄ν° μ§μμ±μ μν λ€μν ꡬμ±μ΄ κ°λ₯νλ€.
from rediscluster import RedisCluster
import redis.sentinel
class RedisHighAvailability:
"""Redis ν΄λ¬μ€ν°λ§ λ° κ³ κ°μ©μ± κ΅¬μ± ν΄λμ€"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.cluster = None
self.master = None
def connect_cluster(self, startup_nodes, decode_responses=True):
"""Redis ν΄λ¬μ€ν° μ°κ²°"""
try:
self.cluster = RedisCluster(
startup_nodes=startup_nodes,
decode_responses=decode_responses,
skip_full_coverage_check=True
)
self.logger.info("Redis ν΄λ¬μ€ν° μ°κ²° μ±κ³΅")
return True
except Exception as e:
self.logger.error(f"Redis ν΄λ¬μ€ν° μ°κ²° μ€ν¨: {e}")
return False
def connect_sentinel(self, sentinel_list, master_name, password=None):
"""Redis Sentinel μ°κ²° (λ§μ€ν°-μ¬λ μ΄λΈ ꡬμ±)"""
try:
sentinel = redis.sentinel.Sentinel(
sentinel_list,
socket_timeout=1.0,
password=password
)
# λ§μ€ν° μ°κ²°
self.master = sentinel.master_for(
master_name,
socket_timeout=0.5,
password=password,
decode_responses=True
)
# μ¬λ μ΄λΈ μ°κ²° (μ½κΈ° μ μ©)
self.slave = sentinel.slave_for(
master_name,
socket_timeout=0.5,
password=password,
decode_responses=True
)
self.logger.info(f"Redis Sentinel μ°κ²° μ±κ³΅ (λ§μ€ν°: {master_name})")
return True
except Exception as e:
self.logger.error(f"Redis Sentinel μ°κ²° μ€ν¨: {e}")
return False
def get_cluster_info(self):
"""ν΄λ¬μ€ν° μ 보 μ‘°ν"""
if not self.cluster:
return None
try:
nodes = self.cluster.cluster_nodes()
slots = self.cluster.cluster_slots()
info = self.cluster.cluster_info()
return {
"nodes": nodes,
"slots": slots,
"info": info
}
except Exception as e:
self.logger.error(f"ν΄λ¬μ€ν° μ 보 μ‘°ν μ€ν¨: {e}")
return None
def write_to_master(self, key, value):
"""λ§μ€ν°μ λ°μ΄ν° μ°κΈ°"""
if not self.master:
return False
try:
return self.master.set(key, value)
except Exception as e:
self.logger.error(f"λ§μ€ν° μ°κΈ° μ€ν¨: {e}")
return False
def read_from_slave(self, key):
"""μ¬λ μ΄λΈμμ λ°μ΄ν° μ½κΈ°"""
if not hasattr(self, 'slave') or not self.slave:
return None
try:
return self.slave.get(key)
except Exception as e:
self.logger.error(f"μ¬λ μ΄λΈ μ½κΈ° μ€ν¨: {e}")
return None
class RedisPubSub:
"""Redisλ₯Ό νμ©ν λ°ν/ꡬλ
ꡬν ν΄λμ€"""
def __init__(self, client):
self.redis = client.redis
self.pubsub = self.redis.pubsub()
self.logger = logging.getLogger(__name__)
self.thread = None
def subscribe(self, channels):
"""μ±λ ꡬλ
"""
try:
self.pubsub.subscribe(*channels)
self.logger.info(f"μ±λ ꡬλ
: {channels}")
return True
except Exception as e:
self.logger.error(f"μ±λ ꡬλ
μ€ν¨: {e}")
return False
def pattern_subscribe(self, patterns):
"""ν¨ν΄ ꡬλ
"""
try:
self.pubsub.psubscribe(*patterns)
self.logger.info(f"ν¨ν΄ ꡬλ
: {patterns}")
return True
except Exception as e:
self.logger.error(f"ν¨ν΄ ꡬλ
μ€ν¨: {e}")
return False
def publish(self, channel, message):
"""λ©μμ§ λ°ν"""
try:
receivers = self.redis.publish(channel, json.dumps(message))
self.logger.debug(f"λ©μμ§ λ°ν: {channel}, μμ μ: {receivers}")
return receivers
except Exception as e:
self.logger.error(f"λ©μμ§ λ°ν μ€ν¨: {e}")
return 0
def start_listening(self, callback, sleep_time=0.1):
"""λ°±κ·ΈλΌμ΄λμμ λ©μμ§ μμ μμ"""
self.thread = self.pubsub.run_in_thread(
sleep_time=sleep_time,
callback=callback
)
return self.thread
def stop_listening(self):
"""λ©μμ§ μμ μ€μ§"""
if self.thread:
self.thread.stop()
self.thread = None
def get_message(self, ignore_subscribe_messages=True):
"""λ¨μΌ λ©μμ§ μμ """
return self.pubsub.get_message(
ignore_subscribe_messages=ignore_subscribe_messages
)
def close(self):
"""ꡬλ
μ’
λ£ λ° μμ μ 리"""
self.stop_listening()
self.pubsub.unsubscribe()
self.pubsub.punsubscribe()
self.pubsub.close()
# ν΄λ¬μ€ν° μ€μ μμ
def setup_redis_cluster():
"""Redis ν΄λ¬μ€ν° μ€μ λ° μ°κ²° μμ """
startup_nodes = [
{"host": "redis-node1", "port": 7000},
{"host": "redis-node2", "port": 7001},
{"host": "redis-node3", "port": 7002}
]
ha_manager = RedisHighAvailability()
if ha_manager.connect_cluster(startup_nodes):
print("ν΄λ¬μ€ν° μ°κ²° μ±κ³΅")
# ν΄λ¬μ€ν° μ 보 μΆλ ₯
cluster_info = ha_manager.get_cluster_info()
if cluster_info:
print(f"ν΄λ¬μ€ν° μν: {cluster_info['info'].get('cluster_state')}")
print(f"λ
Έλ μ: {len(cluster_info['nodes'])}")
else:
print("ν΄λ¬μ€ν° μ°κ²° μ€ν¨")
β
νΉμ§:
- λ€μν κ³ κ°μ©μ± κ΅¬μ± μ§μ
- ν΄λ¬μ€ν° λͺ¨λ: μν νμ₯ λ° λ°μ΄ν° λΆμ°
- Sentinel λͺ¨λ: μλ μ₯μ μ‘°μΉ λ° λ§μ€ν°-μ¬λ μ΄λΈ ꡬμ±
- ν΄λ¬μ€ν° μν λͺ¨λν°λ§ κΈ°λ₯
- μ½κΈ°/μ°κΈ° λΆλ¦¬λ‘ λΆν λΆμ° κ°λ₯
- λ°ν/ꡬλ (Pub/Sub) ν¨ν΄ ꡬν
- μ€μκ° λ©μμ§ μ²λ¦¬ λ° μ΄λ²€νΈ κΈ°λ° μν€ν μ²
- λ©ν°μ±λ λ° ν¨ν΄ κΈ°λ° κ΅¬λ
- λ°±κ·ΈλΌμ΄λ μ€λ λλ‘ λΉλκΈ° λ©μμ§ μ²λ¦¬
- μμ μμ ν μ 리 λ° μ’ λ£ μ²λ¦¬
β
λͺ¨λ² μ¬λ‘:
-
ν€ μ€κ³: μλ―Έ μλ μ λμ¬μ ꡬλΆμλ₯Ό μ¬μ©νμ¬ μ²΄κ³μ μΈ ν€ κ΅¬μ‘° μ€κ³
user:1000:profile - μ¬μ©μ νλ‘ν user:1000:posts - μ¬μ©μ κ²μλ¬Ό post:5000:comments - κ²μλ¬Ό λκΈ
-
λ©λͺ¨λ¦¬ μ΅μ ν: λ©λͺ¨λ¦¬ μ¬μ©λ μ΅μνλ₯Ό μν λ°μ΄ν° ꡬ쑰 μ ν
- μμ ν΄μλ λ¬Έμμ΄λ³΄λ€ ν¨μ¨μ (ziplist μΈμ½λ©)
- μ«μλ μ μλ‘ μ μ₯ (int μΈμ½λ©)
- νμν κ²½μ°μλ§ TTL μ€μ
-
μ±λ₯ λͺ¨λν°λ§: μ£Όμ μ§ν μΆμ μΌλ‘ μ±λ₯ λ¬Έμ μ‘°κΈ° λ°κ²¬
# λ©λͺ¨λ¦¬, ν΄λΌμ΄μΈνΈ μ°κ²°, λͺ λ Ή μ λ± λͺ¨λν°λ§ info = redis.info() # λλ¦° λ‘κ·Έ μ€μ λ° νμΈ redis.config_set('slowlog-log-slower-than', 10000) # 10ms slow_logs = redis.slowlog_get(10)
-
λ°±μ μ λ΅: λ°μ΄ν° μμ€ λ°©μ§λ₯Ό μν RDB λ° AOF μ€μ
# redis.conf μ€μ μ save 900 1 appendonly yes appendfsync everysec
-
보μ κ°ν: μΈμ¦, λͺ λ Ήμ΄ μ ν, λ€νΈμν¬ μ κ·Ό μ μ΄
# redis.conf 보μ μ€μ requirepass StrongPassword rename-command FLUSHALL "" # μν λͺ λ Ήμ΄ λΉνμ±ν bind 127.0.0.1 # λ‘컬 μ κ·Όλ§ νμ©
-
ν¨μ¨μ μΈ νμ΄νλΌμΈ μ¬μ©: λ€νΈμν¬ μ§μ° κ°μλ₯Ό μν λͺ λ Ή λ°°μΉ μ²λ¦¬
# λλ λ°μ΄ν° μ²λ¦¬ μ νμ΄νλΌμΈ νμ© with redis.pipeline() as pipe: for i in range(10000): pipe.set(f"key:{i}", f"value:{i}") pipe.execute()
-
ν΄λΌμ΄μΈνΈ μ°κ²° κ΄λ¦¬: 컀λ₯μ ν μ¬μ©μΌλ‘ μμ ν¨μ¨ν
pool = redis.ConnectionPool(max_connections=10) client = redis.Redis(connection_pool=pool)
-
Lua μ€ν¬λ¦½νΈ νμ©: 볡μ‘ν μμ μ μμμ 보μ₯
# Lua μ€ν¬λ¦½νΈλ‘ μμμ μ°μ° μν script = """ local current = redis.call('GET', KEYS[1]) if current then return redis.call('SET', KEYS[1], tonumber(current) + tonumber(ARGV[1])) end return nil """ redis.eval(script, 1, 'counter', 5)
-
μ λ¬Έ κΈ°λ₯ νμ©: Redis λͺ¨λ λ° νμ₯ κΈ°λ₯ νμ©
- RedisSearch: μ λ¬Έ κ²μ μμ§
- RedisTimeSeries: μκ³μ΄ λ°μ΄ν° μ²λ¦¬
- RedisGears: μλ²μΈ‘ λ°μ΄ν° μ²λ¦¬ μμ§
- RedisAI: λ¨Έμ λ¬λ λͺ¨λΈ νΈμ€ν
-
νμ₯μ± κ³ν: μ²μλΆν° νμ₯μ±μ κ³ λ €ν μ€κ³
- μ€λ© ν€ μλ³
- ν΄λ¬μ€ν° νΈν λͺ λ Ήμ΄ μ¬μ©
- λ€μ€ λ°μ΄ν°λ² μ΄μ€ λμ ν€ μ λμ¬ μ¬μ©
<br/>
---