用C++实现简单的协作线程

2020-05-25 03:21:17

斯蒂芬·布伦南·2020年5月24日,就像操作系统提供的许多服务一样,多任务处理是一件理所当然的事情,以至于感觉很平凡。有了我们功能强大的智能手机和计算机,一台计算机不能同时处理数百个进程的想法让人感觉很陌生。我认为正是这样的功能让计算机变得非常有用,但也让它们感觉如此复杂和神奇。

很难玩弄实现多任务的代码,如果不构建一个完整的操作系统,如何自己实现也不是很明显。我坚信,只有自己实现了一些东西,才能真正理解它,所以我想写一篇文章,让人们可以尝试简单的线程实现。在这篇文章中,我们将用普通的C程序(不是操作系统)实现简单的线程。

此调度程序将严重依赖函数setjmp()和long jmp()。他们感觉有点神奇,所以我想先描述一下他们是做什么的,然后花一点时间来揭开他们是如何做到这一点的。

函数setjmp()是一种记录有关程序执行位置的信息的方式,以便您稍后可以跳回该点。您为Ita提供了JMP_buf类型的变量,它将在其中存储该信息。setjmp()第一次返回时返回0。

稍后,您可以使用函数long jmp(jmp_buf,value)从您调用setjmp()的位置立即开始执行。对于您的程序来说,它看起来像是第二次返回setjmp()。这次将返回您传递给long jmp()的值参数,以帮助区分第二轮。这里有一个示例来帮助说明:

#include<;stdio.h>;#include<;setjmp.h>;jmp_buf saved_location;int main(int argc,char**argv){if(setjmp(Save_Location)==0){printf(";我们已成功设置跳转缓冲区!\n";);}否则{printf(";我们跳过!\n";);返回0;正在准备跳转!\n";);long jmp(save_location,1);printf(";这永远不会执行.。\n";);返回0;}。

狂野!它类似于GOTO语句,但它甚至可以用来跳出函数。它也比goto更难读,因为它看起来像一个常规的函数调用。如果您的代码随意使用setjmp()和long jmp(),那么任何人(包括您自己)阅读起来都会非常困惑。

与goto一样,常见的建议是避免setjmp()和long jmp(),但是,就像goto一样,有时单独使用并以一致的方式使用它也很有用。调度器需要能够切换上下文,因此我们必须负责任地使用这些功能。最重要的是,我们将对API隐藏这些函数的使用,这样我们的调度器的用户就不必处理这种复杂性。

然而,setjmp()和long jmp()函数并不是为支持任何类型的跳转而设计的。它们是为一种非常特殊的用例而设计的。假设您正在做一些复杂的事情,比如创建一个HttpRequest。这将涉及一组复杂的函数调用,如果其中任何一个调用失败,您将需要从每个函数返回一个特殊的错误代码。这将导致在您调用函数的任何地方(可能数十次)都会出现如下代码:

setjmp()和long jmp()的思想是,您可以在开始一些复杂的操作之前使用setjmp()来保存位置。然后,您可以将所有错误处理集中到一个位置:

int rv;jmp_buf buf;if((rv=setjmp(Buf))!=0){/*在此处理错误*/return;}do_Complex_task(buf,args.);

如果do_Complex_task()中涉及的任何函数失败,它只会是long jmp(buf,error_code)。这意味着do_Complex_task()中的每个函数都可以假定每个函数调用都是成功的,这意味着您可以为每个函数调用去掉错误处理代码。(在实践中,这几乎从来没有做过,但那是一篇单独的博客文章。)。

这里的重要思想是,long jmp()只允许您跳出深度嵌套函数。您不能跳回以前跳出的深度嵌套函数。下面是跳出函数时堆栈的图示。星号(*)标记setjmp()存储的堆栈指针。

long_mp前栈|long jmp后栈|+-+-stack()(*)|Main()增长|do_http_request()|down|send_a_Header()|Write_Bytes()|v|Write()-失败!

您可以看到,我们只在堆栈中向上移动,因此没有数据损坏的风险。另一方面,想象一下如果您想要在任务之间跳转。如果您调用setjmp()然后返回,执行一些其他操作,然后尝试恢复之前所做的操作,您将遇到一个问题:

