异步Rust中的模拟时间

2022-02-15 21:04:18

我们喜欢async Rust,同上。我们是早期采用者,早在async/await变得稳定之前,我们就在网络代码中使用未来和流。我们的产品是一个完美的用例。它';在I/O方面,有大量的对等连接同时同步数据,我们需要在手机等低功耗设备上高效地使用所有核心。

异步中的一个常见挑战是编写好的测试。举个简单的例子:下面我们创建一个假设的连接。如果10秒钟内没有活动,它应该在内部超时并关闭其事件通道。我们可以编写一个测试来验证这一点:

#[tokio::test]异步fn test_timeout_ocurses(){//创建连接让mut conn=Connection::new().Wait;//等待一段时间延迟_for(Duration::from_secs(11)).Wait.unwrap();//接收通道应该关闭assert_eq!(conn.try_recv().unwrap_err(),tryrecveror::Disconnected)}

这个测试的一个问题是执行总是需要11秒:10秒等待超时加上安全系数。缓慢的测试对开发人员和CI/CD来说都是一个巨大的麻烦。想象一下,如果是30分钟。你的CI运行至少需要30分钟,这都是因为这一次测试!

一个相关的挑战是测试计时器是否过期。在这里,我们确保事件频道在5秒时仍然打开:

#[tokio::test]异步fn test_timeout_not_Occurse(){//创建连接让mut conn=Connection::new().Wait;//等待时间小于其他测试延迟中的时间(持续时间::from_secs(5)).Wait.unwrap();//接收通道仍应打开(但没有值)assert_eq!(conn.try_recv().unwrap_err(),TryRecvError::Empty);]

这个测试也很慢,在5秒钟内称重,但有一个更隐蔽的问题。想象一下,我们的连接出现了一个错误,导致它在4秒钟后关闭了通道。这个测试能抓住它吗?可能…但不一定。

当内部4秒计时器过期时,必须执行一系列步骤。一个tokio worker线程必须被解列并轮询等待该计时器的任务。在注意到计时器已过期后,它会丢弃事件发送器,这向接收端指示通道已关闭。它';这只是一小部分工作,但它';它不是即时的,它必须与主测试任务同时运行。

如果在高端开发机器(Rust开发者使用的机器)上运行这个程序,那么过期和通道关闭过程将在几毫秒内发生。当测试任务在1秒后执行断言时,几乎肯定会关闭通道并检测到错误。然而,在负载沉重的CI服务器上,线程之间可能存在激烈的竞争。处理内部超时可能会延迟1–2秒,测试似乎会通过。上一次测试的问题相反;如果内部超时被延迟,它将失败,即使测试的代码是正确的。

这些测试是不确定的,会导致不稳定的CI:间歇性的构建失败,其中';我们不确定你是否能相信结果。这是次优的。

传统智慧认为你不应该';不要这样构造代码。要测试的任何逻辑都应该表示为完全确定的同步代码,并从系统时钟中抽象出来。这对于小型单元测试来说是可以的,但在大型集成测试中却成了一个头疼的问题。我们的一些业务逻辑与异步行为有关。为什么可以';我们不测试一下吗?我们的客户将如何体验?如果我们只测试同步部分,我们';我们正在失去报道。

在测试环境中,时间的流逝应该被抽象出来,这样无论延迟多长,测试都可以立即执行。

测试应该是完全确定的:一个更早的计时器总是在一个更晚的计时器之前工作到完成。

随着单元测试推进模拟时间,Instant::now()应该在触发每个计时器期间返回正确的中间时间。

附加在计时器上的处理工作应该允许新的计时器在此过程中注册。

同上,我们已经建立了一个内部库来解决所有这些问题。这个板条箱,同上,抽象了std::time和东京定时器的功能。这里是第一次测试,除了现在使用这个库。

#[test]fn test_timeout_occurrent_fast(){let(time_control,_guard)=register_new_control();let rt=build_instrumented_runtime(&;time_control);rt.block_on(async{let mut conn=Connection::new().wait;time_control.advance(Duration::from_secs(10)).wait;//<;--assert_eq!(conn.try_recv().unwrap)),TryRecvError::Disconnected)})

