hidb.md - maoxiaoyue/hypgo GitHub Wiki

HiDB Package (pkg/hidb)

hidb 套件為 HypGo 框架提供資料庫抽象層與連線管理器,支援多資料庫引擎、讀寫分離(Master-Replica)、連線池、交易管理、Redis 快取,以及可擴展的插件系統。

主要特色

  • 多資料庫支援: 透過 Dialect 介面抽象 MySQL / PostgreSQL 的驅動差異,使用 Bun ORM 作為查詢建構層。
  • 讀寫分離: 寫入操作始終路由至 Master,讀取操作透過 ReplicaPool 以 Round-Robin 分配至 Replica,無 Replica 時自動回退至 Master。
  • Lock-Free Round-Robin: 使用 atomic.Pointer + atomic.Uint64 實現完全無鎖讀路徑(copy-on-write),100k+ QPS 下零 mutex 競爭。
  • 連線池管理: 支援 MaxIdleConns / MaxOpenConns / ConnMaxLifetime(預設 30 分鐘)配置,Master 與每個 Replica 獨立管理連線池。
  • 交易管理: 同時提供原生 sql.Tx 與 Bun ORM bun.Tx 兩種交易介面。
  • Redis 整合: 內建 go-redis/v9 客戶端,與 SQL 資料庫統一管理生命週期。
  • 插件系統: 透過 DatabasePlugin 介面支援非 SQL 資料庫(如 Cassandra)的動態註冊與載入。
  • 優雅關閉: 按順序關閉 Replica → Master → Redis → Plugins,累積錯誤統一報告。
  • 向後兼容: HypDB() / SQL() 始終返回 Master,舊有程式碼無需修改。

基礎使用

package main

import (
	"context"
	"log"

	"github.com/maoxiaoyue/hypgo/pkg/hidb"
	"github.com/maoxiaoyue/hypgo/pkg/hidb/pg"
)

func main() {
	// 1. 建立資料庫管理器(PostgreSQL)
	db, err := hidb.NewWithInterface(
		appConfig.Database,           // 實作 config.DatabaseConfigInterface
		hidb.WithDialect(pg.New()),   // 指定 SQL 方言
	)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 2. 健康檢查
	if err := db.HealthCheck(context.Background()); err != nil {
		log.Fatal(err)
	}

	// 3. 讀取操作 → Replica(無 Replica 時回退至 Master)
	readDB := db.ReadHypDB()
	var users []User
	readDB.NewSelect().Model(&users).Scan(context.Background())

	// 4. 寫入操作 → 始終使用 Master
	writeDB := db.WriteHypDB()
	writeDB.NewInsert().Model(&newUser).Exec(context.Background())
}

讀寫分離 (Master-Replica)

配置

透過實作 config.ReplicaConfigProvider 介面啟用讀寫分離:

type ReplicaConfigProvider interface {
	GetReplicas() []ReplicaConfig
}

type ReplicaConfig struct {
	DSN          string // Replica 連線字串
	MaxIdleConns int    // 最大閒置連線數
	MaxOpenConns int    // 最大連線數
}

查詢路由

方法 目標 說明
ReadHypDB() Replica (ORM) Round-Robin 分配,無 Replica 回退 Master
ReadSQL() Replica (Raw SQL) Round-Robin 分配,無 Replica 回退 Master
WriteHypDB() Master (ORM) 始終使用 Master
WriteSQL() Master (Raw SQL) 始終使用 Master
HypDB() Master (ORM) 向後兼容,等同 WriteHypDB()
SQL() Master (Raw SQL) 向後兼容,等同 WriteSQL()

Round-Robin 負載均衡

// 讀路徑完全無鎖:atomic.Pointer + atomic.Uint64
// 寫路徑 copy-on-write:Mutex 保護 + 建立新 slice + atomic.Store
replicas := *rp.replicas.Load()  // 零鎖競爭
idx := rp.counter.Add(1) - 1
replica := replicas[idx % uint64(len(replicas))]

ConnMaxLifetime

Master 和所有 Replica 統一設定 ConnMaxLifetime = 30 分鐘,防止長連線持有過期狀態(如 DNS 變更、密碼輪換)。

狀態查詢

db.HasReplicas()   // bool — 是否配置了 Replica
db.ReplicaCount()  // int  — Replica 數量

交易管理

原生 SQL 交易

err := db.Transaction(ctx, func(tx *sql.Tx) error {
	_, err := tx.ExecContext(ctx, "INSERT INTO users (name) VALUES (?)", "Alice")
	if err != nil {
		return err // 自動 Rollback
	}
	return nil // 自動 Commit
})

Bun ORM 交易

err := db.HypDBTransaction(ctx, func(ctx context.Context, tx bun.Tx) error {
	_, err := tx.NewInsert().Model(&user).Exec(ctx)
	if err != nil {
		return err // 自動 Rollback
	}
	return nil // 自動 Commit
})

Redis / KeyDB 整合

HiDB 內建完整的 Redis 封裝,同時相容 KeyDB(Redis 協議相容)。除了原始 go-redis/v9 client,還提供高階方法覆蓋所有常用資料結構。

