当前位置: 首页 > news >正文

从脚本到自动化:用Python和Shell封装YARN应用管理,实现一键终止与巡检

从脚本到自动化:用Python和Shell封装YARN应用管理,实现一键终止与巡检

在大规模数据处理场景中,YARN集群往往同时运行着数百个应用实例。当某个ETL任务因代码缺陷陷入死循环,或是某个临时查询占用了生产环境关键资源时,传统的手动操作方式显得力不从心。本文将通过实战案例,展示如何构建一个智能化的YARN应用管控系统,实现三个核心能力:实时状态巡检精准条件筛选批量操作执行

1. 构建YARN应用管理的基础工具链

1.1 环境准备与API接入

YARN ResourceManager的REST API是我们所有自动化操作的基础入口点。首先需要确保操作主机能够访问ResourceManager服务,并安装必要的命令行工具:

# 验证网络连通性 ping resourcemanager.example.com telnet resourcemanager.example.com 8088 # 安装基础工具包 sudo yum install -y curl jq python3-pip # CentOS/RHEL sudo apt-get install -y curl jq python3-pip # Ubuntu/Debian pip install requests pandas

提示:生产环境建议配置Kerberos认证,可通过kinit命令预先获取票据

1.2 应用状态查询的原型实现

通过组合curljq工具,可以快速构建应用列表查询功能:

#!/bin/bash RM_HOST="resourcemanager.example.com" API_URL="http://$RM_HOST:8088/ws/v1/cluster/apps" # 获取所有RUNNING状态的应用 curl -s "$API_URL" | jq '.apps.app[] | select(.state == "RUNNING")'

对应的Python实现版本更具扩展性:

import requests def fetch_yarn_apps(state='RUNNING'): api_url = "http://resourcemanager.example.com:8088/ws/v1/cluster/apps" params = {'states': state} if state else None response = requests.get(api_url, params=params) response.raise_for_status() return response.json().get('apps', {}).get('app', []) if __name__ == '__main__': running_apps = fetch_yarn_apps() print(f"Found {len(running_apps)} running applications")

2. 高级筛选策略的实现

2.1 多维度过滤条件设计

在实际运维中,我们需要根据多种条件组合筛选目标应用。下表列出了常见的过滤维度及其实现方式:

筛选维度数据类型示例值实现方法
用户字符串etl_user.user == "etl_user"
队列字符串production.queue == "production"
运行时长整数3600 (秒).elapsedTime > 3600000
资源占用浮点数50.0 (vcore占比).allocatedVCores > 50
应用类型枚举SPARK.applicationType == "SPARK"

2.2 动态过滤器的Python实现

通过封装过滤逻辑,可以构建灵活的筛选系统:

from datetime import datetime, timedelta class AppFilter: @staticmethod def by_user(apps, username): return [app for app in apps if app['user'] == username] @staticmethod def by_runtime(apps, hours): threshold = hours * 3600 * 1000 # 转为毫秒 return [app for app in apps if app['elapsedTime'] > threshold] @staticmethod def by_resource(apps, min_vcores, min_memory): return [ app for app in apps if app['allocatedVCores'] >= min_vcores and app['allocatedMB'] >= min_memory * 1024 ] # 使用示例 apps = fetch_yarn_apps() long_running = AppFilter.by_runtime(apps, hours=2) overloaded = AppFilter.by_resource(apps, min_vcores=8, min_memory=32)

3. 安全终止机制的实现

3.1 批量终止的防护措施

直接执行批量终止操作存在风险,需要建立防护机制:

  1. 二次确认机制:显示将被终止的应用详情,要求人工确认
  2. 模拟执行模式:仅输出将要执行的操作而不实际调用API
  3. 操作白名单:限制可操作的应用类型和用户范围
  4. 操作日志记录:详细记录每次终止操作的元数据

3.2 带防护的终止脚本实现

