当前位置: 首页 > news >正文

大文件上传公共库

大文件上传

背景:如果上传的企业资料或会议视频等大文件不做特殊处理可能会出现:

  • 网络中断、程序异常退出等问题导致上传失败,从而不得不全部重新上传

  • 同一文件被不同用户反复上传,白白占用网络和服务器存储资源

大文件上传的普遍方案是文件分片上传

如果把文件上传看作是一个不可分割的事务(transcation最小的不可被分割的操作),那么分片的目标就是把一个耗时的大事务划分为一个一个小的事务——降低失败风险

BFF(background for Frontend)中间层,一般公司使用BFF层来承接前端的文件请求

(后端传递的数据结构和前端所需的可能不一致,后端的数据结构通常反应的是存储结构,前端的数据结构通常反应的是渲染结构

ScreenShot_2025-11-04_190101_782

  • 前端分片
  • 中间层存储文件
  • 后端记录文件的请求地址URL、文件名到数据库

减少页面阻塞

分片上传要尽量避免相同(文件内容一样)的分片重复上传,服务器必须要能够识别来自客户端的各个上传请求中是否存在与过去分片相同的上传请求

通过内容hash进行对比

ScreenShot_2025-11-04_191816_211

前端:

  • 文件分片:file.slice(0,1000) 通过1K个字节分片,得到Blob对象

  • 计算分片hash

  • 根据所有分片的hash,计算整个文件的hash

计算hash是一个CPU密集型操作,如果不处理会导致长时间阻塞主线程

首先计算整体hash,发送到服务器判断是否需要上传该文件,如果无此文件则进行分片上传

可通过WebWorker多线程计算,不阻塞主线程,但是依旧需要等待一段时间

ScreenShot_2025-11-04_193246_672

因此,假设绝大部分的文件上传都是新文件上传,无需等待整体hash的计算结果,直接上传分片即可,同时可以把分片操作使用多线程+异步的方式进行上传处理

优点

  • 对于新文件可以缩短整体的上传时间,消除页面阻塞

  • 对于旧文件上传可能会产生一些无效请求,但由于请求仅传递hash,并不是真正文件数据,所以对于网络和服务器的影响很小,且旧文件上传的情况较少,所以整体影响可以忽略

创建文件协议

协议是前后端的桥梁

当客户端发送分片到服务器时,需要告知服务器分片属于哪一次文件上传,因此需要一个唯一标识来标识某一次文件上传

  • 分片校验

ScreenShot_2025-11-04_195238_784

ScreenShot_2025-11-04_195834_383

ScreenShot_2025-11-04_200221_711

  • 分片上传

    ScreenShot_2025-11-04_201245_623

  • 分片合并

    ScreenShot_2025-11-04_201441_039

组织代码

大文件上传的SDK搭建可以分为三层:

ScreenShot_2025-11-04_201953_663

upload-core

使用发布订阅模式提供统一的EventEmitter

export class EventEmitter<T extends string>{private events:Mao<T,Set<Function>>constructor(){this.events=new Map() // 存储事件与对应要执行函数的映射关系}// 注册事件on(event:T,listener:Function){if(!this.events.has(event)){this.events.set(event,new Set())}this.events.get(event)!.add(listener)}// 取消监听off(event:T,listener:Function){if(!this.events.has(event)){return}this.events.get(event)!.delete(listener)}// 监听一次事件once(event:T,listener:Function){const onceListener=(...args:any[]) => {listener(...args)this.off(event,onceListener)}this.on(event,onceListener)}// 触发事件emit(event:T,...args:any[]){if(!this.events.has(event)){return}this.events.get(event)!.forEach((listener) => {listener(...args)})}
}

支持多任务并发执行,提供TaskQueue类

export class Task{fn:Function      // 任务关联的执行函数payload?: any    // 任务关联的其他信息constructor(fn:Function,payload?:any){this.fn=fnthis.payload=payload}// 执行任务run(){return this.fn(this.payload)}
}
// 可并发执行的任务队列
export class TaskQueue extends EventEmitter<'start'| 'pause'|'drain'>{// 待执行的任务private tasks:Set<Task>=new Set()// 当前正在执行的任务数private currentCount =0// 任务状态private status:'paused' | 'running' = 'paused'// 最大并发数private concurrency:number=4constructor(concurrency:number=4){super()this.concurrency=concurrency}// 添加任务add(...tasks:Task[]){for(const t of tasks){this.tasks.add(t)}}// 添加任务并启动执行addAndStart(...tasks:Task[]){this.add(..tasks)this.start()}// 启动任务start(){if(this.status==='running') returnif(this.tasks.size===0){this.emit('drain') //当前已无任务return}this.status='running'this.emit('start') this.runNext() // 开始执行下一个任务}// 取出第一个任务private taskHeadTask(){const task=this.tasks.values().next().valueif(task) this.tasks.delete(task)return task}// 执行下一个任务private runNext(){if(this.status!=='running') return // 如果任务状态是暂停或已完成则返回if(this.currentCount>=this.concurrency) return // 并发数已满const task=this.taskHeadTask()if(!task){  // 没有任务了this.status='paused'this.emit('drain')return}// 执行任务Promise.resolve(task.run()).finally(()=>{this.currentCount--this.runNext()})}// 暂停任务pause(){this.status='paused'this.emit('pause')}
}

client

  • 文件分片

    export interface Chunk{blob:Blob    // 分片的二进制数据start:number // 分片起始位置end:number   // 分片的结束位置hash:string  // 分片的hashindex:number // 分片在文件的索引
    }
    // 创建一个不带hash的chunk
    export function createChunk(file:File,index:number,chunkSize:number):Chunk{const start = index*chunkSizeconst end=Math.min((index+1)*chunkSize,file.size)const blob=file.slice(start,end)return {blob,start,end,hash:'',index}
    }
    // 计算chunk的hash
    export function calChunkHash(chunk:Chunk):Promise<string>{return new Promise((resolve)=>{const spark=new SparkMD5.ArrayBuffer()const fileReader=new FileReader() // 创建文件读取器fileReader.onload=(e)=>{spark.append(e.target?.result as ArrayBuffer)resolve(spark.end())}fileReader.readAsArrayBuffer(chunk.blob) // 传入二进制数据})
    }
    

    文件的分片方式有很多:

    • 普通分片:for循环依次计算hash,会导致主线程卡顿

    • 基于多线程的分片:根据CPU的内核数开启线程navigator.hardwareConcurrency

    • 基于主线程时间切片的分片:(类似React Fiber)requestIdleCallback

    • 其他分片模式

    // 抽象类
    export type ChunkSplitorEvents = 'chunks' | 'wholeHash' | 'drain'
    export abstract class ChunkSplitor extends EventEmitter<ChunkSplitorEvents>{protected chunkSize:number // 分片大小(字节)protected file:File        // 待分片的文件protected hash?:string     // 整个文件的hashprotected chunks: Chunkp[] // 分片列表private handleChunkCount=0 // 已计算hash的分片数量private spark = new SparkMD5() // 计算hash的工具private hasSplited = false    // 是否已分片constructor(file:File,chunkSize:number=1024*1024*5){super()this.file=filethis.chunkSize=chunkSizeconst chunkCount=Math.ceil(this.file.size/this.chunkSize)this.chunks = new Array(chunkCount).fill(0).map((_,index)=>createChhunk(this.file,index,this.chunkSize))}// 分片split(){if(this.hasSplited) returnthis.hasSplited =trueconst emitter =new EventEmitter<'chunks'>()const chunksHanlder=(chunks: chunk[])=>{this.emit('chunks', chunks)chunks.forEach((chunk)=>{this.spark.append(chunk.hash)})this.handleChunkCount += chunks.lengthif(this.handleChunkcount === this.chunks.length){// 计算完成emitter.off('chunks', chunksHanlder)this.emit('wholeHash',this.spark.end())this.spark.destroy()this.emit('drain')}}emitter.on('chunk',chunkHandler)this.calcHash(this.chunks,emitter)
    }// 计算每一个分片的hash 差异部分abstract calcHash(chunks:Chunk[],emitter:EventEmitter<'chunk'>):voidabstract dispose():void
    
    // 基于多线程的分片
    export class MultiThreadsplitor extends ChunkSplitor{private workers: Worker[]= new Array(navigator.hardwareConcurrency || 4).fill(0).map( ()=> new Worker(new URL('./SplitWorker.ts',import.meta.url),{type: 'module'}))calcHash(chunks: Chunk[],emitter: EventEmitter<'chunks'>): void {const workerSize =Math.ceil(chunks.length / this.workers.length)for(let i=0;i<this.workers.length;i++){const worker=this.workers[i];const start=i*workerSize;const end=Math.min((i+1)*workerSize, chunks.length)const workerChunks=chunks.slice(start,end)worker.postMessage(workerChunks)worker.onmessage=(e)=>{emitter.emit('chunks',e.data)}}}
    dispose(): void {this.workers.forEach((worker)=> worker.terminate())}}
    
    // 子线程
    onmessage=function(e){const chunks =e.data as Chunk[]for(const chunk of chunks){calcchunkHash(chunk).then((hash)=>{chunk.hash = hashpostMessage([chunk])})}
    }
    

  • 请求控制

    • 充分利用带宽:通过基础库的TaskQueue实现并发控制

    • 与上层请求解耦:策略模式解耦

    // 请求策略
    export interface RequestStratey{//文件创建请求,返回tokencreateFile(file:File):Promise<string>//分片上传请求uploadchunk(chunk: Chunk): Promise<void>//文件合并请求,返回文件urlmergeFile(token: string):Promise<string>//hash校验请求patchHash<T extends 'file'| 'chunk'>(token: string,hash: string,type: T):Promise<T extends 'file' ?{ hasFile: boolean }:{ 		     hasFile: boolean; rest:number[];url:string }>}
    
    // 请求控制
    export class Uploadcontroller{private requestStrategy:Requeststrategy //请求策略,没有传递则使用默认策略private splitStrategy:ChunkSplitor;  //分片策略,没有传递则默认多线程分private taskQueue:TaskQueue;         //任务队列//其他属性略
    // 初始化async init(){// 获取文件tokenthis.token = await this.requeststrategy.createFile(this.file)//分片事件监听this.splitstrategy.on('chunks', this.handlechunks.bind(this))this.splitstrategy.on('wholeHash',this.handlewholeHash.bind(this))}
    //分片事件处理
    private handlechunks(chunks: chunk[]){//分片上传任务加入队列chunks.forEach((chunk)=>{this.taskQueue.addAndstart(new Task(this.uploadChunk.bind(this),
    chunk))})
    }async uploadchunk(chunk: Chunk){//hash校验const resp= await this.requeststrategy.patchHash(this.token, chunk.hash,'chunk')if(resp.hasFile) return // 文件已存在// 分片上传await this.requeststrategy.uploadchunk(chunk,this.uploadEmitter)}//整体hash事件处理private async handleWholeHash(hash: string){//hash校验const resp=await this.requestStrategy.patchHash(this.token, hash,'file')if (resp.hasFile){// 文件已存在this.emit('end', resp.url)return}//根据resp.rest重新编排后续任务// ...}}
    

server

  • 隔离不同的文件上传:服务端使用uuid+jwt生成一个不可篡改的唯一编码,用于表示不同的文件

  • 保证分片不重复:要求分片跨文件唯一,且永不删除

    • 数据库1:存储每个分片

    • 数据库2:存储name、hash、size等信息,一条记录对应数据库1一个分片

    • 数据库3:存储token、filename、hash、url、用到了哪些分片

  • 合并分片:服务器的合并不是真正的合并,而是在数据库记录文件所包含分片的顺序和指针

  • 访问文件:如果服务器收到对文件的请求,并在数据库3中找到了对应的文件则读取文件的所有分片ID,依次找到对应的分片文件,服务器利用TaskQueue的并发控制能力,逐步产生文件读取流,并利用管道直接输出到网络I/O

http://www.jsqmd.com/news/31577/

相关文章:

  • 2025 年 11 月电磁阀厂家推荐排行榜,高压电磁阀,防爆电磁阀,比例电磁阀,汽车电磁阀,ABS电磁阀,ESP电磁阀,车用ESC电磁阀公司推荐
  • 2025 年 11 月 EVA 厂家推荐排行榜,EVA发泡胶/EVA板材/EVA卷材/EVA片材,防火EVA/阻燃EVA/防静电EVA/去味EVA/高弹EVA/彩色EVA公司推荐
  • 2025 年 11 月控制器厂家推荐排行榜,开关控制器,自动控制器,阀门控制器,智能控制器,限位开关控制器公司推荐
  • 请求库的封装
  • [jupyter]
  • 2025 年 11 月管道泵厂家推荐排行榜,新型管道泵,节能管道泵,低噪声管道泵,超低压管道泵,防爆管道泵,高压管道泵,防腐管道泵,SF管道泵,SFB管道泵,WF屋顶管道泵公司推荐
  • 2025 年 11 月冷却塔厂家推荐排行榜,工业冷却塔,开式冷却塔/钢制开式冷却塔,封闭式冷却塔/密闭式冷却塔,蒸发式冷却塔公司推荐
  • Spring 中的Event机制
  • jiangly模板-字符串
  • Java 内存模型(JMM)中 volatile 的作用与限制
  • 今日学习:二分
  • Ice Breaker Games - 一个在线免费的游戏网站,无需登录,打开即玩。
  • Java获取当前时间的下一天以及30天前的时间
  • 论文导读:从 TSMC ISSCC 看 SRAM 存算发展
  • edge chromium浏览器copilot图标消失处理
  • AI - 自然语言处理(NLP) - part 2 - 词向量 - 教程
  • 洛谷 P4577
  • C++算法贪心例题讲解 - 实践
  • AI元人文:理论框架、僵局本质与文明演化的系统性构想
  • [linux-mint] Surface Pro4 安装linux驱动
  • [B] AGC VP 记录
  • 2025年河南工业大学2025新生周赛(2)
  • Atcoder [ARC161C] Dyed by Majority (Odd Tree) 题解 [ 绿 ] [ 树的遍历 ] [ 构造 ] [ 贪心 ]
  • Reflections on Trusting Trust by Ken Thompson
  • [Agent] ACE(Agentic Context Engineering)源码阅读笔记---(1)基础模块
  • AI大语言模型从0开发
  • 做题笔记22
  • 第三十三篇
  • 2025.11.4
  • 25.11.4 动态规划dp