一、生产者消费者模型的深入理解
在上一篇文章中,我们已经介绍了线程同步的一种方式——条件变量(cond)。今天我们将继续探讨另一种重要的同步机制:信号量(Semaphore)。
首先回顾一下生产者消费者模型。当生产者获取锁后开始生产数据,若缓冲区已满,则会触发消费者的条件变量,唤醒消费者进行消费。这个过程看似是串行执行的,那为何我们还说该模型具有高效性呢?
关键在于不能孤立地看待单个操作,而应从整体并发行为来分析。所谓“高效”,其核心在于并行化重叠。
试想,生产者需要的数据是从外部传入的,在接收这些数据的过程中,并不需要进入临界区加锁。此时,消费者完全可以持有锁去读取缓冲区中的已有资源。一旦消费者完成读取并释放锁,就可以自行处理数据,而生产者则能立即进入临界区继续生产。
这样一来,生产和消费的操作实际上实现了时间上的重叠与并行执行。引入缓冲队列的目的正是为了缓解生产者和消费者之间的速度差异,使二者能够异步、并发运行。
二、POSIX信号量的基本概念
本篇主要讲解POSIX信号量,尽管系统V信号量也存在,但更多用于进程间通信场景,而POSIX信号量更适合线程间的同步控制。
信号量是一种用于协调多线程或多进程对共享资源访问的同步工具,由计算机科学家 Edsger Dijkstra 于1965年提出。它的本质是一个带有等待队列的计数器,通过原子操作管理资源分配,有效避免竞态条件和死锁问题。
当我们面对整个临界区资源的互斥访问时,通常使用互斥锁。但如果我们要访问的是临界区内多个独立的资源单元,而不是整体占用,这时候再用互斥锁就会限制并发能力。
举个例子帮助理解:假设一个电影院的放映厅有固定数量的座位,这就像一组可被独立使用的资源。信号量就相当于电影票的数量。当你购票成功,即申请到一个信号量,即便你尚未入场,这个座位也不会被他人使用。
因此,信号量实质上是一种资源预定机制。它允许我们根据可用资源的数量发放对应额度的“许可”。有多少个资源,就可以初始化相应数值的信号量。
三、信号量的核心接口及其原子性保障
接下来介绍POSIX信号量的主要API。相比复杂的System V信号量,POSIX信号量作为用户级接口,使用起来更加简洁,类似于之前学习的互斥锁和条件变量。
常用接口如下:
初始化信号量:
int sem_init(sem_t *sem, int pshared, unsigned int value);
此函数用于初始化一个信号量对象。需先定义一个sem_t类型的变量,然后调用该函数进行初始化设置。
sem:指向信号量对象的指针。pshared:表示是否在线程间共享。· 若设为0,表示同一进程内的线程共享(最常见情况)。
非0:若非0,则表示可用于进程间共享,此时信号量必须位于共享内存区域。value:设定信号量的初始值,即可用资源的数量,如同可售电影票总数。
销毁信号量:
int sem_destroy(sem_t *sem);
用于释放信号量所占资源。注意:必须确保没有任何线程或进程正在等待该信号量;对于无名信号量(通过sem_init创建),必须显式调用此函数进行销毁。
P操作(申请信号量):
int sem_wait(sem_t *sem); // 阻塞等待
int sem_trywait(sem_t *sem); // 非阻塞尝试
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); // 超时等待
执行P操作将尝试获取一个资源许可。如果当前信号量值大于0,则值减1并立即返回;否则线程将阻塞,直到有其他线程释放信号量。这一操作也被称为wait或down操作。
V操作(释放信号量):
int sem_post(sem_t *sem);
执行V操作会将信号量值加1,并唤醒一个正在等待的线程(如有)。这一操作也称为post或up操作。
获取当前信号量值:
int sem_getvalue(sem_t *sem, int *sval);
该函数用于查询当前可用资源数量,结果通过输出型参数sval返回,即剩余“票数”。
关于原子性的说明:
虽然P/V操作逻辑上表现为“减一”和“加一”,但绝不能使用普通的++或--运算符实现。因为这些操作不具备原子性,可能在多线程环境下导致数据竞争。
由于信号量本身也是共享资源,若其内部操作不具原子性,就无法保证自身的安全,更谈不上保护其他资源。因此,信号量的所有修改操作都依赖于底层硬件提供的原子指令,如CAS(Compare-And-Swap)或TSL(Test-and-Set Lock),从而确保操作不可中断。
四、基于环形队列的生产者消费者实现
现在我们来构建一个基于环形队列的生产者消费者模型。这种结构非常适合配合信号量使用,因为它允许多个位置被独立访问,无需整体锁定。
传统的队列在频繁执行push/pop操作时会产生大量内存分配与释放开销,同时容易引发激烈的锁竞争。而环形队列采用数组作为底层存储结构(类似C++中循环缓冲区的实现方式),通过下标模运算实现首尾相连的效果。
在这种设计下,只需更新指定位置的数据即可,无需动态调整内存空间。更重要的是,我们可以利用两个信号量分别记录空槽位和满槽位的数量,从而实现高效的并发控制。
该模型充分利用了信号量对部分资源的细粒度管理能力,使得生产者和消费者可以在不同位置同时工作,极大提升了并行效率。
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem
pshared
0,
非0
value
int sem_destroy(sem_t *sem);
sem_init
int sem_wait(sem_t *sem); // 阻塞等待
int sem_trywait(sem_t *sem); // 非阻塞尝试
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); // 超时等待
int sem_post(sem_t *sem);
int sem_getvalue(sem_t *sem, int *sval);数组的容量决定了存储空间的大小,通过信号量来追踪生产者和消费者所处的位置,从而判断缓冲区是否已满或为空。这样可以避免在数据尚未被消费时就被新数据覆盖的问题。
针对这一模型,主要存在以下几种运行状态:
-
生产者与消费者访问同一位置的情况
尽管临界区的操作理论上可以错开执行,但如果两者恰好指向同一个位置,则必须串行处理。此时又可分为两种子情况:
-
缓冲区为空
当消费者等待的数据尚未生成时,说明当前无可用数据。此时应优先让生产者执行。具体流程如下:首先对生产者的信号量执行P操作(即申请资源),然后向队列中写入数据,接着移动生产者下标。完成之后,通过V操作释放消费者的信号量,唤醒正在等待的消费者线程。
-
缓冲区已满
若空间已被填满,则需先由消费者取出数据以腾出位置。因此先对消费者信号量进行P操作(减少可用数据计数),随后读取内容并移动消费者下标,最后执行V操作唤醒可能阻塞的生产者。
-
缓冲区为空
-
生产者与消费者未访问相同位置
在这种情况下,双方操作的是不同的存储单元,彼此之间没有资源冲突,因此可以并发执行,无需等待。
为了更直观地理解该机制,我们可以将其类比为一个容量为20的环形箱子:
生产者负责放入苹果,消费者负责取出苹果。每次操作完成后,各自向前移动一格。如果箱内没有苹果,消费者无法继续;只有当生产者放入苹果后,消费者才能进行下一步。反之,若箱子已满,生产者必须等待消费者取走一个苹果后才能再次放入新的数据。
这就像一场追逐游戏——一个放,一个拿,永远不能“套圈”(即超过对方一圈)。只要两者不相遇,就可以并行操作;一旦相遇,则转为串行控制。
#pragma once
#include <semaphore.h>
namespace SemMudule
{
const int defalutsemval = 1;
class sem
{
public:
sem(int semval = defalutsemval):_init_value(semval)
{
::sem_init(&_sem,0,_init_value);
}
void P()
{
::sem_wait(&_sem);
}
void V()
{
::sem_post(&_sem);
}
~sem()
{
::sem_destroy(&_sem);
}
private:
sem_t _sem;
int _init_value;
};
}
接下来我们基于代码实现这个环形队列。在此之前,建议先封装好信号量接口,类似于条件变量的封装方式。
构建环形队列需要哪些成员变量?
- 底层使用数组作为存储结构,因此需要一个数组及记录其大小的整型变量。
- 分别用两个整型变量保存生产者和消费者的当前位置下标。
- 还需要两个信号量:一个用于表示剩余空间数量(供生产者使用),另一个表示待消费数据的数量(供消费者使用)。
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
#include "mutex.hpp"
namespace RingBufferModule
{
using namespace SemMudule;
template<typename T>
class ringbuffer
{
public:
ringbuffer()
{
}
~ringbuffer()
{
}
private:
std::vector<T> _buffer;//环形缓冲区
size_t _size;//缓冲区大小
sem _psem; //生产者信号量
sem _csem; //消费者信号量
size_t _p_step; //生产者下标
size_t _c_step; //消费者下标
}
}
接下来完善构造函数,并实现 pop 和 enqueue 接口。
在构造函数中,应对所有成员变量进行初始化:
ringbuffer(int cap)//cap为外界传进来的环形队列的大小
:_size(cap),
_buffer(cap),
_psem(cap),
_csem(0),
_p_step(0),
_c_step(0)
{
}
对于插入与取出操作的实现逻辑如下:
插入数据属于生产者行为,因此首先要对生产者信号量执行P操作(减少可用空间计数),然后将数据写入当前生产位置,更新生产者下标,并通过V操作增加消费者信号量,通知等待中的消费者。
同理,取出数据时消费者先执行P操作(确保有数据可取),读取数据后移动消费者指针,再通过V操作释放生产者信号量。
void Equeue(const T&in)//采取引用传参,减少拷贝
{
_psem.P();
_buffer[_p_step] = in;
_p_step++;
_p_step %= _size;
_csem.V();
}
void Pop(T*out)
{
_csem.P();
*out = _buffer[_c_step];
_c_step++;
_c_step %= _size;
_psem.V();
}
为何这里不需要使用 if 或 while 来判断是否有空间或数据?
因为信号量本身的数值就代表了资源的数量:当生产者信号量为0时,表示无空位,自动阻塞;当消费者信号量为0时,表示无数据,同样会阻塞。这种机制天然具备条件判断功能,只要P操作成功,就意味着资源一定存在,无需额外检查。
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
#include "mutex.hpp"
namespace RingBufferModule
{
using namespace SemMudule;
template<typename T>
class ringbuffer
{
public:
ringbuffer(int cap)//cap为外界传进来的环形队列的大小
:_size(cap),
_buffer(cap),
_psem(cap),
_csem(0),
_p_step(0),
_c_step(0)
{
}
void Equeue(const T&in)//采取引用传参,减少拷贝
{
_psem.P();
_buffer[_p_step] = in;
_p_step++;
_p_step %= _size;
_csem.V();
}
void Pop(T*out)
{
_csem.P();
*out = _buffer[_c_step];
_c_step++;
_c_step %= _size;
_psem.V();
}
~ringbuffer()
{
}
private:
std::vector<T> _buffer;//环形缓冲区
size_t _size;//缓冲区大小
sem _psem; //生产者信号量,代表的是剩余空间
sem _csem; //消费者信号量,代表的是剩余数据
size_t _p_step; //生产者下标
size_t _c_step; //消费者下标
}
}
下面是一个简单的测试用例:
#include "RingBuffer.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
using namespace RingBufferModule;
void *Consumer(void *args)
{
ringbuffer<int> *ring_buffer = static_cast<ringbuffer<int> *>(args);
while (true)
{
sleep(1);
// sleep(1);
// 1. 消费数据
int data;
ring_buffer->Pop(&data);
// 2. 处理:花时间
std::cout << "消费了一个数据: " << data << std::endl;
}
}
void *Productor(void *args)
{
ringbuffer<int> *ring_buffer = static_cast<ringbuffer<int> *>(args);
int data = 0;
while (true)
{
// 1. 获取数据:花时间
// sleep(1);
// 2. 生产数据
ring_buffer->Equeue(data);
std::cout << "生产了一个数据: " << data << std::endl;
data++;
}
}
int main()
{
ringbuffer<int> *ring_buffer = new ringbuffer<int>(5); // 共享资源 -> 临界资源
// 单生产,单消费
pthread_t c1, p1;
pthread_create(&c1, nullptr, Consumer, ring_buffer);
pthread_create(&p1, nullptr, Productor, ring_buffer);
pthread_join(c1, nullptr);
delete ring_buffer;
return 0;
}
从结果可以看出,生产者与消费者按照顺序进行生产和消费。当它们操作不同位置时,能够并发执行,提升效率。
然而,上述实现仅适用于单生产者与单消费者场景。如果扩展到多生产者或多消费者呢?
其实这类问题可以转化为多个个体竞争成为唯一的生产者或消费者的过程。也就是说,多个生产者需竞争获取权限来执行生产动作,多个消费者也需竞争出一个实际执行消费的线程。这样一来,系统仍可视为一组“单生产者-单消费者”的协作关系。
由于多个线程将访问相同的信号量,这就引入了对共享资源的互斥访问问题,因此必须引入锁机制加以保护:只有获得锁的线程才能继续执行后续操作,其余则需等待。
为此,我们可以在原有基础上添加两把锁(分别用于保护生产者和消费者的临界区),并对代码做出相应调整:
void Equeue(const T&in)//采取引用传参,减少拷贝
{
LockGuard lockguard(_p_lock);//_p_lock是我们对生产者的锁
_psem.P();
_buffer[_p_step] = in;
_p_step++;
_p_step %= _size;
_csem.V();
}
关于加锁时机:是在信号量P操作之前还是之后?
虽然两种方式都可行,但从效率角度考虑,应当优先选择哪种策略?
可以把P操作看作“买票”,而加锁则是“排队入场”。那么是排好队后再买票高效,还是提前买好票、轮到时直接入场更高效?显然后者更为合理。
因此,正确的顺序应该是先获取锁,完成资源申请及相关操作后再释放锁。
void Equeue(const T&in)//采取引用传参,减少拷贝
{
_psem.P();
LockGuard lockguard(_p_lock);
_buffer[_p_step] = in;
_p_step++;
_p_step %= _size;
_csem.V();
}
void Pop(T*out)
{
_csem.P();
LockGuard lockguard(_c_lock);
*out = _buffer[_c_step];
_c_step++;
_c_step %= _size;
_psem.V();
}
为进一步精确控制 lock_guard 的作用范围,可使用大括号显式限定其生命周期,确保锁在关键代码段结束后立即释放。
今天我们探讨了信号量(sem)的相关知识,内容涵盖了其基本概念与实际应用。通过学习,我们深入了解了信号量在进程同步中的关键作用,以及如何利用它来有效管理对共享资源的访问。
在并发编程中,信号量是一种重要的同步机制,能够帮助开发者避免竞态条件,确保多线程或多进程环境下的数据一致性。我们还结合JavaScript语言风格,对相关逻辑进行了模拟解析,以便更直观地理解其运行原理。
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
#include "mutex.hpp"
namespace RingBufferModule
{
using namespace SemMudule;
using namespace MutexModule;
template <typename T>
class ringbuffer
{
public:
ringbuffer(int cap) // cap为外界传进来的环形队列的大小
: _size(cap),
_buffer(cap),
_psem(cap),
_csem(0),
_p_step(0),
_c_step(0)
{
}
void Equeue(const T &in) // 采取引用传参,减少拷贝
{
_psem.P();
{
LockGuard lockguard(_p_lock);
_buffer[_p_step] = in;
_p_step++;
_p_step %= _size;
}
_csem.V();
}
void Pop(T *out)
{
_csem.P();
{
LockGuard lockguard(_c_lock);
*out = _buffer[_c_step];
_c_step++;
_c_step %= _size;
}
_psem.V();
}
~ringbuffer()
{
}
private:
std::vector<T> _buffer; // 环形缓冲区
size_t _size; // 缓冲区大小
sem _psem; // 生产者信号量,代表的是剩余空间
sem _csem; // 消费者信号量,代表的是剩余数据
size_t _p_step; // 生产者下标
size_t _c_step; // 消费者下标
Mutex _p_lock;
Mutex _c_lock;
};
}
总体而言,掌握信号量的工作机制对于构建稳定、高效的并发系统具有重要意义。希望本次内容能为你带来实质性的帮助和启发。


雷达卡


京公网安备 11010802022788号







