正在加载,请稍候…

Node.js 中可扩展的 WebSocket:Socket.io、Redis 扩展与负载均衡

使用 Socket.io 构建可扩展的实时 WebSocket 服务器,涵盖房间、命名空间、Redis 适配器实现水平扩展、粘性会话以及连接状态恢复。

Scalable WebSockets in Node.js: Socket.io, Scaling with Redis, and Load Balancin

大规模 WebSocket 架构

单服务器 WebSocket 在需要水平扩展时就会遇到瓶颈。以下是如何处理这一问题。

Scalable WebSockets in Node.js: Socket.io, Scaling with Redis, and Load Balancin illustration

Socket.io 服务器设置

import { Server } from 'socket.io'
import { createServer } from 'http'
import { createAdapter } from '@socket.io/redis-adapter'
import { createClient } from 'redis'

const httpServer = createServer(app)
const io = new Server(httpServer, {
  cors: { origin: process.env.CLIENT_URL, credentials: true },
  connectionStateRecovery: {
    maxDisconnectionDuration: 2 * 60 * 1000, // 2 min
    skipMiddlewares: false,
  },
})

// Redis adapter for horizontal scaling
const pubClient = createClient({ url: process.env.REDIS_URL })
const subClient = pubClient.duplicate()
await Promise.all([pubClient.connect(), subClient.connect()])
io.adapter(createAdapter(pubClient, subClient))

httpServer.listen(3000)

认证中间件

import { ExtractJwt } from 'passport-jwt'

io.use((socket, next) => {
  const token = socket.handshake.auth.token
  if (!token) return next(new Error('Authentication required'))
  
  try {
    const payload = jwt.verify(token, process.env.JWT_SECRET!)
    socket.data.userId = payload.sub
    socket.data.role = payload.role
    next()
  } catch (err) {
    next(new Error('Invalid token'))
  }
})

Scalable WebSockets in Node.js: Socket.io, Scaling with Redis, and Load Balancin illustration

房间与命名空间

// Chat room logic
io.on('connection', (socket) => {
  console.log(`Connected: ${socket.id} (user: ${socket.data.userId})`)
  
  // Join user to their personal room
  socket.join(`user:${socket.data.userId}`)
  
  socket.on('join_room', async (roomId) => {
    // Verify permission
    const canJoin = await checkRoomAccess(socket.data.userId, roomId)
    if (!canJoin) return socket.emit('error', 'Access denied')
    
    socket.join(roomId)
    socket.to(roomId).emit('user_joined', { userId: socket.data.userId })
    
    // Send recent messages
    const messages = await getRecentMessages(roomId)
    socket.emit('room_history', messages)
  })
  
  socket.on('send_message', async ({ roomId, text }) => {
    const message = await saveMessage({ userId: socket.data.userId, roomId, text })
    io.to(roomId).emit('new_message', message) // Broadcast to everyone in room
  })
  
  socket.on('typing', ({ roomId }) => {
    socket.to(roomId).emit('user_typing', { userId: socket.data.userId })
  })
  
  socket.on('disconnect', () => {
    console.log(`Disconnected: ${socket.id}`)
  })
})

// Send to specific user (works across all servers with Redis adapter)
function notifyUser(userId: string, event: string, data: any) {
  io.to(`user:${userId}`).emit(event, data)
}

不同功能的命名空间

// Admin namespace
const adminNs = io.of('/admin')
adminNs.use((socket, next) => {
  if (socket.data.role !== 'admin') return next(new Error('Admin only'))
  next()
})

adminNs.on('connection', (socket) => {
  socket.on('broadcast_announcement', (message) => {
    io.emit('announcement', message) // Broadcast to ALL connected clients
  })
})

Scalable WebSockets in Node.js: Socket.io, Scaling with Redis, and Load Balancin illustration

客户端

import { io } from 'socket.io-client'

const socket = io('https://api.example.com', {
  auth: { token: localStorage.getItem('accessToken') },
  withCredentials: true,
})

socket.on('connect', () => console.log('Connected'))
socket.on('connect_error', (err) => console.error('Connection failed:', err.message))

socket.emit('join_room', 'room-123')
socket.on('new_message', (msg) => updateUI(msg))

// Reconnect recovery
socket.on('connect', () => {
  if (socket.recovered) {
    console.log('Recovered from disconnect')
  }
})

Nginx 粘性会话

upstream socketio {
  ip_hash;  # Sticky sessions by client IP
  server app1:3000;
  server app2:3000;
  server app3:3000;
}

location /socket.io/ {
  proxy_pass http://socketio;
  proxy_http_version 1.1;
  proxy_set_header Upgrade $http_upgrade;
  proxy_set_header Connection "upgrade";
  proxy_read_timeout 86400;
}