027-邮件处理email和smtplib

1. 模块概述

1.1 email模块简介

email模块提供了处理电子邮件消息的完整功能,包括解析、构造和发送邮件。它支持MIME(多用途互联网邮件扩展)标准,可以处理文本、HTML、附件等各种邮件内容。

1.2 smtplib模块简介

smtplib模块实现了SMTP(简单邮件传输协议)客户端,用于发送邮件到任何支持SMTP的邮件服务器。

1.3 核心功能

  • 邮件构造:创建文本、HTML、多部分邮件
  • 附件处理:添加文件、图片等附件
  • 邮件解析:解析接收到的邮件内容
  • SMTP发送:通过SMTP服务器发送邮件
  • 安全连接:支持SSL/TLS加密连接

2. 基本概念和使用

2.1 基本邮件发送

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
import os

def send_simple_email():
    """发送简单文本邮件"""
    # 邮件配置
    smtp_server = "smtp.gmail.com"
    smtp_port = 587
    sender_email = "your_email@gmail.com"
    sender_password = "your_password"
    receiver_email = "receiver@example.com"
    
    # 创建邮件内容
    message = MIMEText("这是一封测试邮件的内容。", "plain", "utf-8")
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = "测试邮件"
    
    try:
        # 连接SMTP服务器
        server = smtplib.SMTP(smtp_server, smtp_port)
        server.starttls()  # 启用TLS加密
        server.login(sender_email, sender_password)
        
        # 发送邮件
        text = message.as_string()
        server.sendmail(sender_email, receiver_email, text)
        server.quit()
        
        print("邮件发送成功!")
        
    except Exception as e:
        print(f"邮件发送失败: {e}")

# 运行示例
if __name__ == "__main__":
    send_simple_email()

2.2 HTML邮件发送

