1.互斥量mutex

进程间也可以使用互斥锁来达到目的。但应该在pthread_mutex_init初始化之前,修改其属性为进程间共享。


主要应用函数

pthread_mutexattr_init函数

初始化一个mutex属性对象

int pthread_mutexattr_init(pthread_mutexattr_t *attr);

 

pthread_mutexattr_destroy函数

销毁mutex属性对象

int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);

 

pthread_mutexattr_setpshared函数

修改mutex属性

int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr,

           int pshared);

 

参数2pshared取值:

线程锁:PTHREAD_PORCESS_PRIVATE(mutex的默认属性即为线程锁,进程间私有)

进程锁:PTHREAD_PROCESS_SHARED

 

DEMO 进程间生产者与消费者模型(共享mmap

#include <iostream>

#include <sys/wait.h>

#include <sys/types.h>

#include <fcntl.h>

#include <unistd.h>

#include <sys/mman.h>

#include <pthread.h>

#include <semaphore.h>

 

using namespace std;

 

const int SIZE = 1000;

const int MAX_PRODUCE_NUM = 10;

const int NEED_PRODUCE_NUM = 100;

 

pthread_mutex_t mutex;

pthread_mutexattr_t mutexattr;

sem_t sem_empty;

sem_t sem_full;

 

struct Data{

       int num[SIZE] = {0};

       int current_num = 0;

       bool shutdown = false;

};

 

void get_lock(sem_t & sem){

       sem_wait(&sem);

       pthread_mutex_lock(&mutex);

}

 

void release_lock(sem_t & sem){

       sem_post(&sem);

       pthread_mutex_unlock(&mutex);

       sleep(1);

}

 

void produce(int & need_produce_num, Data * data){

       Record * pointer = NULL;

      

       while(need_produce_num > 0){

              get_lock(sem_empty);

 

              if(need_produce_num <= 0){

                     data->shutdown = true;

                     release_lock(sem_full);

                     return;

              }

 

              --need_produce_num;

              pointer = new Record(NEED_PRODUCE_NUM - need_produce_num);

              pointer->next = data->head;

              data->head = pointer;

 

              cout << "........................................." << endl;

              cout << "process id: " << getpid() << endl;

              cout << "produce the No " << pointer->num << " product" << endl;

              cout << "........................................." << endl;

 

              release_lock(sem_full);  

 

       }

 

      

}

 

void consume(int & need_produce_num, Data * data){

 

       int num = 0;

       while(true){

              get_lock(sem_full);

 

              if(data->shutdown && data->current_num <= 0){

                     release_lock(sem_empty);

                     return;

              }

 

              for(int i = 0; i < NEED_PRODUCE_NUM; ++i){

                     if(data->num[i] != 0){

                            num = data->num[i];

                            data->num[i] = 0;      

                            --data->current_num;

                     }

              }

 

              cout << "........................................." << endl;

              cout << "process id: " << getpid() << endl;

              cout << "consume the No " << pointer->num << " product" << endl;

              cout << "........................................." << endl;

 

 

              release_lock(sem_empty);    

 

       }

 

}

 

int main(){

 

       int len = sizeof(Record);

       int need_produce_num = NEED_PRODUCE_NUM;

 

       pthread_mutexattr_init(&mutexattr);

       pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED);

       pthread_mutex_init(&mutex, &mutexattr);

       sem_init(&sem_full, 0, MAX_PRODUCE_NUM);

       sem_init(&sem_empty, MAX_PRODUCE_NUM, MAX_PRODUCE_NUM);

 

       Data * data = (Data *)mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);

 

       pid_t pid = fork();

       if(pid == -1){

              perror("fork error");

              exit(1);

       }else if(pid == 0){

              //子进程负责生产

              usleep(1);

              produce(need_produce_num, data);

 

       }else{

              // 父进程负责消费

              consume(need_produce_num, data);

 

              waitpid(-1, NULL, 0);

              munmap(data, len);

              sem_destroy(&sem_full);

              sem_destroy(&sem_empty);

              pthread_mutex_destroy(&mutex);

              pthread_mutexattr_destroy(&mutexattr);

 

       }

 

 

       return 0;

}

 

2.文件锁

借助fcntl函数来实现锁机制。操作文件的进程没有获得锁时,可以打开,但无法执行readwrite操作

fcntl函数:获取、设置文件访问控制属性

int fcntl(int fd, int cmd, …/*arg*/);

参数2

      F_SETLK 设置文件锁(trylock)

       F_SETLKW 设置文件锁(lock) --- wait

       F_GETLK获取文件锁

参数3

      struct flock {

               ...

               short l_type;  //锁的类型:F_RDLCK, F_WRLCK, F_UNLCK

               short l_whence; // 偏移位置: SEEK_SET, SEEK_CUR, SEEK_END

               off_t l_start;   // 起始偏移

               off_t l_len;     // 长度:0表示整个文件加锁

               pid_t l_pid;    //持有该锁的进程ID:(F_GETLK only

         

           };

 

DEMO 通过文件锁实现进程间同步

#include <iostream>

#include <unistd.h>

#include <fcntl.h>

#include <stdlib.h>

#include <stdio.h>

#include <sys/types.h>

#include <sys/wait.h>

#include <ctime>

 

using namespace std;

 

void process_read(const int fd){

       struct flock f_lock;

       f_lock.l_type = F_RDLCK;

       f_lock.l_whence = SEEK_SET;

       f_lock.l_start = 0;

       f_lock.l_len = 0;

       char data[100];

       int flag = 10;

       while(flag){

              fcntl(fd, F_SETLKW, &f_lock);     

             

              lseek(fd, SEEK_SET, 0);

              if(read(fd, &data, sizeof(data)) == -1){

                     perror("read error");

                     exit(1);

              }

 

              cout << "reader's pid: " << getpid() << endl;

              cout << "data: " << data << endl;

              --flag;

 

              f_lock.l_type = F_UNLCK;

              fcntl(fd, F_SETLKW, &f_lock);

              usleep(0);

       }

 

}

 

void process_write(const int fd){

       struct flock f_lock;

       f_lock.l_type = F_WRLCK;

       f_lock.l_whence = SEEK_SET;

       f_lock.l_start = 0;

       f_lock.l_len = 0;

       int num = 0;

       string data;

       int flag = 10;  

       while(flag){

              fcntl(fd, F_SETLKW, &f_lock);

 

              lseek(fd, SEEK_SET, 0);

              data = "hello, world!, num: " + to_string(++num);

              cout << "write size: " << write(fd, data.data(), data.length()) << endl;

              cout << "writer's pid: " << getpid() << endl;

              cout << "data: " << data << endl;

              --flag;

 

              f_lock.l_type = F_UNLCK;

              fcntl(fd, F_SETLKW, &f_lock);

              usleep(0);

       }

 

}

      

int main(){

 

       int fd = open("tmp", O_RDWR, 0664);

       if(fd == -1){

              perror("open error");

              exit(1);

       }

 

       if(ftruncate(fd, 100) == -1){

              perror("ftruncate error");

              exit(1);

       }

 

       pid_t pid = fork();

 

       if(pid == -1){

              perror("fork error");

              exit(1);

       }else if(pid == 0){

              // 子进程负责写

              process_read(fd);

       }else{

              // 父进程负责读

              process_write(fd);

              close(fd);

              waitpid(-1, NULL, 0);

       }

 

       return 0;

}