Yuandupier

Yuandupier

Spring Boot中大文件分片上传—支持本地文件和Amazon S3

179
1
0
2023-04-19

前言

本篇主要整理了大文件分片上传客户端和服务端的实现,其中客户端是通过Java代码来模拟的文件分片上传的逻辑(我不太会写前端,核心逻辑都是一样的,这边前端可以参考开源组件:vue-uploader),服务端实现包含本地文件系统和Amazon S3对象存储两种文件存储类型。

分片上传实现原理

实现原理其实很简单,网上也有很多资料,核心就是客户端把大文件按照一定规则进行拆分,比如20MB为一个小块,分解成一个一个的文件块,然后把这些文件块单独上传到服务端,等到所有的文件块都上传完毕之后,客户端再通知服务端进行文件合并的操作,合并完成之后整个任务结束。

主要能力

提供下面几个能力:

方法URL功能
POST/chunk分片上传
GET/merge?filename={filename}文件合并
GET/files/{filename}下载文件
GET/files获取文件列表

具体实现

代码结构

在这里插入图片描述

主要功能实现

服务端

这边直接贴一下代码

首先是Chunk类,这个类主要包含了分块的信息:

/**
 * 文件块
 *
 * @author yuanzhihao
 * @since 2023/4/10
 */
@Data
public class Chunk {
    /**
     * 当前文件块,从1开始
     */
    private Integer chunkNumber;

    /**
     * 分块大小
     */
    private Long chunkSize;

    /**
     * 当前分块大小
     */
    private Long currentChunkSize;

    /**
     * 总大小
     */
    private Long totalSize;

    /**
     * 文件名
     */
    private String filename;

    /**
     * 总块数
     */
    private Integer totalChunks;

    /**
     * 分块文件内容
     */
    private MultipartFile file;
}

controller,提供四个接口:

/**
 * 分片上传
 *
 * @author yuanzhihao
 * @since 2023/4/10
 */
@RestController
@RequestMapping
public class ChunkController {
    @Autowired
    private ChunkService chunkService;

    /**
     * 分块上传文件
     *
     * @param chunk 文件块信息
     * @return 响应
     */
    @PostMapping(value = "chunk")
    public ResponseEntity<String> chunk(Chunk chunk) {
        chunkService.chunk(chunk);
        return ResponseEntity.ok("File Chunk Upload Success");
    }

    /**
     * 文件合并
     *
     * @param filename 文件名
     * @return 响应
     */
    @GetMapping(value = "merge")
    public ResponseEntity<Void> merge(@RequestParam("filename") String filename) {
        chunkService.merge(filename);
        return ResponseEntity.ok().build();
    }


    /**
     * 获取文件列表
     *
     * @return 文件列表
     */
    @GetMapping("/files")
    public ResponseEntity<List<FileInfo>> list() {
        return ResponseEntity.ok(chunkService.list());
    }

    /**
     * 获取指定文件
     *
     * @param filename 文件名称
     * @return 文件
     */
    @GetMapping("/files/{filename:.+}")
    public ResponseEntity<Resource> getFile(@PathVariable("filename") String filename) {
        return ResponseEntity.ok().header(HttpHeaders.CONTENT_DISPOSITION,
                "attachment; filename=\"" + filename + "\"").body(chunkService.getFile(filename));
    }
}

service中,是分块的一些业务逻辑,这边存储的信息我都是写在内存中的,比如文件信息,分片信息等等:

/**
 * 文件分块上传
 *
 * @author yuanzhihao
 * @since 2023/4/10
 */
public interface ChunkService {
    /**
     * 分块上传文件
     *
     * @param chunk 文件块信息
     */
    void chunk(Chunk chunk);

    /**
     * 文件合并
     *
     * @param filename 文件名
     */
    void merge(String filename);

    /**
     * 获取文件列表
     *
     * @return 文件列表
     */
    List<FileInfo> list();

    /**
     * 获取指定文件
     *
     * @param filename 文件名称
     * @return 文件
     */
    Resource getFile(String filename);
}
/**
 * 分片上传
 *
 * @author yuanzhihao
 * @since 2023/4/10
 */
@Service
@Slf4j
public class ChunkServiceImpl implements ChunkService {
    // 分片进度
    private static final Map<String, ChunkProcess> CHUNK_PROCESS_STORAGE = new ConcurrentHashMap<>();

    // 文件列表
    private static final List<FileInfo> FILE_STORAGE = new CopyOnWriteArrayList<>();

