(四)多线程那些事儿:并行库 tbb
(四)多线程那些事儿:并行库 tbb
0. concepts
0.1 并行库带来的理论提升是什么?并行库和线程池实现的区别是什么?
线程管理究竟需要解决哪些问题?既然操作系统已经具备线程管理功能,为何还需要额外的线程池和并行库?
操作类型 | 内存开销 | 时间开销 |
---|---|---|
线程上下文切换 | 约 2KB | 约 6 微秒 |
线程创建 | 1MB - 10MB | 300 微秒 - 3 毫秒 |
线程销毁 | 1MB - 10MB | 300 微秒 - 3 毫秒 |
首先,线程复用
是最直观的需求。对于大量的小任务来说,线程频繁创建核销毁的开销是不能接受的。因此不管是线程池还是并行库都支持线程复用,并行库底层其实是有一个类似线程池的概念去做复用这个事情的。也就是说使得线程资源是常驻的,不是被消费的,而任务才是可被消费的,一个线程能完成多个任务。
正如前面提到的线程饥饿的问题,当父任务占据所有线程资源,且父任务需要等待子任务完成才结束,而子任务因为没有线程资源了,就会导致卡死。因此线程资源是需要动态添加
的。但一般而言,就我的经验而言,似乎子任务再开一个线程池就可以了,线程池管理动态添加的线程资源还是比较费劲。
除了线程复用核线程资源动态管理之外,并行库往往还做了一些其他事情。
比如说在多生产者 - 多消费者模型中,任务队列频繁变化,线程池中的每个线程都需要访问公共的任务队列以完成任务分配(dispatch)。但由于每个线程运行在独立的 CPU 核心上,这意味着每个核心都可能持有任务队列的副本。(现代计算机系统中,数据可能同时存在于内存和各核心缓存中,这些同步开销很容易被忽视)当多个核心同时修改同一任务队列时,会引发缓存一致性问题——不同 CPU 核心需要反复同步同一缓存行,带来显著的性能开销。
为解决这一问题,并行库一般都是采用独立任务队列
策略:每个工作线程维护独立的任务队列,避免多线程竞争同一的全局的任务队列
资源。但这种方案也存在局限性:不同任务的执行开销各异,即便一开始是任务均分,但每个任务的工作量不一样。可能会出现部分线程对应的任务队列为空,而其他线程对应的任务队列还很满的情况。因此,当采用本地任务队列模式时,并行库还需引入工作窃取(work-stealing)
机制:当某个线程的任务队列为空时,主动从其他繁忙线程的任务队列中“窃取”任务,确保所有线程负载均衡。
一个设计优良的线程池或并行库通常需解决以下关键问题:
-
线程复用
:池化线程资源,避免频繁创建销毁开销。 -
动态添加线程
:根据任务负载动态调整线程数量,避免资源浪费或不足。 -
独立任务队列
:通过本地任务队列减少资源竞争,提升并发性能。 -
工作窃取
:实现任务的动态再分配,确保 CPU 资源高效利用。
需要明确的是,线程调度属于操作系统的职责范畴。在程序侧一般很少会考虑线程的具体管理。不管是线程池还是并行库,实际上都是在用户侧限制了直接使用线程资源,对操作系统也只是常规调用线程而已。可以认为多线程,核并行库更多专注于任务调度上。而该职责由并行库与开发者共同承担。需要处理好任务划分、任务依赖关系等等。
除此,并行库通常提供更高级的抽象能力,例如线程安全容器,以及parallel_for
、parallel_reduce
等高性能并行算法。
0.2 有多少项目使用并行库?多少项目使用线程池?
在企业项目中,有多少是用了自己实现的线程池?有多少是用了并行库的?
- 线程池方案
- workflow
- apollo
- pytorch
- nvidia-DALI
- 并行库方案
- tbb/ppl 搜不到有大型开源项目使用 tbb/ppl 的。搜到的开源项目大多以小项目在使用。而商业项目看不到代码,不清楚用了什么方式。
- openmp 数值计算、高性能计算库大量使用。
可以初步下一个饥饿论,即使很多技术强的公司,如百度、搜狗、脸书、英伟达写业务开发的时候其实也是用自己写的线程池,而且线程池一般实现也就两百行左右。只有一些特别高性能的计算库会使用并行库方式,在正确使用并行库的时候,往往会比线程池更快,能有 10%的直接提升。
1. tbb::Task
1.1 tbb::task_group
- 因此类似于线程池,去接收任务,在 tbb 则是通过一个类去管理
tbb::task_group
。 -
tbb::task_group
:它是一个用于组织和管理一组相关任务的容器。你可以把多个任务添加到一个tbb::task_group
中,然后使用wait()
方法等待所有任务完成。tbb::task_group
侧重于任务的分组和同步,和执行环境没有直接关联。
tbb::task_group tg;
tg.run([&] {
foo();
});
tg.run([&] {
bar();
});
1.2 tbb::task_arena
-
tbb::task_arena
:它代表了一个线程池或者说执行上下文。可以通过它来控制任务执行的资源,像指定线程数量、隔离任务等。tbb::task_arena
主要用于管理执行环境,能够限制在其范围内执行的任务所使用的线程数量。
void foo() {
std::cout << "Task is running in a task arena." << std::endl;
}
void foo() {
tbb::task_arena arena(4);
arena.execute([&]() {
tbb::task_group group;
group.run(foo);
group.wait();
});
}
简单来说,tbb::task_arena
关注的是执行资源的管理,而 tbb::task_group
关注的是任务的组织与同步。
2. tbb::Parallel
2.1 tbb::parallel_invoke
- 如果任务不需要动态添加,一开始就全部知道了,也可以用一个封装好的接口
tbb::parallel_invoke
tbb::parallel_invoke(
[&] {
foo();
},
[&] {
bar();
});
2.2 tbb::parallel_for
- 如果我有一个 for 循环任务,for 循环的每个操作都是可并行的,这个时候可以用
tbb::parallel_for
#include <iostream>
#include <tbb/parallel_for.h>
#include <vector>
#include <cmath>
int main() {
size_t n = 1<<26;
std::vector<float> a(n);
tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t i) {
a[i] = std::sin(i);
});
return 0;
}
- 如果说 for 循环像是对一个类似 vector 容器,每个 element 做一个操作。那如果是一个 vector
容器呢?是套用两层`tbb::parallel_for`吗?不用,tbb 对这种`corner case`有优化。
#include <iostream>
#include <tbb/parallel_for.h>
#include <tbb/blocked_range2d.h>
#include <vector>
#include <cmath>
int main() {
size_t n = 1 << 13;
std::vector<std::vector<float>> a(n, std::vector<float>(n));
tbb::parallel_for(tbb::blocked_range2d<size_t>(0, n, 0, n),
[&] (tbb::blocked_range2d<size_t> r) {
for (size_t i = r.rows().begin(); i < r.rows().end(); ++i) {
for (size_t j = r.cols().begin(); j < r.cols().end(); ++j) {
a[i][j] = std::sin(i) * std::sin(j);
}
}
});
return 0;
}
2.3 tbb::parallel_for_each
- 如果是处理的是可迭代对象,
tbb::parallel_for_each
会更好。tbb::parallel_for
的入参更多像是一个索引,而tbb::parallel_for_each
的入参是一个迭代对象。tbb 也许会从在内存亲和性对tbb::parallel_for
有一些特殊处理,提升这种corner case
的性能。
size_t n = 1<<26;
std::vector<float> a(n);
tbb::parallel_for_each(a.begin(), a.end(), [&] (float &f) {
f = 32.f;
});
2.4 tbb::parallel_reduce
- 并行归约。什么是归约?简单来就是可以将一个问题转换为另一个问题。然后基于此性质去递归转换,或者去重复转换,来达到对原问题的变形。这样子能够将一个大问题变为若干个小问题,小问题解决了,那么大问题就解决了。而大问题变为若干个小问题的做法,是形式化,是可复用的,因此熟练归约思维,就可以将大问题变为小问题,来解决。
那并行归约怎么理解?
- 常规思路:比如说从 1 到 n 的求和,就是一个同时有 n 个加法的大问题(一个长等式);
- 递归归约:这可以是一个,
1到n-1和的值
与一个n
的和的问题,这个时候如果已经知道了1到n-1
的结果值,我只需要一个加法就能解决了。因此原问题(很多加法,长等式),等价于一个(1到n-1和的值
)和一个n
的小问题。然后如此递归展开1到n-1和的值
的问题,就可以将原问题转变为若干个小问题,而这种递归展开是形式化,可复用的,和具体问题无关的,因此这种递归归约做法才有价值。公式解题,就是快嘛。 - 重复归约:但是从并行的角度看,递归这种有依赖的问题,是不利于并行的。而归约的本质是问题的转换。因此并行规约说的其实只是针对重复归约的并行化。还是以
1到n的和
的问题为例子,这个问题其实也等价于1和结果值的和
,2和结果值的和
, …,n和结果值的和
等等的 n 个子问题的合并,这个时候每个子问题都是可并行化求解的,只需要对结果值做一些原子保护就好了。然后从编程角度来看,结果值需要有一个初始值;还需要指定,子问题的解如何合并到大问题上去的方法。因此完整的并行归约如下:
// (float) 0是结果值的初始值
float res = tbb::parallel_reduce(tbb::blocked_range<size_t>(0, n), (float)0,
[&] (tbb::blocked_range<size_t> r, float local_res) {
for (size_t i = r.begin(); i < r.end(); i++) {
local_res += std::sin(i);
}
return local_res;
}, [] (float x, float y) {
return x + y;
});
// 第一个lambda是子问题求解方法
// 第二个lambda是子问题如何合并到但问题。
- 归约方式的交换律和结合律。上面说到了
1到n的和
的大问题,等价于1和结果值的和
,2和结果值的和
, …,n和结果值的和
的若干个子问题的合并。那合并会有什么问题吗?子问题是必然能够并行化的,但是合并方式能够满足交换律和结合律吗?即原始顺序:合并(1和结果值的和
,2和结果值的和
, …,n和结果值的和
)交换顺序:合并(n和结果值的和
,n-1和结果值的和
, …,1和结果值的和
)结合律:合并(合并(1和结果值的和
,2和结果值的和
), …,n和结果值的和
)
因此早期的时候是会区分这种的,tbb::parallel_deterministic_reduce
就是用原始顺序的。tbb::parallel_reduce
就不一定是用原始顺序的。现在在oneTBB
已经合并两个用法到tbb:parallel_reduce
了,内部会判断。
3. others
3.1 containers
就是线程安全的容器 todo:
3.2 partitioner
- 任务分配
tbb::xxx_partitioner
前面提过,线程,任务是两种概念了。如果有一个函数性能很紧急,算法已经很难优化了,需要对这个函数做精细化地调整和测试,那怎么做呢?比如说对tbb::parallel_for
要做一个提升,tbb
是不是应该暴露相关参数出来,让我控制它内部如何分配线程和任务嘛。那这个参数就是tbb::xxx_partitioner
。首先,要理解for(auto i=0 : i<32; i++){ x[i]...; }
这个是 32 次的循环,但是 parallel_for 的时候,能够做到类似循环展开的操作,从而变成 16 次的循环。
for(auto i=0 : i<32; i++++){ x[i]...; x[i+1]...; }
所以可以是 32 个任务,1 个任务处理 1 个元素;也可以是 16 个任务,1 个任务处理 2 个元素;也可以是 4 个任务,一个任务处理 8 个元素。然后开多少个线程也可以调整。这些设置都可以通过
tbb::xxx_partitioner
设置。
#include "mtprint.h"
#include <cmath>
#include <tbb/parallel_for.h>
#include <thread>
int main() {
size_t n = 32;
tbb::task_arena ta(4);
ta.execute([&] {
tbb::parallel_for(tbb::blocked_range<size_t>(0, n),
[&] (tbb::blocked_range<size_t> r) {
mtprint("thread", tbb::this_task_arena::current_thread_index(), "size", r.size());
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}, tbb::static_partitioner{});
});
return 0;
}
这个就是4个线程,每次任务处理8个元素,共4个任务。
4. tbb in practice
4.1 嵌套 parallel_for 的死锁
下面这个代码是有可能会死锁的。怎么理解这个死锁如何出现?首先,这是一个无实际数据竞争的代码,用锁只是演示死锁问题。然后,因为内层的 for 循环执行完了,就可能窃取到另一个外部 for 循环的任务,导致 mutex 被重复上锁。就是自己锁了一遍,然后自己又想锁一下。
#include <iostream>
#include <tbb/parallel_for.h>
#include <vector>
#include <cmath>
#include <mutex>
int main() {
size_t n = 1<<13;
std::vector<float> a(n * n);
std::recursive_mutex mtx;
tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t i) {
std::lock_guard lck(mtx);
tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t j) {
a[i * n + j] = std::sin(i) * std::sin(j);
});
});
return 0;
}
-
任务域
task_arena
上面的死锁问题的根源在于任务窃取。因此解决思路就是要控制任务窃取行为。做法如下- 方法一:
#include <tbb/parallel_for.h> #include <tbb/task_arena.h> #include <vector> #include <cmath> #include <mutex> int main() { size_t n = 1<<13; std::vector<float> a(n * n); std::mutex mtx; tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t i) { std::lock_guard lck(mtx); tbb::task_arena ta; ta.execute([&] { tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t j) { a[i * n + j] = std::sin(i) * std::sin(j); }); }); }); return 0; }
内层的
tbb::parallel_for
通过tbb::task_arena
达到了独立一个任务域的目的,不会跨任务域去窃取。从此还可以控制任务的线程投入。tb::task_arena ta(2);
这个就是表示当前任务域用两个线程。-
方法二:
#include <tbb/parallel_for.h> #include <tbb/task_arena.h> #include <vector> #include <cmath> #include <mutex> int main() { size_t n = 1<<13; std::vector<float> a(n * n); std::mutex mtx; tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t i) { std::lock_guard lck(mtx); tbb::this_task_arena::isolate([&] { tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t j) { a[i * n + j] = std::sin(i) * std::sin(j); }); }); }); return 0; }
类似语法糖,和前面是类似的。也是针对同一个任务域,但用 isolate 隔离,禁止其内部的工作被窃取
Enjoy Reading This Article?
Here are some more articles you might like to read next: