第 13 篇讲了 jthread、semaphore、latch/barrier——都是同步工具。生产环境里更常见的问题是:任务来得比处理快,线程该复用还是现开?队列该无限长还是限流?队列满了怎么办?

这一篇用 demo ref/cpp_demo/concurrency/thread_pool/线程池架构、有界队列、背压策略,以及 future/promise 取异步结果;附带 HTTP REST Server/Client 实战。

这是「现代 C++ 实战」系列的第 14 篇。建议先读 第 13 篇:C++20 同步原语

一、为什么需要线程池?

每来一个任务就 std::thread(...).detach()

问题 后果
创建开销 线程栈分配、内核调度——毫秒级,短任务不划算
线程数爆炸 1 万并发 → 1 万线程,内存与上下文切换拖垮系统
难以复用 无法统一限流、监控、优雅关闭

线程池:固定 N 个工作线程 + 任务队列,提交方只 submit,worker 循环取任务执行——摊薄创建成本、控制并发上限

二、基本架构

1
2
3
4
submit() ──→ [ 任务队列 ] ──→ Worker 1
│ Worker 2
│ Worker 3
└─ condition_variable 唤醒

核心组件(demo 中 ThreadPool 类):

组件 作用
workers_ std::vector<std::thread>,长期运行
tasks_ std::queue<std::function<void()>>
queue_mutex_ 保护队列
condition_ 队列空时 worker 阻塞,有任务时 notify_one
stop_ 析构时置 true,唤醒 worker 退出

Worker 循环(简化):

1
2
3
4
5
6
7
8
9
10
11
12
13
while (true) {
std::function<void()> task;
{
std::unique_lock lock(queue_mutex_);
condition_.wait(lock, [this] {
return stop_ || !tasks_.empty();
});
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task(); // 在锁外执行,避免阻塞其他 worker
}

在锁外执行任务——否则长任务会饿死队列操作和其他 worker。

三、有界队列:防止内存无限增长

无界队列:生产者快于消费者 → 队列里堆积成万上十万 std::functionOOM

demo 构造函数支持 max_queue_size

1
ThreadPool pool(4, 100);  // 4 线程,队列最多 100 个待处理任务

submit 时检查:

1
2
3
if (max_queue_size_ > 0 && tasks_.size() >= max_queue_size_) {
throw QueueFullException(tasks_.size(), max_queue_size_);
}
max_queue_size 行为
0 无限制(仅适合压测或任务量可控)
> 0 满则拒绝新任务(demo 抛异常)

生产环境常见变体:阻塞 submit(等队列有空位)、丢弃最旧任务返回错误码——取决于业务能否丢任务。

四、背压(Backpressure)

背压:下游处理不过来时,向上游施加压力,让生产者减速或失败,而不是无限缓冲。

1
2
3
4
客户端 ──快──→ [队列满] ──慢──→ 线程池 worker

拒绝 / 阻塞 / 503
(背压传回上游)
策略 适用
抛异常 / 返回 false API 层返回 503,客户端重试
阻塞 submit 同步调用方自然减速
丢弃 + 计数 可丢的 telemetry、采样
semaphore 限流 配合 第 13 篇 限制「在途任务」总数

demo 的 HTTP Server 在队列满时返回 503 Service Unavailable,即把背压暴露给 REST 客户端。

五、future / promise / packaged_task

调用 submit 往往要拿返回值异常——用标准库异步 trio:

1
2
3
4
5
6
7
8
9
10
11
12
13
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args)
-> std::future<std::invoke_result_t<F, Args...>>
{
using R = std::invoke_result_t<F, Args...>;
auto task = std::make_shared<std::packaged_task<R()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<R> fut = task->get_future();
tasks_.emplace([task]() { (*task)(); });
condition_.notify_one();
return fut;
}
类型 角色
packaged_task 包装可调用对象,执行结果写入关联的 future
future 调用方 get() 阻塞取结果(或异常)
promise 手动设值(更底层;线程池一般用 packaged_task)

用法:

1
2
auto fut = pool.submit(computeTask, 42, 200);
int result = fut.get(); // 阻塞直到任务完成

shared_ptr 包装 packaged_task:因为 std::function 要求可拷贝,而 packaged_task 只能移动。

六、HTTP REST:线程池的实际应用

demo 提供 ServerClient 两个可执行文件(server.cpp / client.cpp):

端点 行为
POST /tasks 提交耗时任务到线程池,返回 task_id
GET /tasks/{id} 查询任务状态(pending / running / completed)
GET /health 线程数、队列长度、是否已满

流程:HTTP 请求线程只做解析 + submit,重活在线程池 worker 里跑——避免「一请求一线程」把连接数撑爆。队列满时 REST 层返回 503,客户端可退避重试。

1
2
3
4
cd ref/cpp_demo/concurrency/thread_pool
./build.sh --run # 本地 submit 压测
./build.sh --run-server # 启动 REST 服务
./build.sh --run-client # 客户端提交任务

七、性能调优:线程数设多少?

场景 经验
CPU 密集 std::thread::hardware_concurrency()(物理/逻辑核数)
I/O 密集 可大于核数(线程阻塞等 I/O 时不占 CPU)
混合 压测:从小往大调,看吞吐与延迟拐点

demo 默认 num_threads == 0 时用 hardware_concurrency(),失败则回退 4。

别迷信公式:线程过多 → 上下文切换、缓存失效;过少 → CPU 闲置。用指标(队列长度、P99 延迟、CPU 利用率)迭代。

八、优雅关闭

析构顺序(demo):

  1. stop_ = true
  2. condition_.notify_all() 唤醒所有 worker
  3. worker 发现 stop_ && tasks_.empty() 则退出
  4. 主线程 join() 所有 worker

waitAll():等队列空且 active_tasks_ == 0——适合 main 里「提交完一批任务再退出」。

九、小结

概念 要点
线程池 固定 worker + 任务队列,复用线程
有界队列 限制待处理任务数,防 OOM
背压 队列满时拒绝/阻塞/503,压力回传上游
future submit 返回异步结果
REST 请求线程轻量,计算进池

现代 C++ 实战系列第 14 篇完。下一篇 现代设计模式——Strategy、Observer、Builder 的 C++17 写法。

系列导航

篇号 标题 状态
13 C++20 同步原语
14 线程池与背压控制(本篇)
15 现代设计模式 下一篇

完整大纲见工作区 docs/CPP_SERIES_OUTLINE.md