Rust 基于回调的异步方法分析

基于Futures Explained in 200 Lines of Rust文中的回调代码。

原作者的一些分析

基于回调方法背后的整个思想是保存一个指针,该指针指向一组我们希望稍后运行的一系列指令,以及需要的各种状态。在Rust,这(指针)将是一个闭包。在下面的示例中,我们将此信息保存在一个HashMap中,但这不是唯一的选项。

  • 优势:
    • 易于在大多数语言中实现
    • 没有上下文切换
    • 相对较低的内存开销(在大多数情况下)
  • 缺点:
    • 由于每个任务都必须保存以后需要的状态,因此内存使用将随着计算链中的回调次数线性增长。
    • 很难理解。许多人已经知道这是“回调地狱”。
    • 这是一种非常不同的编写程序的方式,并且需要大量重写才能从“正常”的程序流转换为使用“基于回调”的程序流。
    • 由于Rust的所有权模型,任务之间的状态共享在使用该方法时是一个困难的问题。

代码

通过代码,学习到了一些新东西,主要包括:

  • thread_local! :用来对每一个线程初始化同一个变量的一个新的副本。
  • trait 对象: Box<dyn FnOnce() -> ()>, dyn Trait表示一个类型,强调是动态分发,并且必须是一个implTrait的类型。
  • 传递闭包可以使用impl Trait,这也是一个类型,并且该类型是implTrait的类型。
  • thread::spawn会直接新开一个子线程运行,不会造成主线程阻塞。
  • std::sync::mpsc::Receiver.iter() 这是一个阻塞的迭代器,只有当sender都被drop的时候,该迭代器的.next() 才会变成None,否则会一直等待。

回调是在同一个线程上运行的。 这个例子中,我们创建的子线程基本上只是用作计时器,但可以表示任何类型的我们将不得不等待的资源。


fn program_main() {
    // 第一个输出
    println!("So we start the program here!");
    // 运行到这儿的时候, 0ms
    // 1、把闭包里的任务放在callbacks中,next_id(此时为1)作为key,闭包作为val, next_id 递增。
    // 2、会有一个子线程在set_timeout里面被创建,只负责延时。运行时候不会被该负责延时子线程阻塞
    set_timeout(200, || {
        // 第五个输出
        println!("We create tasks with a callback that runs once the task finished!");
    });
    // 运行到这儿的时候, 仍然是0ms
    // 1、把闭包里的任务放在callbacks中,next_id(此时为2)作为key,闭包作为val, next_id 递增。
    // 2、会有一个子线程在set_timeout里面被创建,只负责延时。运行时候不会被该负责延时子线程阻塞
    set_timeout(100, || {
        // 第三个输出
        println!("We can even chain sub-tasks...");
        set_timeout(50, || {
            // 第四个输出
            println!("...like this!");
        })
    });
    // 运行到这儿的时候, 仍然是0ms
    // 第二个输出
    println!("While our tasks are executing we can do other stuff instead of waiting.");
}

fn main() {
    RT.with(|rt| rt.run(program_main));
}

use std::sync::mpsc::{channel, Receiver, Sender};
use std::{cell::RefCell, collections::HashMap, thread};


// threadlocal: 变量是同一个,但是每个线程都使用同一个初始值,也就是使用同一个变量的一个新的副本
// 每个线程都实例化了一个Runtime, 实际上RT是一个RT: std::thread::LocalKey<Runtime>
// 这儿好像只用到了一个副本
thread_local! {
    static RT: Runtime = Runtime::new();
}

struct Runtime {
    // callbacks 中的Box<dyn FnOnce() -> ()> 是一个 trait 对象。 
    // 每一个闭包实例有其自己独有的匿名类型, 闭包有trait bound,比如 Fn(u32) -> u32。 
    // callbacks 用来存下一个运行的程序块(这里用闭包来表示)。
    callbacks: RefCell<HashMap<usize, Box<dyn FnOnce() -> ()>>>,
    // 储存下一个闭包的id
    next_id: RefCell<usize>,
    // 每一个等待子线程拥有一个,在延时结束后,发送需要运行的id
    evt_sender: Sender<usize>,
    // 接受下一个该运行的闭包的id
    evt_reciever: Receiver<usize>,
}

