当前位置: 首页 > news >正文

TypeScript 对列,实现消息队列(FIFO显示+定时清理)

使用对列实现消息接收显示与清除, 根据消息的【显示时间】来清除,显示超过 10 秒的自动清理,未显示、显示不足 10 秒的都保留

线程安全 Queue

/** * 纯先进先出(FIFO)队列独立实现 * 支持:入队、出队、查看队头、判空、清空、获取长度、遍历 */ class Queue<T> { // 用数组存储队列元素 private items: T[] = []; // 互斥锁:解决并发问题 private isLocked = false; // 等待锁释放 private async waitForUnlock(): Promise<void> { while (this.isLocked) { await new Promise(resolve => setTimeout(resolve, 1)); } } // 加锁 private lock(): void { this.isLocked = true; } // 解锁 private unlock(): void { this.isLocked = false; } // 线程安全:入队 添加元素到队尾 async enqueue(item: T): Promise<void> { await this.waitForUnlock(); this.lock(); try { this.items.push(item); } finally { this.unlock(); } } // 线程安全:批量入队 async enqueueBatch(items: T[]): Promise<void> { await this.waitForUnlock(); this.lock(); try { this.items.push(...items); } finally { this.unlock(); } } /** * 出队:从队头移除并返回元素 * 空队列返回 undefined */ async dequeue(): Promise<T | void> { await this.waitForUnlock(); this.lock(); try { return this.items.shift() } finally { this.unlock(); } } /** * 查看队头元素(不删除) */ peek(): T | undefined { return this.items[0]; } /** * 判断队列是否为空 */ isEmpty(): boolean { return this.items.length === 0; } /** * 获取队列长度 */ get size(): number { return this.items.length; } // 线程安全:清空队列 async clear(): Promise<void> { await this.waitForUnlock(); this.lock(); try { this.items = []; } finally { this.unlock(); } } // 删除单个item async removeIf(predicate: (item: T) => boolean): Promise<number> { await this.waitForUnlock(); this.lock(); try { const beforeSize = this.items.length; this.items = this.items.filter(item => !predicate(item)); return beforeSize - this.items.length; } finally { this.unlock(); } } /** * 遍历队列(保持 FIFO 顺序) */ forEach(callback: (item: T, index: number) => void): void { this.items.forEach(callback); } /** * 过滤对列 * @param predicate 过滤条件 */ filter(predicate: (item: T) => boolean): T[] { return this.items.filter(predicate); } /** * 转数组(方便查看所有元素) */ toArray(): T[] { return [...this.items]; } } export default Queue;

消息类型

interface Message { id: string; content: string; hasDisplayed: boolean; // 是否已显示 displayTime: number | null; // 显示时的时间戳(毫秒) }

显示模块

class MessageView { // 只渲染未显示的,并记录显示时间戳 renderNew(queue: Queue<Message>): void { const newMsgs = queue.filter(m => !m.hasDisplayed); if (newMsgs.length === 0) { console.log("\n📭 暂无新消息"); return; } console.log("\n===== 新消息 ====="); const now = Date.now(); newMsgs.forEach(m => { console.log("✅ " + m.content); m.hasDisplayed = true; m.displayTime = now; // 记录显示时间 }); console.log("=================\n"); } // 清理后刷新界面 refreshAfterClear(removed: number, remain: number): void { console.log(`\n🧹 自动清理完成`); console.log(`🗑️ 已删除:${removed} 条(显示超过10秒)`); console.log(`📌 剩余消息:${remain} 条`); console.log("---------------------------------------"); } }

定时清理器

class MessageCleaner { private timer: NodeJS.Timeout | null = null; private readonly EXPIRE_SECONDS = 10; // 10秒过期 start(ms: number, queue: Queue<Message>, view: MessageView): void { if (this.timer) return; this.timer = setInterval(async () => { const now = Date.now(); const expireTime = this.EXPIRE_SECONDS * 1000; // 核心条件: // 1. 已显示 // 2. 显示时间不为空 // 3. 当前时间 - 显示时间 > 10秒 const removedCount = await queue.removeIf(msg => msg.hasDisplayed && msg.displayTime !== null && now - msg.displayTime > expireTime ); view.refreshAfterClear(removedCount, queue.size); }, ms); } stop(): void { if (this.timer) { clearInterval(this.timer); this.timer = null; console.log("⏹️ 清理已停止"); } } }

测试:每2秒随机发1~3条消息

function genId(): string { return Date.now().toString(36) + Math.random().toString(36).slice(2); } function randomMessages(): Message[] { const count = Math.floor(Math.random() * 3) + 1; const list: Message[] = []; for (var i = 0; i < count; i++) { list.push({ id: genId(), content: `消息 ${Date.now()} - ${i + 1}`, hasDisplayed: false, displayTime: null }); } return list; } // ------------------------------ // 启动运行 // ------------------------------ const queue = new Queue<Message>(); const view = new MessageView(); const cleaner = new MessageCleaner(); // 每3秒检查一次清理 cleaner.start(3000, queue, view); // 每2秒随机发消息 setInterval(async () => { const msgs = randomMessages(); await queue.enqueueBatch(msgs); view.renderNew(queue); }, 2000);
http://www.jsqmd.com/news/774110/

相关文章:

  • Git Restore命令介绍(撤销工作区修改、恢复多个文件、取消暂存:--staged、同时恢复暂存区和工作区:--worktree、-SW、从指定commit恢复文件--source)
  • 怎么在 docker-compose 中自定义网络名称和 IP?
  • java学习笔记(1)
  • 20260507笔记
  • SMP系统架构解析与多核优化实战
  • 将Claude Code编程助手无缝对接至Taotoken服务的详细配置步骤
  • AI模型选型避坑指南:五大核心维度横向对比
  • 本地AI对话搜索引擎aii:构建私有知识库与AI助手记忆体
  • GaussDB索引实战:从‘商品销售表’案例看5种索引的正确用法与性能对比
  • VRM Blender插件:解锁虚拟角色创作的专业解决方案
  • AMD SCU35 FPGA评估套件开发指南与应用解析
  • Git Merge命令介绍(把指定分支的提交历史合并到当前分支)经典合并、Fast-Forward快进合并FF Merge、三方合并、merge commit、squash merge、合并冲突
  • 2026年高品质的香水喷头/电化铝香水喷头定制加工厂家推荐 - 行业平台推荐
  • 思路总结--华大(Stereo-seq)的空间通讯分析
  • Attio:用关系型数据库思维重塑CRM与团队协作
  • Quixel Mixer本地材质库管理全攻略:从下载、整理到备份,告别资源混乱
  • Bonsai Memory:为AI智能体构建分层记忆索引,实现Token消耗降低81%
  • 性价比高的6s与目视化管理咨询企业
  • 基于MCP协议构建企业级AI协作引擎:连接Claude与Gemini的33个生产力工具
  • 海明码+加密签名(软考专项)学习记录+速记+真题
  • SystemVerilog里disable fork的‘误伤’有多严重?一个实际仿真案例带你避坑
  • Git Reset命令介绍(用于移动HEAD,并选择是否同步更新暂存区工作区)三种模式:--soft、--mixed(默认)、--hard;修改最近提交、合并多个提交、取消git add、回退版本回退
  • 创业者人格AI:大模型垂直化与提示词工程实战解析
  • 警惕!POS系统4大安全风险别踩雷
  • 不止于测距:用51单片机和HC-SR04超声波模块DIY一个简易倒车雷达/防撞预警系统
  • Taro编译h5端口点击返回Taro.navigateBack({delta: 1,})刷新当前页面问题
  • GodotFirebase插件实战:为游戏快速集成云端用户认证与实时数据库
  • 从开源项目到商业落地:一个软PLC的‘前世今生’与技术启示
  • 【408考研·OS】核心考点:中断分类、线程模型 (KLT/ULT) 与调度算法方法论总结
  • 互联网大厂 Java 求职者面试:深入探讨微服务与云原生技术