//! oneshot通道是用于发送单//!的同步原语。从一项任务到另一项的消息。当发件人发送了一条消息并且它//!准备好接收时,oneshot会通知接收者。接收者是//!可以等待的Rust异步`Future`。我们的实现不//!取决于std或alloc.//!//!参见https://tweedegolf.nl/blog/50/build-your-own-async-primitive //!有关内部结构的完整说明。
那是什么意思呢?首先,不依赖于std或alloc意味着我们不能使用许多常见的库。 Rust异步仍然是一个新领域,到目前为止,库至少依赖于分配内存。这使得它们的基元更易于构建,但也无法使用无法负担分配器的嵌入式设备。我会定期使用此类设备,因此请尽可能避免分配。我们将看到我们可以在没有分配的情况下写出oneshot。
接下来,我们不想在等待消息发送时阻塞整个CPU。一方面,如果CPU被阻塞,则不会发送该消息。当操作系统可用时,它可以安排发送任务,同时阻止接收任务,这没有问题。小型嵌入式设备通常负担不起操作系统的费用,因此您只能手动执行任务切换。这可能涉及将状态从唯一的调用堆栈移到结构中,然后再移回,因此您可以使用调用堆栈来继续执行另一项任务。
Rust异步可以为您做到这一点:首先,您选择一个异步执行程序来运行任务。这些执行器甚至比嵌入式OS更简单。然后您可以正常编写任务。当您需要等待另一个任务完成某件事时,只需编写.await。执行程序将存储您的调用堆栈的状态,并继续执行未被阻止的任务。当您等待的事情完成后,执行程序将再次唤醒您的任务。
让我们从Oneshot结构开始,该结构将包含发送者和接收者都需要访问以传输消息的数据。需要在调用堆栈的顶部创建它,因此它将超过所有可能引用它的任务。然后,我们创建一个Sender和Receiverstruct,其中包含对Oneshot的引用。然后,我们可以将发送方交给一个任务,将接收方交给另一个任务。由于它们都有对Oneshot的引用,因此它们必须是共享引用,这意味着我们不能安全地修改内部的任何内容。相反,我们需要使用可以通过共享引用进行安全修改的类型。
首先,我们需要一种将消息从发送方传输到接收方的方法。如果我们等待发送方和接收方都准备好进行传输,则可以将其直接移动:我们知道数据在发送方的什么位置以及应该在接收方的什么位置。我们称之为同步通信,在通信发生之前,发送方和接收方都需要准备就绪。它有其用途,但是在这种情况下,我们不希望发送方等待接收方,因为无论如何它都不会得到响应。相反,我们会将消息存储在Oneshot中,以便在该消息中等待接收者准备就绪。
我们不想将oneshot限制为仅一种消息,因此我们将其概括化为消息的类型。我们将其称为T。创建新的Oneshot时,我们还没有消息,因此我们需要告诉Rust,稍后再对其进行初始化。在Rust中,您通常使用Option< T>但是我们将跟踪是否在一个单独的字段中有一个T,因此我们将使用MaybeUninit< T>代替。最后,我们需要通过共享引用来修改字段。
我们将存储一个UnsafeCell< MaybeUninit< T>。只要我们在标记为不安全的代码块中进行修改,就可以通过共享引用来修改消息。不安全的块调整意味着我们要自己负责检查代码的安全,而不是由编译器负责。每当安全逻辑过于复杂而编译器无法理解时,我们都需要这样做。
接下来,我们需要记住是否已准备好接收消息,或者换句话说,消息字段是否已初始化。这并不像看起来那样容易。编译器和处理器都会对内存访问进行重新排序,以使您的代码运行得更快。他们将确保单个任务中的代码看起来像以正确的顺序运行,但是他们将为代码提供相同的保证在不同的任务中运行。例如,如果我们使用了Option< T>为了存储消息,发送者可以先将其转换为Some(),然后再存储主题。如果接收者碰巧在两者之间进行检查,它将在存储一条消息之前尝试读取一条消息,并且最终会产生一堆垃圾。
为了使并发内存访问安全,发明了原子。我们会记得是否有一个带有AtomicBool的消息。原子确保没有任务看到解剖的部分更新。原子要么完全更新,要么根本不更新。最重要的是,它还有助于我们同步访问其他内存:AtomicBool的值确定我们是否允许发送方或接收方访问消息。这样,他们永远不会同时访问它。
最后,我们要使用Rust异步/等待来等待消息准备就绪。这需要我们存储唤醒器。为了能够从共享引用中更新唤醒程序,我们使用了来自期货库的非常好的AtomicWaker。
使用core :: cell :: UnsafeCell;使用core :: mem :: MaybeUninit;使用core :: sync :: atomic :: {AtomicBool,Ordering};使用futures :: task :: AtomicWaker; ///传输单个消息使用async / await.pub结构在任务之间进行一次Oneshot< T> {消息:UnsafeCell< MaybeUninit< T>&gt ;,唤醒者:AtomicWaker,has_message:AtomicBool,}
发送者和接收者只是围绕Oneshot引用的薄包装。它们在基础消息的类型(T)和引用的生存期(' a)上都是通用的。将引用包装在结构中允许我们命名它们并指定它们支持的操作。
似乎我们的步伐不是很快,但是选择正确的数据表示通常是编程中最难的部分。从良好的数据表示出发,实现自然而然。我们从允许创建新的Oneshot的新功能开始。我们还没有消息,所以我们不初始化它,并将has_message设置为false。
impl T Oneshot< T> {pub const fn new()->自我{自我{消息:UnsafeCell :: new(MaybeUninit :: uninit()),唤醒者:AtomicWaker :: new(),has_message:AtomicBool :: new(false),})
接下来,我们执行最基本的操作,将消息放入并再次取出。稍后我们将进行同步,因此这将是不安全的功能。出于性能原因,用户可能希望直接使用这些功能,因此我们将其公开。只要我们记录了安全使用该函数所必须具备的属性,就可以了。
put函数将存储一条消息。由于我们正在实现一个单发,因此我们只希望进行一次通信,因此我们假设在调用put之前没有消息。必须将使用此不安全功能的用户告知该财产!
一旦我们将has_message设置为true,接收方可能会尝试读取消息,因此我们必须确保首先写入消息。我们还需要防止对消息and has_message的写入进行重新排序。我们通过用hasing :: Release存储has_message来做到这一点。这是向编译器发出的信号,它必须确保必须完成任何内存访问,然后才能完成它并对其他内核可见。
///注意(不安全):当oneshot可能包含//消息或存在引用此oneshot的`Sender`时,不得使用此功能。这////意味着它不能与它本身并发使用,否则将无法运行///将违反该约束。 pub unsafe fn put(&self,message:T){self.message.get()。write(MaybeUninit :: new(message)); self.has_message.store(true,Ordering :: Release); self.waker.wake(); }
接收功能将检查消息是否可用,如果有则将其从oneshot中删除。这次,我们使用Ordering :: Acquire来确保在has_message加载之后真正发生所有内存访问。对于任务之间的每个同步,都需要一个“发布-获取”对。有时同步需要双向进行,并且Ordering :: AcqRel可用于同时获得两种效果。
接收完消息后,我们通过将has_messageage设置为false来再次释放其内存。如果两个take实例同时运行,则它们可能都在将has_message设置为false之前都到达了messageread,因此复制了该消息,因此我们需要在安全合同中禁止这样做。
另一方面,如果它与put并发运行,则根据put的合同,必须事先没有消息。如果先加载has_message,它将查找nomessage并返回。如果put首先写has_message,则确保已经准备好发送消息,因此take接收消息没有问题。因此,单次抽签可以安全地同时进行一次看跌期权。这正是我们oneshot频道所需要的。 Wenever希望这两个功能可以运行不止一次,但是可以在任务之间安全地传递一条消息。现在在Rust类型系统中强制执行该安全性。
///注意(不安全):此函数不能与其自身同时使用,///但可以与put同时使用。 pub unsafe fn take(& self)->选项< T> {如果self.has_message.load(Ordering :: Acquire){让message = self.message.get()。read()。assume_init(); self.has_message.store(false,Ordering :: Release);某些(消息)}其他{无}}
split功能将oneshot分为发送者和接收者。发送者可以用来发送一条消息,接收者可以接收一条消息。 split函数采用对oneshot的唯一引用,并以相同的生存期返回两个共享引用。这意味着Rust将认为oneshot是唯一借用的,直到确定发送者和接收者的生命周期都已结束。它使我们无法制造多对!
但是,一旦生命周期结束,oneshot将不再被借用,因此没有什么可以阻止某人再次调用split。那是问题吗?并非完全如此:这意味着Oneshot可以重新用于发送另一条消息。我们仍将其称为oneshot,因为它创建的发送方和接收方只能使用一次。
但是,我们需要注意一件事:在发送方发送其消息之前,接收任务可能会停止关照并丢弃其接收方。在这种情况下,引用的生存期结束后,oneshot仍将包含一条消息。为了允许再次使用Oneshot,我们需要删除该消息。
请注意,简单地将has_message设置为false是一个问题,因为该消息存储在MaybeUninit中,该MaybeUninit本身不知道是否有消息,因此也不知道何时应该删除该消息。被遗忘而未被丢弃的值可能会泄漏资源,甚至导致未定义的行为!在将has_message设置为true之前,我们始终需要确保已初始化主题,然后在将其设置为false之前将其删除或移至其他位置。
安全的做法是将MaybeUninit中的任何消息带入再次隐式丢弃的类型。但是,take是不安全的,因此在使用它之前,我们必须检查其合同:合同状态take不能与其本身同时使用。在这种情况下,我们可以确定这一点,因为split拥有对Oneshot的唯一引用,因此其他人也可以拥有一个可以调用take的引用。
///将Oneshot拆分为发送方和接收方。发件人可以发送一条///消息。接收方是未来,可以用来等待该消息。 ///如果接收者在接收消息之前被丢弃,则消息也被///丢弃。在发送方和接收方的生存期结束后///,可以再次调用此函数以发送新消息。 pub fn split<< a>(&' mut self)-> (发件人a,T>接收者a,T>){不安全{self.take()}; (发送者(自己),接收者(自己))}
即使用户打算跟踪何时可以安全地手动发送和接收,他们仍可能仍希望使用发件人和/或接收者。我们可以允许使用标记为不安全的功能分别创建它们。例如,这允许用户在直接使用放置权的同时创建接收方,并防止发送方的开销。
///注意(不安全):一次最多只能有一个`Receiver`。 ///当接收器存在时,不应调用“ take”。 pub unsafe fn recv<& a((&' a self)->接收者a,T> {Receiver(self)} ///注意(不安全):一次最多只能有一个`Sender`。 `Sender`存在时不应调用`put` ///。创建`Sender`时,`Oneshot`必须为///为空。 pub unsafe fn send<& a(&' a self)->发件人a,T> {发件人(自己)}
我们定义了一个简单的is_empty函数,以检查是否已发送消息。这通常不会有用,因为发送方会知道它是否已发送任何内容,而接收方的要点是它会在尝试接收时找出来。但是,它对于调试和断言仍然有用。由于我们不知道确切地检查了什么,调试的性能也不是问题,因此我们应该在此处使用AcqRel排序。
如果我们删除Oneshot,则还需要删除它可能具有的所有消息。
发件人非常简单:它允许您调用put函数,但只能调用一次。这是通过以下事实来强制执行的:该发送函数不会引用self,而是使用self。使用send后,发送者的生命周期已经结束,无法再次使用。一次调用一次放置是安全的,因为使用split创建发件人时,我们确保不存储任何消息。
impl< a,T>发件人a,T> {pub fn send(self,message:T){不安全{self.0.put(message)}; }}
我们从分离中获得的接收者是可以等待发送消息的未来,这是我们实现中的最后一个棘手的问题。期货是具有轮询功能的事物,执行者将调用它。在其中,它接收对其自身的固定引用以及包含唤醒程序的上下文。在我们的情况下,我们不在乎引用是否固定。 pollcan返回以下两项之一:Poll :: Ready(message)表示将来已完成,.await将随消息一起返回。 Poll :: Pending表示未来还没有完成,Rust将把控制权交还给调用poll的执行程序,以便它可以找到要运行的另一个任务。
首次等待时,将对“未来”进行第一次轮询。此后,原则上它将不会再次运行,直到我们使用上次轮询中收到的唤醒程序再次唤醒它为止。这意味着我们需要将唤醒器存储在发送任务可以用来再次唤醒接收任务的位置。当然,这是在Oneshot的唤醒者字段中。感谢使用原子唤醒器,我们不必担心存储唤醒器是否安全,我们可以随时进行操作。
在实践中,即使没有被唤醒,一些执行者也会对“未来”进行轮询,因此检查我们是否真的完成总是很重要的。在这种情况下,因为我们拥有对接收方的唯一引用,所以我们可以放心地进行接听,而发件人绝不会进行接听。请注意,AtomicWaker的文档指出,消费者应在检查计算结果之前先调用register,这样我们才能仅在消息尚未准备好时调用它。
使用core :: future :: Future;使用core :: pin :: Pin;使用core :: task :: {Context,Poll}; impl<' a,T>接收者的未来,T&T {类型输出= T; fn poll(self:Pin<& mut Self&gt ;, cx:& mut Context<' _>)->投票T {self.0.waker.register(cx.waker());如果让Some(message)=不安全{self.0.take()} {Poll :: Ready(message)}否则{Poll :: Pending}}}
如果我们放下接收器,oneshot中的任何消息将不再使用。当oneshot本身被删除或重用时,它最终将已经被删除,但是我们可能会通过提前删除它来节省资源,因此如果消息存在,我们将其放在此处。请注意,这样做是安全的,因为我们拥有对接收者的唯一引用,而发送者将永远不会调用take,因此不会同时运行。
impl< a,T>丢弃接收者,a,T> {fn drop(& mut self){不安全{self.0.take()}; }}
到此,我们的oneshot实施工作就完成了。还不错吧?希望您现在对并发原语有什么了解。该博客的代码可从https://github.com/tweedegolf/async-heapless获取,因此,如果有问题,请创建一个问题或请求请求。另请参阅https://github.com/tweedegolf/async-spi/blob/main/src/common.rs,在这里我使用(不安全的方法)oneshot构建异步SPI驱动程序。
我想鼓励您尝试编写自己的抽象。 对于初学者,您可以创建一个容量为一条消息的通道,其中发送方和接收方可以多次使用,发送方可以阻塞直到该通道中有空间。 之后,您可以查看可以存储多个消息或允许多个并发发件人或收件人的渠道。 也许您想构建一个原始任务,其中两个任务互相等待,然后同时交换消息。 只需确保您不尝试一次添加所有内容,那么每个内容就很难单独添加。 最后,请记住记录每个不安全的功能以及安全使用它的条件! 随时了解我们的工作和博客文章? 在Github,Twitter或LinkedIn上关注我们。