基于Futures Explained in 200 Lines of Rust文中的回调代码。
原作者的一些分析
基于回调方法背后的整个思想是保存一个指针,该指针指向一组我们希望稍后运行的一系列指令,以及需要的各种状态。在Rust,这(指针)将是一个闭包。在下面的示例中,我们将此信息保存在一个HashMap中,但这不是唯一的选项。
- 优势:
- 易于在大多数语言中实现
- 没有上下文切换
- 相对较低的内存开销(在大多数情况下)
- 缺点:
- 由于每个任务都必须保存以后需要的状态,因此内存使用将随着计算链中的回调次数线性增长。
- 很难理解。许多人已经知道这是“回调地狱”。
- 这是一种非常不同的编写程序的方式,并且需要大量重写才能从“正常”的程序流转换为使用“基于回调”的程序流。
- 由于Rust的所有权模型,任务之间的状态共享在使用该方法时是一个困难的问题。
代码
通过代码,学习到了一些新东西,主要包括:
thread_local!
:用来对每一个线程初始化同一个变量的一个新的副本。- trait 对象:
Box<dyn FnOnce() -> ()>
,dyn Trait
表示一个类型,强调是动态分发,并且必须是一个impl
了Trait
的类型。 - 传递闭包可以使用
impl Trait
,这也是一个类型,并且该类型是impl
了Trait
的类型。 thread::spawn
会直接新开一个子线程运行,不会造成主线程阻塞。std::sync::mpsc::Receiver.iter()
这是一个阻塞的迭代器,只有当sender都被drop的时候,该迭代器的.next()
才会变成None
,否则会一直等待。
回调是在同一个线程上运行的。 这个例子中,我们创建的子线程基本上只是用作计时器,但可以表示任何类型的我们将不得不等待的资源。
fn program_main() {
// 第一个输出
println!("So we start the program here!");
// 运行到这儿的时候, 0ms
// 1、把闭包里的任务放在callbacks中,next_id(此时为1)作为key,闭包作为val, next_id 递增。
// 2、会有一个子线程在set_timeout里面被创建,只负责延时。运行时候不会被该负责延时子线程阻塞
set_timeout(200, || {
// 第五个输出
println!("We create tasks with a callback that runs once the task finished!");
});
// 运行到这儿的时候, 仍然是0ms
// 1、把闭包里的任务放在callbacks中,next_id(此时为2)作为key,闭包作为val, next_id 递增。
// 2、会有一个子线程在set_timeout里面被创建,只负责延时。运行时候不会被该负责延时子线程阻塞
set_timeout(100, || {
// 第三个输出
println!("We can even chain sub-tasks...");
set_timeout(50, || {
// 第四个输出
println!("...like this!");
})
});
// 运行到这儿的时候, 仍然是0ms
// 第二个输出
println!("While our tasks are executing we can do other stuff instead of waiting.");
}
fn main() {
RT.with(|rt| rt.run(program_main));
}
use std::sync::mpsc::{channel, Receiver, Sender};
use std::{cell::RefCell, collections::HashMap, thread};
// threadlocal: 变量是同一个,但是每个线程都使用同一个初始值,也就是使用同一个变量的一个新的副本
// 每个线程都实例化了一个Runtime, 实际上RT是一个RT: std::thread::LocalKey<Runtime>
// 这儿好像只用到了一个副本
thread_local! {
static RT: Runtime = Runtime::new();
}
struct Runtime {
// callbacks 中的Box<dyn FnOnce() -> ()> 是一个 trait 对象。
// 每一个闭包实例有其自己独有的匿名类型, 闭包有trait bound,比如 Fn(u32) -> u32。
// callbacks 用来存下一个运行的程序块(这里用闭包来表示)。
callbacks: RefCell<HashMap<usize, Box<dyn FnOnce() -> ()>>>,
// 储存下一个闭包的id
next_id: RefCell<usize>,
// 每一个等待子线程拥有一个,在延时结束后,发送需要运行的id
evt_sender: Sender<usize>,
// 接受下一个该运行的闭包的id
evt_reciever: Receiver<usize>,
}
// cb 是传递的是一个闭包,The other use of the impl keyword is in impl Trait syntax, which can be seen as a shorthand for "a concrete type that implements this trait".
// Its primary use is working with closures, which have type definitions generated at compile time that can't be simply typed out.
// 传递闭包时候用impl Trait。表示一个【类型】,这个类型implements 了这个trait
// https://kaisery.github.io/trpl-zh-cn/ch10-02-traits.html
fn set_timeout(ms: u64, cb: impl FnOnce() + 'static) {
// with 方法是 在 std::thread::local::LocalKey 中有的,也就是 RT 有的 。
// 获取对这个TLS键中的值的引用。如果这个线程还没有引用这个键,这将延迟初始化这个值。
// 相当于用.with()就是在使用该变量的函数。
RT.with(|rt| {
let id = *rt.next_id.borrow();
// next_id递增
*rt.next_id.borrow_mut() += 1;
// callbacks存 (id: usize, Box<dyn FnOnce() -> ()>)
// 为什么需要 Box::new(cb) 这样子的呀, 因为输入的类型未知
// cb 在输入参数中用的impl FnOnce() + 'static 来限定。
rt.callbacks.borrow_mut().insert(id, Box::new(cb));
// The sending-half of Rust's asynchronous channel type.
// This half can only be owned by one thread, but it can be cloned to send to other threads.
// 一个sender只能够用在一个线程里面,但是可以克隆到其他线程中,此时receiver还是只有一个
let evt_sender = rt.evt_sender.clone();
// thread::spawn会直接新开一个子线程运行,不会造成主线程阻塞。功能是休眠后再把id输出出去
thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(ms));
// 在延时结束后,发送当前延时结束的任务id
evt_sender.send(id).unwrap();
});
});
}
impl Runtime {
fn new() -> Self {
// 这是一个asynchronous channel, 每个线程里面都有一个sender和receiver
let (evt_sender, evt_reciever) = channel();
Runtime {
callbacks: RefCell::new(HashMap::new()),
next_id: RefCell::new(1),
evt_sender,
evt_reciever,
}
}
// 这儿是在运行一个函数.
fn run(&self, program: fn()) {
// 直接运行,
program();
// 0ms 运行到这儿
// 运行结束后,开始逐个访问Runtime 里面的休眠子线程返回的 对应id的值。
// This iterator will block whenever next is called, waiting for a new message, and None will be returned when the corresponding channel has hung up.
// 这是一个阻塞的方法,只有当sender都被drop的时候,该迭代器的.next() 才会变成None
for evt_id in &self.evt_reciever {
// 这儿直接从HashMap里remove出对应编号的闭包。
let cb = self.callbacks.borrow_mut().remove(&evt_id).unwrap();
cb();
// 是不是也不需要这句话?
// 【不行!】: 因为在Runtime里面一个sender,不会被drop掉,就会陷入无限等待了
if self.callbacks.borrow().is_empty() {
break;
}
}
}
}
总结
- 如果用基于回调的方法来说Rust异步编程中的唱歌跳舞例子, 也就是,“计时器”代表的“等待资源”,等价于 “唱歌”前等待"学歌"。所以,在回调逻辑中
sing_song()
作为闭包,learn_song()
作为子线程,主线程运行dance()
。子线程结束后再运行这个闭包。运行顺序是learn_song()
(子线程)和dance()
(主线程)同时运行,在两个任务均完成后, 再回调sing_song()
这个闭包。 - 子线程对应 “等待资源” ; 闭包对应“接收到等待资源后的操作”。
async fn learn_and_sing() {
// 在唱歌之前等待学歌完成
// 这里我们使用 `.await` 而不是 `block_on` 来防止阻塞线程,这样就可以同时执行 `dance` 了。
let song = learn_song().await;
sing_song(song).await;
}
async fn async_main() {
let f1 = learn_and_sing();
let f2 = dance();
// `join!` 类似于 `.await` ,但是可以等待多个 future 并发完成
// 如果学歌的时候有了短暂的阻塞,跳舞将会接管当前的线程,如果跳舞变成了阻塞
// 学歌将会返回来接管线程。如果两个futures都是阻塞的,
// 这个‘async_main'函数就会变成阻塞状态,并生成一个执行器
futures::join!(f1, f2)
}
fn main() {
block_on(async_main());
}