当前位置: 首页 > news >正文

【Iced】stream.rs文件

usestd::future::Future;usestd::pin::Pin;usestd::task::{Context,Poll};usefutures::stream::Stream;usecrate::subscription::EventStream;/// 将Stream转换为EventStreampubfnfrom_stream<Message,S>(stream:S)->EventStream<Message>whereMessage:'static+Send,S:Stream<Item=Message>+Send+'static,{EventStream::from(Box::pin(stream))}/// 将Future转换为EventStreampubfnfrom_future<Message,F>(future:F)->EventStream<Message>whereMessage:'static+Send,F:Future<Output=Message>+Send+'static,{from_stream(futures::stream::once(future))}/// 将多个EventStream合并为一个pubfnmerge<Message>(streams:Vec<EventStream<Message>>)->EventStream<Message>whereMessage:'static+Send,{ifstreams.is_empty(){returnEventStream::from(Box::pin(futures::stream::empty()));}letmutmerged=streams.into_iter().map(Into::into).collect::<Vec<_>>();EventStream::from(Box::pin(futures::stream::select_all(merged)))}/// 创建一个空的EventStreampubfnempty<Message>()->EventStream<Message>whereMessage:'static+Send,{EventStream::from(Box::pin(futures::stream::empty()))}/// 创建一个只包含单个消息的EventStreampubfnonce<Message>(message:Message)->EventStream<Message>whereMessage:'static+Send,{from_future(async{message})}/// 创建一个周期触发的EventStreampubfninterval<Message,F>(duration:std::time::Duration,f:F)->EventStream<Message>whereMessage:'static+Send,F:Fn()->Message+Send+Sync+'static,{letstream=futures::stream::unfold((),move|_|asyncmove{tokio::time::sleep(duration).await;Some((f(),()))});EventStream::from(Box::pin(stream))}/// 将Iterator转换为EventStreampubfnfrom_iter<Message,I>(iter:I)->EventStream<Message>whereMessage:'static+Send,I:IntoIterator<Item=Message>+Send+'static,I::IntoIter:Send,{letstream=futures::stream::iter(iter);EventStream::from(Box::pin(stream))}/// 创建一个可以动态发送消息的EventStreampubfnchannel<Message>(buffer:usize)->(mpsc::Sender<Message>,EventStream<Message>)whereMessage:'static+Send,{let(sender,receiver)=mpsc::channel(buffer);(sender,EventStream::from(Box::pin(receiver)))}#[cfg(test)]modtests{usesuper::*;usefutures::StreamExt;#[tokio::test]asyncfntest_from_stream(){letstream=futures::stream::iter(vec![1,2,3]);letmutevent_stream=from_stream(stream);assert_eq!(event_stream.next().await,Some(1));assert_eq!(event_stream.next().await,Some(2));assert_eq!(event_stream.next().await,Some(3));assert_eq!(event_stream.next().await,None);}#[tokio::test]asyncfntest_from_future(){letfuture=async{42};letmutevent_stream=from_future(future);assert_eq!(event_stream.next().await,Some(42));assert_eq!(event_stream.next().await,None);}#[tokio::test]asyncfntest_merge(){letstream1=from_iter(vec![1,2,3]);letstream2=from_iter(vec![4,5,6]);letstream3=from_iter(vec![7,8,9]);letmerged=merge(vec![stream1,stream2,stream3]);letresults:Vec<i32>=merged.collect().await;// select_all会随机选择,所以只验证长度和包含的元素assert_eq!(results.len(),9);foriin1..=9{assert!(results.contains(&i));}}#[tokio::test]asyncfntest_empty(){letmutempty_stream:EventStream<i32>=empty();assert_eq!(empty_stream.next().await,None);}#[tokio::test]asyncfntest_once(){letmutonce_stream=once(42);assert_eq!(once_stream.next().await,Some(42));assert_eq!(once_stream.next().await,None);}#[tokio::test]asyncfntest_interval(){usestd::sync::atomic::{AtomicUsize,Ordering};usestd::sync::Arc;usetokio::time::timeout;letcounter=Arc::new(AtomicUsize::new(0));letcounter_clone=counter.clone();letinterval_stream=interval(std::time::Duration::from_millis(10),move||{counter_clone.fetch_add(1,Ordering::SeqCst);42});// 只取前3个值,避免无限等待letmutlimited_stream=interval_stream.take(3);letresults:Vec<i32>=limited_stream.collect().await;assert_eq!(results,vec![42,42,42]);assert_eq!(counter.load(Ordering::SeqCst),3);}#[tokio::test]asyncfntest_from_iter(){letvec=vec![1,2,3,4,5];letmutiter_stream=from_iter(vec);foriin1..=5{assert_eq!(iter_stream.next().await,Some(i));}assert_eq!(iter_stream.next().await,None);}#[tokio::test]asyncfntest_channel(){let(sender,mutevent_stream)=channel(10);// 发送消息sender.send(1).await.unwrap();sender.send(2).await.unwrap();sender.send(3).await.unwrap();// 由于channel是异步的,需要稍微等待一下tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;// 丢弃sender,关闭通道drop(sender);assert_eq!(event_stream.next().await,Some(1));assert_eq!(event_stream.next().await,Some(2));assert_eq!(event_stream.next().await,Some(3));assert_eq!(event_stream.next().await,None);}}