    @Autowired
    private FileClient fileClient;

    @Override
    public void chunk(Chunk chunk) {
        String filename = chunk.getFilename();
        boolean match = FILE_STORAGE.stream().anyMatch(fileInfo -> fileInfo.getFilename().equals(filename));
        if (match) {
            throw new RuntimeException("File [ " + filename + " ] already exist");
        }
        ChunkProcess chunkProcess;
        String uploadId;
        if (CHUNK_PROCESS_STORAGE.containsKey(filename)) {
            chunkProcess = CHUNK_PROCESS_STORAGE.get(filename);
            uploadId = chunkProcess.getUploadId();
            AtomicBoolean isUploaded = new AtomicBoolean(false);
            Optional.ofNullable(chunkProcess.getChunkList()).ifPresent(chunkPartList ->
                    isUploaded.set(chunkPartList.stream().anyMatch(chunkPart -> chunkPart.getChunkNumber() == chunk.getChunkNumber())));
            if (isUploaded.get()) {
                log.info("文件【{}】分块【{}】已经上传,跳过", chunk.getFilename(), chunk.getChunkNumber());
                return;
            }
        } else {
            uploadId = fileClient.initTask(filename);
            chunkProcess = new ChunkProcess().setFilename(filename).setUploadId(uploadId);
            CHUNK_PROCESS_STORAGE.put(filename, chunkProcess);
        }

        List<ChunkProcess.ChunkPart> chunkList = chunkProcess.getChunkList();
        String chunkId = fileClient.chunk(chunk, uploadId);
        chunkList.add(new ChunkProcess.ChunkPart(chunkId, chunk.getChunkNumber()));
        CHUNK_PROCESS_STORAGE.put(filename, chunkProcess.setChunkList(chunkList));
    }

    @Override
    public void merge(String filename) {
        ChunkProcess chunkProcess = CHUNK_PROCESS_STORAGE.get(filename);
        fileClient.merge(chunkProcess);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String currentTime = simpleDateFormat.format(new Date());
        FILE_STORAGE.add(new FileInfo().setUploadTime(currentTime).setFilename(filename));
        CHUNK_PROCESS_STORAGE.remove(filename);
    }

    @Override
    public List<FileInfo> list() {
        return FILE_STORAGE;
    }

    @Override
    public Resource getFile(String filename) {
        return fileClient.getFile(filename);
    }
}

具体文件上传的实现是在FileClient中,我抽象了一个具体的接口:

/**
 * 文件客户端
 *
 * @author yuanzhihao
 * @since 2023/4/10
 */
public interface FileClient {
    /**
     * 初始化文件客户端
     */
    void initFileClient();


    /**
     * 初始化任务
     *
     * @param filename 文件名称
     * @return 任务id
     */
    String initTask(String filename);

    /**
     * 上传文件块
     *
     * @param chunk 文件块
     * @param uploadId 任务ID
     * @return 对于S3返回eTag地址  对于本地文件返回文件块地址
     */
    String chunk(Chunk chunk, String uploadId);

    /**
     * 文件合并
     *
     * @param chunkProcess 分片详情
     */
    void merge(ChunkProcess chunkProcess);

    /**
     * 根据文件名称获取文件
     *
     * @param filename 文件名称
     * @return 文件资源
     */
    Resource getFile(String filename);
}

这个接口有两个实现类,一个是LocalFileSystemClient本地文件系统,还有一个是AWSFileClient对象存储,这块我是通过配置文件来指定加载哪一个文件客户端,类似懒加载的方式,具体加载类FileClientConfig:

/**
 * 文件上传客户端 通过配置文件指定加载哪个客户端
 * 目前支持本地文件和AWS S3对象存储
 *
 * @author yuanzhihao
 * @since 2023/4/10
 */
@Configuration
public class FileClientConfig {
    @Value("${file.client.type:local-file}")
    private String fileClientType;

    private static final Map<String, Supplier<FileClient>> FILE_CLIENT_SUPPLY = new HashMap<String, Supplier<FileClient>>() {
        {
            put("local-file", LocalFileSystemClient::new);
            put("aws-s3", AWSFileClient::new);
        }
    };

    /**
     * 注入文件客户端对象
     *
     * @return 文件客户端
     */
    @Bean
    public FileClient fileClient() {
        return FILE_CLIENT_SUPPLY.get(fileClientType).get();
    }
}

下面讲一下两种文件服务器的实现

本地文件存储