|setjmp()处堆栈|稍后堆栈|long jmp()后堆栈|+-stack()|Main()|Main()增长|do_task_one()|do_task_Two()|do_stack_Two()向下|subtask()。|subtask()|subtask()||foo()|??V|bar()(*)|(*)|?(*)

setjmp()保存的堆栈指针将指向不再存在的堆栈帧,并且可能已在某个点被其他数据覆盖。当您尝试将long jmp()返回到已经返回的函数时,您将开始体验到一些非常奇怪的行为,这可能会使您的程序崩溃。

这个故事的寓意是,如果您想要使用setjmp()和long jmp()在这样的复杂任务之间跳转,您需要确保每个任务都有自己的单独堆栈。这完全消除了问题,因为当long jmp()重置堆栈指针时,它将为您交换堆栈,并且不会发生堆栈覆盖。

这有点费时费力,但是有了这些知识,我们应该能够实现用户空间线程。开始时,我发现设计用于初始化、创建和运行线程的API非常有帮助。提前做这件事真的有助于理解我们正在努力构建的是什么!

这些函数将用于初始化调度器、添加任务,然后最终开始在调度器中运行任务。一旦我们启动Scheduler_Run(),它将一直运行到所有任务完成。对于正在运行的任务,它们将有以下接口:

第一个函数将退出任务。任务也可以通过返回FROMITS函数退出,所以这只是为了方便。第二个函数是我们的线程将如何告诉调度器让另一个任务运行一段时间。当任务调用Scheduler_RELINISH()时,当其他任务运行时,它可能会暂停一段时间,但最终函数会返回,任务可以继续运行。

为了给出API的具体示例,下面是此API的假设用法,我们将使用它来测试我们的调度器:

#include<;stdlib.h>;#include<;stdio.h>;#include";Scheduler.h";struct tester_args{char*name;int iters;};void tester(void*arg){int i;struct tester_args*ta=(struct tester_args*)arg;for(i=0;i<;ta->;iters;任务%s:%d\n";,ta-&>;name,i);Scheduler_relquiish();}free(Ta);}void create_test_task(char*name,int iters){struct tester_args*ta=malloc(sizeof(*ta));ta->;name=name;ta->;iters=iters;Scheduler_create_task(tester,ta);}int main(int argc,char**argv){Scheduler_init();create_test_task(";first";,5);create_test_task(";Second";,2);Scheduler_Run();printf(";已运行完所有任务!\n";);return exit_SUCCESS;}。

在本例中,我们创建了两个运行同一函数的任务,但它们将使用不同的参数,以便我们可以分别跟踪它们的执行情况。每个任务都迭代设定的次数。每次迭代,它都会打印出一条消息,然后让另一个任务运行。我们期望看到类似以下内容的内容作为此程序的输出:

任务第一:0任务第二:0任务第一:1任务第二:1任务第一:2任务第一:3任务第一:4所有任务运行完毕!

要实现此API,我们需要某种ATASK的内部表示形式,因此让我们继续将我们需要的字段放在一起:

struct task{enum{ST_CREATED,ST_RUNNING,ST_WAITING,}status;int id;jmp_buf buf;void(*func)(void*);void*arg;struct sc_list_head task_list;void*STACK_Bottom;void*STACK_TOP;INT STACK_SIZE;};

让我们把田地一个接一个地看一遍。所有任务在创建后应立即处于“已创建”状态。一旦任务开始执行,它将处于“运行”状态,如果任务需要等待某些异步操作,则可以将其置于“等待”状态。id字段只是任务的唯一标识符。buf包含我们在long jmp()恢复任务时的数据。func和arg被传递给Scheduler_create_task(),它们是启动任务所必需的。TASK_LIST字段是实现所有任务的双向链表所必需的。STACK_BOTTH、STACK_TOP和STACK_SIZE字段都与为此任务分配的单独堆栈相关。“底部”是malloc()返回的地址,但“顶部”指向内存区域正上方的地址。由于x86堆栈向下增长,我们需要将堆栈指针设置为STACK_TOP而不是STACK_BOTLOW。

void Scheduler_CREATE_TASK(void(*func)(void*),void*arg){static int id=1;struct task*task=malloc(sizeof(*task));task->;status=ST_Created;task->;func=func;task->;arg=arg;task->;id=id++;task->;stack_size=16*1024;task->;stack。STACK_SIZE);TASK->;STACK_TOP=TASK->;STACK_Bottom+TASK->;STACK_SIZE;sc_list_insert_end(&;pri.。TASK_LIST,&;TASK->;TASK_LIST);}。

