当前位置: 首页 > news >正文

RPC ServiceModel.Grpc C#

RPC ServiceModel.Grpc C#

DogWatcherHeartbeatService 中,CancellationToken 的核心作用是响应外部取消信号(如服务停止、客户端断开连接等),避免资源泄漏并确保程序优雅退出。以下是具体的使用分析和优化建议:

一、CancellationToken 的当前使用逻辑

1. HeartbeatService.ContinuousHeartbeatsAsync(服务器流)

  • 作用:检测客户端是否断开连接(或服务终止),及时终止服务器流,释放资源。
  • 当前逻辑
    • 循环条件 !cancellationToken.IsCancellationRequested 检查是否需要取消。
    • 显式调用 cancellationToken.ThrowIfCancellationRequested() 抛出取消异常。
    • Task.Delay(1000, cancellationToken) 传递令牌,确保延迟过程中能响应取消。

2. DogWatcher.ExecuteAsync(客户端流消费)

  • 作用:响应宿主服务停止信号(stoppingToken),终止客户端对流的消费。
  • 当前逻辑
    • 循环条件 !stoppingToken.IsCancellationRequested 检查服务是否需要停止。
    • 调用服务器流 ContinuousHeartbeatsAsync 时传递 stoppingToken,确保服务停止时能取消流。
    • 捕获 OperationCanceledException 处理取消场景,捕获其他异常处理流异常。

二、可优化的点及建议

1. 简化 HeartbeatService 中的取消检查逻辑

当前代码中,while 循环条件和 ThrowIfCancellationRequested 存在冗余(两者都是检查取消信号)。ThrowIfCancellationRequested 会在取消时直接抛出异常,可简化循环条件:

public async IAsyncEnumerable<HeartbeatResponse> ContinuousHeartbeatsAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{while (true) // 取消由异常触发,无需循环条件检查{cancellationToken.ThrowIfCancellationRequested(); // 关键:取消时立即抛出异常yield return new HeartbeatResponse { Message = "服务器流心跳正常", Timestamp = DateTime.Now };// 延迟期间也能响应取消(若取消,Delay会抛出异常)await Task.Delay(1000, cancellationToken); }
}

优化理由ThrowIfCancellationRequested 已涵盖取消检查,且能主动抛出异常让客户端感知取消,比单纯循环条件更直接。

2. 移除 DogWatcher 中不必要的延迟

