【Linux】线程池
目录
线程池
日志
线程池
在程序中,会预先创建一批线程,在内部会有一个叫任务队列task_queue的东西,未来如果有任务要处理,就把任务push到任务队列里,然后自动有线程去任务队列里拿任务并处理,如果没有任务,所有线程会自动休眠,一旦有任务到来,就唤醒某一个线程帮我们处理这个任务。这种提前把线程创建出来,当任务到来时,指定一个线程去帮我们处理任务,这种就叫做线程池。实际上,线程池的本质就是一个生产消费模型。线程池的基本结构如下:
template
class ThreadPool
{
ThreadPool(int thread_num = gdefaultnum):_pthread_num(thread_num),_isrunning(false)
{}
void Init()
{}
void Start()
{}
void Stop()
{}
void Equeue(const T& in)
{}
~ThreadPool()
{}
private:
int _pthread_num;
std::vector _threads;
std::queue _task_queue;
bool _isrunning;
};
其中,_pthread_num是要创建的线程数,_threads是对线程进行管理的数组,_task_queue是任务队列,使用模版的参数T。另外,我们还需要使用_isrunning用来控制线程池是否启动。对于ThreadPool的成员变量,我们暂时设置成这么多。后面需要了再加。
在构造函数中,我们唯一需要暴露出去的就是_pthread_num,用于让上层用户决定创建多少线程,我们也需要设置一个缺省值,然后_isrunning设置为false。
static const int gdefaultnum = 5;
ThreadPool(int thread_num = gdefaultnum)
: _thread_num(thread_num),_isrunning(false)
{}
接下来,我们的线程池初始化接口Init如下,使用test函数去初始化每个线程,也就是每个线程都去执行test函数,test暂时设置为void test():
void Init()
{
for (int i = 0; i < _thread_num; i++)
{
_threads.emplace_back(test);
}
}
void test()
{
while(true)
{
std::cout << "hello world" << std::endl;
sleep(1);
}
}
运行结果如下,可以看到除了一个主线程,还有创建的5个新线程:
接下来,我们就需要考虑向任务队列中push任务。由于所有线程都要访问_task_queue队列,所以它是共享资源,因此要加锁。一旦把一个任务推送进来,就要唤醒一个进程(如果有休眠的线程),因此,还要加一个条件变量_cond:
private:
int _thread_num;
std::vector _threads;
std::queue _task_queue;
bool _isrunning;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
入队列前需要加锁,出队列后需要解锁,因此,可以对加锁解锁做一下封装,并且在构造函数中对锁和条件变量做一下初始化,并在析构函数中销毁一下:
ThreadPool(int thread_num = gdefaultnum)
: _thread_num(thread_num),_isrunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool()
{
for (auto& thread : _threads)
{
if (thread.joinable())
thread.join();
}
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_thread_unlock(&_mutex);
}
因此,在入队列前后要进行加锁和解锁,使用LockQueue和UnLockQueue,在中间直接向任务队列中push任务。push完任务之后,只要有线程在休眠,就要先去唤醒一个线程(执行WakeUp函数,是对条件变量的封装),如果没有线程在休眠,就没必要唤醒线程了,所以,为了判断有没有线程在休眠,还要维护一个休眠线程数这样的一个计数器成员变量_sleep_thread_num,并在初始化列表中初始化为0,:
//唤醒一个线程
void WakeUp()
{
pthread_cond_signal(&_cond);
}
//任务入队列
void Equeue(const T& in)
{
LockQueue();
_task_queue.push(in);
if(_sleep_thread_num > 0)
WakeUp();
UnLockQueue();
}
那每个线程应该做什么呢?应该去处理我们push的任务,我们让每个线程都去执行HandlerTask函数,每个线程都不应该退出,所以HandlerTask里应该是一个死循环。每个进程都要从任务队列中取任务,但是不排除主线程正在往里面push任务,所以_task_queue就是一个临界资源,所以,每个线程都要竞争式地去申请锁LockQueue,把队列锁住,未来再UnQueueLock。所以,线程从任务队列中取任务时,在这里就保证了线程安全。如果任务队列为空(封装IsEmpty函数,判断任务队列是否为空),那就让线程去指定的条件变量下去休眠(封装Sleep函数,去_cond条件变量下去休眠,同时释放掉持有的锁);如果任务队列不为空||线程被唤醒,就从任务队列中拿出来一个任务t,并处理这个任务,我们约定,所有的任务都可以使用t()来执行,注意,此处不用/不能在临界区中处理,因为这个任务被pop出来后,只属于当前这个线程,而处理这个任务和临界资源的访问是两码事,如果放在加锁解锁之间处理,不仅浪费时间,而且所有线程处理任务都变成串行的了。
// 任务处理函数,每个线程执行此函数来处理任务
void HandlerTask()
{
while (true)
{
//取任务
LockQueue();
//任务队列为空,就去指定的条件变量下去休眠
while(IsEmpty()) //使用while,防止伪唤醒
{
Sleep(); //会把持有的锁释放掉
}
//有任务 || 被唤醒
T t = _task_queue.front();
_task_queue.pop();
UnLockQueue();
//处理任务
t(); // 处理任务,此处不能/不用在临界区处理
}
}
此时问题来了,我们刚才让每个线程执行的是test这样一个简单的函数,那如何让每个线程去执行HandlerTask(类内方法)呢?我们直接把HandlerTask作为参数在Init的时候传给每个线程吗?这样线程一启动就可以自动执行这个类内方法了,我们先来编译一下:
这个报错是为啥呢?因为HandlerTask参数中是有this指针的,此时我们并不能把HandlerTask定义为static,因为访问了类内的非静态成员变量和函数,为了解决这样的问题,在C++中,可以使用std::bind函数将HandlerTask的第一个参数绑定为this指针,这样做最大的好处,就是在面向对象的基础之上,让一个类去调用另一个类内部的方法(需要包functional头文件)。现在就可以编译通过了,我们在main函数中项线程池中push我们预先准备好的任务:
class Task
{
public:
Task()
{
}
Task(int x, int y):_x(x),_y(y)
{}
void Excute()
{
_result = _x + _y;
}
void operator()()
{
Excute();
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
#include "ThreadPool.hpp"
#include "Task.hpp"
#include
int main()
{
std::unique_ptr> tp = std::make_unique>();
tp->Init();
tp->Start();
while(true)
{
//不断向线程池推送任务
sleep(1);
Task t(1, 1);
tp->Equeue(t);
}
return 0;
}
我们编译运行,发现没反应,实际上我们在HandlerTask中的Sleep之前,需要让_sleep_thread_num++,在Sleep之后,_sleep_thread_num--。
我们编译运行一下:
我们发现一个问题,我怎么知道这是哪个线程打印出来的呢?所以,我们还需要给HandlerTask传递一个参数--线程名,并使用std::bind使用一个占位符,用于传name。
运行结果如下:
但是,到现在还没有结束,现在只是把线程池初始化并启动, 并没有写怎么结束线程池。万一我只想让线程池处理10个任务,处理完就要Stop,Stop之后就应该看到线程数由6个变成1个,所以,我们应该如何让线程池中的所有线程退出呢?这就和我们之前的_isrunning这个成员变量有关系了,默认情况下_isrunning是false,在InitAndStart中我们需要将_isrunning设置为true。我们来想一下,某一个时间,线程池中的线程有的在处理任务,有的在休眠,有的在获取任务,反正每个线程所做的工作都可能是不一样的,但有一个基本条件就是,如果①任务队列里没有任务了&&②线程池的_isrunning是false状态,就应该让所有线程退出;否则,①只要任务队列中只要有任务,必须把任务处理完,线程池要退出,处理完再退出;②如果任务没有处理完,并且线程池没退出,那就处理完就休眠。所以我们发现还要对线程池做状态方面的判断。所以在休眠Sleep那个while的判断条件中,要判断任务队列中是否为空和_isrunning是不是false,如果任务队列为空&&线程池不退出,就去指定的条件变量下去休眠,否则就不应该休眠。当退出while循环后,只有当任务队列为空&&线程要退出,此时才释放锁并break,否则如果有任务还没被处理完,否则线程就不应该退!如果_isrunning是false,尽管还有任务没处理完,但是再也不会进while循环去休眠,直到把任务处理完后就退出!所以,线程池的Stop函数很简单,直接让_isrunning=false即可。
但是执行完Stop中的_isrunning=false时,有可能其他线程都在while循环中的_cond下休眠,那就无法唤醒这些线程,线程也就不会退出了。所以在Stop中,除了将_isrunning设置为false,还唤醒所有线程WakeUpAll,
void WakeUpAll()
{
pthread_cond_broadcast(&_cond);
}
另外,在_isrunning=false前后最好也加上加解锁,这样更安全一些,Stop函数如下:
现在还有一个小问题是,如果线程池已经退出,但是上层一直有一个“不长眼”的线程一直往线程池中push任务,导致线程池一直得处理任务而不能退出,这不就有问题了吗??因此,在任务入队列时,要先判断线程池要处于运行状态(_isrunning==true),才可以将任务入队列。
void Equeue(const T& in)
{
LockQueue();
//向线程池push任务时,需要先判断线程池是否在运行,在运行才可以push任务
if(_isrunning)
{
_task_queue.push(in);
if(_sleep_thread_num > 0)
WakeUp();
}
UnLockQueue();
}
所以,综合来看,当线程池Stop后,所有的线程都不会去休眠而去处理任务,把这个门关掉了;同时,通过Equeue这个函数中的if(_isrunning)判断,就不会让新的任务入队列,把这个门也关掉了。所以,我们把门关上,可以把存量的任务处理完,让线程去退出!
我们来看一下运行情况,使用 while :; do ps -aL; sleep 1; done 这个监控脚本来查看进程状态:
至此,进程池的代码完成,我们还可以将进程池改成单例模式,单例模式的线程池的代码附录在本文最后,同时代码Gitee链接敬上 线程池完整版代码C++。但是在上面的打印中,我们看起来不太明确,而且想在代码中加打印信息也要加std::cout这样的语句,看起来不直观,所以,我们之后可以使用日志来完成信息的打印查看!
日志
日志是软件运行的记录信息,可以向显示器或文件中打印,并且有特定的格式,我们希望的日志格式是:
[日志等级][pid][filename][filenumber][time] 日志内容(支持可变参数)
其中,日志等级按照分钟程度,分为DEBUG、INFO、WARNING、ERROR、FATAL。
创建一个Log.hpp文件,创建logmessage和Log两个类:
class logmessage
{
public:
std::string _level;
pid_t _id;
std::string _filename;
int _filenumber;
std::string _curr_time;
std::string _message_info;
};
在main函数中,使用日志时,需要创建Log对象,然后使用logMessage函数加载日志信息,设计一个LevelToString函数,
std::string LevelToString(int level)
{
switch(level)
{
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
case FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
再设计GetCurrentTime函数,在这个函数中,首先获取当前的时间戳,然后使用localtime函数将时间戳转为一个包含时间属性的结构体,
std::string GetCurrentTime()
{
time_t now = time(nullptr);
struct tm* curr_time = localtime(&now);
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %2d:%2d:%2d",
1900+curr_time->tm_year,
1+curr_time->tm_mon,
curr_time->tm_mday,
curr_time->tm_hour,
curr_time->tm_min,
curr_time->tm_sec);
return buffer;
}
下一步就是要提取logMessage中的可变参数,为了提取可变参数,在C语言中提供了vsnprintf(需要包含stdarg.h),
这里的ap指向可变参数部分,通过vsprintf按照format格式得到可变部分,并放到log_info中。然后就需要把日志打印出来:
但是,在main函数使用打印日志时,真的要自己手动传入文件名以及行号吗?这未免太low了。其实,C语言中存在预处理符__FILE__和__LINE__,可以得到文件名和行号。
但是,这样写也有点low,能不能把这两个预处理符隐藏起来,另外,也不想使用log.logMessage这种方式了,为了解决这样的问题,我们可以使用宏替换,并使用宏支持可变参数。
#define LOG(Level, Format, ...) do{lg.logMessage(__FILE__, __LINE__, Level, Format, __VA_ARGS__);}while(0)//宏支持可变参数
#define EnableScreen() do{lg.Enable(SCREEN_TYPE);}while(0)
#define EnableFile() do{lg.Enable(FILE_TYPE);}while(0)
在使用LOG时,有可能不需要可变参数,因此,可以这样修改:
这表示,如果有就使用可变参数部分,如果没有,就把其之前的“,”去掉。
【附录】
-------------------------------------------------线程池代码完成版-------------------------------------------------------
目录结构
Makefile
ThreadPool:main.cc
g++ -o $@ $^ -std=c++14
.PHONY:clean
clean:
rm -rf ThreadPool
main.cc
#include "ThreadPool.hpp"
#include "Task.hpp"
#include
int main()
{
int cnt = 10;
while(cnt)
{
//不断向线程池推送任务
Task t(1, 1);
ThreadPool::GetInstance()->Equeue(t);
sleep(1);
std::cout << "cnt: " << cnt-- << std::endl;
}
ThreadPool::GetInstance()->Stop();
std::cout << "thread pool stop" << std::endl;
sleep(10); //此时应该看到线程由6个变成一个
return 0;
}
ThreadPool.hpp
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
static const int gdefaultnum = 5;
using func_t = std::function;
void test()
{
while(true)
{
std::cout << "hello world" << std::endl;
sleep(1);
}
}
template
class ThreadPool
{
private:
ThreadPool(int thread_num = gdefaultnum)
: _thread_num(thread_num),_isrunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
bool IsEmpty()
{
return _task_queue.empty();
}
// 任务处理函数,每个线程执行此函数来处理任务
void HandlerTask(const std::string& name)
{
while (true)
{
//取任务
LockQueue();
// 如果任务队列为空&&线程池不退出,就去指定的条件变量下去休眠
while(IsEmpty() && _isrunning) //使用while,防止伪唤醒
{
_sleep_thread_num++;
Sleep(); //会把持有的锁释放掉
_sleep_thread_num--;
}
//判定一种情况,任务队列为空&&线程池要退出了
if(IsEmpty() && !_isrunning)
{
std::cout << name << " quit" << std::endl;
UnLockQueue();
break;
}
//有任务 || 被唤醒
T t = _task_queue.front();
_task_queue.pop();
UnLockQueue();
//处理任务
t(); // 处理任务,此处不能/不用在临界区处理
std::cout << name << ":" << t.result() << std::endl;
}
}
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void WakeUp()
{
pthread_cond_signal(&_cond);
}
void WakeUpAll()
{
pthread_cond_broadcast(&_cond);
}
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void InitAndStart()
{
_isrunning = true;
for (int i = 0; i < _thread_num; i++)
{
std::string name = "thread-" + std::to_string(i+1);
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
}
}
public:
static ThreadPool* GetInstance()
{
if(_tp == nullptr)
{
std::lock_guard lock(_sig_mutex);
if(_tp == nullptr)
{
std::cout << "create threadpool
";
_tp = new ThreadPool();
_tp->InitAndStart();
}
else
{
std::cout << "get threadpool
";
}
}
return _tp;
}
void Stop()
{
LockQueue();
_isrunning = false;
WakeUpAll();
UnLockQueue();
}
void Equeue(const T& in)
{
LockQueue();
//向线程池push任务时,需要先判断线程池是否在运行,在运行才可以push任务
if(_isrunning)
{
_task_queue.push(in);
if(_sleep_thread_num > 0)
WakeUp();
}
UnLockQueue();
}
~ThreadPool()
{
for (auto& thread : _threads)
{
if (thread.joinable())
thread.join();
}
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _thread_num;
std::vector _threads;
std::queue _task_queue;
bool _isrunning;
int _sleep_thread_num; //休眠线程数
pthread_mutex_t _mutex;
pthread_cond_t _cond;
//单例模式
static ThreadPool* _tp;
static std::mutex _sig_mutex;
};
template
ThreadPool* ThreadPool::_tp = nullptr;
template
std::mutex ThreadPool::_sig_mutex;
Task.hpp
#include
#include
class Task
{
public:
Task()
{
}
Task(int x, int y):_x(x),_y(y)
{}
void Excute()
{
_result = _x + _y;
}
void operator()()
{
Excute();
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};