异步协程
接着上一节的讨论,来看看WaitForThread
结构:
/// 等待另一个线程
pub struct WaitForThread {
/// 正在等待的线程
waiting_thread: Arc<Thread>,
/// 被等待的线程
waited_thread: Arc<Thread>,
}
impl Future for WaitForThread {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 等待的线程已结束
if self.waited_thread.state() == ThreadState::Exited {
// 已经退出,将等待的线程设置为就绪态
self.waiting_thread.set_state(ThreadState::Runnable);
Poll::Ready(())
} else {
// 向被等待线程添加一个唤醒器,其状态改变时再唤醒这个协程
self.waited_thread
.add_state_waker(cx.waker().clone(), ThreadState::Exited);
Poll::Pending
}
}
}
当我们为WaitForThread
实现了Future trait
后,他就成为了一个协程。协程指的是一个可能还没有准备好的值,poll
方法查询这个协程任务是否完成,若完成返回Ready
,否则返回Pending
,在这个等待线程的例子中,若内等待的线程已经退出了,我们就可以返回Ready
,否则则返回Pending
。
但是协程创建后Poll
方法是由谁来调用的呢?这就引出了协程执行器,执行器会轮讯内核中产生的所有协程,调用他们的Poll
方法。
所以在Poll
方法中,在返回Ready
之前,我们就要唤醒之前等待的线程。在这个例子中,若轮讯时发现等待的线程已经退出,则唤醒之前等待的主线程,他便可以被调度器调度从而重新返回用户态执行。
若轮讯时还没有就绪怎么办呢,这时执行器就暂时不再轮讯这个协程了,而我们需要将协程的唤醒器注册到等待的事件中去,当事件发生时,使用唤醒器唤醒协程,这时执行器才会再次轮讯协程,因为等待的事件已经发生,再次轮讯时便会返回Ready
,并把之前等待的线程唤醒。这个过程在下面还会说明。
回到这个例子,若等待的线程还没有退出,则在返回Pending
之前首先把协程的Waker
添加到了被等待线程的唤醒器队列中去。当被等待线程的状态改变时,会唤醒这个协程:
/// 设置线程状态
pub fn set_state(&self, new_state: ThreadState) {
// 线程已经退出,不再改变状态
if *self.state.get() == ThreadState::Exited {
return;
}
*self.state.get_mut() = new_state;
// 唤醒等待的唤醒器
self.state_wakers.get_mut().retain(|state_waker| {
let (waker, wait_state) = state_waker;
if *wait_state == new_state {
waker.wake_by_ref();
return false;
}
true
});
}
协程执行器中的协程任务
/// 协程执行器轮讯的协程任务
///
/// 包含一个Future对象和一个sleep标记
pub struct Task {
/// 内含的协程
inner_future: Cell<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
/// sleep标记,当为true时协程不会被执行器轮讯
/// 协程的Waker和执行器executor是唯一能够改变
/// sleep标记的代码区域,实现该Future的开发者
/// 必须自行决定何时使用Waker来取消sleep标记
sleep_flag: Cell<bool>,
/// 执行器的若引用
executor: Weak<Executor>,
}
前面提到,执行器轮讯内核中产生的所有协程,但实际上执行器中的协程任务进行了一层封装,额外包含了一个睡眠标记sleep_flag
,当其为true时,执行器就不会轮讯这个协程。
impl Task {
/// 将此任务休眠等待唤醒器唤醒
///
/// 当轮讯任务返回阻塞时,Future应当保证将Waker注册到等待的事件中区
pub fn sleep(&self) {
*self.sleep_flag.get_mut() = true;
}
/// 唤醒任务
pub fn wakeup(&self) {
*self.sleep_flag.get_mut() = false;
}
}
impl Woke for Task {
/// 唤醒任务,且将执行器设置为需要执行
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.wakeup();
arc_self
.executor
.upgrade()
.unwrap()
.set_state(ExecutorState::NeedRun);
}
}
前面提到,轮讯时若未就绪则暂时不会再轮讯这个协程,实际上就是将其sleep_flag
标记设置为true。而当等待的事件发生时,使用wake_by_ref
方法将协程任务状态设置为就绪,实际上就是将sleep_flag
设置为false,这时执行器就会再次轮讯这个协程。