027-邮件处理email和smtplib
Python邮件处理:email与smtplib模块详解 摘要:本文介绍了Python中处理电子邮件的两个核心模块——email和smtplib。email模块用于构建邮件内容,支持文本、HTML格式及附件处理;smtplib模块则通过SMTP协议实现邮件发送功能。文章提供了三种邮件发送示例:简单文本邮件、HTML格式邮件(含CSS样式和图片)以及带附件的邮件(支持多种文件类型)。关键代码展示了如
·
027-邮件处理email和smtplib
1. 模块概述
1.1 email模块简介
email
模块提供了处理电子邮件消息的完整功能,包括解析、构造和发送邮件。它支持MIME(多用途互联网邮件扩展)标准,可以处理文本、HTML、附件等各种邮件内容。
1.2 smtplib模块简介
smtplib
模块实现了SMTP(简单邮件传输协议)客户端,用于发送邮件到任何支持SMTP的邮件服务器。
1.3 核心功能
- 邮件构造:创建文本、HTML、多部分邮件
- 附件处理:添加文件、图片等附件
- 邮件解析:解析接收到的邮件内容
- SMTP发送:通过SMTP服务器发送邮件
- 安全连接:支持SSL/TLS加密连接
2. 基本概念和使用
2.1 基本邮件发送
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import os
def send_simple_email():
"""发送简单文本邮件"""
# 邮件配置
smtp_server = "smtp.gmail.com"
smtp_port = 587
sender_email = "your_email@gmail.com"
sender_password = "your_password"
receiver_email = "receiver@example.com"
# 创建邮件内容
message = MIMEText("这是一封测试邮件的内容。", "plain", "utf-8")
message["From"] = sender_email
message["To"] = receiver_email
message["Subject"] = "测试邮件"
try:
# 连接SMTP服务器
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls() # 启用TLS加密
server.login(sender_email, sender_password)
# 发送邮件
text = message.as_string()
server.sendmail(sender_email, receiver_email, text)
server.quit()
print("邮件发送成功!")
except Exception as e:
print(f"邮件发送失败: {e}")
# 运行示例
if __name__ == "__main__":
send_simple_email()
2.2 HTML邮件发送
def send_html_email():
"""发送HTML格式邮件"""
# 邮件配置
smtp_server = "smtp.gmail.com"
smtp_port = 587
sender_email = "your_email@gmail.com"
sender_password = "your_password"
receiver_email = "receiver@example.com"
# 创建HTML内容
html_content = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>HTML邮件</title>
</head>
<body>
<h1 style="color: #333;">欢迎使用Python邮件系统</h1>
<p>这是一封<strong>HTML格式</strong>的邮件。</p>
<ul>
<li>支持富文本格式</li>
<li>支持图片和链接</li>
<li>支持CSS样式</li>
</ul>
<p>
<a href="https://python.org" style="color: #0066cc;">访问Python官网</a>
</p>
<img src="https://www.python.org/static/img/python-logo.png"
alt="Python Logo" style="max-width: 200px;">
</body>
</html>
"""
# 创建邮件对象
message = MIMEMultipart("alternative")
message["From"] = sender_email
message["To"] = receiver_email
message["Subject"] = "HTML格式邮件测试"
# 添加HTML内容
html_part = MIMEText(html_content, "html", "utf-8")
message.attach(html_part)
# 也可以添加纯文本版本
text_content = """
欢迎使用Python邮件系统
这是一封HTML格式的邮件。
功能特点:
- 支持富文本格式
- 支持图片和链接
- 支持CSS样式
访问Python官网: https://python.org
"""
text_part = MIMEText(text_content, "plain", "utf-8")
message.attach(text_part)
try:
# 发送邮件
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(sender_email, sender_password)
text = message.as_string()
server.sendmail(sender_email, receiver_email, text)
server.quit()
print("HTML邮件发送成功!")
except Exception as e:
print(f"HTML邮件发送失败: {e}")
# 运行示例
if __name__ == "__main__":
send_html_email()
3. 高级邮件功能
3.1 带附件的邮件
import mimetypes
from pathlib import Path
class EmailWithAttachments:
"""带附件的邮件处理类"""
def __init__(self, smtp_server, smtp_port, sender_email, sender_password):
self.smtp_server = smtp_server
self.smtp_port = smtp_port
self.sender_email = sender_email
self.sender_password = sender_password
def create_message(self, to_email, subject, body, is_html=False):
"""创建邮件消息"""
message = MIMEMultipart()
message["From"] = self.sender_email
message["To"] = to_email
message["Subject"] = subject
# 添加邮件正文
if is_html:
message.attach(MIMEText(body, "html", "utf-8"))
else:
message.attach(MIMEText(body, "plain", "utf-8"))
return message
def add_attachment(self, message, file_path):
"""添加附件"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"附件文件不存在: {file_path}")
# 获取文件信息
filename = os.path.basename(file_path)
# 猜测MIME类型
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type is None:
mime_type = "application/octet-stream"
main_type, sub_type = mime_type.split('/', 1)
# 读取文件内容
with open(file_path, "rb") as attachment:
if main_type == "text":
# 文本文件
part = MIMEText(attachment.read().decode('utf-8'), sub_type)
elif main_type == "image":
# 图片文件
from email.mime.image import MIMEImage
part = MIMEImage(attachment.read(), sub_type)
elif main_type == "audio":
# 音频文件
from email.mime.audio import MIMEAudio
part = MIMEAudio(attachment.read(), sub_type)
else:
# 其他文件类型
part = MIMEBase(main_type, sub_type)
part.set_payload(attachment.read())
encoders.encode_base64(part)
# 设置附件头部
part.add_header(
"Content-Disposition",
f"attachment; filename= {filename}",
)
message.attach(part)
return message
def send_email_with_attachments(self, to_email, subject, body,
attachments=None, is_html=False):
"""发送带附件的邮件"""
try:
# 创建邮件消息
message = self.create_message(to_email, subject, body, is_html)
# 添加附件
if attachments:
for attachment_path in attachments:
self.add_attachment(message, attachment_path)
# 连接SMTP服务器并发送
server = smtplib.SMTP(self.smtp_server, self.smtp_port)
server.starttls()
server.login(self.sender_email, self.sender_password)
text = message.as_string()
server.sendmail(self.sender_email, to_email, text)
server.quit()
print(f"邮件发送成功!附件数量: {len(attachments) if attachments else 0}")
return True
except Exception as e:
print(f"邮件发送失败: {e}")
return False
def attachment_example():
"""附件邮件示例"""
# 创建邮件发送器
email_sender = EmailWithAttachments(
smtp_server="smtp.gmail.com",
smtp_port=587,
sender_email="your_email@gmail.com",
sender_password="your_password"
)
# 准备邮件内容
subject = "带附件的邮件测试"
body = """
您好!
这是一封带有附件的测试邮件。
附件包含:
1. 文档文件
2. 图片文件
3. 数据文件
请查收!
此致
敬礼!
"""
# 准备附件列表(确保这些文件存在)
attachments = [
"document.pdf",
"image.jpg",
"data.csv"
]
# 发送邮件
email_sender.send_email_with_attachments(
to_email="receiver@example.com",
subject=subject,
body=body,
attachments=attachments
)
# 运行示例
if __name__ == "__main__":
attachment_example()
3.2 批量邮件发送
import time
import csv
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor
class BulkEmailSender:
"""批量邮件发送器"""
def __init__(self, smtp_config):
self.smtp_config = smtp_config
self.sent_count = 0
self.failed_count = 0
self.lock = threading.Lock()
def load_recipients_from_csv(self, csv_file):
"""从CSV文件加载收件人列表"""
recipients = []
try:
with open(csv_file, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
recipients.append({
'email': row.get('email', ''),
'name': row.get('name', ''),
'company': row.get('company', ''),
'custom_data': {k: v for k, v in row.items()
if k not in ['email', 'name', 'company']}
})
except Exception as e:
print(f"读取CSV文件失败: {e}")
return recipients
def create_personalized_message(self, recipient, template, subject_template):
"""创建个性化邮件消息"""
# 替换模板中的占位符
personalized_body = template.format(
name=recipient.get('name', '用户'),
company=recipient.get('company', '贵公司'),
email=recipient.get('email', ''),
**recipient.get('custom_data', {})
)
personalized_subject = subject_template.format(
name=recipient.get('name', '用户'),
company=recipient.get('company', '贵公司'),
**recipient.get('custom_data', {})
)
# 创建邮件消息
message = MIMEMultipart()
message["From"] = self.smtp_config['sender_email']
message["To"] = recipient['email']
message["Subject"] = personalized_subject
# 添加HTML内容
message.attach(MIMEText(personalized_body, "html", "utf-8"))
return message
def send_single_email(self, recipient, template, subject_template, delay=0):
"""发送单封邮件"""
try:
# 创建个性化消息
message = self.create_personalized_message(
recipient, template, subject_template
)
# 连接SMTP服务器
server = smtplib.SMTP(
self.smtp_config['smtp_server'],
self.smtp_config['smtp_port']
)
server.starttls()
server.login(
self.smtp_config['sender_email'],
self.smtp_config['sender_password']
)
# 发送邮件
text = message.as_string()
server.sendmail(
self.smtp_config['sender_email'],
recipient['email'],
text
)
server.quit()
# 更新计数器
with self.lock:
self.sent_count += 1
print(f"✓ 邮件发送成功: {recipient['email']}")
# 延迟(避免被标记为垃圾邮件)
if delay > 0:
time.sleep(delay)
return True
except Exception as e:
with self.lock:
self.failed_count += 1
print(f"✗ 邮件发送失败 {recipient['email']}: {e}")
return False
def send_bulk_emails(self, recipients, template, subject_template,
max_workers=5, delay=1):
"""批量发送邮件"""
print(f"开始批量发送邮件,收件人数量: {len(recipients)}")
print(f"最大并发数: {max_workers}, 发送间隔: {delay}秒")
start_time = datetime.now()
# 使用线程池并发发送
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for recipient in recipients:
future = executor.submit(
self.send_single_email,
recipient,
template,
subject_template,
delay
)
futures.append(future)
# 等待所有任务完成
for future in futures:
future.result()
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
# 打印统计信息
print(f"\n批量发送完成!")
print(f"总耗时: {duration:.2f}秒")
print(f"成功发送: {self.sent_count}")
print(f"发送失败: {self.failed_count}")
print(f"成功率: {(self.sent_count / len(recipients) * 100):.1f}%")
def bulk_email_example():
"""批量邮件发送示例"""
# SMTP配置
smtp_config = {
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'sender_email': 'your_email@gmail.com',
'sender_password': 'your_password'
}
# 创建批量发送器
bulk_sender = BulkEmailSender(smtp_config)
# 模拟收件人数据(实际使用时从CSV文件加载)
recipients = [
{
'email': 'user1@example.com',
'name': '张三',
'company': 'ABC公司',
'custom_data': {'position': '经理'}
},
{
'email': 'user2@example.com',
'name': '李四',
'company': 'XYZ公司',
'custom_data': {'position': '总监'}
},
# 更多收件人...
]
# 邮件模板
subject_template = "致{company}{name}的重要通知"
email_template = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>重要通知</title>
</head>
<body>
<h2>尊敬的{name},您好!</h2>
<p>感谢您对我们产品的关注。</p>
<p>我们很高兴地通知您,{company}已被选为我们的重要合作伙伴。</p>
<p>作为{position},您将享受以下特殊待遇:</p>
<ul>
<li>专属客户经理服务</li>
<li>优先技术支持</li>
<li>定制化解决方案</li>
</ul>
<p>如有任何疑问,请随时联系我们。</p>
<p>此致<br>敬礼!</p>
<hr>
<p><small>此邮件发送至: {email}</small></p>
</body>
</html>
"""
# 发送批量邮件
bulk_sender.send_bulk_emails(
recipients=recipients,
template=email_template,
subject_template=subject_template,
max_workers=3,
delay=2
)
# 运行示例
if __name__ == "__main__":
bulk_email_example()
4. 邮件解析和处理
4.1 邮件解析器
import email
from email.header import decode_header
import imaplib
import base64
class EmailParser:
"""邮件解析器"""
def __init__(self):
self.parsed_emails = []
def decode_mime_words(self, s):
"""解码MIME编码的字符串"""
decoded_parts = decode_header(s)
decoded_string = ""
for part, encoding in decoded_parts:
if isinstance(part, bytes):
if encoding:
decoded_string += part.decode(encoding)
else:
decoded_string += part.decode('utf-8', errors='ignore')
else:
decoded_string += part
return decoded_string
def parse_email_from_string(self, email_string):
"""从字符串解析邮件"""
try:
# 解析邮件
msg = email.message_from_string(email_string)
# 提取基本信息
email_data = {
'subject': self.decode_mime_words(msg.get('Subject', '')),
'from': self.decode_mime_words(msg.get('From', '')),
'to': self.decode_mime_words(msg.get('To', '')),
'date': msg.get('Date', ''),
'message_id': msg.get('Message-ID', ''),
'content_type': msg.get_content_type(),
'parts': []
}
# 解析邮件内容
if msg.is_multipart():
for part in msg.walk():
self.parse_email_part(part, email_data)
else:
self.parse_email_part(msg, email_data)
return email_data
except Exception as e:
print(f"邮件解析失败: {e}")
return None
def parse_email_part(self, part, email_data):
"""解析邮件部分"""
content_type = part.get_content_type()
content_disposition = str(part.get('Content-Disposition', ''))
part_data = {
'content_type': content_type,
'content_disposition': content_disposition,
'filename': None,
'content': None,
'size': 0
}
# 获取文件名
if 'attachment' in content_disposition:
filename = part.get_filename()
if filename:
part_data['filename'] = self.decode_mime_words(filename)
# 获取内容
try:
payload = part.get_payload(decode=True)
if payload:
part_data['size'] = len(payload)
if content_type.startswith('text/'):
# 文本内容
charset = part.get_content_charset() or 'utf-8'
part_data['content'] = payload.decode(charset, errors='ignore')
else:
# 二进制内容(附件)
part_data['content'] = base64.b64encode(payload).decode('ascii')
except Exception as e:
print(f"解析邮件部分失败: {e}")
email_data['parts'].append(part_data)
def extract_text_content(self, email_data):
"""提取邮件的文本内容"""
text_content = ""
html_content = ""
for part in email_data['parts']:
if part['content_type'] == 'text/plain' and part['content']:
text_content += part['content']
elif part['content_type'] == 'text/html' and part['content']:
html_content += part['content']
return {
'text': text_content,
'html': html_content
}
def extract_attachments(self, email_data):
"""提取邮件附件"""
attachments = []
for part in email_data['parts']:
if part['filename'] and part['content']:
attachments.append({
'filename': part['filename'],
'content_type': part['content_type'],
'size': part['size'],
'content': part['content'] # Base64编码的内容
})
return attachments
def save_attachment(self, attachment, save_dir="./attachments"):
"""保存附件到文件"""
try:
os.makedirs(save_dir, exist_ok=True)
file_path = os.path.join(save_dir, attachment['filename'])
# 解码Base64内容
content = base64.b64decode(attachment['content'])
with open(file_path, 'wb') as f:
f.write(content)
print(f"附件已保存: {file_path}")
return file_path
except Exception as e:
print(f"保存附件失败: {e}")
return None
def email_parsing_example():
"""邮件解析示例"""
# 示例邮件字符串(实际使用时从邮件服务器获取)
sample_email = """From: sender@example.com
To: receiver@example.com
Subject: =?UTF-8?B?5rWL6K+V6YKu5Lu2?=
Date: Mon, 01 Jan 2024 12:00:00 +0000
Message-ID: <123456@example.com>
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="boundary123"
--boundary123
Content-Type: text/plain; charset=utf-8
这是邮件的文本内容。
--boundary123
Content-Type: text/html; charset=utf-8
<html>
<body>
<h1>这是HTML内容</h1>
<p>包含<strong>格式化</strong>文本。</p>
</body>
</html>
--boundary123
Content-Type: application/pdf
Content-Disposition: attachment; filename="document.pdf"
Content-Transfer-Encoding: base64
JVBERi0xLjQKJcOkw7zDtsO4DQo...
--boundary123--
"""
# 创建解析器
parser = EmailParser()
# 解析邮件
email_data = parser.parse_email_from_string(sample_email)
if email_data:
print("邮件解析成功!")
print(f"主题: {email_data['subject']}")
print(f"发件人: {email_data['from']}")
print(f"收件人: {email_data['to']}")
print(f"日期: {email_data['date']}")
# 提取文本内容
content = parser.extract_text_content(email_data)
print(f"\n文本内容:\n{content['text']}")
print(f"\nHTML内容:\n{content['html']}")
# 提取附件
attachments = parser.extract_attachments(email_data)
print(f"\n附件数量: {len(attachments)}")
for attachment in attachments:
print(f"- {attachment['filename']} ({attachment['size']} 字节)")
# 运行示例
if __name__ == "__main__":
email_parsing_example()
5. 实际应用案例
5.1 邮件通知系统
import json
import logging
from datetime import datetime, timedelta
from enum import Enum
class NotificationType(Enum):
"""通知类型枚举"""
INFO = "info"
WARNING = "warning"
ERROR = "error"
SUCCESS = "success"
class EmailNotificationSystem:
"""邮件通知系统"""
def __init__(self, smtp_config, templates_dir="./email_templates"):
self.smtp_config = smtp_config
self.templates_dir = templates_dir
self.notification_queue = []
self.sent_notifications = []
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
# 确保模板目录存在
os.makedirs(templates_dir, exist_ok=True)
self.create_default_templates()
def create_default_templates(self):
"""创建默认邮件模板"""
templates = {
'system_alert': {
'subject': '系统警报 - {alert_type}',
'html': '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>系统警报</title>
<style>
.alert {{
padding: 20px;
border-radius: 5px;
margin: 10px 0;
}}
.error {{ background-color: #f8d7da; color: #721c24; }}
.warning {{ background-color: #fff3cd; color: #856404; }}
.info {{ background-color: #d1ecf1; color: #0c5460; }}
.success {{ background-color: #d4edda; color: #155724; }}
</style>
</head>
<body>
<div class="alert {alert_type}">
<h2>系统警报通知</h2>
<p><strong>警报类型:</strong> {alert_type}</p>
<p><strong>发生时间:</strong> {timestamp}</p>
<p><strong>服务器:</strong> {server}</p>
<p><strong>详细信息:</strong></p>
<pre>{details}</pre>
</div>
</body>
</html>
'''
},
'user_welcome': {
'subject': '欢迎加入 {service_name}',
'html': '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>欢迎</title>
</head>
<body>
<h1>欢迎,{username}!</h1>
<p>感谢您注册 {service_name}。</p>
<p>您的账户信息:</p>
<ul>
<li>用户名: {username}</li>
<li>邮箱: {email}</li>
<li>注册时间: {registration_time}</li>
</ul>
<p>
<a href="{activation_link}"
style="background: #007bff; color: white; padding: 10px 20px;
text-decoration: none; border-radius: 5px;">
激活账户
</a>
</p>
</body>
</html>
'''
},
'report_summary': {
'subject': '{report_type}报告 - {date}',
'html': '''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>报告摘要</title>
</head>
<body>
<h1>{report_type}报告</h1>
<p><strong>报告日期:</strong> {date}</p>
<p><strong>报告周期:</strong> {period}</p>
<h2>关键指标</h2>
<table border="1" style="border-collapse: collapse; width: 100%;">
<tr>
<th>指标</th>
<th>数值</th>
<th>变化</th>
</tr>
{metrics_table}
</table>
<h2>详细数据</h2>
<p>{details}</p>
<p>完整报告请查看附件。</p>
</body>
</html>
'''
}
}
# 保存模板到文件
for template_name, template_content in templates.items():
template_file = os.path.join(self.templates_dir, f"{template_name}.json")
with open(template_file, 'w', encoding='utf-8') as f:
json.dump(template_content, f, ensure_ascii=False, indent=2)
def load_template(self, template_name):
"""加载邮件模板"""
template_file = os.path.join(self.templates_dir, f"{template_name}.json")
try:
with open(template_file, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
self.logger.error(f"模板文件不存在: {template_file}")
return None
def create_notification(self, notification_type, recipients, template_name,
template_data, attachments=None, priority=1):
"""创建通知"""
notification = {
'id': f"notif_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{len(self.notification_queue)}",
'type': notification_type,
'recipients': recipients if isinstance(recipients, list) else [recipients],
'template_name': template_name,
'template_data': template_data,
'attachments': attachments or [],
'priority': priority,
'created_at': datetime.now(),
'status': 'pending'
}
self.notification_queue.append(notification)
self.logger.info(f"通知已创建: {notification['id']}")
return notification['id']
def send_notification(self, notification):
"""发送单个通知"""
try:
# 加载模板
template = self.load_template(notification['template_name'])
if not template:
raise Exception(f"模板不存在: {notification['template_name']}")
# 渲染模板
subject = template['subject'].format(**notification['template_data'])
html_body = template['html'].format(**notification['template_data'])
# 创建邮件
for recipient in notification['recipients']:
message = MIMEMultipart()
message["From"] = self.smtp_config['sender_email']
message["To"] = recipient
message["Subject"] = subject
# 添加HTML内容
message.attach(MIMEText(html_body, "html", "utf-8"))
# 添加附件
for attachment_path in notification['attachments']:
if os.path.exists(attachment_path):
with open(attachment_path, "rb") as attachment:
part = MIMEBase('application', 'octet-stream')
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
'Content-Disposition',
f'attachment; filename= {os.path.basename(attachment_path)}'
)
message.attach(part)
# 发送邮件
server = smtplib.SMTP(
self.smtp_config['smtp_server'],
self.smtp_config['smtp_port']
)
server.starttls()
server.login(
self.smtp_config['sender_email'],
self.smtp_config['sender_password']
)
text = message.as_string()
server.sendmail(
self.smtp_config['sender_email'],
recipient,
text
)
server.quit()
# 更新状态
notification['status'] = 'sent'
notification['sent_at'] = datetime.now()
self.sent_notifications.append(notification)
self.logger.info(f"通知发送成功: {notification['id']}")
return True
except Exception as e:
notification['status'] = 'failed'
notification['error'] = str(e)
self.logger.error(f"通知发送失败 {notification['id']}: {e}")
return False
def process_notification_queue(self):
"""处理通知队列"""
# 按优先级排序
self.notification_queue.sort(key=lambda x: x['priority'])
processed = []
for notification in self.notification_queue:
if notification['status'] == 'pending':
self.send_notification(notification)
processed.append(notification)
# 从队列中移除已处理的通知
for notification in processed:
self.notification_queue.remove(notification)
def send_system_alert(self, alert_type, server, details, recipients):
"""发送系统警报"""
template_data = {
'alert_type': alert_type.value,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'server': server,
'details': details
}
priority = 1 if alert_type == NotificationType.ERROR else 2
return self.create_notification(
notification_type=alert_type,
recipients=recipients,
template_name='system_alert',
template_data=template_data,
priority=priority
)
def send_welcome_email(self, username, email, service_name, activation_link):
"""发送欢迎邮件"""
template_data = {
'username': username,
'email': email,
'service_name': service_name,
'activation_link': activation_link,
'registration_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
return self.create_notification(
notification_type=NotificationType.INFO,
recipients=[email],
template_name='user_welcome',
template_data=template_data,
priority=3
)
def notification_system_example():
"""通知系统示例"""
# SMTP配置
smtp_config = {
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'sender_email': 'your_email@gmail.com',
'sender_password': 'your_password'
}
# 创建通知系统
notification_system = EmailNotificationSystem(smtp_config)
# 发送系统警报
notification_system.send_system_alert(
alert_type=NotificationType.ERROR,
server="web-server-01",
details="CPU使用率超过90%,内存使用率85%",
recipients=["admin@example.com", "ops@example.com"]
)
# 发送欢迎邮件
notification_system.send_welcome_email(
username="新用户",
email="newuser@example.com",
service_name="我的服务",
activation_link="https://example.com/activate?token=abc123"
)
# 处理通知队列
notification_system.process_notification_queue()
# 打印统计信息
print(f"队列中待处理通知: {len(notification_system.notification_queue)}")
print(f"已发送通知: {len(notification_system.sent_notifications)}")
# 运行示例
if __name__ == "__main__":
notification_system_example()
5.2 邮件监控和自动回复系统
import imaplib
import email
import re
from datetime import datetime, timedelta
class EmailMonitoringSystem:
"""邮件监控和自动回复系统"""
def __init__(self, imap_config, smtp_config):
self.imap_config = imap_config
self.smtp_config = smtp_config
self.auto_reply_rules = []
self.processed_emails = set()
def connect_imap(self):
"""连接IMAP服务器"""
try:
mail = imaplib.IMAP4_SSL(
self.imap_config['imap_server'],
self.imap_config['imap_port']
)
mail.login(
self.imap_config['email'],
self.imap_config['password']
)
return mail
except Exception as e:
print(f"IMAP连接失败: {e}")
return None
def add_auto_reply_rule(self, condition_func, reply_template, priority=1):
"""添加自动回复规则"""
rule = {
'condition': condition_func,
'template': reply_template,
'priority': priority,
'created_at': datetime.now()
}
self.auto_reply_rules.append(rule)
# 按优先级排序
self.auto_reply_rules.sort(key=lambda x: x['priority'])
def check_email_conditions(self, email_data):
"""检查邮件是否满足自动回复条件"""
for rule in self.auto_reply_rules:
try:
if rule['condition'](email_data):
return rule['template']
except Exception as e:
print(f"规则检查失败: {e}")
return None
def send_auto_reply(self, original_email, reply_template):
"""发送自动回复"""
try:
# 提取原邮件信息
from_email = original_email.get('from', '')
subject = original_email.get('subject', '')
# 构建回复主题
if not subject.startswith('Re:'):
reply_subject = f"Re: {subject}"
else:
reply_subject = subject
# 渲染回复模板
reply_content = reply_template.format(
original_subject=subject,
original_from=from_email,
timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
)
# 创建回复邮件
message = MIMEMultipart()
message["From"] = self.smtp_config['sender_email']
message["To"] = from_email
message["Subject"] = reply_subject
message["In-Reply-To"] = original_email.get('message_id', '')
message["References"] = original_email.get('message_id', '')
# 添加回复内容
message.attach(MIMEText(reply_content, "html", "utf-8"))
# 发送邮件
server = smtplib.SMTP(
self.smtp_config['smtp_server'],
self.smtp_config['smtp_port']
)
server.starttls()
server.login(
self.smtp_config['sender_email'],
self.smtp_config['sender_password']
)
text = message.as_string()
server.sendmail(
self.smtp_config['sender_email'],
from_email,
text
)
server.quit()
print(f"自动回复已发送至: {from_email}")
return True
except Exception as e:
print(f"自动回复发送失败: {e}")
return False
def monitor_inbox(self, folder='INBOX', check_interval=60):
"""监控收件箱"""
print(f"开始监控邮箱: {self.imap_config['email']}")
print(f"检查间隔: {check_interval}秒")
while True:
try:
# 连接IMAP
mail = self.connect_imap()
if not mail:
time.sleep(check_interval)
continue
# 选择文件夹
mail.select(folder)
# 搜索未读邮件
status, messages = mail.search(None, 'UNSEEN')
if status == 'OK' and messages[0]:
email_ids = messages[0].split()
print(f"发现 {len(email_ids)} 封未读邮件")
for email_id in email_ids:
# 获取邮件
status, msg_data = mail.fetch(email_id, '(RFC822)')
if status == 'OK':
# 解析邮件
email_body = msg_data[0][1]
email_message = email.message_from_bytes(email_body)
# 提取邮件信息
email_data = {
'subject': email_message.get('Subject', ''),
'from': email_message.get('From', ''),
'to': email_message.get('To', ''),
'date': email_message.get('Date', ''),
'message_id': email_message.get('Message-ID', ''),
'body': self.extract_email_body(email_message)
}
# 检查是否已处理
email_hash = hash(email_data['message_id'])
if email_hash in self.processed_emails:
continue
print(f"处理邮件: {email_data['subject']}")
# 检查自动回复条件
reply_template = self.check_email_conditions(email_data)
if reply_template:
self.send_auto_reply(email_data, reply_template)
# 标记为已处理
self.processed_emails.add(email_hash)
# 关闭连接
mail.close()
mail.logout()
# 等待下次检查
time.sleep(check_interval)
except KeyboardInterrupt:
print("监控已停止")
break
except Exception as e:
print(f"监控过程中出错: {e}")
time.sleep(check_interval)
def extract_email_body(self, email_message):
"""提取邮件正文"""
body = ""
if email_message.is_multipart():
for part in email_message.walk():
if part.get_content_type() == "text/plain":
try:
body += part.get_payload(decode=True).decode('utf-8')
except:
body += part.get_payload(decode=True).decode('utf-8', errors='ignore')
else:
try:
body = email_message.get_payload(decode=True).decode('utf-8')
except:
body = email_message.get_payload(decode=True).decode('utf-8', errors='ignore')
return body
def monitoring_system_example():
"""邮件监控系统示例"""
# IMAP配置
imap_config = {
'imap_server': 'imap.gmail.com',
'imap_port': 993,
'email': 'your_email@gmail.com',
'password': 'your_password'
}
# SMTP配置
smtp_config = {
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'sender_email': 'your_email@gmail.com',
'sender_password': 'your_password'
}
# 创建监控系统
monitor = EmailMonitoringSystem(imap_config, smtp_config)
# 添加自动回复规则
# 规则1: 包含"价格"关键词的邮件
def price_inquiry_condition(email_data):
keywords = ['价格', '报价', '费用', 'price', 'cost']
subject_body = (email_data['subject'] + ' ' + email_data['body']).lower()
return any(keyword in subject_body for keyword in keywords)
price_reply_template = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>价格咨询自动回复</title>
</head>
<body>
<h2>感谢您的价格咨询</h2>
<p>您好!</p>
<p>感谢您对我们产品的关注。我们已收到您关于价格的咨询。</p>
<p>我们的销售团队将在24小时内与您联系,为您提供详细的报价信息。</p>
<p>如有紧急需求,请直接致电:400-123-4567</p>
<p>此致<br>敬礼!</p>
<hr>
<p><small>这是一封自动回复邮件,发送时间:{timestamp}</small></p>
</body>
</html>
"""
monitor.add_auto_reply_rule(price_inquiry_condition, price_reply_template, priority=1)
# 规则2: 技术支持邮件
def support_condition(email_data):
keywords = ['支持', '帮助', '问题', 'support', 'help', 'issue']
subject_body = (email_data['subject'] + ' ' + email_data['body']).lower()
return any(keyword in subject_body for keyword in keywords)
support_reply_template = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>技术支持自动回复</title>
</head>
<body>
<h2>技术支持请求已收到</h2>
<p>您好!</p>
<p>我们已收到您的技术支持请求。</p>
<p>我们的技术团队将尽快为您解决问题,通常在4小时内回复。</p>
<p>紧急技术问题请联系:tech-support@example.com</p>
<p>此致<br>敬礼!</p>
<hr>
<p><small>这是一封自动回复邮件,发送时间:{timestamp}</small></p>
</body>
</html>
"""
monitor.add_auto_reply_rule(support_condition, support_reply_template, priority=2)
# 开始监控(实际使用时取消注释)
# monitor.monitor_inbox(check_interval=30)
print("邮件监控系统配置完成")
print(f"已配置 {len(monitor.auto_reply_rules)} 条自动回复规则")
# 运行示例
if __name__ == "__main__":
monitoring_system_example()
6. 性能优化和最佳实践
6.1 连接池和重用
import queue
import threading
from contextlib import contextmanager
class SMTPConnectionPool:
"""SMTP连接池"""
def __init__(self, smtp_config, pool_size=5):
self.smtp_config = smtp_config
self.pool_size = pool_size
self.pool = queue.Queue(maxsize=pool_size)
self.lock = threading.Lock()
# 初始化连接池
self._initialize_pool()
def _initialize_pool(self):
"""初始化连接池"""
for _ in range(self.pool_size):
connection = self._create_connection()
if connection:
self.pool.put(connection)
def _create_connection(self):
"""创建SMTP连接"""
try:
server = smtplib.SMTP(
self.smtp_config['smtp_server'],
self.smtp_config['smtp_port']
)
server.starttls()
server.login(
self.smtp_config['sender_email'],
self.smtp_config['sender_password']
)
return server
except Exception as e:
print(f"创建SMTP连接失败: {e}")
return None
@contextmanager
def get_connection(self, timeout=30):
"""获取连接(上下文管理器)"""
connection = None
try:
# 从池中获取连接
connection = self.pool.get(timeout=timeout)
# 检查连接是否有效
if not self._is_connection_valid(connection):
connection.quit()
connection = self._create_connection()
yield connection
except queue.Empty:
# 池中没有可用连接,创建新连接
connection = self._create_connection()
yield connection
finally:
# 归还连接到池中
if connection:
try:
if self.pool.qsize() < self.pool_size:
self.pool.put_nowait(connection)
else:
connection.quit()
except:
try:
connection.quit()
except:
pass
def _is_connection_valid(self, connection):
"""检查连接是否有效"""
try:
status = connection.noop()[0]
return status == 250
except:
return False
def close_all_connections(self):
"""关闭所有连接"""
while not self.pool.empty():
try:
connection = self.pool.get_nowait()
connection.quit()
except:
pass
class OptimizedEmailSender:
"""优化的邮件发送器"""
def __init__(self, smtp_config, pool_size=5):
self.connection_pool = SMTPConnectionPool(smtp_config, pool_size)
self.smtp_config = smtp_config
self.stats = {
'sent': 0,
'failed': 0,
'start_time': datetime.now()
}
def send_email_optimized(self, to_email, subject, body, attachments=None):
"""使用连接池发送邮件"""
try:
# 创建邮件消息
message = MIMEMultipart()
message["From"] = self.smtp_config['sender_email']
message["To"] = to_email
message["Subject"] = subject
# 添加正文
message.attach(MIMEText(body, "html", "utf-8"))
# 添加附件
if attachments:
for attachment_path in attachments:
if os.path.exists(attachment_path):
with open(attachment_path, "rb") as attachment:
part = MIMEBase('application', 'octet-stream')
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header(
'Content-Disposition',
f'attachment; filename= {os.path.basename(attachment_path)}'
)
message.attach(part)
# 使用连接池发送
with self.connection_pool.get_connection() as server:
text = message.as_string()
server.sendmail(
self.smtp_config['sender_email'],
to_email,
text
)
self.stats['sent'] += 1
return True
except Exception as e:
self.stats['failed'] += 1
print(f"邮件发送失败 {to_email}: {e}")
return False
def send_bulk_emails_optimized(self, email_list, max_workers=10):
"""优化的批量邮件发送"""
print(f"开始批量发送 {len(email_list)} 封邮件")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for email_data in email_list:
future = executor.submit(
self.send_email_optimized,
email_data['to'],
email_data['subject'],
email_data['body'],
email_data.get('attachments')
)
futures.append(future)
# 等待所有任务完成
for future in futures:
future.result()
# 打印统计信息
duration = (datetime.now() - self.stats['start_time']).total_seconds()
print(f"批量发送完成!")
print(f"总耗时: {duration:.2f}秒")
print(f"成功: {self.stats['sent']}")
print(f"失败: {self.stats['failed']}")
print(f"发送速率: {self.stats['sent'] / duration:.2f} 封/秒")
def cleanup(self):
"""清理资源"""
self.connection_pool.close_all_connections()
def optimization_example():
"""性能优化示例"""
# SMTP配置
smtp_config = {
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'sender_email': 'your_email@gmail.com',
'sender_password': 'your_password'
}
# 创建优化的邮件发送器
sender = OptimizedEmailSender(smtp_config, pool_size=3)
# 准备测试邮件列表
email_list = []
for i in range(50):
email_list.append({
'to': f'test{i}@example.com',
'subject': f'测试邮件 #{i+1}',
'body': f'<h1>这是第 {i+1} 封测试邮件</h1><p>发送时间: {datetime.now()}</p>'
})
# 批量发送
sender.send_bulk_emails_optimized(email_list, max_workers=5)
# 清理资源
sender.cleanup()
# 运行示例
if __name__ == "__main__":
optimization_example()
7. 实践练习
练习1:邮件营销系统
创建一个完整的邮件营销系统,支持模板管理、用户分组、发送统计等功能。
练习2:邮件备份工具
实现一个邮件备份工具,能够从IMAP服务器下载邮件并保存为本地文件。
总结
email
和smtplib
模块为Python提供了完整的邮件处理能力:
- 邮件构造:支持文本、HTML、附件等多种格式
- SMTP发送:可靠的邮件发送机制
- 邮件解析:完整的MIME邮件解析功能
- 批量处理:高效的批量邮件发送
- 自动化:邮件监控和自动回复系统
通过掌握这些技术,你可以构建从简单的邮件发送到复杂的邮件系统的各种应用。
更多推荐
所有评论(0)