当前位置: 首页 > news >正文

ABPVNext项目中使用HangFire定时任务(支持租户模式) - 梦想代码

引入相关nuget包

       Hangfire有服务端、持久化存储和客户端三大核心部件组成,而持久化存储这块儿,除了官方默认的SQLServer(可以集成MSMQ)以外,还支持Redis、MongoDB等,

       以持久化到SQLServer为例,ABP7.0版本引入的包如下:

 

主要包说明:

l  Hangfire.Sqlserver                     --Sqlserver数据库存储

l  Hangfire.Dashboard.BasicAuthorization   --可视化+权限控制

另外,还可以使用 Redis缓存 作业存储,它实现的 Hangfire 处理作业的速度比使用 SQL Server 存储快得多。

ABP中配置HangFire

简单化配置

主要设置的HangFire连接的数据库连接配置

 

说明:Hangfire会在第一次运行时,自动为我们创建HangFire相关的表。

数据库默认已经为我们创建了HangFire所需的表。

 

上图中的表用途说明:

以下是 Hangfire 后台任务生成的一些常见表格的说明:

l  `AggregatedCounter` 表:用于存储聚合计数器的数据。聚合计数器可以用于跟踪任务执行的数量和状态。

l  `Counter` 表:用于存储计数器的数据。计数器可以用于跟踪任务执行的数量和状态。

l  `Hash` 表:用于存储哈希数据。在 Hangfire 中,哈希表常用于存储任务的参数和属性。

l  `Job` 表:用于存储任务的信息。每个任务都会在该表中创建一行记录,包含任务的标识、类型、参数等信息。

l  `JobParameter` 表:用于存储任务的参数。每个任务的参数都会在该表中创建一行记录。

l  `JobQueue` 表:用于存储任务队列的信息。任务队列用于管理任务的执行顺序。

l  `List` 表:用于存储列表数据。在 Hangfire 中,列表常用于存储任务的依赖关系和执行顺序。

l  `Schema` 表:用于存储 Hangfire 数据库的模式信息。

l  `Server` 表:用于存储 Hangfire 服务器的信息。每个运行 Hangfire 服务器的实例都会在该表中创建一行记录。

l  `Set` 表:用于存储集合数据。在 Hangfire 中,集合常用于存储任务的标识和标签。

l  `State` 表:用于存储任务的状态信息。每个任务的状态变化都会在该表中创建一行记录,包含任务的标识、状态、执行时间等信息。

这些表格是 Hangfire 在数据库中创建的一部分,用于存储任务和相关数据。它们协同工作,提供了任务调度、执行和状态跟踪的功能。

默认以上配置都搞定,其他hangfire配置的知识自行补一下,以下上重点内容和知识点。

环境准备:1,安装了相关HangFire包及在ABP框架中进行了配置;2,使用IdentieyServer4服务进行token验证;3,创建一个自定义存储定时任务的表结构如下图

image

 核心思路:

初始创建定时任务时,用户登录后将token请求中的参数传递过来,进行新的token解析,生成刷新token的密钥,此密钥作用,一是用来刷新token,保证定时到点时调用在token权限的接口时,token不过期;二是,此密钥只能用一次就失效,保证每次定时调用时都生成最新的,实现上述功能就是将初时生在的refreshtoken保存到数据库中,每次定时到点时调用再重新token密钥并保存到定时任务列表中。

