crossbeam-deque包提供了一个无锁的双向队列(deque)。那么这个双向队列在并发中又起到了什么重要的作用呢?这就涉及到了在多任务环境下的一个重要算法:work-stealing算法,既工作窃取算法。最初,工作窃取算法是在join/fork的模型下,作为调度算法用来给多线程计算分配任务的。分配任务(这里是线程)有很多需要注意的点,比如线程的数量需要平衡(太少则有些处理器会处于空闲,太多可能内存会枯竭),并且相关的线程应该尽量分配在相同的处理器中,等等。而工作窃取算法简单的说就是,每个处理器先处理自己的任务,如果处理完了,就去别的处理器的任务列表里“偷”一个过来执行。与之相对的有一个work-sharing算法,在该算法中,新产生的任务由调度算法直接分配给相应的处理器,每个处理器都是被动的。而在工作窃取算法中,每个处理器都是主动的。在Burton&Sleep[3]和Halstead[5]中已经可以看到工作窃取算法的雏形。而Blumofe&Leiserson[4]的随机工作窃取算法算是比较经典的对该算法的描述,以下是该算法的一个概述,其中每个处理器都维护一个双向队列。初始状态下,计算是一个线程(类比main函数)并被分配给某个处理器,而其他的处理器都处于空闲状态。处于空闲状态的处理器会立即执行窃取操作。每个处理器按指令逐一执行当前线程,直到遇到以下四种情况:
- 遇到了spawn指令,产生了一个新的线程。当前线程被放入双向队列底部,处理器开始执行新的线程
- 线程被挂起(处于阻塞状态)。这个时候处理器会从双向队列的底部取出一个线程去执行。如果双向队列为空,那么处理器就会去执行窃取。
- 指令导致线程死亡,这时和2相同处理
- 指令激活了另一个线程。此时被激活的线程会被放入双向队列的底部,处理器继续执行现在的线程 窃取�ֿ�,����操作:处理器随机选取另一处理器,如果被选择的处理器的双向队列非空,那么从该队列的头部取出一个线程并开始执行,否则再次进入随机选取。
可以看到在该算法中,双向队列是一个关键数据结构。双向队列在本地被当作栈来使用:从本地取任务总是从栈顶(也既双向队列的底部)取出,这在crossbeam中被成为工作者队列(Worker queue)。而在窃取时,则把它当作队列来使用:总是从队列的头部窃取。
crossbeam的双向队列Arora,Blumofe和Plaxton[1]基于Blumofe&Leiserson[4],提出了使用yield系统调用以及无锁数据结构的无锁工作窃取算法,即ABP工作窃取算法。(论文中后者使用的就是上一讲中提到的CAS。为什么需要CAS呢?试想,当双向队列只有一个元素,而窃取和本地取任务同时发生时就会产生竞态。基本上和上一讲提到的无锁并发栈的问题类似)。而Chase&Lev[2]则是改进了Blumofe&Leiserson[4]的deque,使其在保持简洁和高性能的同时,底层不受限于固定长数组(固定长数组会有溢出的问题)。而crossbeam的deque是在Chase&Lev[2]的deque的基础上又作出了一些改进:(注意,接下来我们就不讨论处理器中的线程调度,而是线程中的任务调度问题了。比如tokio,goroutine面临的都是这样的问题)
- 支持先进先出的工作者队列(既本地可以当队列而不是栈使用)
- 支持一次窃取多个任务
- 加入了一个注水器队列(Injector queue),和原来的工作者队列可以额配合使用。
这里我们先来说一下这个注水器队列。这是一个先进先出MPMC队列(任务从一端入队,从另一端被窃取),被线程共享(全局)。新的任务可以被放到这个队列中供空闲的线程窃取。相对于将新任务放进工作者队列的,这一操作更加公平(即只要有空闲的线程,这个队列的处理就会有进展)
API// 先进先出MPMC队列(任务从一端入队,从另一端被窃取)例子
struct Injector<T>;
// 本地的工作者队列
struct Worker<T>;
// 用来从相应的工作者队列窃取任务
struct Stealer<T>;
#[must_use]
enum Steal<T> {
Empty,
Success(T),
Retry,
}
impl<T> Injector<T> {
fn new() -> Injector<T>;
fn is_empty(&self) -> bool;
fn len(&self) -> usize;
fn push(&self, task: T);
// 从队列窃取一个任务
fn steal(&self) -> Steal<T>;
// 从队列窃取多个任务并交给目标工作者
fn steal_batch(&self, dest: &Worker<T>) -> Steal<()>;
// 从队列窃取多个任务并交给工作者,并弹出第一个任务
fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>;
}
impl<T> Worker<T> {
// 初始化一个先进先出工作者队列
fn new_fifo() -> Worker<T>;
// 初始化一个后进先出工作者队列
fn new_lifo() -> Worker<T>;
// 从当前队列产生一个窃取者
fn stealer(&self) -> Stealer<T>;
fn is_empty(&self) -> bool;
fn push(&self, task: T);
fn pop(&self) -> Option<T>;
}
impl<T> Stealer<T> {
fn is_empty(&self) -> bool;
// 从队列窃取一个任务
fn steal(&self) -> Steal<T>;
// 从队列窃取多个任务并交给目标工作者
fn steal_batch(&self, dest: &Worker<T>) -> Steal<()>;
// 从队列窃取多个任务并交给工作者,并弹出第一个任务
fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>;
}
impl<T> Steal<T> {
fn is_empty(&self) -> bool;
fn is_success(&self) -> bool;
fn is_retry(&self) -> bool;
// 如果是Success(T)则返回内容
fn success(self) -> Option<T>;
// 如过没有steal到任务,则执行F
fn or_else<F: FnOnce() -> Steal<T>>(self, f: F);
}
// 一直找到一个Success(T)为止
impl<T> FromIterator<Steal<T>> for Steal<T>;
对于一个简单的包含注水器队列的工作窃取算法,可以写出如下的窃取逻辑:
- 先从工作者队列本地试图获得一个任务
- 试图从全局的注水器队列中窃取一打任务
- 试图从另一个线程窃取一个任务
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::iter;
fn find_task<T>(
local: &Worker<T>,
global: &Injector<T>,
stealers: &[Stealer<T>],
) -> Option<T> {
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a batch of tasks from the global queue.
global.steal_batch_and_pop(local)
// Or try stealing a task from one of the other threads.
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
}
当然,Chase&Lev的不包含注水器的窃取算法也可以很容易写出来。有了crossbeam-deque,写无锁工作窃取算法是不是容易了很多?
小结好了,今天我们简单入门了以下工作窃取算法,以及它和双向队列的关系。也看到了crossbeam-deque就是为此而生的。下期我们来看看crossbeam中的channel