配置

database:
  driver: "redis"      # 或搭配 SQL 一起使用
  redis:
    addr: "localhost:6379"    # KeyDB 也是同一個設定
    password: "your_password"
    db: 0
type RedisConfigInterface interface {
    GetAddr() string
    GetPassword() string
    GetDB() int
}

基礎 Key-Value

ctx := context.Background()

// 設定(含 TTL,0 = 不過期)
db.RedisSet(ctx, "user:1:name", "Alice", 10*time.Minute)

// 取得
name, err := db.RedisGet(ctx, "user:1:name")
if hidb.RedisIsNil(err) {
    // key 不存在
}

// 刪除
db.RedisDel(ctx, "user:1:name", "user:2:name")

// 檢查存在
count, _ := db.RedisExists(ctx, "user:1:name")

// TTL 管理
db.RedisExpire(ctx, "session:abc", 30*time.Minute)
ttl, _ := db.RedisTTL(ctx, "session:abc")

JSON 序列化

直接存取 Go struct,自動 JSON 序列化/反序列化:

type GameState struct {
    Players []string `json:"players"`
    Round   int      `json:"round"`
    Status  string   `json:"status"`
}

// 存入
state := GameState{Players: []string{"p1", "p2"}, Round: 1, Status: "playing"}
db.RedisSetJSON(ctx, "game:123:state", state, time.Hour)

// 取出
var loaded GameState
db.RedisGetJSON(ctx, "game:123:state", &loaded)

Hash

// 設定多個欄位
db.RedisHSet(ctx, "user:1", "name", "Alice", "email", "[email protected]")

// 取得單一欄位
name, _ := db.RedisHGet(ctx, "user:1", "name")

// 取得所有欄位
all, _ := db.RedisHGetAll(ctx, "user:1")
// all = map[string]string{"name": "Alice", "email": "[email protected]"}

// 刪除欄位
db.RedisHDel(ctx, "user:1", "email")

List(佇列)

// 推入
db.RedisLPush(ctx, "queue:tasks", "task1", "task2")
db.RedisRPush(ctx, "queue:tasks", "task3")

// 取出
task, _ := db.RedisLPop(ctx, "queue:tasks")

// 範圍查詢
tasks, _ := db.RedisLRange(ctx, "queue:tasks", 0, -1)

// 長度
length, _ := db.RedisLLen(ctx, "queue:tasks")

Set

// 新增成員
db.RedisSAdd(ctx, "room:1:players", "player1", "player2")

// 檢查成員
isMember, _ := db.RedisSIsMember(ctx, "room:1:players", "player1")

// 取得所有成員
members, _ := db.RedisSMembers(ctx, "room:1:players")

// 移除成員
db.RedisSRem(ctx, "room:1:players", "player1")

Sorted Set(排行榜)

import "github.com/redis/go-redis/v9"

// 新增分數
db.RedisZAdd(ctx, "leaderboard", redis.Z{Score: 100, Member: "player1"})

// 取得排行(含分數)
results, _ := db.RedisZRangeWithScores(ctx, "leaderboard", 0, 9) // Top 10

// 查詢分數
score, _ := db.RedisZScore(ctx, "leaderboard", "player1")

// 移除
db.RedisZRem(ctx, "leaderboard", "player1")

原子操作

// 計數器
db.RedisIncr(ctx, "api:requests:count")
db.RedisDecr(ctx, "stock:item:42")
db.RedisIncrBy(ctx, "user:1:score", 50)

分散式鎖(SetNX)

// 嘗試取得鎖(30 秒自動釋放)
acquired, _ := db.RedisSetNX(ctx, "lock:game:123", "owner-uuid", 30*time.Second)
if acquired {
    defer db.RedisDel(ctx, "lock:game:123")
    // 執行臨界區操作...
}

Pub/Sub

// 發布
db.RedisPublish(ctx, "game:events", "player_joined")

// 訂閱(呼叫端需 Close)
sub, _ := db.RedisSubscribe(ctx, "game:events")
defer sub.Close()

for msg := range sub.Channel() {
    fmt.Printf("Channel: %s, Payload: %s\n", msg.Channel, msg.Payload)
}

Pipeline(批次操作)

// 一般 pipeline(非交易)
pipe, _ := db.RedisPipeline()
pipe.Set(ctx, "key1", "val1", 0)
pipe.Set(ctx, "key2", "val2", 0)
pipe.Incr(ctx, "counter")
cmds, err := pipe.Exec(ctx)

// 交易式 pipeline(MULTI/EXEC)
txPipe, _ := db.RedisTxPipeline()
txPipe.Set(ctx, "key1", "val1", 0)
txPipe.Set(ctx, "key2", "val2", 0)
cmds, err = txPipe.Exec(ctx)

Scan(遊標迭代)

// 不阻塞伺服器的 key 掃描
var cursor uint64
for {
    keys, nextCursor, _ := db.RedisScan(ctx, cursor, "user:*", 100)
    for _, key := range keys {
        fmt.Println(key)
    }
    cursor = nextCursor
    if cursor == 0 {
        break
    }
}

