Real-Time Features with WebSockets, Node.js, and Socket.io

Build real-time features with WebSockets and Socket.io. Learn room management, authentication, scaling with the Redis adapter, and handling reconnection.

Socket.io Server Setup

import express from 'express';
import { createServer } from 'http';
import { Server, Socket } from 'socket.io';

const app = express();
const httpServer = createServer(app);

const io = new Server(httpServer, {
  cors: {
    origin: process.env.FRONTEND_URL || 'http://localhost:3000',
    credentials: true,
  },
  transports: ['websocket', 'polling'],
});

httpServer.listen(3001);

Authentication

import { verifyToken } from './auth';

io.use(async (socket, next) => {
  const token = socket.handshake.auth.token;
  if (!token) return next(new Error('Authentication required'));

  try {
    const payload = await verifyToken(token);
    socket.data.userId = payload.userId;
    socket.data.user = payload;
    next();
  } catch (err) {
    next(new Error('Invalid token'));
  }
});
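
The middleware above imports `verifyToken` from `./auth` without showing it. As a rough illustration of what such a helper might do, the sketch below verifies an HMAC-signed token using only Node's built-in `crypto` module. The token format, the `TokenPayload` shape, `issueToken`, `parseToken`, and the `TOKEN_SECRET` environment variable are all assumptions made for this example, not part of the original code; a real project would more likely rely on a vetted JWT library.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical payload shape; the article only relies on `userId` and `name`.
interface TokenPayload {
  userId: string;
  name: string;
  exp: number; // expiry as a Unix timestamp in seconds
}

const sign = (body: string, secret: string): Buffer =>
  createHmac('sha256', secret).update(body).digest();

// Builds a token of the form "<base64url(JSON payload)>.<base64url(HMAC)>".
function issueToken(payload: TokenPayload, secret: string): string {
  const body = Buffer.from(JSON.stringify(payload)).toString('base64url');
  return `${body}.${sign(body, secret).toString('base64url')}`;
}

// Throws if the token is malformed, tampered with, or expired.
function parseToken(
  token: string,
  secret: string,
  nowSec: number = Math.floor(Date.now() / 1000),
): TokenPayload {
  const [body, signature, ...rest] = token.split('.');
  if (!body || !signature || rest.length > 0) throw new Error('Malformed token');

  const expected = sign(body, secret);
  const actual = Buffer.from(signature, 'base64url');
  // timingSafeEqual requires equal lengths, so check that first.
  if (actual.length !== expected.length || !timingSafeEqual(actual, expected)) {
    throw new Error('Bad signature');
  }

  const payload = JSON.parse(Buffer.from(body, 'base64url').toString('utf8')) as TokenPayload;
  if (typeof payload.userId !== 'string' || typeof payload.exp !== 'number' || payload.exp <= nowSec) {
    throw new Error('Expired or invalid payload');
  }
  return payload;
}

// Async wrapper so it can be awaited like the `verifyToken` call in the middleware.
// The fallback secret is for local experiments only.
async function verifyToken(token: string): Promise<TokenPayload> {
  return parseToken(token, process.env.TOKEN_SECRET ?? 'dev-only-secret');
}
```

Because `parseToken` throws on any failure, the `try`/`catch` in the middleware turns every rejected token into the same `Invalid token` error, which avoids telling a client why its token was refused.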

WebSockets 与 Node.js 和 Socket.io 的实时功能插图

Room Management (Chat Example)

interface ServerToClientEvents {
  'message:received': (message: ChatMessage) => void;
  'user:joined': (user: { id: string; name: string }) => void;
  'user:left': (userId: string) => void;
  'room:users': (users: User[]) => void;
}

interface ClientToServerEvents {
  'message:send': (data: { roomId: string; content: string }, callback: (ack: MessageAck) => void) => void;
  'room:join': (roomId: string) => void;
  'room:leave': (roomId: string) => void;
}

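// Typed variant of the server from the setup section: declare `io` only once
// and pass the same options object (CORS, transports) here.
const io = new Server<ClientToServerEvents, ServerToClientEvents>(httpServer);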

io.on('connection', (socket) => {
  console.log(`User ${socket.data.userId} connected`);

  socket.on('room:join', async (roomId) => {
    await socket.join(roomId);

    // Notify the other users in the room
    socket.to(roomId).emit('user:joined', {
      id: socket.data.userId,
      name: socket.data.user.name,
    });

    // Send the current room member list to the user who just joined
    const socketsInRoom = await io.in(roomId).fetchSockets();
    const users = socketsInRoom.map(s => ({ id: s.data.userId, name: s.data.user.name }));
    socket.emit('room:users', users);
  });

  socket.on('message:send', async ({ roomId, content }, callback) => {
    const message = await saveMessage({ userId: socket.data.userId, roomId, content });

    // Broadcast to everyone in the room (including the sender)
    io.to(roomId).emit('message:received', message);

    callback({ ok: true, messageId: message.id });
  });

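  // Use 'disconnecting': by the time 'disconnect' fires, socket.rooms is already empty
  socket.on('disconnecting', () => {
    // Notify every room the user is still in
    socket.rooms.forEach(roomId => {
      if (roomId !== socket.id) {
        socket.to(roomId).emit('user:left', socket.data.userId);
      }
    });
  });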
});
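
The chat example refers to `ChatMessage`, `MessageAck`, and `saveMessage` without defining them, and it trusts whatever payload the client sends. The sketch below fills those gaps with hypothetical shapes and a synchronous in-memory store. `validateMessage`, `storeMessage`, `handleSend`, the field names, and the 2000-character limit are all assumptions for illustration; a real `saveMessage` would presumably write to a database asynchronously.

```typescript
import { randomUUID } from 'node:crypto';

// Hypothetical shapes for the types the event interfaces refer to.
interface ChatMessage {
  id: string;
  roomId: string;
  userId: string;
  content: string;
  sentAt: number; // milliseconds since the Unix epoch
}
type MessageAck = { ok: true; messageId: string } | { ok: false; error: string };

const MAX_CONTENT_LENGTH = 2000; // arbitrary limit chosen for this sketch

// Returns an error string for a bad payload, or null when it is acceptable.
function validateMessage(data: { roomId?: unknown; content?: unknown }): string | null {
  if (typeof data.roomId !== 'string' || data.roomId.length === 0) return 'roomId is required';
  if (typeof data.content !== 'string') return 'content must be a string';
  const trimmed = data.content.trim();
  if (trimmed.length === 0) return 'content is empty';
  if (trimmed.length > MAX_CONTENT_LENGTH) return 'content is too long';
  return null;
}

// In-memory stand-in for persistence, keyed by room id.
const messagesByRoom = new Map<string, ChatMessage[]>();

function storeMessage(input: { userId: string; roomId: string; content: string }): ChatMessage {
  const message: ChatMessage = {
    id: randomUUID(),
    roomId: input.roomId,
    userId: input.userId,
    content: input.content.trim(),
    sentAt: Date.now(),
  };
  const history = messagesByRoom.get(input.roomId) ?? [];
  history.push(message);
  messagesByRoom.set(input.roomId, history);
  return message;
}

// Validates, stores, and produces the acknowledgement passed to the client callback.
function handleSend(userId: string, data: { roomId?: unknown; content?: unknown }): MessageAck {
  const error = validateMessage(data);
  if (error) return { ok: false, error };
  const message = storeMessage({
    userId,
    roomId: data.roomId as string,
    content: data.content as string,
  });
  return { ok: true, messageId: message.id };
}
```

Inside the `message:send` handler, a function like `handleSend` could run before the broadcast so that rejected payloads reach the sender as `{ ok: false, error }` instead of being emitted to the room.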

Scaling with the Redis Adapter

import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();

await Promise.all([pubClient.connect(), subClient.connect()]);

io.adapter(createAdapter(pubClient, subClient));

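// Socket.io events now work across multiple Node.js instances
// io.to('room:123').emit('message', data)  <- reaches clients connected to any server
// Note: with the HTTP long-polling transport enabled, the load balancer also needs sticky sessions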
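
To build intuition for why a room broadcast reaches clients on other servers, here is a toy model of pub/sub fan-out. It is not the adapter's actual implementation: a Node `EventEmitter` stands in for Redis, and `ToyInstance`, `join`, and `emitToRoom` are hypothetical names invented for this sketch. Each instance delivers to its own local members directly and publishes the broadcast so that other instances can do the same.

```typescript
import { EventEmitter } from 'node:events';

type Delivery = { room: string; event: string; payload: unknown };
type Listener = (event: string, payload: unknown) => void;

// One simulated Node.js process with its own locally connected sockets.
class ToyInstance {
  private readonly id: string;
  private readonly bus: EventEmitter;
  private readonly rooms = new Map<string, Set<Listener>>();

  constructor(id: string, bus: EventEmitter) {
    this.id = id;
    this.bus = bus;
    // Subscribe: deliver broadcasts that were published by *other* instances.
    bus.on('broadcast', (origin: string, delivery: Delivery) => {
      if (origin !== this.id) this.deliverLocally(delivery);
    });
  }

  // Registers a local "socket" (here just a callback) as a member of a room.
  join(room: string, listener: Listener): void {
    const members = this.rooms.get(room) ?? new Set<Listener>();
    members.add(listener);
    this.rooms.set(room, members);
  }

  // Rough analogue of io.to(room).emit(event, payload).
  emitToRoom(room: string, event: string, payload: unknown): void {
    const delivery: Delivery = { room, event, payload };
    this.bus.emit('broadcast', this.id, delivery); // publish for the other instances
    this.deliverLocally(delivery); // serve local sockets directly
  }

  private deliverLocally(delivery: Delivery): void {
    this.rooms.get(delivery.room)?.forEach((listener) => {
      listener(delivery.event, delivery.payload);
    });
  }
}
```

In this model, two instances sharing one bus behave like two servers sharing one Redis: a call to `emitToRoom` on either instance reaches room members registered on both, while members of other rooms receive nothing.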

Client (React)

import { io, Socket } from 'socket.io-client';
import { useEffect, useRef, useState } from 'react';

function useSocket(token: string) {
  const socketRef = useRef<Socket | null>(null);
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    const socket = io(process.env.NEXT_PUBLIC_WS_URL!, {
      auth: { token },
      reconnection: true,
      reconnectionAttempts: 5,
      reconnectionDelay: 1000,
    });

    socketRef.current = socket;

    socket.on('connect', () => setConnected(true));
    socket.on('disconnect', () => setConnected(false));
    socket.on('connect_error', (err) => console.error('Connection error:', err));

    return () => { socket.disconnect(); };
  }, [token]);

  return { socket: socketRef.current, connected };
}
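
The hook above caps reconnection at five attempts with a one-second base delay. The Socket.io client also accepts `reconnectionDelayMax` and `randomizationFactor` options that shape how the wait grows between attempts. The sketch below shows the general idea of exponential backoff with jitter; `backoffDelay` and `BackoffOptions` are hypothetical names, and the formula is a generic illustration rather than the library's exact calculation.

```typescript
interface BackoffOptions {
  baseDelayMs: number; // plays the role of reconnectionDelay
  maxDelayMs: number; // upper bound on any single wait
  jitter: number; // 0 = no randomness, 0.5 = up to plus or minus 50%
}

// `attempt` is zero-based: 0 for the first retry.
// `random` is injectable so the function can be tested deterministically.
function backoffDelay(
  attempt: number,
  opts: BackoffOptions,
  random: () => number = Math.random,
): number {
  const exponential = opts.baseDelayMs * Math.pow(2, attempt);
  const spread = exponential * opts.jitter;
  // Map random() from [0, 1) onto [-spread, +spread).
  const jittered = exponential + (random() * 2 - 1) * spread;
  return Math.round(Math.min(opts.maxDelayMs, Math.max(0, jittered)));
}

// With jitter disabled the delays double until they hit the cap.
const delays = [0, 1, 2, 3, 4].map((attempt) =>
  backoffDelay(attempt, { baseDelayMs: 1000, maxDelayMs: 5000, jitter: 0 }),
);
console.log(delays); // [ 1000, 2000, 4000, 5000, 5000 ]
```

Jitter matters most after a server restart: without it, every disconnected client retries at the same instants, which can overload the server just as it comes back.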

Presence System

// Track online users in Redis (`redis` is assumed to be a connected node-redis client)
const onlineKey = (userId: string) => `online:${userId}`;

io.on('connection', async (socket) => {
  await redis.set(onlineKey(socket.data.userId), '1', { EX: 30 });

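  socket.on('disconnect', async () => {
    // Caveat: this clears the key even if the same user still has another
    // connection open (e.g. a second tab); the heartbeat will not recreate it.
    await redis.del(onlineKey(socket.data.userId));
  });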
});

// Heartbeat: refresh the TTL so connected users stay marked online
setInterval(() => {
  io.sockets.sockets.forEach(async (socket) => {
    await redis.expire(onlineKey(socket.data.userId), 30);
  });
}, 15000);

// Check whether a user is online
async function isUserOnline(userId: string): Promise<boolean> {
  return !!(await redis.exists(onlineKey(userId)));
}
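
One gap in the Redis snippet is the user with several open connections: deleting the key on any single `disconnect` marks them offline while another tab is still connected. A possible way to handle this is to count connections per user and only report "offline" when the last one closes. The sketch below does that in memory for a single process; `PresenceTracker` and its methods are hypothetical names, and a multi-instance deployment would need the same bookkeeping in a shared store.

```typescript
// Tracks which socket ids are open for each user within one process.
class PresenceTracker {
  private readonly socketsByUser = new Map<string, Set<string>>();

  // Returns true when this is the user's first open connection (they just came online).
  connect(userId: string, socketId: string): boolean {
    const sockets = this.socketsByUser.get(userId) ?? new Set<string>();
    const wasOffline = sockets.size === 0;
    sockets.add(socketId);
    this.socketsByUser.set(userId, sockets);
    return wasOffline;
  }

  // Returns true when the user's last connection closed (they just went offline).
  disconnect(userId: string, socketId: string): boolean {
    const sockets = this.socketsByUser.get(userId);
    if (!sockets || !sockets.delete(socketId)) return false; // unknown user or socket
    if (sockets.size > 0) return false; // other connections are still open
    this.socketsByUser.delete(userId);
    return true;
  }

  isOnline(userId: string): boolean {
    return this.socketsByUser.has(userId);
  }
}
```

With a tracker like this, the `disconnect` handler could delete the Redis key only when `disconnect(userId, socket.id)` returns `true`, leaving the key and its heartbeat untouched while other connections remain.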

WebSockets enable rich real-time experiences, and the Redis adapter takes care of horizontal scaling.