该测试是可靠的,并立即完成。首先,有一个小样本,确保测试中的代码将使用模拟计时器,而不是真正的计时器。现在,请关注对时间控制的调用。advance()。请注意,它将时间精确地向前移动10秒。然后在下一条线路上,通道保证关闭。

这是最困难的部分。触发计时器是不够的。不知何故,我们必须知道接收计时器事件的代码何时完成了所有相关工作。这不是';使用传统计时器是不可能的。我们没有';我不知道相关的任务将在什么时候被安排,更不用说什么时候';结束了。因此,在同上的时间里,期货是不同的,它们输出一个FrozenTimeControlGuard。

{let _guard=ditto_time::delay_for(Duration::from_secs(10)).wait.unwrap()//处理计时器事件//保护被丢弃;时间可以提前}

事实证明,在几乎所有情况下,计时器驱动的代码在等待之后立即在单个块中执行所有相关处理。因此,我们所要做的就是创建一个像_-guard这样的绑定,现在我们有了一个值,可以在到达块的末尾时通知我们,因此时间可以继续。在不寻常的情况下,开发人员可以根据需要保留警卫。

在单元测试代码中,警卫包含一个一次性发送器。在drop上,通道关闭,对advance()的调用知道它可以转到下一个计时器。advance()是一个异步循环,覆盖所有当前注册的计时器,一次触发一个计时器,并等待相应的通道关闭。在非测试代码中,根本没有通道,不需要额外的处理。

最后一个技巧是正确选择真实计时器和模拟计时器,并确保它们与正确的时间控件相关联。Rust并行运行测试,因此可能有许多tokio运行时同时执行测试。如果我们试图使用全局存储,不同的测试将相互碰撞。

解决方案是线程本地存储(TLS)。Tokio提供了在每个工作线程启动时运行代码的能力,这是一个存储Arc的机会<;时间控制>;。这就是测试中构建的测试运行时模板的目的。当被测代码调用delay_for()时,它将动态创建正确变量的延迟未来:

本地线程!(静态模拟_控件:RefCell<;Option<;Arc<;TimeControl>;=RefCell::new(无));发布fn延迟(持续时间:标准时间:持续时间)->;延迟{MOCK_CONTROL.with(|it | match it.borrow().as_ref(){Some(CONTROL)=>;{let deadline=CONTROL.now()+持续时间;延迟::new_MOCK(CONTROL,deadline)}None=>;延迟::真实(盒子::引脚(东京::时间::睡眠(持续时间)),}

#[test]fn test_timeout_occurrent_fast(){let(time_control,_guard)=register_new_control();let rt=build_instrumented_runtime(&;time_control);rt.block_on(异步{let mut conn=Connection::new().wait;time_control.advance(持续时间::from_secs(10)).wait;断言_eq!(conn.try_recv().unwrap_err(),TryRecvError::Disconnected)})

创建了一个TimeControl实例,测试将使用该实例来提前计时,并且还将收集模拟计时器的注册。

调用advance()的持续时间为10秒。这将提前到10秒,允许连接中的所有代码运行。频道关闭了。

第二个测试也可以进行类似的调整。如果通道在5秒或更短时间内关闭,我们';你一定会抓住的。

#[test]fn test_timeout_not_occurrent_fast(){let(time_control,_guard)=register_new_control();let rt=build_instrumented_runtime(&;time_control);rt.block_on(async{let mut conn=Connection::new().wait;time_control.advance(持续时间:from_secs(5)).wait;assert_eq!(conn.try_recv().try_err().unwrap(),TryRecvError::Empty)})

这种技术使Ditto能够在我们的客户使用的完全相同的异步代码上构建快速运行的单元测试,只需对代码进行适度的修改即可解决这些问题。测试让我们有信心将业务逻辑直接包含在异步层中,避免了将计时器驱动的代码构造为同步和异步组件的开销。我们相信async Rust有着光明的未来,我们的客户已经看到了它的好处。