Ruby Ractor 实验:安全异步通信(2021)

2021-07-25 13:16:35

Ractors(api 文档、设计文档)是 Ruby 3.0 的一种新的并发抽象,其灵感来自于 actor 模型。从想要向另一个 Ractor 发送一些信息的角度来看,通信可以是: 异步(或非阻塞):一个 Ractor 可以使用 Ractor#send 向另一个发送信息,将其放入一个无限队列中,该队列可以通过 Ractor.receive 同步(或阻塞)被目标 Ractor 读取:一个 Ractor 可以使用 Ractor.yield 阻塞,直到另一个 Ractor 调用 Ractor#take 让我们考虑异步情况:假设我们想向另一个 Ractor 发送信息,但是不想阻止它完成处理。如果接收方 Ractor 太慢而无法处理数据,会发生什么? receiver_ractor = Ractor.new do loop do message = Ractor.receive sleep 1 puts " Processed #{message } " end endcounter = 0 while true counter += 1 receiver_ractor.send(counter) end 正如预期的那样,如果接收器跟不上发送方,将使用越来越多的内存,直到系统内存耗尽,应用程序崩溃。

查看 Ractor API,发送方没有内置方法来检查接收方是否落后,所以我想出了以下方法:receiver_ractor = Ractor.new do processing_queue = Queue.new Thread.new do sleep(1) # 模拟这个线程的慢启动循环 do message = processing_queue.pop puts " Processed from queue: #{message } " end end loop do queue_size = processing_queue.size sender, message = Ractor.select( Ractor.current , yield_value: queue_size) if sender != :yield processing_queue << message puts " 添加消息到队列:#{message } " else puts " Sent queue status: #{queue_size } " end end endreceiver_ractor.send(1)receiver_ractor.send (2)receiver_ractor.send(3)puts“完成提交”sleep(0.5)receiver_ractor.take#强制刷新statusputs“接收器队列长度:#{receiver_ractor.take}”sleep(1)receiver_ractor.take#强制刷新statusputs“接收器队列长度:#{receiver_ractor.take } " <internal:ractor>:267: 警告:Racto r 是实验性的,在未来的 Ruby 版本中行为可能会改变!还有很多实现问题。 完成提交添加消息到队列:1添加消息到队列:2添加消息到队列:3发送队列状态:3发送队列状态:3接收器队列长度:3从队列处理:1从队列处理:2从队列处理:3发送队列状态:3Sent queue status: 0Receiver queue length: 0 接收者Ractor唤醒,读取三个消息,并重定向到processing_queue 主Ractor唤醒,强制刷新queue_size(更多细节见下文)接收者Ractor的第二个线程唤醒并处理三个消息:1、2 和 3 主 Ractor 唤醒,强制刷新 queue_size(再次)-注意需要刷新,因为该值已过时(从 3 变为 0)

在接收器 Ractor 中,此策略的工作原理如下: 现在有两个线程。其中一个线程使用 Ractor.select 一次做两件事 - 接收新项目进行处理,将它们放在常规线程安全队列中,或者返回队列的当前大小。第二个线程只处理线程安全队列中的项目。发件人现在可以使用 send 来发送项目,或者调用 take 两次来获取队列的大小。为什么两次?因为这个值只在 select 被调用之前刷新,如果在 select 被输入和任何 send 或 take 调用之间经过了很长时间,这个值可能会过时,如上面的例子。连续两次调用 take 保证我们得到一个“新鲜”的值——我们知道这个值只是为第二次刷新而刷新。在这种结构的基础上,我们可以实施许多策略来更好地在两个 Ractor 之间进行通信。例如,我们可以查看原始示例以确保发送方永远不会“领先太多”接收方:receiver_ractor = Ractor.new do processing_queue = Queue.new Thread.new do loop do message = processing_queue.pop sleep(1 ) puts " Processed #{message } " end end loop do queue_size = processing_queue.size sender, message = Ractor.select( Ractor.current, yield_value: queue_size) if sender != :yield processing_queue << message puts " 添加消息到队列: #{message } " else puts " Sent queue status: #{queue_size } " end end endcounter = 0 while true counter += 1 receiver_ractor.send(counter) if counter % 10 == 0 receiver_ractor.take # 强制刷新状态 queue_size =receiver_ractor.take if queue_size > 5 puts " Ractor 落后 ( #{queue_size } 元素未处理); 睡眠一段时间 " sleep(1) while (receiver_ractor.take && receiver_ractor.take > 1) end end end <internal: ractor>:267: 警告:Ractor 是实验性的,未来的行为可能会改变e 版本的 Ruby!还有很多实现问题。 向队列添加消息:1向队列添加消息:2向队列添加消息:3向队列添加消息:4向队列添加消息:5向队列添加消息:6向队列添加消息:7向队列添加消息:8向队列添加消息queue: 9Added message to queue: 10Sent queue status: 10Sent queue status: 9Ractor is fall behind (9 个元素未处理);休眠一段时间发送队列状态:9发送队列状态:9Processed 1发送队列状态:9Sent队列状态:8Processed 2发送队列状态:8Sent队列状态:7Processed 3发送队列状态:7Sent队列状态:6Processed 4Sent队列状态:6Sent队列状态:5Processed 5Sent queue status: 5Sent queue status: 4Processed 6Sent queue status: 4Sent queue status: 3Processed 7Sent queue status: 3Sent queue status: 2Processed 8Sent queue status: 2Sent queue status: 1Added message to queue: 11Added message to queue: 12Added message to queue: 13Added message to queue: 14Added message to queue: 15Added message to queue: 16Added message to queue: 17Added message to queue: 18Added message to queue: 19Added message to queue: 20Sent queue status: 11Sent queue status: 11Ractor is fall behind (11 elements unprocessed) ;睡一会儿 睡眠是一个非常简单的解决方案,但可以完成工作。其他替代方案可能是切换到同步通信,或者将工作提交到不同的 Ractor/代码路径。