libtask是什么?介绍libtask的基础原理及架构

时间:2021-02-23 08:22:22 来源: 编程杂技公众号


前言:假设读者已经了解了协程的概念,实现协程的底层技术支持。本文会介绍基于底层基础,如何实现协程以及协程的应用。

libtask是google大佬Russ Cox(Go的核心开发者)所写,本文介绍libtask的基础原理。我们从libtask的main函数开始,这个main函数就是我们在c语言中使用的c函数,libtask本身实现了main这个函数,用户使用libtask时,要实现的是taskmain函数。taskmain和main的函数声明是一样的。下面我们看一下main函数。

intmain(intargc,char**argv)

{

structsigactionsa,osa;

//注册SIGQUIT信号处理函数

memset(&sa,0,sizeofsa);

sa.sa_handler=taskinfo;

sa.sa_flags=SA_RESTART;

sigaction(SIGQUIT,&sa,&osa);

//保存命令行参数

argv0=argv[0];

taskargc=argc;

taskargv=argv;

if(mainstacksize==0)

mainstacksize=256*1024;

//创建第一个协程

taskcreate(taskmainstart,nil,mainstacksize);

//开始调度

taskscheduler();

fprint(2,"taskschedulerreturnedinmain!\n");

abort();

return0;

}

main函数主要的两个逻辑是taskcreate和taskscheduler函数。我们先来看taskcreate。

inttaskcreate(void(*fn)(void*),void*arg,uintstack)

{

intid;

Task*t;

t=taskalloc(fn,arg,stack);

taskcount++;

id=t->id;

//记录位置

t->alltaskslot=nalltask;

//保存到alltask中

alltask[nalltask++]=t;

//修改状态为就绪,可以被调度,并且加入到就绪队列

taskready(t);

returnid;

}

taskcreate首先调用taskalloc分配一个表示协程的结构体Task。我们看看这个结构体的定义。

structTask

{

charname[256];//offsetknowntoacid

charstate[256];

//前后指针

Task*next;

Task*prev;

Task*allnext;

Task*allprev;

//执行上下文

Contextcontext;

//睡眠时间

uvlongalarmtime;

uintid;

//栈信息

uchar*stk;

uintstksize;

//是否退出了

intexiting;

//在alltask的索引

intalltaskslot;

//是否是系统协程

intsystem;

//是否就绪状态

intready;

//入口函数

void(*startfn)(void*);

//入口参数

void*startarg;

//自定义数据

void*udata;

};

接着看看taskalloc的实现。

//分配一个协程所需要的内存和初始化某些字段

staticTask*

taskalloc(void(*fn)(void*),void*arg,uintstack)

{

Task*t;

sigset_tzero;

uintx,y;

ulongz;

/*allocatethetaskandstacktogether*/

//结构体本身的大小+栈大小

t=malloc(sizeof*t+stack);

memset(t,0,sizeof*t);

//栈的内存位置

t->stk=(uchar*)(t+1);

//栈大小

t->stksize=stack;

//协程id

t->id=++taskidgen;

//协程工作函数和参数

t->startfn=fn;

t->startarg=arg;

/*doareasonableinitialization*/

memset(&t->context.uc,0,sizeoft->context.uc);

sigemptyset(&zero);

//初始化uc_sigmask字段为空,即不阻塞信号

sigprocmask(SIG_BLOCK,&zero,&t->context.uc.uc_sigmask);

/*mustinitializewithcurrentcontext*/

//初始化uc字段

getcontext(&t->context.uc)

//设置协程执行时的栈位置和大小

t->context.uc.uc_stack.ss_sp=t->stk+8;

t->context.uc.uc_stack.ss_size=t->stksize-64;

z=(ulong)t;

y=z;

z>>=16;/*hideundefined32-bitshiftfrom32-bitcompilers*/

x=z>>16;

//保存信息到uc字段

makecontext(&t->context.uc,(void(*)())taskstart,2,y,x);

returnt;

}

taskalloc函数代码看起来很多,但是逻辑不算复杂,就是申请Task结构体所需的内存和执行时栈的内存,然后初始化各个字段。这样,一个协程就诞生了。接着执行taskready把协程加入就绪队列。

//修改协程的状态为就绪并加入就绪队列

voidtaskready(Task*t)

{

t->ready=1;

addtask(&taskrunqueue,t);

}

//把协程插入队列中,如果之前在其他队列,则会被移除

voidaddtask(Tasklist*l,Task*t)

{

if(l->tail){

l->tail->next=t;

t->prev=l->tail;

}else{

l->head=t;

t->prev=nil;

}

l->tail=t;

t->next=nil;

}

taskrunqueue记录了所有就绪的协程。创建了协程并加入队列后,协程还没有开始执行,就像操作系统的进程和线程一样,需要有一个调度器来调度执行。下面我们看看调度器的实现。

//协程调度中心

staticvoidtaskscheduler(void)

