
WebSockets 与 Node.js 的实时功能
Socket.io 服务器设置
import express from 'express';
import { createServer } from 'http';
import { Server, Socket } from 'socket.io';
// Express app wrapped in a plain Node HTTP server so Socket.IO can share it.
const app = express();
const httpServer = createServer(app);

// Origin allowed by CORS: FRONTEND_URL when set to a non-empty value,
// otherwise http://localhost:3000.
const allowedOrigin = process.env.FRONTEND_URL || 'http://localhost:3000';

// Socket.IO server attached to the HTTP server, with both the websocket and
// polling transports listed and credentialed CORS requests enabled.
const io = new Server(httpServer, {
  transports: ['websocket', 'polling'],
  cors: {
    credentials: true,
    origin: allowedOrigin,
  },
});

httpServer.listen(3001);
身份验证
import { verifyToken } from './auth';
/**
 * Handshake middleware that authenticates every incoming connection.
 *
 * Reads the token from `socket.handshake.auth.token`, verifies it with
 * `verifyToken`, and stores the result on `socket.data` (`userId`, plus the
 * whole payload as `user`). The handshake is rejected with
 * "Authentication required" when no token is supplied and with
 * "Invalid token" when the token is not a string or fails verification.
 */
io.use(async (socket, next) => {
  // `handshake.auth` is client-controlled, so treat the value as unknown.
  const token: unknown = socket.handshake.auth.token;
  if (!token) return next(new Error('Authentication required'));
  if (typeof token !== 'string') return next(new Error('Invalid token'));

  let payload: Awaited<ReturnType<typeof verifyToken>>;
  try {
    payload = await verifyToken(token);
  } catch {
    return next(new Error('Invalid token'));
  }

  socket.data.userId = payload.userId;
  socket.data.user = payload;

  // Deliberately outside the try block: an exception thrown synchronously by
  // a later middleware must not be caught here, reported as "Invalid token",
  // and cause `next` to be invoked a second time.
  next();
});
房间管理(聊天示例)
/** Events the server emits to connected clients. */
interface ServerToClientEvents {
  /** A chat message delivered to the sockets of a room. */
  'message:received': (message: ChatMessage) => void;

  /** Another user has joined a room the receiving socket is in. */
  'user:joined': (user: { id: string; name: string }) => void;

  /** The user with the given id is no longer in the room. */
  'user:left': (userId: string) => void;

  /** List of users for a room. */
  'room:users': (users: User[]) => void;
}

/** Events clients emit to the server. */
interface ClientToServerEvents {
  /**
   * Post `content` to the room `roomId`; the outcome is reported through the
   * acknowledgement callback.
   */
  'message:send': (
    data: { roomId: string; content: string },
    callback: (ack: MessageAck) => void,
  ) => void;

  /** Ask to join the room with the given id. */
  'room:join': (roomId: string) => void;

  /** Ask to leave the room with the given id. */
  'room:leave': (roomId: string) => void;
}
const io = new Server<ClientToServerEvents, ServerToClientEvents>(httpServer);

/**
 * Chat wiring for each new connection: room join/leave, message delivery, and
 * departure notifications.
 *
 * Every payload comes from the client and is therefore validated at runtime
 * even though the event map types it. Each async handler catches its own
 * errors so a failed adapter or persistence call cannot surface as an
 * unhandled promise rejection.
 */
io.on('connection', (socket) => {
  console.log(`User ${socket.data.userId} connected`);

  socket.on('room:join', async (roomId) => {
    if (typeof roomId !== 'string' || roomId === '') return;
    try {
      await socket.join(roomId);

      // Tell the sockets already in the room (not the joiner) about the newcomer.
      socket.to(roomId).emit('user:joined', {
        id: socket.data.userId,
        name: socket.data.user.name,
      });

      // Send the joiner the list of users currently in the room.
      const socketsInRoom = await io.in(roomId).fetchSockets();
      const users = socketsInRoom.map((s) => ({ id: s.data.userId, name: s.data.user.name }));
      socket.emit('room:users', users);
    } catch (err) {
      console.error(`room:join failed for room ${roomId}:`, err);
    }
  });

  // 'room:leave' is part of ClientToServerEvents, so it needs a handler;
  // without one a client could only leave a room by disconnecting.
  socket.on('room:leave', async (roomId) => {
    if (typeof roomId !== 'string' || !socket.rooms.has(roomId)) return;
    try {
      await socket.leave(roomId);
      socket.to(roomId).emit('user:left', socket.data.userId);
    } catch (err) {
      console.error(`room:leave failed for room ${roomId}:`, err);
    }
  });

  socket.on('message:send', async (data, callback) => {
    try {
      if (typeof data?.roomId !== 'string' || typeof data.content !== 'string') {
        console.warn(`User ${socket.data.userId} sent a malformed message:send payload`);
        return;
      }
      const { roomId, content } = data;

      // Only members may post: without this check any authenticated socket
      // could broadcast into a room it never joined.
      if (!socket.rooms.has(roomId)) {
        console.warn(`User ${socket.data.userId} tried to post to room ${roomId} without joining`);
        return;
      }

      const message = await saveMessage({ userId: socket.data.userId, roomId, content });

      // Broadcast to everyone in the room, including the sender.
      io.to(roomId).emit('message:received', message);

      // The acknowledgement is optional on the wire; a client may omit it.
      if (typeof callback === 'function') {
        callback({ ok: true, messageId: message.id });
      }
    } catch (err) {
      // NOTE(review): no failure ack is sent because the failure shape of
      // MessageAck is not visible here — add one once that shape is confirmed.
      console.error('message:send failed:', err);
    }
  });

  // Use 'disconnecting', not 'disconnect': by the time 'disconnect' fires the
  // socket has already left all of its rooms, so `socket.rooms` is empty and
  // nobody would be notified.
  socket.on('disconnecting', () => {
    for (const roomId of socket.rooms) {
      // Skip the socket's own private room (named after its id).
      if (roomId !== socket.id) {
        socket.to(roomId).emit('user:left', socket.data.userId);
      }
    }
  });
});
使用 Redis 适配器进行扩展
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
// The adapter takes two Redis connections: one client to publish and a
// duplicate of it to subscribe.
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();

