监控邮箱/邮箱自动回复/python
主题:QQ邮箱的实时监控和自动回复
实现QQ邮箱的实时监控和自动回复
思路(代码):
1. 获取QQ邮箱授权码
只有开启了QQ邮箱的IMAP SMTP服务,才能
路径:登录QQ邮箱->设置->账号与安全->开启IMAP/SMTP服务
大概在这个位置你会看到一个 ““POP3/IMAP/SMTP/Exchange/CardDAV/CalDAV服务”类似的内容,点击下方的“开启服务”,我这边因为已经开启过了所以看不到
补充(了解即可):
开启IMAP/SMTP服务,按照提示生成一串16位的授权码,务必保存好
2. 读取配置文件
创建配置文件:
读取配置文件:
def load_config(config_file): """加载配置文件(如果需要)""" try: with open(config_file, 'r', encoding='utf-8') as f: config = {} for line in f: if line.strip() and not line.startswith('#'): key, value = line.split('=', 1) config[key.strip()] = value.strip() return config except FileNotFoundError: print(f"配置文件 {config_file} 未找到。使用默认配置。") return {} CONFIG = load_config(r'./QQ_mail.txt') # ---------- 配置区域 ---------- IMAP_SERVER = CONFIG.get('IMAP_SERVER', 'imap.qq.com') EMAIL_ACCOUNT = CONFIG.get('EMAIL_ACCOUNT') # 你的QQ邮箱 EMAIL_PASSWORD = CONFIG.get('EMAIL_PASSWORD') # 替换成16位授权码 CHECK_INTERVAL = CONFIG.get('CHECK_INTERVAL', 180) # 检查间隔(秒) SMTP_SERVER = CONFIG.get('SMTP_SERVER', 'smtp.qq.com') # ---------------------------3. 解码邮件头部信息
def decode_email_header(header_value): """解码邮件头部信息(处理中文乱码)""" if header_value is None: return "" decoded_parts = [] for part, encoding in decode_header(header_value): if isinstance(part, bytes): try: if encoding: decoded_parts.append(part.decode(encoding)) else: decoded_parts.append(part.decode('utf-8', errors='ignore')) except: try: decoded_parts.append(part.decode('gbk', errors='ignore')) except: decoded_parts.append(part.decode('utf-8', errors='ignore')) else: decoded_parts.append(part) return ''.join(decoded_parts)4. 从发件人字符串中提取邮箱地址
def get_sender_email(sender_string): """从发件人字符串中提取邮箱地址""" # 匹配邮箱地址的正则表达式 email_pattern = r'[\w\.-]+@[\w\.-]+\.\w+' match = re.search(email_pattern, sender_string) if match: return match.group(0) return sender_string5. 判断哪些邮箱应该自动回复
有的邮件是系统邮件,可以不用回复
def should_auto_reply(sender_email): """判断是否应该自动回复""" # 检查黑名单 if sender_email in BLACKLIST: return False # 检查是否是系统邮件 system_keywords = ['postmaster', 'mailer-daemon', 'noreply', 'no-reply', 'service'] sender_lower = sender_email.lower() for keyword in system_keywords: if keyword in sender_lower: return False # 检查是否是自己的邮件 if sender_email == EMAIL_ACCOUNT: return False return True6. 提取邮件正文
def get_email_body(msg): """提取邮件正文""" body = "" try: if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition")) # 只提取纯文本内容,跳过附件 if content_type == "text/plain" and "attachment" not in content_disposition: try: payload = part.get_payload(decode=True) if payload: body = payload.decode('utf-8', errors='ignore') break except: try: body = part.get_payload(decode=True).decode('gbk', errors='ignore') break except: pass else: # 非 multipart 邮件 payload = msg.get_payload(decode=True) if payload: try: body = payload.decode('utf-8', errors='ignore') except: body = payload.decode('gbk', errors='ignore') except Exception as e: print(f" 提取正文时出错: {e}") return body[:500] # 限制长度7. 生成回复内容
def generate_reply_content(original_sender, original_subject, original_content=""): """生成回复内容""" reply = f"""您好, 感谢您的来信! 我是自动回复机器人,已收到您的邮件: 主题:{original_subject} 您的邮件对我们很重要,我会尽快人工回复您。 祝好! {EMAIL_ACCOUNT} (自动回复,请勿直接回复此邮件)""" return reply8. 发送回复邮件
def send_reply(to_email, subject, content): """发送回复邮件 - 增强错误处理""" try: # 创建邮件对象 msg = MIMEMultipart() msg['From'] = EMAIL_ACCOUNT msg['To'] = to_email msg['Subject'] = f"Re: {subject}" # 添加邮件正文 msg.attach(MIMEText(content, 'plain', 'utf-8')) # 使用更稳定的SMTP连接方式 server = smtplib.SMTP_SSL(SMTP_SERVER, 465, timeout=30) # 添加调试信息(可选) # server.set_debuglevel(1) # 登录 server.login(EMAIL_ACCOUNT, EMAIL_PASSWORD) # 发送邮件 server.send_message(msg) # 关闭连接 server.quit() print(f" ✓ 已回复: {to_email}") return True except smtplib.SMTPServerDisconnected as e: print(f" ✗ SMTP连接断开: {e}") return False except smtplib.SMTPAuthenticationError as e: print(f" ✗ 认证失败,请检查授权码: {e}") return False except Exception as e: print(f" ✗ 发送失败: {e}") return False def mark_as_replied(mail, email_id): """标记邮件为已处理""" try: # 标记为已读 mail.store(email_id, '+FLAGS', '\\Seen') print(f" 已标记邮件为已读") except Exception as e: print(f" 标记失败: {e}")9. 获取未读邮件并逐一回复
def process_and_reply(): """主函数:获取未读邮件并逐一回复""" try: # 连接IMAP服务器 print("\n🔌 正在连接IMAP服务器...") mail = imaplib.IMAP4_SSL(IMAP_SERVER, timeout=30) mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD) mail.select('INBOX') # 搜索未读邮件 status, messages = mail.search(None, 'UNSEEN') if status != 'OK': return 0 email_ids = messages[0].split() if not email_ids: print("📭 没有未读邮件") return 0 print(f"\n📬 发现 {len(email_ids)} 封未读邮件") # 过滤掉系统邮件和黑名单 valid_emails = [] for email_id in email_ids: status, data = mail.fetch(email_id, '(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT)])') if status == 'OK': raw_header = data[0][1] msg = email.message_from_bytes(raw_header) from_raw = msg.get('From', '') sender_email = get_sender_email(from_raw) if should_auto_reply(sender_email): valid_emails.append(email_id) print(f" ✓ 有效邮件: {sender_email}") else: print(f" ⏭ 跳过黑名单: {sender_email}") # 标记为已读避免重复处理 mail.store(email_id, '+FLAGS', '\\Seen') if not valid_emails: print("📭 没有需要回复的有效邮件") return 0 print(f"\n🤖 开始回复 {len(valid_emails)} 封有效邮件...") replied_count = 0 for email_id in valid_emails: print(f"\n--- 处理邮件 ID: {email_id.decode()} ---") # 获取完整邮件内容 status, data = mail.fetch(email_id, '(RFC822)') if status != 'OK': print(f" 无法获取邮件内容") continue raw_email = data[0][1] msg = email.message_from_bytes(raw_email) # 提取邮件信息 from_raw = msg.get('From', '') from_addr = decode_email_header(from_raw) subject_raw = msg.get('Subject', '无主题') subject = decode_email_header(subject_raw) sender_email = get_sender_email(from_addr) print(f" 发件人: {from_addr}") print(f" 邮箱: {sender_email}") print(f" 主题: {subject}") # 提取邮件正文 body = get_email_body(msg) # 生成回复内容 reply_content = generate_reply_content(sender_email, subject, body[:200]) # 发送回复 if send_reply(sender_email, subject, reply_content): replied_count += 1 # 标记已处理 mark_as_replied(mail, email_id) # 等待一下,避免发送过快 time.sleep(2) print(f"\n✅ 处理完成!共回复 {replied_count} 封邮件") return replied_count except imaplib.IMAP4.error as e: print(f"❌ IMAP连接错误: {e}") return 0 except Exception as e: print(f"❌ 处理过程中出错: {e}") import traceback traceback.print_exc() return 0 finally: try: mail.close() mail.logout() except: pass10. 定时监控并自动回复
在循环内定时执行 process_and_reply函数
def auto_reply_loop(): """持续监控并自动回复""" print("=" * 60) print(f"🤖 自动回复系统已启动") print(f"📧 监控邮箱: {EMAIL_ACCOUNT}") print(f"⏱️ 检查间隔: {CHECK_INTERVAL}秒") print(f"🚫 黑名单: {', '.join(BLACKLIST)}") print("=" * 60) while True: try: process_and_reply() print(f"\n⏰ 等待 {CHECK_INTERVAL} 秒后继续监控...") time.sleep(CHECK_INTERVAL) except KeyboardInterrupt: print("\n\n👋 用户中断,自动回复系统已停止") break except Exception as e: print(f"⚠️ 运行错误: {e}") time.sleep(CHECK_INTERVAL)11. 测试
if __name__ == '__main__': print(load_config(r'./QQ_mail.txt') ) print(f"开始监控邮箱: {EMAIL_ACCOUNT}") auto_reply_loop()12. 程序运行结果
PS D:\workspace\email-automation> uv run python monitor.py {'IMAP_SERVER': 'imap.qq.com', 'EMAIL_ACCOUNT': 'spz_0911@qq.com', 'EMAIL_PASSWORD': 'jiugrsrwpmddbadi'} 开始监控邮箱: spz_0911@qq.com ============================================================ 🤖 自动回复系统已启动 📧 监控邮箱: spz_0911@qq.com ⏱️ 检查间隔: 180秒 🚫 黑名单: PostMaster@qq.com, mailer-daemon@qq.com, noreply@qq.com, service@qq.com, spz_0911@qq.com ============================================================ 🔌 正在连接IMAP服务器... 📬 发现 32 封未读邮件 ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ✓ 有效邮件: campus@mailf.wisdomore.com ✓ 有效邮件: campus@mailf.wisdomore.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: service@quickjobs.51job.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ⏭ 跳过黑名单: noreply@redditmail.com ✓ 有效邮件: MyClaw@aisecret.us ⏭ 跳过黑名单: no-reply@github.com ✓ 有效邮件: hi@cursor.com ✓ 有效邮件: position-tracking@semrush.com ✓ 有效邮件: product@mailsend1.jijyun.cn ⏭ 跳过黑名单: PostMaster@qq.com ⏭ 跳过黑名单: PostMaster@qq.com ⏭ 跳过黑名单: PostMaster@qq.com ⏭ 跳过黑名单: PostMaster@qq.com ⏭ 跳过黑名单: PostMaster@qq.com ⏭ 跳过黑名单: PostMaster@qq.com ⏭ 跳过黑名单: PostMaster@qq.com ⏭ 跳过黑名单: PostMaster@qq.com ⏭ 跳过黑名单: PostMaster@qq.com 🤖 开始回复 6 封有效邮件... --- 处理邮件 ID: 155 --- 发件人: 智联招聘 <campus@mailf.wisdomore.com> 邮箱: campus@mailf.wisdomore.com 主题: 尊敬的曾同学【智联推荐】盒马前置仓校招专项来啦! ✓ 已回复: campus@mailf.wisdomore.com 已标记邮件为已读 --- 处理邮件 ID: 156 --- 发件人: 智联招聘 <campus@mailf.wisdomore.com> 邮箱: campus@mailf.wisdomore.com 主题: 尊敬的曾同学【智联推荐】盒马前置仓校招专项来啦! ✓ 已回复: campus@mailf.wisdomore.com 已标记邮件为已读 --- 处理邮件 ID: 198 --- 发件人: MyClaw Newsletter <MyClaw@aisecret.us> 邮箱: MyClaw@aisecret.us 主题: 🦞 OpenClaw Instances Widely Exposed ✓ 已回复: MyClaw@aisecret.us 已标记邮件为已读 --- 处理邮件 ID: 274 --- 发件人: Cursor <hi@cursor.com> 邮箱: hi@cursor.com 主题: Here to help ✓ 已回复: hi@cursor.com 已标记邮件为已读 --- 处理邮件 ID: 278 --- 发件人: Semrush Position Tracking <position-tracking@semrush.com> 邮箱: position-tracking@semrush.com 主题: Position Tracking Update (originmattress.com.au) ✓ 已回复: position-tracking@semrush.com 已标记邮件为已读 --- 处理邮件 ID: 279 --- 发件人: 集简云 <product@mailsend1.jijyun.cn> 邮箱: product@mailsend1.jijyun.cn 主题: 欢迎加入集简云! ✓ 已回复: product@mailsend1.jijyun.cn 已标记邮件为已读 ✅ 处理完成!共回复 6 封邮件 ⏰ 等待 180 秒后继续监控...完整代码
import imaplib import smtplib import time import email from email.header import decode_header from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import re def load_config(config_file): """加载配置文件(如果需要)""" try: with open(config_file, 'r', encoding='utf-8') as f: config = {} for line in f: if line.strip() and not line.startswith('#'): key, value = line.split('=', 1) config[key.strip()] = value.strip() return config except FileNotFoundError: print(f"配置文件 {config_file} 未找到。使用默认配置。") return {} CONFIG = load_config(r'./QQ_mail.txt') # ---------- 配置区域 ---------- IMAP_SERVER = CONFIG.get('IMAP_SERVER', 'imap.qq.com') EMAIL_ACCOUNT = CONFIG.get('EMAIL_ACCOUNT') # 你的QQ邮箱 EMAIL_PASSWORD = CONFIG.get('EMAIL_PASSWORD') # 替换成16位授权码 CHECK_INTERVAL = CONFIG.get('CHECK_INTERVAL', 180) # 检查间隔(秒) SMTP_SERVER = CONFIG.get('SMTP_SERVER', 'smtp.qq.com') # --------------------------- BLACKLIST = [ 'PostMaster@qq.com', 'mailer-daemon@qq.com', 'noreply@qq.com', 'service@qq.com', 'spz_0911@qq.com' # 自己的邮箱 ] def decode_email_header(header_value): """解码邮件头部信息(处理中文乱码)""" if header_value is None: return "" decoded_parts = [] for part, encoding in decode_header(header_value): if isinstance(part, bytes): try: if encoding: decoded_parts.append(part.decode(encoding)) else: decoded_parts.append(part.decode('utf-8', errors='ignore')) except: try: decoded_parts.append(part.decode('gbk', errors='ignore')) except: decoded_parts.append(part.decode('utf-8', errors='ignore')) else: decoded_parts.append(part) return ''.join(decoded_parts) def get_sender_email(sender_string): """从发件人字符串中提取邮箱地址""" # 匹配邮箱地址的正则表达式 email_pattern = r'[\w\.-]+@[\w\.-]+\.\w+' match = re.search(email_pattern, sender_string) if match: return match.group(0) return sender_string def should_auto_reply(sender_email): """判断是否应该自动回复""" # 检查黑名单 if sender_email in BLACKLIST: return False # 检查是否是系统邮件 system_keywords = ['postmaster', 'mailer-daemon', 'noreply', 'no-reply', 'service'] sender_lower = sender_email.lower() for keyword in system_keywords: if keyword in sender_lower: return False # 检查是否是自己的邮件 if sender_email == EMAIL_ACCOUNT: return False return True def generate_reply_content(original_sender, original_subject, original_content=""): """生成回复内容""" reply = f"""您好, 感谢您的来信! 我是自动回复机器人,已收到您的邮件: 主题:{original_subject} 您的邮件对我们很重要,我会尽快人工回复您。 祝好! {EMAIL_ACCOUNT} (自动回复,请勿直接回复此邮件)""" return reply def send_reply(to_email, subject, content): """发送回复邮件 - 增强错误处理""" try: # 创建邮件对象 msg = MIMEMultipart() msg['From'] = EMAIL_ACCOUNT msg['To'] = to_email msg['Subject'] = f"Re: {subject}" # 添加邮件正文 msg.attach(MIMEText(content, 'plain', 'utf-8')) # 使用更稳定的SMTP连接方式 server = smtplib.SMTP_SSL(SMTP_SERVER, 465, timeout=30) # 添加调试信息(可选) # server.set_debuglevel(1) # 登录 server.login(EMAIL_ACCOUNT, EMAIL_PASSWORD) # 发送邮件 server.send_message(msg) # 关闭连接 server.quit() print(f" ✓ 已回复: {to_email}") return True except smtplib.SMTPServerDisconnected as e: print(f" ✗ SMTP连接断开: {e}") return False except smtplib.SMTPAuthenticationError as e: print(f" ✗ 认证失败,请检查授权码: {e}") return False except Exception as e: print(f" ✗ 发送失败: {e}") return False def mark_as_replied(mail, email_id): """标记邮件为已处理""" try: # 标记为已读 mail.store(email_id, '+FLAGS', '\\Seen') print(f" 已标记邮件为已读") except Exception as e: print(f" 标记失败: {e}") def get_email_body(msg): """提取邮件正文""" body = "" try: if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition")) # 只提取纯文本内容,跳过附件 if content_type == "text/plain" and "attachment" not in content_disposition: try: payload = part.get_payload(decode=True) if payload: body = payload.decode('utf-8', errors='ignore') break except: try: body = part.get_payload(decode=True).decode('gbk', errors='ignore') break except: pass else: # 非 multipart 邮件 payload = msg.get_payload(decode=True) if payload: try: body = payload.decode('utf-8', errors='ignore') except: body = payload.decode('gbk', errors='ignore') except Exception as e: print(f" 提取正文时出错: {e}") return body[:500] # 限制长度 def process_and_reply(): """主函数:获取未读邮件并逐一回复""" try: # 连接IMAP服务器 print("\n🔌 正在连接IMAP服务器...") mail = imaplib.IMAP4_SSL(IMAP_SERVER, timeout=30) mail.login(EMAIL_ACCOUNT, EMAIL_PASSWORD) mail.select('INBOX') # 搜索未读邮件 status, messages = mail.search(None, 'UNSEEN') if status != 'OK': return 0 email_ids = messages[0].split() if not email_ids: print("📭 没有未读邮件") return 0 print(f"\n📬 发现 {len(email_ids)} 封未读邮件") # 过滤掉系统邮件和黑名单 valid_emails = [] for email_id in email_ids: status, data = mail.fetch(email_id, '(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT)])') if status == 'OK': raw_header = data[0][1] msg = email.message_from_bytes(raw_header) from_raw = msg.get('From', '') sender_email = get_sender_email(from_raw) if should_auto_reply(sender_email): valid_emails.append(email_id) print(f" ✓ 有效邮件: {sender_email}") else: print(f" ⏭ 跳过黑名单: {sender_email}") # 标记为已读避免重复处理 mail.store(email_id, '+FLAGS', '\\Seen') if not valid_emails: print("📭 没有需要回复的有效邮件") return 0 print(f"\n🤖 开始回复 {len(valid_emails)} 封有效邮件...") replied_count = 0 for email_id in valid_emails: print(f"\n--- 处理邮件 ID: {email_id.decode()} ---") # 获取完整邮件内容 status, data = mail.fetch(email_id, '(RFC822)') if status != 'OK': print(f" 无法获取邮件内容") continue raw_email = data[0][1] msg = email.message_from_bytes(raw_email) # 提取邮件信息 from_raw = msg.get('From', '') from_addr = decode_email_header(from_raw) subject_raw = msg.get('Subject', '无主题') subject = decode_email_header(subject_raw) sender_email = get_sender_email(from_addr) print(f" 发件人: {from_addr}") print(f" 邮箱: {sender_email}") print(f" 主题: {subject}") # 提取邮件正文 body = get_email_body(msg) # 生成回复内容 reply_content = generate_reply_content(sender_email, subject, body[:200]) # 发送回复 if send_reply(sender_email, subject, reply_content): replied_count += 1 # 标记已处理 mark_as_replied(mail, email_id) # 等待一下,避免发送过快 time.sleep(2) print(f"\n✅ 处理完成!共回复 {replied_count} 封邮件") return replied_count except imaplib.IMAP4.error as e: print(f"❌ IMAP连接错误: {e}") return 0 except Exception as e: print(f"❌ 处理过程中出错: {e}") import traceback traceback.print_exc() return 0 finally: try: mail.close() mail.logout() except: pass def auto_reply_loop(): """持续监控并自动回复""" print("=" * 60) print(f"🤖 自动回复系统已启动") print(f"📧 监控邮箱: {EMAIL_ACCOUNT}") print(f"⏱️ 检查间隔: {CHECK_INTERVAL}秒") print(f"🚫 黑名单: {', '.join(BLACKLIST)}") print("=" * 60) while True: try: process_and_reply() print(f"\n⏰ 等待 {CHECK_INTERVAL} 秒后继续监控...") time.sleep(CHECK_INTERVAL) except KeyboardInterrupt: print("\n\n👋 用户中断,自动回复系统已停止") break except Exception as e: print(f"⚠️ 运行错误: {e}") time.sleep(CHECK_INTERVAL) if __name__ == '__main__': print(load_config(r'./QQ_mail.txt') ) print(f"开始监控邮箱: {EMAIL_ACCOUNT}") auto_reply_loop()