楼主: 林2018
46 0

[問題求助] Java并发:JDK17下多生产者+单消费者的高性能消息处理模式 [推广有奖]

  • 0关注
  • 0粉丝

学前班

80%

还不是VIP/贵宾

-

威望
0
论坛币
10 个
通用积分
0
学术水平
0 点
热心指数
0 点
信用等级
0 点
经验
30 点
帖子
2
精华
0
在线时间
0 小时
注册时间
2018-8-18
最后登录
2018-8-18

楼主
林2018 发表于 2025-12-8 18:16:03 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

Java 并发编程实践:基于 BlockingQueue 的生产者消费者模型

涉及技术点:线程安全、消息零丢失、优雅关闭机制、批量消费优化、LinkedBlockingQueue 应用与线程池协同

一、业务场景描述

设想一个典型的异步处理需求:

  • 启动 10 个独立的生产者线程,每个线程生成 1000 条结构化数据记录(以 Map<String, String> 形式表示);
  • 由单一消费者线程负责从共享队列中批量获取这些消息,并模拟持久化操作(如写入数据库);
  • 系统必须保证所有消息不丢失,并在全部任务完成后实现程序的平稳终止。

此类架构广泛应用于日志采集系统、事件驱动架构或后台任务调度等高并发场景。

二、关键技术选型分析

1. 阻塞队列选择:LinkedBlockingQueue

作为线程安全的数据通道,该实现支持以下关键方法:

  • put():当队列满时自动阻塞,确保生产者不会覆盖未处理消息;
  • drainTo():允许一次性批量取出多个元素,显著提升消费效率。

默认构造函数创建的实例容量为 Integer.MAX_VALUE,接近“无界”,适用于突发流量场景。但需注意潜在的内存溢出风险,后续可通过设定上限优化。

2. 线程池配置策略

  • 使用 Executors.newFixedThreadPool(10) 创建包含 10 个固定线程的生产者线程池;
  • 采用 Executors.newSingleThreadExecutor() 启动唯一消费者线程,保障消息处理顺序性。

3. 多线程协调机制

  • AtomicBoolean allProducersDone:标志位用于通知消费者——所有生产活动已经结束;
  • AtomicInteger producedCount:用于统计总产出量,便于监控和日志追踪(非核心流程依赖)。

三、完整代码实现

package oscollege.back;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多生产者-单消费者模式示例
 *
 * 功能说明:
 * - 共10个生产者线程,各自发送1000条消息(总计10,000条)
 * - 单一消费者线程通过批量拉取方式处理消息,模拟数据库插入行为
 * - 利用阻塞队列保障线程间通信的安全性
 * - 实现消息零丢失及应用的优雅退出
 */
