当前位置: 首页 > news >正文

NET中的异步编程(四)- IO完成端口以及FileStream.BeginRead

IO完成端口(IO Completion Port)

大多数人应该或多或少地听说过IO完成端口这么个东西,而且也知道它是实现高性能IO,高伸缩性应用的尚方宝剑。IO完成端口是一个非常复杂的内核对象,其实现的也非常巧妙,细细琢磨还是非常有意思的。

创建高伸缩性的应用的一个基本原则就是:创建更少的线程。线程数更少首先消耗的资源就少,每个线程的创建除了要浪费CPU时间外,还要创建一系列的数据结构用来保存线程相关的一些信息:用户栈,线程上下文,内核栈等。这个总共加起来大概1.5M左右,那么你算算你的32位机器总共能使用多少内存?那么对应地能创建多少线程?可能有人讲那对于64位的就无所谓了。嗯,在资源占用这方面64位确实不用担心。但是系统中可运行的线程数越多,你的CPU数又是有限的(8个?80个?)。Windows的任务调度机制是每个线程会运行一个时间片,然后Windows抢占式的调度另一个线程运行。那么线程数越多,Windows势必要进行更频繁的线程上下文切换。线程上下文切换对系统性能的影响在这里我就不多说了,你可以搜搜资料。

那么如何做到创建更少的线程,而又干更多的事儿呢?答案就是“不等待”。相对CPU来说,IO设备的速度简直低的要命。就好像飞机和拖拉机的差别一样,我们可不能让拖拉机拖了飞机的后退儿。而IO完成端口就是为了这个而生的:创建更少的线程,干更多的事儿。

IO完成端口首先不是一个我们看得见摸得着的什么插口,也和我们常说的80这样的端口不同。你可以将其理解为一个数据结构或一个对象(下面我会用C#的代码来辅助讲解IO完成端口,仅仅是讲解,这些代码并不是真实的实现):

Windows提供了一个CreateIoCompletionPort API来创建IO完成端口,实际上这个API有两个作用:创建IO完成端口和将一个IO设备与该端口绑定。创建IO完成端口时有一个很重要的参数:指定同时最多能有多少个线程并行运行,这就是为了保证更少的线程,如果你将这个数值指定为0,那么默认值就会是你机器的CPU数。IO端口里还有一个IO设备句柄列表,你可以将很多设备句柄与这个端口绑定(文件、Socket等):

//函数原型
HANDLE CreateIoCompletionPort(
//设备句柄
HANDLE hFile,
//已有的IO完成端口句柄,如果这里已经指定,则是将前面指定的设备与该端口绑定
HANDLE hExistingCompletionPort,
//因为一个IO完成端口可以绑定很多设备,可以用这个来区分
ULONG_PTR CompletionKey,
//允许同时运行的线程数
DWORD dwNumberOfConcurrentThreads
);
//创建一个IO完成端口
HANDLE hIoPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,2);
//创建文件,如果要异步访问文件则需要指定FILE_FLAG_OVERLAPPED
HANDLE hFile = CreateFile(..);
//将上面创建的文件句柄与刚才创建的IO完成端口绑定,不仅仅是文件可以
CreateIoCompletionPort(hFile,hIoPort,1,2);

除此之外,我们还要为该端口创建一些供使用的线程。然后让这些线程调用Windows提供的GetQueuedCompletionStatus方法。这些线程调用了该方法后会被放到IO完成端口另外一个数据结构中:一个后进先出的队列(我们将其称为等待队列吧)。然后该线程会休眠起来,不占用CPU。然后我们可以调用像ReadFile这样的方法发起一个IO请求:

BOOL ReadFile(
HANDLE hFile,
PVOID pvBuffer,
DWORD nNumBytesToRead,
PDWORD pdwNumBytes,
OVERLAPPED* pOverlapped);
ReadFile(..&overlapped);

上面代码中的OVERLAPPED是一个非常重要的数据结构,后面会提到。