当前代码在循环内先执行 Task.Delay(1000, stoppingToken),再消费流,这会导致首次连接延迟1秒,而服务器流本身已每秒发送一次数据,客户端无需额外延迟:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{while (!stoppingToken.IsCancellationRequested){try{// 移除多余的Task.Delay,直接消费流await foreach (var response in _heartbeatService.ContinuousHeartbeatsAsync(stoppingToken)){Console.WriteLine($"流数据:{response.Message},时间:{response.Timestamp:yyyy-MM-dd HH:mm:ss}");}// 若流正常结束(服务器主动关闭),延迟后重试连接Console.WriteLine("服务器流已结束,准备重试...");await Task.Delay(1000, stoppingToken);}catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested){// 仅处理宿主停止的取消信号Console.WriteLine("服务停止,流已取消");break;}catch (Exception ex){// 异常时延迟重试,避免频繁失败(如网络波动)Console.WriteLine($"流异常:{ex.Message},3秒后重试...");await Task.Delay(3000, stoppingToken); // 关键:添加重试延迟}}
}

优化理由

  • 去掉首次连接的延迟,让客户端尽快开始消费流。
  • 异常时添加延迟(如3秒),避免短时间内频繁重试导致资源浪费。
  • 使用 when (stoppingToken.IsCancellationRequested) 过滤取消源,仅处理宿主停止的场景。

3. 明确取消信号的传递范围

  • 服务器端[EnumeratorCancellation] 特性确保 cancellationToken 能正确关联到异步枚举器,客户端断开时会自动触发取消,无需额外处理。
  • 客户端stoppingTokenBackgroundService 的宿主停止信号,必须传递给所有阻塞操作(如 Task.Delay、流消费),确保服务停止时能快速终止所有任务。

4. 补充资源释放逻辑(若有)

如果流操作涉及非托管资源(如文件、网络连接),需在 finally 块中释放:

// 示例:若有资源需要释放
public async IAsyncEnumerable<HeartbeatResponse> ContinuousHeartbeatsAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{var resource = new SomeResource(); // 假设需要释放的资源try{while (true){cancellationToken.ThrowIfCancellationRequested();yield return new HeartbeatResponse { ... };await Task.Delay(1000, cancellationToken);}}finally{resource.Dispose(); // 确保资源释放}
}

三、总结

  • CancellationToken 的核心原则:在所有阻塞操作(Task.Delay、IO操作等)中传递令牌,确保能及时响应外部取消信号。
  • 优化核心:简化冗余检查、移除不必要延迟、添加合理重试策略,确保程序在取消或异常时优雅退出,减少资源浪费。

通过以上调整,代码会更简洁、高效,且对取消信号的响应更及时。

实例代码

RpcDemo.RpcDemo\RpcDemo.Edge\appsettings.json


{"Logging": {"LogLevel": {"Default": "Information","Microsoft": "Warning","Microsoft.Hosting.Lifetime": "Information"}},"AllowedHosts": "*","Kestrel": {"Endpoints": {"gRPC": {"Url": "http://*:9091","Protocols": "Http2"}}}
}

RpcDemo.RpcDemo\RpcDemo.Edge\HeartbeatService.cs


using System.Runtime.CompilerServices;
using RpcDemo.RpcShared;public class HeartbeatService : IHeartbeatService
{/// <summary>/// 普通 RPC 实现(略)/// </summary>/// <param name="cancellationToken"></param>/// <returns></returns>public async Task<bool> HeartbeatAsync(CancellationToken cancellationToken){Console.WriteLine($"普通心跳:{DateTime.Now:yyyy-MM-dd HH:mm:ss}");return true;}/// <summary>/// 服务器流实现:持续返回数据/// </summary>/// <param name="cancellationToken"></param>/// <returns></returns>public async IAsyncEnumerable<HeartbeatResponse> ContinuousHeartbeatsAsync([EnumeratorCancellation] CancellationToken cancellationToken){while (true) // 取消由异常触发,无需循环条件检查{cancellationToken.ThrowIfCancellationRequested(); // 关键:取消时立即抛出异常yield return new HeartbeatResponse { Message = "服务器流心跳正常", Timestamp = DateTime.Now };// 延迟期间也能响应取消(若取消,Delay会抛出异常)await Task.Delay(1000, cancellationToken);}}
}

RpcDemo.RpcDemo\RpcDemo.Edge\RpcDemo.Edge.csproj


<Project Sdk="Microsoft.NET.Sdk"><PropertyGroup><OutputType>Exe</OutputType><TargetFramework>net9.0</TargetFramework><ImplicitUsings>enable</ImplicitUsings><Nullable>enable</Nullable></PropertyGroup><ItemGroup><FrameworkReference Include="Microsoft.AspNetCore.App" /></ItemGroup><ItemGroup><PackageReference Include="ServiceModel.Grpc.DesignTime" Version="1.14.0" /><PackageReference Include="ServiceModel.Grpc.AspNetCore" Version="1.14.0" /><PackageReference Include="Grpc.AspNetCore.Server" Version="2.71.0" /><PackageReference Include="Grpc.AspNetCore.Web" Version="2.71.0" /></ItemGroup><ItemGroup><ProjectReference Include="..\RpcDemo.RpcShared\RpcDemo.RpcShared.csproj" /></ItemGroup><ItemGroup><None Update="appsettings.json" CopyToOutputDirectory="Always" /></ItemGroup></Project>

RpcDemo.RpcDemo\RpcDemo.Edge\Program.cs


using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;WebApplicationBuilder builder = WebApplication.CreateBuilder();builder.Configuration.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);builder.Services.AddServiceModelGrpc();
WebApplication webApplication = builder.Build();// webApplication.UseGrpcWeb();webApplication.MapGrpcService<HeartbeatService>();// webApplication.MapGrpcService<HeartbeatService>().EnableGrpcWeb();_ = Task.Run(() => webApplication.RunAsync());
System.Console.ReadLine();

RpcDemo.RpcDemo\RpcDemo.Handler\DogWatcher.cs


using RpcDemo.RpcShared;
using Microsoft.Extensions.Hosting;public class DogWatcher : BackgroundService
{private readonly IHeartbeatService _heartbeatService;public DogWatcher(IHeartbeatService heartbeatService){_heartbeatService = heartbeatService;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){try{// 移除多余的Task.Delay,直接消费流await foreach (var response in _heartbeatService.ContinuousHeartbeatsAsync(stoppingToken)){Console.WriteLine($"流数据:{response.Message},时间:{response.Timestamp:yyyy-MM-dd HH:mm:ss}");}// 若流正常结束(服务器主动关闭),延迟后重试连接Console.WriteLine("服务器流已结束,准备重试...");await Task.Delay(1000, stoppingToken);}catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested){// 仅处理宿主停止的取消信号Console.WriteLine("服务停止,流已取消");break;}catch (Exception ex){// 异常时延迟重试,避免频繁失败(如网络波动)Console.WriteLine($"流异常:{ex.Message},3秒后重试...");await Task.Delay(3000, stoppingToken); // 关键:添加重试延迟}}}
}

RpcDemo.RpcDemo\RpcDemo.Handler\GrpcClients.cs


using RpcDemo.RpcShared;
using ServiceModel.Grpc.DesignTime;// instruct ServiceModel.Grpc.DesignTime to generate required code during the build process
[ImportGrpcService(typeof(IHeartbeatService), GenerateDependencyInjectionExtensions = true)]
internal static partial class GrpcClients;

RpcDemo.RpcDemo\RpcDemo.Handler\RpcDemo.Handler.csproj


<Project Sdk="Microsoft.NET.Sdk"><PropertyGroup><OutputType>Exe</OutputType><TargetFramework>net9.0</TargetFramework><ImplicitUsings>enable</ImplicitUsings><Nullable>enable</Nullable></PropertyGroup><ItemGroup><FrameworkReference Include="Microsoft.AspNetCore.App" /></ItemGroup><ItemGroup><PackageReference Include="Grpc.Net.Client" Version="2.71.0" /><PackageReference Include="Grpc.Net.Client.Web" Version="2.71.0" /><PackageReference Include="Grpc.AspNetCore.Web" Version="2.71.0" /><PackageReference Include="ServiceModel.Grpc.DesignTime" Version="1.14.0" /><PackageReference Include="ServiceModel.Grpc.Client.DependencyInjection" Version="1.14.0" /></ItemGroup><ItemGroup><ProjectReference Include="..\RpcDemo.RpcShared\RpcDemo.RpcShared.csproj" /></ItemGroup></Project>

RpcDemo.RpcDemo\RpcDemo.Handler\Program.cs


using System.Net;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using ServiceModel.Grpc.Client.DependencyInjection;WebApplicationBuilder builder = WebApplication.CreateBuilder();builder.Services.AddSingleton<ChannelBase>(provider =>
{return GrpcChannel.ForAddress(new Uri("http://localhost:9091"), new GrpcChannelOptions { DisposeHttpClient = true, HttpVersion = HttpVersion.Version20 });
});// builder.Services.AddChatHttp20Client(_ => new Uri("http://localhost:9091"));builder.Services.AddServiceModelGrpcClientFactory().AddHeartbeatServiceClient();builder.Services.AddHostedService<DogWatcher>();WebApplication app = builder.Build();_ = Task.Run(() => app.RunAsync());Console.ReadLine();

RpcDemo.RpcDemo\RpcDemo.Handler\ServiceCollectionExtensions.cs


using System.Net;
using System.Net.Http;
using Grpc.Core;
using Grpc.Net.Client;
using Grpc.Net.Client.Web;
using Microsoft.Extensions.DependencyInjection;public static class ServiceCollectionExtensions
{public static IServiceCollection AddChatHttp11Client(this IServiceCollection services, Func<IServiceProvider, Uri> serverAddressResolver){// register GrpcChannelservices.AddSingleton(provider =>{var serverAddress = serverAddressResolver(provider);return CreateGrpcChannel(provider, serverAddress, useGrpcWeb: true);});return services;}public static IServiceCollection AddChatHttp20Client(this IServiceCollection services, Func<IServiceProvider, Uri> serverAddressResolver){// register GrpcChannelservices.AddSingleton(provider =>{var serverAddress = serverAddressResolver(provider);return CreateGrpcChannel(provider, serverAddress, useGrpcWeb: false);});return services;}public static ChannelBase CreateGrpcChannel(IServiceProvider serviceProvider, Uri serverAddress, bool useGrpcWeb){if (useGrpcWeb){HttpClientHandler handler = new HttpClientHandler();GrpcWebHandler grpcWebHandler = new GrpcWebHandler(GrpcWebMode.GrpcWeb, handler);var channelOptions = new GrpcChannelOptions{DisposeHttpClient = true,HttpVersion = HttpVersion.Version11,HttpClient = new HttpClient(grpcWebHandler) { BaseAddress = serverAddress }};return GrpcChannel.ForAddress(serverAddress, channelOptions);}else{var channelOptions = new GrpcChannelOptions { DisposeHttpClient = true, HttpVersion = HttpVersion.Version20, };return GrpcChannel.ForAddress(serverAddress, channelOptions);}}
}

RpcDemo.RpcDemo\RpcDemo.RpcShared\RpcDemo.RpcShared.csproj


<Project Sdk="Microsoft.NET.Sdk"><PropertyGroup><TargetFramework>net9.0</TargetFramework><ImplicitUsings>enable</ImplicitUsings><Nullable>enable</Nullable></PropertyGroup><ItemGroup><PackageReference Include="ServiceModel.Grpc" Version="1.14.0" /><PackageReference Include="System.ServiceModel.Primitives" Version="8.1.2" /></ItemGroup></Project>

RpcDemo.RpcDemo\.csharpierrc.json


{"printWidth": 200,"useTabs": false,"tabWidth": 4,"endOfLine": "auto"
}

RpcDemo.RpcDemo\build.bat


dotnet build

RpcDemo.RpcDemo\RpcDemo.RpcDemo.sln

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.0.31903.59
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RpcDemo.Handler", "RpcDemo.Handler\RpcDemo.Handler.csproj", "{B8462209-2F7B-4A8C-8C69-7558E5037107}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RpcDemo.Edge", "RpcDemo.Edge\RpcDemo.Edge.csproj", "{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RpcDemo.RpcShared", "RpcDemo.RpcShared\RpcDemo.RpcShared.csproj", "{53B5547A-8494-4EE6-859E-1119DFCB9CD1}"
EndProject
GlobalGlobalSection(SolutionConfigurationPlatforms) = preSolutionDebug|Any CPU = Debug|Any CPUDebug|x64 = Debug|x64Debug|x86 = Debug|x86Release|Any CPU = Release|Any CPURelease|x64 = Release|x64Release|x86 = Release|x86EndGlobalSectionGlobalSection(ProjectConfigurationPlatforms) = postSolution{B8462209-2F7B-4A8C-8C69-7558E5037107}.Debug|Any CPU.ActiveCfg = Debug|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Debug|Any CPU.Build.0 = Debug|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Debug|x64.ActiveCfg = Debug|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Debug|x64.Build.0 = Debug|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Debug|x86.ActiveCfg = Debug|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Debug|x86.Build.0 = Debug|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Release|Any CPU.ActiveCfg = Release|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Release|Any CPU.Build.0 = Release|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Release|x64.ActiveCfg = Release|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Release|x64.Build.0 = Release|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Release|x86.ActiveCfg = Release|Any CPU{B8462209-2F7B-4A8C-8C69-7558E5037107}.Release|x86.Build.0 = Release|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Debug|Any CPU.ActiveCfg = Debug|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Debug|Any CPU.Build.0 = Debug|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Debug|x64.ActiveCfg = Debug|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Debug|x64.Build.0 = Debug|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Debug|x86.ActiveCfg = Debug|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Debug|x86.Build.0 = Debug|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Release|Any CPU.ActiveCfg = Release|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Release|Any CPU.Build.0 = Release|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Release|x64.ActiveCfg = Release|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Release|x64.Build.0 = Release|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Release|x86.ActiveCfg = Release|Any CPU{BD597AF8-B4C7-4209-B0B4-7B6EE2CBC063}.Release|x86.Build.0 = Release|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Debug|Any CPU.Build.0 = Debug|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Debug|x64.ActiveCfg = Debug|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Debug|x64.Build.0 = Debug|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Debug|x86.ActiveCfg = Debug|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Debug|x86.Build.0 = Debug|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Release|Any CPU.ActiveCfg = Release|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Release|Any CPU.Build.0 = Release|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Release|x64.ActiveCfg = Release|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Release|x64.Build.0 = Release|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Release|x86.ActiveCfg = Release|Any CPU{53B5547A-8494-4EE6-859E-1119DFCB9CD1}.Release|x86.Build.0 = Release|Any CPUEndGlobalSectionGlobalSection(SolutionProperties) = preSolutionHideSolutionNode = FALSEEndGlobalSection
EndGlobal