当前位置: 首页 > news >正文

[INFRA] EMR集群MetricsCollector组件功能和运行原理分析

请注意本文部分内容经过AI辅助生成,虽然经过笔者检查但是并不保证内容的正确性,请自行判断准确性,本文对相关后果不承担责任

本文主要讲述 EMR 集群中的 MetricsCollector(简称 MC)组件。MC 是 Managed Scaling(托管扩缩容)的核心数据采集器,负责从 YARN 和 HDFS 采集指标,通过 WebSocket 上报给 EMR 控制面,为自动扩缩容提供决策数据。

本文的测试环境是emr7.12版本,细节如下

环境配置

MC 的版本号是 emr-metrics-collector-1.32.0-1.noarch,主类是 emr.metricscollector.Main,使用 JDK 8

仅在 Master 节点运行。Worker 节点虽然通过 RPM 安装了 metricscollector.service,但状态为 static/inactive,进程不启动、端口不监听、日志目录为空(已在 3 个 Worker 节点实际验证)。

关键特性:

特性 说明
仅 Master 运行 Worker 节点不启动
JDK 8 进程 独立于 IC(JDK 17)
5 秒采集 默认每 5 秒采集一次指标
10 秒上报 默认每 10 秒向控制面推送一次
10 分钟保留 内存中保留最近 10 分钟的指标数据
WebSocket 推送 通过 API Gateway WebSocket 实时推送
IC 配置驱动 所有采集参数由 IC 通过 RPC setConfig 下发

进程信息

