一、核心设计哲学:解决什么问题?
Semaphore、Condition 和 Event 虽然都用于控制线程在特定条件下继续执行,但它们的设计初衷和解决问题的模型截然不同。三者分别聚焦于“数量”、“状态”与“事件”的管理,混淆使用可能导致逻辑混乱或性能瓶颈。
| 同步原语 | 设计目标(根本目的) | 问题模型 |
|---|---|---|
| Semaphore | 控制对有限资源的并发访问数量 | “最多允许 N 个线程同时做某事” |
| Condition | 在共享状态变化时,高效通知等待线程 | “当某个条件(如 )成立时,唤醒等待者” |
| Event | 广播一个一次性全局信号 | “所有人注意!某个重大事件发生了(如系统启动、开售)” |
关键区分:
关注“数量”——即有多少资源可用;Semaphore
关注“状态”——即某个布尔条件是否为真;Condition
关注“事件”——即某个一次性信号是否已发出。Event
二、状态与通信机制:如何表示“可继续”?
三种机制在内部状态建模和通信方式上存在本质差异,尤其体现在状态是否可被“消耗”这一特性上。
| 同步原语 | 内部状态模型 | 通信方式 | 状态是否可“消耗”? |
|---|---|---|---|
| Semaphore | 计数器(整数,初始为 N) | 获取一个单位, 归还一个单位 |
可消耗:每 一次,计数器减 1 |
| Condition | 无独立状态,依赖外部共享变量 | 释放锁并休眠, 唤醒等待者 |
不可消耗:通知只是“提醒你去检查条件”,条件真假由外部变量决定 |
| Event | 布尔标志(,初始为 ) |
阻塞直到 , 设为 |
不可消耗:一旦 ,所有后续 立即通过(永久有效,除非 ) |
举例说明:
- Semaphore(2):相当于“两个通行证”。线程 A、B 拿走后,C 必须等待其中一个归还才能进入。这体现了资源是有限且可再生的。
- Condition:库存为 0 时用户线程
;库存大于 0 时管理员wait()
。但多个用户可能被同时唤醒,最终只有一个能完成购买(需重新检查notify()
)。这表明:通知 ≠ 资源分配。if tickets > 0 - Event:
后,无论多少线程正在set()
,都将立即被放行。这是一种一次性广播,不涉及资源概念。wait()
三、与锁的耦合关系:是否需要配合锁使用?
不同同步机制在是否内置锁以及是否必须配合外部锁方面表现各异,直接影响其使用模式。
| 同步原语 | 是否内置锁? | 使用时是否必须配合外部锁? | 原因 |
|---|---|---|---|
| Semaphore | 否 | 不需要 | 本身是一个独立的计数同步机制,不保护特定数据。 |
| Condition | 是 | 必须 | 和 必须在持有 Condition 关联锁的上下文中调用,否则会报错。 |
| Event | 否 | 不需要 | 仅表示一个全局信号状态,不涉及对共享数据的保护。 |
重要实践:
Condition 的正确用法总是:
with condition:
while not shared_condition: # 必须用 while(防虚假唤醒)
condition.wait()
# 执行操作
而
Semaphore 和 Event 可直接使用,无需额外加锁(除非你要保护其他共享数据)。
四、唤醒语义:如何通知等待者?
三者的唤醒机制在方式、数量及是否存在“通知丢失”风险方面有显著区别。
| 同步原语 | 唤醒方式 | 唤醒数量 | 是否有“通知丢失”风险? |
|---|---|---|---|
| Semaphore | 隐式唤醒一个 |
— | 无( 总会增加计数器, 总能感知) |
| Condition | / |
1 个或全部 | 有!必须在持有锁时调用 ,否则可能在 前发生通知,导致等待者永远阻塞 |
| Event | |
全部 | 无(一旦 ,状态永久为真) |
Condition 的“通知丢失”陷阱:
如果先
notify() 再 wait(),则通知将无法被捕获。因此,条件变量必须与一个可检查的共享状态配合使用,并且应在 while 循环中验证状态,而非使用 if。
五、典型应用场景对比(避免误用)
根据实际需求选择合适的同步原语至关重要。以下是常见场景及其推荐方案。
| 场景 | 推荐原语 | 为什么? |
|---|---|---|
| 限制数据库连接池最多 10 个并发 | Semaphore | 直接控制“数量”,资源用完即阻塞,归还即释放。 |
| 生产者-消费者队列(队列空时消费者等待) | Condition | 需要基于队列状态(空/非空)进行等待和通知,且操作队列本身需要锁保护。 |
| 所有测试线程等待主控线程发出“开始”信号 | Event | 一次性全局信号,发出后所有线程同时启动,无需资源或状态检查。 |
| 实现一个有界缓冲区(既限数量又需状态通知) | Condition + Lock | 只能限数量,但无法区分“空”和“满”; 可配合两个条件(not_empty / not_full)实现。 |
| 实现“最多 5 个线程同时下载文件” | Semaphore | 纯并发数控制,与文件内容无关。 |
| 线程等待某个全局配置更新完成 | Event | 配置更新是一次性事件,完成后所有依赖线程应继续。 |
六、Semaphore与Condition 用法对比
6.1 Semaphore 的使用特性分析
在并发编程中,Semaphore 存在一个明显的局限性:它无法自然地区分“空”和“满”的状态。这一问题通常出现在需要双向同步控制的场景中,例如 有界缓冲区 或 循环队列。
其核心原因在于:一个单独的 Semaphore 实例只能表示“可执行操作的剩余次数”,而不能同时表达两种互斥的状态——即“可写入”和“可读取”。这意味着,仅靠一个信号量无法准确反映资源的双向流动情况。
sem = Semaphore(3)
实例说明:容量为3的票务消息队列
考虑如下模型:
- 生产者线程(补票):负责向队列中添加票(执行写操作)
- 消费者线程(购票):从队列中取出票(执行读操作)
- 队列最大容量为3
根据业务逻辑:
- 当队列已满(含3张票)时,禁止写入,但允许读取;
- 当队列为空(0张票)时,禁止读取,但允许写入。
如果仅使用一个 Semaphore 来管理该队列:
- 它可以用来表示“还可写入多少张票”,初始值设为3;
- 但它无法反馈当前是否有可供消费的数据项。
举例来说,当队列为空时,该 Semaphore 的计数值为3(表示空间充足),此时消费者线程若查看此信号量,会误判为“有资源可用”,从而尝试进行取票操作,最终导致从空队列中取数据,引发错误。
sem
根本问题在于:这个 Semaphore 实际上是服务于写权限的,对生产者有效,但对消费者而言不具备任何判断依据。因此,单靠一个信号量无法实现安全的双端同步。
sem=3
正确解决方案:采用两个 Semaphore 协同工作
经典的解决方法源自操作系统中的同步机制设计,使用两个独立的信号量分别追踪“空槽位数量”和“已填充项数量”:
import threading
import time
import random
class BoundedTicketQueue_Semaphore:
def __init__(self, maxsize=3):
self.queue = []
self.maxsize = maxsize
# empty 表示剩余可写入的空位数,初始等于最大容量
self.empty = threading.Semaphore(maxsize)
# full 表示当前可读取的元素数量,初始为0
self.full = threading.Semaphore(0)
# 用于保护队列访问的互斥锁
self.mutex = threading.Lock()
def put(self, ticket):
self.empty.acquire() # 等待空位,若无则阻塞
with self.mutex: # 进入临界区
self.queue.append(ticket)
print(f"[生产者] 放入票 {ticket},队列: {self.queue}")
self.full.release() # 新增一个可读元素
def get(self):
self.full.acquire() # 等待有票可取,若无则阻塞
with self.mutex: # 进入临界区
ticket = self.queue.pop(0)
print(f"[消费者] 取出票 {ticket},队列: {self.queue}")
self.empty.release() # 释放一个空位
return ticket
测试代码如下:
def test_semaphore():
queue = BoundedTicketQueue_Semaphore(maxsize=3)
def producer(name, tickets):
for t in tickets:
queue.put(f"{name}-{t}")
time.sleep(random.uniform(0.1, 0.3))
def consumer(name, count):
for _ in range(count):
queue.get()
time.sleep(random.uniform(0.2, 0.4))
# 创建两个生产者,一个消费者
p1 = threading.Thread(target=producer, args=("P1", [1, 2, 3, 4]))
p2 = threading.Thread(target=producer, args=("P2", [5, 6]))
c1 = threading.Thread(target=consumer, args=("C1", 6))
p1.start(); p2.start(); c1.start()
p1.join(); p2.join(); c1.join()
print("? Semaphore 方案测试完成\n")
if __name__ == "__main__":
print("【方案一:双 Semaphore】")
test_semaphore()
empty.acquire()
流程解析
- 生产者行为:
→ 等待存在空位 → 成功放入票 → 调用 release() 增加“可读项”计数(通知消费者)full.release() - 消费者行为:
→ 等待有票可取 → 成功取出票 → 释放一个空位供生产者使用full.acquire()
通过引入两个语义明确的信号量(empty 和 full),系统能够精确控制读写两端的行为,避免资源竞争与非法操作,实现了高效且安全的同步机制。
empty.release()
(通知有空位了)
如此一来,
empty
控制“满”的状态,
full
管理“空”的情况,二者协同工作,才能实现读写操作的正确同步。
6.2 Condition 的实现方式
Condition 通过直接检测队列的实际状态来进行协调。以下是基于 threading.Condition 实现的有界票务队列示例:
import threading
import time
import random
class BoundedTicketQueue_Condition:
def __init__(self, maxsize=3):
self.queue = []
self.maxsize = maxsize
self.cond = threading.Condition() # 内置锁机制
def put(self, ticket):
with self.cond:
# 等待条件:队列未满
while len(self.queue) >= self.maxsize:
print(f"[生产者] 队列已满 ({len(self.queue)}),等待空位...")
self.cond.wait()
self.queue.append(ticket)
print(f"[生产者] 放入票 {ticket},队列: {self.queue}")
self.cond.notify_all() # 唤醒所有等待线程(主要是消费者)
def get(self):
with self.cond:
# 等待条件:队列非空
while len(self.queue) == 0:
print(f"[消费者] 队列为空,等待票...")
self.cond.wait()
ticket = self.queue.pop(0)
print(f"[消费者] 取出票 {ticket},队列: {self.queue}")
self.cond.notify_all() # 唤醒所有等待线程(主要是生产者)
return ticket
# === 测试代码 ===
def test_condition():
queue = BoundedTicketQueue_Condition(maxsize=3)
def producer(name, tickets):
for t in tickets:
queue.put(f"{name}-{t}")
time.sleep(random.uniform(0.1, 0.3))
def consumer(name, count):
for _ in range(count):
queue.get()
time.sleep(random.uniform(0.2, 0.4))
p1 = threading.Thread(target=producer, args=("P1", [1, 2, 3, 4]))
p2 = threading.Thread(target=producer, args=("P2", [5, 6]))
c1 = threading.Thread(target=consumer, args=("C1", 6))
p1.start(); p2.start(); c1.start()
p1.join(); p2.join(); c1.join()
print("? Condition 方案测试完成\n")
if __name__ == "__main__":
print("【方案二:Condition】")
test_condition()
6.3 Condition 执行流程分析 — 单生产者与单消费者场景
Condition 机制依赖于对共享变量的实时读取来判断队列是否为空或满。这种设计天然支持双向条件判断。len(queue)
其核心在于线程在不同操作阶段对锁的持有与释放行为:
| 操作 | 锁状态 | 说明 |
|---|---|---|
|
获取锁 | 进入临界区,开始访问共享资源 |
|
释放锁 | 允许其他线程获得执行机会 |
| 被唤醒后 | 重新获取锁 | 继续从 wait 处恢复执行 |
|
保持持有锁 | 仅标记唤醒动作,不立即释放控制权 |
| 退出 | |
最终释放锁,使被唤醒线程得以真正运行 |
| 时间 | 线程 | 操作 | 队列状态 | 说明 |
|---|---|---|---|---|
| T1 | C1 | 尝试 get() | |
队列为空,调用 释放锁并进入休眠 |
| T2 | P1 | 执行 put(1) | |
获取锁,添加数据,并通过 唤醒 C1 |
| T3 | P1 | 执行 put(2) | |
继续向队列中添加元素 |
| T4 | P1 | 执行 put(3) | |
此时队列已满 |
| T5 | P1 | 尝试 put(4) | |
队列已满,调用 释放锁并休眠 |
| T6 | C1 | 执行 get() | |
被唤醒,取出数据,并触发 唤醒 P1 |
| T7 | P1 | 继续 put(4) | |
重新获得执行权,完成第4个元素的插入 |
6.4 Condition 执行流程分析——多生产者与多消费者场景
假设系统中存在两个生产者(P1、P2)和两个消费者(C1、C2),整体执行流程如下所示。初始状态下,队列容量为 3,且队列为空。
[]
| 时间 | 线程 | 操作 | 队列状态 | 说明 |
|---|---|---|---|---|
| T1 | C1 | 尝试 get() | |
队列为空, 释放锁并进入休眠 |
| T2 | C2 | 尝试 get() | |
队列为空, 释放锁并休眠(C1 和 C2 均在等待) |
| T3 | P1 | 执行 put(P1-1) | |
获取锁,成功放入数据, 唤醒 C1 和 C2 |
| T4 | P1 | 执行 put(P1-2) | |
继续向队列中添加元素 |
| T5 | C1 | 执行 get() | |
被唤醒并成功获取锁,取出 P1-1, |
| T6 | P2 | 执行 put(P2-5) | |
获取锁后将数据写入队列 |
| T7 | P1 | 执行 put(P1-3) | |
队列已满(3/3) |
| T8 | P2 | 执行 put(P2-6) | |
队列已满, 释放锁并休眠 |
| T9 | P1 | 尝试 put(P1-4) | |
队列满, 释放锁并休眠(P1 和 P2 都在等待) |
| T10 | C2 | 执行 get() | |
被唤醒并获取锁,取出 P1-2, 唤醒 P1 和 P2 |
| T11 | P1 | 继续 put(P1-4) | |
被唤醒,检查队列未满,成功放入数据,队列再次满(3/3) |
| T12 | C1 | 执行 get() | |
取出 P2-5, 唤醒 P2 |
| T13 | P2 | 继续 put(P2-6) | |
被唤醒,确认队列未满,完成数据写入 |
| T14 | C2 | 执行 get() | |
取出 P1-3, |
| T15 | C1 | 执行 get() | |
取出 P1-4, |
| T16 | C2 | 执行 get() | |
取出 P2-6, |
| T17 | C1 | 尝试 get() | |
队列为空, 释放锁并休眠 |
典型场景分析
场景一:多个消费者同时被唤醒(T5 时刻)
T3: P1 调用 notify_all() → C1 和 C2 都被标记为"可唤醒"
T5: C1 先获取到锁,取出数据
C2 虽然被唤醒,但要等 C1 释放锁后才能获取锁
关键点:
notify_all() 虽然 Condition 会唤醒所有等待线程,但实际只能有一个线程成功获取锁,其余线程仍需竞争。
场景二:多个生产者同时处于等待状态(T8–T9)
队列状态: [P1-2, P2-5, P1-3] (已满)
P2: 尝试 put(P2-6) → wait() 休眠
P1: 尝试 put(P1-4) → wait() 休眠
等待队列: [P2, P1] (都在等待空位)
场景三:唤醒后的竞争执行(T10–T11)
T10: C2 取出数据后 notify_all() → P1 和 P2 都被唤醒
T11: 假设 P1 先抢到锁
- P1 检查 while 条件:队列未满 ?
- P1 放入 P1-4
- P2 还在等待获取锁
T12: C1 再次取出数据,notify_all()
- P2 终于获取锁,放入 P2-6
可能的其他执行顺序变体
由于线程调度具有不确定性,实际运行中可能出现以下不同执行路径:
变体一:消费者连续执行
| 时间 | 线程 | 操作 | 队列状态 |
|---|---|---|---|
| — | P1 | put(1), put(2), put(3) | |
| T1 | C1 | get() | |
| T2 | C1 | get() | |
| T3 | C2 | get() | |
| T4 | C1 | 尝试 get() | ← 阻塞 |
变体二:生产者连续填满队列
| 时间 | 线程 | 操作 | 队列状态 |
|---|---|---|---|
| T1 | P1 | put(1), put(2) | |
| T2 | P2 | put(5) | |
| T3 | P1 | 尝试 put(3) | ← P1 阻塞 |
| T4 | P2 | 尝试 put(6) | ← P2 阻塞 |
核心要点归纳
Condition 唤醒的是所有等待线程,但仅有一个线程能够最终获得锁。notify_all()
必须通过循环机制重新校验条件,以避免虚假唤醒带来的问题。while- 锁的获取具有随机性,无法预知哪个线程会优先执行。
- 等待队列中的多个线程处于
状态,等待被唤醒。wait() - 无公平性保障,可能出现某一线程连续多次执行的情况。
七、总结:一句话区分三者
Semaphore:作为“资源配额管理器”——必须持有“许可(permit)”才能进行操作。
Condition:充当“状态变化监听器”——收到通知后需自行验证当前状态是否满足条件。
Event:类似于“一次性广播喇叭”——信号触发后,所有监听者都会得知事件发生。
总体来看,Condition 是三者中最灵活但也最容易误用的机制,因为它要求开发者手动维护共享状态,并正确处理加锁、解锁以及循环判断逻辑;而 Semaphore 和 Event 更接近于“状态机”模型,前者用于管理资源数量,后者则控制开关状态。
在工程实践中:
- 若只需简单的通知机制(如启动信号或初始化完成),应优先选用
;Event - 若涉及复杂状态协调(例如缓冲队列、库存管理等),则必须使用
;Condition - 若目标仅为限流控制,则
是最直接高效的方案。Semaphore


雷达卡


京公网安备 11010802022788号







