1.互斥量mutex
进程间也可以使用互斥锁来达到同步的目的。但应该在pthread_mutex_init初始化之前,修改其属性为进程间共享;同时互斥量本身必须位于各进程都能访问的共享内存中(例如mmap建立的共享映射),否则fork之后父子进程各自持有一份独立的副本,起不到互斥作用。
主要应用函数
pthread_mutexattr_init函数
初始化一个mutex属性对象
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
pthread_mutexattr_destroy函数
销毁mutex属性对象
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
pthread_mutexattr_setpshared函数
修改mutex属性
int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr,
int pshared);
参数2:pshared取值:
线程锁:PTHREAD_PROCESS_PRIVATE(mutex的默认属性即为线程锁,进程间私有)
进程锁:PTHREAD_PROCESS_SHARED
DEMO 进程间生产者与消费者模型(共享mmap)
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cstdio>
#include <cstdlib>
#include <iostream>
using namespace std;
// Capacity of the product array inside Data.
const int SIZE = 1000;
// Maximum number of produced-but-not-yet-consumed products (initial value of sem_empty).
const int MAX_PRODUCE_NUM = 10;
// Total number of products the producer creates.
const int NEED_PRODUCE_NUM = 100;

// Synchronisation objects used by both the producer and the consumer process.
// They are grouped so that a single shared mapping can hold all of them.
struct SharedSync{
    pthread_mutex_t mutex;
    sem_t sem_empty;
    sem_t sem_full;
};

// Maps an anonymous MAP_SHARED region large enough for one SharedSync.
// The mapping is created during static initialisation, i.e. before main()
// forks, so parent and child see the very same objects. A plain global would
// be duplicated by fork() and each process would lock its own private copy.
// The region is intentionally kept for the lifetime of the process.
static SharedSync * map_shared_sync(){
    void * region = mmap(nullptr, sizeof(SharedSync), PROT_READ | PROT_WRITE,
                         MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if(region == MAP_FAILED){
        perror("mmap error");
        exit(1);
    }
    return static_cast<SharedSync *>(region);
}

SharedSync * const shared_sync = map_shared_sync();

// The original global names are kept as references into the shared mapping,
// so existing code using &mutex, sem_empty and sem_full keeps working unchanged.
pthread_mutex_t & mutex = shared_sync->mutex;
pthread_mutexattr_t mutexattr;
sem_t & sem_empty = shared_sync->sem_empty;
sem_t & sem_full = shared_sync->sem_full;
// Buffer exchanged between the producer and the consumer process.
struct Data{
    // Product slots; the consumer treats a value of 0 as "slot is empty".
    int num[SIZE] = {};
    // Counter the consumer decrements for every product it takes out.
    int current_num{0};
    // Checked by the consumer (together with current_num) to decide when to stop.
    bool shutdown{false};
};
// Enters the critical section: first waits on the given semaphore, then takes
// the global mutex. Waiting on the semaphore first means the caller never
// blocks on the semaphore while it is holding the mutex.
//
// sem: the counting semaphore to decrement before locking.
void get_lock(sem_t & sem){
    sem_t * const counter = &sem;
    sem_wait(counter);
    pthread_mutex_lock(&mutex);
}
// Leaves the critical section: posts the given semaphore, unlocks the global
// mutex and then sleeps for one second.
//
// sem: the counting semaphore to increment for the peer.
void release_lock(sem_t & sem){
    sem_t * const counter = &sem;
    const unsigned int pause_seconds = 1;
    sem_post(counter);
    pthread_mutex_unlock(&mutex);
    sleep(pause_seconds);
}
void produce(int & need_produce_num, Data * data){
Record * pointer = NULL;
while(need_produce_num > 0){
get_lock(sem_empty);
if(need_produce_num <= 0){
data->shutdown = true;
release_lock(sem_full);
return;
}
--need_produce_num;
pointer = new Record(NEED_PRODUCE_NUM - need_produce_num);
pointer->next = data->head;
data->head = pointer;
cout << "........................................." << endl;
cout << "process id: " << getpid() << endl;
cout << "produce the No " << pointer->num << " product" << endl;
cout << "........................................." << endl;
release_lock(sem_full);
}
}
void consume(int & need_produce_num, Data * data){
int num = 0;
while(true){
get_lock(sem_full);
if(data->shutdown && data->current_num <= 0){
release_lock(sem_empty);
return;
}
for(int i = 0; i < NEED_PRODUCE_NUM; ++i){
if(data->num[i] != 0){
num = data->num[i];
data->num[i] = 0;
--data->current_num;
}
}
cout << "........................................." << endl;
cout << "process id: " << getpid() << endl;
cout << "consume the No " << pointer->num << " product" << endl;
cout << "........................................." << endl;
release_lock(sem_empty);
}
}
int main(){
int len = sizeof(Record);
int need_produce_num = NEED_PRODUCE_NUM;
pthread_mutexattr_init(&mutexattr);
pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&mutex, &mutexattr);
sem_init(&sem_full, 0, MAX_PRODUCE_NUM);
sem_init(&sem_empty, MAX_PRODUCE_NUM, MAX_PRODUCE_NUM);
Data * data = (Data *)mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
pid_t pid = fork();
if(pid == -1){
perror("fork error");
exit(1);
}else if(pid == 0){
//子进程负责生产
usleep(1);
produce(need_produce_num, data);
}else{
// 父进程负责消费
consume(need_produce_num, data);
waitpid(-1, NULL, 0);
munmap(data, len);
sem_destroy(&sem_full);
sem_destroy(&sem_empty);
pthread_mutex_destroy(&mutex);
pthread_mutexattr_destroy(&mutexattr);
}
return 0;
}
2.文件锁
借助fcntl函数来实现锁机制。fcntl文件锁是建议性锁(advisory lock):操作文件的进程没有获得锁时,仍然可以打开文件,但按照约定不应执行read、write操作;只有各进程都遵守"先加锁、再读写"的约定,锁才能起到保护作用。
fcntl函数:获取、设置文件访问控制属性
int fcntl(int fd, int cmd, ... /* arg */);
参数2:
F_SETLK 设置文件锁(trylock)
F_SETLKW 设置文件锁(lock) --- wait
F_GETLK 获取文件锁
参数3:
struct flock {
...
short l_type; //锁的类型:F_RDLCK, F_WRLCK, F_UNLCK
short l_whence; // 偏移位置: SEEK_SET, SEEK_CUR, SEEK_END
off_t l_start; // 起始偏移
off_t l_len; // 长度:0表示整个文件加锁
pid_t l_pid; //持有该锁的进程ID:(F_GETLK only)
};
DEMO 通过文件锁实现进程间同步
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <ctime>
using namespace std;
/**
 * Reader loop: ten times, takes a read lock on the whole file, reads it from
 * the beginning, prints the content and releases the lock.
 *
 * Exits the process with status 1 if locking, seeking or reading fails.
 *
 * @param fd  open descriptor of the file to read.
 */