实现作用一的核心代码:

   public async Task<bool> AddOrUpdateTenantRecurring([FromBody] HttpJobDescriptorDto jobDescriptor){var httpJobExecutor = new TenantHttpJobExecutor();var jobName = jobDescriptor.JobName;var expressionStr = string.Empty;var _guid = GuidGenerator.Create();var exist = await _repository.FirstOrDefaultAsync(_ => _.JobName == jobName);if (exist != null){throw new BusinessException(message: "任务名称:" + jobName + "已存在,不能重复!");}if (string.IsNullOrEmpty(jobDescriptor.Cron))throw new BusinessException(message: "间隔时间即Cron表达式不能为空!");if (!jobDescriptor.CompanyId.HasValue){var getCompanyIdFormToken = CurrentUser.GetCurrentUserCompanyID();if (!string.IsNullOrEmpty(getCompanyIdFormToken))jobDescriptor.CompanyId = Guid.Parse(getCompanyIdFormToken);//从token中取公司ID
       }var entity = ObjectMapper.Map<HttpJobDescriptorDto, HttpJobDescriptor>(jobDescriptor);entity.JobName = jobName;//任务名称entity.JobType = ((int)HangFireJobTypeEnum.RecurringJobs).ToString();//标识定时任务entity.TenantId = CurrentTenant?.Id;//租户ID                                                                    bool hasSeconds = CheckIfCronExpressionHasSeconds(jobDescriptor.Cron);// 检查是否包含秒字段var now = DateTime.UtcNow;if (!hasSeconds){var expression = CronExpression.Parse(jobDescriptor.Cron);//不含秒var nextUtc = expression.GetNextOccurrence(now);var span = nextUtc - now;if (span != null && (int)span.Value.TotalMilliseconds > 0){entity.TimeSpanFromSeconds = (int)span.Value.TotalMilliseconds;//间隔时间:秒
           }expressionStr = expression.ToString();}else{var expression = CronExpression.Parse(jobDescriptor.Cron, CronFormat.IncludeSeconds);//秒级var nextUtc = expression.GetNextOccurrence(now);var span = nextUtc - now;if (span != null && (int)span.Value.TotalMilliseconds > 0){entity.TimeSpanFromSeconds = (int)span.Value.TotalMilliseconds;//间隔时间:秒
           }expressionStr = expression.ToString();}var getRefreshToken = await GetRefreshTokenAsync(jobDescriptor);if (getRefreshToken != null)entity.RefreshToken = getRefreshToken;//更改token密钥
       entity.SetId(_guid);jobDescriptor.Id = _guid;await _repository.InsertAsync(entity, true);//往调度表中添加数据
RecurringJob.AddOrUpdate(jobName, () => httpJobExecutor.DoRequest(jobDescriptor), expressionStr, TimeZoneInfo.Local);//其中Cron为cron表达式return true;}