本地文件是把所有的分块上传到一个指定的目录,等上传完成之后,将所有的文件块排序,按照分片顺序一块一块的写到指定的合并文件中,之后清理所有的文件块,完成上传。

具体实现LocalFileSystemClient:

/**
 * 本地文件系统
 *
 * @author yuanzhihao
 * @since 2023/4/10
 */
@Slf4j
public class LocalFileSystemClient implements FileClient {
    private static final String DEFAULT_UPLOAD_PATH = "/data/upload/";

    public LocalFileSystemClient() {
        // 初始化
        this.initFileClient();
    }

    @Override
    public void initFileClient() {
        FileUtil.mkdir(DEFAULT_UPLOAD_PATH);
    }

    @Override
    public String initTask(String filename) {
        // 分块文件存储路径
        String tempFilePath = DEFAULT_UPLOAD_PATH + filename + UUID.randomUUID();
        FileUtil.mkdir(tempFilePath);
        return tempFilePath;
    }


    @Override
    public String chunk(Chunk chunk, String uploadId) {
        String filename = chunk.getFilename();
        String chunkFilePath = uploadId + "/" + chunk.getChunkNumber();
        try (InputStream in = chunk.getFile().getInputStream();
             OutputStream out = Files.newOutputStream(Paths.get(chunkFilePath))) {
            FileCopyUtils.copy(in, out);
        } catch (IOException e) {
            log.error("File [{}] upload failed", filename, e);
            throw new RuntimeException(e);
        }
        return chunkFilePath;
    }

    @Override
    public void merge(ChunkProcess chunkProcess) {
        String filename = chunkProcess.getFilename();
        // 需要合并的文件所在的文件夹
        File chunkFolder = new File(chunkProcess.getUploadId());
        // 合并后的文件
        File mergeFile = new File(DEFAULT_UPLOAD_PATH + filename);

        try (BufferedOutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(mergeFile.toPath()))) {
            byte[] bytes = new byte[1024];
            File[] fileArray = Optional.ofNullable(chunkFolder.listFiles())
                    .orElseThrow(() -> new IllegalArgumentException("Folder is empty"));
            List<File> fileList = Arrays.stream(fileArray).sorted(Comparator.comparing(File::getName)).collect(Collectors.toList());
            fileList.forEach(file -> {
                try (BufferedInputStream inputStream = new BufferedInputStream(Files.newInputStream(file.toPath()))) {
                    int len;
                    while ((len = inputStream.read(bytes)) != -1) {
                        outputStream.write(bytes, 0, len);
                    }
                } catch (IOException e) {
                    log.info("File [{}] chunk [{}] write failed", filename, file.getName(), e);
                    throw new RuntimeException(e);
                }
            });
        } catch (IOException e) {
            log.info("File [{}] merge failed", filename, e);
            throw new RuntimeException(e);
        } finally {
            FileUtil.del(chunkProcess.getUploadId());
        }
    }

    @Override
    public Resource getFile(String filename) {
        File file = new File(DEFAULT_UPLOAD_PATH + filename);
        return new FileSystemResource(file);
    }
}
Amazon S3对象存储

对象存储中,首先是需要调用initiateMultipartUpload方法,初始化生成一个任务标识uploadId,然后每个分块任务上传都需要携带这个uploadId,分块任务上传是调用uploadPart方法,这个方法会返回一个eTag标识,最后等所有文件块上传完成之后,需要使用这个eTag标识和uploadId去调用completeMultipartUpload方法完成文件合并。

具体实现:

/**
 * AWS S3对象存储
 *
 * @author yuanzhihao
 * @since 2023/4/10
 */
@Slf4j
public class AWSFileClient implements FileClient {
    // 默认桶
    private static final String DEFAULT_BUCKET = "";

    private static final String AK = "";
    private static final String SK = "";
    private static final String ENDPOINT = "";
    private AmazonS3 s3Client;

    public AWSFileClient() {
        // 初始化文件客户端
        this.initFileClient();
    }

