Enterprise Usage - zhoudm1743/go-util GitHub Wiki
本指南专为企业级应用场景设计,涵盖大规模部署、高并发处理、系统集成和运维管理等关键要素。
🏢 企业级架构设计
1. 分层架构模式
// 企业级分层架构示例
type EnterpriseService struct {
// 数据访问层
userRepo UserRepository
orderRepo OrderRepository
// 缓存层
cache *util.XMap[string, interface{}]
// 配置管理
config *AppConfig
// 日志系统
logger Logger
// 监控指标
metrics MetricsCollector
}
func NewEnterpriseService() *EnterpriseService {
return &EnterpriseService{
cache: util.NewSafeMap[string, interface{}](),
config: LoadEnterpriseConfig(),
logger: NewStructuredLogger(),
metrics: NewMetricsCollector(),
}
}
// 业务逻辑层 - 用户管理
func (es *EnterpriseService) ProcessUserBatch(users []User) (*BatchResult, error) {
startTime := util.Now()
defer func() {
duration := util.Now().DiffTime(startTime)
es.metrics.RecordProcessingTime("user_batch", duration)
}()
// 数据验证和清理
validUsers := util.ArraysFromSlice(users).
Filter(es.validateUser).
Map(es.sanitizeUser).
ToSlice()
if len(validUsers) == 0 {
return nil, errors.New("没有有效用户数据")
}
// 分批处理(企业级数据量)
const batchSize = 1000
results := &BatchResult{
Processed: make([]ProcessedUser, 0, len(validUsers)),
Errors: make([]BatchError, 0),
}
for i := 0; i < len(validUsers); i += batchSize {
end := i + batchSize
if end > len(validUsers) {
end = len(validUsers)
}
batch := validUsers[i:end]
batchResult, err := es.processBatch(batch)
if err != nil {
es.logger.Error("批处理失败", map[string]interface{}{
"batch_start": i,
"batch_size": len(batch),
"error": err.Error(),
})
continue
}
results.Processed = append(results.Processed, batchResult.Users...)
results.Errors = append(results.Errors, batchResult.Errors...)
}
return results, nil
}
2. 微服务架构集成
// 微服务通信模块
type MicroserviceClient struct {
httpClient *util.XHttp
cache *util.ExpiringMap[string, []byte]
circuitBrk CircuitBreaker
timeout time.Duration
}
func NewMicroserviceClient(baseURL string) *MicroserviceClient {
return &MicroserviceClient{
httpClient: util.NewHttp().
SetBaseURL(baseURL).
SetTimeout(30 * time.Second).
SetRetry(3),
cache: util.NewExpiringMap[string, []byte](5 * time.Minute),
circuitBrk: NewCircuitBreaker(),
timeout: 30 * time.Second,
}
}
// 服务间调用与缓存
func (mc *MicroserviceClient) GetUserProfile(userID string) (*UserProfile, error) {
cacheKey := fmt.Sprintf("user_profile:%s", userID)
// 尝试从缓存获取
if cached, exists := mc.cache.Get(cacheKey); exists {
var profile UserProfile
if err := json.Unmarshal(cached, &profile); err == nil {
return &profile, nil
}
}
// 熔断器检查
if !mc.circuitBrk.CanExecute() {
return nil, errors.New("服务熔断中")
}
// 调用远程服务
ctx, cancel := context.WithTimeout(context.Background(), mc.timeout)
defer cancel()
response, err := mc.httpClient.
Get(fmt.Sprintf("/api/users/%s", userID)).
WithContext(ctx).
Execute()
if err != nil {
mc.circuitBrk.RecordFailure()
return nil, fmt.Errorf("获取用户资料失败: %w", err)
}
mc.circuitBrk.RecordSuccess()
var profile UserProfile
if err := response.JSON(&profile); err != nil {
return nil, fmt.Errorf("解析响应失败: %w", err)
}
// 缓存结果
if data, err := json.Marshal(profile); err == nil {
mc.cache.Set(cacheKey, data)
}
return &profile, nil
}
📊 大数据处理
1. 流式数据处理
// 大数据流处理器
type StreamProcessor struct {
inputChannel chan DataItem
outputChannel chan ProcessedItem
errorChannel chan error
workerPool *WorkerPool
metrics *StreamMetrics
}
func NewStreamProcessor(workerCount int) *StreamProcessor {
return &StreamProcessor{
inputChannel: make(chan DataItem, 10000),
outputChannel: make(chan ProcessedItem, 10000),
errorChannel: make(chan error, 1000),
workerPool: NewWorkerPool(workerCount),
metrics: NewStreamMetrics(),
}
}
func (sp *StreamProcessor) Start(ctx context.Context) {
// 启动工作协程
for i := 0; i < sp.workerPool.Size(); i++ {
go sp.worker(ctx, i)
}
// 启动监控协程
go sp.metricsCollector(ctx)
}
func (sp *StreamProcessor) worker(ctx context.Context, workerID int) {
defer sp.workerPool.Done()
batchBuffer := util.NewArrayWithCapacity[DataItem](100)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
sp.processBatch(batchBuffer.ToSlice())
return
case item := <-sp.inputChannel:
batchBuffer.Append(item)
// 批量处理
if batchBuffer.Len() >= 100 {
sp.processBatch(batchBuffer.ToSlice())
batchBuffer.Clear()
}
case <-ticker.C:
// 定时处理剩余数据
if batchBuffer.IsNotEmpty() {
sp.processBatch(batchBuffer.ToSlice())
batchBuffer.Clear()
}
}
}
}
func (sp *StreamProcessor) processBatch(items []DataItem) {
startTime := util.Now()
// 并行处理批次数据
results := util.ArraysFromSlice(items).
ParallelMap(func(item DataItem) ProcessedItem {
return sp.processItem(item)
}, 4).
ToSlice()
// 发送结果
for _, result := range results {
select {
case sp.outputChannel <- result:
default:
sp.errorChannel <- errors.New("输出通道已满")
}
}
// 记录指标
duration := util.Now().DiffTime(startTime)
sp.metrics.RecordBatchProcessing(len(items), duration)
}
2. 数据管道设计
// 企业级数据管道
type DataPipeline struct {
stages []PipelineStage
metrics *PipelineMetrics
config *PipelineConfig
errorHandler ErrorHandler
}
type PipelineStage interface {
Process(data *util.XArray[interface{}]) (*util.XArray[interface{}], error)
Name() string
}
// 数据验证阶段
type ValidationStage struct {
validators []Validator
}
func (vs *ValidationStage) Process(data *util.XArray[interface{}]) (*util.XArray[interface{}], error) {
return data.Filter(func(item interface{}) bool {
for _, validator := range vs.validators {
if !validator.Validate(item) {
return false
}
}
return true
}), nil
}
// 数据转换阶段
type TransformationStage struct {
transformers []Transformer
}
func (ts *TransformationStage) Process(data *util.XArray[interface{}]) (*util.XArray[interface{}], error) {
result := data.Clone()
for _, transformer := range ts.transformers {
var err error
result, err = result.Map(func(item interface{}) interface{} {
transformed, transformErr := transformer.Transform(item)
if transformErr != nil {
err = transformErr
return item
}
return transformed
}), err
if err != nil {
return nil, fmt.Errorf("转换失败: %w", err)
}
}
return result, nil
}
// 数据聚合阶段
type AggregationStage struct {
aggregators map[string]Aggregator
}
func (as *AggregationStage) Process(data *util.XArray[interface{}]) (*util.XArray[interface{}], error) {
// 按键分组
grouped := data.GroupBy(func(item interface{}) string {
return item.(AggregateItem).GroupKey()
})
results := util.NewArray[interface{}]()
for groupKey, items := range grouped {
if aggregator, exists := as.aggregators[groupKey]; exists {
aggregated, err := aggregator.Aggregate(items)
if err != nil {
return nil, fmt.Errorf("聚合失败 [%s]: %w", groupKey, err)
}
results.Append(aggregated)
}
}
return results, nil
}
// 执行数据管道
func (dp *DataPipeline) Execute(data *util.XArray[interface{}]) (*util.XArray[interface{}], error) {
currentData := data.Clone()
for _, stage := range dp.stages {
startTime := util.Now()
processed, err := stage.Process(currentData)
if err != nil {
dp.errorHandler.Handle(stage.Name(), err)
return nil, fmt.Errorf("阶段 [%s] 执行失败: %w", stage.Name(), err)
}
currentData = processed
// 记录阶段指标
duration := util.Now().DiffTime(startTime)
dp.metrics.RecordStageExecution(stage.Name(), duration, currentData.Len())
}
return currentData, nil
}
⚡ 高并发处理
1. 连接池管理
// 企业级连接池
type ConnectionPool struct {
connections *util.SafeMap[string, *Connection]
config *PoolConfig
metrics *PoolMetrics
mutex sync.RWMutex
}
type PoolConfig struct {
MaxConnections int
MinConnections int
ConnectionTimeout time.Duration
IdleTimeout time.Duration
MaxRetries int
}
func NewConnectionPool(config *PoolConfig) *ConnectionPool {
pool := &ConnectionPool{
connections: util.NewSafeMap[string, *Connection](),
config: config,
metrics: NewPoolMetrics(),
}
// 预创建最小连接数
for i := 0; i < config.MinConnections; i++ {
conn, err := pool.createConnection()
if err == nil {
pool.connections.Set(conn.ID, conn)
}
}
// 启动清理协程
go pool.cleanupRoutine()
return pool
}
func (cp *ConnectionPool) GetConnection() (*Connection, error) {
cp.mutex.RLock()
defer cp.mutex.RUnlock()
// 尝试获取可用连接
var availableConn *Connection
cp.connections.ForEach(func(id string, conn *Connection) bool {
if conn.IsAvailable() {
availableConn = conn
return false // 停止遍历
}
return true
})
if availableConn != nil {
availableConn.MarkUsed()
cp.metrics.RecordConnectionUsage()
return availableConn, nil
}
// 如果没有可用连接且未达到最大连接数,创建新连接
if cp.connections.Size() < cp.config.MaxConnections {
newConn, err := cp.createConnection()
if err != nil {
return nil, fmt.Errorf("创建连接失败: %w", err)
}
cp.connections.Set(newConn.ID, newConn)
newConn.MarkUsed()
cp.metrics.RecordConnectionCreation()
return newConn, nil
}
return nil, errors.New("连接池已满")
}
func (cp *ConnectionPool) ReturnConnection(conn *Connection) {
if conn != nil {
conn.MarkAvailable()
conn.UpdateLastUsed(util.Now())
cp.metrics.RecordConnectionReturn()
}
}
// 连接清理协程
func (cp *ConnectionPool) cleanupRoutine() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
cp.cleanupIdleConnections()
}
}
func (cp *ConnectionPool) cleanupIdleConnections() {
now := util.Now()
expiredConnections := util.NewArray[string]()
cp.connections.ForEach(func(id string, conn *Connection) bool {
if conn.IsAvailable() && now.DiffTime(conn.LastUsed) > cp.config.IdleTimeout {
expiredConnections.Append(id)
}
return true
})
// 删除过期连接,但保持最小连接数
currentCount := cp.connections.Size()
expiredConnections.ForEach(func(i int, id string) bool {
if currentCount > cp.config.MinConnections {
if conn, exists := cp.connections.SafeGet(id); exists {
conn.Close()
cp.connections.Delete(id)
currentCount--
cp.metrics.RecordConnectionCleanup()
}
}
return currentCount > cp.config.MinConnections
})
}
2. 限流和熔断
// 企业级限流器
type RateLimiter struct {
limiters *util.SafeMap[string, *TokenBucket]
config *RateLimitConfig
metrics *RateLimitMetrics
}
type RateLimitConfig struct {
GlobalLimit int // 全局限制
UserLimit int // 用户限制
APILimit int // API 限制
RefillInterval time.Duration // 令牌补充间隔
}
type TokenBucket struct {
capacity int
tokens int
lastRefill *util.XTime
refillRate int
mutex sync.Mutex
}
func NewRateLimiter(config *RateLimitConfig) *RateLimiter {
rl := &RateLimiter{
limiters: util.NewSafeMap[string, *TokenBucket](),
config: config,
metrics: NewRateLimitMetrics(),
}
// 定期补充令牌
go rl.refillRoutine()
return rl
}
func (rl *RateLimiter) CheckLimit(key string, limitType string) bool {
bucket := rl.getOrCreateBucket(key, limitType)
return bucket.TryConsume()
}
func (rl *RateLimiter) getOrCreateBucket(key string, limitType string) *TokenBucket {
bucketKey := fmt.Sprintf("%s:%s", limitType, key)
if bucket, exists := rl.limiters.SafeGet(bucketKey); exists {
return bucket
}
// 根据限制类型设置容量
var capacity int
switch limitType {
case "user":
capacity = rl.config.UserLimit
case "api":
capacity = rl.config.APILimit
default:
capacity = rl.config.GlobalLimit
}
bucket := &TokenBucket{
capacity: capacity,
tokens: capacity,
lastRefill: util.Now(),
refillRate: capacity / 60, // 每分钟补充
}
rl.limiters.Set(bucketKey, bucket)
return bucket
}
func (tb *TokenBucket) TryConsume() bool {
tb.mutex.Lock()
defer tb.mutex.Unlock()
if tb.tokens > 0 {
tb.tokens--
return true
}
return false
}
// 熔断器实现
type CircuitBreaker struct {
state CircuitState
failures int
requests int
lastFailTime *util.XTime
config *CircuitConfig
mutex sync.RWMutex
}
type CircuitState int
const (
Closed CircuitState = iota
Open
HalfOpen
)
type CircuitConfig struct {
MaxFailures int
ResetTimeout time.Duration
FailureThreshold float64
RequestThreshold int
}
func (cb *CircuitBreaker) CanExecute() bool {
cb.mutex.RLock()
defer cb.mutex.RUnlock()
switch cb.state {
case Closed:
return true
case Open:
if util.Now().DiffTime(cb.lastFailTime) > cb.config.ResetTimeout {
cb.mutex.RUnlock()
cb.mutex.Lock()
cb.state = HalfOpen
cb.mutex.Unlock()
cb.mutex.RLock()
return true
}
return false
case HalfOpen:
return true
default:
return false
}
}
func (cb *CircuitBreaker) RecordSuccess() {
cb.mutex.Lock()
defer cb.mutex.Unlock()
cb.requests++
if cb.state == HalfOpen {
cb.state = Closed
cb.failures = 0
}
}
func (cb *CircuitBreaker) RecordFailure() {
cb.mutex.Lock()
defer cb.mutex.Unlock()
cb.failures++
cb.requests++
cb.lastFailTime = util.Now()
if cb.shouldTrip() {
cb.state = Open
}
}
func (cb *CircuitBreaker) shouldTrip() bool {
if cb.requests < cb.config.RequestThreshold {
return false
}
failureRate := float64(cb.failures) / float64(cb.requests)
return failureRate >= cb.config.FailureThreshold
}
📊 监控和指标
1. 指标收集系统
// 企业级指标收集器
type MetricsCollector struct {
counters *util.SafeMap[string, *AtomicCounter]
gauges *util.SafeMap[string, *AtomicGauge]
histograms *util.SafeMap[string, *Histogram]
timers *util.SafeMap[string, *Timer]
exporter MetricsExporter
config *MetricsConfig
}
type MetricsConfig struct {
ExportInterval time.Duration
RetentionTime time.Duration
MaxMetrics int
Tags map[string]string
}
func NewMetricsCollector(config *MetricsConfig) *MetricsCollector {
mc := &MetricsCollector{
counters: util.NewSafeMap[string, *AtomicCounter](),
gauges: util.NewSafeMap[string, *AtomicGauge](),
histograms: util.NewSafeMap[string, *Histogram](),
timers: util.NewSafeMap[string, *Timer](),
config: config,
}
// 启动导出协程
go mc.exportRoutine()
return mc
}
// 业务指标记录
func (mc *MetricsCollector) RecordAPICall(endpoint string, method string, statusCode int, duration time.Duration) {
tags := map[string]string{
"endpoint": endpoint,
"method": method,
"status_code": fmt.Sprintf("%d", statusCode),
}
// 请求计数
mc.IncrementCounter("api_requests_total", tags)
// 响应时间
mc.RecordHistogram("api_duration_seconds", float64(duration.Seconds()), tags)
// 错误率
if statusCode >= 400 {
mc.IncrementCounter("api_errors_total", tags)
}
}
func (mc *MetricsCollector) RecordDatabaseOperation(operation string, table string, duration time.Duration, success bool) {
tags := map[string]string{
"operation": operation,
"table": table,
"success": fmt.Sprintf("%t", success),
}
mc.IncrementCounter("db_operations_total", tags)
mc.RecordHistogram("db_duration_seconds", float64(duration.Seconds()), tags)
if !success {
mc.IncrementCounter("db_errors_total", tags)
}
}
// 系统资源监控
func (mc *MetricsCollector) StartSystemMonitoring() {
ticker := time.NewTicker(30 * time.Second)
go func() {
defer ticker.Stop()
for range ticker.C {
mc.collectSystemMetrics()
}
}()
}
func (mc *MetricsCollector) collectSystemMetrics() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
// 内存指标
mc.SetGauge("memory_heap_bytes", float64(m.HeapAlloc), nil)
mc.SetGauge("memory_heap_sys_bytes", float64(m.HeapSys), nil)
mc.SetGauge("memory_gc_cycles_total", float64(m.NumGC), nil)
// Goroutine 数量
mc.SetGauge("goroutines_count", float64(runtime.NumGoroutine()), nil)
// CPU 核心数
mc.SetGauge("cpu_cores", float64(runtime.NumCPU()), nil)
}
// 自定义业务指标
func (mc *MetricsCollector) RecordBusinessMetric(metricName string, value float64, tags map[string]string) {
// 添加全局标签
finalTags := make(map[string]string)
for k, v := range mc.config.Tags {
finalTags[k] = v
}
for k, v := range tags {
finalTags[k] = v
}
mc.RecordHistogram(metricName, value, finalTags)
}
2. 分布式追踪
// 分布式追踪系统
type TracingSystem struct {
spans *util.SafeMap[string, *Span]
sampler Sampler
config *TracingConfig
}
type Span struct {
TraceID string
SpanID string
ParentID string
Operation string
StartTime *util.XTime
EndTime *util.XTime
Tags map[string]string
Logs []LogEntry
Status SpanStatus
}
type LogEntry struct {
Timestamp *util.XTime
Level string
Message string
Fields map[string]interface{}
}
func (ts *TracingSystem) StartSpan(operation string, parentSpan *Span) *Span {
span := &Span{
TraceID: generateTraceID(),
SpanID: generateSpanID(),
Operation: operation,
StartTime: util.Now(),
Tags: make(map[string]string),
Logs: make([]LogEntry, 0),
Status: StatusOK,
}
if parentSpan != nil {
span.TraceID = parentSpan.TraceID
span.ParentID = parentSpan.SpanID
}
ts.spans.Set(span.SpanID, span)
return span
}
func (span *Span) AddTag(key, value string) *Span {
span.Tags[key] = value
return span
}
func (span *Span) AddLog(level, message string, fields map[string]interface{}) *Span {
logEntry := LogEntry{
Timestamp: util.Now(),
Level: level,
Message: message,
Fields: fields,
}
span.Logs = append(span.Logs, logEntry)
return span
}
func (span *Span) Finish() {
span.EndTime = util.Now()
// 异步发送到追踪后端
go span.export()
}
// 中间件集成
func TracingMiddleware(ts *TracingSystem) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
span := ts.StartSpan(fmt.Sprintf("%s %s", r.Method, r.URL.Path), nil)
defer span.Finish()
span.AddTag("http.method", r.Method).
AddTag("http.url", r.URL.String()).
AddTag("http.user_agent", r.UserAgent())
// 将span传递到context
ctx := context.WithValue(r.Context(), "span", span)
r = r.WithContext(ctx)
// 记录请求开始
span.AddLog("info", "请求开始", map[string]interface{}{
"remote_addr": r.RemoteAddr,
"host": r.Host,
})
next.ServeHTTP(w, r)
// 记录请求结束
span.AddLog("info", "请求结束", nil)
})
}
}
🔧 配置管理
1. 多环境配置
// 企业级配置管理系统
type ConfigManager struct {
configs *util.SafeMap[string, interface{}]
watchers []ConfigWatcher
environment string
source ConfigSource
cache *util.ExpiringMap[string, interface{}]
}
type ConfigSource interface {
Load(env string) (map[string]interface{}, error)
Watch(callback func(key string, value interface{}))
}
// 配置热更新
func (cm *ConfigManager) WatchConfig(key string, callback func(interface{})) {
watcher := ConfigWatcher{
Key: key,
Callback: callback,
}
cm.watchers = append(cm.watchers, watcher)
}
// 分层配置加载
func (cm *ConfigManager) LoadConfiguration() error {
// 加载默认配置
defaultConfig, err := cm.source.Load("default")
if err != nil {
return fmt.Errorf("加载默认配置失败: %w", err)
}
// 加载环境特定配置
envConfig, err := cm.source.Load(cm.environment)
if err != nil {
return fmt.Errorf("加载环境配置失败: %w", err)
}
// 合并配置
merged := util.MapFromNative(defaultConfig).
Merge(util.MapFromNative(envConfig)).
ToMap()
// 更新配置缓存
for key, value := range merged {
cm.configs.Set(key, value)
cm.cache.Set(key, value)
}
return nil
}
// 类型安全的配置获取
func (cm *ConfigManager) GetString(key string, defaultValue string) string {
if value, exists := cm.configs.SafeGet(key); exists {
if str, ok := value.(string); ok {
return str
}
}
return defaultValue
}
func (cm *ConfigManager) GetInt(key string, defaultValue int) int {
if value, exists := cm.configs.SafeGet(key); exists {
switch v := value.(type) {
case int:
return v
case float64:
return int(v)
case string:
if i, err := strconv.Atoi(v); err == nil {
return i
}
}
}
return defaultValue
}
func (cm *ConfigManager) GetDuration(key string, defaultValue time.Duration) time.Duration {
if value, exists := cm.configs.SafeGet(key); exists {
if str, ok := value.(string); ok {
if duration, err := time.ParseDuration(str); err == nil {
return duration
}
}
}
return defaultValue
}
// 配置验证
func (cm *ConfigManager) ValidateConfiguration() error {
validators := []ConfigValidator{
NewRequiredFieldValidator([]string{
"database.host",
"database.port",
"redis.host",
"app.port",
}),
NewRangeValidator("app.port", 1, 65535),
NewRegexValidator("database.host", `^[a-zA-Z0-9.-]+$`),
}
for _, validator := range validators {
if err := validator.Validate(cm.configs.ToMap()); err != nil {
return fmt.Errorf("配置验证失败: %w", err)
}
}
return nil
}
2. 敏感信息管理
// 敏感信息管理器
type SecretManager struct {
vault SecretVault
cache *util.ExpiringMap[string, string]
encryptor Encryptor
}
type SecretVault interface {
GetSecret(key string) (string, error)
SetSecret(key, value string) error
DeleteSecret(key string) error
ListSecrets() ([]string, error)
}
func (sm *SecretManager) GetSecret(key string) (string, error) {
// 尝试从缓存获取
if cached, exists := sm.cache.Get(key); exists {
decrypted, err := sm.encryptor.Decrypt(cached)
if err == nil {
return decrypted, nil
}
}
// 从保险库获取
secret, err := sm.vault.GetSecret(key)
if err != nil {
return "", fmt.Errorf("获取密钥失败: %w", err)
}
// 加密后缓存
encrypted, err := sm.encryptor.Encrypt(secret)
if err == nil {
sm.cache.SetWithTTL(key, encrypted, 15*time.Minute)
}
return secret, nil
}
// 数据库连接字符串构建
func (sm *SecretManager) BuildDatabaseURL(configKey string) (string, error) {
dbPassword, err := sm.GetSecret("database.password")
if err != nil {
return "", err
}
config := struct {
Host string `json:"host"`
Port int `json:"port"`
Database string `json:"database"`
Username string `json:"username"`
}{}
if err := sm.loadConfig(configKey, &config); err != nil {
return "", err
}
return fmt.Sprintf("postgresql://%s:%s@%s:%d/%s",
config.Username,
url.QueryEscape(dbPassword),
config.Host,
config.Port,
config.Database,
), nil
}
🔍 日志和审计
1. 结构化日志系统
// 企业级结构化日志系统
type StructuredLogger struct {
outputs []LogOutput
processors []LogProcessor
config *LogConfig
buffer *util.SafeMap[string, []LogEntry]
}
type LogConfig struct {
Level LogLevel
Format LogFormat
EnableAsync bool
BufferSize int
FlushInterval time.Duration
EnableSampling bool
SampleRate float64
}
type LogEntry struct {
Timestamp *util.XTime `json:"timestamp"`
Level LogLevel `json:"level"`
Message string `json:"message"`
Fields map[string]interface{} `json:"fields"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
RequestID string `json:"request_id,omitempty"`
UserID string `json:"user_id,omitempty"`
ServiceName string `json:"service_name"`
Environment string `json:"environment"`
}
func (sl *StructuredLogger) Log(level LogLevel, message string, fields map[string]interface{}) {
if level < sl.config.Level {
return
}
entry := LogEntry{
Timestamp: util.Now(),
Level: level,
Message: message,
Fields: fields,
ServiceName: sl.config.ServiceName,
Environment: sl.config.Environment,
}
// 添加追踪信息
if ctx := context.Background(); ctx != nil {
if span, ok := ctx.Value("span").(*Span); ok {
entry.TraceID = span.TraceID
entry.SpanID = span.SpanID
}
if requestID, ok := ctx.Value("request_id").(string); ok {
entry.RequestID = requestID
}
}
// 处理日志条目
for _, processor := range sl.processors {
entry = processor.Process(entry)
}
// 输出日志
if sl.config.EnableAsync {
sl.asyncOutput(entry)
} else {
sl.syncOutput(entry)
}
}
// 业务审计日志
func (sl *StructuredLogger) AuditLog(action string, resource string, userID string, details map[string]interface{}) {
auditFields := map[string]interface{}{
"audit_type": "business_action",
"action": action,
"resource": resource,
"user_id": userID,
"timestamp": util.Now().Timestamp(),
}
// 合并详细信息
for k, v := range details {
auditFields[k] = v
}
sl.Log(LogLevelInfo, fmt.Sprintf("用户 %s 执行了 %s 操作", userID, action), auditFields)
}
// 安全审计日志
func (sl *StructuredLogger) SecurityAuditLog(event string, severity string, sourceIP string, userAgent string, details map[string]interface{}) {
securityFields := map[string]interface{}{
"audit_type": "security_event",
"event": event,
"severity": severity,
"source_ip": sourceIP,
"user_agent": userAgent,
"timestamp": util.Now().Timestamp(),
}
for k, v := range details {
securityFields[k] = v
}
level := LogLevelWarn
if severity == "critical" {
level = LogLevelError
}
sl.Log(level, fmt.Sprintf("安全事件: %s", event), securityFields)
}
2. 审计追踪系统
// 审计追踪系统
type AuditTracker struct {
storage AuditStorage
config *AuditConfig
encoder AuditEncoder
}
type AuditRecord struct {
ID string `json:"id"`
Timestamp *util.XTime `json:"timestamp"`
EventType string `json:"event_type"`
UserID string `json:"user_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
IPAddress string `json:"ip_address"`
UserAgent string `json:"user_agent"`
Resource string `json:"resource"`
Action string `json:"action"`
Status string `json:"status"`
Details map[string]interface{} `json:"details"`
Before interface{} `json:"before,omitempty"`
After interface{} `json:"after,omitempty"`
TraceID string `json:"trace_id,omitempty"`
RequestID string `json:"request_id,omitempty"`
}
func (at *AuditTracker) RecordDataAccess(userID string, resource string, action string, status string, details map[string]interface{}) {
record := AuditRecord{
ID: generateAuditID(),
Timestamp: util.Now(),
EventType: "data_access",
UserID: userID,
Resource: resource,
Action: action,
Status: status,
Details: details,
}
at.storeAuditRecord(record)
}
func (at *AuditTracker) RecordDataModification(userID string, resource string, before interface{}, after interface{}, status string) {
record := AuditRecord{
ID: generateAuditID(),
Timestamp: util.Now(),
EventType: "data_modification",
UserID: userID,
Resource: resource,
Action: "modify",
Status: status,
Before: before,
After: after,
}
at.storeAuditRecord(record)
}
// 合规性报告生成
func (at *AuditTracker) GenerateComplianceReport(startTime, endTime *util.XTime, filters map[string]string) (*ComplianceReport, error) {
records, err := at.storage.QueryAuditRecords(startTime, endTime, filters)
if err != nil {
return nil, fmt.Errorf("查询审计记录失败: %w", err)
}
report := &ComplianceReport{
Period: fmt.Sprintf("%s to %s", startTime.FormatDate(), endTime.FormatDate()),
Generated: util.Now(),
Summary: at.generateSummary(records),
Details: records,
}
return report, nil
}
func (at *AuditTracker) generateSummary(records []AuditRecord) ComplianceSummary {
recordArray := util.ArraysFromSlice(records)
return ComplianceSummary{
TotalEvents: recordArray.Len(),
UserAccess: recordArray.Count(func(r AuditRecord) bool {
return r.EventType == "data_access"
}),
DataModifications: recordArray.Count(func(r AuditRecord) bool {
return r.EventType == "data_modification"
}),
FailedAttempts: recordArray.Count(func(r AuditRecord) bool {
return r.Status == "failed"
}),
UniqueUsers: len(recordArray.
Map(func(r AuditRecord) string { return r.UserID }).
Distinct().
ToSlice()),
}
}
💬 获取帮助
如果您在企业级应用中遇到问题:
- 🔍 查看FAQ - 常见企业级问题解答
- 🐛 报告问题 - Bug反馈
- 💡 功能建议 - 新功能讨论
- 📧 邮件支持 [email protected]
- 📞 企业级支持 - 提供专业的技术咨询服务
🏢 Go-Util 为您的企业级应用提供强大而可靠的基础设施支持!