{

inti;

Task*t;

for(;;){

//没有用户协程了,则退出

if(taskcount==0)

exit(taskexitval);

//从就绪队列拿出一个协程

t=taskrunqueue.head;

if(t==nil){

fprint(2,"norunnabletasks!%dtasksstalled\n",taskcount);

exit(1);

}

//从就绪队列删除该协程

deltask(&taskrunqueue,t);

t->ready=0;

//保存正在执行的协程

taskrunning=t;

//切换次数加一

tasknswitch++;

//切换到t执行,并且保存当前上下文到taskschedcontext(即下面要执行的代码)

contextswitch(&taskschedcontext,&t->context);

//执行到这说明没有协程在执行(t切换回来的),置空

taskrunning=nil;

//刚才执行的协程t退出了

if(t->exiting){

//不是系统协程,则个数减一

if(!t->system)

taskcount--;

//当前协程在alltask的索引

i=t->alltaskslot;

//把最后一个协程换到当前协程的位置,因为他要退出了

alltask[i]=alltask[--nalltask];

//更新被置换协程的索引

alltask[i]->alltaskslot=i;

//释放堆内存

free(t);

}

}

}

调度器的代码看起来很多,但是核心逻辑就三个 1 从就绪队列中拿出一个协程t,并把t移出就绪队列 2 通过contextswitch切换到协程t中执行 3 协程t切换回调度中心,如果t已经退出,则修改数据结构,然后回收他占据的内存。如果t没退出,则继续调度其他协程执行。至此,协程就开始跑起来了。并且也有了调度系统。这里的调度机制是比较简单的,就是按着先进先出的方式就绪调度,并且是非抢占的。即没有按时间片调度的概念,一个协程的执行时间由自己决定,放弃执行的权力也是自己控制的,当协程不想执行了可以调用taskyield让出cpu。

//协程主动让出cpu

inttaskyield(void)

{

intn;

//当前切换协程的次数

n=tasknswitch;

//插入就绪队列,等待后续的调度

taskready(taskrunning);

taskstate("yield");

//切换协程

taskswitch();

//等于0说明当前只有自己一个协程,调度的时候tasknswitch加一,所以这里减一

returntasknswitch-n-1;

}

/*

切换协程,taskrunning是正在执行的协程,taskschedcontext是调度协程(主线程)的上下文,

切换到调度中心,并保持当前上下文到taskrunning->context

*/

voidtaskswitch(void)

{

needstack(0);

contextswitch(&taskrunning->context,&taskschedcontext);

}

//真正切换协程的逻辑

staticvoidcontextswitch(Context*from,Context*to)

{

if(swapcontext(&from->uc,&to->uc)<0){

fprint(2,"swapcontextfailed:%r\n");

assert(0);

}

}

yield的逻辑也很简单,因为协程在执行的时候,是不处于就绪队列的,当协程准备让出cpu时,协程首先把自己重新加入到就绪队列,等待下次被调度执行。当然我们也可以直接调度contextswitch切换到其他协程。重点在于什么时候应该让出cpu,又什么时候应该被调度执行。接下来会详细讲解。至此,我们已经有了支持协程所需要的底层基础。我们看到这个实现的思路也不是很复杂,首先有一个队列表示待执行的的协程,每一个协程对应一个Task结构体。然后调度中心不断地按照先进先出的方式去调度协程的执行就可以。因为没有抢占机制,所以调度中心是依赖协程本身去驱动的,协程需要主动让出cpu,把上下文切换回调度中心,调度中心才能进行下一轮的调度。接下来我们看看,基于这些底层基础,如果实现一个基于协程的服务器。下面我们通过一个例子进行讲解。

void

taskmain(intargc,char**argv)

{

//启动一个tcp服务器

if((fd=netannounce(TCP,0,atoi(argv[1])))<0){

//...

}

//改为非阻塞模式

fdnoblock(fd);

//accept成功后创建一个客户端协程

while((cfd=netaccept(fd,remote,&rport))>=0){

taskcreate(proxytask,(void*)cfd,STACK);

}

}

我们刚才讲过taskmain是我们需要实现的函数,首先通过netannounce建立一个tcp服务器。接着把fd改成非阻塞的,这个非常重要,因为在后面调用accept的时候,如果是阻塞的文件描述符,那么就会引起进程挂起,而非阻塞模式下,操作系统会返回EAGAIN的错误码,通过这个错误码我们可以决定下一步做什么。我们看看netaccept的实现。

//处理(摘下)连接

int

netaccept(intfd,char*server,int*port)

{

intcfd,one;

structsockaddr_insa;

uchar*ip;

socklen_tlen;

//注册事件到epoll,等待事件触发

fdwait(fd,'r');

len=sizeofsa;

//触发后说明有连接了,则执行accept

if((cfd=accept(fd,(void*)&sa,&len))<0){

return-1;

}

//和客户端通信的fd也改成非阻塞模式

fdnoblock(cfd);

one=1;

setsockopt(cfd,IPPROTO_TCP,TCP_NODELAY,(char*)&one,sizeofone);

returncfd;

}

netaccept就是通过调用accept逐个处理tcp连接,但是在accept之前,有一个非常重要的操作fdwait。

