Part 6: Common Standard Library & Third-Party Libraries


1. Common Standard Library Modules

1.1 collections - High-Performance Containers

from collections import (
    namedtuple, deque, Counter, OrderedDict, 
    defaultdict, ChainMap
)

# ============ namedtuple:具名元组 ============
# 比普通元组更有可读性,比类更轻量
Point = namedtuple('Point', ['x', 'y'])
p = Point(11, 22)
print(p.x, p.y)      # 11 22
print(p[0], p[1])    # 11 22 (也支持索引)

# 转换为字典
print(p._asdict())   # {'x': 11, 'y': 22}

# 创建新实例(替换某些字段)
p2 = p._replace(x=100)
print(p2)  # Point(x=100, y=22)

# 带默认值 (Python 3.7+)
Point3D = namedtuple('Point3D', ['x', 'y', 'z'], defaults=[0])
print(Point3D(1, 2))  # Point3D(x=1, y=2, z=0)

# ============ deque:双端队列 ============
# 两端操作都是 O(1),比 list 更适合队列场景
dq = deque([1, 2, 3], maxlen=5)  # 可以设置最大长度

dq.append(4)          # 右端添加
dq.appendleft(0)      # 左端添加
dq.pop()              # 右端弹出
dq.popleft()          # 左端弹出
dq.rotate(1)          # 向右旋转
dq.rotate(-1)         # 向左旋转

# 实际应用:固定大小的滑动窗口
def moving_average(iterable, n=3):
    """计算移动平均值"""
    window = deque(maxlen=n)
    for x in iterable: 
        window.append(x)
        if len(window) == n:
            yield sum(window) / n

print(list(moving_average([1, 2, 3, 4, 5, 6, 7], 3)))
# [2.0, 3.0, 4.0, 5.0, 6.0]

# ============ Counter:计数器 ============
# 统计元素出现次数
cnt = Counter(['a', 'b', 'c', 'a', 'b', 'a'])
print(cnt)  # Counter({'a': 3, 'b': 2, 'c': 1})

# 常用操作
print(cnt.most_common(2))  # [('a', 3), ('b', 2)]
print(cnt['a'])            # 3
print(cnt['z'])            # 0 (不存在返回 0,不报错)

# 更新计数
cnt.update(['a', 'b', 'd'])
print(cnt)  # Counter({'a': 4, 'b': 3, 'c': 1, 'd': 1})

# 数学运算
cnt1 = Counter(a=3, b=1)
cnt2 = Counter(a=1, b=2)
print(cnt1 + cnt2)  # Counter({'a': 4, 'b': 3})
print(cnt1 - cnt2)  # Counter({'a': 2})
print(cnt1 & cnt2)  # Counter({'a': 1, 'b': 1}) 交集(取最小)
print(cnt1 | cnt2)  # Counter({'a': 3, 'b': 2}) 并集(取最大)

# 实际应用:词频统计
text = "the quick brown fox jumps over the lazy dog"
word_counts = Counter(text.split())
print(word_counts.most_common(3))

# ============ defaultdict:带默认值的字典 ============
# 访问不存在的 key 时自动创建默认值
dd = defaultdict(list)
dd['a'].append(1)  # 不需要先初始化
dd['a'].append(2)
dd['b'].append(3)
print(dict(dd))  # {'a': [1, 2], 'b': [3]}

# 常用默认工厂
dd_int = defaultdict(int)      # 默认值 0
dd_set = defaultdict(set)      # 默认值 set()
dd_lambda = defaultdict(lambda: 'N/A')  # 自定义默认值

# 实际应用:分组
students = [
    ('Alice', 'Math'),
    ('Bob', 'Physics'),
    ('Charlie', 'Math'),
    ('David', 'Physics'),
]

by_subject = defaultdict(list)
for name, subject in students:
    by_subject[subject].append(name)
print(dict(by_subject))
# {'Math': ['Alice', 'Charlie'], 'Physics': ['Bob', 'David']}

# ============ OrderedDict:有序字典 ============
# Python 3.7+ dict 已有序,但 OrderedDict 有额外功能
od = OrderedDict()
od['a'] = 1
od['b'] = 2
od['c'] = 3

od.move_to_end('a')  # 移到末尾
od.move_to_end('c', last=False)  # 移到开头
print(list(od.keys()))  # ['c', 'b', 'a']

# 可以用于实现 LRU 缓存
class LRUCache(OrderedDict):
    def __init__(self, capacity: int):
        super().__init__()
        self.capacity = capacity
    
    def get(self, key):
        if key not in self:
            return -1
        self.move_to_end(key)
        return self[key]
    
    def put(self, key, value):
        if key in self:
            self.move_to_end(key)
        self[key] = value
        if len(self) > self.capacity:
            self.popitem(last=False)
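
# A quick usage sketch of the LRUCache above (capacity of 2 chosen just for illustration):
lru = LRUCache(capacity=2)
lru.put("a", 1)
lru.put("b", 2)
lru.get("a")               # "a" becomes the most recently used entry
lru.put("c", 3)            # over capacity: the least recently used "b" is evicted
print(list(lru.keys()))    # ['a', 'c']
print(lru.get("b"))        # -1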

# ============ ChainMap:链式字典 ============
# 将多个字典作为一个视图
defaults = {'theme': 'dark', 'language': 'en'}
user_settings = {'theme': 'light'}

settings = ChainMap(user_settings, defaults)
print(settings['theme'])     # 'light' (user_settings 优先)
print(settings['language'])  # 'en' (从 defaults 获取)
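
# Two extra behaviors worth knowing (sketch; 'zh' / 'blue' are arbitrary values):
# writes and deletes only touch the first mapping in the chain.
settings['language'] = 'zh'
print(user_settings)               # {'theme': 'light', 'language': 'zh'}
print(defaults['language'])        # 'en' (unchanged)

# new_child() pushes a fresh dict on top, handy for temporary, scoped overrides.
session_settings = settings.new_child({'theme': 'blue'})
print(session_settings['theme'])   # 'blue'
print(settings['theme'])           # 'light' (the original chain is untouched)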

1.2 itertools - Iterator Tools

from itertools import (
    count, cycle, repeat,  # 无限迭代器
    chain, zip_longest, product, permutations, combinations,  # 排列组合
    groupby, filterfalse, takewhile, dropwhile,  # 过滤分组
    accumulate, starmap,  # 累积映射
    islice, tee  # 切片复制
)

# ============ 无限迭代器 ============
# count:从 n 开始无限计数
for i in count(10, 2):  # 10, 12, 14, ... 
    if i > 20:
        break
    print(i)

# cycle:无限循环
colors = cycle(['red', 'green', 'blue'])
for _ in range(7):
    print(next(colors))  # red, green, blue, red, green, blue, red

# repeat:重复
list(repeat('hello', 3))  # ['hello', 'hello', 'hello']

# ============ 排列组合 ============
# chain:连接多个迭代器
list(chain([1, 2], [3, 4], [5]))  # [1, 2, 3, 4, 5]

# chain.from_iterable:展平嵌套
list(chain.from_iterable([[1, 2], [3, 4]]))  # [1, 2, 3, 4]

# zip_longest:最长匹配
list(zip_longest([1, 2, 3], ['a', 'b'], fillvalue='-'))
# [(1, 'a'), (2, 'b'), (3, '-')]

# product:笛卡尔积
list(product([1, 2], ['a', 'b']))
# [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]

list(product([0, 1], repeat=3))  # 等价于 product([0,1], [0,1], [0,1])
# [(0,0,0), (0,0,1), (0,1,0), (0,1,1), (1,0,0), (1,0,1), (1,1,0), (1,1,1)]

# permutations:排列
list(permutations([1, 2, 3], 2))
# [(1,2), (1,3), (2,1), (2,3), (3,1), (3,2)]

# combinations:组合(不重复)
list(combinations([1, 2, 3, 4], 2))
# [(1,2), (1,3), (1,4), (2,3), (2,4), (3,4)]

# combinations_with_replacement:可重复组合
from itertools import combinations_with_replacement
list(combinations_with_replacement([1, 2, 3], 2))
# [(1,1), (1,2), (1,3), (2,2), (2,3), (3,3)]

# ============ 过滤分组 ============
# groupby:分组(需要先排序!)
data = [
    {'name': 'Alice', 'dept': 'IT'},
    {'name': 'Bob', 'dept': 'HR'},
    {'name': 'Charlie', 'dept': 'IT'},
]
data.sort(key=lambda x: x['dept'])  # 必须先排序

for dept, group in groupby(data, key=lambda x: x['dept']):
    print(f"{dept}: {list(group)}")

# filterfalse:保留使谓词为假的元素(与 filter 相反)
list(filterfalse(lambda x: x % 2, range(10)))  # [0, 2, 4, 6, 8]

# takewhile / dropwhile
list(takewhile(lambda x: x < 5, [1, 3, 5, 2, 1]))  # [1, 3]
list(dropwhile(lambda x: x < 5, [1, 3, 5, 2, 1]))  # [5, 2, 1]

# ============ 累积映射 ============
# accumulate:累积
list(accumulate([1, 2, 3, 4, 5]))  # [1, 3, 6, 10, 15]
list(accumulate([1, 2, 3, 4, 5], lambda x, y: x * y))  # [1, 2, 6, 24, 120]

# starmap:展开参数的 map
list(starmap(pow, [(2, 3), (3, 2), (10, 3)]))  # [8, 9, 1000]

# ============ 切片复制 ============
# islice:迭代器切片
list(islice(count(), 5, 10))  # [5, 6, 7, 8, 9]

# tee:复制迭代器
it = iter([1, 2, 3, 4, 5])
it1, it2 = tee(it, 2)
print(list(it1))  # [1, 2, 3, 4, 5]
print(list(it2))  # [1, 2, 3, 4, 5]
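# Caution: after tee(), don't keep consuming the original iterator `it` directly —
# anything it yields there will not be seen by the copies.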

# ============ 实际应用 ============
# 1. 分块处理
def chunked(iterable, n):
    """将迭代器按 n 个一组分块"""
    it = iter(iterable)
    while chunk := list(islice(it, n)):
        yield chunk

list(chunked(range(10), 3))  # [[0,1,2], [3,4,5], [6,7,8], [9]]

# 2. 滑动窗口
from collections import deque  # deque is needed here if this snippet is run on its own

def sliding_window(iterable, n):
    """滑动窗口"""
    it = iter(iterable)
    window = deque(islice(it, n), maxlen=n)
    if len(window) == n:
        yield tuple(window)
    for x in it: 
        window.append(x)
        yield tuple(window)

list(sliding_window([1, 2, 3, 4, 5], 3))
# [(1, 2, 3), (2, 3, 4), (3, 4, 5)]

1.3 functools - Function Tools

from functools import (
    reduce, partial, wraps, lru_cache, cache,
    singledispatch, total_ordering, cached_property
)

# ============ reduce:归约 ============
from functools import reduce
from operator import add, mul

# 求和
reduce(add, [1, 2, 3, 4, 5])  # 15

# 阶乘
reduce(mul, range(1, 6))  # 120

# 带初始值
reduce(add, [], 0)  # 0 (空列表不会报错)

# ============ partial:偏函数 ============
def power(base, exponent):
    return base ** exponent

square = partial(power, exponent=2)
cube = partial(power, exponent=3)

print(square(5))  # 25
print(cube(5))    # 125

# 实际应用:简化回调
import json
pretty_print = partial(json.dumps, indent=2, ensure_ascii=False)
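
# A quick check of the helper above (sample data is arbitrary; note the non-ASCII value survives):
print(pretty_print({"name": "张三", "age": 30}))
# {
#   "name": "张三",
#   "age": 30
# }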

# ============ lru_cache:LRU 缓存 ============
@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

print(fibonacci(100))  # 瞬间完成

# 查看缓存信息
print(fibonacci.cache_info())
# CacheInfo(hits=98, misses=101, maxsize=128, currsize=101)

# 清除缓存
fibonacci.cache_clear()

# Python 3.9+:无限缓存
@cache  # 等价于 @lru_cache(maxsize=None)
def factorial(n):
    return n * factorial(n - 1) if n else 1

# ============ cached_property:缓存属性 (Python 3.8+) ============
from functools import cached_property

class DataProcessor:
    def __init__(self, data):
        self.data = data
    
    @cached_property
    def processed(self):
        """只计算一次,结果被缓存"""
        print("Processing...")
        return [x ** 2 for x in self.data]

dp = DataProcessor([1, 2, 3, 4, 5])
print(dp.processed)  # Processing... [1, 4, 9, 16, 25]
print(dp.processed)  # [1, 4, 9, 16, 25] (不再打印 Processing)

# ============ singledispatch:单分派泛函数 ============
@singledispatch
def process(arg):
    """默认实现"""
    print(f"Default: {arg}")

@process.register(int)
def _(arg):
    print(f"Integer: {arg * 2}")

@process.register(str)
def _(arg):
    print(f"String:  {arg.upper()}")

@process.register(list)
def _(arg):
    print(f"List with {len(arg)} items")

process(10)         # Integer: 20
process("hello")    # String:  HELLO
process([1, 2, 3])  # List with 3 items
process(3.14)       # Default: 3.14

# ============ total_ordering:自动补全比较方法 ============
@total_ordering
class Student:
    def __init__(self, name, grade):
        self.name = name
        self.grade = grade
    
    def __eq__(self, other):
        return self.grade == other.grade
    
    def __lt__(self, other):
        return self.grade < other.grade

# 自动生成 __le__, __gt__, __ge__
s1 = Student("Alice", 85)
s2 = Student("Bob", 90)
print(s1 < s2)   # True
print(s1 <= s2)  # True (自动生成)
print(s1 >= s2)  # False (自动生成)

1.4 datetime - Dates and Times

from datetime import datetime, date, time, timedelta, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

# ============ 获取当前时间 ============
now = datetime.now()           # 本地时间
utc_now = datetime.utcnow()    # UTC 时间(已废弃,用下面的)
utc_now = datetime.now(timezone.utc)  # 推荐

today = date.today()
current_time = datetime.now().time()

# ============ 创建日期时间 ============
dt = datetime(2024, 12, 25, 10, 30, 0)
d = date(2024, 12, 25)
t = time(10, 30, 0)

# ============ 字符串解析与格式化 ============
# 解析
dt = datetime.strptime("2024-12-25 10:30:00", "%Y-%m-%d %H:%M:%S")
dt = datetime.fromisoformat("2024-12-25T10:30:00")  # ISO 格式

# 格式化
dt.strftime("%Y年%m月%d日 %H时%M分%S秒")
dt.isoformat()  # "2024-12-25T10:30:00"

# 常用格式符
"""
%Y - 四位年份 (2024)
%m - 两位月份 (01-12)
%d - 两位日期 (01-31)
%H - 24小时制小时 (00-23)
%I - 12小时制小时 (01-12)
%M - 分钟 (00-59)
%S - 秒 (00-59)
%f - 微秒 (000000-999999)
%a - 星期缩写 (Mon)
%A - 星期全称 (Monday)
%b - 月份缩写 (Jan)
%B - 月份全称 (January)
%p - AM/PM
%Z - 时区名称
%z - UTC 偏移 (+0800)
"""

# ============ 时间运算 ============
now = datetime.now()

# timedelta:时间差
delta = timedelta(days=7, hours=3, minutes=30)
future = now + delta
past = now - delta

# 两个时间的差
diff = datetime(2025, 1, 1) - now
print(diff.days)          # 天数
print(diff.total_seconds())  # 总秒数

# ============ 时区处理 ============
# Python 3.9+ 使用 zoneinfo
from zoneinfo import ZoneInfo

# 创建带时区的时间
dt_utc = datetime(2024, 12, 25, 10, 0, 0, tzinfo=timezone.utc)
dt_shanghai = datetime(2024, 12, 25, 18, 0, 0, tzinfo=ZoneInfo("Asia/Shanghai"))

# 时区转换
dt_shanghai = dt_utc.astimezone(ZoneInfo("Asia/Shanghai"))
print(dt_shanghai)  # 2024-12-25 18:00:00+08:00

# 本地时间转 UTC
local_dt = datetime.now()
utc_dt = local_dt.astimezone(timezone.utc)

# ============ 实用函数 ============
# 获取月初/月末
from calendar import monthrange

def get_month_range(year: int, month: int):
    """获取某月的第一天和最后一天"""
    first_day = date(year, month, 1)
    _, last = monthrange(year, month)
    last_day = date(year, month, last)
    return first_day, last_day

# 获取本周一
def get_monday(d: date) -> date:
    return d - timedelta(days=d.weekday())

# 日期范围生成器
def date_range(start: date, end: date):
    """生成日期范围"""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)
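
# Usage sketch for the helpers above (dates chosen arbitrarily):
print(get_month_range(2024, 2))         # (datetime.date(2024, 2, 1), datetime.date(2024, 2, 29))
print(get_monday(date(2024, 12, 25)))   # 2024-12-23
print(list(date_range(date(2024, 1, 1), date(2024, 1, 3))))
# [datetime.date(2024, 1, 1), datetime.date(2024, 1, 2), datetime.date(2024, 1, 3)]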

1.5 json - JSON Handling

import json
from datetime import datetime, date
from dataclasses import dataclass, asdict
from enum import Enum

# ============ 基础用法 ============
data = {
    "name":  "Alice",
    "age": 30,
    "skills": ["Python", "JavaScript"],
    "active": True,
    "salary": None
}

# 序列化
json_str = json.dumps(data)
json_str = json.dumps(data, indent=2)  # 格式化
json_str = json.dumps(data, ensure_ascii=False)  # 支持中文

# 反序列化
data = json.loads(json_str)

# 文件读写
with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, indent=2, ensure_ascii=False)

with open("data.json", "r", encoding="utf-8") as f:
    data = json.load(f)

# ============ 自定义序列化 ============
# 方式1:default 参数
def custom_encoder(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    if isinstance(obj, date):
        return obj.isoformat()
    if isinstance(obj, set):
        return list(obj)
    if isinstance(obj, Enum):
        return obj.value
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

data = {
    "created_at": datetime.now(),
    "tags": {"python", "programming"},
}
json.dumps(data, default=custom_encoder)

# 方式2:自定义 JSONEncoder
class CustomJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return {"__datetime__": obj.isoformat()}
        if isinstance(obj, date):
            return {"__date__": obj.isoformat()}
        if isinstance(obj, set):
            return list(obj)  # 集合同样转为列表,与上面的 custom_encoder 保持一致
        return super().default(obj)

json_str = json.dumps(data, cls=CustomJSONEncoder)

# ============ 自定义反序列化 ============
def custom_decoder(dct):
    if "__datetime__" in dct:
        return datetime.fromisoformat(dct["__datetime__"])
    if "__date__" in dct: 
        return date.fromisoformat(dct["__date__"])
    return dct

data = json.loads(json_str, object_hook=custom_decoder)

# ============ dataclass 序列化 ============
@dataclass
class User:
    name: str
    age: int
    email: str

user = User("Alice", 30, "alice@example.com")
json.dumps(asdict(user))  # {"name": "Alice", "age": 30, "email": "..."}
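
# Round-tripping back into the dataclass (sketch):
user_dict = json.loads(json.dumps(asdict(user)))
restored = User(**user_dict)
print(restored == user)  # True — dataclasses compare by field values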

1.6 logging - Log Handling

import logging
import logging.handlers
from pathlib import Path

# ============ 基础配置 ============
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

logger = logging.getLogger(__name__)
logger.info("Application started")
logger.warning("This is a warning")
logger.error("An error occurred")

# ============ 日志级别 ============
"""
DEBUG    (10) - 详细调试信息
INFO     (20) - 常规信息
WARNING  (30) - 警告信息
ERROR    (40) - 错误信息
CRITICAL (50) - 严重错误
"""

# ============ 高级配置 ============
def setup_logging(
    log_file: str = "app.log",
    level: int = logging.INFO
):
    """配置日志系统"""
    # 创建 logger
    logger = logging.getLogger()
    logger.setLevel(level)
    
    # 格式化器
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - '
        '%(filename)s:%(lineno)d - %(message)s'
    )
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    
    # 文件处理器(按大小轮转)
    file_handler = logging.handlers.RotatingFileHandler(
        log_file,
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    
    # 按时间轮转的处理器
    time_handler = logging.handlers.TimedRotatingFileHandler(
        "app_daily.log",
        when="midnight",
        interval=1,
        backupCount=30,
        encoding='utf-8'
    )
    time_handler.setFormatter(formatter)
    logger.addHandler(time_handler)
    
    return logger

# ============ 使用字典配置 ============
import logging.config

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        },
        'json': {
            'class': 'pythonjsonlogger.jsonlogger.JsonFormatter',
            'format': '%(asctime)s %(name)s %(levelname)s %(message)s'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'standard',
            'stream': 'ext://sys.stdout'
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'level': 'DEBUG',
            'formatter': 'standard',
            'filename': 'app.log',
            'maxBytes': 10485760,
            'backupCount': 5
        }
    },
    'loggers': {
        '': {  # root logger
            'handlers': ['console', 'file'],
            'level': 'DEBUG',
            'propagate': True
        }
    }
}

logging.config.dictConfig(LOGGING_CONFIG)

# ============ 结构化日志 ============
logger = logging.getLogger(__name__)

# 使用 extra 添加额外字段
logger.info(
    "User logged in",
    extra={"user_id": 123, "ip": "192.168.1.1"}
)

# 或者使用格式化
logger.info("User %s logged in from %s", "alice", "192.168.1.1")

1.7 pathlib - Path Handling

from pathlib import Path

# ============ 创建路径 ============
p = Path("/home/user/documents")
p = Path.home()      # 用户主目录
p = Path.cwd()       # 当前工作目录
p = Path(__file__)   # 当前脚本路径

# ============ 路径操作 ============
p = Path("/home/user/documents/file.txt")

print(p.name)        # "file.txt"
print(p.stem)        # "file"
print(p.suffix)      # ".txt"
print(p.suffixes)    # [".txt"] (多后缀时 [".tar", ".gz"])
print(p.parent)      # Path("/home/user/documents")
print(p.parents[0])  # Path("/home/user/documents")
print(p.parents[1])  # Path("/home/user")
print(p.parts)       # ('/', 'home', 'user', 'documents', 'file.txt')

# 路径拼接
new_path = p.parent / "subdir" / "new_file.txt"

# 修改路径
p.with_name("other.txt")     # 改文件名
p.with_suffix(".md")         # 改后缀
p.with_stem("other")         # 改文件名(保留后缀,Python 3.9+)

# ============ 路径判断 ============
p.exists()       # 是否存在
p.is_file()      # 是否是文件
p.is_dir()       # 是否是目录
p.is_symlink()   # 是否是符号链接
p.is_absolute()  # 是否是绝对路径

# ============ 文件操作 ============
# 读写文本
content = p.read_text(encoding="utf-8")
p.write_text("Hello World", encoding="utf-8")

# 读写二进制
data = p.read_bytes()
p.write_bytes(b"binary data")

# 创建目录
p.mkdir(parents=True, exist_ok=True)

# 删除
p.unlink()        # 删除文件
p.rmdir()         # 删除空目录

# 重命名/移动
p.rename(new_path)
p.replace(new_path)  # 如果目标存在则覆盖

# ============ 遍历目录 ============
# 列出目录内容
for item in Path(".").iterdir():
    print(item)

# glob 模式匹配
for py_file in Path(".").glob("*.py"):
    print(py_file)

# 递归匹配
for py_file in Path(".").rglob("*.py"):
    print(py_file)

# ============ 实用函数 ============
def ensure_dir(path: Path) -> Path:
    """确保目录存在"""
    path.mkdir(parents=True, exist_ok=True)
    return path

def find_files(directory: Path, pattern: str) -> list[Path]:
    """查找文件"""
    return list(directory.rglob(pattern))

def get_file_size_mb(path: Path) -> float:
    """获取文件大小(MB)"""
    return path.stat().st_size / (1024 * 1024)
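
# Usage sketch (directory and pattern are illustrative):
reports_dir = ensure_dir(Path("output") / "reports")
for f in find_files(Path("."), "*.py"):
    print(f, f"{get_file_size_mb(f):.2f} MB")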

1.8 re - Regular Expressions

import re

# ============ 基础匹配 ============
text = "Hello, my email is alice@example.com and phone is 123-456-7890"

# match:从开头匹配
result = re.match(r"Hello", text)
print(result.group() if result else None)  # "Hello"

# search:搜索第一个匹配
result = re.search(r"\d{3}-\d{3}-\d{4}", text)
print(result.group() if result else None)  # "123-456-7890"

# findall:找所有匹配
emails = re.findall(r"[\w.-]+@[\w.-]+\.\w+", text)
print(emails)  # ['alice@example.com']

# finditer:迭代所有匹配
for match in re.finditer(r"\d+", text):
    print(f"Found {match.group()} at {match.start()}-{match.end()}")

# ============ 替换 ============
# sub:替换
result = re.sub(r"\d", "*", text)  # 替换所有数字为 *

# 使用函数替换
def double_number(match):
    return str(int(match.group()) * 2)

result = re.sub(r"\d+", double_number, "1 + 2 = 3")
print(result)  # "2 + 4 = 6"

# ============ 分割 ============
result = re.split(r"[,\s]+", "a, b,  c   d")
print(result)  # ['a', 'b', 'c', 'd']

# ============ 预编译(提高性能) ============
email_pattern = re.compile(r"[\w.-]+@[\w.-]+\.\w+")
emails = email_pattern.findall(text)

# ============ 分组 ============
pattern = r"(\d{3})-(\d{3})-(\d{4})"
match = re.search(pattern, text)
if match:
    print(match.group())   # "123-456-7890"
    print(match.group(1))  # "123"
    print(match.group(2))  # "456"
    print(match.groups())  # ('123', '456', '7890')

# 命名分组
pattern = r"(?P<area>\d{3})-(?P<exchange>\d{3})-(?P<number>\d{4})"
match = re.search(pattern, text)
if match:
    print(match.group("area"))    # "123"
    print(match.groupdict())      # {'area': '123', 'exchange': '456', 'number': '7890'}

# ============ 常用正则表达式 ============
patterns = {
    # 邮箱
    "email": r"^[\w.-]+@[\w.-]+\.\w+$",
    # 手机号(中国)
    "phone_cn": r"^1[3-9]\d{9}$",
    # URL
    "url": r"https?://[\w.-]+(?:/[\w./-]*)?",
    # IP 地址
    "ipv4": r"^(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)$",
    # 日期 YYYY-MM-DD
    "date": r"^\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12]\d|3[01])$",
    # 密码(至少8位,包含大小写字母和数字)
    "password": r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d).{8,}$",
}

# ============ 标志 ============
# re.IGNORECASE (re.I) - 忽略大小写
# re.MULTILINE (re.M) - 多行模式
# re.DOTALL (re.S) - 匹配换行符
# re.VERBOSE (re.X) - 允许注释

pattern = re.compile(r"""
    ^                   # 开头
    (?P<year>\d{4})     # 年
    -                   # 分隔符
    (?P<month>\d{2})    # 月
    -                   # 分隔符
    (?P<day>\d{2})      # 日
    $                   # 结尾
""", re.VERBOSE)

2. Common Third-Party Libraries

2.1 requests - HTTP Client (cont.)

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Optional, Any
import logging

logger = logging.getLogger(__name__)

# ============ 基础请求 ============
# GET 请求
response = requests.get("https://api.example.com/users")
print(response.status_code)  # 200
print(response.json())       # 解析 JSON
print(response.text)         # 原始文本
print(response.headers)      # 响应头

# 带查询参数
response = requests.get(
    "https://api.example.com/users",
    params={"page":  1, "limit": 10}
)

# POST JSON
response = requests.post(
    "https://api.example.com/users",
    json={"name": "Alice", "email": "alice@example.com"}
)

# POST 表单
response = requests.post(
    "https://api.example.com/login",
    data={"username": "alice", "password": "secret"}
)

# ============ 请求头和认证 ============
# 自定义请求头
response = requests.get(
    "https://api.example.com/users",
    headers={
        "Authorization": "Bearer your_token",
        "Content-Type": "application/json",
        "User-Agent": "MyApp/1.0"
    }
)

# Basic 认证
response = requests.get(
    "https://api.example.com/users",
    auth=("username", "password")
)

# ============ 超时和异常处理 ============
try: 
    response = requests.get(
        "https://api.example.com/users",
        timeout=(3.05, 27)  # (连接超时, 读取超时)
    )
    response.raise_for_status()  # 如果状态码 >= 400 则抛出异常
except requests.exceptions.Timeout:
    logger.error("Request timed out")
except requests.exceptions.ConnectionError:
    logger.error("Connection error")
except requests.exceptions.HTTPError as e:
    logger.error(f"HTTP error: {e}")
except requests.exceptions.RequestException as e:
    logger.error(f"Request failed: {e}")

# ============ Session(连接复用) ============
# 推荐用于多次请求同一主机
session = requests.Session()
session.headers.update({"Authorization": "Bearer token"})

response = session.get("https://api.example.com/users")
response = session.get("https://api.example.com/posts")

session.close()

# 使用上下文管理器
with requests.Session() as session:
    response = session.get("https://api.example.com/users")

# ============ 重试机制 ============
def create_session_with_retry(
    retries: int = 3,
    backoff_factor: float = 0.3,
    status_forcelist: tuple = (500, 502, 503, 504)
) -> requests.Session:
    """创建带重试机制的 Session"""
    session = requests.Session()
    
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    
    return session

# ============ 文件上传/下载 ============
# 上传文件
with open("document.pdf", "rb") as f:
    response = requests.post(
        "https://api.example.com/upload",
        files={"file": ("document.pdf", f, "application/pdf")}
    )

# 下载文件(流式)
def download_file(url: str, path: str) -> None:
    """流式下载大文件"""
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        with open(path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

# ============ 封装 HTTP 客户端 ============
class APIClient:
    """通用 API 客户端"""
    
    def __init__(self, base_url: str, token: Optional[str] = None):
        self.base_url = base_url.rstrip("/")
        self.session = create_session_with_retry()
        
        if token: 
            self.session.headers["Authorization"] = f"Bearer {token}"
    
    def _request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> dict[str, Any]: 
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        try:
            response = self.session.request(method, url, timeout=30, **kwargs)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"API request failed: {e}")
            raise
    
    def get(self, endpoint: str, params: dict = None) -> dict:
        return self._request("GET", endpoint, params=params)
    
    def post(self, endpoint: str, data: dict = None) -> dict:
        return self._request("POST", endpoint, json=data)
    
    def put(self, endpoint: str, data: dict = None) -> dict:
        return self._request("PUT", endpoint, json=data)
    
    def delete(self, endpoint: str) -> dict:
        return self._request("DELETE", endpoint)
    
    def close(self):
        self.session.close()
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        self.close()

# 使用
with APIClient("https://api.example.com", token="xxx") as client:
    users = client.get("/users", params={"page": 1})
    new_user = client.post("/users", data={"name": "Alice"})

2.2 aiohttp - Async HTTP Client

import aiohttp
import asyncio
from typing import List, Dict, Any, Optional
import logging

logger = logging.getLogger(__name__)

# ============ 基础用法 ============
async def fetch(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# ============ 并发请求 ============
async def fetch_all(urls: List[str]) -> List[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def fetch_one(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

# ============ 封装异步 HTTP 客户端 ============
class AsyncAPIClient:
    """异步 API 客户端"""
    
    def __init__(
        self,
        base_url: str,
        token: str = None,
        max_concurrent: int = 10,
        timeout: int = 30
    ):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: Optional[aiohttp.ClientSession] = None
    
    async def __aenter__(self):
        headers = {}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        
        self._session = aiohttp.ClientSession(
            headers=headers,
            timeout=self._timeout
        )
        return self
    
    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()
    
    async def _request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> Dict[str, Any]: 
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        async with self._semaphore:
            try: 
                async with self._session.request(method, url, **kwargs) as resp:
                    resp.raise_for_status()
                    return await resp.json()
            except aiohttp.ClientError as e:
                logger.error(f"Request failed: {e}")
                raise
    
    async def get(self, endpoint: str, params: dict = None) -> dict:
        return await self._request("GET", endpoint, params=params)
    
    async def post(self, endpoint: str, data: dict = None) -> dict:
        return await self._request("POST", endpoint, json=data)
    
    async def fetch_many(self, endpoints: List[str]) -> List[dict]:
        """并发获取多个端点"""
        tasks = [self.get(ep) for ep in endpoints]
        return await asyncio.gather(*tasks, return_exceptions=True)

# 使用示例
async def main():
    async with AsyncAPIClient("https://api.example.com", token="xxx") as client:
        # 单个请求
        user = await client.get("/users/1")
        
        # 并发请求
        endpoints = [f"/users/{i}" for i in range(1, 11)]
        users = await client.fetch_many(endpoints)
        
        print(f"Fetched {len(users)} users")

# asyncio.run(main())

2.3 Pydantic - Data Validation

from pydantic import (
    BaseModel, Field, field_validator, model_validator,  # Pydantic v2 validator decorators
    EmailStr, HttpUrl, constr, conint, confloat,
    ValidationError
)
from typing import List, Optional
from datetime import datetime
from enum import Enum

# ============ 基础模型 ============
class User(BaseModel):
    id: int
    name: str
    email: EmailStr
    age: Optional[int] = None
    is_active: bool = True
    created_at: datetime = Field(default_factory=datetime.now)

# 自动验证和转换
user = User(
    id="123",  # 自动转换为 int
    name="Alice",
    email="alice@example.com"
)
print(user.id)  # 123 (int)
print(user.model_dump())  # 转换为字典
print(user.model_dump_json())  # 转换为 JSON 字符串

# 验证失败
try: 
    user = User(id="abc", name="Alice", email="invalid")
except ValidationError as e:
    print(e.json())

# ============ 字段约束 ============
class Product(BaseModel):
    name: constr(min_length=1, max_length=100)  # 字符串长度限制
    price: confloat(gt=0)  # 大于 0 的浮点数
    quantity: conint(ge=0)  # 大于等于 0 的整数
    tags: List[str] = Field(default_factory=list, max_length=10)
    description: Optional[str] = Field(None, max_length=1000)

# ============ 自定义验证器 ============
class UserCreate(BaseModel):
    username: str
    password: str
    password_confirm: str
    email: EmailStr
    
    @validator("username")
    def username_alphanumeric(cls, v):
        if not v.isalnum():
            raise ValueError("Username must be alphanumeric")
        return v
    
    @validator("password")
    def password_strength(cls, v):
        if len(v) < 8:
            raise ValueError("Password must be at least 8 characters")
        if not any(c.isupper() for c in v):
            raise ValueError("Password must contain uppercase letter")
        if not any(c.isdigit() for c in v):
            raise ValueError("Password must contain digit")
        return v
    
    @root_validator
    def passwords_match(cls, values):
        pw = values.get("password")
        pw_confirm = values.get("password_confirm")
        if pw and pw_confirm and pw != pw_confirm:
            raise ValueError("Passwords do not match")
        return values

# ============ 嵌套模型 ============
class Address(BaseModel):
    street: str
    city: str
    country: str = "China"
    zip_code: Optional[str] = None

class Company(BaseModel):
    name: str
    address: Address
    employees: List[User] = []

company = Company(
    name="Tech Corp",
    address={"street": "123 Main St", "city": "Beijing"},
    employees=[
        {"id": 1, "name": "Alice", "email": "alice@example.com"}
    ]
)

# ============ 枚举和联合类型 ============
class Status(str, Enum):
    PENDING = "pending"
    ACTIVE = "active"
    INACTIVE = "inactive"

class Order(BaseModel):
    id: int
    status: Status
    amount: float | int  # 联合类型

# ============ 配置选项 ============
class StrictUser(BaseModel):
    model_config = {
        "str_strip_whitespace": True,  # 自动去除字符串空白
        "str_min_length": 1,  # 字符串最小长度
        "frozen": True,  # 不可变
        "extra": "forbid",  # 禁止额外字段
    }
    
    name: str
    email: EmailStr

# ============ 从 ORM 对象创建 ============
class UserORM(BaseModel):
    model_config = {"from_attributes": True}
    
    id: int
    name: str
    email: str

# 假设有 SQLAlchemy 模型
# user_orm = session.query(UserModel).first()
# user = UserORM.model_validate(user_orm)

2.4 SQLAlchemy - ORM Framework

from sqlalchemy import (
    create_engine, Column, Integer, String, DateTime,
    ForeignKey, Boolean, Text, Float, Index,
    select, update, delete, and_, or_, func
)
from sqlalchemy.orm import (
    declarative_base, relationship, sessionmaker,
    Session, selectinload, joinedload
)
from datetime import datetime
from typing import List, Optional
from contextlib import contextmanager

# ============ 数据库连接 ============
# SQLite
engine = create_engine("sqlite:///app.db", echo=True)

# MySQL
# engine = create_engine(
#     "mysql+pymysql://user:password@localhost: 3306/dbname",
#     pool_size=10,
#     max_overflow=20,
#     pool_recycle=3600
# )

# PostgreSQL
# engine = create_engine(
#     "postgresql://user:password@localhost:5432/dbname"
# )

Base = declarative_base()
SessionLocal = sessionmaker(bind=engine)

# ============ 定义模型 ============
class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(100), unique=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # 关系
    posts = relationship("Post", back_populates="author", lazy="selectin")
    
    # 复合索引
    __table_args__ = (
        Index("idx_user_email_active", "email", "is_active"),
    )
    
    def __repr__(self):
        return f"<User(id={self.id}, username={self.username})>"

class Post(Base):
    __tablename__ = "posts"
    
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    content = Column(Text)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # 关系
    author = relationship("User", back_populates="posts")
    
    def __repr__(self):
        return f"<Post(id={self.id}, title={self.title})>"

# 创建表
Base.metadata.create_all(engine)

# ============ Session 管理 ============
@contextmanager
def get_session():
    """会话上下文管理器"""
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally: 
        session.close()

# ============ CRUD 操作 ============
class UserRepository:
    """用户数据访问层"""
    
    def __init__(self, session: Session):
        self.session = session
    
    def create(self, username: str, email: str, password: str) -> User:
        user = User(
            username=username,
            email=email,
            hashed_password=password  # 实际应该加密
        )
        self.session.add(user)
        self.session.flush()  # 获取 ID
        return user
    
    def get_by_id(self, user_id: int) -> Optional[User]:
        return self.session.get(User, user_id)
    
    def get_by_username(self, username: str) -> Optional[User]: 
        stmt = select(User).where(User.username == username)
        return self.session.scalar(stmt)
    
    def get_all(
        self,
        skip: int = 0,
        limit: int = 100,
        is_active: bool = None
    ) -> List[User]:
        stmt = select(User)
        
        if is_active is not None: 
            stmt = stmt.where(User.is_active == is_active)
        
        stmt = stmt.offset(skip).limit(limit)
        return list(self.session.scalars(stmt))
    
    def update(self, user_id: int, **kwargs) -> Optional[User]:
        stmt = (
            update(User)
            .where(User.id == user_id)
            .values(**kwargs)
            .returning(User)
        )
        return self.session.scalar(stmt)
    
    def delete(self, user_id: int) -> bool:
        stmt = delete(User).where(User.id == user_id)
        result = self.session.execute(stmt)
        return result.rowcount > 0
    
    def search(self, keyword: str) -> List[User]:
        """搜索用户"""
        stmt = select(User).where(
            or_(
                User.username.ilike(f"%{keyword}%"),
                User.email.ilike(f"%{keyword}%")
            )
        )
        return list(self.session.scalars(stmt))

# ============ 复杂查询 ============
def complex_queries_example(session: Session):
    # 聚合查询
    stmt = select(
        User.is_active,
        func.count(User.id).label("count")
    ).group_by(User.is_active)
    
    results = session.execute(stmt).all()
    
    # JOIN 查询
    stmt = (
        select(User, Post)
        .join(Post, User.id == Post.author_id)
        .where(User.is_active == True)
    )
    
    # 预加载(避免 N+1 问题)
    stmt = (
        select(User)
        .options(selectinload(User.posts))
        .where(User.is_active == True)
    )
    users = session.scalars(stmt).all()
    
    # 子查询
    subquery = (
        select(func.count(Post.id))
        .where(Post.author_id == User.id)
        .correlate(User)
        .scalar_subquery()
    )
    
    stmt = select(User, subquery.label("post_count"))
    
    return session.execute(stmt).all()

# ============ 使用示例 ============
with get_session() as session:
    repo = UserRepository(session)
    
    # 创建用户
    user = repo.create("alice", "alice@example.com", "hashed_password")
    print(f"Created:  {user}")
    
    # 查询用户
    user = repo.get_by_username("alice")
    print(f"Found: {user}")
    
    # 更新用户
    repo.update(user.id, email="alice_new@example.com")
    
    # 搜索用户
    users = repo.search("alice")
    print(f"Search results: {users}")

2.5 Redis Operations (redis-py)

import redis
from redis import Redis
from typing import Optional, Any, List
import json
import pickle
from datetime import timedelta
from contextlib import contextmanager
import logging

logger = logging.getLogger(__name__)

# ============ 连接 Redis ============
# 单机连接
redis_client = Redis(
    host="localhost",
    port=6379,
    db=0,
    password=None,
    decode_responses=True,  # 自动解码为字符串
    socket_timeout=5,
    socket_connect_timeout=5
)

# 连接池
pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    db=0,
    max_connections=20,
    decode_responses=True
)
redis_client = Redis(connection_pool=pool)

# ============ 基础操作 ============
# 字符串
redis_client.set("key", "value")
redis_client.set("key", "value", ex=3600)  # 过期时间(秒)
redis_client.setex("key", 3600, "value")  # 同上
redis_client.setnx("key", "value")  # 不存在时才设置

value = redis_client.get("key")
redis_client.delete("key")
redis_client.exists("key")  # 检查是否存在
redis_client.expire("key", 3600)  # 设置过期时间
redis_client.ttl("key")  # 获取剩余过期时间

# 批量操作
redis_client.mset({"k1": "v1", "k2": "v2"})
values = redis_client.mget(["k1", "k2"])

# 计数器
redis_client.incr("counter")
redis_client.incrby("counter", 10)
redis_client.decr("counter")

# ============ Hash ============
redis_client.hset("user:1", "name", "Alice")
redis_client.hset("user:1", mapping={"age": 30, "city": "Beijing"})
redis_client.hget("user:1", "name")
redis_client.hgetall("user:1")  # 获取所有字段
redis_client.hdel("user:1", "city")
redis_client.hincrby("user:1", "age", 1)

# ============ List ============
redis_client.lpush("queue", "item1", "item2")  # 左边插入
redis_client.rpush("queue", "item3")  # 右边插入
redis_client.lpop("queue")  # 左边弹出
redis_client.rpop("queue")  # 右边弹出
redis_client.lrange("queue", 0, -1)  # 获取所有元素
redis_client.llen("queue")  # 长度

# 阻塞弹出(消息队列)
redis_client.blpop("queue", timeout=5)
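
# Minimal work-queue sketch built on lpush + brpop (FIFO); the queue name and payload
# are illustrative, and the consumer loop would normally run in a separate process:
redis_client.lpush("task_queue", json.dumps({"task": "send_email", "to": "a@example.com"}))

def simple_worker():
    while True:
        item = redis_client.brpop("task_queue", timeout=5)
        if item is None:           # nothing arrived within the timeout
            break
        _queue, payload = item
        print("processing", json.loads(payload))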

# ============ Set ============
redis_client.sadd("tags", "python", "redis", "database")
redis_client.smembers("tags")  # 所有成员
redis_client.sismember("tags", "python")  # 是否存在
redis_client.srem("tags", "database")  # 移除
redis_client.sinter("tags1", "tags2")  # 交集
redis_client.sunion("tags1", "tags2")  # 并集

# ============ Sorted Set ============
redis_client.zadd("leaderboard", {"alice": 100, "bob": 85, "charlie": 92})
redis_client.zrange("leaderboard", 0, -1, withscores=True)  # 按分数升序
redis_client.zrevrange("leaderboard", 0, 2, withscores=True)  # 前3名
redis_client.zscore("leaderboard", "alice")  # 获取分数
redis_client.zincrby("leaderboard", 10, "bob")  # 增加分数
redis_client.zrank("leaderboard", "alice")  # 排名(从0开始)

# ============ 封装 Redis 缓存 ============
class RedisCache:
    """Redis 缓存封装"""
    
    def __init__(self, client: Redis, prefix: str = "cache"):
        self.client = client
        self.prefix = prefix
    
    def _make_key(self, key: str) -> str:
        return f"{self.prefix}:{key}"
    
    def get(self, key: str, default: Any = None) -> Any:
        """获取缓存"""
        value = self.client.get(self._make_key(key))
        if value is None:
            return default
        try:
            return json.loads(value)
        except json.JSONDecodeError: 
            return value
    
    def set(
        self,
        key: str,
        value: Any,
        expire: int = None
    ) -> bool:
        """设置缓存"""
        key = self._make_key(key)
        if isinstance(value, (dict, list)):
            value = json.dumps(value, ensure_ascii=False)
        return self.client.set(key, value, ex=expire)
    
    def delete(self, key: str) -> bool:
        """删除缓存"""
        return bool(self.client.delete(self._make_key(key)))
    
    def get_or_set(
        self,
        key: str,
        factory,
        expire: int = 3600
    ) -> Any:
        """获取缓存,不存在则设置"""
        value = self.get(key)
        if value is None: 
            value = factory() if callable(factory) else factory
            self.set(key, value, expire)
        return value
    
    def invalidate_pattern(self, pattern: str) -> int:
        """删除匹配模式的所有 key"""
        # Note: on a large keyspace, prefer self.client.scan_iter(pattern) over keys() to avoid blocking Redis.
        pattern = self._make_key(pattern)
        keys = self.client.keys(pattern)
        if keys:
            return self.client.delete(*keys)
        return 0
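
# Usage sketch (key name and loader are illustrative):
cache = RedisCache(redis_client, prefix="app")

def load_user_from_db():
    return {"id": 1, "name": "Alice"}   # stand-in for a real database query

user = cache.get_or_set("user:1", load_user_from_db, expire=600)
print(user)  # {'id': 1, 'name': 'Alice'} — computed once, then served from Redis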

# ============ 分布式锁 ============
class RedisLock:
    """Redis 分布式锁"""
    
    def __init__(
        self,
        client: Redis,
        name: str,
        timeout: int = 10,
        blocking: bool = True,
        blocking_timeout: int = None
    ):
        self.client = client
        self.name = f"lock:{name}"
        self.timeout = timeout
        self.blocking = blocking
        self.blocking_timeout = blocking_timeout
        self._token = None
    
    def acquire(self) -> bool:
        """获取锁"""
        import time
        import uuid
        
        token = str(uuid.uuid4())
        
        if self.blocking:
            stop_time = None
            if self.blocking_timeout:
                stop_time = time.time() + self.blocking_timeout
            
            while True:
                if self.client.set(self.name, token, nx=True, ex=self.timeout):
                    self._token = token
                    return True
                
                if stop_time and time.time() >= stop_time:
                    return False
                
                time.sleep(0.1)
        else:
            if self.client.set(self.name, token, nx=True, ex=self.timeout):
                self._token = token
                return True
            return False
    
    def release(self) -> bool:
        """释放锁"""
        if self._token is None:
            return False
        
        # 使用 Lua 脚本确保原子性
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.client.eval(script, 1, self.name, self._token)
        self._token = None
        return bool(result)
    
    def __enter__(self):
        if not self.acquire():
            raise TimeoutError("Could not acquire lock")
        return self
    
    def __exit__(self, *args):
        self.release()

# 使用
with RedisLock(redis_client, "my_resource", timeout=30):
    # 执行需要互斥的操作
    pass

2.6 Celery - Async Task Queue

from celery import Celery, Task
from celery.schedules import crontab
from typing import Any
import logging

logger = logging.getLogger(__name__)

# ============ 创建 Celery 应用 ============
app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

# 配置
app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,  # 任务超时时间(秒)
    worker_prefetch_multiplier=1,  # 预取任务数
    task_acks_late=True,  # 任务完成后才确认
    task_reject_on_worker_lost=True,  # worker 丢失时拒绝任务
)

# ============ 定义任务 ============
@app.task(bind=True, max_retries=3)
def send_email(self, to: str, subject: str, body: str) -> dict:
    """发送邮件任务"""
    try:
        logger.info(f"Sending email to {to}")
        # 实际发送邮件的代码
        return {"status": "sent", "to": to}
    except Exception as e:
        logger.error(f"Failed to send email:  {e}")
        # 重试,指数退避
        raise self.retry(exc=e, countdown=2 ** self.request.retries)

@app.task
def process_image(image_path: str, operations: list) -> str:
    """处理图片任务"""
    logger.info(f"Processing image: {image_path}")
    # 处理图片
    return f"processed_{image_path}"

@app.task
def long_running_task(data: list) -> int:
    """长时间运行的任务"""
    result = 0
    for item in data:
        result += item ** 2
    return result

# ============ 任务链和组 ============
from celery import chain, group, chord

# 链式调用:按顺序执行
workflow = chain(
    process_image.s("image.jpg", ["resize"]),
    send_email.s("user@example.com", "Image Ready", "Your image is processed")
)
result = workflow.apply_async()

# 并行执行
job = group([
    process_image.s(f"image_{i}.jpg", ["resize"])
    for i in range(10)
])
result = job.apply_async()

# chord:并行执行后汇总
callback = send_email.s("admin@example.com", "All Done", "")
workflow = chord(
    [process_image.s(f"image_{i}.jpg", ["resize"]) for i in range(10)],
    callback
)

# ============ 自定义任务基类 ============
class BaseTask(Task):
    """自定义任务基类"""
    
    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f"Task {task_id} succeeded with result: {retval}")
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error(f"Task {task_id} failed: {exc}")
    
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        logger.warning(f"Task {task_id} retrying: {exc}")

@app.task(base=BaseTask, bind=True)
def my_task(self, x: int, y: int) -> int:
    return x + y

# ============ 定时任务 ============
app.conf.beat_schedule = {
    # 每分钟执行
    "send-report-every-minute": {
        "task": "tasks.send_report",
        "schedule":  60.0,
    },
    # 每天早上 8 点执行
    "morning-task": {
        "task": "tasks.daily_task",
        "schedule": crontab(hour=8, minute=0),
    },
    # 每周一早上 9 点执行
    "weekly-cleanup": {
        "task": "tasks.weekly_cleanup",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),
    },
}

# ============ 调用任务 ============
# 异步调用
result = send_email.delay("user@example.com", "Hello", "World")

# 带参数调用
result = send_email.apply_async(
    args=["user@example.com", "Hello", "World"],
    countdown=60,  # 60 秒后执行
    expires=3600,  # 1 小时后过期
)

# 获取结果
print(result.id)  # 任务 ID
print(result.status)  # 任务状态
print(result.ready())  # 是否完成
print(result.get(timeout=10))  # 获取结果(阻塞)

# ============ 监控任务 ============
from celery.result import AsyncResult

def get_task_status(task_id: str) -> dict:
    """获取任务状态"""
    result = AsyncResult(task_id, app=app)
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None,
        "traceback": result.traceback if result.failed() else None,
    }

2.7 pytest - Testing Framework

import pytest
from typing import Generator
from unittest.mock import Mock, patch, MagicMock
import asyncio

# ============ 基础测试 ============
def add(a: int, b: int) -> int:
    return a + b

def test_add():
    assert add(1, 2) == 3
    assert add(-1, 1) == 0
    assert add(0, 0) == 0

# 测试异常
def divide(a: int, b: int) -> float:
    if b == 0:
        raise ValueError("Cannot divide by zero")
    return a / b

def test_divide_by_zero():
    with pytest.raises(ValueError) as exc_info: 
        divide(1, 0)
    assert "Cannot divide by zero" in str(exc_info.value)

# ============ Fixtures(固定装置) ============
@pytest.fixture
def sample_data() -> dict:
    """提供测试数据"""
    return {"name": "Alice", "age": 30}

@pytest.fixture
def db_connection():
    """数据库连接 fixture"""
    conn = create_connection()
    yield conn  # 测试使用这个连接
    conn.close()  # 测试结束后清理

def test_with_fixture(sample_data):
    assert sample_data["name"] == "Alice"

# Fixture 作用域
@pytest.fixture(scope="module")
def expensive_resource():
    """模块级别 fixture,整个模块只创建一次"""
    resource = create_expensive_resource()
    yield resource
    resource.cleanup()

# ============ 参数化测试 ============
@pytest.mark.parametrize("a,b,expected", [
    (1, 2, 3),
    (-1, 1, 0),
    (0, 0, 0),
    (100, 200, 300),
])
def test_add_parametrized(a, b, expected):
    assert add(a, b) == expected

@pytest.mark.parametrize("input_str,expected", [
    ("hello", "HELLO"),
    ("World", "WORLD"),
    ("", ""),
])
def test_upper(input_str, expected):
    assert input_str.upper() == expected

# ============ Mock 和 Patch ============
class UserService:
    def __init__(self, api_client):
        self.api_client = api_client
    
    def get_user(self, user_id: int) -> dict:
        return self.api_client.get(f"/users/{user_id}")

def test_user_service_with_mock():
    # 创建 Mock 对象
    mock_client = Mock()
    mock_client.get.return_value = {"id": 1, "name": "Alice"}
    
    service = UserService(mock_client)
    result = service.get_user(1)
    
    assert result["name"] == "Alice"
    mock_client.get.assert_called_once_with("/users/1")

# 使用 patch 装饰器
@patch("module.external_api_call")
def test_with_patch(mock_api):
    mock_api.return_value = {"status": "success"}
    
    result = function_that_calls_api()
    
    assert result["status"] == "success"
    mock_api.assert_called_once()

# patch 上下文管理器
def test_with_patch_context():
    with patch("module.get_current_time") as mock_time:
        mock_time.return_value = "2024-01-01 00:00:00"
        result = function_using_time()
        assert "2024-01-01" in result

# ============ 异步测试 ============
@pytest.mark.asyncio
async def test_async_function():
    result = await async_fetch_data()
    assert result is not None

@pytest.fixture
async def async_client():
    """异步 fixture"""
    client = await create_async_client()
    yield client
    await client.close()

# ============ 测试类 ============
class TestUserService: 
    @pytest.fixture(autouse=True)
    def setup(self):
        """每个测试方法前执行"""
        self.service = UserService(Mock())
    
    def test_get_user(self):
        self.service.api_client.get.return_value = {"id": 1}
        result = self.service.get_user(1)
        assert result["id"] == 1
    
    def test_get_user_not_found(self):
        self.service.api_client.get.return_value = None
        result = self.service.get_user(999)
        assert result is None

# ============ 标记和跳过 ============
@pytest.mark.slow
def test_slow_operation():
    """标记为慢速测试"""
    pass

@pytest.mark.skip(reason="Not implemented yet")
def test_not_ready():
    pass

@pytest.mark.skipif(
    condition=True,
    reason="Skip on this condition"
)
def test_conditional_skip():
    pass

@pytest.mark.xfail(reason="Known bug")
def test_expected_failure():
    assert False  # 预期失败

# ============ conftest.py(共享 fixtures) ============
# conftest.py
"""
@pytest.fixture(scope="session")
def app():
    return create_app("testing")

@pytest.fixture(scope="session")
def client(app):
    return app.test_client()

@pytest.fixture(autouse=True)
def reset_db(app):
    with app.app_context():
        db.drop_all()
        db.create_all()
        yield
        db.session.rollback()
"""

# ============ 运行测试 ============
"""
# 运行所有测试
pytest

# 运行特定文件
pytest test_module.py

# 运行特定测试
pytest test_module.py::test_function

# 显示详细输出
pytest -v

# 显示 print 输出
pytest -s

# 只运行标记的测试
pytest -m slow

# 停止在第一个失败
pytest -x

# 并行运行(需要 pytest-xdist)
pytest -n auto

# 生成覆盖率报告(需要 pytest-cov)
pytest --cov=mypackage --cov-report=html
"""

3. Project Tool Libraries at a Glance

3.1 By Use Case

"""
============ Web 开发 ============
- FastAPI: 现代异步 Web 框架
- Django: 全功能 Web 框架
- Flask: 轻量级 Web 框架
- Starlette:  ASGI 框架
- uvicorn:  ASGI 服务器

============ 数据验证 ============
- Pydantic: 数据验证和序列化
- marshmallow: 序列化/反序列化
- cerberus: 轻量级验证

============ 数据库 ============
- SQLAlchemy:  ORM 框架
- databases: 异步数据库
- asyncpg: 异步 PostgreSQL
- pymysql: MySQL 驱动
- redis-py: Redis 客户端
- motor: 异步 MongoDB

============ HTTP 客户端 ============
- requests: 同步 HTTP
- aiohttp: 异步 HTTP
- httpx: 同步/异步 HTTP

============ 任务队列 ============
- Celery: 分布式任务队列
- rq: 简单任务队列
- dramatiq: 任务队列

============ 测试 ============
- pytest: 测试框架
- pytest-asyncio: 异步测试
- pytest-cov: 覆盖率
- faker: 假数据生成
- factory_boy: 测试工厂

============ 日志监控 ============
- loguru: 更好的日志
- sentry-sdk: 错误追踪
- prometheus-client: 指标

============ CLI 工具 ============
- click:  命令行工具
- typer: 现代 CLI
- rich: 终端美化

============ 数据处理 ============
- pandas:  数据分析
- numpy: 数值计算
- polars: 高性能 DataFrame

============ 配置管理 ============
- python-dotenv: 环境变量
- pydantic-settings: 配置验证
- dynaconf: 动态配置

============ 安全 ============
- passlib: 密码哈希
- python-jose: JWT
- cryptography: 加密
"""

📝 Part 6 Summary

High-Frequency Interview Topics

Topic                    Interview Frequency    Difficulty
collections module       ⭐⭐⭐⭐⭐             ⭐⭐
itertools module         ⭐⭐⭐                 ⭐⭐⭐
functools decorators     ⭐⭐⭐⭐               ⭐⭐⭐
datetime handling        ⭐⭐⭐⭐               ⭐⭐
json serialization       ⭐⭐⭐⭐               ⭐⭐
logging configuration    ⭐⭐⭐                 ⭐⭐
Regular expressions      ⭐⭐⭐⭐               ⭐⭐⭐
requests usage           ⭐⭐⭐⭐⭐             ⭐⭐
SQLAlchemy ORM           ⭐⭐⭐⭐               ⭐⭐⭐
Redis operations         ⭐⭐⭐⭐⭐             ⭐⭐⭐
pytest testing           ⭐⭐⭐⭐               ⭐⭐

Common Interview Questions

  1. What is the difference between defaultdict and a plain dict?
  2. How does lru_cache work? What is a sensible maxsize?
  3. How do you handle time zones in Python?
  4. How do you customize JSON serialization (e.g. for datetime and similar types)?
  5. What is the difference between re.match and re.search?
  6. How do you configure logging to write to both a file and the console?
  7. How do requests and aiohttp differ? When would you use each?
  8. How do you avoid the N+1 query problem in SQLAlchemy?
  9. Which data structures does Redis offer, and what is each suited for?
  10. How do you implement a distributed lock with Redis?
  11. What fixture scopes does pytest support?
  12. How do you mock external API calls in tests?