def send_html_email():
    """发送HTML格式邮件"""
    # 邮件配置
    smtp_server = "smtp.gmail.com"
    smtp_port = 587
    sender_email = "your_email@gmail.com"
    sender_password = "your_password"
    receiver_email = "receiver@example.com"
    
    # 创建HTML内容
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <title>HTML邮件</title>
    </head>
    <body>
        <h1 style="color: #333;">欢迎使用Python邮件系统</h1>
        <p>这是一封<strong>HTML格式</strong>的邮件。</p>
        <ul>
            <li>支持富文本格式</li>
            <li>支持图片和链接</li>
            <li>支持CSS样式</li>
        </ul>
        <p>
            <a href="https://python.org" style="color: #0066cc;">访问Python官网</a>
        </p>
        <img src="https://www.python.org/static/img/python-logo.png" 
             alt="Python Logo" style="max-width: 200px;">
    </body>
    </html>
    """
    
    # 创建邮件对象
    message = MIMEMultipart("alternative")
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = "HTML格式邮件测试"
    
    # 添加HTML内容
    html_part = MIMEText(html_content, "html", "utf-8")
    message.attach(html_part)
    
    # 也可以添加纯文本版本
    text_content = """
    欢迎使用Python邮件系统
    
    这是一封HTML格式的邮件。
    
    功能特点:
    - 支持富文本格式
    - 支持图片和链接
    - 支持CSS样式
    
    访问Python官网: https://python.org
    """
    
    text_part = MIMEText(text_content, "plain", "utf-8")
    message.attach(text_part)
    
    try:
        # 发送邮件
        server = smtplib.SMTP(smtp_server, smtp_port)
        server.starttls()
        server.login(sender_email, sender_password)
        
        text = message.as_string()
        server.sendmail(sender_email, receiver_email, text)
        server.quit()
        
        print("HTML邮件发送成功!")
        
    except Exception as e:
        print(f"HTML邮件发送失败: {e}")

# 运行示例
if __name__ == "__main__":
    send_html_email()

3. 高级邮件功能

3.1 带附件的邮件

import mimetypes
from pathlib import Path

class EmailWithAttachments:
    """带附件的邮件处理类"""
    
    def __init__(self, smtp_server, smtp_port, sender_email, sender_password):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.sender_email = sender_email
        self.sender_password = sender_password
    
    def create_message(self, to_email, subject, body, is_html=False):
        """创建邮件消息"""
        message = MIMEMultipart()
        message["From"] = self.sender_email
        message["To"] = to_email
        message["Subject"] = subject
        
        # 添加邮件正文
        if is_html:
            message.attach(MIMEText(body, "html", "utf-8"))
        else:
            message.attach(MIMEText(body, "plain", "utf-8"))
        
        return message
    
    def add_attachment(self, message, file_path):
        """添加附件"""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"附件文件不存在: {file_path}")
        
        # 获取文件信息
        filename = os.path.basename(file_path)
        
        # 猜测MIME类型
        mime_type, _ = mimetypes.guess_type(file_path)
        if mime_type is None:
            mime_type = "application/octet-stream"
        
        main_type, sub_type = mime_type.split('/', 1)
        
        # 读取文件内容
        with open(file_path, "rb") as attachment:
            if main_type == "text":
                # 文本文件
                part = MIMEText(attachment.read().decode('utf-8'), sub_type)
            elif main_type == "image":
                # 图片文件
                from email.mime.image import MIMEImage
                part = MIMEImage(attachment.read(), sub_type)
            elif main_type == "audio":
                # 音频文件
                from email.mime.audio import MIMEAudio
                part = MIMEAudio(attachment.read(), sub_type)
            else:
                # 其他文件类型
                part = MIMEBase(main_type, sub_type)
                part.set_payload(attachment.read())
                encoders.encode_base64(part)
        
        # 设置附件头部
        part.add_header(
            "Content-Disposition",
            f"attachment; filename= {filename}",
        )
        
        message.attach(part)
        return message
    
    def send_email_with_attachments(self, to_email, subject, body, 
                                  attachments=None, is_html=False):
        """发送带附件的邮件"""
        try:
            # 创建邮件消息
            message = self.create_message(to_email, subject, body, is_html)
            
            # 添加附件
            if attachments:
                for attachment_path in attachments:
                    self.add_attachment(message, attachment_path)
            
            # 连接SMTP服务器并发送
            server = smtplib.SMTP(self.smtp_server, self.smtp_port)
            server.starttls()
            server.login(self.sender_email, self.sender_password)
            
            text = message.as_string()
            server.sendmail(self.sender_email, to_email, text)
            server.quit()
            
            print(f"邮件发送成功!附件数量: {len(attachments) if attachments else 0}")
            return True
            
        except Exception as e:
            print(f"邮件发送失败: {e}")
            return False

def attachment_example():
    """附件邮件示例"""
    # 创建邮件发送器
    email_sender = EmailWithAttachments(
        smtp_server="smtp.gmail.com",
        smtp_port=587,
        sender_email="your_email@gmail.com",
        sender_password="your_password"
    )
    
    # 准备邮件内容
    subject = "带附件的邮件测试"
    body = """
    您好!
    
    这是一封带有附件的测试邮件。
    
    附件包含:
    1. 文档文件
    2. 图片文件
    3. 数据文件
    
    请查收!
    
    此致
    敬礼!
    """
    
    # 准备附件列表(确保这些文件存在)
    attachments = [
        "document.pdf",
        "image.jpg",
        "data.csv"
    ]
    
    # 发送邮件
    email_sender.send_email_with_attachments(
        to_email="receiver@example.com",
        subject=subject,
        body=body,
        attachments=attachments
    )

# 运行示例
if __name__ == "__main__":
    attachment_example()

3.2 批量邮件发送

import time
import csv
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor

class BulkEmailSender:
    """批量邮件发送器"""
    
    def __init__(self, smtp_config):
        self.smtp_config = smtp_config
        self.sent_count = 0
        self.failed_count = 0
        self.lock = threading.Lock()
    
    def load_recipients_from_csv(self, csv_file):
        """从CSV文件加载收件人列表"""
        recipients = []
        try:
            with open(csv_file, 'r', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    recipients.append({
                        'email': row.get('email', ''),
                        'name': row.get('name', ''),
                        'company': row.get('company', ''),
                        'custom_data': {k: v for k, v in row.items() 
                                      if k not in ['email', 'name', 'company']}
                    })
        except Exception as e:
            print(f"读取CSV文件失败: {e}")
        
        return recipients
    
    def create_personalized_message(self, recipient, template, subject_template):
        """创建个性化邮件消息"""
        # 替换模板中的占位符
        personalized_body = template.format(
            name=recipient.get('name', '用户'),
            company=recipient.get('company', '贵公司'),
            email=recipient.get('email', ''),
            **recipient.get('custom_data', {})
        )
        
        personalized_subject = subject_template.format(
            name=recipient.get('name', '用户'),
            company=recipient.get('company', '贵公司'),
            **recipient.get('custom_data', {})
        )
        
        # 创建邮件消息
        message = MIMEMultipart()
        message["From"] = self.smtp_config['sender_email']
        message["To"] = recipient['email']
        message["Subject"] = personalized_subject
        
        # 添加HTML内容
        message.attach(MIMEText(personalized_body, "html", "utf-8"))
        
        return message
    
    def send_single_email(self, recipient, template, subject_template, delay=0):
        """发送单封邮件"""
        try:
            # 创建个性化消息
            message = self.create_personalized_message(
                recipient, template, subject_template
            )
            
            # 连接SMTP服务器
            server = smtplib.SMTP(
                self.smtp_config['smtp_server'], 
                self.smtp_config['smtp_port']
            )
            server.starttls()
            server.login(
                self.smtp_config['sender_email'], 
                self.smtp_config['sender_password']
            )
            
            # 发送邮件
            text = message.as_string()
            server.sendmail(
                self.smtp_config['sender_email'], 
                recipient['email'], 
                text
            )
            server.quit()
            
            # 更新计数器
            with self.lock:
                self.sent_count += 1
            
            print(f"✓ 邮件发送成功: {recipient['email']}")
            
            # 延迟(避免被标记为垃圾邮件)
            if delay > 0:
                time.sleep(delay)
            
            return True
            
        except Exception as e:
            with self.lock:
                self.failed_count += 1
            print(f"✗ 邮件发送失败 {recipient['email']}: {e}")
            return False
    
    def send_bulk_emails(self, recipients, template, subject_template, 
                        max_workers=5, delay=1):
        """批量发送邮件"""
        print(f"开始批量发送邮件,收件人数量: {len(recipients)}")
        print(f"最大并发数: {max_workers}, 发送间隔: {delay}秒")
        
        start_time = datetime.now()
        
        # 使用线程池并发发送
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for recipient in recipients:
                future = executor.submit(
                    self.send_single_email, 
                    recipient, 
                    template, 
                    subject_template, 
                    delay
                )
                futures.append(future)
            
            # 等待所有任务完成
            for future in futures:
                future.result()
        
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        
        # 打印统计信息
        print(f"\n批量发送完成!")
        print(f"总耗时: {duration:.2f}秒")
        print(f"成功发送: {self.sent_count}")
        print(f"发送失败: {self.failed_count}")
        print(f"成功率: {(self.sent_count / len(recipients) * 100):.1f}%")

def bulk_email_example():
    """批量邮件发送示例"""
    # SMTP配置
    smtp_config = {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'sender_email': 'your_email@gmail.com',
        'sender_password': 'your_password'
    }
    
    # 创建批量发送器
    bulk_sender = BulkEmailSender(smtp_config)
    
    # 模拟收件人数据(实际使用时从CSV文件加载)
    recipients = [
        {
            'email': 'user1@example.com',
            'name': '张三',
            'company': 'ABC公司',
            'custom_data': {'position': '经理'}
        },
        {
            'email': 'user2@example.com',
            'name': '李四',
            'company': 'XYZ公司',
            'custom_data': {'position': '总监'}
        },
        # 更多收件人...
    ]
    
    # 邮件模板
    subject_template = "致{company}{name}的重要通知"
    
    email_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <title>重要通知</title>
    </head>
    <body>
        <h2>尊敬的{name},您好!</h2>
        
        <p>感谢您对我们产品的关注。</p>
        
        <p>我们很高兴地通知您,{company}已被选为我们的重要合作伙伴。</p>
        
        <p>作为{position},您将享受以下特殊待遇:</p>
        <ul>
            <li>专属客户经理服务</li>
            <li>优先技术支持</li>
            <li>定制化解决方案</li>
        </ul>
        
        <p>如有任何疑问,请随时联系我们。</p>
        
        <p>此致<br>敬礼!</p>
        
        <hr>
        <p><small>此邮件发送至: {email}</small></p>
    </body>
    </html>
    """
    
    # 发送批量邮件
    bulk_sender.send_bulk_emails(
        recipients=recipients,
        template=email_template,
        subject_template=subject_template,
        max_workers=3,
        delay=2
    )