//协程因为等待io需要切换

voidfdwait(intfd,intrw)

{

//是否已经初始化epoll

if(!startedfdtask){

startedfdtask=1;

epfd=epoll_create(1);

//没有初始化则创建一个协程,做io管理

taskcreate(fdtask,0,32768);

}

structepoll_eventev={0};

//记录事件对应的协程和感兴趣的事件

ev.data.ptr=taskrunning;

switch(rw){

case'r':

ev.events|=EPOLLIN|EPOLLPRI;

break;

case'w':

ev.events|=EPOLLOUT;

break;

}

intr=epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);

//切换到其他协程,等待被唤醒

taskswitch();

//唤醒后函数刚才注册的事件

epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&ev);

}

fdwait首先把fd注册到epoll中,然后把协程切换到下一个待执行的协程。这里有个细节,当协程X被调度执行的时候,他是脱离了就绪队列的,而taskswitch函数只是实现了切换上下文到调度中心,调度中心会从就绪队列从选择下一个协程执行,那么这时候,脱离就绪队列的协程X就处于孤岛状态,看起来再也无法给调度中心选中执行,这个问题的处理方式是,把协程、fd和感兴趣的事件信息一起注册到epoll中,当epoll监听到某个fd的事件发生时,就会把对应的协程加入就绪队列,这样协程就可以被调度执行了。在fdwait函数一开始那里处理了epoll相关的逻辑。epoll的逻辑也是在一个协程中执行的,但是epoll所在协程和一般协程不一样,类似于操作系统的内核线程一样,epoll所在的协程成为系统协程,即不是用户定义的,而是系统定义的。我们看一下实现

voidfdtask(void*v)

{

inti,ms;

Task*t;

uvlongnow;

//变成系统协程

tasksystem();

structepoll_eventevents[1000];

for(;;){

/*leteveryoneelserun*/

//大于0说明还有其他就绪协程可执行,则先让给他们执行,否则往下执行

while(taskyield()>0)

;

/*we'retheonlyonerunnable-pollfori/o*/

errno=0;

//没有定时事件则一直阻塞

if((t=sleeping.head)==nil)

ms=-1;

else{

/*sleepatmost5s*/

now=nsec();

if(now>=t->alarmtime)

ms=0;

elseif(now+5*1000*1000*1000LL>=t->alarmtime)

ms=(t->alarmtime-now)/1000000;

else

ms=5000;

}

intnevents;

//等待事件发生,ms是等待的超时时间

if((nevents=epoll_wait(epfd,events,1000,ms))<0){

if(errno==EINTR)

continue;

fprint(2,"epoll:%s\n",strerror(errno));

taskexitall(0);

}

/*wakeuptheguyswhodeserveit*/

//事件触发,把对应协程插入就绪队列

for(i=0;i

taskready((Task*)events[i].data.ptr);

}

now=nsec();

//处理超时事件

while((t=sleeping.head)&&now>=t->alarmtime){

deltask(&sleeping,t);

if(!t->system&&--sleepingcounted==0)

taskcount--;

taskready(t);

}

}

}

我们看到epoll的处理逻辑和一般服务器的类似,通过epoll_wait阻塞,然后epoll_wait返回时,处理每一个发生的事件,而且libtask还支持超时事件。另外libtask中当还有其他就绪协程的时候,是不会进入epoll_wait的,它会把cpu让给就绪的协程(通过taskyield函数),当就绪队列只有epoll所在的协程时才会进入epoll的逻辑。至此,我们看到了libtask中如何把异步变成同步的。当用户要调用一个可能会引起进程挂起的接口时,就可以调用libtask提供的一个相应的API,比如我们想读一个文件,我们可以调用libtask的fdread。

int

fdread(intfd,void*buf,intn)

{

intm;

//非阻塞读,如果不满足则再注册到epoll,参考fdread1

while((m=read(fd,buf,n))<0&&errno==EAGAIN)

fdwait(fd,'r');

returnm;

}

这样就不需要担心进程被挂起,同时也不需要处理epoll相关的逻辑(注册事件,事件触发时的处理等等)。异步转同步,libtask的方式就是通过提供对应的API,先把用户的fd注册到epoll中,然后切换到其他协程,等epoll监听到事件触发时,就会把对应的协程插入就绪队列,当该协程被调度中心选中执行时,就会继续执行剩下的逻辑而不会引起进程挂起,因为这时候所等待的条件已经满足。

总结:libtask的设计思想就是把业务逻辑封装到一个个协程中,由libtask实现协程的调度,在各个业务逻辑中进行切换,从而驱动着系统的运行。另外libtask也提供了一个网络和文件io异步变同步的解决方案。使得我们使用起来更加方便,高效。今天先讲到这里。


关于我们 加入我们 广告服务 网站地图

All Rights Reserved, Copyright 2004-2020 www.ctocio.com.cn

如有意见请与我们联系 邮箱:5 53 13 8 [email protected]

豫ICP备20005723号    IT专家网 版权所有 

营业执照