上述方法中调用的GetRefreshTokenAsync代码如下,用于初始生成RefreshToken密钥,并保存到定时任务数据库表中

  /// <summary>/// 创建任务时,初始生成一个token密钥/// </summary>/// <param name="jobDto"></param>/// <returns></returns>/// <exception cref="BusinessException"></exception>private async Task<string> GetRefreshTokenAsync(HttpJobDescriptorDto jobDto){var getRefToken = string.Empty;using (HttpClient clientToken = _httpClientFactory.CreateClient("AuthClientHttpUrl")){// 构建请求体var request_body = new TokenBodyDto(){username = jobDto.Username,password = jobDto.Password,client_id = jobDto.ClientId,grant_type = "password",scope = jobDto.Scope,LoginUrl = "/connect/token",tenant = jobDto.Tenant};var LoginUrl = $"{request_body.LoginUrl}?__tenant={request_body.tenant}&company_id={jobDto.CompanyId.ToString()}";//var LoginUrl = $"{request_body.LoginUrl}?company_id={companyid.ToString()}";var paramPostJwt = new Dictionary<string, string>{{ "username", request_body.username },{ "password", request_body.password },{ "client_id", request_body.client_id },{ "grant_type", request_body.grant_type },{ "scope", request_body.scope }};var requestToken = new HttpRequestMessage(HttpMethod.Post, LoginUrl);requestToken.Headers.Add("Accept", "application/json, text/plain, */*");requestToken.Headers.Add("Accept-Language", "zh-Hans");requestToken.Headers.Add("Connection", "keep-alive");requestToken.Headers.Add("Access-Control-Request-Headers", "authorization");requestToken.Headers.Add("Access-Control-Request-Method", "POST");requestToken.Headers.Add("Sec-Fetch-Mode", "cors");requestToken.Headers.Add("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36");requestToken.Content = new FormUrlEncodedContent(paramPostJwt);requestToken.Content.Headers.ContentType = new MediaTypeHeaderValue("application/x-www-form-urlencoded") { CharSet = "UTF-8" };var responseToken = await clientToken.SendAsync(requestToken);if (responseToken.IsSuccessStatusCode){var body = await responseToken.Content.ReadAsStringAsync();var tokenInfoBrige = JsonConvert.DeserializeObject<TokenResponse>(body);getRefToken = tokenInfoBrige?.refresh_token;}else{throw new BusinessException(message: $"访问地址{LoginUrl},生成token失败!");}}return getRefToken;}

 核心的httpJobExecutor.DoRequest方法如下:

  public async Task DoRequest(HttpJobDescriptorDto jobDestriptor){using (var uow = _unitOfWorkManager.Begin(true, true)){using (var scope = _serviceProvider.CreateScope()){try{var service = scope.ServiceProvider.GetService<IHXRecurringJobService>();if (jobDestriptor.CompanyId != null){if (!jobDestriptor.CompanyId.HasValue){var jobInfo = await _repository.FindAsync(x => x.JobName == jobDestriptor.JobName);jobDestriptor.CompanyId = jobInfo.CompanyId;}}var getTenantJob = await service.GetHttpJobDescriptor(jobDestriptor.Id);var getToken = await RefreshToken(getTenantJob.RefreshToken, getTenantJob.ClientId, getTenantJob.ClientSecret);if (getToken != null){await service.UpdateHttpJobDescriptor(jobDestriptor.Id, getToken.access_token, getToken.refresh_token);}var client = new RestClient(jobDestriptor.HttpUrl);var httpMethod = (object)Method.Post;if (!Enum.TryParse(typeof(Method), jobDestriptor.HttpMethod, out httpMethod))throw new BusinessException($"不支持的HTTP动词:{jobDestriptor.HttpMethod}");var request = new RestRequest(string.Empty, (Method)httpMethod);request.AddHeader("Content-Type", "application/json");//请求标头header中语言标识if (!string.IsNullOrEmpty(jobDestriptor.AcceptLanguage)){request.AddHeader("Accept-Language", jobDestriptor.AcceptLanguage.Trim());}else{request.AddHeader("Accept-Language", "zh-Hans");//默认中文
                  }if (!string.IsNullOrEmpty(getToken?.bearer_access_token)){request.AddHeader("Authorization", getToken.bearer_access_token);}if (jobDestriptor.HttpMethod == "Get"){request.Timeout = 1000 * 60 * 5; // 限制时间 5分钟foreach (var item in jobDestriptor.GetParams){//key参数值不为空,才动态添加参数,底层Key要求不能为空,否则报错,考虑还有不传参的接口,此处只做过滤if (!string.IsNullOrEmpty(item.Key)){request.AddParameter(item.Key, item.Value.ToString());}}}else{var json = string.Empty;if (jobDestriptor.JobParameter != null){if (jobDestriptor.JobParameter is string _dataStr){json = _dataStr;}else{json = JsonConvert.SerializeObject(jobDestriptor.JobParameter);}}request.AddParameter("application/json", json, ParameterType.RequestBody);}var response = client.Execute(request);if (response.StatusCode != HttpStatusCode.OK)throw new BusinessException(message: $"调用接口{jobDestriptor.HttpUrl}失败,接口返回:{response.Content}");await uow.CompleteAsync();//提交事务
              }catch (Exception ex){_logger.LogWarning($"执行出错:{ex.Message}");}}}}