# 运行示例
if __name__ == "__main__":
    bulk_email_example()

4. 邮件解析和处理

4.1 邮件解析器

import email
from email.header import decode_header
import imaplib
import base64

class EmailParser:
    """邮件解析器"""
    
    def __init__(self):
        self.parsed_emails = []
    
    def decode_mime_words(self, s):
        """解码MIME编码的字符串"""
        decoded_parts = decode_header(s)
        decoded_string = ""
        
        for part, encoding in decoded_parts:
            if isinstance(part, bytes):
                if encoding:
                    decoded_string += part.decode(encoding)
                else:
                    decoded_string += part.decode('utf-8', errors='ignore')
            else:
                decoded_string += part
        
        return decoded_string
    
    def parse_email_from_string(self, email_string):
        """从字符串解析邮件"""
        try:
            # 解析邮件
            msg = email.message_from_string(email_string)
            
            # 提取基本信息
            email_data = {
                'subject': self.decode_mime_words(msg.get('Subject', '')),
                'from': self.decode_mime_words(msg.get('From', '')),
                'to': self.decode_mime_words(msg.get('To', '')),
                'date': msg.get('Date', ''),
                'message_id': msg.get('Message-ID', ''),
                'content_type': msg.get_content_type(),
                'parts': []
            }
            
            # 解析邮件内容
            if msg.is_multipart():
                for part in msg.walk():
                    self.parse_email_part(part, email_data)
            else:
                self.parse_email_part(msg, email_data)
            
            return email_data
            
        except Exception as e:
            print(f"邮件解析失败: {e}")
            return None
    
    def parse_email_part(self, part, email_data):
        """解析邮件部分"""
        content_type = part.get_content_type()
        content_disposition = str(part.get('Content-Disposition', ''))
        
        part_data = {
            'content_type': content_type,
            'content_disposition': content_disposition,
            'filename': None,
            'content': None,
            'size': 0
        }
        
        # 获取文件名
        if 'attachment' in content_disposition:
            filename = part.get_filename()
            if filename:
                part_data['filename'] = self.decode_mime_words(filename)
        
        # 获取内容
        try:
            payload = part.get_payload(decode=True)
            if payload:
                part_data['size'] = len(payload)
                
                if content_type.startswith('text/'):
                    # 文本内容
                    charset = part.get_content_charset() or 'utf-8'
                    part_data['content'] = payload.decode(charset, errors='ignore')
                else:
                    # 二进制内容(附件)
                    part_data['content'] = base64.b64encode(payload).decode('ascii')
        
        except Exception as e:
            print(f"解析邮件部分失败: {e}")
        
        email_data['parts'].append(part_data)
    
    def extract_text_content(self, email_data):
        """提取邮件的文本内容"""
        text_content = ""
        html_content = ""
        
        for part in email_data['parts']:
            if part['content_type'] == 'text/plain' and part['content']:
                text_content += part['content']
            elif part['content_type'] == 'text/html' and part['content']:
                html_content += part['content']
        
        return {
            'text': text_content,
            'html': html_content
        }
    
    def extract_attachments(self, email_data):
        """提取邮件附件"""
        attachments = []
        
        for part in email_data['parts']:
            if part['filename'] and part['content']:
                attachments.append({
                    'filename': part['filename'],
                    'content_type': part['content_type'],
                    'size': part['size'],
                    'content': part['content']  # Base64编码的内容
                })
        
        return attachments
    
    def save_attachment(self, attachment, save_dir="./attachments"):
        """保存附件到文件"""
        try:
            os.makedirs(save_dir, exist_ok=True)
            
            file_path = os.path.join(save_dir, attachment['filename'])
            
            # 解码Base64内容
            content = base64.b64decode(attachment['content'])
            
            with open(file_path, 'wb') as f:
                f.write(content)
            
            print(f"附件已保存: {file_path}")
            return file_path
            
        except Exception as e:
            print(f"保存附件失败: {e}")
            return None

