Concurrency Strategy - yuzvak/flashsale-service GitHub Wiki

Concurrency Strategy

Core Approach: Optimistic Locking

Philosophy

  • No reservations during checkout phase
  • Atomic operations at purchase time only
  • Minimal lock contention for maximum throughput

Database Concurrency

Conditional Updates

UPDATE items 
SET sold = TRUE, sold_to_user_id = $1, sold_at = NOW()
WHERE id = $2 AND sale_id = $3 AND sold = FALSE

Key Benefits:

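  • Single atomic statement - the check (sold = FALSE) and the write happen together, so two buyers cannot both claim the same item
  • Natural conflict resolution - the database serializes concurrent UPDATEs on the same row, and the losers match zero rows
  • No row locks held during the user workflow - the row lock lasts only for the purchase transaction, not for the time between checkout and purchase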

Transaction Isolation

tx, err := db.BeginTx(ctx, &sql.TxOptions{
    Isolation: sql.LevelSerializable, // Highest level, used for purchases
})
if err != nil {
    return err
}

Isolation Levels:

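  • Purchase operations: SERIALIZABLE for consistency (PostgreSQL may abort a conflicting transaction with a serialization failure, SQLSTATE 40001, which the caller must retry)
  • Read operations: READ COMMITTED for performance
  • Bulk operations: custom isolation per use case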

Redis Atomic Operations

Lua Scripts for Counters

local sale_key = KEYS[1]
local user_key = KEYS[2]
local item_count = tonumber(ARGV[1])
local max_sale_items = tonumber(ARGV[2])
local max_user_items = tonumber(ARGV[3])

-- Check both limits atomically
local current_sale = tonumber(redis.call('GET', sale_key) or 0)
local current_user = tonumber(redis.call('GET', user_key) or 0)

if current_sale + item_count > max_sale_items then
    return 0  -- Sale limit exceeded
end

if current_user + item_count > max_user_items then
    return 0  -- User limit exceeded  
end

-- Increment both counters
redis.call('INCRBY', sale_key, item_count)
redis.call('INCRBY', user_key, item_count)
return 1  -- Success

Distributed Locking

func (c *Cache) DistributedLock(ctx context.Context, key string, expiration time.Duration) (bool, error) {
    lockKey := fmt.Sprintf("lock:%s", key)
    return c.client.SetNX(ctx, lockKey, "1", expiration).Result()
}

Lock Strategy:

  • Short-lived locks (3 seconds max)
  • Automatic expiration (the SetNX TTL) means a crashed holder blocks others for at most the TTL, preventing deadlocks
  • Limited scope - purchase operations only

Bloom Filter Optimization

Fast Rejection Path

func (uc *PurchaseUseCase) attemptPurchase(ctx context.Context, checkout *sale.Checkout) (*sale.PurchaseResult, error) {
    // ... (setup of items and txRepo omitted)

    for _, item := range items {
        // Fast path: a Bloom filter hit means the item is probably sold.
        // On a lookup error, fall through to the database, the source of truth.
        alreadySold, err := uc.cache.ItemExistsInBloomFilter(ctx, item.ID)
        if err == nil && alreadySold {
            continue // Skip the expensive DB operation
        }

        // Slow path: attempt the atomic conditional UPDATE
        success, err := txRepo.MarkItemAsSold(ctx, item.ID, checkout.UserID)
        // ...
    }
    // ...
}

Benefits:

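  • No false negatives: an item added to the filter when it sold is always reported as sold; false positives (an unsold item reported as sold, and therefore skipped) stay rare when the filter is sized for the item count (target: 99%+ accuracy)
  • Microsecond-scale lookups instead of millisecond-scale DB transactions
  • Kept current: every sale adds the item to the filter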
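The service's filter sits behind uc.cache and its implementation is not shown. As a self-contained illustration, here is a minimal in-memory Bloom filter in Go, sized with the standard formulas m = -n ln p / (ln 2)^2 bits and k = (m/n) ln 2 hash functions for n items at false-positive rate p. The type and function names are hypothetical, and deriving the k positions from one FNV-1a hash by double hashing is a common simplification, not necessarily what the service does.

```go
package main

import (
	"hash/fnv"
	"math"
)

// bloom is a minimal in-memory Bloom filter.
type bloom struct {
	bits []uint64
	m    uint64 // number of bits
	k    uint64 // number of hash functions
}

// optimalParams returns m and k for n expected items and false-positive rate p.
func optimalParams(n int, p float64) (m, k uint64) {
	mf := math.Ceil(-float64(n) * math.Log(p) / (math.Ln2 * math.Ln2))
	kf := math.Round(mf / float64(n) * math.Ln2)
	if kf < 1 {
		kf = 1
	}
	return uint64(mf), uint64(kf)
}

func newBloom(n int, p float64) *bloom {
	m, k := optimalParams(n, p)
	return &bloom{bits: make([]uint64, (m+63)/64), m: m, k: k}
}

// positions derives k bit positions as h1 + i*h2 from one 64-bit FNV-1a hash;
// h2 is forced odd so the positions do not all collapse onto one bit.
func (b *bloom) positions(key string) []uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	sum := h.Sum64()
	h1, h2 := sum&0xffffffff, (sum>>32)|1
	pos := make([]uint64, b.k)
	for i := uint64(0); i < b.k; i++ {
		pos[i] = (h1 + i*h2) % b.m
	}
	return pos
}

// Add records key (here: the item has been sold).
func (b *bloom) Add(key string) {
	for _, p := range b.positions(key) {
		b.bits[p/64] |= 1 << (p % 64)
	}
}

// MayContain returns false only if key was never added; true may be a false positive.
func (b *bloom) MayContain(key string) bool {
	for _, p := range b.positions(key) {
		if b.bits[p/64]&(1<<(p%64)) == 0 {
			return false
		}
	}
	return true
}
```

The direction of the error is what makes the fast path safe: a false positive only skips an unsold item (a missed sale, never an oversell), and the conditional UPDATE remains the authority on who actually gets an item.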

Connection Pool Management

PostgreSQL Configuration

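db.SetMaxOpenConns(100)                 // Upper bound on open connections from this process
db.SetMaxIdleConns(50)                  // Idle connections kept for reuse
db.SetConnMaxLifetime(time.Hour)        // Recycle connections after an hour
db.SetConnMaxIdleTime(30 * time.Minute) // Close connections idle for 30 minutes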
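SetMaxOpenConns is a per-process limit, so the load on PostgreSQL is that value times the number of service instances. PostgreSQL's default max_connections is 100, so even a single instance configured as above can use the whole default budget. A hypothetical helper for splitting the server's budget across instances (all inputs are deployment-specific assumptions):

```go
package main

// perInstanceMaxOpen splits a PostgreSQL server's connection budget evenly
// across service instances. reserved covers connections kept back for
// administration, migrations and other clients.
func perInstanceMaxOpen(serverMax, reserved, instances int) int {
	if instances <= 0 {
		return 0
	}
	budget := serverMax - reserved
	if budget <= 0 {
		return 0
	}
	return budget / instances
}
```

The result would feed db.SetMaxOpenConns. database/sql also lowers MaxIdleConns to match MaxOpenConns when the idle limit is larger, so the idle setting never exceeds the open limit.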

Redis Configuration

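&redis.Options{
    // Addr and other connection fields omitted
    PoolSize:     200,              // Pool size per Redis node
    MinIdleConns: 50,               // Idle connections kept open and ready
    PoolTimeout:  30 * time.Second, // Max wait for a free connection when all are busy
}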

Goroutine Management

HTTP Server Limits

server := &http.Server{
    ReadTimeout:  10 * time.Second,
    WriteTimeout: 30 * time.Second,
    IdleTimeout:  120 * time.Second,
}

Context Cancellation

func (uc *PurchaseUseCase) ExecutePurchase(ctx context.Context, checkoutCode string) (*sale.PurchaseResult, error) {
    // Timeout for the entire operation
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    // Check the context between steps, and pass ctx to every DB/Redis call
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
        // Continue processing
    }

    // ... (remaining purchase steps omitted)
}

Race Condition Handling

Multiple Users, Same Item

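// The conditional UPDATE resolves this in the database: the first transaction
// to flip sold from FALSE to TRUE wins; later ones match no row and see
// "0 rows affected". (Under SERIALIZABLE, PostgreSQL may instead abort the
// loser with a serialization failure, SQLSTATE 40001.)
result, err := tx.ExecContext(ctx, `
    UPDATE items SET sold = TRUE WHERE id = $1 AND sold = FALSE
`, itemID)
if err != nil {
    return err
}

rowsAffected, err := result.RowsAffected()
if err != nil {
    return err
}
if rowsAffected == 0 {
    return ErrItemAlreadySold // Graceful failure
}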
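An in-memory analogy makes the "exactly one winner" property easy to test: atomic.Bool's CompareAndSwap(false, true) succeeds for exactly one caller, just as exactly one UPDATE ... WHERE sold = FALSE affects a row. This models the semantics only; in PostgreSQL the serialization comes from the row lock. The names item, markSold and race are hypothetical.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// item models the sold flag of one row.
type item struct {
	sold atomic.Bool
}

// markSold reports whether this caller won the item, like rowsAffected == 1.
func (it *item) markSold() bool {
	return it.sold.CompareAndSwap(false, true)
}

// race starts n concurrent buyers on one item and counts the winners.
func race(n int) int {
	var it item
	var winners atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if it.markSold() {
				winners.Add(1)
			}
		}()
	}
	wg.Wait()
	return int(winners.Load())
}
```

However many buyers race, race(n) returns 1 for any n of at least 1; every other buyer takes the "already sold" path without any lock being held across the checkout flow.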

User Limit Enforcement

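// The Redis Lua script checks and increments both counters atomically;
// the arguments map to ARGV[1..3]: item count, sale limit (10000), per-user limit (10)
canPurchase, err := cache.AtomicPurchaseCheck(ctx, saleID, userID, itemCount, 10000, 10)
if err != nil {
    return err
}
if !canPurchase {
    return ErrLimitExceeded
}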

Sale Limit Protection

  • Atomic counters in Redis for real-time tracking
  • Database constraints as final safety net
  • Rollback mechanisms for partial failures

Error Recovery

Retry Logic

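const retryAttempts = 2

var result *sale.PurchaseResult
var err error
for attempt := 0; attempt < retryAttempts; attempt++ {
    result, err = uc.attemptPurchase(ctx, checkout)
    if err == nil || isBusinessLogicError(err) {
        break // Success or permanent failure
    }
    if attempt == retryAttempts-1 {
        break // No attempts left, so skip the final backoff
    }

    // Exponential backoff for transient errors (100ms, 200ms, ...),
    // cut short if the request context is cancelled
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-time.After(100 * time.Millisecond << attempt):
    }
}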

Circuit Breaker Pattern

  • Fail fast when downstream services struggle
  • Automatic recovery after cooldown periods
  • Graceful degradation with reduced functionality

Performance Optimizations

Prepared Statements

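// Prepare once at startup and reuse on hot paths; *sql.Stmt is safe for
// concurrent use, and database/sql re-prepares it on other pool connections as needed
checkoutStmt, err := db.Prepare(`
    INSERT INTO checkout_attempts (id, sale_id, user_id, checkout_code, created_at)
    VALUES ($1, $2, $3, $4, $5)
`)
if err != nil {
    return err
}
// Close checkoutStmt on shutdown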

Connection Reuse

  • HTTP Keep-Alive for client connections
  • Database connection pooling
  • Redis pipelining for batch operations

Memory Management

  • sync.Pool for frequent allocations
  • Buffer reuse for JSON marshaling
  • Context-aware cleanup of resources
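A sketch of the sync.Pool and buffer-reuse items above, assuming JSON responses are encoded into pooled bytes.Buffer values (bufPool and marshalJSON are hypothetical names). The bytes are copied out before the buffer goes back to the pool, because the next Get may hand the same buffer to another request and overwrite them.

```go
package main

import (
	"bytes"
	"encoding/json"
	"sync"
)

// bufPool holds reusable buffers for JSON encoding.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// marshalJSON encodes v with a pooled buffer and returns a private copy of the
// result, so the buffer can be safely returned to the pool.
func marshalJSON(v any) ([]byte, error) {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufPool.Put(buf)

	if err := json.NewEncoder(buf).Encode(v); err != nil {
		return nil, err
	}
	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())
	return out, nil
}
```

json.Encoder appends a trailing newline to each value, which is harmless in HTTP response bodies. Handlers that write straight to the http.ResponseWriter can skip the copy entirely by encoding into the pooled buffer and writing it before calling Put.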