正在加载,请稍候…

Web Workers 实现并行 JavaScript:优化 CPU 密集型任务

掌握 Web Workers 和 SharedArrayBuffer 实现并行 JavaScript 执行:工作池、可转移对象、Atomics 同步、Comli

Web Workers 实现并行 JavaScript:优化 CPU 密集型任务

Web Workers:浏览器中的真正并行

JavaScript 是单线程的——每次计算都会阻塞 UI 线程。Web Workers 在后台线程中运行脚本,为 CPU 密集型工作实现真正的并行,而不会冻结浏览器。

问题:主线程饱和

// 这会冻结 UI 几秒钟
function processLargeDataset(data) {
    return data.map(item => heavyComputation(item)); // 阻塞 UI!
}

// 用户会看到:
// - 滚动卡顿
// - 按钮无响应
// - '页面无响应' 对话框
// Chrome DevTools:长任务 > 50ms = 卡顿

Web Workers 实现并行 JavaScript:优化 CPU 密集型任务 插图

基本 Worker 模式

// worker.js - 在独立线程中运行
self.onmessage = function(event) {
    const { type, data } = event.data;
    if (type === 'PROCESS') {
        const result = heavyComputation(data);
        self.postMessage({ type: 'RESULT', result });
    }
};

function heavyComputation(data) {
    // 在此进行 CPU 密集型工作 - 不会阻塞 UI
    return data.map(x => x * x).filter(x => x % 2 === 0).reduce((a, b) => a + b, 0);
}

// main.js
const worker = new Worker('/worker.js');

worker.onmessage = ({ data }) => {
    if (data.type === 'RESULT') {
        displayResult(data.result);
    }
};

worker.onerror = (error) => {
    console.error('Worker 错误:', error.message);
    worker.terminate();
};

worker.postMessage({ type: 'PROCESS', data: largeArray });

可转移对象:零拷贝数据传输

默认情况下,postMessage 会拷贝数据(对于大缓冲区开销很大)。改为转移所有权:

// 不好:拷贝 100MB 缓冲区(耗时约 100ms)
worker.postMessage({ buffer: largeArrayBuffer });

// 好:转移所有权(耗时约 0.1ms,但发送方无法再使用该缓冲区)
worker.postMessage({ buffer: largeArrayBuffer }, [largeArrayBuffer]);
// largeArrayBuffer.byteLength === 0 转移后!

// 模式:完成后转移回来
// worker.js
self.onmessage = ({ data }) => {
    const { buffer } = data;
    const view = new Float32Array(buffer);
    for (let i = 0; i < view.length; i++) view[i] *= 2.0;
    self.postMessage({ buffer }, [buffer]); // 转移回来
};

Web Workers 实现并行 JavaScript:优化 CPU 密集型任务 插图

工作池实现并行

class WorkerPool {
    constructor(workerScript, poolSize = navigator.hardwareConcurrency) {
        this.workers = Array.from({ length: poolSize }, () =>
            new Worker(workerScript)
        );
        this.queue = [];
        this.idle = [...this.workers];
        this.workers.forEach(w => w.onmessage = this._onMessage.bind(this));
    }

    execute(data, transferables = []) {
        return new Promise((resolve, reject) => {
            const task = { data, transferables, resolve, reject };
            if (this.idle.length > 0) {
                this._dispatch(task);
            } else {
                this.queue.push(task);
            }
        });
    }

    _dispatch(task) {
        const worker = this.idle.pop();
        worker._currentTask = task;
        worker.postMessage(task.data, task.transferables);
    }

    _onMessage({ target: worker, data }) {
        worker._currentTask.resolve(data);
        if (this.queue.length > 0) {
            this._dispatch(this.queue.shift());
        } else {
            this.idle.push(worker);
        }
    }

    async processAll(items) {
        return Promise.all(items.map(item => this.execute(item)));
    }

