KR_gRPC - somaz94/python-study GitHub Wiki

Python gRPC κ°œλ… 정리


1️⃣ gRPC 기초

gRPCλŠ” ꡬ글이 κ°œλ°œν•œ κ³ μ„±λŠ₯ RPC(Remote Procedure Call) ν”„λ ˆμž„μ›Œν¬μ΄λ‹€.

# ν”„λ‘œν† λ²„ν”„ μ •μ˜ (user.proto)
syntax = "proto3";

package user;

// μ„œλΉ„μŠ€ μ •μ˜
service UserService {
    // 단일 μš”μ²­/응닡 λ©”μ„œλ“œ
    rpc GetUser (UserRequest) returns (UserResponse) {}
    // μ‚¬μš©μž 생성 λ©”μ„œλ“œ
    rpc CreateUser (CreateUserRequest) returns (UserResponse) {}
    // μ‚¬μš©μž λͺ©λ‘ 쑰회 λ©”μ„œλ“œ
    rpc ListUsers (ListUsersRequest) returns (ListUsersResponse) {}
    // μ‚¬μš©μž μ—…λ°μ΄νŠΈ λ©”μ„œλ“œ
    rpc UpdateUser (UpdateUserRequest) returns (UserResponse) {}
    // μ‚¬μš©μž μ‚­μ œ λ©”μ„œλ“œ
    rpc DeleteUser (UserRequest) returns (DeleteUserResponse) {}
}

// μš”μ²­/응닡 λ©”μ‹œμ§€ μ •μ˜
message UserRequest {
    string user_id = 1;
}

message CreateUserRequest {
    string name = 1;
    string email = 2;
    string phone = 3;
    int32 age = 4;
}

message UpdateUserRequest {
    string user_id = 1;
    string name = 2;
    string email = 3;
    string phone = 4;
    int32 age = 5;
}

message ListUsersRequest {
    int32 page_size = 1;  // νŽ˜μ΄μ§€λ‹Ή ν•­λͺ© 수
    int32 page_number = 2;  // νŽ˜μ΄μ§€ 번호
    string sort_by = 3;  // μ •λ ¬ κΈ°μ€€ ν•„λ“œ
}

message UserResponse {
    string user_id = 1;
    string name = 2;
    string email = 3;
    string phone = 4;
    int32 age = 5;
    string created_at = 6;
    string updated_at = 7;
}

message ListUsersResponse {
    repeated UserResponse users = 1;
    int32 total_count = 2;
    int32 total_pages = 3;
}

message DeleteUserResponse {
    bool success = 1;
    string message = 2;
}

βœ… νŠΉμ§•:

  • ν”„λ‘œν† μ½œ 버퍼
  • κ°•λ ₯ν•œ νƒ€μž… μ‹œμŠ€ν…œ
  • 닀쀑 μ–Έμ–΄ 지원
  • 높은 μ„±λŠ₯ 및 νš¨μœ¨μ„±
  • μ½”λ“œ 생성 μžλ™ν™”
  • μ–‘λ°©ν–₯ 슀트리밍 지원

gRPC vs REST 비ꡐ:

gRPC와 RESTλŠ” API 섀계에 μžˆμ–΄ 각각 λ‹€λ₯Έ μž₯단점을 κ°€μ§€κ³  μžˆλ‹€.

κΈ°λŠ₯ gRPC REST
ν”„λ‘œν† μ½œ HTTP/2 HTTP/1.1 λ˜λŠ” HTTP/2
데이터 ν˜•μ‹ ν”„λ‘œν† μ½œ 버퍼 (이진) JSON, XML (ν…μŠ€νŠΈ)
μ½”λ“œ 생성 μžλ™ 생성 μˆ˜λ™ λ˜λŠ” 도ꡬ μ‚¬μš©
슀트리밍 μ–‘λ°©ν–₯ 슀트리밍 μ œν•œμ 
νƒ€μž… μ•ˆμ „μ„± κ°•λ ₯ν•œ νƒ€μž… 검사 μŠ€ν‚€λ§ˆ 기반 검증 ν•„μš”
λΈŒλΌμš°μ € 지원 μ œν•œμ  λ„€μ΄ν‹°λΈŒ 지원
μ„±λŠ₯ 효율적인 데이터 전솑 μƒλŒ€μ μœΌλ‘œ μ˜€λ²„ν—€λ“œ 큼
ν•™μŠ΅ 곑선 κ°€νŒŒλ¦„ μ™„λ§Œν•¨