原始 Client 存取

需要使用未封裝的 go-redis 功能時:

client := db.Redis() // *redis.Client
// 直接使用 go-redis/v9 全部 API

KeyDB 相容性

KeyDB 與 Redis 使用相同協議,只需更改連線地址即可:

database:
  driver: "redis"
  redis:
    addr: "keydb-server:6379"  # 指向 KeyDB
    password: "password"
    db: 0

KeyDB 特有優勢:多執行緒架構、FLASH 儲存支援、Active Replication — 全部透明運作,無需程式碼變更。

Dialect 方言系統

介面

type Dialect interface {
	DriverName() string          // "mysql" 或 "postgres"
	BunDialect() schema.Dialect  // Bun ORM 方言實例
}

內建方言

MySQL / TiDB:

import "github.com/maoxiaoyue/hypgo/pkg/hidb/mysql"

db, err := hidb.NewWithInterface(cfg, hidb.WithDialect(mysql.New()))

PostgreSQL:

import "github.com/maoxiaoyue/hypgo/pkg/hidb/pg"

db, err := hidb.NewWithInterface(cfg, hidb.WithDialect(pg.New()))

插件系統

DatabasePlugin 介面

type DatabasePlugin interface {
	Name() string
	Init(config map[string]interface{}) error
	Connect() error
	Close() error
	Ping(ctx context.Context) error
}

註冊與使用

// 註冊插件
cassandraPlugin := cassandra.NewPlugin()
db.RegisterPlugin(cassandraPlugin)

// 動態載入(Init + Connect)
db.LoadPlugin("cassandra", map[string]interface{}{
	"hosts":    []string{"127.0.0.1"},
	"keyspace": "my_keyspace",
})

// 取得插件
if plugin, ok := db.GetPlugin("cassandra"); ok {
	plugin.Ping(ctx)
}

Cassandra 插件

pkg/hidb/cassandra 提供內建的 Cassandra/ScyllaDB 插件:

import "github.com/maoxiaoyue/hypgo/pkg/hidb/cassandra"

cdb, err := cassandra.New(cassandra.Config{
	Hosts:       []string{"127.0.0.1", "127.0.0.2"},
	Keyspace:    "my_keyspace",
	Port:        9042,
	Consistency: "quorum",
	Username:    "admin",
	Password:    "secret",
	NumConns:    2,
})
defer cdb.Close()

// 查詢
query := cdb.Query("SELECT * FROM users WHERE id = ?", userID)

// 批次操作
batch := cdb.Batch(gocql.LoggedBatch)

Cassandra 配置選項:

欄位 說明
Hosts 叢集節點位址
Keyspace 鍵空間名稱
Port 連接埠(預設 9042)
Consistency 一致性等級(one, quorum, all, local_quorum 等)
Username / Password 認證資訊
ConnectTimeout / Timeout 連線與查詢逾時
NumConns 每節點連線數
MaxPreparedStmts 預備語句快取上限
ProtoVersion CQL 協議版本

健康檢查與生命週期

// 連線狀態
db.IsConnected()  // bool

// 完整健康檢查(Master + Replicas + Redis + Plugins)
err := db.HealthCheck(ctx)

// 資料庫驅動類型
db.Type()  // "mysql", "postgres", "redis", etc.

// 優雅關閉(按順序:Replicas → Master → Redis → Plugins)
err := db.Close()

向後兼容工廠

當配置物件未實作 config.DatabaseConfigInterface 時,New() 透過反射自動適配:

// 傳統結構體(使用反射適配)
db, err := hidb.New(legacyConfig, hidb.WithDialect(pg.New()))

// 推薦:實作 DatabaseConfigInterface
db, err := hidb.NewWithInterface(modernConfig, hidb.WithDialect(pg.New()))

DatabaseConfigAdapter 透過反射提取 DriverDSNMaxIdleConnsMaxOpenConnsRedis 等欄位,確保舊有程式碼無需修改。

檔案結構

pkg/hidb/
├── hidb.go              # 核心:Database, Dialect, Option, Plugin 系統,
│                         #   交易管理, 健康檢查, 優雅關閉
├── redis.go             # Redis/KeyDB 高階封裝(KV、Hash、List、Set、ZSet、Pub/Sub、Pipeline)
├── redis_test.go        # Redis nil guard + RedisIsNil 測試
├── readwrite.go         # ReplicaPool: Round-Robin 負載均衡, 讀寫分離
├── readwrite_test.go    # 11 項測試:Round-Robin, 併發, 回退, 關閉
├── mysql/
│   └── mysql.go         # MySQL / TiDB 方言實作
├── pg/
│   └── pg.go            # PostgreSQL 方言實作
└── cassandra/
    └── cassandra.go     # Cassandra 插件實作

依賴

套件 用途
database/sql Go 標準 SQL 介面
github.com/uptrace/bun ORM 查詢建構
github.com/redis/go-redis/v9 Redis 客戶端
github.com/go-sql-driver/mysql MySQL 驅動
github.com/lib/pq PostgreSQL 驅動
github.com/gocql/gocql Cassandra 驅動