def email_parsing_example():
    """邮件解析示例"""
    # 示例邮件字符串(实际使用时从邮件服务器获取)
    sample_email = """From: sender@example.com
To: receiver@example.com
Subject: =?UTF-8?B?5rWL6K+V6YKu5Lu2?=
Date: Mon, 01 Jan 2024 12:00:00 +0000
Message-ID: <123456@example.com>
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="boundary123"

--boundary123
Content-Type: text/plain; charset=utf-8

这是邮件的文本内容。

--boundary123
Content-Type: text/html; charset=utf-8

<html>
<body>
<h1>这是HTML内容</h1>
<p>包含<strong>格式化</strong>文本。</p>
</body>
</html>

--boundary123
Content-Type: application/pdf
Content-Disposition: attachment; filename="document.pdf"
Content-Transfer-Encoding: base64

JVBERi0xLjQKJcOkw7zDtsO4DQo...

--boundary123--
"""
    
    # 创建解析器
    parser = EmailParser()
    
    # 解析邮件
    email_data = parser.parse_email_from_string(sample_email)
    
    if email_data:
        print("邮件解析成功!")
        print(f"主题: {email_data['subject']}")
        print(f"发件人: {email_data['from']}")
        print(f"收件人: {email_data['to']}")
        print(f"日期: {email_data['date']}")
        
        # 提取文本内容
        content = parser.extract_text_content(email_data)
        print(f"\n文本内容:\n{content['text']}")
        print(f"\nHTML内容:\n{content['html']}")
        
        # 提取附件
        attachments = parser.extract_attachments(email_data)
        print(f"\n附件数量: {len(attachments)}")
        for attachment in attachments:
            print(f"- {attachment['filename']} ({attachment['size']} 字节)")

# 运行示例
if __name__ == "__main__":
    email_parsing_example()

5. 实际应用案例

5.1 邮件通知系统

import json
import logging
from datetime import datetime, timedelta
from enum import Enum

class NotificationType(Enum):
    """通知类型枚举"""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    SUCCESS = "success"