2️⃣ μ„œλ²„ κ΅¬ν˜„

Pythonμ—μ„œ gRPC μ„œλ²„λ₯Ό κ΅¬ν˜„ν•˜λŠ” 방법과 μ„œλΉ„μŠ€ 둜직 μž‘μ„± 방법이닀.

import grpc
from concurrent import futures
import time
import logging
import user_pb2
import user_pb2_grpc
from database import UserDatabase

# λ‘œκΉ… μ„€μ •
logging.basicConfig(level=logging.INFO)

# μ„œλΉ„μŠ€ κ΅¬ν˜„ 클래슀
class UserServicer(user_pb2_grpc.UserServiceServicer):
    def __init__(self):
        # λ°μ΄ν„°λ² μ΄μŠ€ μ—°κ²° (μ˜ˆμ‹œ)
        self.db = UserDatabase()
        
    def GetUser(self, request, context):
        logging.info(f"GetUser μš”μ²­ μˆ˜μ‹ : {request.user_id}")
        try:
            # μ‚¬μš©μž 쑰회 (가상 ν•¨μˆ˜)
            user = self.db.find_user(request.user_id)
            if not user:
                context.set_code(grpc.StatusCode.NOT_FOUND)
                context.set_details(f"μ‚¬μš©μž ID {request.user_id}λ₯Ό 찾을 수 μ—†μŠ΅λ‹ˆλ‹€")
                return user_pb2.UserResponse()
            
            # 응닡 생성
            return user_pb2.UserResponse(
                user_id=user.id,
                name=user.name,
                email=user.email,
                phone=user.phone,
                age=user.age,
                created_at=user.created_at.isoformat(),
                updated_at=user.updated_at.isoformat()
            )
        except Exception as e:
            logging.error(f"GetUser 였λ₯˜: {str(e)}")
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"λ‚΄λΆ€ μ„œλ²„ 였λ₯˜: {str(e)}")
            return user_pb2.UserResponse()
    
    def CreateUser(self, request, context):
        logging.info(f"CreateUser μš”μ²­ μˆ˜μ‹ : {request.name}, {request.email}")
        try:
            # μž…λ ₯ 검증
            if not request.name or not request.email:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                context.set_details("이름과 이메일은 ν•„μˆ˜ ν•­λͺ©μž…λ‹ˆλ‹€")
                return user_pb2.UserResponse()
            
            # 이메일 쀑볡 확인
            if self.db.email_exists(request.email):
                context.set_code(grpc.StatusCode.ALREADY_EXISTS)
                context.set_details(f"이메일 {request.email}λŠ” 이미 μ‚¬μš© μ€‘μž…λ‹ˆλ‹€")
                return user_pb2.UserResponse()
            
            # μ‚¬μš©μž 생성 (가상 ν•¨μˆ˜)
            user = self.db.create_user(
                name=request.name,
                email=request.email,
                phone=request.phone,
                age=request.age
            )
            
            # 응닡 생성
            return user_pb2.UserResponse(
                user_id=user.id,
                name=user.name,
                email=user.email,
                phone=user.phone,
                age=user.age,
                created_at=user.created_at.isoformat(),
                updated_at=user.updated_at.isoformat()
            )
        except Exception as e:
            logging.error(f"CreateUser 였λ₯˜: {str(e)}")
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"λ‚΄λΆ€ μ„œλ²„ 였λ₯˜: {str(e)}")
            return user_pb2.UserResponse()
    
    def ListUsers(self, request, context):
        logging.info(f"ListUsers μš”μ²­ μˆ˜μ‹ : νŽ˜μ΄μ§€ {request.page_number}, 크기 {request.page_size}")
        try:
            # νŽ˜μ΄μ§€λ„€μ΄μ…˜ 처리 (가상 ν•¨μˆ˜)
            users, total = self.db.list_users(
                page_size=request.page_size,
                page_number=request.page_number,
                sort_by=request.sort_by
            )
            
            # μ‚¬μš©μž λͺ©λ‘ ꡬ성
            user_responses = []
            for user in users:
                user_responses.append(user_pb2.UserResponse(
                    user_id=user.id,
                    name=user.name,
                    email=user.email,
                    phone=user.phone,
                    age=user.age,
                    created_at=user.created_at.isoformat(),
                    updated_at=user.updated_at.isoformat()
                ))
            
            # 총 νŽ˜μ΄μ§€ 수 계산
            total_pages = (total + request.page_size - 1) // request.page_size
            
            # 응닡 생성
            return user_pb2.ListUsersResponse(
                users=user_responses,
                total_count=total,
                total_pages=total_pages
            )
        except Exception as e:
            logging.error(f"ListUsers 였λ₯˜: {str(e)}")
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"λ‚΄λΆ€ μ„œλ²„ 였λ₯˜: {str(e)}")
            return user_pb2.ListUsersResponse()

