Java 并发编程实践:基于 BlockingQueue 的生产者消费者模型
涉及技术点:线程安全、消息零丢失、优雅关闭机制、批量消费优化、LinkedBlockingQueue 应用与线程池协同
一、业务场景描述
设想一个典型的异步处理需求:
- 启动 10 个独立的生产者线程,每个线程生成 1000 条结构化数据记录(以 Map<String, String> 形式表示);
- 由单一消费者线程负责从共享队列中批量获取这些消息,并模拟持久化操作(如写入数据库);
- 系统必须保证所有消息不丢失,并在全部任务完成后实现程序的平稳终止。
此类架构广泛应用于日志采集系统、事件驱动架构或后台任务调度等高并发场景。
二、关键技术选型分析
1. 阻塞队列选择:LinkedBlockingQueue
作为线程安全的数据通道,该实现支持以下关键方法:
put():当队列满时自动阻塞,确保生产者不会覆盖未处理消息;drainTo():允许一次性批量取出多个元素,显著提升消费效率。
默认构造函数创建的实例容量为 Integer.MAX_VALUE,接近“无界”,适用于突发流量场景。但需注意潜在的内存溢出风险,后续可通过设定上限优化。
2. 线程池配置策略
- 使用
Executors.newFixedThreadPool(10)创建包含 10 个固定线程的生产者线程池; - 采用
Executors.newSingleThreadExecutor()启动唯一消费者线程,保障消息处理顺序性。
3. 多线程协调机制
AtomicBoolean allProducersDone:标志位用于通知消费者——所有生产活动已经结束;AtomicInteger producedCount:用于统计总产出量,便于监控和日志追踪(非核心流程依赖)。
三、完整代码实现
package oscollege.back;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 多生产者-单消费者模式示例
*
* 功能说明:
* - 共10个生产者线程,各自发送1000条消息(总计10,000条)
* - 单一消费者线程通过批量拉取方式处理消息,模拟数据库插入行为
* - 利用阻塞队列保障线程间通信的安全性
* - 实现消息零丢失及应用的优雅退出
*/
public class Temp {
public static void main(String[] args) throws InterruptedException {
// 创建拥有10个线程的生产者线程池
ExecutorService producerPool = Executors.newFixedThreadPool(10);
// 使用单线程执行器运行消费者,确保批次按序处理
ExecutorService consumerPool = Executors.newSingleThreadExecutor();
// 注意:new LinkedBlockingQueue<>() 默认容量极大(≈21亿),逻辑上视为无界
// 尽管使用方便,但存在内存耗尽的风险。建议在生产环境中显式设置合理上限,例如 new LinkedBlockingQueue<>(10_000)
BlockingQueue<Map<String, String>> mapBlockingQueue = new LinkedBlockingQueue<>();
// 原子变量,标识所有生产者已完成数据提交
// 必须保证多线程环境下的可见性,故选用 AtomicBoolean 或 volatile 修饰
AtomicBoolean allProducersDone = new AtomicBoolean(false);
// 记录已生成的消息总数(仅用于输出统计信息,不影响主逻辑)
AtomicInteger producedCount = new AtomicInteger(0);
// ========================
// 开启10个生产者线程进行数据投递
// ========================
final int totalMessagesPerProducer = 1000; // 每个生产者负责生成1000条消息
for (int i = 0; i < 10; i++) {
final int producerId = i; // 捕获当前循环索引,供Lambda使用
producerPool.submit(() -> {
try {
for (int j = 0; j < totalMessagesPerProducer; j++) {
// 创建模拟消息体(可代表日志、订单或事件数据)
Map<String, String> message = new HashMap<>();
message.put("producer", String.valueOf(producerId));
message.put("seq", String.valueOf(j));
message.put("data", "msg-" + producerId + "-" + j);
// 使用 put() 方法入队:若队列满则阻塞,确保不丢消息
mapBlockingQueue.put(message);
// 原子更新生产计数器(仅用于监控和日志记录)
producedCount.incrementAndGet();
}
} catch (InterruptedException e) {
// 发生中断时恢复中断状态,保证线程安全退出
Thread.currentThread().interrupt();
}
// 生产者任务执行完毕,自动结束
});
}
// ========================
// 启动消费者线程(支持批量处理模式)
// ========================
consumerPool.submit(() -> {
// 复用批次容器以降低GC压力(避免频繁创建ArrayList实例)
List<Map<String, String>> batch = new ArrayList<>();
try {
while (true) {
// 批量拉取消息,最多一次获取100条,提升吞吐量
// drainTo 能减少锁争用,提高并发性能
int drained = mapBlockingQueue.drainTo(batch, 100);
if (drained > 0) {
System.out.println("???? 消费者批量处理 " + drained + " 条消息");
// 模拟真实业务中的耗时操作,如批量写入数据库或调用远程服务
Thread.sleep(1000);
// 清空已处理批次,准备下一轮复用
batch.clear();
}
// ========== 安全终止机制 ==========
// 当满足以下两个条件时,方可安全退出:
// 1. allProducersDone 为 true:所有生产者已完成提交且不再生成新消息
// 2. 队列为空:当前无待处理消息
//
// ???? 此判断为何可靠?
// 因为 allProducersDone 是在 producerPool.awaitTermination() 成功返回后才置为 true,
// 表示所有生产者线程已彻底结束。此时即使队列暂时为空,也不会再有新消息加入。
// 故当两者同时成立,说明全部工作已完成。
if (allProducersDone.get() && mapBlockingQueue.isEmpty()) {
System.out.println("? 消费者已完成所有消息处理");
break; // 安全退出消费循环
}
}
} catch (InterruptedException e) {
// 中断异常处理:恢复中断标志,便于上层逻辑响应
Thread.currentThread().interrupt();
System.err.println("消费者被中断");
}
});
// ========================
// 等待所有生产者线程执行完成
// ========================
// 关闭生产者线程池——拒绝新任务,但允许已有任务完成
producerPool.shutdown();
// 最多等待5分钟,确保所有生产者任务结束
boolean producersFinished = producerPool.awaitTermination(5, TimeUnit.MINUTES);
if (!producersFinished) {
// 若超时仍未完成,则强制中断所有仍在运行的生产者线程
// 关键步骤:标记“所有生产者已完成”
// 此时可以确保不会再有新消息被添加到队列中
allProducersDone.set(true);
System.out.println("? 所有生产者任务完成,共生成 " + producedCount.get() + " 条消息");
// 生产者超时处理
try {
if (!producerPool.awaitTermination(5, TimeUnit.MINUTES)) {
System.err.println("?? 生产者执行超时,触发强制关闭");
producerPool.shutdownNow(); // 中断所有正在运行的任务
}
} catch (InterruptedException e) {
System.err.println("?? 生产者线程被中断,正在进行清理");
producerPool.shutdownNow();
Thread.currentThread().interrupt();
}
// ========================
// 开始等待消费者处理完剩余消息
// ========================
consumerPool.shutdown();
System.out.println("? 消费者线程池已关闭,正在等待处理完成...");
// 最多等待10分钟让消费者处理完毕
boolean consumerFinished = consumerPool.awaitTermination(10, TimeUnit.MINUTES);
if (!consumerFinished) {
System.err.println("?? 消费者未能在规定时间内完成任务,执行强制终止");
consumerPool.shutdownNow();
} else {
System.out.println("? 所有消费者已成功处理完消息");
}
System.out.println("???? 主程序执行结束");


雷达卡


京公网安备 11010802022788号