    terminate() { this.workers.forEach(w => w.terminate()); }
}

const pool = new WorkerPool('/image-processor.js', 4);
const results = await pool.processAll(imageChunks);

SharedArrayBuffer 和 Atomics

用于线程间的真正共享内存(需要 COOP/COEP 头):

// 共享内存 - 主线程和 worker 都可以读写
const sharedBuffer = new SharedArrayBuffer(4 * 1024 * 1024); // 4MB
const sharedArray = new Float32Array(sharedBuffer);

// 用于工作分配的原子计数器
const counterBuffer = new SharedArrayBuffer(4);
const counter = new Int32Array(counterBuffer);

// Worker:原子地获取一个块
self.onmessage = ({ data }) => {
    const { sharedBuffer, counterBuffer, total } = data;
    const arr = new Float32Array(sharedBuffer);
    const cnt = new Int32Array(counterBuffer);
    const CHUNK_SIZE = 1000;

    while (true) {
        const start = Atomics.add(cnt, 0, CHUNK_SIZE); // 原子获取并增加
        if (start >= total) break;
        const end = Math.min(start + CHUNK_SIZE, total);
        for (let i = start; i < end; i++) {
            arr[i] = Math.sqrt(arr[i]); // 处理块
        }
    }
    self.postMessage('done');
};

// Atomics.wait/notify 用于同步
const lockBuffer = new SharedArrayBuffer(4);
const lock = new Int32Array(lockBuffer);

// 等待值改变(worker 在此阻塞)
Atomics.wait(lock, 0, 0); // 等待直到 lock[0] !== 0

// 从主线程发送信号
Atomics.store(lock, 0, 1);
Atomics.notify(lock, 0, Infinity); // 唤醒所有等待的 worker

Web Workers 实现并行 JavaScript:优化 CPU 密集型任务 插图

Comlink:人性化的 Worker API

// processor.worker.js
import { expose } from 'comlink';

const api = {
    async processImage(imageData) {
        // 在 worker 线程中运行
        return applyGrayscaleFilter(imageData);
    },

    async computeHash(data) {
        const buffer = await crypto.subtle.digest('SHA-256', data);
        return Array.from(new Uint8Array(buffer))
            .map(b => b.toString(16).padStart(2, '0')).join('');
    }
};
expose(api);

// main.js - 看起来像常规的异步函数调用!
import { wrap } from 'comlink';
const worker = new Worker('/processor.worker.js', { type: 'module' });
const processor = wrap(worker);

// 像常规异步函数一样调用 worker 方法
const hash = await processor.computeHash(fileBuffer);
const processed = await processor.processImage(imageData);

实际应用场景

// 使用 Papa Parse 在 worker 中解析 CSV
// worker.js
importScripts('https://unpkg.com/papaparse/papaparse.min.js');
self.onmessage = ({ data }) => {
    const result = Papa.parse(data.csv, { header: true, dynamicTyping: true });
    self.postMessage(result.data);
};

// Markdown 编译
// worker.js
import { marked } from 'marked';
self.onmessage = ({ data }) => {
    self.postMessage(marked(data.markdown));
};

// 打包:vite/webpack worker 语法
// main.js
const csvWorker = new Worker(new URL('./csv.worker.js', import.meta.url));

性能指南

  • 使用 worker 的场景:耗时 >16ms 且阻塞 UI 的任务、图像/视频处理、CSV/JSON 解析、加密、模拟
  • 避免使用 worker 的场景:DOM 操作(不允许)、快速操作(<1ms)、以网络 I/O 为主的任务
  • 池大小:最多 navigator.hardwareConcurrency 个 worker(通常 4-16)
  • 消息大小:大于 10KB 的缓冲区使用转移;小对象使用拷贝

Web Workers 是处理 CPU 密集型 JavaScript 的正确工具。借助 Comlink,易用性障碍很小。先进行性能分析,然后卸载特定的慢路径。