TypeScript 对列,实现消息队列(FIFO显示+定时清理)
使用对列实现消息接收显示与清除, 根据消息的【显示时间】来清除,显示超过 10 秒的自动清理,未显示、显示不足 10 秒的都保留
线程安全 Queue
/** * 纯先进先出(FIFO)队列独立实现 * 支持:入队、出队、查看队头、判空、清空、获取长度、遍历 */ class Queue<T> { // 用数组存储队列元素 private items: T[] = []; // 互斥锁:解决并发问题 private isLocked = false; // 等待锁释放 private async waitForUnlock(): Promise<void> { while (this.isLocked) { await new Promise(resolve => setTimeout(resolve, 1)); } } // 加锁 private lock(): void { this.isLocked = true; } // 解锁 private unlock(): void { this.isLocked = false; } // 线程安全:入队 添加元素到队尾 async enqueue(item: T): Promise<void> { await this.waitForUnlock(); this.lock(); try { this.items.push(item); } finally { this.unlock(); } } // 线程安全:批量入队 async enqueueBatch(items: T[]): Promise<void> { await this.waitForUnlock(); this.lock(); try { this.items.push(...items); } finally { this.unlock(); } } /** * 出队:从队头移除并返回元素 * 空队列返回 undefined */ async dequeue(): Promise<T | void> { await this.waitForUnlock(); this.lock(); try { return this.items.shift() } finally { this.unlock(); } } /** * 查看队头元素(不删除) */ peek(): T | undefined { return this.items[0]; } /** * 判断队列是否为空 */ isEmpty(): boolean { return this.items.length === 0; } /** * 获取队列长度 */ get size(): number { return this.items.length; } // 线程安全:清空队列 async clear(): Promise<void> { await this.waitForUnlock(); this.lock(); try { this.items = []; } finally { this.unlock(); } } // 删除单个item async removeIf(predicate: (item: T) => boolean): Promise<number> { await this.waitForUnlock(); this.lock(); try { const beforeSize = this.items.length; this.items = this.items.filter(item => !predicate(item)); return beforeSize - this.items.length; } finally { this.unlock(); } } /** * 遍历队列(保持 FIFO 顺序) */ forEach(callback: (item: T, index: number) => void): void { this.items.forEach(callback); } /** * 过滤对列 * @param predicate 过滤条件 */ filter(predicate: (item: T) => boolean): T[] { return this.items.filter(predicate); } /** * 转数组(方便查看所有元素) */ toArray(): T[] { return [...this.items]; } } export default Queue;消息类型
interface Message { id: string; content: string; hasDisplayed: boolean; // 是否已显示 displayTime: number | null; // 显示时的时间戳(毫秒) }显示模块
class MessageView { // 只渲染未显示的,并记录显示时间戳 renderNew(queue: Queue<Message>): void { const newMsgs = queue.filter(m => !m.hasDisplayed); if (newMsgs.length === 0) { console.log("\n📭 暂无新消息"); return; } console.log("\n===== 新消息 ====="); const now = Date.now(); newMsgs.forEach(m => { console.log("✅ " + m.content); m.hasDisplayed = true; m.displayTime = now; // 记录显示时间 }); console.log("=================\n"); } // 清理后刷新界面 refreshAfterClear(removed: number, remain: number): void { console.log(`\n🧹 自动清理完成`); console.log(`🗑️ 已删除:${removed} 条(显示超过10秒)`); console.log(`📌 剩余消息:${remain} 条`); console.log("---------------------------------------"); } }定时清理器
class MessageCleaner { private timer: NodeJS.Timeout | null = null; private readonly EXPIRE_SECONDS = 10; // 10秒过期 start(ms: number, queue: Queue<Message>, view: MessageView): void { if (this.timer) return; this.timer = setInterval(async () => { const now = Date.now(); const expireTime = this.EXPIRE_SECONDS * 1000; // 核心条件: // 1. 已显示 // 2. 显示时间不为空 // 3. 当前时间 - 显示时间 > 10秒 const removedCount = await queue.removeIf(msg => msg.hasDisplayed && msg.displayTime !== null && now - msg.displayTime > expireTime ); view.refreshAfterClear(removedCount, queue.size); }, ms); } stop(): void { if (this.timer) { clearInterval(this.timer); this.timer = null; console.log("⏹️ 清理已停止"); } } }测试:每2秒随机发1~3条消息
function genId(): string { return Date.now().toString(36) + Math.random().toString(36).slice(2); } function randomMessages(): Message[] { const count = Math.floor(Math.random() * 3) + 1; const list: Message[] = []; for (var i = 0; i < count; i++) { list.push({ id: genId(), content: `消息 ${Date.now()} - ${i + 1}`, hasDisplayed: false, displayTime: null }); } return list; } // ------------------------------ // 启动运行 // ------------------------------ const queue = new Queue<Message>(); const view = new MessageView(); const cleaner = new MessageCleaner(); // 每3秒检查一次清理 cleaner.start(3000, queue, view); // 每2秒随机发消息 setInterval(async () => { const msgs = randomMessages(); await queue.enqueueBatch(msgs); view.renderNew(queue); }, 2000);