import logging from typing import List, Dict logging.basicConfig( filename='yarn_operations.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) class YarnTerminator: def __init__(self, dry_run=False): self.dry_run = dry_run def safe_kill(self, app_ids: List[str], reason: str = ''): results = [] for app_id in app_ids: if self.dry_run: logging.info(f"DRY RUN: Would kill {app_id}") results.append({'id': app_id, 'status': 'simulated'}) continue try: url = f"http://resourcemanager.example.com:8088/ws/v1/cluster/apps/{app_id}/state" payload = {'state': 'KILLED'} headers = {'Content-Type': 'application/json'} response = requests.put(url, json=payload, headers=headers) if response.status_code == 200: logging.info(f"Killed {app_id} - Reason: {reason}") results.append({'id': app_id, 'status': 'success'}) else: logging.error(f"Failed to kill {app_id}: {response.text}") results.append({'id': app_id, 'status': 'failed'}) except Exception as e: logging.exception(f"Error killing {app_id}") results.append({'id': app_id, 'status': 'error'}) return results # 使用示例 terminator = YarnTerminator(dry_run=True) apps_to_kill = [app['id'] for app in overloaded] terminator.safe_kill(apps_to_kill, reason='Resource overcommit')

4. 系统集成与自动化调度

4.1 与监控系统对接

将YARN管理脚本集成到Prometheus监控系统中,可以通过Textfile Collector暴露指标:

#!/bin/bash OUTPUT_FILE="/var/lib/node_exporter/textfile_collector/yarn_metrics.prom" # 获取运行超时的应用数量 TIMEOUT_APPS=$(python3 yarn_manager.py --filter runtime=2h --count-only) cat <<EOF > "$OUTPUT_FILE" # HELP yarn_timeout_apps Number of applications running over 2 hours # TYPE yarn_timeout_apps gauge yarn_timeout_apps $TIMEOUT_APPS EOF

4.2 定时巡检的Crontab配置

设置定期执行的巡检任务:

# 每30分钟检查一次长时间运行的应用 */30 * * * * /usr/local/bin/yarn_manager.py --filter runtime=4h --notify # 每天凌晨清理测试环境残留应用 0 1 * * * /usr/local/bin/yarn_manager.py --queue test --older-than 24h --kill

4.3 完整的CLI工具封装

通过argparse模块创建功能完善的命令行工具:

import argparse def main(): parser = argparse.ArgumentParser(description='YARN Application Manager') parser.add_argument('--user', help='Filter by user') parser.add_argument('--queue', help='Filter by queue') parser.add_argument('--runtime', help='Filter by runtime (e.g. 2h, 30m)') parser.add_argument('--kill', action='store_true', help='Terminate matched apps') parser.add_argument('--dry-run', action='store_true', help='Simulate operations') parser.add_argument('--notify', action='store_true', help='Send alert notifications') args = parser.parse_args() # 应用过滤逻辑 filters = {} if args.user: filters['user'] = args.user if args.queue: filters['queue'] = args.queue if args.runtime: filters['runtime'] = parse_runtime(args.runtime) apps = fetch_yarn_apps() matched = apply_filters(apps, **filters) if args.kill: terminator = YarnTerminator(dry_run=args.dry_run) results = terminator.safe_kill([app['id'] for app in matched]) if args.notify and matched: send_notification(matched) def parse_runtime(time_str): # 实现时间字符串解析逻辑 pass

在实际项目中,这套系统将运维人员从重复的手动操作中解放出来。通过组合不同的过滤条件,可以精确控制操作范围,比如仅终止测试环境中运行超过8小时的Spark作业,或是清理特定用户提交的异常任务。

http://www.jsqmd.com/news/917903/

相关文章:

  • 2026年旧房翻新大揭秘!靠谱机构究竟该怎么选?
  • GPU资源利用率暴跌63%?揭秘Gemini v1.5部署后必踩的3类资源配置陷阱,今天不改明天告警爆炸
  • RoadRunner场景导入Carla
  • 树莓派超声波雷达系统:从硬件连接到Python实时扫描界面
  • 技术方案:Figma-to-JSON实现设计文件与结构化数据的双向转换
  • IDEA表数据复制到excle
  • 2026成都花园户型装修设计榜单|一楼庭院+顶楼露台花园专属装企推荐,避坑首选 - 资讯纵览
  • 基于Arduino的防酒驾系统:从传感器到物联网的嵌入式实战
  • 使用图像识别点击评论按钮
  • 2026哈尔滨防水补漏公司排名TOP5|本地专业防水补漏公司推荐 (全域极速上门) - 防水空鼓维修家
  • 物联网卡、流量卡、SIM 卡到底有什么区别?
  • SRC挖洞必备:用Eeyes棱眼快速整理目标C段资产(附实战避坑指南)
  • 2026年企业如何鉴别一家靠谱的AI搜索GEO服务商 - 品牌报告
  • 2026年新闻稿发布平台TOP10权威测评报告 - 资讯纵览
  • AI Agent Harness Engineering 与具身智能:当大脑拥有了身体
  • 2026 年宏碁入局智能眼镜市场,产品亮点不足,软件适配难题待解
  • Beyond Compare 5密钥生成器技术深度解析与实用指南
  • 工业应急指挥调度方案:实时态势感知,防控厂区安全隐患
  • 5.30 南京黄金回收,真实报价不玩虚的 - 资讯纵览
  • DS4Windows终极指南:让PS4/PS5手柄在Windows电脑上完美运行
  • 氙弧老化测试全参数解析:滤镜类型、辐照度与黑标温度设定
  • 步进梁加热炉炉温综合优化控制策略【附仿真】
  • 我让 3 个子 Agent 同时改同一个文件,没打架——因为偷了 Git 的一个冷门功能
  • 2026 常州geo优化公司推荐丨常州网络公司丨常州geo广告丨常州geo系统丨常州豆包优化公司推荐及电话联系 - 资讯纵览
  • 终极指南:如何快速解包Godot游戏资源文件
  • 微信机器人API接口:图片、文件、语音收发快速搞定
  • 5.30 天津黄金回收,今日大盘价无套路 - 资讯纵览
  • HotSpot VM源码剖析2026版开源!
  • 法律AI工具选型终极决策矩阵(含22家供应商穿透式测评+17项等保2.0/《人工智能法(草案)》适配度评分)
  • 小桌签 —— 一个编程小白用华为云码道(CodeArts),1 小时做出自己的第一个网页 App