现在假设你的某个IO设备收到了一个数据包,Windows就会检查这个IO设备是否跟一个IO完成端口关联了,如果关联了Windows就会把这个数据包投递到这个IO完成端口。IO完成端口里还有另外一个先进先出的队列,用来保存这些IO完成的数据。IO完成端口一看,唔,有个IO完成包投递到我这儿来了,那我看看我的那个等待队列里有没有线程还在休息,如果有就叫它起来干活儿。嘿,还真有一个家伙还在睡觉,如是IO完成端口就唤醒该线程,实际上就是上面的那个GetQueuedCompletionStatus方法返回了。该方法返回时还会得到一些别的信息:接收了多少个字节啊,是哪个设备啊,最重要的是上面提到的OVERLAPPED这个结构等等。起来后的线程就会拿着这些信息干一些后续的事儿:

BOOL GetQueuedCompletionStatus(
HANDLE hCompletionPort,
PDWORD pdwNumberOfBytesTransferred,
PULONG_PTR pCompletionKey,
OVERLAPPED **ppOverlapped,
DWORD dwMilliseconds);
//类似于下面的过程
//创建一个线程
Thread thread = new Thread(()=>{
while(true){
//如果没有IO完成通知到达,该线程就在这里休眠了
if(GetQueuedCompletionStatus(hIoPort,..ppOverlapped..)){
//从ppOverlapped里取出所需的信息,比如可能设置了一个回调函数的指针等
}else{
//
}
}
});
thread.Start();

干完这个事儿后,这个线程又会回到刚才那个队列继续躺起来(其实是再次调用一下那个方法)。我们要注意的是,这个等待队列是后进先出的,也就是说如果下次有消息来了很有可能还是上会那个线程来处理。这样做的目的还是为了提高性能:不需要进行线程上下文切换。因为CPU的速度比IO设备的高出很多,大部分时候我们只需要一两个线程就可以处理很多IO请求。

现在假设我们的机器有2个CPU,创建IO完成端口时我们指定了同时可以有2个线程运行。我们创建了4个线程放到等待队列里。现在有4个IO完成包投递过来了,放在那个队列里。实际上IO完成端口只会唤醒两个线程去执行,因为你指定了同时只能有两个线程运行,那两个线程运行完就会立马回来继续运行别的。但是现在出了一个状况,其中有一个线程执行过程中因为等待某个资源被阻塞了。那现在只有一个线程执行了,那这个线程就有点吃力了。其实IO完成端口非常聪明,它内部还有一个暂停运行的线程列表和一个正在运行的线程列表。如果某个线程正在运行,它就把这个线程ID放到这个队列里,当这个线程因为某个事儿暂停运行了它就会将其移动到另外一个列表中。IO完成端口会保证正在运行的线程列表里的数目不会超过你指定的最大并发数。一旦这个列表里的数目少于这个数,而IO完成包队列里又有未处理的包,IO完成端口就会看看还有没有在睡觉的线程,如果有就将其唤醒干活儿。

IO完成端口尽量的控制同时运行的线程数,减少上下文切换浪费的时间和资源,并且让线程尽量的忙起来。

这里还有一个有意思的地方,假设现在正在运行的两个线程其中一个调用Thread.Sleep休眠了,然后IO完成端口唤醒另外一个线程,让同时运行的线程数保持为2个,不过过了一会儿刚才调用Sleep休眠的线程醒过来了,有意思的事情发生了:现在有三个线程同时运行,超过了我们设置的最大并行数。这个时候IO完成端口是不会杀掉一个线程的,它会让它们继续执行,然后等到执行完了再让这个并行数降下去。

实际上,IO完成端口不仅仅可以用来处理这种异步的IO,它完全可以作为一种线程间的通讯机制来使用(与IO一点关系都没有),我们可以调用Win API PostQueuedCompletionStatus来模拟一次IO完成,这样我们的IO完成端口就会接到通知,然后调用线程执行。熟悉并发里的Actor模型的同学可能觉得这有点Actor的影子了。