# μ„œλ²„ μ‹œμž‘ ν•¨μˆ˜
def serve():
    # μ„œλ²„ 생성
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    
    # μ„œλΉ„μŠ€ 등둝
    user_pb2_grpc.add_UserServiceServicer_to_server(
        UserServicer(), server
    )
    
    # 포트 바인딩
    server.add_insecure_port('[::]:50051')
    
    # μ„œλ²„ μ‹œμž‘
    server.start()
    logging.info("μ„œλ²„κ°€ 포트 50051μ—μ„œ μ‹œμž‘λ˜μ—ˆμŠ΅λ‹ˆλ‹€")
    
    try:
        # μ„œλ²„ μ‹€ν–‰ μœ μ§€
        while True:
            time.sleep(86400)  # 1일
    except KeyboardInterrupt:
        # μ„œλ²„ μ’…λ£Œ
        server.stop(0)
        logging.info("μ„œλ²„κ°€ μ’…λ£Œλ˜μ—ˆμŠ΅λ‹ˆλ‹€")

if __name__ == '__main__':
    serve()

βœ… νŠΉμ§•:

  • μ„œλΉ„μŠ€ κ΅¬ν˜„
  • μš”μ²­ 처리
  • 응닡 생성
  • 였λ₯˜ 처리
  • 비동기 지원
  • μŠ€λ ˆλ“œ ν’€ 관리
  • 메타데이터 처리


3️⃣ ν΄λΌμ΄μ–ΈνŠΈ κ΅¬ν˜„

Pythonμ—μ„œ gRPC ν΄λΌμ΄μ–ΈνŠΈλ₯Ό κ΅¬ν˜„ν•˜κ³  μ„œλ²„μ™€ ν†΅μ‹ ν•˜λŠ” 방법이닀.

import grpc
import logging
import time
from datetime import datetime
import user_pb2
import user_pb2_grpc

# λ‘œκΉ… μ„€μ •
logging.basicConfig(level=logging.INFO)