文件说明

这是一个用于创建和管理EventStream的工具函数集合。EventStream似乎是某个框架(可能是 Iced GUI 框架)中用于处理异步事件流的核心类型。

主要函数

函数描述使用场景
from_stream将普通Stream转换为EventStream适配外部Stream到框架
from_future将Future转换为单元素EventStream处理一次性异步操作
merge合并多个EventStream组合多个事件源
empty创建空EventStream默认值或条件分支
once创建包含单个消息的Stream立即触发的事件
interval创建周期性触发的Stream定时器、心跳、轮询
from_iter将Iterator转换为EventStream处理静态数据集合
channel创建可动态发送消息的Stream复杂事件生产场景

核心模式

所有函数都遵循一个统一模式:接收各种数据源(Stream、Future、Iterator、定时器等),统一包装为EventStream类型。这提供了:

  1. 统一接口:所有事件源都表现为同一种类型
  2. 组合能力:可以轻松合并、转换事件流
  3. 灵活性:支持多种事件生产方式

应用场景

这个模块通常用于:

  • GUI应用:处理用户输入、系统事件
  • 实时系统:处理数据流、定时任务
  • 异步编程:统一处理各种异步数据源
  • 事件驱动架构:作为事件总线的基础

通过提供这些工具函数,开发者可以专注于业务逻辑,而不必关心不同类型事件流的底层实现细节。

http://www.jsqmd.com/news/482373/

相关文章:

  • ⚽⊔☺
  • Bootstrap5 图像形状
  • 057基于web的可追溯果蔬生产过程的管理系统-springboot+vue
  • 刚入行Java如何提升竞争力?
  • LLM 算法岗 | 八股题目 代码手撕 题目汇总与解析
  • ionic 模态窗口详解
  • 笔记3 - i
  • 大厂面试真题汇总(2026版)
  • Java程序员面试前请多刷题!
  • 二手交易平台毕业论文+PPT(附源代码+演示视频)
  • 深入解析观察者模式:从核心原理到现代框架中的高级实践
  • Redux - redux-saga中takeLates的作用
  • OpenClaw安全防护:从威胁认知到工程化加固
  • opencv中,把图片变成灰度图有什么用
  • AI驱动的8款工具能高效简化论文写作,自动完成目录生成与内容结构调整
  • web课堂笔记
  • 通过8款智能工具,论文写作更高效,自动生成目录并优化逻辑结构
  • PAT 乙级 1108
  • 部分OtterCTF2018
  • PAT 乙级 1103
  • 8款AI工具助力论文写作,一键生成目录并智能优化内容框架
  • 贪心/妙妙题单2
  • 智能工具提升论文写作效率,8款方案支持目录自动生成与结构优化
  • vosk-ASR angular调用[AI人工智能(五十二)]—东方仙盟
  • 利用8款AI工具,轻松实现论文目录自动生成与内容结构的智能调整
  • vosk-ASR asterisk调用[AI人工智能(五十三)]—东方仙盟
  • 借助8款高效智能工具,论文写作流程可大幅简化,自动生成目录并优化内容结构
  • 单机环境并发控制方案(详解+使用场景+比对)
  • 学习web第二天
  • C# .NET 周刊|2026年2月4期