当前位置: 首页 > news >正文

Dubbo线程池策略详解:Fixed、Cached、Limited与Eager对比

1. 项目概述

在AWS上运行深度学习任务时,命令行操作是每位工程师的必备技能。我整理了10个经过实战检验的CLI配方,涵盖从环境配置到模型部署的全流程。这些命令组合特别适合需要快速迭代的实验场景,能帮你省去大量点击控制台的时间。

2. 核心需求解析

2.1 典型使用场景

  • 快速启动临时训练任务
  • 自动化模型部署流水线
  • 跨区域资源监控与管理
  • 突发性计算资源调配

2.2 技术选型考量

选择AWS CLI而非SDK的原因:

  1. 轻量化:无需维护代码库
  2. 可组合性:通过管道连接多个操作
  3. 可追溯性:命令历史即审计日志
  4. 低延迟:特别适合紧急调试场景

3. 环境准备方案

3.1 凭证配置最佳实践

aws configure set region us-west-2 aws configure set output json

重要提示:永远不要在命令中直接写入AK/SK,应该使用IAM角色或环境变量

3.2 必要工具安装

pip install awscli --upgrade sudo apt-get install jq # JSON处理工具

4. 核心命令配方

4.1 计算资源管理

4.1.1 启动GPU实例
aws ec2 run-instances \ --image-id ami-0abcdef1234567890 \ --instance-type p3.2xlarge \ --key-name dl-keypair \ --security-group-ids sg-12345678 \ --subnet-id subnet-12345678 \ --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=DL-Training}]'
4.1.2 自动扩展组配置
aws autoscaling create-auto-scaling-group \ --auto-scaling-group-name DL-Workers \ --launch-template "LaunchTemplateId=lt-1234567890abc,Version=1" \ --min-size 1 \ --max-size 8 \ --desired-capacity 2 \ --vpc-zone-identifier "subnet-123456,subnet-abcdef"

4.2 存储方案优化

4.2.1 高性能共享存储
aws efs create-file-system \ --creation-token dl-shared \ --performance-mode maxIO \ --throughput-mode bursting
4.2.2 S3数据同步技巧
aws s3 sync s3://my-dataset-bucket ./local_data \ --exclude "*" \ --include "train/*.tfrecord" \ --include "val/*.tfrecord"

4.3 训练任务管理

4.3.1 分布式训练启动
aws sagemaker create-training-job \ --training-job-name resnet-$(date +%Y%m%d%H%M%S) \ --algorithm-specification "TrainingImage=763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-training:1.9.0-gpu-py38,TrainingInputMode=File" \ --role-arn arn:aws:iam::123456789012:role/service-role/AmazonSageMaker-ExecutionRole \ --input-data-config '[{"ChannelName":"train","DataSource":{"S3DataSource":{"S3DataType":"S3Prefix","S3Uri":"s3://my-bucket/train/","S3DataDistributionType":"FullyReplicated"}}}]' \ --output-data-config "S3OutputPath=s3://my-bucket/output/" \ --resource-config "InstanceType=ml.p3.8xlarge,InstanceCount=2,VolumeSizeInGB=100" \ --stopping-condition "MaxRuntimeInSeconds=86400"
4.3.2 训练监控方案
aws cloudwatch get-metric-statistics \ --namespace "AWS/SageMaker" \ --metric-name "CPUUtilization" \ --dimensions "Name=TrainingJobName,Value=my-job" \ --start-time $(date -d "1 hour ago" +%Y-%m-%dT%H:%M:%SZ) \ --end-time $(date +%Y-%m-%dT%H:%M:%SZ) \ --period 60 \ --statistics "Average" \ --output json | jq '.Datapoints[] | [.Timestamp, .Average]'

5. 部署与优化

5.1 模型打包部署

aws sagemaker create-model \ --model-name my-tf-model \ --execution-role-arn arn:aws:iam::123456789012:role/service-role/AmazonSageMaker-Execution# 1. 概述 本文分享 **Dubbo 的线程池策略**。在 [《Dubbo 用户指南 —— 线程模型》](http://dubbo.apache.org/zh-cn/docs/user/demos/thread-model.html) 一文中,我们可以看到 Dubbo 提供了**三种线程池的实现**: > - `fixed` 固定大小线程池,启动时建立线程,不关闭,一直持有。(**缺省**) > - `cached` 缓存线程池,空闲一分钟自动删除,需要时重建。 > - `limited` 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。 > - `eager` 优先创建`Worker`线程池。在任务数量大于`corePoolSize`但是小于`maximumPoolSize`时,优先创建`Worker`来处理任务。当任务数量大于`maximumPoolSize`时,将任务放入阻塞队列中。阻塞队列充满时抛出`RejectedExecutionException`。(相比于`cached`:`cached`在任务数量超过`maximumPoolSize`时直接抛出异常而不是将任务放入阻塞队列) 本文涉及的类,如下图所示: ![类图](http://www.iocoder.cn/images/Dubbo/2018_12_01/01.png) # 2. ThreadPool `com.alibaba.dubbo.common.threadpool.ThreadPool` ,线程池接口。代码如下: ```java @SPI("fixed") public interface ThreadPool { /** * 线程池 * * @param url URL * @return 线程池 */ @Adaptive({Constants.THREADPOOL_KEY}) Executor getExecutor(URL url); }
  • @SPI("fixed")注解,Dubbo SPI拓展点,默认为"fixed"
  • @Adaptive({Constants.THREADPOOL_KEY})注解,基于 Dubbo SPI Adaptive 机制,加载对应的线程池实现,使用URL.threadpool属性。
  • #getExecutor(url)方法,获得对应的线程池的执行器。

