别。由于其所有权模型,工作池不太适合 Rust。相反,拥抱函数式编程和不可变数据。 Rust 提供了更简单易用和更优雅的工具:并行迭代器和流。需要注意的是,就像在任何编程语言中使用工作池一样,应该始终设置并发上限。否则,您可能会很快耗尽系统资源。对于计算密集型作业(CPU 密集型),有提供并行迭代器的 rayon crate:其组合器被分派到线程池的迭代器。好消息是线程池对我们这些开发人员是隐藏的。我们只需要像使用标准迭代器一样进行编码。 [ package] name = "rust_worker_pool" version = "0.1.0" authors = [ "Sylvain Kerkour <[email protected]>"] edition = "2018" # 在 https://doc.rust 查看更多键和它们的定义-lang.org/cargo/reference/manifest.html[dependencies] rand = "0.8" rayon = "1" 使用 rand::{thread_rng, Rng};使用人造丝::前奏:: *;使用 std::time::Duration; fn compute_job(job: i64) -> i64 { let mut rng = thread_rng();让 sleep_ms: u64 = rng.gen_range( 0.. 10); std::thread::sleep(Duration::from_millis(sleep_ms)); job * job} fn process_result(result: i64) { println !( "{}", result);} fn main() { let jobs = 0.. 100; jobs.into_par_iter() .map(compute_job) .for_each(process_result);} 默认情况下,线程池的大小等于机器的逻辑 CPU 数量。对于 I/O(输入/输出)绑定作业,我们需要转移到异步区域。更准确地说,我们将使用 Streams,它是可以并发处理项目的异步迭代器。
但是 Stream trait 本身不提供组合子。我们需要从未来的 crate 中导入 StreamExt trait。 [ package] name = "rust_worker_pool" version = "0.1.0" authors = [ "Sylvain Kerkour <[email protected]>"] edition = "2018" # 在 https://doc.rust 查看更多键和它们的定义-lang.org/cargo/reference/manifest.html[dependencies] rand = "0.8" tokio = { version = "1", features = [ "full"] } futures = "0.3" for_each_concurrent 是最容易使用的消耗流。这意味着它不会返回 Stream 本身,而是返回一个可以 .await 的 Future。使用期货::{stream, StreamExt};使用 rand::{thread_rng, Rng};使用 std::time::Duration; async fn compute_job(job: i64) -> i64 { let mut rng = thread_rng();让 sleep_ms: u64 = rng.gen_range( 0.. 10); tokio::time::sleep(Duration::from_millis(sleep_ms))。等待; job * job} async fn process_result(result: i64) { println !( "{}", result);} #[tokio::main] async fn main() { let jobs = 0.. 100;让并发= 42; stream::iter(jobs) .for_each_concurrent(concurrency, |job | async move { let result = compute_job(job). await; process_result(result). await; }) 。 await;} 另一方面,buffer_unordered 不消耗流。这就是为什么我们需要使用 for_each 作为接收器来消费 Stream。使用期货::{stream, StreamExt};使用 rand::{thread_rng, Rng};使用 std::time::Duration; async fn compute_job(job: i64) -> i64 { let mut rng = thread_rng();让 sleep_ms: u64 = rng.gen_range( 0.. 10); tokio::time::sleep(Duration::from_millis(sleep_ms))。等待; job * job} async fn process_result(result: i64) { println !( "{}", result);} #[tokio::main] async fn main() { let jobs = 0.. 100;让并发= 42; stream::iter(jobs) .map(compute_job) .buffer_unordered(concurrency) .for_each(process_result) 。 await;} 有时,我们可能需要收集它们而不是直接处理结果,例如稍后批量发送它们。好消息,collect 方法在 Streams 上可用:
使用期货::{stream, StreamExt};使用 rand::{thread_rng, Rng};使用 std::time::Duration; async fn compute_job(job: i64) -> i64 { let mut rng = thread_rng();让 sleep_ms: u64 = rng.gen_range( 0.. 10); tokio::time::sleep(Duration::from_millis(sleep_ms))。等待; job * job} async fn process_result(result: i64) { println !( "{}", result);} #[tokio::main] async fn main() { let jobs = 0.. 100;让并发= 42;让结果: Vec < i64 > = stream::iter(jobs) .map(compute_job) .buffer_unordered(concurrency) .collect() 。 await;} PS:想学习真实世界的 Rust 和攻击性安全吗?看看我的书 Black Hat Rust(可在早期访问中获得),我们将在其中详细介绍,以及我在那里分享我在编写 Rust 超过 2 年的时间里学到的所有东西:https://academy .kerkour.com/black-hat-rust?coupon=BLOG PS 2:为了避免与电子邮件通讯传递相关的问题,我正在启动一个矩阵频道,您可以加入以关注此博客的所有更新:) #blog: kerkour.com