// cb 是传递的是一个闭包,The other use of the impl keyword is in impl Trait syntax, which can be seen as a shorthand for "a concrete type that implements this trait". 
// Its primary use is working with closures, which have type definitions generated at compile time that can't be simply typed out.
// 传递闭包时候用impl Trait。表示一个【类型】,这个类型implements 了这个trait
// https://kaisery.github.io/trpl-zh-cn/ch10-02-traits.html
fn set_timeout(ms: u64, cb: impl FnOnce() + 'static) {
    // with 方法是 在 std::thread::local::LocalKey 中有的,也就是 RT 有的 。
    // 获取对这个TLS键中的值的引用。如果这个线程还没有引用这个键,这将延迟初始化这个值。
    // 相当于用.with()就是在使用该变量的函数。
    RT.with(|rt| {
        let id = *rt.next_id.borrow();
        // next_id递增
        *rt.next_id.borrow_mut() += 1;
        // callbacks存 (id: usize, Box<dyn FnOnce() -> ()>)
        // 为什么需要 Box::new(cb) 这样子的呀, 因为输入的类型未知
        // cb 在输入参数中用的impl FnOnce() + 'static 来限定。
        rt.callbacks.borrow_mut().insert(id, Box::new(cb));
        // The sending-half of Rust's asynchronous channel type. 
        // This half can only be owned by one thread, but it can be cloned to send to other threads.
        // 一个sender只能够用在一个线程里面,但是可以克隆到其他线程中,此时receiver还是只有一个
        let evt_sender = rt.evt_sender.clone();
        // thread::spawn会直接新开一个子线程运行,不会造成主线程阻塞。功能是休眠后再把id输出出去
        thread::spawn(move || {
            thread::sleep(std::time::Duration::from_millis(ms));
            // 在延时结束后,发送当前延时结束的任务id
            evt_sender.send(id).unwrap();
        });
    });
}


impl Runtime {
    fn new() -> Self {
        // 这是一个asynchronous channel, 每个线程里面都有一个sender和receiver
        let (evt_sender, evt_reciever) = channel();
        Runtime {
            callbacks: RefCell::new(HashMap::new()),
            next_id: RefCell::new(1),
            evt_sender,
            evt_reciever,
        }
    }

    // 这儿是在运行一个函数.
    fn run(&self, program: fn()) {
        // 直接运行, 
        program();
        // 0ms 运行到这儿
        // 运行结束后,开始逐个访问Runtime 里面的休眠子线程返回的 对应id的值。
        // This iterator will block whenever next is called, waiting for a new message, and None will be returned when the corresponding channel has hung up.
        // 这是一个阻塞的方法,只有当sender都被drop的时候,该迭代器的.next() 才会变成None
        for evt_id in &self.evt_reciever {
            // 这儿直接从HashMap里remove出对应编号的闭包。
            let cb = self.callbacks.borrow_mut().remove(&evt_id).unwrap();
            cb();
            // 是不是也不需要这句话?
            // 【不行!】: 因为在Runtime里面一个sender,不会被drop掉,就会陷入无限等待了
            if self.callbacks.borrow().is_empty() {
                break;
            }
        }
    }
}

总结

  • 如果用基于回调的方法来说Rust异步编程中的唱歌跳舞例子, 也就是,“计时器”代表的“等待资源”,等价于 “唱歌”前等待"学歌"。所以,在回调逻辑中 sing_song()作为闭包, learn_song() 作为子线程,主线程运行dance() 。子线程结束后再运行这个闭包。运行顺序是 learn_song() (子线程)和 dance() (主线程)同时运行,在两个任务均完成后, 再回调sing_song()这个闭包。
  • 子线程对应 “等待资源”闭包对应“接收到等待资源后的操作”
async fn learn_and_sing() {
    // 在唱歌之前等待学歌完成
    // 这里我们使用 `.await` 而不是 `block_on` 来防止阻塞线程,这样就可以同时执行 `dance` 了。
    let song = learn_song().await;
    sing_song(song).await;
}
 async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();
     // `join!` 类似于 `.await` ,但是可以等待多个 future 并发完成
     // 如果学歌的时候有了短暂的阻塞,跳舞将会接管当前的线程,如果跳舞变成了阻塞
     // 学歌将会返回来接管线程。如果两个futures都是阻塞的,
     // 这个‘async_main'函数就会变成阻塞状态,并生成一个执行器
    futures::join!(f1, f2)
}
 fn main() {
    block_on(async_main());
}