当前位置: 首页 > news >正文

Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(十)

Kotlin协程flow缓冲buffer任务流,批次任务中选取优先级最高任务最先运行(十)

在 https://blog.csdn.net/zhangphil/article/details/159286201 基础上改进,简化LoadMgr提交简单任务的方法 。

Kotlin协程Flow结合缓冲(buffer)实现优先级任务调度的改进方案。通过PriorityBlockingQueue存储任务并按优先级排序,配合Channel和Flow构建生产者-消费者模型。新增submit()方法简化简单任务提交,支持优先级设置和lambda表达式。核心流程包括:1)任务入队到优先级队列;2)通过Flow缓冲控制任务流速;3)触发时取出最高优先级任务执行。实现了4线程池的并发处理,并提供了任务取消和结果回调机制。改进后API更简洁,支持优先级任务调度和流量控制。

package lib import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch import kotlinx.coroutines.newFixedThreadPoolContext import java.util.concurrent.PriorityBlockingQueue class LoadMgr { companion object { private const val TAG = "fly/LoadMgr" val INSTANCE = LoadMgr() val THREAD_POOL = newFixedThreadPoolContext(nThreads = 4, name = "线程") } private val mChannel = Channel<LoadRequest>() private val bufferCapacity = 10 private val initialCapacity = 50 private val mPriorityBlockingQueue = PriorityBlockingQueue( initialCapacity, Comparator<LoadRequest> { o1, o2 -> o2.getPriority()!!.ordinal - o1.getPriority()!!.ordinal }) private constructor() { println("$TAG constructor") } fun startup() { //接收任务 CoroutineScope(THREAD_POOL).launch { println("$TAG Channel start... ${Thread.currentThread().name}") mChannel.receiveAsFlow() .onEach { it -> //生产者 //println("$TAG onEach-$it ${Thread.currentThread().name}") }.buffer(bufferCapacity) .collect { it -> //消费者 //collect, 这里相当于通过缓冲后匀速发射过来的触发器(trigger)。 //收集到的值在此并不重要,这里,只是把它作为触发信号。 //println("$TAG collect-$it ${Thread.currentThread().name}") trigger() } } } private fun trigger() { val loadRequest = mPriorityBlockingQueue.poll() println("$TAG 当前最大优先级任务:${loadRequest} ${Thread.currentThread().name}") loadRequest?.let { CoroutineScope(THREAD_POOL).launch { val result = if (it.isCancelled()) { println("$TAG id=${loadRequest.getId()} isCancelled=${it.isCancelled()}") return@launch } else { it.getListener()?.onStart(it) it.getLoader()?.doInBackground() } println("$TAG id=${loadRequest} doInBackground完成 isCancelled=${loadRequest.isCancelled()} ${Thread.currentThread().name}") if (it.isCancelled()) { // do noting } else { it.getListener()?.onSuccess(it, result) println("$TAG deliveryResult loadRequest=${loadRequest} ${Thread.currentThread().name}") it.getLoader()?.deliveryResult(result) } } } } fun enqueue(taskInfo: LoadRequest) { CoroutineScope(THREAD_POOL).launch { mPriorityBlockingQueue.add(taskInfo) mChannel.send(taskInfo) } } fun submit(priority: Priority = Priority.NORMAL, loader: Loader): LoadRequest { val request = LoadRequest.Builder() .priority(priority) .loader(loader) .build() enqueue(request) return request } fun submit(priority: Priority = Priority.NORMAL, func: () -> Unit): LoadRequest { val loader = object : SimpleLoader() { override fun worker() { func.invoke() } } val request = LoadRequest.Builder() .priority(priority) .loader(loader) .build() enqueue(request) return request } fun destroy() { mPriorityBlockingQueue.clear() mChannel.cancel() mChannel.close() } }
http://www.jsqmd.com/news/535242/

相关文章:

  • 批量翻译商品图片用什么工具好?跨马翻译使用心得与效率对比
  • 降AI率工具避坑指南:这些降论文ai率的常见误区千万别踩
  • 从零开始掌握Garmin Connect IQ开发:核心技术与实战指南
  • PyTorch 2.8镜像惊艳效果:Wan2.2-T2V在RTX 4090D上生成1080p视频实录
  • QGIS缓冲区功能详解:从‘线段数’到‘端点样式’,这些高级参数你真的用对了吗?
  • BGP实战:如何用Loopback接口提升网络稳定性(附华为设备配置示例)
  • 国企长期配套2026市场口碑好的法兰锻件权威源头厂家 - 速递信息
  • YOLO12模型API接口调用指南:快速集成到Flask/Django项目
  • 【ROS开发指南】VSCode高效开发ROS项目的完整实践
  • linux——进程
  • 独立袋装弹簧床垫盘点:这项技术为何成为主流? - 速递信息
  • 【开题答辩全过程】以 基于WEB的视频网站为例,包含答辩的问题和答案
  • R语言实战:单因素方差分析从数据导入到结果解读(附完整代码)
  • 5分钟上手Kimi CLI:彻底改变你与命令行交互方式的AI助手终极指南
  • PVE 更新源与DNS配置避坑指南(持续更新)
  • 零门槛神经网络可视化:用PlotNeuralNet轻松绘制专业架构图
  • 智能睡眠成趋势,如何选择适合你的睡眠系统? - 速递信息
  • Oracle转义符
  • NaViL-9B图文对话教程:上传图片即问即答,新手零基础快速上手
  • Text-Classification-Pytorch实战指南:从原理到部署的NLP落地工具
  • 探索WLED:从入门到精通的智能LED控制指南
  • 小数据( small data ) 小数据系统( small data system )PPT(上)
  • DeOldify模型服务化:利用CSDN云原生平台实现高可用部署
  • 从入门到冲刺全免费:这款托福APP凭什么敢说“一站式”? - 速递信息
  • 别再只用普通卷积了!门控卷积(GConv)在AEC和语音合成中的实战调优心得
  • 亲测重庆租车避坑指南:案例复盘分享
  • MGeo地址匹配镜像体验:无需调参,直接跑通你的业务地址对
  • 基于LumiPixel的智能摄影工作室解决方案
  • 电容三点式振荡器Multisim仿真优化实践
  • Qwen3-ASR-0.6B行业落地:教育场景课堂语音→教学笔记自动生成