/usr/lib/jvm/java-1.8.0-amazon-corretto.x86_64/bin/java \-Xmx1024m -Xms300m \-XX:OnOutOfMemoryError="kill -9 %p" \-XX:MinHeapFreeRatio=10 \-server \-cp /usr/share/aws/emr/emr-metrics-collector/lib/*:/home/hadoop/conf \-Dlog4j.defaultInitOverride \emr.metricscollector.Main

systemd 服务配置

[Unit]
Description=EMR metrics collector daemon
After=libinstance-controller-java.service[Service]
Type=forking
ExecStart=/usr/bin/metricscollector
Restart=always
RestartSec=5
PIDFile=/run/metricscollector.pid
MemoryMax=1024M
CPUQuota=20%
MemorySwapMax=1536M

资源限制:

资源 限制
MemoryMax 1024M(超过则 OOM kill)
CPUQuota 20%
MemorySwapMax 1536M
JVM -Xmx 1024m
JVM -Xms 300m
-XX:OnOutOfMemoryError kill -9 %p

架构概览

EMR 控制面 (Control Plane)▲│ WebSocket (wss://)│ METRICS 消息│
┌───┼──────────────────────────────────────────────────────┐
│   │  Master Node                                         │
│   │                                                      │
│  ┌┴──────────────┐  setConfig (RPC 9500)                │
│  │ Instance      │ ──────────────────► MetricsCollector  │
│  │ Controller    │                     (JDK 8)           │
│  │ (JDK 17)     │                         │              │
│  └───────────────┘                         │              │
│                                            ▼              │
│  ┌───────────────┐  REST API    ┌──────────────────┐     │
│  │ YARN RM :8088 │ ◄──────────  │ Collectors       │     │
│  └───────────────┘              │ (线程池 100)     │     │
│                                 └──────────────────┘     │
│  ┌───────────────┐  /transferMetrics (RPC 9501)          │
│  │ Hadoop        │ ──────────────────► MC                │
│  │ Metrics2      │  每 5 分钟推送 HDFS 指标              │
│  └───────────────┘                                       │
└──────────────────────────────────────────────────────────┘

MC 监听两个 HTTP 端口:

端口 认证方式 用途
9500 Linux UID (hadoop=990) IC → MC 配置下发(setConfig)
9501 Linux UID (hdfs=985) Hadoop Metrics2 → MC 指标推送(transferMetrics)

认证流程:

请求到达 → ProcessRunnerImpl: id -u <user>→ LinuxLocalAuthHelperImpl: getLinuxUid output: 990→ RequestHelper: Granted access to UID: 990

setConfig 接口

IC 每30 秒调用一次 setConfig,下发三组配置。Managed Scaling 启用前后差异巨大:

AutoScalingConfiguration

参数 未启用 启用后
shouldPublishMetrics false true
collectionPeriod PT0S PT5S
publisherPeriod PT0S PT10S
retentionPeriod PT0S PT10M
autoScalingMetricsList [] (空) 28 个指标
privateIpAddressForRmHost null 192.168.25.65

启用后的 28 个指标名称:

YARNMemoryAvailablePercentage    ContainerPendingRatio
MRActiveNodes                    HDFSUtilization
ContainerPending                 IsIdle
AppsPending                      MemoryAvailableMB
CoreNodesRunning                 AppsCompleted
AppsRunning                      AppsFailed
ContainerAllocated               MemoryTotalMB
TotalVcpuCount                   PrivateIpAddr
AvailableVcores                  UsedVcores
ContainersPhysicalMemoryMB       ContainersVirtualMemoryMB
TotalVcpuUsage                   NodePhysicalMemoryMB
NodeVirtualMemoryMB              NodesApiRunTime
Presto.ClusterMemoryBytes        Presto.ClusterTotalMemoryReservation
Trino.ClusterMemoryBytes         Trino.ClusterTotalMemoryReservation

InstanceAutoScalingConfiguration(本集群全部为 PT0S,未启用):

retentionPeriodForInstanceMetrics=PT0S
collectorFrequencyForInstanceMetrics=PT0S
collectorFrequencyForYarnRM=PT0S
collectorFrequencyForHDFS=PT0S
collectorFrequencyForPresto=PT0S

SparkShuffleAutoScalingConfiguration

retentionPeriodForSparkMetrics=PT0S
collectorFrequencyForSparkMetrics=PT0S
publisherFrequencyForSparkMetrics=PT30S

transferMetrics 接口

Hadoop Metrics2 框架通过此接口推送 HDFS 指标:

POST http://localhost:9501/transferMetrics
User: hdfs (UID 985)
Body: {"dfs.FSNamesystem.TotalLoad":12, ...}
Response: {"transferredMetrics":true}

推送频率约每 5 分钟一次。

完整指标列表

YARN Cluster 指标(22 个)

http://<RM>:8088/ws/v1/cluster/metrics 采集:

指标名 发布名 说明
MemoryTotalMB Counter.MapReduceFramework.MemoryTotalMB YARN 总内存
MemoryAllocatedMB Counter.MapReduceFramework.MemoryAllocatedMB 已分配内存
MemoryAvailableMB Counter.MapReduceFramework.MemoryAvailableMB 可用内存
MemoryReservedMB Counter.MapReduceFramework.MemoryReservedMB 预留内存
MemoryAvailablePercentage Counter.MapReduceFramework.MemoryAvailablePercentage 可用内存百分比
NumContainersAllocated Counter.MapReduceFramework.NumContainersAllocated 已分配容器数
NumContainersPending Counter.MapReduceFramework.NumContainersPending 等待容器数
NumContainersReserved Counter.MapReduceFramework.NumContainersReserved 预留容器数
ContainersPendingRatio Counter.MapReduceFramework.ContainersPendingRatio 等待容器比率
NumAppsSubmitted Counter.MapReduceFramework.NumAppsSubmitted 提交的应用数
NumAppsRunning Counter.MapReduceFramework.NumAppsRunning 运行中应用数
NumAppsPending Counter.MapReduceFramework.NumAppsPending 等待中应用数
NumAppsCompleted Counter.MapReduceFramework.NumAppsCompleted 完成的应用数
NumAppsFailed Counter.MapReduceFramework.NumAppsFailed 失败的应用数
NumAppsKilled Counter.MapReduceFramework.NumAppsKilled 被杀的应用数
NoOfActiveNodes mapred.resourcemanager.NoOfActiveNodes 活跃节点数
TotalNodes mapred.resourcemanager.TotalNodes 总节点数
NoOfLostNodes mapred.resourcemanager.NoOfLostNodes 丢失节点数
NoOfDecommissionedNodes mapred.resourcemanager.NoOfDecommissionedNodes 退役节点数
NoOfRebootedNodes mapred.resourcemanager.NoOfRebootedNodes 重启节点数
NoOfUnhealthyNodes mapred.resourcemanager.NoOfUnhealthyNodes 不健康节点数
IsIdle IsIdle 集群是否空闲

YARN App 指标(4 个)

http://<RM>:8088/ws/v1/cluster/apps 采集:

指标名 说明
Counter.FileSystem.HDFS_BYTES_READ 应用读取 HDFS 字节数
Counter.FileSystem.HDFS_BYTES_WRITTEN 应用写入 HDFS 字节数
Counter.FileSystem.S3_BYTES_READ 应用读取 S3 字节数
Counter.FileSystem.S3_BYTES_WRITTEN 应用写入 S3 字节数

YARN Nodes 指标(9 个)

http://<RM>:8088/ws/v1/cluster/nodes 采集:

指标名 说明
Monitor.availableVcores 可用 vCPU 数
Monitor.usedVcores 已用 vCPU 数
Monitor.usage 总体使用率
Monitor.containersPhysicalMemoryMB 容器物理内存
Monitor.containersVirtualMemoryMB 容器虚拟内存
Monitor.nodePhysicalMemoryMB 节点物理内存
Monitor.nodeVirtualMemoryMB 节点虚拟内存
Monitor.privateIpAddr 节点私有 IP
Monitor.NodesApi.RunTime Nodes API 调用耗时

HDFS 指标(~11 个)

由 Hadoop Metrics2 通过 /transferMetrics RPC 推送:

指标名 说明
dfs.FSNamesystem.NumLiveDataNodes 存活 DataNode 数
dfs.FSNamesystem.CapacityTotalGB HDFS 总容量 (GB)
dfs.FSNamesystem.CapacityUsedGB HDFS 已用容量 (GB)
dfs.FSNamesystem.CapacityRemainingGB HDFS 剩余容量 (GB)
dfs.FSNamesystem.TotalLoad HDFS 总负载
dfs.FSNamesystem.PendingReplicationBlocks 待复制块数
dfs.FSNamesystem.UnderReplicatedBlocks 副本不足块数
dfs.FSNamesystem.CorruptBlocks 损坏块数
dfs.FSNamesystem.MissingBlocks 丢失块数
dfs.FSNamesystem.PendingDeletionBlocks 待删除块数
HDFSUtilization HDFS 使用率

HDFS 指标的特殊采集方式

HDFS 指标不是 MC 主动拉取的,而是由 cloudwatch-sink 组件通过 RPC 推送过来的。

cloudwatch-sink(RPM: cloudwatch-sink-2.22.0)是一个 Hadoop Metrics2 Sink 插件,在 hadoop-metrics2.properties 中配置了白名单:

*.sink.cloudwatch.metricsList = TotalLoad,CapacityTotalGB,UnderReplicatedBlocks,CapacityRemainingGB,PendingDeletionBlocks,PendingReplicationBlocks,CorruptBlocks,CapacityUsedGB,NumLiveDataNodes,NumDeadDataNodes,MissingBlocks

数据流向:

Hadoop 组件 (NameNode/RM/NM/...)││ Metrics2 框架 → CloudWatchSink.putMetrics()│               → MetricsCollectorRpcClient││ POST /transferMetrics (端口 9501)│ 以 hdfs 用户身份,每 5 分钟一次▼
MC RPC Server│ 认证: LinuxLocalAuthHelper → UID 985 (hdfs)│ 授权: RequestHelper.Granted access▼
TransferMetricsMethod.handle()

推送的 JSON 数据示例:

{"dfs.FSNamesystem.TotalLoad": 12,"dfs.FSNamesystem.PendingReplicationBlocks": 0,"dfs.FSNamesystem.UnderReplicatedBlocks": 0,"dfs.FSNamesystem.NumLiveDataNodes": 3,"dfs.FSNamesystem.CapacityTotalGB": 174.0,"dfs.FSNamesystem.NumDeadDataNodes": 0,"dfs.FSNamesystem.CorruptBlocks": 0,"dfs.FSNamesystem.CapacityUsedGB": 4.0,"dfs.FSNamesystem.PendingDeletionBlocks": 0,"dfs.FSNamesystem.MissingBlocks": 0,"dfs.FSNamesystem.CapacityRemainingGB": 161.0
}

名字叫 cloudwatch-sink但不直接推送到 CloudWatch,而是推送到 MC

Checkpoint 机制

MC 使用 checkpoint 文件跟踪 YARN App 指标的采集进度:

# 文件位置
/emr/metricscollector/run/cwmetrics.ckp

内容:

{"lastCkpTs": 1774197284748,"totalHdfsBytesReadCompletedApps": 0,"totalHdfsBytesWrittenCompletedApps": 0,"totalS3BytesReadCompletedApps": 0,"totalS3BytesWrittenCompletedApps": 0
}

lastCkpTs 用于查询已完成应用的时间窗口:

GET /ws/v1/cluster/apps?states=FINISHED&finishedTimeBegin={lastCkpTs-900s}&finishedTimeEnd={lastCkpTs}

这确保了 MC 重启后不会重复统计已完成应用的 I/O 字节数。

WebSocket 通信

连接建立

MC → wss://snbxldp692.execute-api.cn-north-1.amazonaws.com.cn/prod使用 Tyrus/Grizzly WebSocket 客户端心跳: Http-pingpong-thread, 每 30 秒

消息格式

{"messageType": "METRICS","messageDirection": "CLUSTER_TO_CP","clusterId": "j-1Zxxxxxxxx51T","accountId": "<accountID>","messageId": "uuid","eventTime": 1774197339397,"metricsEntity": {"metricDataPoints": [[{"metricName": "ContainerAllocated"}, [{"timestamp": 1774197333775, "metricValue": 0.0}]],[{"metricName": "MemoryAvailableMB"}, [{"timestamp": 1774197333775, "metricValue": 12288.0}]]]}
}

可靠性机制

  • RetryableMessageHandler:消息发送失败后自动重试
  • 消息队列:发送后等待服务端 ACK,超时重发
  • 重试扫描:pool-6-thread-1 每 5 秒扫描待重试消息
  • payload 限制:单条消息最大 122880 字节(120KB)

MetricsMessageHandlerStats 字段说明

MetricsMessageHandlerStats: TS: 141, TR: 141, UNS: 141, ACR: 141, ER:0
字段 含义
TS Total Sent — 总发送消息数
TR Total Received — 总接收确认数
UNS Unique Sent — 唯一发送数
ACR Acknowledged — 已确认数
ER Error — 错误数

正常情况下 TS ≈ TR ≈ UNS ≈ ACR,ER = 0。

启动流程时间线

10:57:23  MetricsCollectorDaemon: Start Metrics Collector Daemon
10:57:24  Received userData                          ← 读取集群元数据
10:57:25  Received extra instance data               ← 读取实例信息
10:57:25  Finished initializing MetricsCollectorServiceImpl
10:57:26  Started ServerConnector (HTTP/1.1) {0.0.0.0:9500}  ← RPC 端口就绪
10:57:29  IC-MC-RPC: setConfig from 127.0.0.1        ← IC 首次下发配置autoScalingMetricsList: 31 个指标collectionPeriod=PT5S, publisherPeriod=PT10S
10:57:56  Finish set up all required configurations   ← 配置就绪(等了 ~30 秒)createThreadPoolExecutor with workerCount 100
10:58:07  WebSocket Client ready to establish connection
10:58:10  Opened Connection (wss://...amazonaws.com.cn/prod)
10:58:10  Successfully refreshed the client           ← WebSocket 建连完成
10:58:11  First METRICS message sent                  ← 首次指标上报

MC 启动后会等待 IC 多次 setConfig 调用(约每 4 秒一次,共 ~7 次),直到配置稳定后才开始采集。

线程模型

线程名 数量 职责
main 1 启动、初始化
IC-MC-RPC-N 多个 处理 IC setConfig RPC 请求
arpc-N 多个 处理 Hadoop transferMetrics RPC 请求
metrics-collector-N 100 (线程池) 指标采集(YARN/HDFS/Nodes)
metrics-publisher-1 1 指标发布到 WebSocket
spark-shuffle-metrics-publisher1 1 Spark Shuffle 指标发布
Grizzly(N) 2 WebSocket I/O
Http-pingpong-thread-0 1 WebSocket 心跳
pool-6-thread-1 1 消息重试扫描(每 5 秒)
ApiGatewayEndpointRefresher-0 1 API Gateway 端点刷新

目录结构

/emr/metricscollector/
├── log/
│   ├── metricscollector.log              ← 当前日志(按小时轮转)
│   ├── metricscollector_*.log.gz         ← 历史日志
│   └── metricscollector.out              ← 启动日志
├── run/
│   └── cwmetrics.ckp                     ← checkpoint 文件
├── local/                                ← 本地数据目录
└── isbusy                                ← 忙碌标记文件/usr/share/aws/emr/emr-metrics-collector/lib/  ← JAR 包(~100MB, 150+ 依赖)
/usr/bin/metricscollector                       ← 启动脚本
/run/metricscollector.pid                       ← PID 文件

运维指南

常用命令

# 服务管理
systemctl status metricscollector
sudo systemctl restart metricscollector# 日志查看
sudo tail -f /emr/metricscollector/log/metricscollector.log
sudo grep "shouldPublishMetrics" /emr/metricscollector/log/metricscollector.log | tail -3
sudo grep "MetricsMessageHandlerStats" /emr/metricscollector/log/metricscollector.log | tail -5# 端口检查
sudo ss -tlnp | grep -E "9500|9501"# Checkpoint 查看
sudo cat /emr/metricscollector/run/cwmetrics.ckp | python3 -m json.tool# 查看采集的指标名称
sudo grep "Metric being published" /emr/metricscollector/log/metricscollector.log | sed 's/.*published: //' | sort -u
http://www.jsqmd.com/news/524378/

相关文章:

  • 2026年五恒系统厂家推荐排行榜:别墅/大平层/洋房/叠拼/独栋/豪宅全屋定制,专业打造恒温恒湿恒氧恒洁恒静舒适生活空间 - 品牌企业推荐师(官方)
  • C++初始化列表、类型转换
  • 解决Android Studio中annotation-experimental-1.4.1.aar版本冲突的实战指南
  • DeepSpeed多卡通信避坑指南:all_to_all_single的5个常见错误及解决方法
  • 20241223 实验一《Python程序设计》实验报告
  • AGV调度算法深度解析:从避碰优化到千车并行的技术演进
  • 混合动力汽车Simulink整车模型:探索P2并联混动仿真的奇妙世界
  • 嵌入式网络调试利器:在ARM开发板上手把手编译tcpdump 4.99.4
  • 算法复杂度理论的边界与不可计算性探讨的技术7
  • 2026会议音响套装优质品牌推荐指南:报告厅音响、无纸化会议室、无纸化会议终端、无纸化会议软件、无纸化办公系统选择指南 - 优质品牌商家
  • 168开奖网源码API修复记录
  • 6.1.1 软件->PEP标准(PSF基金会):Python 标准库标准(Python Standard Library Specification)
  • 基于LBM的Xflow单相及两相流动模拟探索
  • CrossEntropyLoss参数详解:从reduction=‘none‘到loss.backward()的完整避坑指南
  • 【C++面经】轻舟智航自动驾驶应用软件开发实习岗位
  • 五大品牌设计培训机构横评——后浪教育引领未来人才培养 - 速递信息
  • ComfyUI-WanVideoWrapper:AI视频创作者的技术赋能平台
  • 基于Java的OPC DA客户端开发与常见问题解析
  • Zynq开发避坑指南:FDMA读写AXI总线时最常见的3个时序错误
  • BurpSuite新手避坑大全:从安装到解决界面错位的5个关键步骤(2024.10版实测)
  • 数字电路入门:手把手教你理解RS触发器的核心原理(附避坑指南)
  • GPT-4o与Gemini 3镜像站背后的算力与工程:大模型训练基础设施拆解
  • 显卡调用精细化:1%算力+1MB显存代码方案
  • 佳易王小餐馆点餐管理系统软件功能观察与使用体验
  • Linux 系统安全实战:从服务防护到入侵检测
  • 文墨共鸣新手指南:如何构造高质量测试文本以验证‘异曲同工’判别力
  • 从零搭建Telegram数据交互机器人:构建、集成与功能实战
  • OmniBench
  • LIO-SAM建图漂移?别急着调参,先检查你的IMU和雷达安装!
  • 6.3.1 软件->W3C XPath 1.0 标准(W3C Recommendation):XPath(XML Path Language)查询语言