void process_read(const int fd){
    struct flock f_lock = {};
    f_lock.l_whence = SEEK_SET;
    f_lock.l_start = 0;
    f_lock.l_len = 0;  // 0 = lock the whole file
    char data[100];
    int flag = 10;
    while(flag){
        // l_type is overwritten with F_UNLCK at the end of every round, so it
        // has to be set back to F_RDLCK before each lock request.
        f_lock.l_type = F_RDLCK;
        if(fcntl(fd, F_SETLKW, &f_lock) == -1){
            perror("fcntl lock error");
            exit(1);
        }
        // lseek takes (fd, offset, whence).
        if(lseek(fd, 0, SEEK_SET) == -1){
            perror("lseek error");
            exit(1);
        }
        // Leave room for the terminator: the buffer is printed as a C string.
        const ssize_t got = read(fd, data, sizeof(data) - 1);
        if(got == -1){
            perror("read error");
            exit(1);
        }
        data[got] = '\0';
        std::cout << "reader's pid: " << getpid() << std::endl;
        std::cout << "data: " << data << std::endl;
        --flag;
        f_lock.l_type = F_UNLCK;
        if(fcntl(fd, F_SETLKW, &f_lock) == -1){
            perror("fcntl unlock error");
            exit(1);
        }
        usleep(0);
    }
}
/**
 * Writer loop: ten times, takes a write lock on the whole file, overwrites
 * the beginning of the file with a numbered greeting, prints what was written
 * and releases the lock.
 *
 * Exits the process with status 1 if locking, seeking or writing fails.
 *
 * @param fd  open descriptor of the file to write.
 */
void process_write(const int fd){
    struct flock f_lock = {};
    f_lock.l_whence = SEEK_SET;
    f_lock.l_start = 0;
    f_lock.l_len = 0;  // 0 = lock the whole file
    int num = 0;
    int flag = 10;
    while(flag){
        // l_type is overwritten with F_UNLCK at the end of every round, so it
        // has to be set back to F_WRLCK before each lock request.
        f_lock.l_type = F_WRLCK;
        if(fcntl(fd, F_SETLKW, &f_lock) == -1){
            perror("fcntl lock error");
            exit(1);
        }
        // lseek takes (fd, offset, whence).
        if(lseek(fd, 0, SEEK_SET) == -1){
            perror("lseek error");
            exit(1);
        }
        const std::string data = "hello, world!, num: " + std::to_string(++num);
        const ssize_t written = write(fd, data.data(), data.length());
        if(written == -1){
            perror("write error");
            exit(1);
        }
        std::cout << "write size: " << written << std::endl;
        std::cout << "writer's pid: " << getpid() << std::endl;
        std::cout << "data: " << data << std::endl;
        --flag;
        f_lock.l_type = F_UNLCK;
        if(fcntl(fd, F_SETLKW, &f_lock) == -1){
            perror("fcntl unlock error");
            exit(1);
        }
        usleep(0);
    }
}
/**
 * Entry point of the file-lock demo.
 *
 * Opens (creating if necessary) the file "tmp", sizes it to 100 bytes and
 * forks: the child runs the reader loop, the parent runs the writer loop and
 * then waits for the child.
 */
int main(){
    // The mode argument is only honoured together with O_CREAT; without it the
    // call fails whenever "tmp" does not exist yet.
    int fd = open("tmp", O_RDWR | O_CREAT, 0664);
    if(fd == -1){
        perror("open error");
        exit(1);
    }
    if(ftruncate(fd, 100) == -1){
        perror("ftruncate error");
        exit(1);
    }
    pid_t pid = fork();
    if(pid == -1){
        perror("fork error");
        exit(1);
    }else if(pid == 0){
        // Child process: reader.
        process_read(fd);
    }else{
        // Parent process: writer.
        process_write(fd);
        close(fd);
        waitpid(pid, nullptr, 0);
    }
    return 0;
}
Comments