Python自动化:调用企业微信API高效发送邮件通知
1. 为什么需要企业微信邮件自动化
每天手动发送几十封系统监控报告的日子,我实在受够了。作为运维工程师,最痛苦的就是半夜被报警电话吵醒,还要眯着眼睛登录邮箱发通知。直到发现企业微信的邮件API,才真正体会到自动化带来的解放感。
企业微信的邮件API特别适合这些场景:当服务器CPU超过阈值时自动发预警、每天早晨8点准时推送前日业务报表、审批流程结束后立即通知相关人员。传统邮件客户端需要人工操作,而API调用可以直接嵌入到现有系统中,实现全自动化流程。
我经手过的几个典型案例里,有个电商客户用这个方案把大促期间的故障响应时间从15分钟缩短到30秒。他们的风控系统一旦检测到异常交易,就会通过企业微信邮件API立即触发告警邮件,安全团队能第一时间介入处理。
2. 准备工作:获取API通行证
2.1 创建企业微信应用
登录企业微信管理后台,在"应用管理"里点击"创建应用"。建议取个见名知意的应用名称,比如"运维告警中心"或"财务审批通知"。创建成功后你会看到两个关键参数:AgentId(应用ID)和Secret(应用密钥),记得立即把Secret复制保存到安全的地方,页面刷新后就看不到了。
有个容易踩的坑是可见范围设置。去年我们给人事部门开发考勤系统时,忘记把财务部加入可见范围,导致工资条邮件全部发送失败。所以务必在"应用可见范围"里添加所有需要接收邮件的部门成员。
2.2 获取Access Token
Access Token相当于API调用的临时通行证,有效期通常为2小时。获取方式很简单,用企业ID(CorpID)、应用Secret就能换取:
import requests def get_access_token(corpid, corpsecret): url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}" response = requests.get(url).json() return response['access_token']这里有个性能优化点:实际项目中应该全局缓存这个Token,而不是每次发邮件都重新获取。我通常用Redis设置7100秒过期(比官方2小时稍短),这样既避免频繁调用,又不会遇到Token失效的情况。
3. 发送邮件的核心代码实现
3.1 基础邮件发送函数
下面这个增强版发送函数,已经处理了大多数业务场景的需求:
def send_wechat_work_email(access_token, agent_id, subject, content, to_users, cc_users=None): url = "https://qyapi.weixin.qq.com/cgi-bin/message/send" params = {"access_token": access_token} msg_content = f"邮件主题:{subject}\n\n{content}" if len(msg_content) > 2048: msg_content = msg_content[:2045] + "..." # 企业微信消息长度限制 data = { "touser": "|".join(to_users) if isinstance(to_users, list) else to_users, "msgtype": "text", "agentid": agent_id, "text": {"content": msg_content}, "safe": 0 } if cc_users: data["touser"] += f"|{'|'.join(cc_users) if isinstance(cc_users, list) else cc_users}" try: response = requests.post(url, params=params, json=data, timeout=10) result = response.json() if result['errcode'] != 0: raise Exception(f"发送失败:{result['errmsg']}") return True except Exception as e: print(f"邮件发送异常:{str(e)}") return False这个版本相比基础实现有几个改进点:
- 自动处理了消息长度限制(2048字符)
- 支持抄送功能(cc_users参数)
- 添加了超时机制和异常捕获
- 兼容单个用户字符串和用户列表两种输入格式
3.2 发送带附件的邮件
企业微信API本身不支持直接附件上传,但可以通过变通方案实现。先把文件上传到企业微信临时素材库,然后在邮件内容里插入下载链接:
def upload_file(access_token, file_path): url = f"https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type=file" with open(file_path, 'rb') as f: files = {'media': f} response = requests.post(url, files=files) return response.json()['media_id'] def send_email_with_attachment(access_token, agent_id, subject, content, to_users, file_path): media_id = upload_file(access_token, file_path) file_name = os.path.basename(file_path) download_url = f"https://qyapi.weixin.qq.com/cgi-bin/media/get?access_token={access_token}&media_id={media_id}" full_content = f"{content}\n\n附件:{file_name}\n下载链接:{download_url}" return send_wechat_work_email(access_token, agent_id, subject, full_content, to_users)实测发现临时素材默认保存3天,对于重要文件建议同时发送到企业邮箱做长期存档。
4. 生产环境实战技巧
4.1 邮件模板管理
直接拼接字符串的方式在复杂邮件场景会很痛苦。我推荐使用Jinja2模板引擎,把邮件内容抽象成模板文件:
from jinja2 import Environment, FileSystemLoader env = Environment(loader=FileSystemLoader('templates')) template = env.get_template('alert_email.j2') def send_alert_email(device_name, error_msg, timestamp): content = template.render( device=device_name, error=error_msg, time=timestamp, contact_person="王工程师" ) send_wechat_work_email(access_token, agent_id, "设备告警通知", content, ["ops_team"])对应的模板文件templates/alert_email.j2内容示例:
【紧急告警】设备 {{ device }} 发生异常 错误详情:{{ error }} 发生时间:{{ time|datetime_format }} 请立即联系值班工程师:{{ contact_person }}4.2 异步发送与重试机制
高峰期同步发送邮件可能导致主程序阻塞。用Celery实现异步任务队列是个不错的选择:
from celery import Celery app = Celery('email_tasks', broker='redis://localhost:6379/0') @app.task(bind=True, max_retries=3) def async_send_email(self, email_args): try: result = send_wechat_work_email(**email_args) if not result: raise self.retry(countdown=60) except Exception as e: raise self.retry(exc=e)调用时只需将原来的同步调用改为:
async_send_email.delay({ "access_token": current_token, "agent_id": AGENT_ID, "subject": "季度报表", "content": report_content, "to_users": ["finance@company.com"] })4.3 监控与报警
建议对所有API调用添加监控指标。使用Prometheus客户端记录发送成功率和延迟:
from prometheus_client import Counter, Histogram SEND_COUNTER = Counter('wechat_email_sends_total', 'Total sent emails', ['status']) SEND_DURATION = Histogram('wechat_email_duration_seconds', 'Email sending latency') @SEND_DURATION.time() def send_with_metrics(access_token, agent_id, subject, content, to_users): try: success = send_wechat_work_email(access_token, agent_id, subject, content, to_users) SEND_COUNTER.labels(status='success' if success else 'fail').inc() return success except Exception: SEND_COUNTER.labels(status='error').inc() raise当连续出现发送失败时,可以通过企业微信反向给自己发报警消息,形成闭环监控。
