(四)多线程那些事儿:并行库 tbb

(四)多线程那些事儿:并行库 tbb

0. concepts

0.1 并行库带来的理论提升是什么?并行库和线程池实现的区别是什么?

线程管理究竟需要解决哪些问题?既然操作系统已经具备线程管理功能,为何还需要额外的线程池和并行库?

操作类型 内存开销 时间开销
线程上下文切换 约 2KB 约 6 微秒
线程创建 1MB - 10MB 300 微秒 - 3 毫秒
线程销毁 1MB - 10MB 300 微秒 - 3 毫秒

首先,线程复用是最直观的需求。对于大量的小任务来说,线程频繁创建核销毁的开销是不能接受的。因此不管是线程池还是并行库都支持线程复用,并行库底层其实是有一个类似线程池的概念去做复用这个事情的。也就是说使得线程资源是常驻的,不是被消费的,而任务才是可被消费的,一个线程能完成多个任务。

正如前面提到的线程饥饿的问题,当父任务占据所有线程资源,且父任务需要等待子任务完成才结束,而子任务因为没有线程资源了,就会导致卡死。因此线程资源是需要动态添加的。但一般而言,就我的经验而言,似乎子任务再开一个线程池就可以了,线程池管理动态添加的线程资源还是比较费劲。

除了线程复用核线程资源动态管理之外,并行库往往还做了一些其他事情。

比如说在多生产者 - 多消费者模型中,任务队列频繁变化,线程池中的每个线程都需要访问公共的任务队列以完成任务分配(dispatch)。但由于每个线程运行在独立的 CPU 核心上,这意味着每个核心都可能持有任务队列的副本。(现代计算机系统中,数据可能同时存在于内存和各核心缓存中,这些同步开销很容易被忽视)当多个核心同时修改同一任务队列时,会引发缓存一致性问题——不同 CPU 核心需要反复同步同一缓存行,带来显著的性能开销。

为解决这一问题,并行库一般都是采用独立任务队列策略:每个工作线程维护独立的任务队列,避免多线程竞争同一的全局的任务队列资源。但这种方案也存在局限性:不同任务的执行开销各异,即便一开始是任务均分,但每个任务的工作量不一样。可能会出现部分线程对应的任务队列为空,而其他线程对应的任务队列还很满的情况。因此,当采用本地任务队列模式时,并行库还需引入工作窃取(work-stealing)机制:当某个线程的任务队列为空时,主动从其他繁忙线程的任务队列中“窃取”任务,确保所有线程负载均衡。

一个设计优良的线程池或并行库通常需解决以下关键问题:

  1. 线程复用:池化线程资源,避免频繁创建销毁开销。
  2. 动态添加线程:根据任务负载动态调整线程数量,避免资源浪费或不足。
  3. 独立任务队列:通过本地任务队列减少资源竞争,提升并发性能。
  4. 工作窃取:实现任务的动态再分配,确保 CPU 资源高效利用。

需要明确的是,线程调度属于操作系统的职责范畴。在程序侧一般很少会考虑线程的具体管理。不管是线程池还是并行库,实际上都是在用户侧限制了直接使用线程资源,对操作系统也只是常规调用线程而已。可以认为多线程,核并行库更多专注于任务调度上。而该职责由并行库与开发者共同承担。需要处理好任务划分、任务依赖关系等等。

除此,并行库通常提供更高级的抽象能力,例如线程安全容器,以及parallel_forparallel_reduce等高性能并行算法。

0.2 有多少项目使用并行库?多少项目使用线程池?

在企业项目中,有多少是用了自己实现的线程池?有多少是用了并行库的?

  • 线程池方案
    • workflow
    • apollo
    • pytorch
    • nvidia-DALI
  • 并行库方案
    • tbb/ppl 搜不到有大型开源项目使用 tbb/ppl 的。搜到的开源项目大多以小项目在使用。而商业项目看不到代码,不清楚用了什么方式。
    • openmp 数值计算、高性能计算库大量使用。

可以初步下一个饥饿论,即使很多技术强的公司,如百度、搜狗、脸书、英伟达写业务开发的时候其实也是用自己写的线程池,而且线程池一般实现也就两百行左右。只有一些特别高性能的计算库会使用并行库方式,在正确使用并行库的时候,往往会比线程池更快,能有 10%的直接提升。

1. tbb::Task

1.1 tbb::task_group

  • 因此类似于线程池,去接收任务,在 tbb 则是通过一个类去管理tbb::task_group
  • tbb::task_group:它是一个用于组织和管理一组相关任务的容器。你可以把多个任务添加到一个 tbb::task_group 中,然后使用 wait() 方法等待所有任务完成。tbb::task_group 侧重于任务的分组和同步,和执行环境没有直接关联。
    tbb::task_group tg;
    tg.run([&] {
        foo();
    });
    tg.run([&] {
        bar();
    });

1.2 tbb::task_arena

  • tbb::task_arena:它代表了一个线程池或者说执行上下文。可以通过它来控制任务执行的资源,像指定线程数量、隔离任务等。tbb::task_arena 主要用于管理执行环境,能够限制在其范围内执行的任务所使用的线程数量。
