2025-05-07🌱上海: ☀️ 🌡️+19°C 🌬️↖19km/h
# Part001 分片上传
# 1. 为什么(Why)
# 1.1 项目背景
part001
部分是一个基于 java 和 SQL 的模块,主要负责处理分片上传功能。随着文件上传需求的增加,传统的单次上传方式已经无法满足大文件上传的需求,因此引入了分片上传技术,以提高上传效率和稳定性。
# 1.2 解决的问题
-
大文件上传效率低:通过分片上传,将大文件分割成多个小文件并行上传,显著提高了上传速度。
-
网络不稳定导致上传失败:分片上传允许断点续传,即使网络中断,也可以从中断处继续上传,避免了重新上传的麻烦。
-
服务器压力大:分片上传减少了单次上传的数据量,降低了服务器的瞬时压力。
# 2. 如何实现(How)
# 2.1 项目结构
part001
部分的项目结构如下:
part001/ | |
├── src/ | |
│ ├── main/ | |
│ │ ├── java/ | |
│ │ │ └── com/ | |
│ │ │ └── muzi/ | |
│ │ │ └── part1/ | |
│ │ │ ├── comm/ # 通用工具类和响应对象 | |
│ │ │ ├── controller/ # 控制层,提供API接口 | |
│ │ │ ├── dto/ # 数据传输对象 | |
│ │ │ ├── mapper/ # MyBatis映射接口 | |
│ │ │ ├── po/ # 持久化对象 | |
│ │ │ ├── service/ # 业务逻辑层 | |
│ │ │ ├── utils/ # 工具类 | |
│ │ │ └── part1Application.java # 应用启动类 | |
│ │ └── resources/ | |
│ │ ├── db/ # 数据库初始化脚本 | |
│ │ └── application.yml # 应用配置文件 | |
│ └── test/ | |
│ └── java/ | |
│ └── com/ | |
│ └── muzi/ | |
│ └── part1/ # 测试类 | |
└── pom.xml # Maven配置文件 |
# 2.2 关键技术点
# 2.2.1 案例分析:分片上传的流程设计
技术实现: 分片上传功能采用了 "三步走" 策略:初始化、分片上传、合并完成。整个流程通过 REST API 实现,涉及四个核心接口:
-
初始化分片上传:
/shardUpload/init
-
创建上传任务记录,生成唯一任务 ID
-
记录文件名、分片数量、文件 MD5 值等元数据
-
-
上传分片:
/shardUpload/uploadPart
-
按顺序上传每个分片
-
验证分片是否已上传(避免重复上传)
-
将分片保存到临时存储位置
-
-
完成上传:
/shardUpload/complete
-
校验所有分片是否已上传完成
-
合并所有分片生成完整文件
-
通过 MD5 验证文件完整性
-
-
查询任务状态:
/shardUpload/detail
-
获取上传任务的详细信息
-
记录已上传的分片列表
-
判断整体上传是否已完成
-
原理分析:
-
数据存储设计
-
采用两张表结构:分片任务表 (
t_shard_upload
) 和分片文件表 (t_shard_upload_part
),形成一对多关系 -
分片任务表记录整体任务信息,分片文件表记录每个分片的详细信息
-
使用唯一索引确保分片不会重复上传 (
uq_part_order
)
-
-
文件处理机制
-
分片文件存储在临时目录 (
D:/muzi/shardupload/
) -
每个分片独立存储,通过唯一命名方式组织 (
shardUploadId/partOrder
) -
合并时按照分片顺序读取并拼接,确保文件完整性
-
-
安全校验
-
支持 MD5 校验,确保大文件上传后的完整性
-
仅当所有分片均上传完成时才允许合并操作
-
合并后的文件 MD5 与原始文件 MD5 进行比对验证
-
# 2.2.2 案例分析:多线程并行上传实现
技术实现: 测试类 ShardUploadTest
中实现了两种上传方式:
- 串行上传(已注释)
// 循环上传分片 | |
for (int partOrder = 1; partOrder <= partNum; partOrder++) { | |
this.shardUploadPart(shardUploadId, partOrder); | |
} |
- 并行上传(实际使用)
// 多线程上传分片 | |
ExecutorService executorService = Executors.newFixedThreadPool(partNum); | |
CountDownLatch countDownLatch = new CountDownLatch(partNum); | |
for (int i = 1; i <= partNum; i++) { | |
int partorder = i; | |
executorService.execute(() -> { | |
try { | |
ShardUploadTest shardUploadTest = new ShardUploadTest(); | |
shardUploadTest.shardUploadPart(shardUploadId, partorder); | |
} catch (Exception e) { | |
log.info("分片上传失败{}", e); | |
} finally { | |
countDownLatch.countDown(); | |
} | |
}); | |
} | |
countDownLatch.await(); | |
executorService.shutdown(); |
原理分析:
-
线程池优化
-
使用
ExecutorService
创建固定大小的线程池,线程数量等于分片数量 -
避免频繁创建和销毁线程的开销,提高性能
-
-
任务协调机制
-
采用
CountDownLatch
同步机制,确保所有分片任务完成后才进行合并 -
每个分片上传完成后调用
countDown()
方法,计数器减一 -
主线程通过
await()
方法等待所有分片上传完成
-
-
分片读取优化
-
使用
RandomAccessFile
实现高效的文件分片读取 -
通过
seek()
方法直接定位到分片起始位置,减少 IO 操作 -
针对最后一个可能不足分片大小的分片进行特殊处理
-
# 2.2.3 案例分析:断点续传实现
技术实现: 断点续传功能通过以下机制实现:
- 分片状态检查
// 如果分片已上传,则直接返回 | |
if (this.getUploadPartPO(request.getShardUploadId(), request.getPartOrder()) != null) { | |
return; | |
} |
- 上传任务恢复 通过
/shardUpload/detail
接口获取任务状态和已上传分片列表
// 获取分片任务的详细信息 (哪些分片文件是否已上传) | |
ShardUploadDetailResponse detail = this.shardUploadDetail(shardUploadId); |
原理分析:
-
状态管理
-
每个分片的上传状态通过数据库记录,确保持久化
-
通过唯一约束防止重复上传同一分片
-
分片上传前先检查是否已存在,实现秒传和断点续传
-
-
任务恢复策略
-
客户端可以通过详情接口获取已上传分片列表
-
仅上传未完成的分片,节省带宽和时间
-
服务端支持任意顺序上传分片,提高灵活性
-
-
容错机制
-
每个分片独立保存和记录,互不影响
-
单个分片上传失败不影响整体进度,可重试
-
完整性校验确保最终文件无损
-
# 3. 技术点详解(Detail)
# 3.1 数据库设计
系统使用两张表设计:
-
t_shard_upload:分片上传任务表
-
id
:主键,任务唯一标识 -
file_name
:上传文件名 -
part_num
:分片总数 -
md5
:文件 MD5 校验值 -
file_full_path
:合并后文件完整路径
-
-
t_shard_upload_part:分片文件表
-
id
:主键 -
shard_upload_id
:关联分片任务 ID -
part_order
:分片序号,从 1 开始 -
file_full_path
:分片文件存储路径 -
唯一索引:
uq_part_order (shard_upload_id, part_order)
-
# 3.2 核心算法
- 分片数量计算
public static int shardNum(long fileSize, long partSize) { | |
if (fileSize % partSize == 0) { | |
return (int) (fileSize / partSize); | |
} else { | |
return (int) (fileSize / partSize) + 1; | |
} | |
} |
- 分片读取
public byte[] readPart(int partOrder) throws Exception { | |
RandomAccessFile randomAccessFile = null; | |
byte[] bytes = new byte[(int) partSize]; | |
try { | |
randomAccessFile = new RandomAccessFile(file, "r"); | |
randomAccessFile.seek((partOrder - 1) * partSize); | |
int read = randomAccessFile.read(bytes); | |
if (read == partSize) { | |
return bytes; | |
} else { | |
byte[] tempBytes = new byte[read]; | |
System.arraycopy(bytes, 0, tempBytes, 0, read); | |
return tempBytes; | |
} | |
} finally { | |
IOUtils.closeQuietly(randomAccessFile); | |
} | |
} |
- 文件合并
private File mergeFile(ShardUploadPO shardUploadPO, List<ShardUploadPartPO> shardUploadPartList) throws IOException { | |
File file = ShardUploadUtils.createFileNotExists(new File(this.getFileFullName(shardUploadPO))); | |
FileOutputStream fileOutputStream = null; | |
try { | |
fileOutputStream = FileUtils.openOutputStream(file, true); | |
for (ShardUploadPartPO part : shardUploadPartList) { | |
File partFile = new File(part.getFileFullPath()); | |
FileInputStream partFileInputStream = null; | |
try { | |
partFileInputStream = FileUtils.openInputStream(partFile); | |
IOUtils.copyLarge(partFileInputStream, fileOutputStream); | |
} finally { | |
IOUtils.closeQuietly(partFileInputStream); | |
} | |
partFile.delete(); | |
} | |
} finally { | |
IOUtils.closeQuietly(fileOutputStream); | |
} | |
if (StringUtils.isNotBlank(shardUploadPO.getMd5()) && !shardUploadPO.getMd5().equals(SecureUtil.md5(file))) { | |
throw ServiceExceptionUtils.exception("文件md5不匹配"); | |
} | |
return file; | |
} |
# 3.3 性能与安全考量
-
性能优化
-
多线程并行上传分片,提高传输效率
-
使用
RandomAccessFile
实现高效的文件分片读取 -
采用固定大小的线程池,避免资源浪费
-
-
安全措施
-
MD5 完整性校验,防止文件损坏
-
分片上传状态持久化,支持断点续传
-
临时分片文件存储与合并完成后的清理
-
-
资源管理
-
文件资源使用后及时关闭,防止资源泄露
-
合并完成后删除临时分片文件,节省存储空间
-
使用
try-finally
结构确保资源正确释放
-
# 4. 使用示例(Usage)
# 4.1 客户端调用流程
- 初始化上传任务
public String shardUploadInit(String fileName, int partNum, String md5) { | |
ShardUploadInitRequest request = new ShardUploadInitRequest(); | |
request.setFileName(fileName); | |
request.setPartNum(partNum); | |
request.setMd5(md5); | |
RequestEntity<ShardUploadInitRequest> entity = RequestEntity | |
.post(this.getRequestUrl("shardUpload/init")) | |
.contentType(MediaType.APPLICATION_JSON) | |
.body(request); | |
ResponseEntity<Result<String>> exchange = this.restTemplate.exchange(entity, | |
new ParameterizedTypeReference<Result<String>>() {}); | |
return exchange.getBody().getData(); | |
} |
- 上传单个分片
public void shardUploadPart(String shardUploadId, int partOrder) throws Exception { | |
byte[] bytes = readPart(partOrder); | |
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>(); | |
body.add("shardUploadId", shardUploadId); | |
body.add("partOrder", partOrder); | |
body.add("file", new ByteArrayResource(bytes) { | |
@Override | |
public String getFilename() { | |
return "part" + partOrder; | |
} | |
}); | |
RequestEntity<MultiValueMap<String, Object>> entity = RequestEntity | |
.post(this.getRequestUrl("shardUpload/uploadPart")) | |
.body(body); | |
this.restTemplate.exchange(entity, new ParameterizedTypeReference<Result<String>>() {}); | |
} |
- 完成上传
public void shardUploadComplete(String shardUploadId) { | |
ShardUploadCompleteRequest request = new ShardUploadCompleteRequest(); | |
request.setShardUploadId(shardUploadId); | |
RequestEntity<ShardUploadCompleteRequest> entity = RequestEntity | |
.post(this.getRequestUrl("shardUpload/complete")) | |
.contentType(MediaType.APPLICATION_JSON) | |
.body(request); | |
ResponseEntity<Result<Boolean>> responseEntity = this.restTemplate.exchange(entity, | |
new ParameterizedTypeReference<Result<Boolean>>() {}); | |
} |
# 4.2 完整示例
测试类 ShardUploadTest
提供了一个完整的分片上传演示:
@Test | |
public void shardUpload() throws Exception { | |
long begin = System.currentTimeMillis(); | |
int partNum = ShardUploadUtils.shardNum(file.length(), partSize); | |
String fileMd5 = SecureUtil.md5(file); | |
// 1、分片上传初始化 | |
String shardUploadId = this.shardUploadInit(file.getName(), partNum, fileMd5); | |
// 2、多线程上传分片 | |
ExecutorService executorService = Executors.newFixedThreadPool(partNum); | |
CountDownLatch countDownLatch = new CountDownLatch(partNum); | |
for (int i = 1; i <= partNum; i++) { | |
int partorder = i; | |
executorService.execute(() -> { | |
try { | |
ShardUploadTest shardUploadTest = new ShardUploadTest(); | |
shardUploadTest.shardUploadPart(shardUploadId, partorder); | |
} catch (Exception e) { | |
log.info("分片上传失败{}", e); | |
} finally { | |
countDownLatch.countDown(); | |
} | |
}); | |
} | |
countDownLatch.await(); | |
executorService.shutdown(); | |
// 3、合并分片,完成上传 | |
this.shardUploadComplete(shardUploadId); | |
// 4、获取分片任务的详细信息 | |
ShardUploadDetailResponse detail = this.shardUploadDetail(shardUploadId); | |
long end = System.currentTimeMillis(); | |
log.info("运行时间:{}", end-begin); | |
log.info("分片任务详细信息:{}", detail); | |
} |
# 5. 总结与未来优化(Summary)
# 5.1 技术总结
本项目成功实现了基于 java 和 Spring Boot 的分片上传功能,解决了大文件上传面临的多种问题:
-
通过分片上传提高了大文件传输效率
-
支持断点续传,增强了上传任务的稳定性
-
实现了并行上传,充分利用网络带宽
-
提供了完整性校验,保证文件安全
# 5.2 可优化方向
-
存储方式优化
-
考虑使用对象存储服务替代本地文件系统
-
支持分布式存储,提高系统可扩展性
-
-
传输协议优化
-
支持 WebSocket 等更高效的传输协议
-
实现流式传输,减少内存占用
-
-
前端交互优化
-
提供上传进度实时反馈
-
实现可视化的断点续传界面
-
-
安全性加强
-
增加文件类型校验和安全扫描
-
实现传输过程加密
-
-
性能进一步提升
-
动态调整分片大小,适应不同网络环境
-
实现服务端分片合并的异步处理
-