• 售前

  • 售后

热门帖子
入门百科

nodejs中使用worker_threads来创建新的线程的方法

[复制链接]
击管虽您 显示全部楼层 发表于 2021-10-25 19:20:12 |阅读模式 打印 上一主题 下一主题
简介

之前的文章中提到了,nodejs中有两种线程,一种是event loop用来相应用户的请求和处置惩罚各种callback。另一种就是worker pool用来处置惩罚各种耗时利用。
nodejs的官网提到了一个能够使用nodejs本地woker pool的lib叫做webworker-threads。
痛惜的是webworker-threads的末了一次更新还是在2年前,而在最新的nodejs 12中,根本无法使用。
而webworker-threads的作者则保举了一个新的lib叫做web-worker。
web-worker是构建于nodejs的worker_threads之上的,本文将会详细讲解worker_threads和web-worker的使用。
worker_threads

worker_threads模块的源代码源自lib/worker_threads.js,它指的是工作线程,可以开启一个新的线程来并行实行javascript步调。
worker_threads重要用来处置惩罚CPU麋集型利用,而不是IO利用,因为nodejs本身的异步IO已经非常强大了。
worker_threads中重要有5个属性,3个class和3个重要的方法。接下来我们将会一一讲解。
isMainThread

isMainThread用来判断代码是否在主线程中运行,我们看一个使用的例子:
  1. const { Worker, isMainThread } = require('worker_threads');
  2. if (isMainThread) {
  3. console.log('在主线程中');
  4. new Worker(__filename);
  5. } else {
  6. console.log('在工作线程中');
  7. console.log(isMainThread); // 打印 'false'。
  8. }
复制代码
上面的例子中,我们从worker_threads模块中引入了Worker和isMainThread,Worker就是工作线程的主类,我们将会在后面详细讲解,这里我们使用Worker创建了一个工作线程。
MessageChannel

MessageChannel代表的是一个异步双向通讯channel。MessageChannel中没有方法,重要通过MessageChannel来毗连两头的MessagePort。
  1. class MessageChannel {
  2.   readonly port1: MessagePort;
  3.   readonly port2: MessagePort;
  4. }
复制代码
当我们使用new MessageChannel()的时间,会主动创建两个MessagePort。
  1. const { MessageChannel } = require('worker_threads');
  2. const { port1, port2 } = new MessageChannel();
  3. port1.on('message', (message) => console.log('received', message));
  4. port2.postMessage({ foo: 'bar' });
  5. // Prints: received { foo: 'bar' } from the `port1.on('message')` listener
复制代码
通过MessageChannel,我们可以举行MessagePort间的通讯。
parentPort和MessagePort