class UserClient:
    def __init__(self, host='localhost', port=50051):
        # 채널 μ„€μ • 및 μŠ€ν… 생성
        self.channel = grpc.insecure_channel(f'{host}:{port}')
        self.stub = user_pb2_grpc.UserServiceStub(self.channel)
        logging.info(f"ν΄λΌμ΄μ–ΈνŠΈκ°€ {host}:{port}에 μ—°κ²°λ˜μ—ˆμŠ΅λ‹ˆλ‹€")
    
    def __del__(self):
        # 채널 μ’…λ£Œ
        self.channel.close()
    
    def get_user(self, user_id):
        """μ‚¬μš©μž 쑰회"""
        logging.info(f"μ‚¬μš©μž 쑰회: {user_id}")
        request = user_pb2.UserRequest(user_id=user_id)
        
        try:
            # νƒ€μž„μ•„μ›ƒ μ„€μ • (5초)
            response = self.stub.GetUser(request, timeout=5)
            
            # 응닡 처리
            return {
                'user_id': response.user_id,
                'name': response.name,
                'email': response.email,
                'phone': response.phone,
                'age': response.age,
                'created_at': response.created_at,
                'updated_at': response.updated_at
            }
        except grpc.RpcError as e:
            # gRPC 였λ₯˜ 처리
            status_code = e.code()
            details = e.details()
            
            if status_code == grpc.StatusCode.NOT_FOUND:
                logging.warning(f"μ‚¬μš©μžλ₯Ό 찾을 수 μ—†μŠ΅λ‹ˆλ‹€: {details}")
                return None
            elif status_code == grpc.StatusCode.DEADLINE_EXCEEDED:
                logging.error("μš”μ²­ μ‹œκ°„ 초과")
                return None
            else:
                logging.error(f"RPC 였λ₯˜: {status_code} - {details}")
                return None
        except Exception as e:
            logging.error(f"일반 였λ₯˜: {str(e)}")
            return None
    
    def create_user(self, name, email, phone=None, age=None):
        """μ‚¬μš©μž 생성"""
        logging.info(f"μ‚¬μš©μž 생성: {name}, {email}")
        request = user_pb2.CreateUserRequest(
            name=name,
            email=email,
            phone=phone or "",
            age=age or 0
        )
        
        try:
            # 메타데이터 μ„€μ • (예: API ν‚€)
            metadata = [
                ('api-key', 'your-api-key'),
                ('client-id', 'python-client')
            ]
            
            # νƒ€μž„μ•„μ›ƒκ³Ό λ©”νƒ€λ°μ΄ν„°λ‘œ μš”μ²­
            response = self.stub.CreateUser(
                request, 
                timeout=5,
                metadata=metadata
            )
            
            # 응닡 처리
            return {
                'user_id': response.user_id,
                'name': response.name,
                'email': response.email,
                'phone': response.phone,
                'age': response.age,
                'created_at': response.created_at,
                'updated_at': response.updated_at
            }
        except grpc.RpcError as e:
            status_code = e.code()
            details = e.details()
            
            if status_code == grpc.StatusCode.ALREADY_EXISTS:
                logging.warning(f"이미 μ‘΄μž¬ν•˜λŠ” μ‚¬μš©μž: {details}")
            elif status_code == grpc.StatusCode.INVALID_ARGUMENT:
                logging.warning(f"잘λͺ»λœ 인자: {details}")
            else:
                logging.error(f"RPC 였λ₯˜: {status_code} - {details}")
            
            return None
        except Exception as e:
            logging.error(f"일반 였λ₯˜: {str(e)}")
            return None
    
    def list_users(self, page_size=10, page_number=1, sort_by='name'):
        """μ‚¬μš©μž λͺ©λ‘ 쑰회"""
        logging.info(f"μ‚¬μš©μž λͺ©λ‘ 쑰회: νŽ˜μ΄μ§€ {page_number}, 크기 {page_size}")
        request = user_pb2.ListUsersRequest(
            page_size=page_size,
            page_number=page_number,
            sort_by=sort_by
        )
        
        try:
            # gRPC μš”μ²­
            response = self.stub.ListUsers(request, timeout=10)
            
            # 응닡 처리
            users = []
            for user in response.users:
                users.append({
                    'user_id': user.user_id,
                    'name': user.name,
                    'email': user.email,
                    'phone': user.phone,
                    'age': user.age,
                    'created_at': user.created_at,
                    'updated_at': user.updated_at
                })
            
            # νŽ˜μ΄μ§€λ„€μ΄μ…˜ 메타데이터 포함
            return {
                'users': users,
                'total_count': response.total_count,
                'total_pages': response.total_pages,
                'current_page': page_number,
                'page_size': page_size
            }
        except grpc.RpcError as e:
            logging.error(f"RPC 였λ₯˜: {e.code()} - {e.details()}")
            return None
        except Exception as e:
            logging.error(f"일반 였λ₯˜: {str(e)}")
            return None

