条件变量的生产者与消费者模型

#include <iostream>

#include <unistd.h>

#include <stdlib.h>

#include <pthread.h>


using namespace std;


const int PTHREAD_NUM = 10;

const int PRODUCT_MAX = 10;

const int PRODUCT_NUM = 10;


class Product{

private:

struct Data{

int data = 0;

int num = 0;

Data * next = NULL;

Data(const int data) : data(data){}

};


int m_max_product_num = 1;// 产品队列最大容纳的产品数

int m_product_num = 0; // 要生产的产品数`

int m_current_num = 0;// 当前产品队列中的剩余产品数

pthread_cond_t m_cond; // 产品数的条件变量

pthread_mutex_t m_mutex;// 产品队列的操作互斥锁

Data * m_head = NULL; // 产品队列链接头部指针

bool m_shutdown = false; // 生产是否结束


public:

Product(const int max_num = 1, const int product_num = 0) : m_max_product_num(max_num), m_product_num(product_num){

pthread_cond_init(&m_cond, NULL);

pthread_mutex_init(&m_mutex, NULL);

}


bool produce(){

Data * data_pointer = NULL;

pthread_mutex_lock(&m_mutex);

//如果当前产品数量已经达到极限,则阻塞等待消费者消费产品,等待过程中释放互斥锁

while(m_current_num >= m_max_product_num && !m_shutdown){

pthread_cond_wait(&m_cond, &m_mutex);

}


// 退出标志

if(m_shutdown){

pthread_mutex_unlock(&m_mutex);

pthread_cond_broadcast(&m_cond);

return false;

}


// 当前产品数未到极限,获得互斥锁并生产产品

data_pointer = new Data(rand() % 100);

data_pointer->next = m_head;

m_head = data_pointer;

++m_current_num;


//减少要生产的产品数,如果减到0,则设置生产结束标志

if(--m_product_num == 0)m_shutdown = true;

m_head->num = PRODUCT_NUM - m_product_num;

cout << "................................." << endl;

cout << "thread No: " << pthread_self() << endl;

cout << "produce the No " << m_head->num << " product, data: " << data_pointer->data << endl;

cout << "................................." << endl;


//释放互斥锁

pthread_mutex_unlock(&m_mutex);

//唤醒等待条件变量的所有消费者线程

pthread_cond_broadcast(&m_cond);


return true;

}


bool consume(){

Data * data_pointer = NULL;

pthread_mutex_lock(&m_mutex);

bool ret_flag = true;

//如果当前没有产品,则阻塞等待生产者生产产品,等待过程中释放互斥锁

while(m_current_num <= 0 && !m_shutdown){

pthread_cond_wait(&m_cond, &m_mutex);

}


ret_flag = !m_shutdown;


// 有产品可供消费,获得互斥锁并消费产品

if(m_current_num > 0){

data_pointer = m_head;

m_head = m_head->next;

cout << "................................." << endl;

cout << "thread No: " << pthread_self() << endl;

cout << "consume the No " << data_pointer->num << " product, data: " << data_pointer->data << endl;

cout << "................................." << endl;

delete data_pointer;

ret_flag = (--m_current_num <= 0 && m_shutdown ? false : true);

}


//释放互斥锁

pthread_mutex_unlock(&m_mutex);

//唤醒等待条件变量的所有生产者线程

pthread_cond_broadcast(&m_cond);


return ret_flag;

}



~Product(){

pthread_mutex_destroy(&m_mutex);

pthread_cond_destroy(&m_cond);

}


};


void * producer(void * arg){


Product * product_obj = static_cast<Product *>(arg);

while(product_obj->produce())usleep(1);

}


void * consumer(void * arg){


Product * product_obj = static_cast<Product *>(arg);

while(product_obj->consume())usleep(1);


}


int main(){


srand(time(NULL));

pthread_t pt_producer[PTHREAD_NUM];

pthread_t pt_consumer[PTHREAD_NUM];

Product * product_obj = new Product(PRODUCT_MAX, PRODUCT_NUM);


for(int i = 0; i < PTHREAD_NUM; ++i){

pthread_create(&pt_producer[i], NULL, producer, (void *)product_obj);

pthread_create(&pt_consumer[i], NULL, consumer, (void *)product_obj);

}

for(int i = 0; i < PTHREAD_NUM; ++i){

pthread_join(pt_producer[i], NULL);

pthread_join(pt_consumer[i], NULL);

}


delete product_obj;


return 0;


}


