楼主: nuoketisi
28 0

【并发详解】Semaphore、Condition与Event详解 [推广有奖]

  • 0关注
  • 0粉丝

等待验证会员

学前班

80%

还不是VIP/贵宾

-

威望
0
论坛币
0 个
通用积分
0
学术水平
0 点
热心指数
0 点
信用等级
0 点
经验
30 点
帖子
2
精华
0
在线时间
0 小时
注册时间
2018-8-27
最后登录
2018-9-6

楼主
nuoketisi 发表于 2025-12-4 07:01:30 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

一、核心设计哲学:解决什么问题?

Semaphore、Condition 和 Event 虽然都用于控制线程在特定条件下继续执行,但它们的设计初衷和解决问题的模型截然不同。三者分别聚焦于“数量”、“状态”与“事件”的管理,混淆使用可能导致逻辑混乱或性能瓶颈。

同步原语 设计目标(根本目的) 问题模型
Semaphore 控制对有限资源的并发访问数量 “最多允许 N 个线程同时做某事”
Condition 在共享状态变化时,高效通知等待线程 “当某个条件(如
queue not empty
)成立时,唤醒等待者”
Event 广播一个一次性全局信号 “所有人注意!某个重大事件发生了(如系统启动、开售)”

关键区分:

  • Semaphore
    关注“数量”——即有多少资源可用;
  • Condition
    关注“状态”——即某个布尔条件是否为真;
  • Event
    关注“事件”——即某个一次性信号是否已发出。

二、状态与通信机制:如何表示“可继续”?

三种机制在内部状态建模和通信方式上存在本质差异,尤其体现在状态是否可被“消耗”这一特性上。

