
大规模 WebSocket 架构
单服务器 WebSocket 在需要水平扩展时就会遇到瓶颈。以下是如何处理这一问题。
Socket.io 服务器设置
import { Server } from 'socket.io'
import { createServer } from 'http'
import { createAdapter } from '@socket.io/redis-adapter'
import { createClient } from 'redis'
const httpServer = createServer(app)
const io = new Server(httpServer, {
cors: { origin: process.env.CLIENT_URL, credentials: true },
connectionStateRecovery: {
maxDisconnectionDuration: 2 * 60 * 1000, // 2 min
skipMiddlewares: false,
},
})
// Redis adapter for horizontal scaling
const pubClient = createClient({ url: process.env.REDIS_URL })
const subClient = pubClient.duplicate()
await Promise.all([pubClient.connect(), subClient.connect()])
io.adapter(createAdapter(pubClient, subClient))
httpServer.listen(3000)
认证中间件
import { ExtractJwt } from 'passport-jwt'
io.use((socket, next) => {
const token = socket.handshake.auth.token
if (!token) return next(new Error('Authentication required'))
try {
const payload = jwt.verify(token, process.env.JWT_SECRET!)
socket.data.userId = payload.sub
socket.data.role = payload.role
next()
} catch (err) {
next(new Error('Invalid token'))
}
})
房间与命名空间
// Chat room logic
io.on('connection', (socket) => {
console.log(`Connected: ${socket.id} (user: ${socket.data.userId})`)
// Join user to their personal room
socket.join(`user:${socket.data.userId}`)
socket.on('join_room', async (roomId) => {
// Verify permission
const canJoin = await checkRoomAccess(socket.data.userId, roomId)
if (!canJoin) return socket.emit('error', 'Access denied')
socket.join(roomId)
socket.to(roomId).emit('user_joined', { userId: socket.data.userId })
// Send recent messages
const messages = await getRecentMessages(roomId)
socket.emit('room_history', messages)
})
socket.on('send_message', async ({ roomId, text }) => {
const message = await saveMessage({ userId: socket.data.userId, roomId, text })
io.to(roomId).emit('new_message', message) // Broadcast to everyone in room
})
socket.on('typing', ({ roomId }) => {
socket.to(roomId).emit('user_typing', { userId: socket.data.userId })
})
socket.on('disconnect', () => {
console.log(`Disconnected: ${socket.id}`)
})
})
// Send to specific user (works across all servers with Redis adapter)
function notifyUser(userId: string, event: string, data: any) {
io.to(`user:${userId}`).emit(event, data)
}
不同功能的命名空间
// Admin namespace
const adminNs = io.of('/admin')
adminNs.use((socket, next) => {
if (socket.data.role !== 'admin') return next(new Error('Admin only'))
next()
})
adminNs.on('connection', (socket) => {
socket.on('broadcast_announcement', (message) => {
io.emit('announcement', message) // Broadcast to ALL connected clients
})
})
客户端
import { io } from 'socket.io-client'
const socket = io('https://api.example.com', {
auth: { token: localStorage.getItem('accessToken') },
withCredentials: true,
})
socket.on('connect', () => console.log('Connected'))
socket.on('connect_error', (err) => console.error('Connection failed:', err.message))
socket.emit('join_room', 'room-123')
socket.on('new_message', (msg) => updateUI(msg))
// Reconnect recovery
socket.on('connect', () => {
if (socket.recovered) {
console.log('Recovered from disconnect')
}
})
Nginx 粘性会话
upstream socketio {
ip_hash; # Sticky sessions by client IP
server app1:3000;
server app2:3000;
server app3:3000;
}
location /socket.io/ {
proxy_pass http://socketio;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 86400;
}