3. FixedThreadPool

com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool,实现 ThreadPool 接口,固定大小线程池,启动时建立线程,不关闭,一直持有。代码如下:

public class FixedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 线程名 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 线程数 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); // 队列数 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 创建执行器 return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
  • 默认情况下,采用Executors#newFixedThreadPool(int nThreads)的类似实现,差异点如下:

    • 队列大小queues未设置时,使用SynchronousQueue队列。通过Executors#newCachedThreadPool()方法,可以知道SynchronousQueue是无空间的队列,即所有任务,立即执行。
    • 队列大小queues为负数时,使用LinkedBlockingQueue队列。通过Executors#newFixedThreadPool(int nThreads)方法,可以知道LinkedBlockingQueue是无界队列。
    • 队列大小queues为正数时,使用带queues空间的LinkedBlockingQueue队列。
  • 线程池拒绝策略,使用 AbortPolicyWithReport ,实现java.util.concurrent.RejectedExecutionHandler,代码如下:

    public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); private final String threadName; private final URL url; private static volatile long lastPrintTime = 0; private static Semaphore guard = new Semaphore(1); public AbortPolicyWithReport(String threadName, URL url) { this.threadName = threadName; this.url = url; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 打印告警日志 String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); // 打印 JStack ,分析线程状态 dumpJStack(); // 抛出 RejectedExecutionException 异常 throw new RejectedExecutionException(msg); } private void dumpJStack() { // ... 省略代码 } }
    • 打印告警日志
    • 打印JStack,分析线程状态。
    • 抛出 RejectedExecutionException 异常。

4. CachedThreadPool

com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool,实现 ThreadPool 接口,缓存线程池,空闲一定时长,自动删除,需要时重建。代码如下:

public class CachedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 线程池名 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心线程数 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大线程数 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); // 队列数 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 线程存活时长 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // 创建执行器 return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
  • 默认情况下,采用Executors#newCachedThreadPool()的类似实现,差异点如下:

    • 队列大小queues未设置时,使用SynchronousQueue队列。通过Executors#newCachedThreadPool()方法,可以知道SynchronousQueue是无空间的队列,即所有任务,立即执行。
    • 队列大小queues为负数时,使用LinkedBlockingQueue队列。通过Executors#newFixedThreadPool(int nThreads)方法,可以知道LinkedBlockingQueue是无界队列。
    • 队列大小queues为正数时,使用带queues空间的LinkedBlockingQueue队列。
  • 和 FixedThreadPool 的差异点在于:

    • 核心线程数cores默认为0
    • 最大线程数threads默认为Integer.MAX_VALUE
    • 线程存活时长alive默认为60 * 1000毫秒。

5. LimitedThreadPool

com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool,实现 ThreadPool 接口,可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。代码如下:

public class LimitedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 线程名 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心线程数 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大线程数 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); // 队列数 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 创建执行器 return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
  • 和 FixedThreadPool 的差异点在于:

    • 线程存活时长alive默认为Long.MAX_VALUE毫秒。

6. EagerThreadPool

com.alibaba.dubbo.common.threadpool.support.eager.EagerThreadPool,实现 ThreadPool 接口,在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建线程来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。代码如下:

public class EagerThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 线程名 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 核心线程数 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大线程数 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); // 队列数 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 线程存活时长 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // 创建任务队列 TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues); // 创建线程工厂 EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; } }
  • 默认情况下,cores = 0threads = Integer.MAX_VALUEqueues = 0alive = 60 * 1000

  • TaskQueue 是 EagerThreadPool 的内部类,代码如下:

    static class TaskQueue<Runnable> extends LinkedBlockingQueue<Runnable> { private static final long serialVersionUID = -2635853580887179627L; private transient EagerThreadPoolExecutor executor; public TaskQueue(int capacity) { super(capacity); } public void setExecutor(EagerThreadPoolExecutor exec) { executor = exec; } @Override public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } // 当前线程数 int currentPoolThreadSize = executor.getPoolSize(); // 有空闲线程 if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { return super.offer(runnable); } // 当前线程数小于最大线程数,创建新线程 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } // 当前线程数大于等于最大线程数,添加到任务队列 return super.offer(runnable); } }
    • #offer(runnable)方法中,分成三种情况,也是 EagerThreadPool 的核心关键
    • 情况一,有空闲线程,提交到任务队列。
    • 情况二,无空闲线程且当前线程数小于最大线程数,返回false,让线程池创建新线程。
    • 情况三,无空闲线程且当前线程数大于等于最大线程数,提交到任务队列。
    • 通过这样的方式,优先创建线程来处理任务。
  • EagerThreadPoolExecutor 是 EagerThreadPool 的内部类,代码如下:

    static class EagerThreadPoolExecutor extends ThreadPoolExecutor { /** * 正在提交给线程池,但是还未完成的处理任务数量 */ private final AtomicInteger submittedTaskCount = new AtomicInteger(0); public EagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } public int getSubmittedTaskCount() { return submittedTaskCount.get(); } @Override protected void afterExecute(Runnable r, Throwable t) { submittedTaskCount.decrementAndGet(); } @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // 提交任务数 + 1 submittedTaskCount.incrementAndGet(); try { // 执行任务 super.execute(command); } catch (RejectedExecutionException rx) { // 若发生拒绝异常,尝试重新添加到任务队列 final TaskQueue queue = (TaskQueue) super.getQueue(); try { if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full.", rx); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { // 提交任务数 - 1 submittedTaskCount.decrementAndGet(); throw t; } } }
    • 通过submittedTaskCount属性,正在提交给线程池,但是还未完成的处理任务数量。
    • #execute(command)方法,提交任务数 + 1 ,执行任务。若发生异常,提交任务数 - 1 。
    • #afterExecute(r, t)方法,执行完成,提交任务数 - 1 。

666. 彩蛋

😈 又是一篇相对轻松的文章。

http://www.jsqmd.com/news/706789/

相关文章:

  • 2026正规免费量化交易软件推荐榜:ea量化交易软件/什么是量化交易/手机量化交易软件/散户如何做量化交易/期货量化交易系统/选择指南 - 优质品牌商家
  • 循环优化设计
  • 从零开始学C语言:环境搭建与首个代码
  • 梯度下降算法详解:原理、实现与优化技巧
  • 零基础秒落地!魔珐星云打造专属法务数字人
  • 成都地区、H型钢、350X350X12X19、Q235B、包钢、现货批发供应 - 四川盛世钢联营销中心
  • 用户上周说有两个孩子,这周说有三个孩子,Agent 如何处理记忆冲突?
  • Weaviate向量数据库实战:从部署到多模态搜索与生产优化
  • PyTorch训练管理:检查点与早停技术详解
  • 成都地区、H型钢、700X300X13X14、Q235B、包钢、现货批发供应 - 四川盛世钢联营销中心
  • 成都地区、低合金H型钢、500X200X10X16、Q355B、包钢、现货批发供应 - 四川盛世钢联营销中心
  • 记录一次Jenkins构建任务的坑
  • HTML总结
  • 成都地区、H型钢、588X300X12X20、Q235B、包钢、现货批发供应 - 四川盛世钢联营销中心
  • 205套思维工具(转)
  • caj2pdf:3个技巧让知网CAJ文献在Linux上重获新生
  • 2026川渝地区耐火砖技术分享:耐火材料供应厂家/耐火材料厂商/耐火材料厂家/耐火材料哪家好/耐火材料批发/耐火材料报价/选择指南 - 优质品牌商家
  • 为什么你的Dev Container正在悄悄上传源码?揭秘.gitignore之外的5类敏感数据泄漏路径(企业级隔离方案已落地)
  • 共享记忆会毁掉系统 多智能体信息污染的五种典型路径
  • 贝叶斯信念网络:原理、构建与应用实践
  • Linearis:Rust高性能线性代数库的设计、应用与性能调优
  • 2026年4月宜宾家装公司排行:宜宾装修公司哪家好、宜宾装修公司推荐、宜宾装修公司电话、宜宾装饰公司口碑、宜宾装饰公司哪家好选择指南 - 优质品牌商家
  • 神经网络模型容量控制:节点数与层数优化指南
  • cuML通过PyPI安装:GPU数据科学的新突破
  • 魔珐星云打造上海历史大屏数字人
  • Python异常检测算法实战:隔离森林与LOF应用解析
  • Cursor试用限制破解:基于MachineID重置的自动化解决方案
  • Cortex-A55寄存器架构与性能监控详解
  • Mockito 单测入门
  • 成都地区、H型钢、500X200X10X16、Q235B、包钢、现货批发供应 - 四川盛世钢联营销中心