异步协程

接着上一节的讨论,来看看WaitForThread结构:

/// 等待另一个线程
pub struct WaitForThread {
    /// 正在等待的线程
    waiting_thread: Arc<Thread>,
    /// 被等待的线程
    waited_thread: Arc<Thread>,
}

impl Future for WaitForThread {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 等待的线程已结束
        if self.waited_thread.state() == ThreadState::Exited {
            // 已经退出,将等待的线程设置为就绪态
            self.waiting_thread.set_state(ThreadState::Runnable);
            Poll::Ready(())
        } else {
            // 向被等待线程添加一个唤醒器,其状态改变时再唤醒这个协程
            self.waited_thread
                .add_state_waker(cx.waker().clone(), ThreadState::Exited);
            Poll::Pending
        }
    }
}

当我们为WaitForThread实现了Future trait后,他就成为了一个协程。协程指的是一个可能还没有准备好的值,poll方法查询这个协程任务是否完成,若完成返回Ready,否则返回Pending,在这个等待线程的例子中,若内等待的线程已经退出了,我们就可以返回Ready,否则则返回Pending

但是协程创建后Poll方法是由谁来调用的呢?这就引出了协程执行器,执行器会轮讯内核中产生的所有协程,调用他们的Poll方法。

所以在Poll方法中,在返回Ready之前,我们就要唤醒之前等待的线程。在这个例子中,若轮讯时发现等待的线程已经退出,则唤醒之前等待的主线程,他便可以被调度器调度从而重新返回用户态执行。

若轮讯时还没有就绪怎么办呢,这时执行器就暂时不再轮讯这个协程了,而我们需要将协程的唤醒器注册到等待的事件中去,当事件发生时,使用唤醒器唤醒协程,这时执行器才会再次轮讯协程,因为等待的事件已经发生,再次轮讯时便会返回Ready,并把之前等待的线程唤醒。这个过程在下面还会说明。

回到这个例子,若等待的线程还没有退出,则在返回Pending之前首先把协程的Waker添加到了被等待线程的唤醒器队列中去。当被等待线程的状态改变时,会唤醒这个协程:

/// 设置线程状态
pub fn set_state(&self, new_state: ThreadState) {
    // 线程已经退出,不再改变状态
    if *self.state.get() == ThreadState::Exited {
        return;
    }
    *self.state.get_mut() = new_state;
    // 唤醒等待的唤醒器
    self.state_wakers.get_mut().retain(|state_waker| {
        let (waker, wait_state) = state_waker;
        if *wait_state == new_state {
            waker.wake_by_ref();
            return false;
        }
        true
    });
}

协程执行器中的协程任务

/// 协程执行器轮讯的协程任务
///
/// 包含一个Future对象和一个sleep标记
pub struct Task {
    /// 内含的协程
    inner_future: Cell<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
    /// sleep标记,当为true时协程不会被执行器轮讯
    /// 协程的Waker和执行器executor是唯一能够改变
    /// sleep标记的代码区域,实现该Future的开发者
    /// 必须自行决定何时使用Waker来取消sleep标记
    sleep_flag: Cell<bool>,
    /// 执行器的若引用
    executor: Weak<Executor>,
}

前面提到,执行器轮讯内核中产生的所有协程,但实际上执行器中的协程任务进行了一层封装,额外包含了一个睡眠标记sleep_flag,当其为true时,执行器就不会轮讯这个协程。

impl Task {
    /// 将此任务休眠等待唤醒器唤醒
    ///
    /// 当轮讯任务返回阻塞时,Future应当保证将Waker注册到等待的事件中区
    pub fn sleep(&self) {
        *self.sleep_flag.get_mut() = true;
    }

    /// 唤醒任务
    pub fn wakeup(&self) {
        *self.sleep_flag.get_mut() = false;
    }
}

impl Woke for Task {
    /// 唤醒任务,且将执行器设置为需要执行
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.wakeup();
        arc_self
            .executor
            .upgrade()
            .unwrap()
            .set_state(ExecutorState::NeedRun);
    }
}

前面提到,轮讯时若未就绪则暂时不会再轮讯这个协程,实际上就是将其sleep_flag标记设置为true。而当等待的事件发生时,使用wake_by_ref方法将协程任务状态设置为就绪,实际上就是将sleep_flag设置为false,这时执行器就会再次轮讯这个协程。