异步系统调用

前面已经梳理了WaitForThread协程的执行方式和thread_join系统调用,这一节介绍另外一些系统调用和他们对应的异步协程。

sys_proc_wait()

/// 当前线程等待一个进程结束
///
/// 若等待的进程不存在则返回255
pub fn sys_proc_wait(pid: usize) -> (usize, usize) {
    // 获取当前线程
    let current_thread = CURRENT_THREAD.get().as_ref().unwrap().clone();

    let waited_process = match PROCESS_MAP.get().get(&pid) {
        Some(process) => process.clone(),
        None => {
            println!("[Kernel] waited proc does not existed");
            return (255, 0);
        }
    };
    current_thread.set_state(ThreadState::Waiting);
    executor::spawn(WaitForProc::new(current_thread, waited_process));
    (0, 0)
}

sys_proc_wait函数非阻塞式地等待一个进程结束,若现在等待的进程还未结束,则当前线程进入异步等待(Waiting)状态。

然后创建一个WaitForProc异步协程,将其添加到内核协程执行线程的任务队列中去,等待执行。

当系统调用结束时返回内核主线程的主循环main_loop中下一次循环时,Waiting的线程便不会被调度执行,而是调度其他线程。

/// 当前线程等待一个进程结束的协程任务
/// 进程等待一个用户线程结束
pub struct ProcWaitFuture {
    /// 正在等待的线程
    #[allow(unused)]
    thread: Arc<Thread>,
    /// 被等待的进程
    waited_process: Arc<Process>,
}

协程任务内部创建一个ProcWaitFuture结构,并等待。当ProcWaitFuture就绪后(此时等待的进程已经结束),将用户线程状态置为就绪(可调度)。

impl Future for ProcWaitFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 等待其根线程结束
        let waited_root_thread = self.waited_process.root_thread();
        let state_lock = waited_root_thread.state();
        let state = wait_lock_or_yield(&state_lock);
        if *state == ThreadState::Exited {
            // 已经退出
            Poll::Ready(())
        } else {
            // 向被等待线程添加一个唤醒器,其状态改变时再唤醒这个协程
            waited_root_thread.add_state_waker(cx.waker().clone(), ThreadState::Exited);
            Poll::Pending
        }
    }
}

协程执行线程轮讯ProcWaitFuture时:

若等待的进程未退出,则向被等待的线程中添加一个唤醒器,且等待的状态为Exited,然后返回Pending,若已退出则返回Ready

当被等待的线程状态改变为Exited时,线程持有的唤醒器就唤醒ProcWaitFuture协程,这时再次轮讯就会返回Ready。然后在async_proc_wait中将等待状态的线程设置为就绪。

至此,完成了整个系统调用流程。

sys_mutex_lock()

/// 为指定编号的互斥锁加锁
///
/// 成功返回0,失败返回usize::MAX
pub fn sys_mutex_lock(mutex_id: usize) -> (usize, usize) {
    let current_thread = CURRENT_THREAD.get().as_ref().unwrap().clone();
    let current_proc = current_thread.proc().unwrap();
    if let Some(mutex) = current_proc.mutexes().get_mut(mutex_id) {
        mutex.lock(mutex.clone(), current_thread);
    } else {
        return (usize::MAX, 0);
    }
    (0, 0)
}

/// 指定线程获得锁
pub fn lock(&self, arc_self: Arc<MutexBlocking>, thread: Arc<Thread>) {
    if *self.locked {
        // 当前线程加入等待队列,并进入等待状态
        thread.set_state(ThreadState::Waiting);
        executor::spawn(WaitForMutex::new(thread, arc_self));
    } else {
        *self.locked.get_mut() = true;
        // println!("Thread {} get mutex now!", current_thread.tid());
    }
}

这个系统调用使当前用户线程获得锁,其对应一个WaitForMutex协程。

/// 线程等待一个锁
pub struct WaitForMutex {
    /// 正在等待的线程
    thread: Arc<Thread>,
    /// 被等待的锁
    mutex: Arc<MutexBlocking>,
}

就绪时令线程获得锁,并恢复就绪态;否则将唤醒器注册到锁里面去

impl Future for WaitForMutex {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if !self.mutex.is_locked() {
            // 锁已经释放,线程获得锁且恢复为就绪态
            self.thread.set_state(ThreadState::Runnable);
            self.mutex.set_locked();
            // [Debug]
            // println!("thread: {:x} get mutex now!", self.thread.tid());
            Poll::Ready(())
        } else {
            // 锁被占用了,将线程和唤醒器注册到锁里面去
            self.mutex
                .add_thread((self.thread.clone(), cx.waker().clone()));
            Poll::Pending
        }
    }
}

这里只列举了两个典型的系统调用,内核中还有很多其他的协程类型,但是原理都是相同的,轮讯时若完成则恢复之前等待的事件,若未完成,则将协程的唤醒器注册到等待的事件中去。