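Chunked Upload of Large Files in Spring Boot: Local File System and Amazon S3
Preface
This post covers the client and server sides of chunked upload for large files. The client is simulated in Java (I am not much of a front-end developer, but the core logic is the same; for a real front end, see the open-source component vue-uploader). The server supports two storage types: the local file system and Amazon S3 object storage.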
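How chunked upload works
The principle is simple and widely documented. The client splits the large file into fixed-size blocks, for example 20 MB each, and uploads each block to the server separately. Once every block has been uploaded, the client asks the server to merge them; when the merge finishes, the task is complete.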
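The splitting rule above comes down to a little arithmetic. The following is a minimal sketch of it, not code from the original project: the class `ChunkPlan` and its method names are hypothetical, and the 50 MB file size is just an example value.

```java
// Hypothetical helper illustrating the chunk arithmetic; not part of the original project.
final class ChunkPlan {
    private ChunkPlan() {}

    /** Number of chunks needed to cover totalSize bytes (ceiling division). */
    static long chunkCount(long totalSize, long partSize) {
        if (totalSize < 0 || partSize <= 0) {
            throw new IllegalArgumentException("totalSize must be >= 0 and partSize > 0");
        }
        return (totalSize + partSize - 1) / partSize;
    }

    /** Byte offset at which the 1-based chunk starts. */
    static long offsetOf(int chunkNumber, long partSize) {
        return (chunkNumber - 1L) * partSize;
    }

    /** Size in bytes of the 1-based chunk; only the last chunk may be shorter. */
    static long sizeOf(int chunkNumber, long totalSize, long partSize) {
        long remaining = totalSize - offsetOf(chunkNumber, partSize);
        return Math.min(partSize, remaining);
    }

    public static void main(String[] args) {
        long partSize = 20L * 1024 * 1024;   // 20 MB per chunk, as in the article
        long totalSize = 50L * 1024 * 1024;  // example: a 50 MB file
        long count = chunkCount(totalSize, partSize);
        for (int i = 1; i <= count; i++) {
            System.out.println("chunk " + i + ": offset=" + offsetOf(i, partSize)
                    + " size=" + sizeOf(i, totalSize, partSize));
        }
    }
}
```

With these example values the file splits into three chunks: two of 20 MB and a final one of 10 MB.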
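Main capabilities
The service exposes the following endpoints:
| Method | URL | Description |
|---|---|---|
| POST | /chunk | Upload a chunk |
| GET | /merge?filename={filename} | Merge the chunks |
| GET | /files/{filename} | Download a file |
| GET | /files | List files |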
Implementation
Code structure
Main functionality
Server
Here is the code.
First, the Chunk class, which carries the metadata and content of one chunk:
/**
* File chunk
*
* @author yuanzhihao
* @since 2023/4/10
*/
@Data
public class Chunk {
/**
* Current chunk number, starting at 1
*/
private Integer chunkNumber;
/**
* Nominal chunk size
*/
private Long chunkSize;
/**
* Actual size of this chunk
*/
private Long currentChunkSize;
/**
* Total file size
*/
private Long totalSize;
/**
* File name
*/
private String filename;
/**
* Total number of chunks
*/
private Integer totalChunks;
/**
* Content of this chunk
*/
private MultipartFile file;
}
The controller exposes the four endpoints:
/**
* Chunked upload endpoints
*
* @author yuanzhihao
* @since 2023/4/10
*/
@RestController
@RequestMapping
public class ChunkController {
@Autowired
private ChunkService chunkService;
/**
* Upload one file chunk
*
* @param chunk chunk metadata and content
* @return response
*/
@PostMapping(value = "chunk")
public ResponseEntity<String> chunk(Chunk chunk) {
chunkService.chunk(chunk);
return ResponseEntity.ok("File Chunk Upload Success");
}
/**
* Merge the uploaded chunks
*
* @param filename file name
* @return response
*/
@GetMapping(value = "merge")
public ResponseEntity<Void> merge(@RequestParam("filename") String filename) {
chunkService.merge(filename);
return ResponseEntity.ok().build();
}
/**
* List the uploaded files
*
* @return file list
*/
@GetMapping("/files")
public ResponseEntity<List<FileInfo>> list() {
return ResponseEntity.ok(chunkService.list());
}
/**
* Download the given file
*
* @param filename file name
* @return file
*/
@GetMapping("/files/{filename:.+}")
public ResponseEntity<Resource> getFile(@PathVariable("filename") String filename) {
return ResponseEntity.ok().header(HttpHeaders.CONTENT_DISPOSITION,
"attachment; filename=\"" + filename + "\"").body(chunkService.getFile(filename));
}
}
The service holds the chunking business logic. All state, such as the file list and the chunk progress, is kept in memory:
/**
* Chunked file upload
*
* @author yuanzhihao
* @since 2023/4/10
*/
public interface ChunkService {
/**
* Upload one file chunk
*
* @param chunk chunk metadata and content
*/
void chunk(Chunk chunk);
/**
* Merge the uploaded chunks
*
* @param filename file name
*/
void merge(String filename);
/**
* List the uploaded files
*
* @return file list
*/
List<FileInfo> list();
/**
* Get the given file
*
* @param filename file name
* @return file
*/
Resource getFile(String filename);
}
/**
* Chunked upload service
*
* @author yuanzhihao
* @since 2023/4/10
*/
@Service
@Slf4j
public class ChunkServiceImpl implements ChunkService {
// Upload progress per file, keyed by file name
private static final Map<String, ChunkProcess> CHUNK_PROCESS_STORAGE = new ConcurrentHashMap<>();
// Files that have been fully uploaded and merged
private static final List<FileInfo> FILE_STORAGE = new CopyOnWriteArrayList<>();
@Autowired
private FileClient fileClient;
@Override
public void chunk(Chunk chunk) {
String filename = chunk.getFilename();
boolean match = FILE_STORAGE.stream().anyMatch(fileInfo -> fileInfo.getFilename().equals(filename));
if (match) {
throw new RuntimeException("File [ " + filename + " ] already exist");
}
ChunkProcess chunkProcess;
String uploadId;
if (CHUNK_PROCESS_STORAGE.containsKey(filename)) {
chunkProcess = CHUNK_PROCESS_STORAGE.get(filename);
uploadId = chunkProcess.getUploadId();
// Skip chunks that were already recorded, so a retried upload is harmless.
// Objects.equals (java.util.Objects) compares values; == on two Integer objects compares references.
boolean isUploaded = Optional.ofNullable(chunkProcess.getChunkList())
.map(chunkPartList -> chunkPartList.stream()
.anyMatch(chunkPart -> Objects.equals(chunkPart.getChunkNumber(), chunk.getChunkNumber())))
.orElse(false);
if (isUploaded) {
log.info("File [{}] chunk [{}] already uploaded, skipping", chunk.getFilename(), chunk.getChunkNumber());
return;
}
} else {
uploadId = fileClient.initTask(filename);
chunkProcess = new ChunkProcess().setFilename(filename).setUploadId(uploadId);
CHUNK_PROCESS_STORAGE.put(filename, chunkProcess);
}
List<ChunkProcess.ChunkPart> chunkList = chunkProcess.getChunkList();
String chunkId = fileClient.chunk(chunk, uploadId);
chunkList.add(new ChunkProcess.ChunkPart(chunkId, chunk.getChunkNumber()));
CHUNK_PROCESS_STORAGE.put(filename, chunkProcess.setChunkList(chunkList));
}
@Override
public void merge(String filename) {
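ChunkProcess chunkProcess = CHUNK_PROCESS_STORAGE.get(filename);
// Reject a merge request for a file that has no upload in progress
if (chunkProcess == null) {
throw new RuntimeException("No upload in progress for file [ " + filename + " ]");
}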
fileClient.merge(chunkProcess);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String currentTime = simpleDateFormat.format(new Date());
FILE_STORAGE.add(new FileInfo().setUploadTime(currentTime).setFilename(filename));
CHUNK_PROCESS_STORAGE.remove(filename);
}
@Override
public List<FileInfo> list() {
return FILE_STORAGE;
}
@Override
public Resource getFile(String filename) {
return fileClient.getFile(filename);
}
}
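Because the service keeps its progress in static in-memory collections, the `containsKey` check followed by `put` in `chunk` is not atomic if two chunks of the same file arrive at the same moment. The following is a minimal sketch of atomic bookkeeping using only the JDK. The class `UploadProgress` and its methods are hypothetical and are not the article's `ChunkProcess`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of thread-safe progress tracking; not the article's ChunkServiceImpl.
final class UploadProgress {
    // file name -> chunk numbers that have been received so far
    private final Map<String, Set<Integer>> received = new ConcurrentHashMap<>();

    /** Records a chunk; returns false if this chunk number was already recorded. */
    boolean markReceived(String filename, int chunkNumber) {
        return received
                .computeIfAbsent(filename, key -> ConcurrentHashMap.newKeySet())
                .add(chunkNumber);
    }

    /** True once chunk numbers 1..totalChunks have all been recorded. */
    boolean isComplete(String filename, int totalChunks) {
        Set<Integer> chunks = received.get(filename);
        if (chunks == null || chunks.size() < totalChunks) {
            return false;
        }
        for (int i = 1; i <= totalChunks; i++) {
            if (!chunks.contains(i)) {
                return false;
            }
        }
        return true;
    }

    /** Drops the bookkeeping for a file, for example after a successful merge. */
    void clear(String filename) {
        received.remove(filename);
    }
}
```

`computeIfAbsent` creates the per-file set exactly once, and `Set.add` reports whether the chunk number was new, so a duplicate upload can be detected in a single call. In a real service the chunk would probably be marked only after its bytes were stored successfully, or unmarked on failure.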
The actual storage work is done by FileClient, which I abstracted into an interface:
/**
* File client
*
* @author yuanzhihao
* @since 2023/4/10
*/
public interface FileClient {
/**
* Initialize the file client
*/
void initFileClient();
/**
* Initialize an upload task
*
* @param filename file name
* @return task ID
*/
String initTask(String filename);
/**
* Upload one file chunk
*
* @param chunk file chunk
* @param uploadId task ID
* @return for S3, the ETag of the uploaded part; for the local file system, the path of the chunk file
*/
String chunk(Chunk chunk, String uploadId);
/**
* Merge the chunks into the final file
*
* @param chunkProcess upload progress details
*/
void merge(ChunkProcess chunkProcess);
/**
* Get a file by name
*
* @param filename file name
* @return file resource
*/
Resource getFile(String filename);
}
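This interface has two implementations: LocalFileSystemClient for the local file system and AWSFileClient for object storage. A configuration property selects which client to create, so only the chosen one is instantiated, similar to lazy loading. The loading class is FileClientConfig:
/**
* File upload client, selected through a configuration property
* Currently supports the local file system and AWS S3 object storage
*
* @author yuanzhihao
* @since 2023/4/10
*/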
@Configuration
public class FileClientConfig {
@Value("${file.client.type:local-file}")
private String fileClientType;
private static final Map<String, Supplier<FileClient>> FILE_CLIENT_SUPPLY = new HashMap<String, Supplier<FileClient>>() {
{
put("local-file", LocalFileSystemClient::new);
put("aws-s3", AWSFileClient::new);
}
};
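/**
* Registers the file client bean
*
* @return file client
*/
@Bean
public FileClient fileClient() {
// Fail with a clear message instead of a NullPointerException on an unknown type
Supplier<FileClient> supplier = FILE_CLIENT_SUPPLY.get(fileClientType);
if (supplier == null) {
throw new IllegalArgumentException("Unsupported file.client.type: " + fileClientType);
}
return supplier.get();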
}
}
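The `@Value("${file.client.type:local-file}")` expression reads the property `file.client.type` and falls back to `local-file` when it is not set. Assuming the project uses a standard `application.properties` file (the article does not show one), switching to S3 might look like this:

```properties
# Selects the FileClient implementation. Valid values are the keys of
# FILE_CLIENT_SUPPLY: local-file (the default when the property is omitted) or aws-s3.
file.client.type=aws-s3
```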
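The two storage implementations are described below.
Local file storage
For local storage, every chunk is written to a dedicated directory. When the upload is complete, the chunk files are sorted by chunk number and written one by one into the merged file. The chunk files are then deleted, which finishes the upload.
The implementation, LocalFileSystemClient:
/**
* Local file system
*
* @author yuanzhihao
* @since 2023/4/10
*/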
@Slf4j
public class LocalFileSystemClient implements FileClient {
private static final String DEFAULT_UPLOAD_PATH = "/data/upload/";
public LocalFileSystemClient() {
// Create the upload directory
this.initFileClient();
}
@Override
public void initFileClient() {
FileUtil.mkdir(DEFAULT_UPLOAD_PATH);
}
@Override
public String initTask(String filename) {
// Directory where the chunks of this upload are stored
String tempFilePath = DEFAULT_UPLOAD_PATH + filename + UUID.randomUUID();
FileUtil.mkdir(tempFilePath);
return tempFilePath;
}
@Override
public String chunk(Chunk chunk, String uploadId) {
String filename = chunk.getFilename();
String chunkFilePath = uploadId + "/" + chunk.getChunkNumber();
try (InputStream in = chunk.getFile().getInputStream();
OutputStream out = Files.newOutputStream(Paths.get(chunkFilePath))) {
FileCopyUtils.copy(in, out);
} catch (IOException e) {
log.error("File [{}] upload failed", filename, e);
throw new RuntimeException(e);
}
return chunkFilePath;
}
@Override
public void merge(ChunkProcess chunkProcess) {
String filename = chunkProcess.getFilename();
// Folder that holds the chunks to merge
File chunkFolder = new File(chunkProcess.getUploadId());
// Merged output file
File mergeFile = new File(DEFAULT_UPLOAD_PATH + filename);
try (BufferedOutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(mergeFile.toPath()))) {
byte[] bytes = new byte[1024];
File[] fileArray = Optional.ofNullable(chunkFolder.listFiles())
.orElseThrow(() -> new IllegalArgumentException("Folder is empty"));
// Chunk files are named by chunk number, so sort numerically; a string sort would put "10" before "2"
List<File> fileList = Arrays.stream(fileArray)
.sorted(Comparator.comparingInt((File file) -> Integer.parseInt(file.getName())))
.collect(Collectors.toList());
fileList.forEach(file -> {
try (BufferedInputStream inputStream = new BufferedInputStream(Files.newInputStream(file.toPath()))) {
int len;
while ((len = inputStream.read(bytes)) != -1) {
outputStream.write(bytes, 0, len);
}
} catch (IOException e) {
log.error("File [{}] chunk [{}] write failed", filename, file.getName(), e);
throw new RuntimeException(e);
}
});
} catch (IOException e) {
log.error("File [{}] merge failed", filename, e);
throw new RuntimeException(e);
} finally {
FileUtil.del(chunkProcess.getUploadId());
}
}
@Override
public Resource getFile(String filename) {
File file = new File(DEFAULT_UPLOAD_PATH + filename);
return new FileSystemResource(file);
}
}
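Because each chunk file is named after its chunk number, the merge order has to be numeric: sorted as plain strings, "10" would come before "2". The following sketch shows a numeric-order merge using only `java.nio.file`. The class `ChunkMerger` is a hypothetical helper, not the article's LocalFileSystemClient, and it assumes every file in the directory has a purely numeric name.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper; assumes chunk files are named "1", "2", "3", ...
final class ChunkMerger {
    private ChunkMerger() {}

    /** Lists the chunk files in numeric (not lexicographic) order. */
    static List<Path> sortedChunks(Path chunkDir) throws IOException {
        try (Stream<Path> files = Files.list(chunkDir)) {
            return files
                    .sorted(Comparator.comparingInt(
                            (Path p) -> Integer.parseInt(p.getFileName().toString())))
                    .collect(Collectors.toList());
        }
    }

    /** Appends every chunk to the target file, in chunk order. */
    static void merge(Path chunkDir, Path target) throws IOException {
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path chunk : sortedChunks(chunkDir)) {
                Files.copy(chunk, out); // streams the chunk's bytes into the open output
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("chunks");
        for (int i = 1; i <= 12; i++) {
            // Each demo chunk just contains its own number followed by a comma
            Files.write(dir.resolve(String.valueOf(i)), (i + ",").getBytes(StandardCharsets.UTF_8));
        }
        Path merged = Files.createTempFile("merged", ".txt");
        merge(dir, merged);
        System.out.println(new String(Files.readAllBytes(merged), StandardCharsets.UTF_8));
    }
}
```

The demo writes twelve tiny chunks on purpose: with ten or more chunks, a string sort would place chunks 10, 11, and 12 between 1 and 2, and the merged content would come out in the wrong order.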
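Amazon S3 object storage
With object storage, the first step is to call initiateMultipartUpload, which starts the task and returns an uploadId. Every chunk upload must carry this uploadId. Each chunk is uploaded with uploadPart, which returns an ETag for that part. Once all chunks are uploaded, call completeMultipartUpload with the uploadId and the list of part numbers with their ETags to merge the file.
The implementation:
/**
* AWS S3 object storage
*
* @author yuanzhihao
* @since 2023/4/10
*/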
@Slf4j
public class AWSFileClient implements FileClient {
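// Default bucket, access key, secret key, and endpoint: fill these in for your environment.
// Hard-coded credentials are for the demo only; load them from configuration in real deployments.
private static final String DEFAULT_BUCKET = "";
private static final String AK = "";
private static final String SK = "";
private static final String ENDPOINT = "";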
private AmazonS3 s3Client;
public AWSFileClient() {
// Initialize the S3 client
this.initFileClient();
}
@Override
public void initFileClient() {
this.s3Client = AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(AK, SK)))
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(ENDPOINT, "cn-north-1"))
.build();
}
@Override
public String initTask(String filename) {
// Start the multipart upload task
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(DEFAULT_BUCKET, filename);
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
return initResponse.getUploadId();
}
@Override
public String chunk(Chunk chunk, String uploadId) {
try (InputStream in = chunk.getFile().getInputStream()) {
// Upload this chunk as one part
UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(DEFAULT_BUCKET)
.withKey(chunk.getFilename())
.withUploadId(uploadId)
.withInputStream(in)
.withPartNumber(chunk.getChunkNumber())
.withPartSize(chunk.getCurrentChunkSize());
UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
return uploadResult.getETag();
} catch (IOException e) {
log.error("File [{}] chunk [{}] upload failed", chunk.getFilename(), chunk.getChunkNumber(), e);
throw new RuntimeException(e);
}
}
@Override
public void merge(ChunkProcess chunkProcess) {
List<PartETag> partETagList = chunkProcess.getChunkList()
.stream()
.map(chunkPart -> new PartETag(chunkPart.getChunkNumber(), chunkPart.getLocation()))
.collect(Collectors.toList());
CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(DEFAULT_BUCKET, chunkProcess.getFilename(),
chunkProcess.getUploadId(), partETagList);
s3Client.completeMultipartUpload(compRequest);
}
@Override
public Resource getFile(String filename) {
GetObjectRequest request = new GetObjectRequest(DEFAULT_BUCKET, filename);
S3Object s3Object = s3Client.getObject(request);
return new InputStreamResource(s3Object.getObjectContent());
}
}
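S3 multipart uploads come with size limits that are worth checking before any chunk is sent. The sketch below assumes the limits in the S3 multipart upload documentation as I understand them: every part except the last must be at least 5 MiB, and an upload may have at most 10,000 parts. Please confirm the current values in the AWS documentation. The class `S3PartLimits` is a hypothetical helper and does not call the AWS SDK.

```java
// Hypothetical pre-flight check; the limit values are assumptions taken from the
// S3 multipart upload documentation and should be verified against the current docs.
final class S3PartLimits {
    static final long MIN_PART_SIZE = 5L * 1024 * 1024; // 5 MiB; does not apply to the last part
    static final int MAX_PARTS = 10_000;

    private S3PartLimits() {}

    /** Returns null when the plan fits the limits, otherwise a description of the problem. */
    static String validate(long totalSize, long partSize) {
        if (totalSize <= 0 || partSize <= 0) {
            return "sizes must be positive";
        }
        long parts = (totalSize + partSize - 1) / partSize; // ceiling division
        if (parts > MAX_PARTS) {
            return "too many parts: " + parts;
        }
        // A single-part upload may be smaller than the minimum, since its only part is the last one
        if (parts > 1 && partSize < MIN_PART_SIZE) {
            return "part size below 5 MiB";
        }
        return null;
    }

    /** Smallest part size, at least the 5 MiB minimum, that keeps the upload within MAX_PARTS. */
    static long smallestValidPartSize(long totalSize) {
        long needed = (totalSize + MAX_PARTS - 1) / MAX_PARTS;
        return Math.max(MIN_PART_SIZE, needed);
    }
}
```

Under these assumptions the article's 20 MB chunk size passes the check for typical files, while a chunk size of 1 MB would be rejected for any file that needs more than one part.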
Client
The client simulates the front-end logic in Java:
/**
* Simulates the front-end logic: chunked upload of a large file using RestTemplate
*
* @author yuanzhihao
* @since 2023/4/10
*/
public class MultipartUploadTest {
@Test
public void testUpload() throws Exception {
String chunkFileFolder = "/data/bucket/";
java.io.File file = new java.io.File("/Users/yuanzhihao/Downloads/ideaIU-2022.2.2.exe");
long contentLength = file.length();
// Each chunk is 20 MB
long partSize = 20 * 1024 * 1024;
// Number of chunks; the last chunk may be smaller than 20 MB
long chunkFileNum = (long) Math.ceil(contentLength * 1.0 / partSize);
RestTemplate restTemplate = new RestTemplate();
try (RandomAccessFile raf_read = new RandomAccessFile(file, "r")) {
// Read buffer
byte[] b = new byte[1024];
for (int i = 1; i <= chunkFileNum; i++) {
// Temporary chunk file
java.io.File chunkFile = new java.io.File(chunkFileFolder + i);
// Open the chunk file for writing
try (RandomAccessFile raf_write = new RandomAccessFile(chunkFile, "rw")) {
int len;
while ((len = raf_read.read(b)) != -1) {
raf_write.write(b, 0, len);
// Once the chunk file reaches 20 MB, move on to the next chunk; the last chunk ends at end of file
if (chunkFile.length() >= partSize) {
break;
}
}
// Upload the chunk as a multipart form
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
body.add("file", new FileSystemResource(chunkFile));
body.add("chunkNumber", i);
body.add("chunkSize", partSize);
body.add("currentChunkSize", chunkFile.length());
body.add("totalSize", contentLength);
body.add("filename", file.getName());
body.add("totalChunks", chunkFileNum);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers);
String serverUrl = "http://localhost:8080/chunk";
ResponseEntity<String> response = restTemplate.postForEntity(serverUrl, requestEntity, String.class);
System.out.println("Response code: " + response.getStatusCode() + " Response body: " + response.getBody());
} finally {
FileUtil.del(chunkFile);
}
}
}
// Ask the server to merge the chunks
String mergeUrl = "http://localhost:8080/merge?filename=" + file.getName();
ResponseEntity<String> response = restTemplate.getForEntity(mergeUrl, String.class);
System.out.println("Response code: " + response.getStatusCode() + " Response body: " + response.getBody());
}
}
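The test client writes every chunk to a temporary file before uploading it. An alternative is to read a chunk straight from the source file by seeking to its offset. The sketch below shows that idea; `ChunkReader` is a hypothetical helper, not part of the article's client, and it holds one whole chunk in memory at a time.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;

// Hypothetical helper; reads one chunk by offset instead of creating a temporary chunk file.
final class ChunkReader {
    private ChunkReader() {}

    /** Reads the 1-based chunk of the file into memory; the last chunk may be shorter. */
    static byte[] readChunk(Path file, int chunkNumber, int partSize) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long offset = (chunkNumber - 1L) * partSize;
            long remaining = raf.length() - offset;
            if (chunkNumber < 1 || remaining <= 0) {
                throw new IllegalArgumentException("chunk " + chunkNumber + " is out of range");
            }
            byte[] buffer = new byte[(int) Math.min(partSize, remaining)];
            raf.seek(offset);      // jump to the start of this chunk
            raf.readFully(buffer); // fill the buffer or fail with EOFException
            return buffer;
        }
    }
}
```

Because every chunk is addressed by its own offset, chunks can be read in any order, which also makes it easy to retry a single failed chunk without re-reading the others.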
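Testing
Start the service and run the client to upload a file.
[Screenshots not preserved: upload succeeded; fetching the file list; downloading the file]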
Conclusion
Source code: https://github.com/yzh19961031/blogDemo/tree/master/multipartUploadFile
References:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-upload-object.html