1.线程池
1.1 线程池是什么?
一种线程管理方式。
1.2 为什么用线程池?
线程的创建和销毁都需要消耗系统开销,当线程数量过多,系统开销过大,就会影响缓存局部性和整体性能。而线程池能够在充分利用内核资源的前提下,避免系统资源被过度调用。
1.3 如何设计线程池?
简单来说,在线程池中提前创建好多个线程,使用时从线程池中取出,使用完放回线程池。线程池中的线程调度由线程池中的管理者线程调度。
2.基于C++11的实现
Talk is cheap. Show me the code.
直接看程序,原理、函数在后面再介绍。
2.1 程序
程序主要分为四个文件,分别为:
Task.h //任务类
ThreadPool.h //线程池类
ThreadPool.cpp //线程池类实现
main.cpp //测试程序
2.1.2 任务类Task.h
#pragma once
using callback = void(*(void*;//函数指针,定义别名
class Task{
public:
callback func;//回调任务函数
void* arg; //函数参数
public:
Task( { //无参构造函数
this->func = nullptr;
this->arg = nullptr;
}
Task(callback func, void* arg {//含参构造函数
this->func = func;
this->arg = arg;
}
~Task( = default; //析构函数
Task(const Task &t = default; //拷贝构造函数
Task& operator=(const Task &t; //拷贝赋值操作符
Task(Task &&t = default; //移动构造函数,注意不能有const
Task& operator=(const Task &&t;//移动赋值操作符
};
2.1.2 线程池类ThreadPool.h
#pragma once
#include "Task.h"
#include <thread>
#include <queue>
#include <vector>
#include <atomic>
#include <mutex>
#include <condition_variable>
using namespace std;
class ThreadPool {
public:
ThreadPool(int minSize, int maxSize;//构造函数
void AddTask(Task task; //添加新任务
int GetBusyNum(; //获取当前工作中的线程数
int GetAliveNum(; //获取当前活着的线程数
int GetTaskQueueSize(; //获取当前任务队列长度
~ThreadPool(;
ThreadPool(const ThreadPool &t = default; //拷贝构造函数
ThreadPool& operator=(const ThreadPool &t; //拷贝赋值操作符
ThreadPool(ThreadPool &&t = default; //移动构造函数
ThreadPool& operator=(const ThreadPool &&t;//移动赋值操作符
private:
queue<Task> taskQueue; //任务队列
thread managerID;//管理者线程ID
vector<thread> threadIDs;//工作中的线程组ID
int minNum;//最小线程数量(如果线程池中线程的数目过少,处理器的一些核可能就无法充分利用,浪费)
int maxNum;//最大线程数量(如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。)
atomic_int busyNum;//工作中的线程数量(atomic_int保证其赋值,取值操作的原子性)
atomic_int liveNum;//活着的线程数量
atomic_int exitNum;//将要被销毁的线程数量
mutex mutexPool;//线程池的锁
condition_variable cond;//条件变量
bool shutDown;//是不是要销毁线程池, 销毁为true, 不销毁为false
static void worker(void* arg;//工作的线程任务函数
static void manager(void* arg;//管理者线程任务函数
static const int NUMBER = 2;//管理者线程每次增加/销毁的线程数
};
2.1.3 线程池类实现ThreadPool.cpp
#include "ThreadPool.h"
#include <unistd.h> //pthread_self
#include <iostream>
using namespace std;
ThreadPool::ThreadPool(int minSize, int maxSize {
do{
minNum = minSize;
maxNum = maxSize;
busyNum = 0;
liveNum = minSize;
exitNum = 0;
shutDown = false;
//初始化管理者线程和工作线程组
managerID = thread(manager, this;
threadIDs.resize(maxSize;
for(int i = 0; i < minSize; ++i {
threadIDs[i] = thread(worker, this;
}
return;
} while(0;//do{...}while(0结构提高代码健壮性
}
ThreadPool::~ThreadPool( {
shutDown = true;
if(managerID.joinable( {//阻塞在管理者线程,直到其执行完,再向下进行
managerID.join(;
}
cond.notify_all(;//唤醒所有等待的线程
for(int i = 0; i < maxNum; ++i {//依次执行工作者的线程
if(threadIDs[i].joinable( {
threadIDs[i].join(;
}
}
}
//添加新任务
void ThreadPool::AddTask(Task task {
unique_lock<mutex> poolLock(mutexPool;
if(shutDown {
return;
}
taskQueue.emplace(task;
cond.notify_all(;
}
int ThreadPool::GetBusyNum( {
return busyNum;
}
int ThreadPool::GetAliveNum( {
return liveNum;
}
int ThreadPool::GetTaskQueueSize( {
unique_lock<mutex> poolLock(mutexPool;
int queueSize = taskQueue.size(;
poolLock.unlock(;
return queueSize;
}
//工作者线程
void ThreadPool::worker(void* arg {
ThreadPool* pool = static_cast<ThreadPool*>(arg;
while(true {
unique_lock<mutex> poolLock(pool->mutexPool;
//若当前任务队列为空且线程池处于开启状态
while(pool->taskQueue.empty( && !pool->shutDown {
pool->cond.wait(poolLock;//阻塞工作线程
//若存在待销毁线程
if(pool->exitNum > 0 {
--pool->exitNum;
if(pool->liveNum > pool->minNum {//若活着的线程数大于最小线程数,则可以进行销毁
--pool->liveNum;
cout << "threadID: " << pthread_self( << " has exited." << endl;
return;
}
}
}
//判断线程池是否关闭了
if(pool->shutDown {
cout << "threadID: " << pthread_self( << " has exited." << endl;
return;
}
//从任务队列中取出一个任务
Task task = pool->taskQueue.front(;
pool->taskQueue.pop(;
++pool->busyNum;
//解锁
poolLock.unlock(;
//执行任务
cout << "threadID: " << pthread_self( << " start to work." << endl;
task.func(task.arg;
task.arg = nullptr;
//执行完后,工作线程数-1
cout << "threadID: " << pthread_self( << " stop working." << endl;
--pool->busyNum;
}
}
//管理者线程
void ThreadPool::manager(void* arg {
ThreadPool* pool = static_cast<ThreadPool*>(arg;
while(!pool->shutDown {
//每隔3秒检测一次
sleep(3;
//添加新线程
//若任务个数大于活着的线程数,且活着的线程数小于最大线程数
if(pool->GetTaskQueueSize( > pool->liveNum && pool->liveNum < pool->maxNum {
unique_lock<mutex> poolLock(pool->mutexPool;
poolLock.lock(;
int count = 0;
for(int i = 0; i < pool->maxNum && count < ThreadPool::NUMBER && pool->liveNum < pool->maxNum; ++i {
if(pool->threadIDs[i].get_id( == thread::id( {
cout << "Create a new thread." << endl;
pool->threadIDs[i] = thread(worker, pool;
++count;
++pool->liveNum;
}
}
poolLock.unlock(;
}
//销毁线程
//若忙的线程*2小于存活的线程数,且存活的线程数大于最小的线程数
if(pool->busyNum * 2 < pool->liveNum && pool->liveNum > pool->minNum {
pool->exitNum = ThreadPool::NUMBER;
for(int i = 0; i < ThreadPool::NUMBER; ++i {//让工作的线程自杀
pool->cond.notify_all(;
}
}
}
}
2.2 测试方法:
将上述文件放在Linux下的一个文件夹(我这里是\Share\study_threadPool\myself)
进入该文件夹:
编译:
运行:
2.2 C++11相关函数
thread类
ThreadPool.cpp第17行:表示创建一个新线程,是该线程执行的函数,是该线程执行函数的参数。
ThreadPool.cpp第29行: 判断该线程是否可以join
ThreadPool.cpp第30行: 阻塞在该线程,直到其执行完
ThreadPool.cpp第123行:表示获取该线程的ID
mutex
ThreadPool.cpp第42行: 自动加锁与解锁
ThreadPool.cpp第61行:解锁
ThreadPool.cpp第120行:加锁
condition_variable
ThreadPool.cpp第32行:唤醒所有等待的线程
atomic
ThreadPool.h第34行:本质还是int,只是每次对其操作时,都能保证是原子操作
using
Task.h第2行:函数的别名
3.调试过程中出现的问题及解决方法
3.1 warning:#pragma once in main file
解决方案:g++编译时不要编译头文件
3.2 移动构造函数出错
解决方案:移动构造函数的参数不能加const
4.参考
线程池工作原理和实现 - 【C语言版 】C/C++,
基于C++11的线程池