// The clients are event emitters: an 'error' event with no listener is thrown
// and can take the process down, so register listeners before connecting.
pubClient.on('error', (err) => console.error('Redis pub client error:', err));
subClient.on('error', (err) => console.error('Redis sub client error:', err));

await Promise.all([pubClient.connect(), subClient.connect()]);
io.adapter(createAdapter(pubClient, subClient));
// socket.io events now work across multiple Node.js instances
// io.to('room:123').emit('message', data) <- reaches sockets on every server
客户端(React)
import { io, Socket } from 'socket.io-client';
import { useEffect, useRef, useState } from 'react';
/**
 * React hook that owns one Socket.IO client connection authenticated with `token`.
 *
 * The socket lives in state rather than in a ref: writing to a ref does not
 * trigger a render, so consumers would otherwise see `null` (or the previous
 * token's socket) until some unrelated state change happened.
 *
 * @param token Auth token sent in the handshake. While it is empty no
 *   connection is opened.
 * @returns The current socket (`null` while none exists) and whether it is
 *   currently connected.
 */
function useSocket(token: string) {
  const [socket, setSocket] = useState<Socket | null>(null);
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    // The server rejects handshakes without a token, so do not even try.
    if (!token) return undefined;

    const client = io(process.env.NEXT_PUBLIC_WS_URL!, {
      auth: { token },
      reconnection: true,
      reconnectionAttempts: 5,
      reconnectionDelay: 1000,
    });

    const handleConnect = () => setConnected(true);
    const handleDisconnect = () => setConnected(false);
    const handleConnectError = (err: Error) => console.error('Connection error:', err);

    client.on('connect', handleConnect);
    client.on('disconnect', handleDisconnect);
    client.on('connect_error', handleConnectError);
    setSocket(client);

    return () => {
      // Detach our own listeners first so tearing down the old socket cannot
      // flip state that already belongs to the next connection.
      client.off('connect', handleConnect);
      client.off('disconnect', handleDisconnect);
      client.off('connect_error', handleConnectError);
      client.disconnect();
      setSocket(null);
      setConnected(false);
    };
  }, [token]);

  return { socket, connected };
}
在线状态系统
// Track online users in Redis
/** Builds the Redis key that holds the online marker for `userId`. */
function onlineKey(userId: string): string {
  return 'online:' + userId;
}
/**
 * Presence bookkeeping for each connection: sets the user's online key with a
 * 30-second expiry on connect and deletes it when the user's last socket on
 * this server disconnects.
 */
io.on('connection', async (socket) => {
  const userId = socket.data.userId;

  // Register the listener before awaiting anything; a socket that drops while
  // the Redis call is in flight would otherwise never run this handler.
  socket.on('disconnect', async () => {
    // The same user may have other sockets open (several tabs or devices);
    // deleting the key then would wrongly show them as offline. Only sockets
    // connected to this server instance are visible here.
    for (const other of io.sockets.sockets.values()) {
      if (other.id !== socket.id && other.data.userId === userId) return;
    }
    try {
      await redis.del(onlineKey(userId));
    } catch (err) {
      console.error(`Failed to clear online marker for user ${userId}:`, err);
    }
  });

  try {
    await redis.set(onlineKey(userId), '1', { EX: 30 });
  } catch (err) {
    // A Redis failure must not become an unhandled rejection.
    console.error(`Failed to set online marker for user ${userId}:`, err);
  }
});
// Heartbeat that keeps online markers alive: every 15 seconds, rewrite the key
// (30-second expiry) for each user with a socket connected to this server.
setInterval(() => {
  // Several sockets may belong to one user; refresh each key once.
  const connectedUserIds = new Set<string>();
  for (const socket of io.sockets.sockets.values()) {
    connectedUserIds.add(socket.data.userId);
  }

  for (const userId of connectedUserIds) {
    // SET with EX instead of EXPIRE: EXPIRE is a no-op on a missing key, so a
    // marker that had expired or been deleted would never come back while the
    // user is still connected.
    redis
      .set(onlineKey(userId), '1', { EX: 30 })
      .catch((err: unknown) => {
        // Errors are handled per user; `forEach(async …)` would have left
        // each rejection unhandled.
        console.error(`Failed to refresh online marker for user ${userId}:`, err);
      });
  }
}, 15000);
/**
 * Checks whether a user is online.
 *
 * @param userId Id of the user to look up.
 * @returns `true` when the user's online key exists in Redis, `false` otherwise.
 */
async function isUserOnline(userId: string): Promise<boolean> {
  const existing = await redis.exists(onlineKey(userId));
  return Boolean(existing);
}
WebSockets 让应用能够提供丰富的实时体验,而 Redis 适配器则解决了多实例部署时的水平扩展问题。