void foo() {
    std::cout << "Task is running in a task arena." << std::endl;
}

void foo() {
    tbb::task_arena arena(4);

    arena.execute([&]() {
        tbb::task_group group;
        group.run(foo);
        group.wait();
    });
}

简单来说,tbb::task_arena 关注的是执行资源的管理,而 tbb::task_group 关注的是任务的组织与同步。

2. tbb::Parallel

2.1 tbb::parallel_invoke

  • 如果任务不需要动态添加,一开始就全部知道了,也可以用一个封装好的接口tbb::parallel_invoke
tbb::parallel_invoke(
    [&] {
        foo();
    },
    [&] {
        bar();
    });

2.2 tbb::parallel_for

  • 如果我有一个 for 循环任务,for 循环的每个操作都是可并行的,这个时候可以用tbb::parallel_for
#include <iostream>
#include <tbb/parallel_for.h>
#include <vector>
#include <cmath>

int main() {
    size_t n = 1<<26;
    std::vector<float> a(n);

    tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t i) {
        a[i] = std::sin(i);
    });

    return 0;
}

  • 如果说 for 循环像是对一个类似 vector 容器,每个 element 做一个操作。那如果是一个 vector容器呢?是套用两层`tbb::parallel_for`吗?不用,tbb 对这种`corner case`有优化。
#include <iostream>
#include <tbb/parallel_for.h>
#include <tbb/blocked_range2d.h>
#include <vector>
#include <cmath>

int main() {
    size_t n = 1 << 13;
    std::vector<std::vector<float>> a(n, std::vector<float>(n));

    tbb::parallel_for(tbb::blocked_range2d<size_t>(0, n, 0, n),
    [&] (tbb::blocked_range2d<size_t> r) {
        for (size_t i = r.rows().begin(); i < r.rows().end(); ++i) {
            for (size_t j = r.cols().begin(); j < r.cols().end(); ++j) {
                a[i][j] = std::sin(i) * std::sin(j);
            }
        }
    });

    return 0;
}

2.3 tbb::parallel_for_each

  • 如果是处理的是可迭代对象,tbb::parallel_for_each会更好。tbb::parallel_for的入参更多像是一个索引,而tbb::parallel_for_each的入参是一个迭代对象。tbb 也许会从在内存亲和性对tbb::parallel_for有一些特殊处理,提升这种corner case的性能。
size_t n = 1<<26;
    std::vector<float> a(n);

    tbb::parallel_for_each(a.begin(), a.end(), [&] (float &f) {
        f = 32.f;
    });

2.4 tbb::parallel_reduce

  • 并行归约。什么是归约?简单来就是可以将一个问题转换为另一个问题。然后基于此性质去递归转换,或者去重复转换,来达到对原问题的变形。这样子能够将一个大问题变为若干个小问题,小问题解决了,那么大问题就解决了。而大问题变为若干个小问题的做法,是形式化,是可复用的,因此熟练归约思维,就可以将大问题变为小问题,来解决。

