pthread_pool.h
#ifndef __PTHREADPOOL__
#define __PTHREADPOOL__
#include <pthread.h>
#include <semaphore.h>
class PthreadPool{
public:
explicit PthreadPool(const int min_num = 4, const int max_num = 10, const int task_size = 10);
~PthreadPool();
bool create_pool();//创建线程池
bool add_task(void *(*func)(void *), void * arg);// 添加任务
void pool_destroy();
private:
static void * manage_thread(void *);// 线程池里面的管理线程
static void * work_thread(void *); // 线程池里面的工作线程
void create_thread(pthread_t & thread_id, void *(*thread)(void *));// 创建线程
bool judge_thread(const pthread_t & thread_id); // 判断线程是否存在
void wait_mutex(pthread_mutex_t & mutex); // 阻塞等待互斥锁
void post_mutex(pthread_mutex_t & mutex); // 释放互斥锁
private:
// 记录任务信息和任务执行的回调函数,便于线程回调执行任务
struct Record{
void *(*func)(void *) = nullptr; // 回调函数
void * arg = nullptr; // 回调函数参数
void clear(){
func = nullptr;
arg = nullptr;
}
};
class Queue{
private:
int m_que_max_size; // 任务队列可容纳长度
int m_que_front; // 任务队列头指针
int m_que_rear; // 任务队列尾指针
int m_que_size; // 任务队列长度
sem_t m_sem_full; // 任务队列为满的信号量
sem_t m_sem_empty; // 任务队列为空的信号量
Record * m_tasks; // 任务队列
pthread_mutex_t m_mutex;
bool wait_sem(sem_t & sem);
bool post_sem(sem_t & sem);
public:
explicit Queue(const int max_size = 10);
~Queue();
bool add_task(const Record & task); // 任务入队
bool get_task(Record & task); // 出队, 获取任务
bool shutdown_wake(); // 唤醒等待任务的线程并让其自毁
int get_maxSize() const { return m_que_max_size; }
int get_curSize() const { return m_que_size; }
};
private:
enum{MAX_DEFAULT_EXIT_NUM = 4};
pthread_mutex_t m_pool_mutex; // 操作整个线程池的互斥锁
pthread_mutex_t m_busy_mutex; // 忙状态线程个数操作的锁
pthread_t m_manage_id; // 管理线程ID
pthread_t * m_threads = nullptr; // 线程组
int m_busy_num = 0; // 忙状态的线程数量
int m_current_num = 0; // 当前线程池的线程总数
int m_max_num; // 线程数量的最大容量
int m_min_num; // 最少的线程数
int m_exit_num = 0; // 线程管理时要销毁的线程
Queue * m_queue = nullptr;
bool m_shutdown = false;
bool m_create_flag = false; // 是否创建了线程池
};
#endif
pthread_pool.cpp
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include "pthread_pool.h"
PthreadPool::PthreadPool(const int min_num, const int max_num, const int task_size)
: m_min_num(min_num), m_max_num(max_num), m_queue(new Queue(task_size)){}
PthreadPool::~PthreadPool(){
if(m_create_flag){
pool_destroy();
}else{
delete m_queue;
}
}
void PthreadPool::pool_destroy(){
wait_mutex(m_pool_mutex);
m_shutdown = true;
int tmp_current_num = m_current_num;
post_mutex(m_pool_mutex);
// 唤醒并关闭当前所有线程
for(int i = 0; i < tmp_current_num; ++i){
m_queue->shutdown_wake();
}
bool flag = true;
while(flag){
wait_mutex(m_pool_mutex);
flag = m_current_num > 0;
post_mutex(m_pool_mutex);
}
pthread_mutex_destroy(&m_pool_mutex);
pthread_mutex_destroy(&m_busy_mutex);
delete [] m_threads;
delete m_queue;
}
// 创建线程池
bool PthreadPool::create_pool(){
m_threads = new pthread_t [m_max_num];
// 初始化互斥锁
if(pthread_mutex_init(&m_pool_mutex, NULL) || pthread_mutex_init(&m_busy_mutex, NULL)){
return false;
}
// 往线程池里创建多个线程
for(int i = 0; i < m_min_num; ++i){
create_thread(m_threads[i], work_thread);
}
// 创建管理者线程
create_thread(m_manage_id, manage_thread);
m_create_flag = true;
return true;
}
void PthreadPool::create_thread(pthread_t & thread_id, void *(*thread)(void *)){
if(pthread_create(&thread_id, NULL, thread, (void *)this) != 0){
perror("pthread_create error");
exit(1);
}
// 线程分离,销毁时自动回收
if(pthread_detach(thread_id) != 0){
perror("pthread_detach error");
exit(1);
}
//printf("create thread\n");
wait_mutex(m_pool_mutex);
if(thread_id != m_manage_id){
++m_current_num;
}
post_mutex(m_pool_mutex);
}
// 判断线程是否存在
bool PthreadPool::judge_thread(const pthread_t & thread_id){
return thread_id != 0;
}
// 添加任务
bool PthreadPool::add_task(void *(*func)(void *), void * arg){
Record task;
task.func = func;
task.arg = arg;
bool flag = true;
// 获取操作线程池的互斥锁
wait_mutex(m_pool_mutex);
if(m_queue->get_maxSize() <= m_queue->get_curSize()){
post_mutex(m_pool_mutex);
return false;
}
flag = m_queue->add_task(task);
// 释放操作线程池的互斥锁
post_mutex(m_pool_mutex);
return flag;
}
// 线程池管理线程
void * PthreadPool::manage_thread(void * arg){
PthreadPool * pool = static_cast<PthreadPool *>(arg);
bool shutdown_flag = false;
int create_num = 0;
while(!shutdown_flag){
// 任务过多且空余线程较少时,增加线程个数
pool->wait_mutex(pool->m_pool_mutex);
shutdown_flag = pool->m_shutdown;
pool->post_mutex(pool->m_pool_mutex);
if(shutdown_flag)break;
// 任务队列的任务数大于最少线程数且当前的线程数少于线程数量的最大容量时
pool->wait_mutex(pool->m_pool_mutex);
if(pool->m_queue->get_curSize() > pool->m_min_num && pool->m_current_num < pool->m_max_num){
create_num = MAX_DEFAULT_EXIT_NUM;
}else{
create_num = 0;
}
pool->post_mutex(pool->m_pool_mutex);
while(create_num--){
for(int i = 0; i < pool->m_max_num; ++i){
if(pool->judge_thread(pool->m_threads[i]))continue;
pool->create_thread(pool->m_threads[i], PthreadPool::work_thread);
break;
}
pool->wait_mutex(pool->m_pool_mutex);
create_num = (pool->m_queue->get_curSize() > pool->m_min_num
&& pool->m_current_num < pool->m_max_num ? create_num : 0);
pool->post_mutex(pool->m_pool_mutex);
}
// 释放CPU资源
sleep(1);
// 任务较少且空余线程过多时,销毁一些线程
pool->wait_mutex(pool->m_pool_mutex);
// 只有一半线程在忙碌且当前线程数大于最少的线程数时
int exit_num = pool->MAX_DEFAULT_EXIT_NUM;
if(pool->m_current_num > pool->m_min_num && pool->m_busy_num*2 < pool->m_current_num){
pool->m_exit_num = pool->MAX_DEFAULT_EXIT_NUM;
}else{
exit_num = 0;
}
pool->post_mutex(pool->m_pool_mutex);
while(exit_num--){
pool->wait_mutex(pool->m_pool_mutex);
if(!pool->m_queue->shutdown_wake()){
perror("shutdown_wake error");
exit(1);
}
pool->post_mutex(pool->m_pool_mutex);
}
}
//printf("shutdown manage thread\n");
return NULL;
}
// 线程池工作线程
void * PthreadPool::work_thread(void * arg){
PthreadPool * pool = static_cast<PthreadPool *>(arg);
Record task;
bool shutdown_flag = false;
while(!shutdown_flag){
task.clear();
// 阻塞取任务
if(!pool->m_queue->get_task(task)){
// 系统异常
perror("get task error");
exit(1);
}
pool->wait_mutex(pool->m_pool_mutex);
// 需要销毁一些空闲线程的情况
if(pool->m_exit_num > 0 && pool->m_current_num > pool->m_min_num){
shutdown_flag = true;
--pool->m_exit_num;
}else if(pool->m_current_num <= pool->m_min_num){
pool->m_exit_num = 0;
}
pool->post_mutex(pool->m_pool_mutex);
//关闭标志被设置, 退出循环
if(shutdown_flag){
break;
}
// 获取是否要销毁的状态标志
pool->wait_mutex(pool->m_pool_mutex);
shutdown_flag = pool->m_shutdown;
pool->post_mutex(pool->m_pool_mutex);
//关闭标志被设置, 退出循环
if(shutdown_flag){
break;
}
// 增加忙线程个数
pool->wait_mutex(pool->m_busy_mutex);
++pool->m_busy_num;
pool->post_mutex(pool->m_busy_mutex);
// 执行回调函数
if(task.func != nullptr){
(*task.func)(task.arg);
}
// 减少忙线程个数
pool->wait_mutex(pool->m_busy_mutex);
--pool->m_busy_num;
pool->post_mutex(pool->m_busy_mutex);
}
pool->wait_mutex(pool->m_pool_mutex);
--pool->m_current_num;
pthread_t cur_thread = pthread_self();
for(int i = 0; i < pool->m_max_num; ++i){
if(pool->m_threads[i] == cur_thread){
pool->m_threads[i] = 0;
break;
}
}
pool->post_mutex(pool->m_pool_mutex);
//printf("exit work thread\n");
return NULL;
}
// 等待互斥锁
void PthreadPool::wait_mutex(pthread_mutex_t & mutex){
//printf("wait\n");
if(pthread_mutex_lock(&mutex) != 0){
perror("wait mutex error");
exit(1);
}
}
// 释放互斥锁
void PthreadPool::post_mutex(pthread_mutex_t & mutex){
//printf("post\n");
if(pthread_mutex_unlock(&mutex) != 0){
perror("wait mutex error");
exit(1);
}
}
PthreadPool::Queue::Queue(const int max_size) : m_que_max_size(max_size)
{
m_tasks = new Record [max_size];
sem_init(&m_sem_full, 0, 0);
sem_init(&m_sem_empty, 0, max_size);
pthread_mutex_init(&m_mutex, NULL);
}
PthreadPool::Queue::~Queue(){
sem_destroy(&m_sem_full);
sem_destroy(&m_sem_empty);
pthread_mutex_destroy(&m_mutex);
delete m_tasks;
}
// 请求信号量
bool PthreadPool::Queue::wait_sem(sem_t & sem){
struct timespec abs_timeout;
time_t cur = time(NULL);
abs_timeout.tv_sec = cur + 60;
if(sem_timedwait(&sem, &abs_timeout) == -1 && errno == ETIMEDOUT){
return false;
}
return pthread_mutex_lock(&m_mutex) == 0;
}
// 释放信号量
bool PthreadPool::Queue::post_sem(sem_t & sem){
return sem_post(&sem) == 0 && pthread_mutex_unlock(&m_mutex) == 0;
}
// 任务入队
bool PthreadPool::Queue::add_task(const Record & task){
if(!wait_sem(m_sem_empty)){
return false;
}
m_tasks[m_que_rear].func = task.func;
m_tasks[m_que_rear].arg = task.arg;
m_que_rear = (m_que_rear + 1) % m_que_max_size;
++m_que_size;
return post_sem(m_sem_full);
}
// 出队,获取任务
bool PthreadPool::Queue::get_task(Record & task){
if(!wait_sem(m_sem_full)){
return false;
}
if(m_que_size > 0){
task.func = m_tasks[m_que_front].func;
task.arg = m_tasks[m_que_front].arg;
m_tasks[m_que_front].func = nullptr;
m_tasks[m_que_front].arg = nullptr;
m_que_front = (m_que_front + 1) % m_que_max_size;
--m_que_size;
}
return post_sem(m_sem_empty);
}
bool PthreadPool::Queue::shutdown_wake(){
if(!wait_sem(m_sem_empty)){
return false;
}
return post_sem(m_sem_full);
}
测试程序 test.cpp
#include <iostream>
#include <string>
#include <ctime>
#include <unistd.h>
#include "pthread_pool.h"
using namespace std;
const int size = 1000;
void * func(void * arg){
int num = 10;
while(num--){
cout << "thread id: " << pthread_self() << endl;
cout << "Data: " << *static_cast<int *>(arg) << endl;
usleep(0);
}
printf("exit\n");
}
int main(){
PthreadPool * pool = new PthreadPool(10, 30, size*2);
if(pool->create_pool()){
cout << "create successfully!" << endl;
}
srand(time(0));
int num[size];
string str;
while(true){
cout << "Order: ";
cin >> str;
cout << "模拟大量客户端连接" << endl;
for(int i = 0; i < size; ++i){
num[i] = rand() % 10;
pool->add_task(func, (void *)&num[i]);
}
usleep(10);
for(int i = 0; i < size; ++i){
num[i] = rand() % 10000;
pool->add_task(func, (void *)&num[i]);
}
}
delete pool;
cout << "delete successfully!" << endl;
return 0;
}
Comments