2025-05-07🌱上海: ☀️ 🌡️+19°C 🌬️↖19km/h

# Part002 多线程任务批处理通用工具类

# 1. 为什么(Why)

# 1.1 项目背景

part002 模块实现了一个基于 java 的批处理任务工具,主要用于并行处理大量同类型任务(如批量发送短信、邮件、消息推送等)。随着业务量的增加,传统的串行处理方式已经无法满足高并发系统的需求,因此引入了并行批处理技术,以提高任务处理效率。

# 1.2 解决的问题

  • 任务处理效率低:通过线程池并行处理任务,显著提高了任务处理的速度。

  • 资源利用率不高:合理分配线程资源,提高 CPU 和系统资源的利用率。

  • 代码复用性差:封装通用的批处理工具类,提高代码的复用性和可维护性。

  • 任务完成同步问题:确保所有任务处理完毕后才继续执行后续操作。

# 2. 如何实现(How)

# 2.1 项目结构

part002 模块的项目结构如下:

part002/
├── src/
│   └── main/
│       └── java/
│           └── com/
│               └── muzi/
│                   ├── SimpleBatchTask.java      # 简单批处理任务示例
│                   └── TaskDisposeUtils.java     # 批处理任务工具类
└── pom.xml                                       # Maven配置文件

# 2.2 关键技术点

# 2.2.1 案例分析:批处理任务工具类

技术实现: 批处理任务工具类 TaskDisposeUtils 通过泛型和函数式编程实现了灵活的任务处理机制:

public static <T> void dispose(List<T> taskList, Consumer<? super T> consumer, Executor executor) throws InterruptedException {
    if (taskList == null || taskList.size() == 0) {
        return;
    }
    Objects.nonNull(consumer);
    CountDownLatch countDownLatch = new CountDownLatch(taskList.size());
    for (T item : taskList) {
        executor.execute(() -> {
            try {
                consumer.accept(item);
            } finally {
                countDownLatch.countDown();
            }
        });
    }
    countDownLatch.await();
}

原理分析

  1. 泛型设计

    1. 通过泛型参数 <T> 支持处理任意类型的任务列表

    2. 增强了工具类的通用性和扩展性

  2. 函数式编程

    1. 使用 Consumer<? super T> 函数式接口接收任务处理逻辑

    2. 允许调用者传入自定义的任务处理方法,提高灵活性

  3. 线程池管理

    1. 接收外部传入的线程池 Executor ,实现资源的统一管理

    2. 避免在工具类内部创建线程池导致的资源浪费

  4. 任务同步机制

    1. 使用 CountDownLatch 实现任务完成的同步等待

    2. 确保所有任务处理完毕后才返回,实现批处理任务的完整性控制

# 2.2.2 案例分析:简单批处理任务

技术实现SimpleBatchTask 类展示了基本的批处理任务实现方式:

public static void batchTaskTest() {
    long startTime = System.currentTimeMillis();
    List<String> messgList = new ArrayList<>();
    for (int i = 0; i < 50; i++) {
        messgList.add("短信-" + i);
    }
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    CountDownLatch countDownLatch = new CountDownLatch(messgList.size());
    for (String mess : messgList) {
        executorService.execute(() -> {
            try {
                // 交给线程池处理任务
                disposeTask(mess);
            } finally {
                // 处理完成后调用 countDownLatch.countDown ()
                countDownLatch.countDown();
            }
        });
    }
    try {
        // 阻塞当前线程池
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("任务处理完毕,耗时(ms):" + (System.currentTimeMillis() - startTime));
    executorService.shutdown();
}

原理分析

  1. 线程池配置

    1. 创建固定大小的线程池 newFixedThreadPool(10) ,控制并发数量为 10

    2. 合理设置线程池大小,避免线程过多导致的上下文切换开销

  2. 任务分配

    1. 将 50 个短信发送任务分配给 10 个线程并行处理

    2. 每个线程处理的任务数量约为 5 个,实现任务的均衡分配

  3. 同步等待

    1. 使用 CountDownLatch.await() 阻塞主线程,直到所有任务处理完毕

    2. 通过 finally 块确保即使任务执行异常,计数器也会正常减少

  4. 资源释放

    1. 所有任务完成后,调用 executorService.shutdown() 释放线程池资源

    2. 防止资源泄露,符合资源管理最佳实践

# 3. 技术点详解(Detail)

# 3.1 线程池原理

批处理任务工具核心使用了 java 的 ExecutorService 线程池机制,具有以下特点:

  1. 线程复用

    1. 避免频繁创建和销毁线程的开销

    2. 提高系统资源利用率和任务执行效率

  2. 任务队列

    1. 当活动线程数达到核心线程数时,新任务将被放入队列等待

    2. 实现任务的平滑调度,避免瞬时高负载

  3. 线程池配置策略

    1. newFixedThreadPool :适用于负载较重的服务器,创建固定数量的线程

    2. newCachedThreadPool :适用于执行许多短期异步任务的程序

    3. newSingleThreadExecutor :适用于需要保证顺序执行各个任务的应用场景

# 3.2 CountDownLatch 同步机制

CountDownLatch 是 java 并发包中的同步工具,用于协调多个线程之间的同步,其工作原理为:

  1. 计数器初始化

    1. 创建时指定计数值,表示需要等待完成的任务数量

    2. 每个任务完成时调用 countDown() 使计数值减一

  2. 阻塞与唤醒

    1. 主线程调用 await() 方法阻塞等待

    2. 当计数值减至零时,所有等待的线程被唤醒继续执行

  3. 使用场景

    1. 适用于一个线程需要等待多个线程完成工作的场景

    2. 批处理任务中,确保所有子任务完成后再进行结果汇总或后续处理

# 3.3 函数式编程应用

工具类中使用的 Consumer<? super T> 是 java 8 引入的函数式接口,具有以下优势:

  1. 代码简洁

    1. 使用 Lambda 表达式简化代码,提高可读性

    2. 减少匿名内部类的使用,代码更加简洁明了

  2. 灵活传递行为

    1. 允许将任务处理行为作为参数传递

    2. 实现策略模式,不同业务场景可以传入不同的处理逻辑

  3. 代码复用

    1. 将通用的批处理框架与具体业务逻辑分离

    2. 提高代码复用性,减少重复代码

# 4. 使用示例(Usage)

# 4.1 使用工具类处理批量任务

// 定义任务列表
List<String> taskList = new ArrayList<>();
for (int i = 0; i < 50; i++) {
    taskList.add("短信-" + i);
}
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 调用工具类处理任务
try {
    TaskDisposeUtils.dispose(taskList, 
        // 使用 Lambda 表达式定义任务处理逻辑
        (msg) -> {
            System.out.println(String.format("【%s】发送成功", msg));
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, 
        executorService);
} catch (InterruptedException e) {
    throw new RuntimeException(e);
} finally {
    // 关闭线程池
    executorService.shutdown();
}

# 4.2 使用方法引用简化代码

// 使用方法引用,代码更加简洁
TaskDisposeUtils.dispose(taskList, TaskDisposeUtils::disposeTask, executorService);

# 4.3 自定义任务处理

// 定义复杂对象列表
List<User> users = getUserList();
// 处理复杂对象
TaskDisposeUtils.dispose(users, user -> {
    // 发送邮件
    emailService.send(user.getEmail(), "标题", "内容");
    // 记录日志
    logService.log(user.getId(), "邮件已发送");
}, executorService);

# 5. 总结与优化方向(Summary)

# 5.1 技术总结

本模块实现了一个灵活高效的批处理任务工具,具有以下特点:

  1. 使用线程池机制实现任务的并行处理,提高执行效率

  2. 通过 CountDownLatch 确保任务同步,保证批处理完整性

  3. 采用泛型和函数式编程提高代码复用性和扩展性

  4. 分离框架和业务逻辑,实现高内聚低耦合的设计

# 5.2 优化方向

  1. 异常处理机制优化

    1. 增加异常捕获和传递机制,提供更详细的失败信息

    2. 实现任务执行状态的返回,区分成功和失败的任务

  2. 任务分组处理

    1. 支持任务的分组和优先级处理

    2. 实现不同组任务的差异化处理策略

  3. 性能监控与统计

    1. 增加任务执行时间统计

    2. 添加性能监控,以便于系统调优

  4. 动态线程池

    1. 实现线程池参数的动态调整

    2. 根据系统负载自动优化线程池配置

  5. 任务熔断和降级

    1. 增加熔断机制,当系统负载过高时,自动降级

    2. 实现任务的优雅失败和重试策略