信号量的生产者与消费者模型

#include <iostream>

#include <unistd.h>

#include <stdlib.h>

#include <pthread.h>

#include <semaphore.h>


using namespace std;


const int PTHREAD_NUM = 10;

const int PRODUCT_MAX = 10;

const int PRODUCT_NUM = 100;


class Product{

private:

struct Data{

int data = 0;

int num = 0;

Data * next = NULL;

Data(const int data) : data(data){}

};


int m_product_num = 0; // 要生产的产品数`

int m_current_num = 0;// 当前产品队列中的剩余产品数

pthread_mutex_t m_mutex;// 产品队列的操作互斥锁

sem_t m_empty; // 能生产的产品数信号量

sem_t m_full; // 已经生产的产品数信号量

Data * m_head = NULL; // 产品队列链接头部指针

bool m_shutdown = false; // 生产是否结束


public:

Product(const int max_num = 1, const int product_num = 0) : m_product_num(product_num){

pthread_mutex_init(&m_mutex, NULL);

sem_init(&m_empty, 0, max_num);

sem_init(&m_full, 0, 0);

}


bool produce(){

Data * data_pointer = NULL;

// 对m_empty信号量加锁

sem_wait(&m_empty);

// 获得互斥锁

pthread_mutex_lock(&m_mutex);


// 退出标志

if(m_shutdown){

sem_post(&m_full);

pthread_mutex_unlock(&m_mutex);

return false;

}


// 当前产品数未到极限,生产产品

data_pointer = new Data(rand() % 100);

data_pointer->next = m_head;

m_head = data_pointer;

++m_current_num;


//减少要生产的产品数,如果减到0,则设置生产结束标志

if(--m_product_num == 0)m_shutdown = true;

m_head->num = PRODUCT_NUM - m_product_num;

cout << "................................." << endl;

cout << "thread No: " << pthread_self() << endl;

cout << "produce the No " << m_head->num << " product, data: " << data_pointer->data << endl;

cout << "................................." << endl;


// 对full信号量解锁,唤醒所有消费者

sem_post(&m_full);

//释放互斥锁

pthread_mutex_unlock(&m_mutex);


return true;

}


bool consume(){

Data * data_pointer = NULL;


// 对full信号量加锁

sem_wait(&m_full);

// 获得互斥锁

pthread_mutex_lock(&m_mutex);

// 返回标志,true为继续可以消费,false为消费结束,线程退出

bool ret_flag = !m_shutdown;


// 有产品可供消费

if(m_current_num > 0){

data_pointer = m_head;

m_head = m_head->next;

cout << "................................." << endl;

cout << "thread No: " << pthread_self() << endl;

cout << "consume the No " << data_pointer->num << " product, data: " << data_pointer->data << endl;

cout << "................................." << endl;

delete data_pointer;

ret_flag = (--m_current_num <= 0 && m_shutdown ? false : true);

}


//对empty信号量解锁,唤醒所有生产者 

sem_post(&m_empty);

//释放互斥锁

pthread_mutex_unlock(&m_mutex);


return ret_flag;

}



~Product(){

pthread_mutex_destroy(&m_mutex);

sem_destroy(&m_empty);

sem_destroy(&m_full);

}


};


void * producer(void * arg){


Product * product_obj = static_cast<Product *>(arg);

while(product_obj->produce())usleep(1);

}


void * consumer(void * arg){


Product * product_obj = static_cast<Product *>(arg);

while(product_obj->consume())usleep(1);


}


int main(){


srand(time(NULL));

pthread_t pt_producer[PTHREAD_NUM];

pthread_t pt_consumer[PTHREAD_NUM];

Product * product_obj = new Product(PRODUCT_MAX, PRODUCT_NUM);


for(int i = 0; i < PTHREAD_NUM; ++i){

pthread_create(&pt_producer[i], NULL, producer, (void *)product_obj);

pthread_create(&pt_consumer[i], NULL, consumer, (void *)product_obj);

}

for(int i = 0; i < PTHREAD_NUM; ++i){

pthread_join(pt_producer[i], NULL);

pthread_join(pt_consumer[i], NULL);

}


delete product_obj;


return 0;


}