使用静态int可以确保每次调用函数时,idfield都会递增到一个新数字。其他一切都应该不言而喻,除了sc_list_insert_end(),它只是将struct任务添加到全局列表中。全局列表存储在包含所有私有调度器数据的第二结构中。该结构及其初始化功能如下所示:

struct Scheduler_Private{jmp_buf buf;struct task*current;struct sc_list_head task_list;}priv;void Scheduler_init(Void){priv.。CURRENT=NULL;sc_list_init(&;pri.。TASK_LIST);}。

TASK_LIST字段用于引用任务列表(毫不奇怪),CURRENT字段用于存储当前正在执行的任务(如果当前没有任何任务正在运行,则为NULL)。最重要的是,buf字段将用于跳转到Scheduler_Run()的代码:

enum{INIT=0,Schedule,exit_task,};void Scheduler_Run(Void){/*这是调度程序的退出路径!*/switch(setjmp(priv.。buf){case exit_task:SCHEDLER_FREE_CURRENT_TASK();CASE INIT:CASE Schedule:Schedule();/*如果返回,则没有其他事情可做,我们退出*/return;默认值:fprintf(stderr,";Uh oh,Scheduler Error\n";);return;}}。

一旦调用Scheduler_run()函数,我们就设置setjmp()缓冲区,这样我们就可以始终返回到此函数。第一次返回0(INIT),我们立即调用Schedule()。随后,我们可以将调度或EXIT_TASK常量传递给long jmp(),这将触发不同的行为。让我们暂时忽略exit_task案例,直接进入Schedule()的实现:

静态作废调度(Void){struct task*next=SCHEDLER_CHOOSE_TASK();如果(!下一步){return;}prv.。CURRENT=NEXT;IF(NEXT-&>;STATUS==ST_CREATED){/**此任务尚未开始。分配一个新的STACK*指针,运行任务,并在结束时退出。*/register void*top=Next->;STACK_TOP;ASM Volatil(";mov%[rs],%%rsp\n";:[RS]";+r";(Top):);/**运行任务函数*/Next->;status=ST_Running;Next->;func(Next-gt;arg);/**堆栈指针应回到我们设置它的位置。返回将是一个非常、非常糟糕的主意。让';s改为退出*/Scheduler_Exit_Current_Task();}否则{long jmp(Next-gt;buf,1);}/*不返回*/}。

首先,我们调用一个内部函数来选择应该运行的下一个任务,这将是一个简单的循环调度程序,因此它只选择任务列表中最接近的任务。如果此函数返回NULL,则我们有更多任务要运行,然后返回。否则,我们需要启动任务运行(如果它处于ST_CREATED状态)或继续运行它。

要启动创建的任务,我们使用x86_64汇编指令将STACK_TOP字段分配给RSP寄存器(堆栈指针)。然后,我们更改任务状态,运行函数,如果函数返回,则退出。请注意,setjmp()和long jmp()存储和交换堆栈指针,因此这是我们唯一需要使用汇编修改堆栈指针的时候。

如果任务已经启动,那么buf字段应该包含我们需要long jmp()进入以恢复任务的上下文,所以我们就这样做了。接下来,让我们看一下选择下一个要运行的任务的helper函数。这是调度器的核心,就像我前面说的,这是一个循环调度器:

静态结构TASK*SCHEDLER_CHOOSE_TASK(Void){结构TASK*TASK;sc_list_for_each_entry(TASK,&;pri.。TASK_LIST,TASK_LIST,STRUCT TASK){IF(TASK-&>;STATUS==ST_RUNNING||TASK-&>;STATUS==ST_CREATED){sc_LIST_REMOVE(&;TASK-&gT;TASK_LIST);sc_LIST_INSERT_END(&;PRIV.。TASK_LIST,&;TASK->;TASK_LIST);return task;}}return null;}。

如果您不熟悉我的链表实现(取自Linux内核),也没问题。sc_list_for_each_entry()函数是一个宏,允许我们迭代任务列表中的每个任务。我们找到的第一个符合条件的(Notwaiting)任务将从其当前位置移除,并插入到任务列表的末尾。这确保了下次我们运行调度程序时,我们将得到一个不同的任务(如果有另一个任务)。我们返回第一个符合条件的任务,如果根本没有任务,则返回NULL。

最后,让我们来看一下Scheduler_RELINISH()的实现,看看任务是如何切换自身的:

这是我们的调度程序中setjmp()函数的另一个用法。因此,它可能会有点令人困惑。当任务调用此函数时,我们使用setjmp()保存当前上下文(包括当前堆栈指针)。然后,我们使用long jmp()进入调度器(回到Scheduler_Run()),并传递请求调度新任务的调度函数。

当任务恢复时,setjmp()函数将返回非零值,我们将返回到任务之前正在执行的操作!

最后,下面是任务退出时发生的情况(通过显式调用exit函数,或者通过从其任务函数返回):

void Scheduler_Exit_Current_TASK(Void){struct task*task=priv.。当前;sc_list_remove(&;task->;task_list);long jmp(初始。buf,exit_task);/*不返回*/}static void Scheduler_free_current_task(Void){struct task*task=priv.。当前的;当前的。CURRENT=NULL;FREE(任务-&>;STACK_BOOT);FREE(任务);}。

这个过程分为两个部分:第一个函数由任务直接调用。我们从任务列表中删除该任务的条目,因为它不应该再被调度。然后,我们将long jmp()返回到Scheduler_run()函数。这一次,我们使用exit_task。这向调度器表明,在调度新任务之前,它应该首先调用Scheduler_free_current_task()。如果您向上滚动到Scheduler_run(),您将看到这正是Scheduler_Run()所做的事情。

我们必须分两部分完成这项工作,因为当调用SCHEDLER_EXIT_CURRENT_TASK()时,它会主动使用任务结构中包含的堆栈。如果您在释放堆栈的同时仍在使用它,则函数仍有可能访问我们刚刚释放的堆栈内存!为了确保这种情况不会发生,我们必须将long jmp()返回到调度器,该调度器使用单独的堆栈。这样我们就可以安全地释放任务的数据。

到此为止,我们已经介绍了该调度器的整个实现。如果您继续编译这段代码,以及我的链表实现和上面的主程序,您就会有一个工作的调度器!我建议您查看包含所有这些代码的githubrepository,而不是所有那些复制和粘贴操作。

如果你已经走到这一步了,我想我不需要说服你这很有趣。然而,它可能看起来没有那么有用。毕竟,您可以在C中使用“真正的”线程,它可以并行运行,不需要等待彼此调用Scheduler_RELINISH()。

然而,我认为这是一系列令人兴奋的有用功能实现的起点。对于I/O繁重的任务,这也可以用来简单地实现单线程异步应用程序,这是Python新的异步实用程序的工作方式。该系统还可以实现生成器和协程。最后,只要付出足够的努力,这个系统甚至可以与“真正的”操作系统线程结合起来,在需要的地方提供更多的并行性。这些想法中的每一个都是一个有趣的项目,我鼓励读者在我开始写一篇关于它们的新文章之前尝试一下!

我是说,可能不会!使用内联程序集修改堆栈指针可能不安全。不要在您的生产代码中使用它,但是一定要使用它来捣乱和探索!

这个系统的一个更安全的实现可以构建在“ucontext”API上(参见man getcontext),它提供了一种在这些类型的用户空间“线程”之间进行交换的方法,而不需要干预内联汇编。不幸的是,API是非标准的(它已从最新的POSIX规范中删除)。但是,您仍然可以使用此API,因为它是glibc的一部分。

正如当前编写的那样,此调度程序仅在线程显式地将控制权交还给调度程序时才起作用。这对于像操作系统这样的通用实现是不好的,因为行为不佳的线程可能会阻止所有其他线程运行。(当然,这并没有阻止MS-DOS使用协作多任务!)。我不认为这会使协作多任务变得不好,这只是取决于应用程序。

如果使用非标准的“ucontext”API,那么POSIX信号将实际存储先前执行的代码的上下文。通过设置周期性定时器信号,用户空间调度器实际上可以获得抢占式多任务工作!这是另一个非常酷的项目,我希望尽快试用并撰写。

如果你已经走到这一步,感谢你的阅读,我希望你有机会在此基础上尝试一个有趣的项目!

法律·RSS斯蒂芬·布伦南(Stephen Brennan)的博客采用知识共享署名-ShareAlike 4.0国际许可