同步原语 内部状态模型 通信方式 状态是否可“消耗”?
Semaphore 计数器(整数,初始为 N)
acquire()
获取一个单位,
release()
归还一个单位
可消耗:每
acquire()
一次,计数器减 1
Condition 无独立状态,依赖外部共享变量
wait()
释放锁并休眠,
notify()
唤醒等待者
不可消耗:通知只是“提醒你去检查条件”,条件真假由外部变量决定
Event 布尔标志(
flag
,初始为
False
wait()
阻塞直到
flag=True
set()
设为
True
不可消耗:一旦
set()
,所有后续
wait()
立即通过(永久有效,除非
clear()

举例说明:

  • Semaphore(2):相当于“两个通行证”。线程 A、B 拿走后,C 必须等待其中一个归还才能进入。这体现了资源是有限且可再生的。
  • Condition:库存为 0 时用户线程
    wait()
    ;库存大于 0 时管理员
    notify()
    。但多个用户可能被同时唤醒,最终只有一个能完成购买(需重新检查
    if tickets > 0
    )。这表明:通知 ≠ 资源分配。
  • Event
    set()
    后,无论多少线程正在
    wait()
    ,都将立即被放行。这是一种一次性广播,不涉及资源概念。

三、与锁的耦合关系:是否需要配合锁使用?

不同同步机制在是否内置锁以及是否必须配合外部锁方面表现各异,直接影响其使用模式。

同步原语 是否内置锁? 使用时是否必须配合外部锁? 原因
Semaphore 不需要 本身是一个独立的计数同步机制,不保护特定数据。
Condition 必须
wait()
notify()
必须在持有 Condition 关联锁的上下文中调用,否则会报错。
Event 不需要 仅表示一个全局信号状态,不涉及对共享数据的保护。

重要实践:

Condition
的正确用法总是:

with condition:
    while not shared_condition:  # 必须用 while(防虚假唤醒)
        condition.wait()
# 执行操作

Semaphore
Event
可直接使用,无需额外加锁(除非你要保护其他共享数据)。

四、唤醒语义:如何通知等待者?

三者的唤醒机制在方式、数量及是否存在“通知丢失”风险方面有显著区别。

同步原语 唤醒方式 唤醒数量 是否有“通知丢失”风险?
Semaphore
release()
隐式唤醒一个
无(
release()
总会增加计数器,
acquire()
总能感知)
Condition
notify()
/
notify_all()
1 个或全部 有!必须在持有锁时调用
notify()
,否则可能在
wait()
前发生通知,导致等待者永远阻塞
Event
set()
全部 无(一旦
set()
,状态永久为真)

Condition 的“通知丢失”陷阱:
如果先

notify()
wait()
,则通知将无法被捕获。因此,条件变量必须与一个可检查的共享状态配合使用,并且应在
while
循环中验证状态,而非使用
if

五、典型应用场景对比(避免误用)

根据实际需求选择合适的同步原语至关重要。以下是常见场景及其推荐方案。

场景 推荐原语 为什么?
限制数据库连接池最多 10 个并发 Semaphore 直接控制“数量”,资源用完即阻塞,归还即释放。
生产者-消费者队列(队列空时消费者等待) Condition 需要基于队列状态(空/非空)进行等待和通知,且操作队列本身需要锁保护。
所有测试线程等待主控线程发出“开始”信号 Event 一次性全局信号,发出后所有线程同时启动,无需资源或状态检查。
实现一个有界缓冲区(既限数量又需状态通知) Condition + Lock
Semaphore
只能限数量,但无法区分“空”和“满”;
Condition
可配合两个条件(not_empty / not_full)实现。
实现“最多 5 个线程同时下载文件” Semaphore 纯并发数控制,与文件内容无关。
线程等待某个全局配置更新完成 Event 配置更新是一次性事件,完成后所有依赖线程应继续。

六、Semaphore与Condition 用法对比

6.1 Semaphore 的使用特性分析

在并发编程中,Semaphore 存在一个明显的局限性:它无法自然地区分“空”和“满”的状态。这一问题通常出现在需要双向同步控制的场景中,例如 有界缓冲区循环队列

其核心原因在于:一个单独的 Semaphore 实例只能表示“可执行操作的剩余次数”,而不能同时表达两种互斥的状态——即“可写入”和“可读取”。这意味着,仅靠一个信号量无法准确反映资源的双向流动情况。

sem = Semaphore(3)

实例说明:容量为3的票务消息队列

考虑如下模型:

  • 生产者线程(补票):负责向队列中添加票(执行写操作)
  • 消费者线程(购票):从队列中取出票(执行读操作)
  • 队列最大容量为3

根据业务逻辑:

  • 当队列已满(含3张票)时,禁止写入,但允许读取;
  • 当队列为空(0张票)时,禁止读取,但允许写入。

如果仅使用一个 Semaphore 来管理该队列:

  • 它可以用来表示“还可写入多少张票”,初始值设为3;
  • 但它无法反馈当前是否有可供消费的数据项

举例来说,当队列为空时,该 Semaphore 的计数值为3(表示空间充足),此时消费者线程若查看此信号量,会误判为“有资源可用”,从而尝试进行取票操作,最终导致从空队列中取数据,引发错误。

sem

根本问题在于:这个 Semaphore 实际上是服务于写权限的,对生产者有效,但对消费者而言不具备任何判断依据。因此,单靠一个信号量无法实现安全的双端同步。

sem=3

正确解决方案:采用两个 Semaphore 协同工作

经典的解决方法源自操作系统中的同步机制设计,使用两个独立的信号量分别追踪“空槽位数量”和“已填充项数量”:

import threading
import time
import random

class BoundedTicketQueue_Semaphore:
    def __init__(self, maxsize=3):
        self.queue = []
        self.maxsize = maxsize
        # empty 表示剩余可写入的空位数,初始等于最大容量
        self.empty = threading.Semaphore(maxsize)
        # full 表示当前可读取的元素数量,初始为0
        self.full = threading.Semaphore(0)
        # 用于保护队列访问的互斥锁
        self.mutex = threading.Lock()

    def put(self, ticket):
        self.empty.acquire()          # 等待空位,若无则阻塞
        with self.mutex:              # 进入临界区
            self.queue.append(ticket)
            print(f"[生产者] 放入票 {ticket},队列: {self.queue}")
        self.full.release()           # 新增一个可读元素

    def get(self):
        self.full.acquire()           # 等待有票可取,若无则阻塞
        with self.mutex:              # 进入临界区
            ticket = self.queue.pop(0)
            print(f"[消费者] 取出票 {ticket},队列: {self.queue}")
        self.empty.release()          # 释放一个空位
        return ticket

测试代码如下:

def test_semaphore():
    queue = BoundedTicketQueue_Semaphore(maxsize=3)

    def producer(name, tickets):
        for t in tickets:
            queue.put(f"{name}-{t}")
            time.sleep(random.uniform(0.1, 0.3))

    def consumer(name, count):
        for _ in range(count):
            queue.get()
            time.sleep(random.uniform(0.2, 0.4))

    # 创建两个生产者,一个消费者
    p1 = threading.Thread(target=producer, args=("P1", [1, 2, 3, 4]))
    p2 = threading.Thread(target=producer, args=("P2", [5, 6]))
    c1 = threading.Thread(target=consumer, args=("C1", 6))

    p1.start(); p2.start(); c1.start()
    p1.join(); p2.join(); c1.join()
    print("? Semaphore 方案测试完成\n")

if __name__ == "__main__":
    print("【方案一:双 Semaphore】")
    test_semaphore()
empty.acquire()

流程解析

  • 生产者行为
    full.release()
    → 等待存在空位 → 成功放入票 → 调用 release() 增加“可读项”计数(通知消费者)
  • 消费者行为
    full.acquire()
    → 等待有票可取 → 成功取出票 → 释放一个空位供生产者使用

通过引入两个语义明确的信号量(empty 和 full),系统能够精确控制读写两端的行为,避免资源竞争与非法操作,实现了高效且安全的同步机制。

empty.release()
(通知有空位了) 如此一来,
empty
控制“满”的状态,
full
管理“空”的情况,二者协同工作,才能实现读写操作的正确同步。

6.2 Condition 的实现方式

Condition 通过直接检测队列的实际状态来进行协调。以下是基于 threading.Condition 实现的有界票务队列示例:
import threading
import time
import random

class BoundedTicketQueue_Condition:
    def __init__(self, maxsize=3):
        self.queue = []
        self.maxsize = maxsize
        self.cond = threading.Condition()  # 内置锁机制

    def put(self, ticket):
        with self.cond:
            # 等待条件:队列未满
            while len(self.queue) >= self.maxsize:
                print(f"[生产者] 队列已满 ({len(self.queue)}),等待空位...")
                self.cond.wait()
            self.queue.append(ticket)
            print(f"[生产者] 放入票 {ticket},队列: {self.queue}")
            self.cond.notify_all()  # 唤醒所有等待线程(主要是消费者)

    def get(self):
        with self.cond:
            # 等待条件:队列非空
            while len(self.queue) == 0:
                print(f"[消费者] 队列为空,等待票...")
                self.cond.wait()
            ticket = self.queue.pop(0)
            print(f"[消费者] 取出票 {ticket},队列: {self.queue}")
            self.cond.notify_all()  # 唤醒所有等待线程(主要是生产者)
            return ticket

# === 测试代码 ===
def test_condition():
    queue = BoundedTicketQueue_Condition(maxsize=3)

    def producer(name, tickets):
        for t in tickets:
            queue.put(f"{name}-{t}")
            time.sleep(random.uniform(0.1, 0.3))

    def consumer(name, count):
        for _ in range(count):
            queue.get()
            time.sleep(random.uniform(0.2, 0.4))

    p1 = threading.Thread(target=producer, args=("P1", [1, 2, 3, 4]))
    p2 = threading.Thread(target=producer, args=("P2", [5, 6]))
    c1 = threading.Thread(target=consumer, args=("C1", 6))

    p1.start(); p2.start(); c1.start()
    p1.join(); p2.join(); c1.join()
    print("? Condition 方案测试完成\n")

if __name__ == "__main__":
    print("【方案二:Condition】")
    test_condition()

6.3 Condition 执行流程分析 — 单生产者与单消费者场景

Condition 机制依赖于对共享变量的实时读取来判断队列是否为空或满。这种设计天然支持双向条件判断。
len(queue)
其核心在于线程在不同操作阶段对锁的持有与释放行为:
操作 锁状态 说明
with self.cond
获取锁 进入临界区,开始访问共享资源
self.cond.wait()
释放锁 允许其他线程获得执行机会
被唤醒后 重新获取锁 继续从 wait 处恢复执行
self.cond.notify_all()
保持持有锁 仅标记唤醒动作,不立即释放控制权
退出
with
最终释放锁,使被唤醒线程得以真正运行
假设队列最大容量为 3,初始状态为空,且系统中仅存在一个生产者和一个消费者。整个执行过程如下所述: 首先,在 T1 时刻,消费者 C1 调用 get(),发现队列为空,于是进入等待状态;随后 P1 开始 put 操作,因队列有空间而成功插入数据,并在适当时候唤醒等待中的消费者。 详细时间线如下:
时间 线程 操作 队列状态 说明
T1 C1 尝试 get()
[]
队列为空,调用
wait()
释放锁并进入休眠
T2 P1 执行 put(1)
[1]
获取锁,添加数据,并通过
notify_all()
唤醒 C1
T3 P1 执行 put(2)
[1,2]
继续向队列中添加元素
T4 P1 执行 put(3)
[1,2,3]
此时队列已满
T5 P1 尝试 put(4)
[1,2,3]
队列已满,调用
wait()
释放锁并休眠
T6 C1 执行 get()
[2,3]
被唤醒,取出数据,并触发
notify_all()
唤醒 P1
T7 P1 继续 put(4)
[2,3,4]
重新获得执行权,完成第4个元素的插入

6.4 Condition 执行流程分析——多生产者与多消费者场景

假设系统中存在两个生产者(P1、P2)和两个消费者(C1、C2),整体执行流程如下所示。初始状态下,队列容量为 3,且队列为空。

[]
时间 线程 操作 队列状态 说明
T1 C1 尝试 get()
[]
队列为空,
wait()
释放锁并进入休眠
T2 C2 尝试 get()
[]
队列为空,
wait()
释放锁并休眠(C1 和 C2 均在等待)
T3 P1 执行 put(P1-1)
[P1-1]
获取锁,成功放入数据,
notify_all()
唤醒 C1 和 C2
T4 P1 执行 put(P1-2)
[P1-1, P1-2]
继续向队列中添加元素
T5 C1 执行 get()
[P1-2]
被唤醒并成功获取锁,取出 P1-1,
notify_all()
T6 P2 执行 put(P2-5)
[P1-2, P2-5]
获取锁后将数据写入队列
T7 P1 执行 put(P1-3)
[P1-2, P2-5, P1-3]
队列已满(3/3)
T8 P2 执行 put(P2-6)
[P1-2, P2-5, P1-3]
队列已满,
wait()
释放锁并休眠
T9 P1 尝试 put(P1-4)
[P1-2, P2-5, P1-3]
队列满,
wait()
释放锁并休眠(P1 和 P2 都在等待)
T10 C2 执行 get()
[P2-5, P1-3]
被唤醒并获取锁,取出 P1-2,
notify_all()
唤醒 P1 和 P2
T11 P1 继续 put(P1-4)
[P2-5, P1-3, P1-4]
被唤醒,检查队列未满,成功放入数据,队列再次满(3/3)
T12 C1 执行 get()
[P1-3, P1-4]
取出 P2-5,
notify_all()
唤醒 P2
T13 P2 继续 put(P2-6)
[P1-3, P1-4, P2-6]
被唤醒,确认队列未满,完成数据写入
T14 C2 执行 get()
[P1-4, P2-6]
取出 P1-3,
notify_all()
T15 C1 执行 get()
[P2-6]
取出 P1-4,
notify_all()
T16 C2 执行 get()
[]
取出 P2-6,
notify_all()
T17 C1 尝试 get()
[]
队列为空,
wait()
释放锁并休眠

典型场景分析

场景一:多个消费者同时被唤醒(T5 时刻)
T3: P1 调用 notify_all() → C1 和 C2 都被标记为"可唤醒"
T5: C1 先获取到锁,取出数据
    C2 虽然被唤醒,但要等 C1 释放锁后才能获取锁

关键点:

notify_all()
虽然 Condition 会唤醒所有等待线程,但实际只能有一个线程成功获取锁,其余线程仍需竞争。

场景二:多个生产者同时处于等待状态(T8–T9)
队列状态: [P1-2, P2-5, P1-3] (已满)
P2: 尝试 put(P2-6) → wait() 休眠
P1: 尝试 put(P1-4) → wait() 休眠

等待队列: [P2, P1] (都在等待空位)
场景三:唤醒后的竞争执行(T10–T11)
T10: C2 取出数据后 notify_all() → P1 和 P2 都被唤醒
T11: 假设 P1 先抢到锁
     - P1 检查 while 条件:队列未满 ?
     - P1 放入 P1-4
     - P2 还在等待获取锁
     
T12: C1 再次取出数据,notify_all()
     - P2 终于获取锁,放入 P2-6

可能的其他执行顺序变体

由于线程调度具有不确定性,实际运行中可能出现以下不同执行路径:

变体一:消费者连续执行
时间 线程 操作 队列状态
P1 put(1), put(2), put(3)
[1,2,3]
T1 C1 get()
[2,3]
T2 C1 get()
[3]
T3 C2 get()
[]
T4 C1 尝试 get()
[]
← 阻塞
变体二:生产者连续填满队列
时间 线程 操作 队列状态
T1 P1 put(1), put(2)
[1,2]
T2 P2 put(5)
[1,2,5]
T3 P1 尝试 put(3)
[1,2,5]
← P1 阻塞
T4 P2 尝试 put(6)
[1,2,5]
← P2 阻塞

核心要点归纳

  • notify_all()
    Condition 唤醒的是所有等待线程,但仅有一个线程能够最终获得锁。
  • while
    必须通过循环机制重新校验条件,以避免虚假唤醒带来的问题。
  • 锁的获取具有随机性,无法预知哪个线程会优先执行。
  • 等待队列中的多个线程处于
    wait()
    状态,等待被唤醒。
  • 无公平性保障,可能出现某一线程连续多次执行的情况。

七、总结:一句话区分三者

Semaphore:作为“资源配额管理器”——必须持有“许可(permit)”才能进行操作。

Condition:充当“状态变化监听器”——收到通知后需自行验证当前状态是否满足条件。

Event:类似于“一次性广播喇叭”——信号触发后,所有监听者都会得知事件发生。

总体来看,Condition 是三者中最灵活但也最容易误用的机制,因为它要求开发者手动维护共享状态,并正确处理加锁、解锁以及循环判断逻辑;而 Semaphore 和 Event 更接近于“状态机”模型,前者用于管理资源数量,后者则控制开关状态。

在工程实践中:

  • 若只需简单的通知机制(如启动信号或初始化完成),应优先选用
    Event
  • 若涉及复杂状态协调(例如缓冲队列、库存管理等),则必须使用
    Condition
  • 若目标仅为限流控制,则
    Semaphore
    是最直接高效的方案。
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:condition dition Event Even vent

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
jg-xs1
拉您进交流群
GMT+8, 2026-1-7 17:28