class EmailNotificationSystem:
    """邮件通知系统"""
    
    def __init__(self, smtp_config, templates_dir="./email_templates"):
        self.smtp_config = smtp_config
        self.templates_dir = templates_dir
        self.notification_queue = []
        self.sent_notifications = []
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        
        # 确保模板目录存在
        os.makedirs(templates_dir, exist_ok=True)
        self.create_default_templates()
    
    def create_default_templates(self):
        """创建默认邮件模板"""
        templates = {
            'system_alert': {
                'subject': '系统警报 - {alert_type}',
                'html': '''
                <!DOCTYPE html>
                <html>
                <head>
                    <meta charset="UTF-8">
                    <title>系统警报</title>
                    <style>
                        .alert {{ 
                            padding: 20px; 
                            border-radius: 5px; 
                            margin: 10px 0; 
                        }}
                        .error {{ background-color: #f8d7da; color: #721c24; }}
                        .warning {{ background-color: #fff3cd; color: #856404; }}
                        .info {{ background-color: #d1ecf1; color: #0c5460; }}
                        .success {{ background-color: #d4edda; color: #155724; }}
                    </style>
                </head>
                <body>
                    <div class="alert {alert_type}">
                        <h2>系统警报通知</h2>
                        <p><strong>警报类型:</strong> {alert_type}</p>
                        <p><strong>发生时间:</strong> {timestamp}</p>
                        <p><strong>服务器:</strong> {server}</p>
                        <p><strong>详细信息:</strong></p>
                        <pre>{details}</pre>
                    </div>
                </body>
                </html>
                '''
            },
            'user_welcome': {
                'subject': '欢迎加入 {service_name}',
                'html': '''
                <!DOCTYPE html>
                <html>
                <head>
                    <meta charset="UTF-8">
                    <title>欢迎</title>
                </head>
                <body>
                    <h1>欢迎,{username}!</h1>
                    <p>感谢您注册 {service_name}。</p>
                    <p>您的账户信息:</p>
                    <ul>
                        <li>用户名: {username}</li>
                        <li>邮箱: {email}</li>
                        <li>注册时间: {registration_time}</li>
                    </ul>
                    <p>
                        <a href="{activation_link}" 
                           style="background: #007bff; color: white; padding: 10px 20px; 
                                  text-decoration: none; border-radius: 5px;">
                            激活账户
                        </a>
                    </p>
                </body>
                </html>
                '''
            },
            'report_summary': {
                'subject': '{report_type}报告 - {date}',
                'html': '''
                <!DOCTYPE html>
                <html>
                <head>
                    <meta charset="UTF-8">
                    <title>报告摘要</title>
                </head>
                <body>
                    <h1>{report_type}报告</h1>
                    <p><strong>报告日期:</strong> {date}</p>
                    <p><strong>报告周期:</strong> {period}</p>
                    
                    <h2>关键指标</h2>
                    <table border="1" style="border-collapse: collapse; width: 100%;">
                        <tr>
                            <th>指标</th>
                            <th>数值</th>
                            <th>变化</th>
                        </tr>
                        {metrics_table}
                    </table>
                    
                    <h2>详细数据</h2>
                    <p>{details}</p>
                    
                    <p>完整报告请查看附件。</p>
                </body>
                </html>
                '''
            }
        }
        
        # 保存模板到文件
        for template_name, template_content in templates.items():
            template_file = os.path.join(self.templates_dir, f"{template_name}.json")
            with open(template_file, 'w', encoding='utf-8') as f:
                json.dump(template_content, f, ensure_ascii=False, indent=2)
    
    def load_template(self, template_name):
        """加载邮件模板"""
        template_file = os.path.join(self.templates_dir, f"{template_name}.json")
        try:
            with open(template_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            self.logger.error(f"模板文件不存在: {template_file}")
            return None
    
    def create_notification(self, notification_type, recipients, template_name, 
                          template_data, attachments=None, priority=1):
        """创建通知"""
        notification = {
            'id': f"notif_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{len(self.notification_queue)}",
            'type': notification_type,
            'recipients': recipients if isinstance(recipients, list) else [recipients],
            'template_name': template_name,
            'template_data': template_data,
            'attachments': attachments or [],
            'priority': priority,
            'created_at': datetime.now(),
            'status': 'pending'
        }
        
        self.notification_queue.append(notification)
        self.logger.info(f"通知已创建: {notification['id']}")
        return notification['id']
    
    def send_notification(self, notification):
        """发送单个通知"""
        try:
            # 加载模板
            template = self.load_template(notification['template_name'])
            if not template:
                raise Exception(f"模板不存在: {notification['template_name']}")
            
            # 渲染模板
            subject = template['subject'].format(**notification['template_data'])
            html_body = template['html'].format(**notification['template_data'])
            
            # 创建邮件
            for recipient in notification['recipients']:
                message = MIMEMultipart()
                message["From"] = self.smtp_config['sender_email']
                message["To"] = recipient
                message["Subject"] = subject
                
                # 添加HTML内容
                message.attach(MIMEText(html_body, "html", "utf-8"))
                
                # 添加附件
                for attachment_path in notification['attachments']:
                    if os.path.exists(attachment_path):
                        with open(attachment_path, "rb") as attachment:
                            part = MIMEBase('application', 'octet-stream')
                            part.set_payload(attachment.read())
                            encoders.encode_base64(part)
                            part.add_header(
                                'Content-Disposition',
                                f'attachment; filename= {os.path.basename(attachment_path)}'
                            )
                            message.attach(part)
                
                # 发送邮件
                server = smtplib.SMTP(
                    self.smtp_config['smtp_server'], 
                    self.smtp_config['smtp_port']
                )
                server.starttls()
                server.login(
                    self.smtp_config['sender_email'], 
                    self.smtp_config['sender_password']
                )
                
                text = message.as_string()
                server.sendmail(
                    self.smtp_config['sender_email'], 
                    recipient, 
                    text
                )
                server.quit()
            
            # 更新状态
            notification['status'] = 'sent'
            notification['sent_at'] = datetime.now()
            self.sent_notifications.append(notification)
            
            self.logger.info(f"通知发送成功: {notification['id']}")
            return True
            
        except Exception as e:
            notification['status'] = 'failed'
            notification['error'] = str(e)
            self.logger.error(f"通知发送失败 {notification['id']}: {e}")
            return False
    
    def process_notification_queue(self):
        """处理通知队列"""
        # 按优先级排序
        self.notification_queue.sort(key=lambda x: x['priority'])
        
        processed = []
        for notification in self.notification_queue:
            if notification['status'] == 'pending':
                self.send_notification(notification)
                processed.append(notification)
        
        # 从队列中移除已处理的通知
        for notification in processed:
            self.notification_queue.remove(notification)
    
    def send_system_alert(self, alert_type, server, details, recipients):
        """发送系统警报"""
        template_data = {
            'alert_type': alert_type.value,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'server': server,
            'details': details
        }
        
        priority = 1 if alert_type == NotificationType.ERROR else 2
        
        return self.create_notification(
            notification_type=alert_type,
            recipients=recipients,
            template_name='system_alert',
            template_data=template_data,
            priority=priority
        )
    
    def send_welcome_email(self, username, email, service_name, activation_link):
        """发送欢迎邮件"""
        template_data = {
            'username': username,
            'email': email,
            'service_name': service_name,
            'activation_link': activation_link,
            'registration_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        
        return self.create_notification(
            notification_type=NotificationType.INFO,
            recipients=[email],
            template_name='user_welcome',
            template_data=template_data,
            priority=3
        )

def notification_system_example():
    """通知系统示例"""
    # SMTP配置
    smtp_config = {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'sender_email': 'your_email@gmail.com',
        'sender_password': 'your_password'
    }
    
    # 创建通知系统
    notification_system = EmailNotificationSystem(smtp_config)
    
    # 发送系统警报
    notification_system.send_system_alert(
        alert_type=NotificationType.ERROR,
        server="web-server-01",
        details="CPU使用率超过90%,内存使用率85%",
        recipients=["admin@example.com", "ops@example.com"]
    )
    
    # 发送欢迎邮件
    notification_system.send_welcome_email(
        username="新用户",
        email="newuser@example.com",
        service_name="我的服务",
        activation_link="https://example.com/activate?token=abc123"
    )
    
    # 处理通知队列
    notification_system.process_notification_queue()
    
    # 打印统计信息
    print(f"队列中待处理通知: {len(notification_system.notification_queue)}")
    print(f"已发送通知: {len(notification_system.sent_notifications)}")

# 运行示例
if __name__ == "__main__":
    notification_system_example()

5.2 邮件监控和自动回复系统

import imaplib
import email
import re
from datetime import datetime, timedelta

class EmailMonitoringSystem:
    """邮件监控和自动回复系统"""
    
    def __init__(self, imap_config, smtp_config):
        self.imap_config = imap_config
        self.smtp_config = smtp_config
        self.auto_reply_rules = []
        self.processed_emails = set()
    
    def connect_imap(self):
        """连接IMAP服务器"""
        try:
            mail = imaplib.IMAP4_SSL(
                self.imap_config['imap_server'], 
                self.imap_config['imap_port']
            )
            mail.login(
                self.imap_config['email'], 
                self.imap_config['password']
            )
            return mail
        except Exception as e:
            print(f"IMAP连接失败: {e}")
            return None
    
    def add_auto_reply_rule(self, condition_func, reply_template, priority=1):
        """添加自动回复规则"""
        rule = {
            'condition': condition_func,
            'template': reply_template,
            'priority': priority,
            'created_at': datetime.now()
        }
        self.auto_reply_rules.append(rule)
        
        # 按优先级排序
        self.auto_reply_rules.sort(key=lambda x: x['priority'])
    
    def check_email_conditions(self, email_data):
        """检查邮件是否满足自动回复条件"""
        for rule in self.auto_reply_rules:
            try:
                if rule['condition'](email_data):
                    return rule['template']
            except Exception as e:
                print(f"规则检查失败: {e}")
        return None
    
    def send_auto_reply(self, original_email, reply_template):
        """发送自动回复"""
        try:
            # 提取原邮件信息
            from_email = original_email.get('from', '')
            subject = original_email.get('subject', '')
            
            # 构建回复主题
            if not subject.startswith('Re:'):
                reply_subject = f"Re: {subject}"
            else:
                reply_subject = subject
            
            # 渲染回复模板
            reply_content = reply_template.format(
                original_subject=subject,
                original_from=from_email,
                timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            )
            
            # 创建回复邮件
            message = MIMEMultipart()
            message["From"] = self.smtp_config['sender_email']
            message["To"] = from_email
            message["Subject"] = reply_subject
            message["In-Reply-To"] = original_email.get('message_id', '')
            message["References"] = original_email.get('message_id', '')
            
            # 添加回复内容
            message.attach(MIMEText(reply_content, "html", "utf-8"))
            
            # 发送邮件
            server = smtplib.SMTP(
                self.smtp_config['smtp_server'], 
                self.smtp_config['smtp_port']
            )
            server.starttls()
            server.login(
                self.smtp_config['sender_email'], 
                self.smtp_config['sender_password']
            )
            
            text = message.as_string()
            server.sendmail(
                self.smtp_config['sender_email'], 
                from_email, 
                text
            )
            server.quit()
            
            print(f"自动回复已发送至: {from_email}")
            return True
            
        except Exception as e:
            print(f"自动回复发送失败: {e}")
            return False
    
    def monitor_inbox(self, folder='INBOX', check_interval=60):
        """监控收件箱"""
        print(f"开始监控邮箱: {self.imap_config['email']}")
        print(f"检查间隔: {check_interval}秒")
        
        while True:
            try:
                # 连接IMAP
                mail = self.connect_imap()
                if not mail:
                    time.sleep(check_interval)
                    continue
                
                # 选择文件夹
                mail.select(folder)
                
                # 搜索未读邮件
                status, messages = mail.search(None, 'UNSEEN')
                
                if status == 'OK' and messages[0]:
                    email_ids = messages[0].split()
                    print(f"发现 {len(email_ids)} 封未读邮件")
                    
                    for email_id in email_ids:
                        # 获取邮件
                        status, msg_data = mail.fetch(email_id, '(RFC822)')
                        
                        if status == 'OK':
                            # 解析邮件
                            email_body = msg_data[0][1]
                            email_message = email.message_from_bytes(email_body)
                            
                            # 提取邮件信息
                            email_data = {
                                'subject': email_message.get('Subject', ''),
                                'from': email_message.get('From', ''),
                                'to': email_message.get('To', ''),
                                'date': email_message.get('Date', ''),
                                'message_id': email_message.get('Message-ID', ''),
                                'body': self.extract_email_body(email_message)
                            }
                            
                            # 检查是否已处理
                            email_hash = hash(email_data['message_id'])
                            if email_hash in self.processed_emails:
                                continue
                            
                            print(f"处理邮件: {email_data['subject']}")
                            
                            # 检查自动回复条件
                            reply_template = self.check_email_conditions(email_data)
                            if reply_template:
                                self.send_auto_reply(email_data, reply_template)
                            
                            # 标记为已处理
                            self.processed_emails.add(email_hash)
                
                # 关闭连接
                mail.close()
                mail.logout()
                
                # 等待下次检查
                time.sleep(check_interval)
                
            except KeyboardInterrupt:
                print("监控已停止")
                break
            except Exception as e:
                print(f"监控过程中出错: {e}")
                time.sleep(check_interval)
    
    def extract_email_body(self, email_message):
        """提取邮件正文"""
        body = ""
        
        if email_message.is_multipart():
            for part in email_message.walk():
                if part.get_content_type() == "text/plain":
                    try:
                        body += part.get_payload(decode=True).decode('utf-8')
                    except:
                        body += part.get_payload(decode=True).decode('utf-8', errors='ignore')
        else:
            try:
                body = email_message.get_payload(decode=True).decode('utf-8')
            except:
                body = email_message.get_payload(decode=True).decode('utf-8', errors='ignore')
        
        return body

def monitoring_system_example():
    """邮件监控系统示例"""
    # IMAP配置
    imap_config = {
        'imap_server': 'imap.gmail.com',
        'imap_port': 993,
        'email': 'your_email@gmail.com',
        'password': 'your_password'
    }
    
    # SMTP配置
    smtp_config = {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'sender_email': 'your_email@gmail.com',
        'sender_password': 'your_password'
    }
    
    # 创建监控系统
    monitor = EmailMonitoringSystem(imap_config, smtp_config)
    
    # 添加自动回复规则
    
    # 规则1: 包含"价格"关键词的邮件
    def price_inquiry_condition(email_data):
        keywords = ['价格', '报价', '费用', 'price', 'cost']
        subject_body = (email_data['subject'] + ' ' + email_data['body']).lower()
        return any(keyword in subject_body for keyword in keywords)
    
    price_reply_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <title>价格咨询自动回复</title>
    </head>
    <body>
        <h2>感谢您的价格咨询</h2>
        <p>您好!</p>
        <p>感谢您对我们产品的关注。我们已收到您关于价格的咨询。</p>
        <p>我们的销售团队将在24小时内与您联系,为您提供详细的报价信息。</p>
        <p>如有紧急需求,请直接致电:400-123-4567</p>
        <p>此致<br>敬礼!</p>
        <hr>
        <p><small>这是一封自动回复邮件,发送时间:{timestamp}</small></p>
    </body>
    </html>
    """
    
    monitor.add_auto_reply_rule(price_inquiry_condition, price_reply_template, priority=1)
    
    # 规则2: 技术支持邮件
    def support_condition(email_data):
        keywords = ['支持', '帮助', '问题', 'support', 'help', 'issue']
        subject_body = (email_data['subject'] + ' ' + email_data['body']).lower()
        return any(keyword in subject_body for keyword in keywords)
    
    support_reply_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <title>技术支持自动回复</title>
    </head>
    <body>
        <h2>技术支持请求已收到</h2>
        <p>您好!</p>
        <p>我们已收到您的技术支持请求。</p>
        <p>我们的技术团队将尽快为您解决问题,通常在4小时内回复。</p>
        <p>紧急技术问题请联系:tech-support@example.com</p>
        <p>此致<br>敬礼!</p>
        <hr>
        <p><small>这是一封自动回复邮件,发送时间:{timestamp}</small></p>
    </body>
    </html>
    """
    
    monitor.add_auto_reply_rule(support_condition, support_reply_template, priority=2)
    
    # 开始监控(实际使用时取消注释)
    # monitor.monitor_inbox(check_interval=30)
    
    print("邮件监控系统配置完成")
    print(f"已配置 {len(monitor.auto_reply_rules)} 条自动回复规则")

# 运行示例
if __name__ == "__main__":
    monitoring_system_example()

6. 性能优化和最佳实践

6.1 连接池和重用

import queue
import threading
from contextlib import contextmanager

class SMTPConnectionPool:
    """SMTP连接池"""
    
    def __init__(self, smtp_config, pool_size=5):
        self.smtp_config = smtp_config
        self.pool_size = pool_size
        self.pool = queue.Queue(maxsize=pool_size)
        self.lock = threading.Lock()
        
        # 初始化连接池
        self._initialize_pool()
    
    def _initialize_pool(self):
        """初始化连接池"""
        for _ in range(self.pool_size):
            connection = self._create_connection()
            if connection:
                self.pool.put(connection)
    
    def _create_connection(self):
        """创建SMTP连接"""
        try:
            server = smtplib.SMTP(
                self.smtp_config['smtp_server'], 
                self.smtp_config['smtp_port']
            )
            server.starttls()
            server.login(
                self.smtp_config['sender_email'], 
                self.smtp_config['sender_password']
            )
            return server
        except Exception as e:
            print(f"创建SMTP连接失败: {e}")
            return None
    
    @contextmanager
    def get_connection(self, timeout=30):
        """获取连接(上下文管理器)"""
        connection = None
        try:
            # 从池中获取连接
            connection = self.pool.get(timeout=timeout)
            
            # 检查连接是否有效
            if not self._is_connection_valid(connection):
                connection.quit()
                connection = self._create_connection()
            
            yield connection
            
        except queue.Empty:
            # 池中没有可用连接,创建新连接
            connection = self._create_connection()
            yield connection
            
        finally:
            # 归还连接到池中
            if connection:
                try:
                    if self.pool.qsize() < self.pool_size:
                        self.pool.put_nowait(connection)
                    else:
                        connection.quit()
                except:
                    try:
                        connection.quit()
                    except:
                        pass
    
    def _is_connection_valid(self, connection):
        """检查连接是否有效"""
        try:
            status = connection.noop()[0]
            return status == 250
        except:
            return False
    
    def close_all_connections(self):
        """关闭所有连接"""
        while not self.pool.empty():
            try:
                connection = self.pool.get_nowait()
                connection.quit()
            except:
                pass

class OptimizedEmailSender:
    """优化的邮件发送器"""
    
    def __init__(self, smtp_config, pool_size=5):
        self.connection_pool = SMTPConnectionPool(smtp_config, pool_size)
        self.smtp_config = smtp_config
        self.stats = {
            'sent': 0,
            'failed': 0,
            'start_time': datetime.now()
        }
    
    def send_email_optimized(self, to_email, subject, body, attachments=None):
        """使用连接池发送邮件"""
        try:
            # 创建邮件消息
            message = MIMEMultipart()
            message["From"] = self.smtp_config['sender_email']
            message["To"] = to_email
            message["Subject"] = subject
            
            # 添加正文
            message.attach(MIMEText(body, "html", "utf-8"))
            
            # 添加附件
            if attachments:
                for attachment_path in attachments:
                    if os.path.exists(attachment_path):
                        with open(attachment_path, "rb") as attachment:
                            part = MIMEBase('application', 'octet-stream')
                            part.set_payload(attachment.read())
                            encoders.encode_base64(part)
                            part.add_header(
                                'Content-Disposition',
                                f'attachment; filename= {os.path.basename(attachment_path)}'
                            )
                            message.attach(part)
            
            # 使用连接池发送
            with self.connection_pool.get_connection() as server:
                text = message.as_string()
                server.sendmail(
                    self.smtp_config['sender_email'], 
                    to_email, 
                    text
                )
            
            self.stats['sent'] += 1
            return True
            
        except Exception as e:
            self.stats['failed'] += 1
            print(f"邮件发送失败 {to_email}: {e}")
            return False
    
    def send_bulk_emails_optimized(self, email_list, max_workers=10):
        """优化的批量邮件发送"""
        print(f"开始批量发送 {len(email_list)} 封邮件")
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            
            for email_data in email_list:
                future = executor.submit(
                    self.send_email_optimized,
                    email_data['to'],
                    email_data['subject'],
                    email_data['body'],
                    email_data.get('attachments')
                )
                futures.append(future)
            
            # 等待所有任务完成
            for future in futures:
                future.result()
        
        # 打印统计信息
        duration = (datetime.now() - self.stats['start_time']).total_seconds()
        print(f"批量发送完成!")
        print(f"总耗时: {duration:.2f}秒")
        print(f"成功: {self.stats['sent']}")
        print(f"失败: {self.stats['failed']}")
        print(f"发送速率: {self.stats['sent'] / duration:.2f} 封/秒")
    
    def cleanup(self):
        """清理资源"""
        self.connection_pool.close_all_connections()

def optimization_example():
    """性能优化示例"""
    # SMTP配置
    smtp_config = {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'sender_email': 'your_email@gmail.com',
        'sender_password': 'your_password'
    }
    
    # 创建优化的邮件发送器
    sender = OptimizedEmailSender(smtp_config, pool_size=3)
    
    # 准备测试邮件列表
    email_list = []
    for i in range(50):
        email_list.append({
            'to': f'test{i}@example.com',
            'subject': f'测试邮件 #{i+1}',
            'body': f'<h1>这是第 {i+1} 封测试邮件</h1><p>发送时间: {datetime.now()}</p>'
        })
    
    # 批量发送
    sender.send_bulk_emails_optimized(email_list, max_workers=5)
    
    # 清理资源
    sender.cleanup()

# 运行示例
if __name__ == "__main__":
    optimization_example()

7. 实践练习

练习1:邮件营销系统

创建一个完整的邮件营销系统,支持模板管理、用户分组、发送统计等功能。

练习2:邮件备份工具

实现一个邮件备份工具,能够从IMAP服务器下载邮件并保存为本地文件。

总结

emailsmtplib模块为Python提供了完整的邮件处理能力:

  1. 邮件构造:支持文本、HTML、附件等多种格式
  2. SMTP发送:可靠的邮件发送机制
  3. 邮件解析:完整的MIME邮件解析功能
  4. 批量处理:高效的批量邮件发送
  5. 自动化:邮件监控和自动回复系统

通过掌握这些技术,你可以构建从简单的邮件发送到复杂的邮件系统的各种应用。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