# ν΄λΌμ΄μ–ΈνŠΈ μ‚¬μš© 예제
def main():
    # ν΄λΌμ΄μ–ΈνŠΈ μΈμŠ€ν„΄μŠ€ 생성
    client = UserClient()
    
    # μƒˆ μ‚¬μš©μž 생성
    new_user = client.create_user(
        name="홍길동",
        email="[email protected]",
        phone="010-1234-5678",
        age=30
    )
    
    if new_user:
        # μƒμ„±λœ μ‚¬μš©μž ID둜 쑰회
        user_id = new_user['user_id']
        user = client.get_user(user_id)
        
        if user:
            print(f"쑰회된 μ‚¬μš©μž: {user['name']}, {user['email']}")
        
        # μ‚¬μš©μž λͺ©λ‘ 쑰회
        users_page = client.list_users(page_size=5, page_number=1)
        
        if users_page:
            print(f"총 μ‚¬μš©μž 수: {users_page['total_count']}")
            for user in users_page['users']:
                print(f"- {user['name']} ({user['email']})")

if __name__ == "__main__":
    main()

βœ… νŠΉμ§•:

  • 채널 μ„€μ •
  • μŠ€ν… 생성
  • μ—λŸ¬ 처리
  • 메타데이터 전솑
  • νƒ€μž„μ•„μ›ƒ 관리
  • 응닡 νŒŒμ‹±
  • λ¦¬μ†ŒμŠ€ 정리


4️⃣ 슀트리밍

service UserService {
    rpc GetUserStream (UserRequest) returns (stream UserResponse) {}
    rpc CreateUsers (stream CreateUserRequest) returns (UsersResponse) {}
    rpc ChatStream (stream ChatMessage) returns (stream ChatMessage) {}
}

class UserServicer(user_pb2_grpc.UserServiceServicer):
    async def GetUserStream(self, request, context):
        users = get_users_batch(request.user_id)
        for user in users:
            yield user_pb2.UserResponse(
                user_id=user.id,
                name=user.name,
                email=user.email
            )

βœ… νŠΉμ§•:

  • μ„œλ²„ 슀트리밍
  • ν΄λΌμ΄μ–ΈνŠΈ 슀트리밍
  • μ–‘λ°©ν–₯ 슀트리밍


5️⃣ 인터셉터

class AuthInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get('authorization')
        
        if not token:
            return self._unauthenticated()
        
        try:
            user = verify_token(token)
            context = set_user_context(user)
            return continuation(handler_call_details, context)
        except Exception:
            return self._unauthenticated()

βœ… νŠΉμ§•:

  • 인증/인가
  • λ‘œκΉ…
  • 메타데이터 처리


μ£Όμš” 팁

βœ… λͺ¨λ²” 사둀:

  • ν”„λ‘œν† λ²„ν”„ 섀계 μ‹ μ€‘νžˆ
  • 슀트리밍 적절히 ν™œμš©
  • μ—λŸ¬ 처리 ν‘œμ€€ν™”
  • 인터셉터 ν™œμš©
  • μ„±λŠ₯ μ΅œμ ν™”
  • λ³΄μ•ˆ κ³ λ €
  • λͺ¨λ‹ˆν„°λ§ κ΅¬ν˜„
  • ν…ŒμŠ€νŠΈ μž‘μ„±


⚠️ **GitHub.com Fallback** ⚠️