RefreshToken方法代码:

 /// <summary>/// 重新请求token/// </summary>/// <param name="refreshToken">刷新token令牌</param>/// <param name="clientId">客户端ID</param>/// <param name="clientSecret">客户端密钥</param>/// <returns></returns>/// <exception cref="Exception"></exception>public async Task<TokenResponse> RefreshToken(string refreshToken, string clientId, string clientSecret){using (HttpClient client = _httpClientFactory.CreateClient("AuthClientHttpUrl")){var LoginUrl = "/connect/token";var paramPostJwt = new Dictionary<string, string>{{ "grant_type", "refresh_token" },{ "refresh_token",refreshToken },{ "client_id", clientId },{ "client_secret", clientSecret },{ "scope", "WebAppGateway BaseService BusinessService BaseData offline_access" }};var requestToken = new HttpRequestMessage(HttpMethod.Post, LoginUrl);requestToken.Headers.Add("Accept", "application/json, text/plain, */*");requestToken.Headers.Add("Accept-Language", "zh-Hans");requestToken.Headers.Add("Connection", "keep-alive");requestToken.Headers.Add("Access-Control-Request-Headers", "authorization");requestToken.Headers.Add("Access-Control-Request-Method", "POST");requestToken.Headers.Add("Sec-Fetch-Mode", "cors");requestToken.Headers.Add("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36");requestToken.Content = new FormUrlEncodedContent(paramPostJwt);requestToken.Content.Headers.ContentType = new MediaTypeHeaderValue("application/x-www-form-urlencoded") { CharSet = "UTF-8" };var responseToken = await client.SendAsync(requestToken);if (responseToken.IsSuccessStatusCode){var body = await responseToken.Content.ReadAsStringAsync();var tokenInfoBrige = Newtonsoft.Json.JsonConvert.DeserializeObject<TokenResponse>(body);return new TokenResponse(){access_token = tokenInfoBrige?.access_token,expires_in = tokenInfoBrige?.expires_in,token_type = tokenInfoBrige?.token_type,refresh_token = tokenInfoBrige?.refresh_token,scope = tokenInfoBrige?.scope,bearer_access_token = "Bearer " + tokenInfoBrige?.access_token};}else{throw new BusinessException(message: $"Failed to refresh token: {responseToken.StatusCode}");}}}

运行效果

 

image

 

http://www.jsqmd.com/news/40045/

相关文章:

  • 巧用异步监听切面,提高系统性能
  • 2025年镜面电火花加工油直销厂家权威推荐榜单:清洗剂/水溶性切削液/不锈钢攻牙油源头厂家精选
  • 深入解析:【金仓数据库产品体验官】实战测评:电科金仓数据库接口兼容性深度体验
  • 使用AI对话和编程的一些提示词和用法
  • 实现“拼好库”,让你的 NuGet 包同时支持库调用和源生成器解析
  • 2025年膜结构工程订做厂家权威推荐榜单:膜结构遮阳棚/膜结构汽车棚/膜结构景观棚源头厂家精选
  • [忘发了]P3710 方方方的数据结构
  • open-type=chooseAvatar
  • 2025年防洪松木桩批发厂家权威推荐榜单:河道木桩/6米松木桩/人工湖木桩源头厂家精选
  • 2025年口碑好的实木楼梯定制:十大品牌综合评测与选择指南
  • 仿手绘画流程图工具 excalidraw
  • 2025 养老保险规划公司最新推荐榜:国际测评认证优质企业,综合实力与服务竞争力深度解析
  • Java CountDownLatch
  • GEO:AI搜索时代的新增长方式,以及灵捷AI的实践路径
  • 详细介绍:JVM Java虚拟机
  • [电调]AM32电调调参系列 —— Active brake on stop power 和 Brake on stop的区别
  • 2025 最新车床厂家推荐榜:聚焦高精度智能设备,涵盖立式 / 双主轴 / 车铣复合等热门机型
  • 2025年工业用离心机源头厂家权威推荐榜单:过滤离心机/高钾离心机/自动卸料离心机实力厂家精选
  • 2025 最新表冷器源头厂家权威推荐榜:14 项专利加持 + 国际测评认证,锂电表冷器/钎焊板式换热表冷器/铜管串铝翅片表冷器公司推荐
  • N - 翻译布尔表达式
  • 2025年市场上桥洞力学板开发公司排名背后故事:技术与实力的深度解析
  • 2025年10月桥洞力学板公司口碑排行情况
  • 2025年重庆脊柱矫正服务权威推荐榜单:中医理疗/经络/正脊服务精选
  • 2025全球知名连接器品牌价值榜与中国企业崛起:十大品牌全景测评与选型指南
  • 大气模式
  • 2025年存包储物柜实力厂家权威推荐榜单:公共场所储物柜/酒店储物柜/超市储物柜源头厂家精选
  • C# PuppeteerSharp html转pdf
  • 大气环流模式
  • 2025年隔音净化板制造厂权威推荐榜单:电子厂净化板/保温净化板/抗静电净化板源头厂家精选
  • 2025年阻燃泡沫批发厂家权威推荐榜单:防水泡沫/密封海绵/阻燃棉源头厂家精选