Nodejs线程池的设计与实现 设计线程池需要考虑什么?

时间:2020-10-19 10:59:34 来源: 编程杂技


前言:之前的版本不方便开放,重新设计了一版nodejs的线程池库,本文介绍该库的一些设计和实现。

nodejs虽然提供了线程的能力,但是很多时候,往往不能直接使用线程或者无限制地创建线程,比如我们有一个功能是cpu密集型的,如果一个请求就开一个线程,这很明显不是最好的实践,这时候,我们需要使用池化的技术,本文介绍在nodejs线程模块的基础上,如何设计和实现一个线程池库(https://github.com/theanarkh/nodejs-threadpool或npm i nodejs-threadpool )。下面是线程池的总体架构。

设计一个线程池,在真正写代码之前,有很多设计需要考虑,大概如下:

1任务队列的设计,一个队列,多个线程互斥访问,或者每个线程一个队列,不需要互斥访问。

2 线程退出的设计,可以由主线程检测空闲线程,然后使子线程退出。或者子线程退出,通知主线程。空闲不一定是没有任务就退出,可以设计空闲时间达到阈值后退出,因为创建线程是有时间开销的。

3 任务数的设计,每个线程可以有个任务数,还可以增加一个总任务数,即全部线程任务数加起来

4 选择线程的设计,选择任务数最少的线程。

5 线程类型的设计,可以区分核心线程和预备线程,任务少的时候,核心线程处理就行。任务多也创建预备线程帮忙处理。

6 线程池类型的设计,cpu密集型的,线程数等于核数,否则自定义线程数就行。

7 支持任务的取消和超时机制,防止一个任务时间过长或者死循环。

本文介绍的线程池具体设计思想如下(参考java):

1 主线程维护一个队列,子线程的任务由子线程负责分发,不需要互斥访问,子线程也不需要维护自己的队列。

2 线程退出的设计,主线程负责检查子线程空闲时间是否达到阈值,是则使子线程退出。

3 任务数的设计,主线程负责管理任务个数并应有相应的策略。

4 选择线程的设计,选择任务数最少的线程。

5 线程类型的设计,区分核心线程和预备线程,任务少的时候,核心线程处理就行。任务多也创建预备线程帮忙处理。

6 线程池类型的设计,cpu密集型的,线程数等于核数,否则自定义线程数就行。

7 支持任务的取消和超时机制,超时或者取消的时候,主线程判断任务是待执行还是正在执行,如果是待执行则从任务队列中删除,如果是正在执行则杀死对应的子线程。下面我们看一下具体的设计。

1 主线程和子线程通信的数据结构

//任务类,一个任务对应一个id

classWork{

constructor({workId,filename,options}){

//任务id

this.workId=workId;

//任务逻辑,字符串或者js文件路径

this.filename=filename;

//任务返回的结果

this.data=null;

//任务返回的错误

this.error=null;

//执行任务时传入的参数,用户定义

this.options=options;

}

}

主线程给子线程分派一个任务的时候,就给子线程发送一个Work对象。在nodejs中线程间通信需要经过序列化和反序列化,所以通信的数据结构包括的信息不能过多。

2 子线程处理任务逻辑

const{parentPort}=require('worker_threads');

constvm=require('vm');

const{isFunction,isJSFile}=require('./utils');

//监听主线程提交过来的任务

parentPort.on('message',async(work)=>{

try{

const{filename,options}=work;

letaFunction;

if(isJSFile(filename)){

aFunction=require(filename);

}else{

aFunction=vm.runInThisContext(`(${filename})`);

}

if(!isFunction(aFunction)){

thrownewError('worktypeerror:jsfileorstring');

}

work.data=awaitaFunction(options);

parentPort.postMessage({event:'done',work});

}catch(error){

work.error=error.toString();

parentPort.postMessage({event:'error',work});

}

});

process.on('uncaughtException',(...rest)=>{

console.error(...rest);

});

process.on('unhandledRejection',(...rest)=>{

console.error(...rest);

});

子线程的逻辑比较简单,就是监听主线程分派过来的任务,然后执行任务,执行完之后通知主线程。任务支持js文件和字符串代码的形式。需要返回一个Promise或者async函数。用于用于通知主线程任务已经完成。

3 线程池和业务的通信

//提供给用户侧的接口

classUserWorkextendsEventEmitter{

constructor({workId}){

super();

//任务id

this.workId=workId;

//支持超时取消任务

this.timer=null;

//任务状态

this.state=WORK_STATE.PENDDING;

}

//超时后取消任务

setTimeout(timeout){

this.timer=setTimeout(()=>{

this.timer&&this.cancel()&&this.emit('timeout');

},~~timeout);

}

//取消之前设置的定时器

clearTimeout(){

clearTimeout(this.timer);

this.timer=null;

}

//直接取消任务,如果执行完了就不能取消了,this.terminate是动态设置的

cancel(){

if(this.state===WORK_STATE.END||this.state===WORK_STATE.CANCELED){

returnfalse;

}else{

this.terminate();

returntrue;

}

}

//修改任务状态

setState(state){

this.state=state;

}

}

业务提交一个任务给线程池的时候,线程池会返回一个UserWork类,业务侧通过UserWork类和线程池通信。

4 管理子线程的数据结构

//管理子线程的数据结构

classThread{

constructor({worker}){

//nodejs的Worker对象,nodejs的worker_threads模块的Worker

this.worker=worker;

//线程状态

this.state=THREAD_STATE.IDLE;

//上次工作的时间

this.lastWorkTime=Date.now();

}

//修改线程状态

setState(state){

this.state=state;

}

//修改线程最后工作时间

setLastWorkTime(time){

this.lastWorkTime=time;

}

}

线程池中维护了多个子线程,Thread类用于管理子线程的信息。

5 线程池 线程池的实现是核心,我们分为几个部分讲。

5.1 支持的配置

constructor(options={}){

this.options=options;

//子线程队列

this.workerQueue=[];

//核心线程数

this.coreThreads=~~options.coreThreads||config.CORE_THREADS;

//线程池最大线程数,如果不支持动态扩容则最大线程数等于核心线程数

this.maxThreads=options.expansion!==false?Math.max(this.coreThreads,config.MAX_THREADS):this.coreThreads;

//超过任务队列长度时的处理策略

this.discardPolicy=options.discardPolicy?options.discardPolicy:DISCARD_POLICY.NOT_DISCARD;

//是否预创建子线程

this.preCreate=options.preCreate===true;

//线程最大空闲时间,达到后自动退出

this.maxIdleTime=~~options.maxIdleTime||config.MAX_IDLE_TIME;

//是否预创建线程池

this.preCreate&&this.preCreateThreads();

//保存线程池中任务对应的UserWork

this.workPool={};

//线程池中当前可用的任务id,每次有新任务时自增1

this.workId=0;

//线程池中的任务队列

this.queue=[];

//线程池总任务数

this.totalWork=0;

//支持的最大任务数

this.maxWork=~~options.maxWork||config.MAX_WORK;

//处理任务的超时时间,全局配置

this.timeout=~~options.timeout;

this.pollIdle();

}

上面的代码列出了线程池所支持的能力。

5.2 创建线程

newThread(){

constworker=newWorker(workerPath);

constthread=newThread({worker});

this.workerQueue.push(thread);

constthreadId=worker.threadId;

worker.on('exit',()=>{

//找到该线程对应的数据结构,然后删除该线程的数据结构

constposition=this.workerQueue.findIndex(({worker})=>{

returnworker.threadId===threadId;

});

constexitedThread=this.workerQueue.splice(position,1);

//退出时状态是BUSY说明还在处理任务(非正常退出)

this.totalWork-=exitedThread.state===THREAD_STATE.BUSY?1:0;

});

//和子线程通信

worker.on('message',(result)=>{

const{

work,

event,

}=result;

const{data,error,workId}=work;

//通过workId拿到对应的userWork

constuserWork=this.workPool[workId];

//不存在说明任务被取消了

if(!userWork){

return;

}

//修改线程池数据结构

this.endWork(userWork);

//修改线程数据结构

thread.setLastWorkTime(Date.now());

//还有任务则通知子线程处理,否则修改子线程状态为空闲

if(this.queue.length){

//从任务队列拿到一个任务交给子线程

this.submitWorkToThread(thread,this.queue.shift());

}else{

thread.setState(THREAD_STATE.IDLE);

}

switch(event){

case'done':

//通知用户,任务完成

userWork.emit('done',data);

break;

case'error':

//通知用户,任务出错

if(EventEmitter.listenerCount(userWork,'error')){

userWork.emit('error',error);

}

break;

default:break;

}

});

worker.on('error',(...rest)=>{

console.error(...rest);

});

returnthread;

}

创建线程,并保持线程对应的数据结构、退出、通信管理、任务分派。子线程执行完任务后,会通知线程池,主线程通知用户。

5.3 选择线程

selectThead(){

//找出空闲的线程,把任务交给他

for(leti=0;i

if(this.workerQueue[i].state===THREAD_STATE.IDLE){

returnthis.workerQueue[i];

}

}

//没有空闲的则随机选择一个

returnthis.workerQueue[~~(Math.random()*this.workerQueue.length)];

}

当用户给线程池提交一个任务时,线程池会选择一个空闲的线程处理该任务。如果没有可用线程则任务插入待处理队列等待处理。

5.4 提交任务

//给线程池提交一个任务

submit(filename,options={}){

returnnewPromise(async(resolve,reject)=>{

letthread;

//没有线程则创建一个

if(this.workerQueue.length){

thread=this.selectThead();

//该线程还有任务需要处理

if(thread.state===THREAD_STATE.BUSY){

//子线程个数还没有达到核心线程数,则新建线程处理

if(this.workerQueue.length

thread=this.newThread();

}elseif(this.totalWork+1>this.maxWork){

//总任务数已达到阈值,还没有达到线程数阈值,则创建

if(this.workerQueue.length

thread=this.newThread();

}else{

//处理溢出的任务

switch(this.discardPolicy){

caseDISCARD_POLICY.ABORT:

returnreject(newError('queueoverflow'));

caseDISCARD_POLICY.CALLER_RUN:

constworkId=this.generateWorkId();

constuserWork=newUserWork({workId});

userWork.setState(WORK_STATE.RUNNING);

userWork.terminate=()=>{

userWork.setState(WORK_STATE.CANCELED);

};

this.timeout&&userWork.setTimeout(this.timeout);

resolve(userWork);

try{

letaFunction;

if(isJSFile(filename)){

aFunction=require(filename);

}else{

aFunction=vm.runInThisContext(`(${filename})`);

}

if(!isFunction(aFunction)){

thrownewError('worktypeerror:jsfileorstring');

}

constresult=awaitaFunction(options);

//延迟通知,让用户有机会取消或者注册事件

setImmediate(()=>{

if(userWork.state!==WORK_STATE.CANCELED){

userWork.setState(WORK_STATE.END);

userWork.emit('done',result);

}

});

}catch(error){

setImmediate(()=>{

if(userWork.state!==WORK_STATE.CANCELED){

userWork.setState(WORK_STATE.END);

userWork.emit('error',error.toString());

}

});

}

return;

caseDISCARD_POLICY.OLDEST_DISCARD:

constwork=this.queue.shift();

//maxWork为1时,work会为空

if(work&&this.workPool[work.workId]){

this.cancelWork(this.workPool[work.workId]);

}else{

returnreject(newError('noworkcanbediscarded'));

}

break;

caseDISCARD_POLICY.DISCARD:

returnreject(newError('discard'));

caseDISCARD_POLICY.NOT_DISCARD:

break;

default:

break;

}

}

}

}

}else{

thread=this.newThread();

}

//生成一个任务id

constworkId=this.generateWorkId();

//新建一个UserWork

constuserWork=newUserWork({workId});

this.timeout&&userWork.setTimeout(this.timeout);

//新建一个work

constwork=newWork({workId,filename,options});

//修改线程池数据结构,把UserWork和Work关联起来

this.addWork(userWork);

//选中的线程正在处理任务,则先缓存到任务队列

if(thread.state===THREAD_STATE.BUSY){

this.queue.push(work);

userWork.terminate=()=>{

this.cancelWork(userWork);

this.queue=this.queue.filter((node)=>{

returnnode.workId!==work.workId;

});

}

}else{

this.submitWorkToThread(thread,work);

}

resolve(userWork);

})

}

submitWorkToThread(thread,work){

constuserWork=this.workPool[work.workId];

userWork.setState(WORK_STATE.RUNNING);

//否则交给线程处理,并修改状态和记录该线程当前处理的任务id

thread.setState(THREAD_STATE.BUSY);

thread.worker.postMessage(work);

userWork.terminate=()=>{

this.cancelWork(userWork);

thread.setState(THREAD_STATE.DEAD);

thread.worker.terminate();

}

}

addWork(userWork){

userWork.setState(WORK_STATE.PENDDING);

this.workPool[userWork.workId]=userWork;

this.totalWork++;

}

endWork(userWork){

deletethis.workPool[userWork.workId];

this.totalWork--;

userWork.setState(WORK_STATE.END);

userWork.clearTimeout();

}

cancelWork(userWork){

deletethis.workPool[userWork.workId];

this.totalWork--;

userWork.setState(WORK_STATE.CANCELED);

userWork.emit('cancel');

}

提交任务是线程池暴露给用户侧的接口,主要处理的逻辑包括,根据当前的策略判断是否需要新建线程、选择线程处理任务、排队任务等,如果任务数达到阈值,则根据丢弃策略处理该任务。

5.5 空闲处理

pollIdle(){

setTimeout(()=>{

for(leti=0;i

constnode=this.workerQueue[i];

if(node.state===THREAD_STATE.IDLE&&Date.now()-node.lastWorkTime>this.maxIdleTime){

node.worker.terminate();

}

}

this.pollIdle();

},1000);

}

当子线程空闲时间达到阈值后,主线程会杀死子线程,避免浪费系统资源。总结,这就是线程池具体的设计和实现,另外创建线程失败会导致主线程挂掉,所以使用线程的时候,最后新开一个子进程来管理该线程池。

关键词:Nodejs 线程池

关于我们 加入我们 广告服务 网站地图

All Rights Reserved, Copyright 2004-2020 www.ctocio.com.cn

如有意见请与我们联系 邮箱:5 53 13 8 [email protected]

豫ICP备20005723号    IT专家网 版权所有