条件变量的生产者与消费者模型
#include <iostream>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
using namespace std;
const int PTHREAD_NUM = 10;
const int PRODUCT_MAX = 10;
const int PRODUCT_NUM = 10;
class Product{
private:
struct Data{
int data = 0;
int num = 0;
Data * next = NULL;
Data(const int data) : data(data){}
};
int m_max_product_num = 1;// 产品队列最大容纳的产品数
int m_product_num = 0; // 要生产的产品数`
int m_current_num = 0;// 当前产品队列中的剩余产品数
pthread_cond_t m_cond; // 产品数的条件变量
pthread_mutex_t m_mutex;// 产品队列的操作互斥锁
Data * m_head = NULL; // 产品队列链接头部指针
bool m_shutdown = false; // 生产是否结束
public:
Product(const int max_num = 1, const int product_num = 0) : m_max_product_num(max_num), m_product_num(product_num){
pthread_cond_init(&m_cond, NULL);
pthread_mutex_init(&m_mutex, NULL);
}
bool produce(){
Data * data_pointer = NULL;
pthread_mutex_lock(&m_mutex);
//如果当前产品数量已经达到极限,则阻塞等待消费者消费产品,等待过程中释放互斥锁
while(m_current_num >= m_max_product_num && !m_shutdown){
pthread_cond_wait(&m_cond, &m_mutex);
}
// 退出标志
if(m_shutdown){
pthread_mutex_unlock(&m_mutex);
pthread_cond_broadcast(&m_cond);
return false;
}
// 当前产品数未到极限,获得互斥锁并生产产品
data_pointer = new Data(rand() % 100);
data_pointer->next = m_head;
m_head = data_pointer;
++m_current_num;
//减少要生产的产品数,如果减到0,则设置生产结束标志
if(--m_product_num == 0)m_shutdown = true;
m_head->num = PRODUCT_NUM - m_product_num;
cout << "................................." << endl;
cout << "thread No: " << pthread_self() << endl;
cout << "produce the No " << m_head->num << " product, data: " << data_pointer->data << endl;
cout << "................................." << endl;
//释放互斥锁
pthread_mutex_unlock(&m_mutex);
//唤醒等待条件变量的所有消费者线程
pthread_cond_broadcast(&m_cond);
return true;
}
bool consume(){
Data * data_pointer = NULL;
pthread_mutex_lock(&m_mutex);
bool ret_flag = true;
//如果当前没有产品,则阻塞等待生产者生产产品,等待过程中释放互斥锁
while(m_current_num <= 0 && !m_shutdown){
pthread_cond_wait(&m_cond, &m_mutex);
}
ret_flag = !m_shutdown;
// 有产品可供消费,获得互斥锁并消费产品
if(m_current_num > 0){
data_pointer = m_head;
m_head = m_head->next;
cout << "................................." << endl;
cout << "thread No: " << pthread_self() << endl;
cout << "consume the No " << data_pointer->num << " product, data: " << data_pointer->data << endl;
cout << "................................." << endl;
delete data_pointer;
ret_flag = (--m_current_num <= 0 && m_shutdown ? false : true);
}
//释放互斥锁
pthread_mutex_unlock(&m_mutex);
//唤醒等待条件变量的所有生产者线程
pthread_cond_broadcast(&m_cond);
return ret_flag;
}
~Product(){
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
};
void * producer(void * arg){
Product * product_obj = static_cast<Product *>(arg);
while(product_obj->produce())usleep(1);
}
void * consumer(void * arg){
Product * product_obj = static_cast<Product *>(arg);
while(product_obj->consume())usleep(1);
}
int main(){
srand(time(NULL));
pthread_t pt_producer[PTHREAD_NUM];
pthread_t pt_consumer[PTHREAD_NUM];
Product * product_obj = new Product(PRODUCT_MAX, PRODUCT_NUM);
for(int i = 0; i < PTHREAD_NUM; ++i){
pthread_create(&pt_producer[i], NULL, producer, (void *)product_obj);
pthread_create(&pt_consumer[i], NULL, consumer, (void *)product_obj);
}
for(int i = 0; i < PTHREAD_NUM; ++i){
pthread_join(pt_producer[i], NULL);
pthread_join(pt_consumer[i], NULL);
}
delete product_obj;
return 0;
}
信号量的生产者与消费者模型
#include <iostream>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
const int PTHREAD_NUM = 10;
const int PRODUCT_MAX = 10;
const int PRODUCT_NUM = 100;
class Product{
private:
struct Data{
int data = 0;
int num = 0;
Data * next = NULL;
Data(const int data) : data(data){}
};
int m_product_num = 0; // 要生产的产品数`
int m_current_num = 0;// 当前产品队列中的剩余产品数
pthread_mutex_t m_mutex;// 产品队列的操作互斥锁
sem_t m_empty; // 能生产的产品数信号量
sem_t m_full; // 已经生产的产品数信号量
Data * m_head = NULL; // 产品队列链接头部指针
bool m_shutdown = false; // 生产是否结束
public:
Product(const int max_num = 1, const int product_num = 0) : m_product_num(product_num){
pthread_mutex_init(&m_mutex, NULL);
sem_init(&m_empty, 0, max_num);
sem_init(&m_full, 0, 0);
}
bool produce(){
Data * data_pointer = NULL;
// 对m_empty信号量加锁
sem_wait(&m_empty);
// 获得互斥锁
pthread_mutex_lock(&m_mutex);
// 退出标志
if(m_shutdown){
sem_post(&m_full);
pthread_mutex_unlock(&m_mutex);
return false;
}
// 当前产品数未到极限,生产产品
data_pointer = new Data(rand() % 100);
data_pointer->next = m_head;
m_head = data_pointer;
++m_current_num;
//减少要生产的产品数,如果减到0,则设置生产结束标志
if(--m_product_num == 0)m_shutdown = true;
m_head->num = PRODUCT_NUM - m_product_num;
cout << "................................." << endl;
cout << "thread No: " << pthread_self() << endl;
cout << "produce the No " << m_head->num << " product, data: " << data_pointer->data << endl;
cout << "................................." << endl;
// 对full信号量解锁,唤醒所有消费者
sem_post(&m_full);
//释放互斥锁
pthread_mutex_unlock(&m_mutex);
return true;
}
bool consume(){
Data * data_pointer = NULL;
// 对full信号量加锁
sem_wait(&m_full);
// 获得互斥锁
pthread_mutex_lock(&m_mutex);
// 返回标志,true为继续可以消费,false为消费结束,线程退出
bool ret_flag = !m_shutdown;
// 有产品可供消费
if(m_current_num > 0){
data_pointer = m_head;
m_head = m_head->next;
cout << "................................." << endl;
cout << "thread No: " << pthread_self() << endl;
cout << "consume the No " << data_pointer->num << " product, data: " << data_pointer->data << endl;
cout << "................................." << endl;
delete data_pointer;
ret_flag = (--m_current_num <= 0 && m_shutdown ? false : true);
}
//对empty信号量解锁,唤醒所有生产者
sem_post(&m_empty);
//释放互斥锁
pthread_mutex_unlock(&m_mutex);
return ret_flag;
}
~Product(){
pthread_mutex_destroy(&m_mutex);
sem_destroy(&m_empty);
sem_destroy(&m_full);
}
};
void * producer(void * arg){
Product * product_obj = static_cast<Product *>(arg);
while(product_obj->produce())usleep(1);
}
void * consumer(void * arg){
Product * product_obj = static_cast<Product *>(arg);
while(product_obj->consume())usleep(1);
}
int main(){
srand(time(NULL));
pthread_t pt_producer[PTHREAD_NUM];
pthread_t pt_consumer[PTHREAD_NUM];
Product * product_obj = new Product(PRODUCT_MAX, PRODUCT_NUM);
for(int i = 0; i < PTHREAD_NUM; ++i){
pthread_create(&pt_producer[i], NULL, producer, (void *)product_obj);
pthread_create(&pt_consumer[i], NULL, consumer, (void *)product_obj);
}
for(int i = 0; i < PTHREAD_NUM; ++i){
pthread_join(pt_producer[i], NULL);
pthread_join(pt_consumer[i], NULL);
}
delete product_obj;
return 0;
}
Comments