那并行归约怎么理解?

  • 常规思路:比如说从 1 到 n 的求和,就是一个同时有 n 个加法的大问题(一个长等式);
  • 递归归约:这可以是一个,1到n-1和的值与一个n的和的问题,这个时候如果已经知道了1到n-1的结果值,我只需要一个加法就能解决了。因此原问题(很多加法,长等式),等价于一个(1到n-1和的值)和一个n的小问题。然后如此递归展开1到n-1和的值的问题,就可以将原问题转变为若干个小问题,而这种递归展开是形式化,可复用的,和具体问题无关的,因此这种递归归约做法才有价值。公式解题,就是快嘛。
  • 重复归约:但是从并行的角度看,递归这种有依赖的问题,是不利于并行的。而归约的本质是问题的转换。因此并行规约说的其实只是针对重复归约的并行化。还是以1到n的和的问题为例子,这个问题其实也等价于1和结果值的和, 2和结果值的和, …,n和结果值的和等等的 n 个子问题的合并,这个时候每个子问题都是可并行化求解的,只需要对结果值做一些原子保护就好了。然后从编程角度来看,结果值需要有一个初始值;还需要指定,子问题的解如何合并到大问题上去的方法。因此完整的并行归约如下:
    // (float) 0是结果值的初始值
    float res = tbb::parallel_reduce(tbb::blocked_range<size_t>(0, n), (float)0,
    [&] (tbb::blocked_range<size_t> r, float local_res) {
        for (size_t i = r.begin(); i < r.end(); i++) {
            local_res += std::sin(i);
        }
        return local_res;
    }, [] (float x, float y) {
        return x + y;
    });
    // 第一个lambda是子问题求解方法
    // 第二个lambda是子问题如何合并到但问题。
  • 归约方式的交换律和结合律。上面说到了1到n的和的大问题,等价于1和结果值的和, 2和结果值的和, …,n和结果值的和的若干个子问题的合并。那合并会有什么问题吗?子问题是必然能够并行化的,但是合并方式能够满足交换律和结合律吗?即原始顺序:合并(1和结果值的和, 2和结果值的和, …,n和结果值的和)交换顺序:合并(n和结果值的和, n-1和结果值的和, …,1和结果值的和)结合律:合并(合并(1和结果值的和, 2和结果值的和), …,n和结果值的和

因此早期的时候是会区分这种的,tbb::parallel_deterministic_reduce就是用原始顺序的。tbb::parallel_reduce就不一定是用原始顺序的。现在在oneTBB已经合并两个用法到tbb:parallel_reduce了,内部会判断。

3. others

3.1 containers

就是线程安全的容器 todo:

3.2 partitioner

  • 任务分配tbb::xxx_partitioner 前面提过,线程,任务是两种概念了。如果有一个函数性能很紧急,算法已经很难优化了,需要对这个函数做精细化地调整和测试,那怎么做呢?比如说对tbb::parallel_for要做一个提升,tbb是不是应该暴露相关参数出来,让我控制它内部如何分配线程和任务嘛。那这个参数就是tbb::xxx_partitioner。首先,要理解
    for(auto i=0 : i<32; i++){
      x[i]...;
    }
    

    这个是 32 次的循环,但是 parallel_for 的时候,能够做到类似循环展开的操作,从而变成 16 次的循环。

    for(auto i=0 : i<32; i++++){
      x[i]...;
      x[i+1]...;
    }
    

    所以可以是 32 个任务,1 个任务处理 1 个元素;也可以是 16 个任务,1 个任务处理 2 个元素;也可以是 4 个任务,一个任务处理 8 个元素。然后开多少个线程也可以调整。这些设置都可以通过tbb::xxx_partitioner设置。

#include "mtprint.h"
#include <cmath>
#include <tbb/parallel_for.h>
#include <thread>

int main() {
    size_t n = 32;

    tbb::task_arena ta(4);
    ta.execute([&] {
        tbb::parallel_for(tbb::blocked_range<size_t>(0, n),
        [&] (tbb::blocked_range<size_t> r) {
            mtprint("thread", tbb::this_task_arena::current_thread_index(), "size", r.size());
            std::this_thread::sleep_for(std::chrono::milliseconds(400));
        }, tbb::static_partitioner{});
    });

    return 0;
}
这个就是4个线程,每次任务处理8个元素,共4个任务。

4. tbb in practice

4.1 嵌套 parallel_for 的死锁

下面这个代码是有可能会死锁的。怎么理解这个死锁如何出现?首先,这是一个无实际数据竞争的代码,用锁只是演示死锁问题。然后,因为内层的 for 循环执行完了,就可能窃取到另一个外部 for 循环的任务,导致 mutex 被重复上锁。就是自己锁了一遍,然后自己又想锁一下。

#include <iostream>
#include <tbb/parallel_for.h>
#include <vector>
#include <cmath>
#include <mutex>

int main() {
    size_t n = 1<<13;
    std::vector<float> a(n * n);
    std::recursive_mutex mtx;

    tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t i) {
        std::lock_guard lck(mtx);
        tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t j) {
            a[i * n + j] = std::sin(i) * std::sin(j);
        });
    });

    return 0;
}
  • 任务域task_arena 上面的死锁问题的根源在于任务窃取。因此解决思路就是要控制任务窃取行为。做法如下

    • 方法一:
      #include <tbb/parallel_for.h>
      #include <tbb/task_arena.h>
      #include <vector>
      #include <cmath>
      #include <mutex>
    
      int main() {
          size_t n = 1<<13;
          std::vector<float> a(n * n);
          std::mutex mtx;
    
          tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t i) {
              std::lock_guard lck(mtx);
              tbb::task_arena ta;
              ta.execute([&] {
                  tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t j) {
                      a[i * n + j] = std::sin(i) * std::sin(j);
                  });
              });
          });
    
          return 0;
      }
    

    内层的tbb::parallel_for通过tbb::task_arena达到了独立一个任务域的目的,不会跨任务域去窃取。从此还可以控制任务的线程投入。tb::task_arena ta(2);这个就是表示当前任务域用两个线程。

    • 方法二:

      #include <tbb/parallel_for.h>
      #include <tbb/task_arena.h>
      #include <vector>
      #include <cmath>
      #include <mutex>
      
      int main() {
          size_t n = 1<<13;
          std::vector<float> a(n * n);
          std::mutex mtx;
      
          tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t i) {
              std::lock_guard lck(mtx);
              tbb::this_task_arena::isolate([&] {
                  tbb::parallel_for((size_t)0, (size_t)n, [&] (size_t j) {
                      a[i * n + j] = std::sin(i) * std::sin(j);
                  });
              });
          });
      
          return 0;
      }
      

      类似语法糖,和前面是类似的。也是针对同一个任务域,但用 isolate 隔离,禁止其内部的工作被窃取




    Enjoy Reading This Article?

    Here are some more articles you might like to read next:

  • (六)模板那些事儿:类型擦除
  • (五)多线程那些事儿:并行库 openmp
  • (五)模板那些事儿:模板元
  • (三)多线程那些事儿:怎么用好
  • (四)模板那些事儿:不定长参数