    @Override
    public void initFileClient() {
        this.s3Client = AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(AK, SK)))
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(ENDPOINT, "cn-north-1"))
                .build();
    }


    @Override
    public String initTask(String filename) {
        // 初始化分片上传任务
        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(DEFAULT_BUCKET, filename);
        InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(initRequest);
        return initResponse.getUploadId();
    }

    @Override
    public String chunk(Chunk chunk, String uploadId) {
        try (InputStream in = chunk.getFile().getInputStream()) {
            // 上传
            UploadPartRequest uploadRequest = new UploadPartRequest()
                    .withBucketName(DEFAULT_BUCKET)
                    .withKey(chunk.getFilename())
                    .withUploadId(uploadId)
                    .withInputStream(in)
                    .withPartNumber(chunk.getChunkNumber())
                    .withPartSize(chunk.getCurrentChunkSize());
            UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
            return uploadResult.getETag();
        } catch (IOException e) {
            log.error("文件【{}】上传分片【{}】失败", chunk.getFilename(), chunk.getChunkNumber(), e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void merge(ChunkProcess chunkProcess) {
        List<PartETag> partETagList = chunkProcess.getChunkList()
                .stream()
                .map(chunkPart -> new PartETag(chunkPart.getChunkNumber(), chunkPart.getLocation()))
                .collect(Collectors.toList());
        CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(DEFAULT_BUCKET, chunkProcess.getFilename(),
                chunkProcess.getUploadId(), partETagList);
        s3Client.completeMultipartUpload(compRequest);
    }

    @Override
    public Resource getFile(String filename) {
        GetObjectRequest request = new GetObjectRequest(DEFAULT_BUCKET, filename);
        S3Object s3Object = s3Client.getObject(request);
        return new InputStreamResource(s3Object.getObjectContent());
    }
}

客户端

客户端是通过Java代码模拟的前端逻辑,具体代码:

/**
 * 模拟前端逻辑 在代码中实现大文件分片上传 使用restTemplate客户端
 *
 * @author yuanzhihao
 * @since 2023/4/10
 */
public class MultipartUploadTest {

    @Test
    public void testUpload() throws Exception {
        String chunkFileFolder = "/data/bucket/";
        java.io.File file = new java.io.File("/Users/yuanzhihao/Downloads/ideaIU-2022.2.2.exe");
        long contentLength = file.length();
        // 每块大小设置为20MB
        long partSize = 20 * 1024 * 1024;
        // 文件分片块数 最后一块大小可能小于 20MB
        long chunkFileNum = (long) Math.ceil(contentLength * 1.0 / partSize);
        RestTemplate restTemplate = new RestTemplate();

        try (RandomAccessFile raf_read = new RandomAccessFile(file, "r")) {
            // 缓冲区
            byte[] b = new byte[1024];
            for (int i = 1; i <= chunkFileNum; i++) {
                // 块文件
                java.io.File chunkFile = new java.io.File(chunkFileFolder + i);
                // 创建向块文件的写对象
                try (RandomAccessFile raf_write = new RandomAccessFile(chunkFile, "rw")) {
                    int len;
                    while ((len = raf_read.read(b)) != -1) {
                        raf_write.write(b, 0, len);
                        // 如果块文件的大小达到20M 开始写下一块儿  或者已经到了最后一块
                        if (chunkFile.length() >= partSize) {
                            break;
                        }
                    }
                    // 上传
                    MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
                    body.add("file", new FileSystemResource(chunkFile));
                    body.add("chunkNumber", i);
                    body.add("chunkSize", partSize);
                    body.add("currentChunkSize", chunkFile.length());
                    body.add("totalSize", contentLength);
                    body.add("filename", file.getName());
                    body.add("totalChunks", chunkFileNum);
                    HttpHeaders headers = new HttpHeaders();
                    headers.setContentType(MediaType.MULTIPART_FORM_DATA);
                    HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers);
                    String serverUrl = "http://localhost:8080/chunk";
                    ResponseEntity<String> response = restTemplate.postForEntity(serverUrl, requestEntity, String.class);
                    System.out.println("Response code: " + response.getStatusCode() + " Response body: " + response.getBody());
                } finally {
                    FileUtil.del(chunkFile);
                }
            }
        }
        // 合并文件
        String mergeUrl = "http://localhost:8080/merge?filename=" + file.getName();
        ResponseEntity<String> response = restTemplate.getForEntity(mergeUrl, String.class);
        System.out.println("Response code: " + response.getStatusCode() + " Response body: " + response.getBody());
    }
}

测试验证

启动服务,调用客户端上传文件:

上传成功 在这里插入图片描述 获取文件列表: 在这里插入图片描述 下载文件: 在这里插入图片描述

结语

代码地址:https://github.com/yzh19961031/blogDemo/tree/master/multipartUploadFile

参考:

https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-upload-object.html

https://lliujg.blog.csdn.net/article/details/125136122

https://blog.csdn.net/oppo5630/article/details/80880224