← 返回
开发者工具 中文

Websocket Hub Patterns

Horizontally-scalable WebSocket hub pattern with lazy Redis subscriptions, connection registry, and graceful shutdown. Use when building real-time WebSocket servers that scale across multiple instances. Triggers on WebSocket hub, WebSocket scaling, connection registry, Redis WebSocket, real-time gateway, horizontal scaling.
可水平扩展的 WebSocket 中心模式,支持懒加载 Redis 订阅、连接注册表和优雅关闭。适用于构建可跨多实例扩展的实时 WebSocket 服务器。
wpank wpank 来源
开发者工具 clawhub v1.0.0 1 版本 99907.1 Key: 无需
★ 0
Stars
📥 1,076
下载
💾 7
安装
1
版本
#latest

概述

WebSocket Hub Patterns

Production patterns for horizontally-scalable WebSocket connections with Redis-backed coordination.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install websocket-hub-patterns

When to Use

  • Real-time bidirectional communication
  • Chat applications, collaborative editing
  • Live dashboards with client interactions
  • Need horizontal scaling across multiple gateway instances

Hub Structure

type Hub struct {
    // Local state
    connections   map[*Connection]bool
    subscriptions map[string]map[*Connection]bool // channel -> connections

    // Channels
    register   chan *Connection
    unregister chan *Connection
    broadcast  chan *Event

    // Redis for scaling
    redisClient  *redis.Client
    redisSubs    map[string]*goredis.PubSub
    redisSubLock sync.Mutex

    // Optional: Distributed registry
    connRegistry *ConnectionRegistry
    instanceID   string

    // Shutdown
    done chan struct{}
    wg   sync.WaitGroup
}

Hub Main Loop

func (h *Hub) Run() {
    for {
        select {
        case <-h.done:
            return

        case conn := <-h.register:
            h.connections[conn] = true
            if h.connRegistry != nil {
                h.connRegistry.RegisterConnection(ctx, conn.ID(), info)
            }

        case conn := <-h.unregister:
            if _, ok := h.connections[conn]; ok {
                if h.connRegistry != nil {
                    h.connRegistry.UnregisterConnection(ctx, conn.ID())
                }
                h.removeConnection(conn)
            }

        case event := <-h.broadcast:
            h.broadcastToChannel(event)
        }
    }
}

Lazy Redis Subscriptions

Subscribe to Redis only when first local subscriber joins:

func (h *Hub) subscribeToChannel(conn *Connection, channel string) error {
    // Add to local subscriptions
    if h.subscriptions[channel] == nil {
        h.subscriptions[channel] = make(map[*Connection]bool)
    }
    h.subscriptions[channel][conn] = true

    // Lazy: Only subscribe to Redis on first subscriber
    h.redisSubLock.Lock()
    defer h.redisSubLock.Unlock()

    if _, exists := h.redisSubs[channel]; !exists {
        pubsub := h.redisClient.Subscribe(context.Background(), channel)
        h.redisSubs[channel] = pubsub
        go h.forwardRedisMessages(channel, pubsub)
    }

    return nil
}

func (h *Hub) unsubscribeFromChannel(conn *Connection, channel string) {
    if subs, ok := h.subscriptions[channel]; ok {
        delete(subs, conn)

        // Cleanup when no local subscribers
        if len(subs) == 0 {
            delete(h.subscriptions, channel)
            h.closeRedisSubscription(channel)
        }
    }
}

Redis Message Forwarding

func (h *Hub) forwardRedisMessages(channel string, pubsub *goredis.PubSub) {
    ch := pubsub.Channel()
    for {
        select {
        case <-h.done:
            return
        case msg, ok := <-ch:
            if !ok {
                return
            }
            h.broadcast <- &Event{
                Channel: channel,
                Data:    []byte(msg.Payload),
            }
        }
    }
}

func (h *Hub) broadcastToChannel(event *Event) {
    subs := h.subscriptions[event.Channel]
    for conn := range subs {
        select {
        case conn.send <- event.Data:
            // Sent
        default:
            // Buffer full - close slow client
            h.removeConnection(conn)
        }
    }
}

Connection Write Pump

func (c *Connection) writePump() {
    ticker := time.NewTicker(54 * time.Second) // Ping interval
    defer func() {
        ticker.Stop()
        c.conn.Close()
    }()

    for {
        select {
        case message, ok := <-c.send:
            c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if !ok {
                c.conn.WriteMessage(websocket.CloseMessage, []byte{})
                return
            }
            c.conn.WriteMessage(websocket.TextMessage, message)

            // Batch drain queue
            for i := 0; i < len(c.send); i++ {
                c.conn.WriteMessage(websocket.TextMessage, <-c.send)
            }

        case <-ticker.C:
            if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

Connection Registry for Horizontal Scaling

type ConnectionRegistry struct {
    client     *redis.Client
    instanceID string
}

func (r *ConnectionRegistry) RegisterConnection(ctx context.Context, connID string, info ConnectionInfo) error {
    info.InstanceID = r.instanceID
    data, _ := json.Marshal(info)
    return r.client.Set(ctx, "ws:conn:"+connID, data, 2*time.Minute).Err()
}

func (r *ConnectionRegistry) HeartbeatInstance(ctx context.Context, connectionCount int) error {
    info := InstanceInfo{
        InstanceID:  r.instanceID,
        Connections: connectionCount,
    }
    data, _ := json.Marshal(info)
    return r.client.Set(ctx, "ws:instance:"+r.instanceID, data, 30*time.Second).Err()
}

Graceful Shutdown

func (h *Hub) Shutdown() {
    close(h.done)

    // Close all Redis subscriptions
    h.redisSubLock.Lock()
    for channel, pubsub := range h.redisSubs {
        pubsub.Close()
        delete(h.redisSubs, channel)
    }
    h.redisSubLock.Unlock()

    // Close all connections
    for conn := range h.connections {
        conn.Close()
    }

    h.wg.Wait()
}

Decision Tree

SituationApproach
---------------------
Single instanceSkip ConnectionRegistry
Multi-instanceEnable ConnectionRegistry
No subscribers to channelLazy unsubscribe from Redis
Slow clientClose on buffer overflow
Need message historyUse Redis Streams + Pub/Sub

Related Skills


NEVER Do

  • NEVER block on conn.send — Use select with default to detect overflow
  • NEVER skip graceful shutdown — Clients need close frames
  • NEVER share pubsub across channels — Each channel needs own subscription
  • NEVER forget instance heartbeat — Dead instances leave orphaned connections
  • NEVER send without ping/pong — Load balancers close "idle" connections

版本历史

共 1 个版本

  • v1.0.0 当前
    2026-03-29 03:14 安全 安全

安全检测

腾讯云安全 (Keen)

安全,无风险
查看报告

腾讯云安全 (Sanbu)

安全,无风险
查看报告

🔗 相关推荐

design-media

Tailwind Design System

wpank
使用 CVA 构建可扩展、可主题化的 Tailwind CSS 组件库,支持变体、复合组件、设计令牌、暗黑模式及响应式网格。
★ 8 📥 6,116
dev-programming

Github

steipete
使用 `gh` CLI 与 GitHub 交互,通过 `gh issue`、`gh pr`、`gh run` 和 `gh api` 管理议题、PR、CI 运行及高级查询。
★ 677 📥 326,033
dev-programming

Mcporter

steipete
使用 mcporter CLI 直接列出、配置、认证及调用 MCP 服务器/工具(支持 HTTP 或 stdio),涵盖临时服务器、配置编辑及 CLI/类型生成功能。
★ 195 📥 67,437