作为探索Rust中的一些基本并发工作流的上一篇文章的后续文章,现在让我们探索一种稍微更高级的模式:工作从一个阶段流到下一个阶段的并发管道,以及在该上下文中发出背压信号的方法。我们还将查看流数据的“推”和“拉”源之间的区别。
让我们从初始代码开始,与前一篇文章不同的是,该代码示例非常庞大和复杂,出乎意料:
我们有三个不同的“组件”,每个组件代表管道中的一个阶段:
“源”,也就是数据的生产者,它将数据发送到。
“Processor”组件,该组件使用工作器池处理数据,其结果被发送到。
同样,“工作”什么也不包含:它只是一个在各个阶段传递的数字,我们关心的是并发工作流的结构,而不是实际的工作。
每个组件都运行一个“事件循环”,从这个意义上讲,组件是通过在循环中接收和处理消息来驱动的。
每个事件循环都略有不同,并且相对复杂,因此让我们逐一介绍它们。
“源”组件“一次运行一个刻度”,使用来自横梁的出色刻度。然后,它简单地递增一个数字并将其发送到下游进行处理。一旦该号码上的溢出被命中,这一过程就会停止,此时会向下游发送另一条消息,通知生产已经停止。
它还对两个标志进行操作:一个是退出,用于记录“源”何时永久停止生产;另一个是ONDING_WORK,用于跟踪池中正在执行的工作单元数。
向处理器的“主”事件循环发回一条消息,让它知道已经执行了一个工作单元。
在测试的“主线程”上运行的使用者事件循环简单地计算接收到的工作单元,一旦从处理器接收到“停止”信号就退出。因为我们知道一旦发生溢出,信号源就会停止生产,所以我们可以断言U8的工作单位加起来达到了最大值。
没有背压的概念,“源”只会在每个滴答机上保持生产,然后将作品送到下游的一个无界通道上进行处理。
“源”充当“处理器”工作的“推送源”,因为它只是保持尽可能快地将工作推向下游。
“处理机”也充当“消费者”的“推源”,简单地把处理的结果以最快的速度送去消费,也没有背压。
尽管没有背压,“源”和“处理器”之间的无界通道的大小并没有增长很多。为什么会这样呢?
答案是,工作反而堆积在线程池的内部工作窃取队列上。
同样值得一提的是整体组件驱动的设计:虽然通过将线程池移入“源”组件来“简化事情”,并在产生工作时简单地派生工作是很有诱惑力的,但是通过将“源”、“处理器”和“使用者”组件清楚地分开,我们在业务逻辑的封装方面获得了一些好处。
“源”只是在通道上发送工作,并不知道“处理器”在内部使用的线程池。此外,处理器的“主事件循环”对于它拥有的EXISTING和ONDING_WORK状态,充当一个被清楚封装的线性状态机。
通过将线程池上执行的工作嵌套到“处理器”组件的事件循环中,我们可以获得清晰的结构层次结构。
当您在Rust中读到使用通道的背压时,您通常会遇到一些关于需要使用“有界通道”的内容,或者一些关于某种风格的异步流带有对背压的“内置”支持的内容。
我想提出一个不同的角度,通过思考一条用于自动驾驶车辆的高速公路的简化设计。
因此,在这样的高速公路上,你希望车辆以最快的速度行驶,但你也希望在出现拥堵的情况下,让它们减速,并彻底停车(如果你认为这样的高速公路不会出现拥堵,那就再想一想)。
在我看来,使用有限制的通道(或带有“内置”背压支持的溪流)来发出背压信号,有点像是让这些汽车尽可能快地行驶,然后在它们即将撞到前面的汽车时全速刹车。
你会对这样一条高速公路感到满意吗?在那里,“背压信号”通过刺耳的刹车和可能不止几个撞坏的保险杠逆流而上。
相反,你可能想要的是,汽车(不能改道)在任何拥堵发生之前就开始减速,然后逐渐进一步减速,如果拥堵在到达时还没有消失,就会一直减速,直到完全停止。
换句话说,您需要一点缓冲区,并且希望根据一些专门编写的业务逻辑在缓冲区达到限制之前开始发送信号。
因此,我的观点是将重点放在业务逻辑上,将其作为限制工作管道的一种手段,而忽略限制通道,因为它们将带来的主要是死锁。
哇,这远远超出了“简单的例子”,但是我们可以一步一步地完成它。
因此,让我们从“源”组件开始,该组件现在还生成了一个SELECT(包括我们之前看到的滚动条)和一个允许它从“处理器”接收消息的新通道。
所以这里有一件好事:股票代码可以简单地是可变的,我们可以在SELECT中对其进行变异。我们在这里做的是改变股票代码,以响应那些RegulateSourceMsg消息,这些消息以“减速”、“加速”和“停止”的形式出现。
当我们得到一个减速信息时,我们会加倍报价器,从而降低我们“生产工作”的速度。
当我们收到加速消息时,我们会将报价器减半,从而提高我们生产工作的速度,
最后,当我们被告知停止时,我们用Never通道替换股票代码,同样是从优秀的横梁板条箱中。而该通道正如其名称所暗示的那样,它永远不会唤醒阻塞在其上的线程。因为它只是SELECT中两个通道中的一个,所以它实际上意味着我们在收到另一个RegulateSourceMsg消息之前不会产生任何东西。很酷,对吧?
首先,它现在还拥有一个“缓冲区”,本质上是一个VecDeque。然后,当它接收工作时,如果池中的两个工作进程已经很忙,它将缓冲工作。当它接收到来自工作进程的“Work Done”消息时,它还将检查缓冲区,如果有什么情况,立即将其发送到池中进行处理。
此外,在执行完这些操作后,它将“检查缓冲区的大小”,并相应地对源进行调制,如下图所示:
因此,这基本上是我们刚才看到的源代码中消息处理部分的另一面。
我们添加了一个新的SourceMsg::TickAdjusted消息,源在处理RegulateSourceMsg消息时发送该消息。这使我们可以不向源发送大量这样的消息。实质上,处理器将发送一条这样的消息,然后仅在从源接收到最后一条消息已被处理的确认之后才发送另一条消息。
请注意,此确认是异步的:处理器不会阻塞它,而只是继续运行,并在收到SourceMsg::TickAdjusted消息时将其内部TICK_ADJUSTED标志设置为TRUE。同时,它不会发送其他RegulateSourceMsg消息。
很好,在引言中我写道:“我们还将看看”推送“和”拉取“流数据来源之间的区别”,但你知道吗,我认为这对今天来说已经足够了。
我们只需注意,这绝对是一种“推送源”类型的工作流。
同样值得一提的是,我们在这里看到的是一个有限的工作管道(可能有一些中断的溢出代码),并且看不到一个有限的通道。
这给我们带来了巨大的好处:您可以忘记消息传递引起的任何死锁风险。