BeginRead&EndRead

那么,既然有IO完成端口这么个好东西,如是有很多人想在.NET里也利用利用。其实大可不必,在.NET里异步的IO内部就是使用了IO完成端口。每个CLR初始化后都会创建一个IO完成端口,用来处理IO请求。很多人应该知道ThreadPool里的线程分为两类:worker thread和io completion thread,这里的io completion thread就是上一节说的跟IO完成端口相关联的那些thread。要说它跟其他的thread有什么不同?没什么不同,只是受IO完成端口控制而已。

为了看看在.NET中是如何利用IO完成端口的,我们将FileStream.BeginRead作为我们的入口点。在FileStream的Init方法里我们会看到这么一段代码:

if (this._isAsync)
{
//...
try
{
flag4 = ThreadPool.BindHandle(this._handle);
}
finally
{
CodeAccessPermission.RevertAssert();
}
//...
}

我们感兴趣的就是ThreadPool.BindHandle。还记得上面对IO完成端口的描述么?其实这里做的事儿就是将该文件句柄与每个CLR都初始化了的那个IO完成端口绑定。也就是说如果我们创建一个FileStream时指定了异步,那么IO完成端口就会“监视”这个文件。

我们再来看看BeginRead这个方法。该方法是用来发起异步IO请求的方法,该方法执行后会立即返回,不阻塞线程。

首先,看这么段代码:

if (!this._isAsync)
{
return base.BeginRead(array, offset, numBytes, userCallback, stateObject);
}

也就是说如果我们创建FileStream时,没有指定为异步,就会调用基类的BeginRead方法,那基类的这个方法又是如何实现的呢?

[HostProtection(SecurityAction.LinkDemand, ExternalThreading=true)]
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
//...
ReadDelegate delegate2 = new ReadDelegate(this.Read);
//...
return delegate2.BeginInvoke(buffer, offset, count, callback, state);
}

其实是创建一个调用同步的Read方法的委托,然后调用一下BeginInvoke方法(在第二篇文章已经说过,这样的调用实际上还是让线程池里的一个线程来调用,我们可以称之为一种伪异步IO)。这里可以得出一个结论:如果你想用BeginRead,那么初始化FileStream的时候就指定异步,否则就不用直接用Read。

那么如果创建FileStream的时候指定了异步会是什么结果呢(这里的实现在BeginReadCore方法里)?

[SecuritySafeCritical]
private unsafe FileStreamAsyncResult BeginReadCore(byte[] bytes, int offset, int numBytes, AsyncCallback userCallback, object stateObject, int numBufferedBytesRead)
{
NativeOverlapped* overlappedPtr;
FileStreamAsyncResult ar = new FileStreamAsyncResult {
_handle = this._handle,
_userCallback = userCallback,
_userStateObject = stateObject,
_isWrite = false,
_numBufferedBytes = numBufferedBytesRead
};
ManualResetEvent event2 = new ManualResetEvent(false);
ar._waitHandle = event2;
Overlapped overlapped = new Overlapped(0, 0, IntPtr.Zero, ar);
//...
overlappedPtr = overlapped.Pack(IOCallback, bytes);
//...
ar._overlapped = overlappedPtr;
//...
ReadFileNative(this._handle, bytes, offset, numBytes, overlappedPtr, out hr)

上面代码中的NativeOverlapped就是在上一节我们提到的保存有回调等信息的OVERLAPPED结构,在这里也是一样,它保存有我们的userCallback回调。然后通过调用ReadNative发起IO请求,并将这个数据结构传递进去,这里的ReadNative就是对Win32 的ReadFile的封装。发起异步IO请求完毕,BeginRead返回,过了一会儿磁盘驱动程序将数据读回来了,对应的IO完成端口收到通知,IO完成端口把刚才传递进去的NativeOverlapped结构传递给IO线程,IO线程从中取出IOCallback回调,IOCallback回调里有对我们的userCallback回调的调用:

IOCallback = new IOCompletionCallback(FileStream.AsyncFSCallback);
private static unsafe void AsyncFSCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
FileStreamAsyncResult asyncResult = (FileStreamAsyncResult) Overlapped.Unpack(pOverlapped).AsyncResult;
//...
AsyncCallback callback = asyncResult._userCallback;
if (callback != null)
{
callback(asyncResult);
}
}

