2025-05-07🌱上海: ☀️ 🌡️+19°C 🌬️↖19km/h
# Part002 多线程任务批处理通用工具类
# 1. 为什么(Why)
# 1.1 项目背景
part002
模块实现了一个基于 java 的批处理任务工具,主要用于并行处理大量同类型任务(如批量发送短信、邮件、消息推送等)。随着业务量的增加,传统的串行处理方式已经无法满足高并发系统的需求,因此引入了并行批处理技术,以提高任务处理效率。
# 1.2 解决的问题
-
任务处理效率低:通过线程池并行处理任务,显著提高了任务处理的速度。
-
资源利用率不高:合理分配线程资源,提高 CPU 和系统资源的利用率。
-
代码复用性差:封装通用的批处理工具类,提高代码的复用性和可维护性。
-
任务完成同步问题:确保所有任务处理完毕后才继续执行后续操作。
# 2. 如何实现(How)
# 2.1 项目结构
part002
模块的项目结构如下:
part002/ | |
├── src/ | |
│ └── main/ | |
│ └── java/ | |
│ └── com/ | |
│ └── muzi/ | |
│ ├── SimpleBatchTask.java # 简单批处理任务示例 | |
│ └── TaskDisposeUtils.java # 批处理任务工具类 | |
└── pom.xml # Maven配置文件 |
# 2.2 关键技术点
# 2.2.1 案例分析:批处理任务工具类
技术实现: 批处理任务工具类 TaskDisposeUtils
通过泛型和函数式编程实现了灵活的任务处理机制:
public static <T> void dispose(List<T> taskList, Consumer<? super T> consumer, Executor executor) throws InterruptedException { | |
if (taskList == null || taskList.size() == 0) { | |
return; | |
} | |
Objects.nonNull(consumer); | |
CountDownLatch countDownLatch = new CountDownLatch(taskList.size()); | |
for (T item : taskList) { | |
executor.execute(() -> { | |
try { | |
consumer.accept(item); | |
} finally { | |
countDownLatch.countDown(); | |
} | |
}); | |
} | |
countDownLatch.await(); | |
} |
原理分析:
-
泛型设计
-
通过泛型参数
<T>
支持处理任意类型的任务列表 -
增强了工具类的通用性和扩展性
-
-
函数式编程
-
使用
Consumer<? super T>
函数式接口接收任务处理逻辑 -
允许调用者传入自定义的任务处理方法,提高灵活性
-
-
线程池管理
-
接收外部传入的线程池
Executor
,实现资源的统一管理 -
避免在工具类内部创建线程池导致的资源浪费
-
-
任务同步机制
-
使用
CountDownLatch
实现任务完成的同步等待 -
确保所有任务处理完毕后才返回,实现批处理任务的完整性控制
-
# 2.2.2 案例分析:简单批处理任务
技术实现: SimpleBatchTask
类展示了基本的批处理任务实现方式:
public static void batchTaskTest() { | |
long startTime = System.currentTimeMillis(); | |
List<String> messgList = new ArrayList<>(); | |
for (int i = 0; i < 50; i++) { | |
messgList.add("短信-" + i); | |
} | |
ExecutorService executorService = Executors.newFixedThreadPool(10); | |
CountDownLatch countDownLatch = new CountDownLatch(messgList.size()); | |
for (String mess : messgList) { | |
executorService.execute(() -> { | |
try { | |
// 交给线程池处理任务 | |
disposeTask(mess); | |
} finally { | |
// 处理完成后调用 countDownLatch.countDown () | |
countDownLatch.countDown(); | |
} | |
}); | |
} | |
try { | |
// 阻塞当前线程池 | |
countDownLatch.await(); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
System.out.println("任务处理完毕,耗时(ms):" + (System.currentTimeMillis() - startTime)); | |
executorService.shutdown(); | |
} |
原理分析:
-
线程池配置
-
创建固定大小的线程池
newFixedThreadPool(10)
,控制并发数量为 10 -
合理设置线程池大小,避免线程过多导致的上下文切换开销
-
-
任务分配
-
将 50 个短信发送任务分配给 10 个线程并行处理
-
每个线程处理的任务数量约为 5 个,实现任务的均衡分配
-
-
同步等待
-
使用
CountDownLatch.await()
阻塞主线程,直到所有任务处理完毕 -
通过
finally
块确保即使任务执行异常,计数器也会正常减少
-
-
资源释放
-
所有任务完成后,调用
executorService.shutdown()
释放线程池资源 -
防止资源泄露,符合资源管理最佳实践
-
# 3. 技术点详解(Detail)
# 3.1 线程池原理
批处理任务工具核心使用了 java 的 ExecutorService
线程池机制,具有以下特点:
-
线程复用
-
避免频繁创建和销毁线程的开销
-
提高系统资源利用率和任务执行效率
-
-
任务队列
-
当活动线程数达到核心线程数时,新任务将被放入队列等待
-
实现任务的平滑调度,避免瞬时高负载
-
-
线程池配置策略
-
newFixedThreadPool
:适用于负载较重的服务器,创建固定数量的线程 -
newCachedThreadPool
:适用于执行许多短期异步任务的程序 -
newSingleThreadExecutor
:适用于需要保证顺序执行各个任务的应用场景
-
# 3.2 CountDownLatch 同步机制
CountDownLatch
是 java 并发包中的同步工具,用于协调多个线程之间的同步,其工作原理为:
-
计数器初始化
-
创建时指定计数值,表示需要等待完成的任务数量
-
每个任务完成时调用
countDown()
使计数值减一
-
-
阻塞与唤醒
-
主线程调用
await()
方法阻塞等待 -
当计数值减至零时,所有等待的线程被唤醒继续执行
-
-
使用场景
-
适用于一个线程需要等待多个线程完成工作的场景
-
批处理任务中,确保所有子任务完成后再进行结果汇总或后续处理
-
# 3.3 函数式编程应用
工具类中使用的 Consumer<? super T>
是 java 8 引入的函数式接口,具有以下优势:
-
代码简洁
-
使用 Lambda 表达式简化代码,提高可读性
-
减少匿名内部类的使用,代码更加简洁明了
-
-
灵活传递行为
-
允许将任务处理行为作为参数传递
-
实现策略模式,不同业务场景可以传入不同的处理逻辑
-
-
代码复用
-
将通用的批处理框架与具体业务逻辑分离
-
提高代码复用性,减少重复代码
-
# 4. 使用示例(Usage)
# 4.1 使用工具类处理批量任务
// 定义任务列表 | |
List<String> taskList = new ArrayList<>(); | |
for (int i = 0; i < 50; i++) { | |
taskList.add("短信-" + i); | |
} | |
// 创建线程池 | |
ExecutorService executorService = Executors.newFixedThreadPool(10); | |
// 调用工具类处理任务 | |
try { | |
TaskDisposeUtils.dispose(taskList, | |
// 使用 Lambda 表达式定义任务处理逻辑 | |
(msg) -> { | |
System.out.println(String.format("【%s】发送成功", msg)); | |
try { | |
TimeUnit.SECONDS.sleep(1); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} | |
}, | |
executorService); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} finally { | |
// 关闭线程池 | |
executorService.shutdown(); | |
} |
# 4.2 使用方法引用简化代码
// 使用方法引用,代码更加简洁 | |
TaskDisposeUtils.dispose(taskList, TaskDisposeUtils::disposeTask, executorService); |
# 4.3 自定义任务处理
// 定义复杂对象列表 | |
List<User> users = getUserList(); | |
// 处理复杂对象 | |
TaskDisposeUtils.dispose(users, user -> { | |
// 发送邮件 | |
emailService.send(user.getEmail(), "标题", "内容"); | |
// 记录日志 | |
logService.log(user.getId(), "邮件已发送"); | |
}, executorService); |
# 5. 总结与优化方向(Summary)
# 5.1 技术总结
本模块实现了一个灵活高效的批处理任务工具,具有以下特点:
-
使用线程池机制实现任务的并行处理,提高执行效率
-
通过 CountDownLatch 确保任务同步,保证批处理完整性
-
采用泛型和函数式编程提高代码复用性和扩展性
-
分离框架和业务逻辑,实现高内聚低耦合的设计
# 5.2 优化方向
-
异常处理机制优化
-
增加异常捕获和传递机制,提供更详细的失败信息
-
实现任务执行状态的返回,区分成功和失败的任务
-
-
任务分组处理
-
支持任务的分组和优先级处理
-
实现不同组任务的差异化处理策略
-
-
性能监控与统计
-
增加任务执行时间统计
-
添加性能监控,以便于系统调优
-
-
动态线程池
-
实现线程池参数的动态调整
-
根据系统负载自动优化线程池配置
-
-
任务熔断和降级
-
增加熔断机制,当系统负载过高时,自动降级
-
实现任务的优雅失败和重试策略
-