parentPort是一个MessagePort类型,parentPort重要用于worker线程和主线程举行消息交互。
通过parentPort.postMessage()发送的消息在主线程中将可以通过worker.on(‘message')接收。
主线程中通过worker.postMessage()发送的消息将可以在工作线程中通过parentPort.on(‘message')接收。
我们看一下MessagePort的界说:
  1. class MessagePort extends EventEmitter {
  2.   close(): void;
  3.   postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
  4.   ref(): void;
  5.   unref(): void;
  6.   start(): void;
  7.   addListener(event: "close", listener: () => void): this;
  8.   addListener(event: "message", listener: (value: any) => void): this;
  9.   addListener(event: string | symbol, listener: (...args: any[]) => void): this;
  10.   emit(event: "close"): boolean;
  11.   emit(event: "message", value: any): boolean;
  12.   emit(event: string | symbol, ...args: any[]): boolean;
  13.   on(event: "close", listener: () => void): this;
  14.   on(event: "message", listener: (value: any) => void): this;
  15.   on(event: string | symbol, listener: (...args: any[]) => void): this;
  16.   once(event: "close", listener: () => void): this;
  17.   once(event: "message", listener: (value: any) => void): this;
  18.   once(event: string | symbol, listener: (...args: any[]) => void): this;
  19.   prependListener(event: "close", listener: () => void): this;
  20.   prependListener(event: "message", listener: (value: any) => void): this;
  21.   prependListener(event: string | symbol, listener: (...args: any[]) => void): this;
  22.   prependOnceListener(event: "close", listener: () => void): this;
  23.   prependOnceListener(event: "message", listener: (value: any) => void): this;
  24.   prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;
  25.   removeListener(event: "close", listener: () => void): this;
  26.   removeListener(event: "message", listener: (value: any) => void): this;
  27.   removeListener(event: string | symbol, listener: (...args: any[]) => void): this;
  28.   off(event: "close", listener: () => void): this;
  29.   off(event: "message", listener: (value: any) => void): this;
  30.   off(event: string | symbol, listener: (...args: any[]) => void): this;
  31. }
复制代码
MessagePort继续自EventEmitter,它表现的是异步双向通讯channel的一端。这个channel就叫做MessageChannel,MessagePort通过MessageChannel来举行通讯。
我们可以通过MessagePort来传输结构体数据,内存地区大概其他的MessagePorts。
从源代码中,我们可以看到MessagePort中有两个变乱,close和message。
close变乱将会在channel的中任何一端断开毗连的时间触发,而message变乱将会在port.postMessage时间触发,下面我们看一个例子:
  1. const { MessageChannel } = require('worker_threads');
  2. const { port1, port2 } = new MessageChannel();
  3. // Prints:
  4. // foobar
  5. // closed!
  6. port2.on('message', (message) => console.log(message));
  7. port2.on('close', () => console.log('closed!'));
  8. port1.postMessage('foobar');
  9. port1.close();
复制代码
port.on(‘message')实际上为message变乱添加了一个listener,port还提供了addListener方法来手动添加listener。
port.on(‘message')会主动触发port.start()方法,表现启动一个port。
当port有listener存在的时间,这表现port存在一个ref,当存在ref的时间,步调是不会竣事的。我们可以通过调用port.unref方法来取消这个ref。
接下来我们看一下怎么通过port来传输消息:
  1. port.postMessage(value[, transferList])
复制代码
postMessage可以担当两个参数,第一个参数是value,这是一个JavaScript对象。第二个参数是transferList。
先看一个传递一个参数的情况:
  1. const { MessageChannel } = require('worker_threads');
  2. const { port1, port2 } = new MessageChannel();
  3. port1.on('message', (message) => console.log(message));
  4. const circularData = {};
  5. circularData.foo = circularData;
  6. // Prints: { foo: [Circular] }
  7. port2.postMessage(circularData);
复制代码
通常来说postMessage发送的对象都是value的拷贝,但是假如你指定了transferList,那么在transferList中的对象将会被transfer到channel的担当端,而且不再存在于发送端,就好像把对象传送出去一样。
transferList是一个list,list中的对象可以是ArrayBuffer, MessagePort 和 FileHandle。
假如value中包罗SharedArrayBuffer对象,那么该对象不能被包罗在transferList中。
看一个包罗两个参数的例子:
  1. const { MessageChannel } = require('worker_threads');
  2. const { port1, port2 } = new MessageChannel();
  3. port1.on('message', (message) => console.log(message));
  4. const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
  5. // post uint8Array的拷贝:
  6. port2.postMessage(uint8Array);
  7. port2.postMessage(uint8Array, [ uint8Array.buffer ]);
  8. //port2.postMessage(uint8Array);
复制代码
上面的例子将输出:
  1. Uint8Array(4) [ 1, 2, 3, 4 ]
  2. Uint8Array(4) [ 1, 2, 3, 4 ]
复制代码
第一个postMessage是拷贝,第二个postMessage是transfer Uint8Array底层的buffer。
假如我们再次调用port2.postMessage(uint8Array),我们会得到下面的错误:
  1. DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.
复制代码
buffer是TypedArray的底层存储结构,假如buffer被transfer,那么之前的TypedArray将会变得不可用。
markAsUntransferable

要想制止这个题目,我们可以调用markAsUntransferable将buffer标记为不可transferable. 我们看一个markAsUntransferable的例子:
  1. const { MessageChannel, markAsUntransferable } = require('worker_threads');
  2. const pooledBuffer = new ArrayBuffer(8);
  3. const typedArray1 = new Uint8Array(pooledBuffer);
  4. const typedArray2 = new Float64Array(pooledBuffer);
  5. markAsUntransferable(pooledBuffer);
  6. const { port1 } = new MessageChannel();
  7. port1.postMessage(typedArray1, [ typedArray1.buffer ]);
  8. console.log(typedArray1);
  9. console.log(typedArray2);
复制代码
SHARE_ENV

SHARE_ENV是传递给worker构造函数的一个env变量,通过设置这个变量,我们可以在主线程与工作线程举行共享情况变量的读写。
  1. const { Worker, SHARE_ENV } = require('worker_threads');
  2. new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
  3. .on('exit', () => {
  4. console.log(process.env.SET_IN_WORKER); // Prints 'foo'.
  5. });
复制代码
workerData

除了postMessage(),还可以通过在主线程中传递workerData给worker的构造函数,从而将主线程中的数据传递给worker:
  1. const { Worker, isMainThread, workerData } = require('worker_threads');
  2. if (isMainThread) {
  3. const worker = new Worker(__filename, { workerData: 'Hello, world!' });
  4. } else {
  5. console.log(workerData); // Prints 'Hello, world!'.
  6. }
复制代码
worker类

先看一下worker的界说:
  1. class Worker extends EventEmitter {
  2.   readonly stdin: Writable | null;
  3.   readonly stdout: Readable;
  4.   readonly stderr: Readable;
  5.   readonly threadId: number;
  6.   readonly resourceLimits?: ResourceLimits;
  7.   constructor(filename: string | URL, options?: WorkerOptions);
  8.   postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
  9.   ref(): void;
  10.   unref(): void;
  11.   terminate(): Promise<number>;
  12.   getHeapSnapshot(): Promise<Readable>;
  13.   addListener(event: "error", listener: (err: Error) => void): this;
  14.   addListener(event: "exit", listener: (exitCode: number) => void): this;
  15.   addListener(event: "message", listener: (value: any) => void): this;
  16.   addListener(event: "online", listener: () => void): this;
  17.   addListener(event: string | symbol, listener: (...args: any[]) => void): this;
  18.   ...
  19. }
复制代码
worker继续自EventEmitter,而且包罗了4个重要的变乱:error,exit,message和online。
worker表现的是一个独立的 JavaScript 实行线程,我们可以通过传递filename大概URL来构造worker。
每一个worker都有一对内置的MessagePort,在worker创建的时间就会相互关联。worker使用这对内置的MessagePort来和父线程举行通讯。
通过parentPort.postMessage()发送的消息在主线程中将可以通过worker.on(‘message')接收。
主线程中通过worker.postMessage()发送的消息将可以在工作线程中通过parentPort.on(‘message')接收。
当然,你也可以显式的创建MessageChannel 对象,然后将MessagePort作为消息传递给其他线程,我们看一个例子:
  1. const assert = require('assert');
  2. const {
  3. Worker, MessageChannel, MessagePort, isMainThread, parentPort
  4. } = require('worker_threads');
  5. if (isMainThread) {
  6. const worker = new Worker(__filename);
  7. const subChannel = new MessageChannel();
  8. worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
  9. subChannel.port2.on('message', (value) => {
  10. console.log('接收到:', value);
  11. });
  12. } else {
  13. parentPort.once('message', (value) => {
  14. assert(value.hereIsYourPort instanceof MessagePort);
  15. value.hereIsYourPort.postMessage('工作线程正在发送此消息');
  16. value.hereIsYourPort.close();
  17. });
  18. }
复制代码
上面的例子中,我们借助了worker和parentPort本身的消息传递功能,传递了一个显式的MessageChannel中的MessagePort。
然后又通过该MessagePort来举行消息的分发。
receiveMessageOnPort
除了port的on(‘message')方法之外,我们还可以使用receiveMessageOnPort来手动接收消息:
  1. const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
  2. const { port1, port2 } = new MessageChannel();
  3. port1.postMessage({ hello: 'world' });
  4. console.log(receiveMessageOnPort(port2));
  5. // Prints: { message: { hello: 'world' } }
  6. console.log(receiveMessageOnPort(port2));
  7. // Prints: undefined
复制代码
moveMessagePortToContext

先了解一下nodejs中的Context的概念,我们可以从vm中创建context,它是一个隔离的上下文情况,从而包管不同运行情况的安全性,我们看一个context的例子:
  1. const vm = require('vm');
  2. const x = 1;
  3. const context = { x: 2 };
  4. vm.createContext(context); // 上下文隔离化对象。
  5. const code = 'x += 40; var y = 17;';
  6. // `x` and `y` 是上下文中的全局变量。
  7. // 最初,x 的值为 2,因为这是 context.x 的值。
  8. vm.runInContext(code, context);
  9. console.log(context.x); // 42
  10. console.log(context.y); // 17
  11. console.log(x); // 1; y 没有定义。
复制代码
在worker中,我们可以将一个MessagePort move到其他的context中。
  1. worker.moveMessagePortToContext(port, contextifiedSandbox)
复制代码
这个方法接收两个参数,第一个参数就是要move的MessagePort,第二个参数就是vm.createContext()创建的context对象。
worker_threads的线程池

上面我们提到了使用单个的worker thread,但是现在步调中一个线程往往是不够的,我们必要创建一个线程池来维护worker thread对象。
nodejs提供了AsyncResource类,来作为对异步资源的扩展。
AsyncResource类是async_hooks模块中的。
下面我们看下怎么使用AsyncResource类来创建worker的线程池。
假设我们有一个task,使用来实行两个数相加,脚本名字叫做task_processor.js:
  1. const { parentPort } = require('worker_threads');
  2. parentPort.on('message', (task) => {
  3. parentPort.postMessage(task.a + task.b);
  4. });
复制代码
下面是worker pool的实现:
  1. const { AsyncResource } = require('async_hooks');
  2. const { EventEmitter } = require('events');
  3. const path = require('path');
  4. const { Worker } = require('worker_threads');
  5. const kTaskInfo = Symbol('kTaskInfo');
  6. const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
  7. class WorkerPoolTaskInfo extends AsyncResource {
  8. constructor(callback) {
  9. super('WorkerPoolTaskInfo');
  10. this.callback = callback;
  11. }
  12. done(err, result) {
  13. this.runInAsyncScope(this.callback, null, err, result);
  14. this.emitDestroy(); // `TaskInfo`s are used only once.
  15. }
  16. }
  17. class WorkerPool extends EventEmitter {
  18. constructor(numThreads) {
  19. super();
  20. this.numThreads = numThreads;
  21. this.workers = [];
  22. this.freeWorkers = [];
  23. for (let i = 0; i < numThreads; i++)
  24.   this.addNewWorker();
  25. }
  26. addNewWorker() {
  27. const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
  28. worker.on('message', (result) => {
  29.   // In case of success: Call the callback that was passed to `runTask`,
  30.   // remove the `TaskInfo` associated with the Worker, and mark it as free
  31.   // again.
  32.   worker[kTaskInfo].done(null, result);
  33.   worker[kTaskInfo] = null;
  34.   this.freeWorkers.push(worker);
  35.   this.emit(kWorkerFreedEvent);
  36. });
  37. worker.on('error', (err) => {
  38.   // In case of an uncaught exception: Call the callback that was passed to
  39.   // `runTask` with the error.
  40.   if (worker[kTaskInfo])
  41.   worker[kTaskInfo].done(err, null);
  42.   else
  43.   this.emit('error', err);
  44.   // Remove the worker from the list and start a new Worker to replace the
  45.   // current one.
  46.   this.workers.splice(this.workers.indexOf(worker), 1);
  47.   this.addNewWorker();
  48. });
  49. this.workers.push(worker);
  50. this.freeWorkers.push(worker);
  51. this.emit(kWorkerFreedEvent);
  52. }
  53. runTask(task, callback) {
  54. if (this.freeWorkers.length === 0) {
  55.   // No free threads, wait until a worker thread becomes free.
  56.   this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
  57.   return;
  58. }
  59. const worker = this.freeWorkers.pop();
  60. worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
  61. worker.postMessage(task);
  62. }
  63. close() {
  64. for (const worker of this.workers) worker.terminate();
  65. }
  66. }
  67. module.exports = WorkerPool;
复制代码
我们给worker创建了一个新的kTaskInfo属性,而且将异步的callback封装到WorkerPoolTaskInfo中,赋值给worker.kTaskInfo.
接下来我们就可以使用workerPool了:
  1. const WorkerPool = require('./worker_pool.js');
  2. const os = require('os');
  3. const pool = new WorkerPool(os.cpus().length);
  4. let finished = 0;
  5. for (let i = 0; i < 10; i++) {
  6. pool.runTask({ a: 42, b: 100 }, (err, result) => {
  7. console.log(i, err, result);
  8. if (++finished === 10)
  9.   pool.close();
  10. });
  11. }
复制代码
到此这篇关于nodejs中使用worker_threads来创建新的线程的方法的文章就先容到这了,更多相干nodejs使用worker_threads创建线程内容请搜索草根技能分享从前的文章或继续欣赏下面的相干文章盼望各人以后多多支持草根技能分享!

帖子地址: 

回复

使用道具 举报

分享
推广
火星云矿 | 预约S19Pro,享500抵1000!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

草根技术分享(草根吧)是全球知名中文IT技术交流平台,创建于2021年,包含原创博客、精品问答、职业培训、技术社区、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区。
  • 官方手机版

  • 微信公众号

  • 商务合作