在这个回调里我们会对EndRead进行调用,我们看看EndRead的代码会发现其他一些东西:

public override unsafe int EndRead(IAsyncResult asyncResult)
{
//...
WaitHandle handle = result._waitHandle;
if (handle != null)
{
try
{
handle.WaitOne();
}
finally
{
handle.Close();
}
}
NativeOverlapped* nativeOverlappedPtr = result._overlapped;
if (nativeOverlappedPtr != null)
{
Overlapped.Free(nativeOverlappedPtr);
}
//...
return (result._numBytes + result._numBufferedBytes);
}

首先是销毁我们在BeginRead里初始化的WaitHandle内核对象,然后将NativeOverlapped结构也销毁。所以EndRead除了取回读了多少个字节的作用外,还起了销毁资源的作用。所以有的时候我们想进行这么一个操作:异步的发起请求,但是我们并不关心该请求是否成功。如是我们就假想能不能只调用BeginXXX方法就可以了?从这里看我们不能简单的调用一下BeginXXX就了事了,因为在BeginXXX里分配的一些句柄和内核资源需要在EndXXX里销毁,不然会造成资源泄露。

http://www.jsqmd.com/news/1125811/

相关文章:

  • 全球邮轮旅行服务市场投资前景分析及发展研究建议报告2026年版
  • Nano Banana 2 怎么用?14 种宽高比 + 4K 出图完整步骤
  • 国漫视效巅峰最好的国产动画片哪吒魔童
  • 四步部署Dify:构建私有化AI应用开发平台
  • 从文档到AI知识库:工程化SOP与RAG实战指南
  • Engine-Sim实战:3大技术挑战与精准仿真验证指南
  • 智商平平”学软件
  • 暖通 / 配电 / 动环培训推荐|传统技工转行机房刚需岗位完整攻略
  • 2025-2026工业纯水机主流品牌资质服务多维对比指南
  • magnetW:一款高效的跨平台磁力链接聚合搜索工具完全指南
  • 从团购网的漏洞看网站安全性问题
  • Git凭据助手原理与安全实践:从本地开发到CI/CD的凭证治理
  • Nginx安全头配置实战:从X-Frame-Options到CSP的完整指南
  • 使用WorkBuddy自动发微博教程
  • 三轴运动跟踪系统设计与IMU传感器应用实践
  • 微信支付V3 微信小程序支付 线下正常、线上验签失败 回调异常 报错 com.wechat.pay.java.core.exception.ValidationException
  • 【2026】3ds Max 2027安装教程超详细图文步骤(附完整安装包)
  • 低压密集型母线槽核心选材标准解析,16 年生产工厂实操经验总结
  • WP7有约(三):课堂重点
  • R语言实现电力系统N-1事故分析与风险图谱生成
  • 创业是一种心态、信念和坚持,是一种生活方式
  • 商品条码查询API实战:免费接口申请到代码集成全攻略
  • UE指的是用户的体验,
  • 如何找到口碑过硬的医美材料供应商?
  • 多材质通用UV打印机:适配哪些材料?满足多场景印刷需求
  • LeetDown:3步让你的旧iPhone重获新生,macOS上一键降级体验
  • TypeScript_类型系统深度解析
  • 【Agent 个人学习分享日记】《RAG 全链路深度拆解:从知识库构建到精准问答的核心机制与工程实践》
  • 如何向妻子解释OOD
  • 商品条码查询API快速集成指南:从申请到调用实战