文章内容

2024/8/12 18:58:38,作 者: 黄兵

Flask 文件分片上传

最近在做大文件分片上传,前端使用的是 Dropzone,后端使用的是 Flask 处理上传的文件。

下面是 view 的示例代码:

if request.method == 'POST':
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    # 如果启用了分片上传,处理分片
    if request.form.get('dzchunkindex'):
        # 获取分片信息并处理
        chunk_index = int(request.form['dzchunkindex'])
        total_chunks = int(request.form['dztotalchunkcount'])
        chunk_size = int(request.form['dzchunksize'])

        chunk_path = os.path.join(current_app.static_folder, 'temp', f'{file.filename}_part{chunk_index}')
        file.save(chunk_path)
        init_file_chunk = FileChunk(file_name=file.filename, chunk_size=chunk_size)
        # 检查所有分片是否上传完毕
        if init_file_chunk.check_all_chunks_uploaded(total_chunks):
            # 拼接所有分片
            full_file_path = os.path.join(current_app.static_folder, 'temp', file.filename)
            init_file_chunk.concatenate_chunks(total_chunks, full_file_path)
            # 删除分片文件夹
            init_file_chunk.cleanup_chunks(total_chunks)
        return jsonify({'message': 'File uploaded successfully', 'filename': file.filename}), 200

util 代码如下:

class FileChunk(object):
    """
    文件分块相关操作
    """

    def __init__(self, file_name, chunk_size):
        self._chunk_path = os.path.join(current_app.static_folder, 'temp')
        self._file_name = file_name
        self._chunk_size = chunk_size

    def check_all_chunks_uploaded(self, total_chunks):
        # 检查是否所有分片都已上传
        for i in range(total_chunks):
            if not os.path.exists(os.path.join(self._chunk_path, f'{self._file_name}_part{i}')):
                return False
        return True

    def concatenate_chunks(self, total_chunks, full_file_path):
        # 拼接所有分片到一个完整的文件
        with open(full_file_path, 'wb') as full_file:
            for i in range(total_chunks):
                chunk_path = os.path.join(self._chunk_path, f'{self._file_name}_part{i}')
                with open(chunk_path, 'rb') as chunk_file:
                    full_file.write(chunk_file.read())

    def cleanup_chunks(self, total_chunks):
        # 删除所有分片
        for i in range(total_chunks):
            chunk_path = os.path.join(self._chunk_path, f'{self._file_name}_part{i}')
            if os.path.exists(chunk_path):
                os.remove(chunk_path)

首先对上传的分块文件进行重命名,之后保存分块文件,等待所有分块文件上传完成之后,将所有分块文件调用 concatenate_chunks() 将文件拼接成一个完整文件,拼接完成之后,删除所有分块文件。


其它相关推荐:

1、OpenStack Swift 大文件分片

2、OpenStack Swift 与 AWS S3 对比

3、Ubuntu 22.04 安装 OpenStack Swift

4、Ubuntu 22.04 安装 OpenStack Swift 存储策略

5、OpenStack Swift 自定义认证中间件

分享到:

发表评论

评论列表