public class Temp {
    public static void main(String[] args) throws InterruptedException {
        // 创建拥有10个线程的生产者线程池
        ExecutorService producerPool = Executors.newFixedThreadPool(10);

        // 使用单线程执行器运行消费者,确保批次按序处理
        ExecutorService consumerPool = Executors.newSingleThreadExecutor();

        // 注意:new LinkedBlockingQueue<>() 默认容量极大(≈21亿),逻辑上视为无界
        // 尽管使用方便,但存在内存耗尽的风险。建议在生产环境中显式设置合理上限,例如 new LinkedBlockingQueue<>(10_000)
        BlockingQueue<Map<String, String>> mapBlockingQueue = new LinkedBlockingQueue<>();

        // 原子变量,标识所有生产者已完成数据提交
        // 必须保证多线程环境下的可见性,故选用 AtomicBoolean 或 volatile 修饰
        AtomicBoolean allProducersDone = new AtomicBoolean(false);

        // 记录已生成的消息总数(仅用于输出统计信息,不影响主逻辑)
        AtomicInteger producedCount = new AtomicInteger(0);

        // ========================
        // 开启10个生产者线程进行数据投递
        // ========================
        final int totalMessagesPerProducer = 1000; // 每个生产者负责生成1000条消息
for (int i = 0; i < 10; i++) {
    final int producerId = i; // 捕获当前循环索引,供Lambda使用
    producerPool.submit(() -> {
        try {
            for (int j = 0; j < totalMessagesPerProducer; j++) {
                // 创建模拟消息体(可代表日志、订单或事件数据)
                Map<String, String> message = new HashMap<>();
                message.put("producer", String.valueOf(producerId));
                message.put("seq", String.valueOf(j));
                message.put("data", "msg-" + producerId + "-" + j);

                // 使用 put() 方法入队:若队列满则阻塞,确保不丢消息
                mapBlockingQueue.put(message);

                // 原子更新生产计数器(仅用于监控和日志记录)
                producedCount.incrementAndGet();
            }
        } catch (InterruptedException e) {
            // 发生中断时恢复中断状态,保证线程安全退出
            Thread.currentThread().interrupt();
        }
        // 生产者任务执行完毕,自动结束
    });
}

// ========================
// 启动消费者线程(支持批量处理模式)
// ========================
consumerPool.submit(() -> {
    // 复用批次容器以降低GC压力(避免频繁创建ArrayList实例)
    List<Map<String, String>> batch = new ArrayList<>();

    try {
        while (true) {
            // 批量拉取消息,最多一次获取100条,提升吞吐量
            // drainTo 能减少锁争用,提高并发性能
            int drained = mapBlockingQueue.drainTo(batch, 100);

            if (drained > 0) {
                System.out.println("???? 消费者批量处理 " + drained + " 条消息");

                // 模拟真实业务中的耗时操作,如批量写入数据库或调用远程服务
                Thread.sleep(1000);

                // 清空已处理批次,准备下一轮复用
                batch.clear();
            }

            // ========== 安全终止机制 ==========
            // 当满足以下两个条件时,方可安全退出:
            // 1. allProducersDone 为 true:所有生产者已完成提交且不再生成新消息
            // 2. 队列为空:当前无待处理消息
            //
            // ???? 此判断为何可靠?
            // 因为 allProducersDone 是在 producerPool.awaitTermination() 成功返回后才置为 true,
            // 表示所有生产者线程已彻底结束。此时即使队列暂时为空,也不会再有新消息加入。
            // 故当两者同时成立,说明全部工作已完成。
            if (allProducersDone.get() && mapBlockingQueue.isEmpty()) {
                System.out.println("? 消费者已完成所有消息处理");
                break; // 安全退出消费循环
            }
        }
    } catch (InterruptedException e) {
        // 中断异常处理:恢复中断标志,便于上层逻辑响应
        Thread.currentThread().interrupt();
        System.err.println("消费者被中断");
    }
});

// ========================
// 等待所有生产者线程执行完成
// ========================
// 关闭生产者线程池——拒绝新任务,但允许已有任务完成
producerPool.shutdown();

// 最多等待5分钟,确保所有生产者任务结束
boolean producersFinished = producerPool.awaitTermination(5, TimeUnit.MINUTES);

if (!producersFinished) {
    // 若超时仍未完成,则强制中断所有仍在运行的生产者线程
// 关键步骤:标记“所有生产者已完成”
// 此时可以确保不会再有新消息被添加到队列中
allProducersDone.set(true);
System.out.println("? 所有生产者任务完成,共生成 " + producedCount.get() + " 条消息");

// 生产者超时处理
try {
    if (!producerPool.awaitTermination(5, TimeUnit.MINUTES)) {
        System.err.println("?? 生产者执行超时,触发强制关闭");
        producerPool.shutdownNow(); // 中断所有正在运行的任务
    }
} catch (InterruptedException e) {
    System.err.println("?? 生产者线程被中断,正在进行清理");
    producerPool.shutdownNow();
    Thread.currentThread().interrupt();
}

// ========================
// 开始等待消费者处理完剩余消息
// ========================
consumerPool.shutdown();
System.out.println("? 消费者线程池已关闭,正在等待处理完成...");

// 最多等待10分钟让消费者处理完毕
boolean consumerFinished = consumerPool.awaitTermination(10, TimeUnit.MINUTES);

if (!consumerFinished) {
    System.err.println("?? 消费者未能在规定时间内完成任务,执行强制终止");
    consumerPool.shutdownNow();
} else {
    System.out.println("? 所有消费者已成功处理完消息");
}

System.out.println("???? 主程序执行结束");
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Java 消费者 高性能 生产者 jav

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
jg-xs1
拉您进交流群
GMT+8, 2025-12-25 00:10