Python面试手册AI版——Part6
Python面试手册AI版——Part6 常用标准库 & 第三方库
·
Part 6: 常用标准库 & 第三方库
一、常用标准库
1.1 collections - 高性能容器
from collections import (
namedtuple, deque, Counter, OrderedDict,
defaultdict, ChainMap
)
# ============ namedtuple:具名元组 ============
# 比普通元组更有可读性,比类更轻量
Point = namedtuple('Point', ['x', 'y'])
p = Point(11, 22)
print(p.x, p.y) # 11 22
print(p[0], p[1]) # 11 22 (也支持索引)
# 转换为字典
print(p._asdict()) # {'x': 11, 'y': 22}
# 创建新实例(替换某些字段)
p2 = p._replace(x=100)
print(p2) # Point(x=100, y=22)
# 带默认值 (Python 3.7+)
Point3D = namedtuple('Point3D', ['x', 'y', 'z'], defaults=[0])
print(Point3D(1, 2)) # Point3D(x=1, y=2, z=0)
# ============ deque:双端队列 ============
# 两端操作都是 O(1),比 list 更适合队列场景
dq = deque([1, 2, 3], maxlen=5) # 可以设置最大长度
dq.append(4) # 右端添加
dq.appendleft(0) # 左端添加
dq.pop() # 右端弹出
dq.popleft() # 左端弹出
dq.rotate(1) # 向右旋转
dq.rotate(-1) # 向左旋转
# 实际应用:固定大小的滑动窗口
def moving_average(iterable, n=3):
    """Yield the mean of every full window of ``n`` consecutive items.

    Nothing is yielded until ``n`` items have been consumed, so an input
    shorter than ``n`` produces no output at all.

    Args:
        iterable: Any iterable of numbers.
        n: Window size; must be at least 1.

    Raises:
        ValueError: If ``n`` is smaller than 1. Because this is a generator,
            the error surfaces on the first iteration, not at call time.
    """
    if n < 1:
        raise ValueError("n must be at least 1")
    # maxlen makes the deque discard its oldest element on every append,
    # which is exactly the sliding behaviour we need.
    window = deque(maxlen=n)
    for x in iterable:
        window.append(x)
        if len(window) == n:
            yield sum(window) / n
print(list(moving_average([1, 2, 3, 4, 5, 6, 7], 3)))
# [2.0, 3.0, 4.0, 5.0, 6.0]
# ============ Counter:计数器 ============
# 统计元素出现次数
cnt = Counter(['a', 'b', 'c', 'a', 'b', 'a'])
print(cnt) # Counter({'a': 3, 'b': 2, 'c': 1})
# 常用操作
print(cnt.most_common(2)) # [('a', 3), ('b', 2)]
print(cnt['a']) # 3
print(cnt['z']) # 0 (不存在返回 0,不报错)
# 更新计数
cnt.update(['a', 'b', 'd'])
print(cnt) # Counter({'a': 4, 'b': 3, 'c': 1, 'd': 1})
# 数学运算
cnt1 = Counter(a=3, b=1)
cnt2 = Counter(a=1, b=2)
print(cnt1 + cnt2) # Counter({'a': 4, 'b': 3})
print(cnt1 - cnt2) # Counter({'a': 2})
print(cnt1 & cnt2) # Counter({'a': 1, 'b': 1}) 交集(取最小)
print(cnt1 | cnt2) # Counter({'a': 3, 'b': 2}) 并集(取最大)
# 实际应用:词频统计
text = "the quick brown fox jumps over the lazy dog"
word_counts = Counter(text.split())
print(word_counts.most_common(3))
# ============ defaultdict:带默认值的字典 ============
# 访问不存在的 key 时自动创建默认值
dd = defaultdict(list)
dd['a'].append(1) # 不需要先初始化
dd['a'].append(2)
dd['b'].append(3)
print(dict(dd)) # {'a': [1, 2], 'b': [3]}
# 常用默认工厂
dd_int = defaultdict(int) # 默认值 0
dd_set = defaultdict(set) # 默认值 set()
dd_lambda = defaultdict(lambda: 'N/A') # 自定义默认值
# 实际应用:分组
students = [
('Alice', 'Math'),
('Bob', 'Physics'),
('Charlie', 'Math'),
('David', 'Physics'),
]
by_subject = defaultdict(list)
for name, subject in students:
by_subject[subject].append(name)
print(dict(by_subject))
# {'Math': ['Alice', 'Charlie'], 'Physics': ['Bob', 'David']}
# ============ OrderedDict:有序字典 ============
# Python 3.7+ dict 已有序,但 OrderedDict 有额外功能
od = OrderedDict()
od['a'] = 1
od['b'] = 2
od['c'] = 3
od.move_to_end('a') # 移到末尾
od.move_to_end('c', last=False) # 移到开头
print(list(od.keys())) # ['c', 'b', 'a']
# 可以用于实现 LRU 缓存
class LRUCache(OrderedDict):
    """Least-recently-used cache built on ``OrderedDict``.

    The insertion order of the underlying dict doubles as the recency
    order: the first key is the least recently used, the last key the most
    recently used.

    Note that ``get`` deliberately replaces ``dict.get``: it takes no
    default and returns ``-1`` for a missing key.
    """

    def __init__(self, capacity: int):
        """Create an empty cache that holds at most ``capacity`` entries."""
        super().__init__()
        self.capacity = capacity

    def get(self, key):
        """Return the value for ``key`` and mark it most recently used.

        Returns ``-1`` when ``key`` is not cached.
        """
        if key not in self:
            return -1
        self.move_to_end(key)
        return self[key]

    def put(self, key, value):
        """Insert or update ``key`` and evict the oldest entry if over capacity."""
        if key in self:
            # Refresh recency before overwriting the value.
            self.move_to_end(key)
        self[key] = value
        if len(self) > self.capacity:
            # last=False pops from the front, i.e. the least recently used key.
            self.popitem(last=False)
# ============ ChainMap:链式字典 ============
# 将多个字典作为一个视图
defaults = {'theme': 'dark', 'language': 'en'}
user_settings = {'theme': 'light'}
settings = ChainMap(user_settings, defaults)
print(settings['theme']) # 'light' (user_settings 优先)
print(settings['language']) # 'en' (从 defaults 获取)
1.2 itertools - 迭代器工具
from itertools import (
count, cycle, repeat, # 无限迭代器
chain, zip_longest, product, permutations, combinations, # 排列组合
groupby, filterfalse, takewhile, dropwhile, # 过滤分组
accumulate, starmap, # 累积映射
islice, tee # 切片复制
)
# ============ 无限迭代器 ============
# count:从 n 开始无限计数
for i in count(10, 2): # 10, 12, 14, ...
if i > 20:
break
print(i)
# cycle:无限循环
colors = cycle(['red', 'green', 'blue'])
for _ in range(7):
print(next(colors)) # red, green, blue, red, green, blue, red
# repeat:重复
list(repeat('hello', 3)) # ['hello', 'hello', 'hello']
# ============ 排列组合 ============
# chain:连接多个迭代器
list(chain([1, 2], [3, 4], [5])) # [1, 2, 3, 4, 5]
# chain.from_iterable:展平嵌套
list(chain.from_iterable([[1, 2], [3, 4]])) # [1, 2, 3, 4]
# zip_longest:最长匹配
list(zip_longest([1, 2, 3], ['a', 'b'], fillvalue='-'))
# [(1, 'a'), (2, 'b'), (3, '-')]
# product:笛卡尔积
list(product([1, 2], ['a', 'b']))
# [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]
list(product([0, 1], repeat=3)) # 等价于 product([0,1], [0,1], [0,1])
# [(0,0,0), (0,0,1), (0,1,0), (0,1,1), (1,0,0), (1,0,1), (1,1,0), (1,1,1)]
# permutations:排列
list(permutations([1, 2, 3], 2))
# [(1,2), (1,3), (2,1), (2,3), (3,1), (3,2)]
# combinations:组合(不重复)
list(combinations([1, 2, 3, 4], 2))
# [(1,2), (1,3), (1,4), (2,3), (2,4), (3,4)]
# combinations_with_replacement:可重复组合
from itertools import combinations_with_replacement
list(combinations_with_replacement([1, 2, 3], 2))
# [(1,1), (1,2), (1,3), (2,2), (2,3), (3,3)]
# ============ 过滤分组 ============
# groupby:分组(需要先排序!)
data = [
{'name': 'Alice', 'dept': 'IT'},
{'name': 'Bob', 'dept': 'HR'},
{'name': 'Charlie', 'dept': 'IT'},
]
data.sort(key=lambda x: x['dept']) # 必须先排序
for dept, group in groupby(data, key=lambda x: x['dept']):
print(f"{dept}: {list(group)}")
# filterfalse:保留使谓词为假的元素(与 filter 相反)
list(filterfalse(lambda x: x % 2, range(10))) # [0, 2, 4, 6, 8]
# takewhile / dropwhile
list(takewhile(lambda x: x < 5, [1, 3, 5, 2, 1])) # [1, 3]
list(dropwhile(lambda x: x < 5, [1, 3, 5, 2, 1])) # [5, 2, 1]
# ============ 累积映射 ============
# accumulate:累积
list(accumulate([1, 2, 3, 4, 5])) # [1, 3, 6, 10, 15]
list(accumulate([1, 2, 3, 4, 5], lambda x, y: x * y)) # [1, 2, 6, 24, 120]
# starmap:展开参数的 map
list(starmap(pow, [(2, 3), (3, 2), (10, 3)])) # [8, 9, 1000]
# ============ 切片复制 ============
# islice:迭代器切片
list(islice(count(), 5, 10)) # [5, 6, 7, 8, 9]
# tee:复制迭代器
it = iter([1, 2, 3, 4, 5])
it1, it2 = tee(it, 2)
print(list(it1)) # [1, 2, 3, 4, 5]
print(list(it2)) # [1, 2, 3, 4, 5]
# ============ 实际应用 ============
# 1. 分块处理
def chunked(iterable, n):
    """Yield consecutive lists of up to ``n`` items taken from ``iterable``.

    Every chunk has exactly ``n`` items except possibly the last one.

    Raises:
        ValueError: If ``n`` is smaller than 1 (raised on first iteration,
            because this is a generator).
    """
    if n < 1:
        raise ValueError("n must be at least 1")
    it = iter(iterable)
    # islice pulls at most n items; an empty list means the source is exhausted.
    while chunk := list(islice(it, n)):
        yield chunk
list(chunked(range(10), 3)) # [[0,1,2], [3,4,5], [6,7,8], [9]]
# 2. 滑动窗口
def sliding_window(iterable, n):
    """Yield overlapping tuples of ``n`` consecutive items.

    An input with fewer than ``n`` items yields nothing.

    Raises:
        ValueError: If ``n`` is smaller than 1 (raised on first iteration,
            because this is a generator).
    """
    if n < 1:
        raise ValueError("n must be at least 1")
    it = iter(iterable)
    # Pre-fill the window with the first n items; maxlen then keeps it at
    # size n by dropping the oldest item on each append.
    window = deque(islice(it, n), maxlen=n)
    if len(window) == n:
        yield tuple(window)
    for x in it:
        window.append(x)
        yield tuple(window)
list(sliding_window([1, 2, 3, 4, 5], 3))
# [(1, 2, 3), (2, 3, 4), (3, 4, 5)]
1.3 functools - 函数工具
from functools import (
reduce, partial, wraps, lru_cache, cache,
singledispatch, total_ordering, cached_property
)
# ============ reduce:归约 ============
from functools import reduce
from operator import add, mul
# 求和
reduce(add, [1, 2, 3, 4, 5]) # 15
# 阶乘
reduce(mul, range(1, 6)) # 120
# 带初始值
reduce(add, [], 0) # 0 (空列表不会报错)
# ============ partial:偏函数 ============
def power(base, exponent):
    """Return ``base`` raised to ``exponent``."""
    return base ** exponent
square = partial(power, exponent=2)
cube = partial(power, exponent=3)
print(square(5)) # 25
print(cube(5)) # 125
# 实际应用:简化回调
import json
pretty_print = partial(json.dumps, indent=2, ensure_ascii=False)
# ============ lru_cache:LRU 缓存 ============
@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
    """Return the n-th Fibonacci number (``fibonacci(0) == 0``).

    The recursion is memoised by ``lru_cache``, so each value is computed
    only once while it stays in the cache. Values of ``n`` below 2 are
    returned unchanged.
    """
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)
print(fibonacci(100)) # 瞬间完成
# 查看缓存信息
print(fibonacci.cache_info())
# CacheInfo(hits=98, misses=101, maxsize=128, currsize=101)
# 清除缓存
fibonacci.cache_clear()
# Python 3.9+:无限缓存
@cache  # equivalent to @lru_cache(maxsize=None)
def factorial(n):
    """Return ``n!`` for a non-negative integer ``n``.

    Raises:
        ValueError: If ``n`` is negative. Without this guard the recursion
            never reaches the base case.
    """
    if n < 0:
        raise ValueError("factorial() is not defined for negative values")
    return n * factorial(n - 1) if n else 1
# ============ cached_property:缓存属性 (Python 3.8+) ============
from functools import cached_property
class DataProcessor:
    """Demo of ``cached_property``: an expensive attribute computed once."""

    def __init__(self, data):
        """Store the iterable of numbers to be processed."""
        self.data = data

    @cached_property
    def processed(self):
        """Return the squares of ``data``.

        Computed on first access only; the result is then stored on the
        instance, so later accesses do not run this method (and do not
        print) again.
        """
        print("Processing...")
        return [x ** 2 for x in self.data]
dp = DataProcessor([1, 2, 3, 4, 5])
print(dp.processed) # Processing... [1, 4, 9, 16, 25]
print(dp.processed) # [1, 4, 9, 16, 25] (不再打印 Processing)
# ============ singledispatch:单分派泛函数 ============
@singledispatch
def process(arg):
    """Print a description of ``arg``; this is the fallback implementation.

    Type-specific implementations are registered below and are selected by
    the type of the first argument.
    """
    print(f"Default: {arg}")


@process.register(int)
def _(arg):
    # int handler: print the doubled value.
    print(f"Integer: {arg * 2}")


@process.register(str)
def _(arg):
    # str handler: print the upper-cased text.
    print(f"String: {arg.upper()}")


@process.register(list)
def _(arg):
    # list handler: print only the length.
    print(f"List with {len(arg)} items")
process(10) # Integer: 20
process("hello") # String: HELLO
process([1, 2, 3]) # List with 3 items
process(3.14) # Default: 3.14
# ============ total_ordering:自动补全比较方法 ============
@total_ordering
class Student:
    """Student ordered and compared by ``grade`` only.

    ``total_ordering`` derives ``__le__``, ``__gt__`` and ``__ge__`` from
    the ``__eq__`` and ``__lt__`` defined here.
    """

    def __init__(self, name, grade):
        self.name = name
        self.grade = grade

    def __eq__(self, other):
        # Returning NotImplemented lets Python fall back to its default
        # handling instead of failing on a missing ``grade`` attribute.
        if not isinstance(other, Student):
            return NotImplemented
        return self.grade == other.grade

    def __lt__(self, other):
        if not isinstance(other, Student):
            return NotImplemented
        return self.grade < other.grade

    def __hash__(self):
        # Defining __eq__ removes the inherited __hash__; restore one that
        # is consistent with equality (equal grades -> equal hashes).
        return hash(self.grade)
# 自动生成 __le__, __gt__, __ge__
s1 = Student("Alice", 85)
s2 = Student("Bob", 90)
print(s1 < s2) # True
print(s1 <= s2) # True (自动生成)
print(s1 >= s2) # False (自动生成)
1.4 datetime - 日期时间
from datetime import datetime, date, time, timedelta, timezone
from zoneinfo import ZoneInfo # Python 3.9+
# ============ 获取当前时间 ============
now = datetime.now() # 本地时间
utc_now = datetime.utcnow() # UTC 时间(已废弃,用下面的)
utc_now = datetime.now(timezone.utc) # 推荐
today = date.today()
current_time = datetime.now().time()
# ============ 创建日期时间 ============
dt = datetime(2024, 12, 25, 10, 30, 0)
d = date(2024, 12, 25)
t = time(10, 30, 0)
# ============ 字符串解析与格式化 ============
# 解析
dt = datetime.strptime("2024-12-25 10:30:00", "%Y-%m-%d %H:%M:%S")
dt = datetime.fromisoformat("2024-12-25T10:30:00") # ISO 格式
# 格式化
dt.strftime("%Y年%m月%d日 %H时%M分%S秒")
dt.isoformat() # "2024-12-25T10:30:00"
# 常用格式符
"""
%Y - 四位年份 (2024)
%m - 两位月份 (01-12)
%d - 两位日期 (01-31)
%H - 24小时制小时 (00-23)
%I - 12小时制小时 (01-12)
%M - 分钟 (00-59)
%S - 秒 (00-59)
%f - 微秒 (000000-999999)
%a - 星期缩写 (Mon)
%A - 星期全称 (Monday)
%b - 月份缩写 (Jan)
%B - 月份全称 (January)
%p - AM/PM
%Z - 时区名称
%z - UTC 偏移 (+0800)
"""
# ============ 时间运算 ============
now = datetime.now()
# timedelta:时间差
delta = timedelta(days=7, hours=3, minutes=30)
future = now + delta
past = now - delta
# 两个时间的差
diff = datetime(2025, 1, 1) - now
print(diff.days) # 天数
print(diff.total_seconds()) # 总秒数
# ============ 时区处理 ============
# Python 3.9+ 使用 zoneinfo
from zoneinfo import ZoneInfo
# 创建带时区的时间
dt_utc = datetime(2024, 12, 25, 10, 0, 0, tzinfo=timezone.utc)
dt_shanghai = datetime(2024, 12, 25, 18, 0, 0, tzinfo=ZoneInfo("Asia/Shanghai"))
# 时区转换
dt_shanghai = dt_utc.astimezone(ZoneInfo("Asia/Shanghai"))
print(dt_shanghai) # 2024-12-25 18:00:00+08:00
# 本地时间转 UTC
local_dt = datetime.now()
utc_dt = local_dt.astimezone(timezone.utc)
# ============ 实用函数 ============
# 获取月初/月末
from calendar import monthrange
def get_month_range(year: int, month: int):
    """Return ``(first_day, last_day)`` of the given month as ``date`` objects."""
    first_day = date(year, month, 1)
    # monthrange returns (weekday of the 1st, number of days in the month).
    _, last = monthrange(year, month)
    last_day = date(year, month, last)
    return first_day, last_day
# 获取本周一
def get_monday(d: date) -> date:
    """Return the Monday of the week containing ``d``.

    ``date.weekday()`` is 0 for Monday, so subtracting that many days
    lands on Monday (and returns ``d`` itself when it already is one).
    """
    return d - timedelta(days=d.weekday())
# 日期范围生成器
def date_range(start: date, end: date):
    """Yield every date from ``start`` to ``end``, both inclusive.

    Yields nothing when ``start`` is after ``end``.
    """
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)
1.5 json - JSON 处理
import json
from datetime import datetime, date
from dataclasses import dataclass, asdict
from enum import Enum
# ============ 基础用法 ============
data = {
"name": "Alice",
"age": 30,
"skills": ["Python", "JavaScript"],
"active": True,
"salary": None
}
# 序列化
json_str = json.dumps(data)
json_str = json.dumps(data, indent=2) # 格式化
json_str = json.dumps(data, ensure_ascii=False) # 支持中文
# 反序列化
data = json.loads(json_str)
# 文件读写
with open("data.json", "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
with open("data.json", "r", encoding="utf-8") as f:
data = json.load(f)
# ============ 自定义序列化 ============
# 方式1:default 参数
def custom_encoder(obj):
    """``default=`` hook for ``json.dumps`` covering a few extra types.

    Returns:
        An ISO-8601 string for ``date``/``datetime``, a list for
        ``set``/``frozenset``, and the underlying value for ``Enum`` members.

    Raises:
        TypeError: For any other type, as the ``json`` module expects.
    """
    # datetime is a subclass of date, so one check covers both.
    if isinstance(obj, date):
        return obj.isoformat()
    if isinstance(obj, (set, frozenset)):
        return list(obj)
    if isinstance(obj, Enum):
        return obj.value
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
data = {
"created_at": datetime.now(),
"tags": {"python", "programming"},
}
json.dumps(data, default=custom_encoder)
# 方式2:自定义 JSONEncoder
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that wraps dates in tagged single-key objects.

    ``datetime`` becomes ``{"__datetime__": iso}`` and ``date`` becomes
    ``{"__date__": iso}``, so a decoder can tell them apart from plain
    strings.
    """

    def default(self, obj):
        # datetime must be tested first: it is a subclass of date.
        if isinstance(obj, datetime):
            return {"__datetime__": obj.isoformat()}
        if isinstance(obj, date):
            return {"__date__": obj.isoformat()}
        # Defer to the base class, which raises TypeError for unknown types.
        return super().default(obj)
json.dumps(data, cls=CustomJSONEncoder)
# ============ 自定义反序列化 ============
def custom_decoder(dct):
    """``object_hook`` for ``json.loads`` that revives tagged dates.

    A dict containing ``"__datetime__"`` is replaced by a ``datetime`` and
    one containing ``"__date__"`` by a ``date``; any other dict is returned
    unchanged.
    """
    if "__datetime__" in dct:
        return datetime.fromisoformat(dct["__datetime__"])
    if "__date__" in dct:
        return date.fromisoformat(dct["__date__"])
    return dct
json.loads(json_str, object_hook=custom_decoder)
# ============ dataclass 序列化 ============
@dataclass
class User:
    """Plain user record; ``dataclasses.asdict`` turns it into a JSON-ready dict."""

    name: str
    age: int
    email: str
user = User("Alice", 30, "alice@example.com")
json.dumps(asdict(user)) # {"name": "Alice", "age": 30, "email": "... "}
1.6 logging - 日志处理
import logging
import logging.handlers
from pathlib import Path
# ============ 基础配置 ============
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
logger.info("Application started")
logger.warning("This is a warning")
logger.error("An error occurred")
# ============ 日志级别 ============
"""
DEBUG (10) - 详细调试信息
INFO (20) - 常规信息
WARNING (30) - 警告信息
ERROR (40) - 错误信息
CRITICAL (50) - 严重错误
"""
# ============ 高级配置 ============
def setup_logging(
    log_file: str = "app.log",
    level: int = logging.INFO,
    daily_log_file: str = "app_daily.log",
):
    """Configure the root logger with console and rotating file handlers.

    Safe to call more than once: handlers installed by a previous call are
    removed and closed first, so records are never emitted twice.

    Args:
        log_file: Path of the size-rotated log (10 MB per file, 5 backups).
        level: Level applied to the root logger.
        daily_log_file: Path of the log rotated at midnight (30 backups).

    Returns:
        The configured root logger.
    """
    logger = logging.getLogger()
    logger.setLevel(level)

    # Remove only the handlers this function added earlier; handlers
    # installed by other code are left untouched.
    stale = [h for h in logger.handlers if getattr(h, "_installed_by_setup_logging", False)]
    for handler in stale:
        logger.removeHandler(handler)
        handler.close()

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - '
        '%(filename)s:%(lineno)d - %(message)s'
    )

    # Console output.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # File output, rotated by size.
    file_handler = logging.handlers.RotatingFileHandler(
        log_file,
        maxBytes=10 * 1024 * 1024,  # 10MB
        backupCount=5,
        encoding='utf-8',
    )
    file_handler.setLevel(logging.DEBUG)

    # File output, rotated by time.
    time_handler = logging.handlers.TimedRotatingFileHandler(
        daily_log_file,
        when="midnight",
        interval=1,
        backupCount=30,
        encoding='utf-8',
    )

    for handler in (console_handler, file_handler, time_handler):
        handler.setFormatter(formatter)
        # Marker used by the clean-up step above on the next call.
        handler._installed_by_setup_logging = True
        logger.addHandler(handler)
    return logger
# ============ 使用字典配置 ============
import logging.config
# Dictionary-schema logging configuration for logging.config.dictConfig.
# NOTE(review): the "json" formatter names a class from the third-party
# python-json-logger package and is not referenced by any handler here —
# confirm the package is installed wherever this config is loaded.
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        },
        'json': {
            'class': 'pythonjsonlogger.jsonlogger.JsonFormatter',
            'format': '%(asctime)s %(name)s %(levelname)s %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'INFO',
            'formatter': 'standard',
            'stream': 'ext://sys.stdout',
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'level': 'DEBUG',
            'formatter': 'standard',
            'filename': 'app.log',
            'maxBytes': 10 * 1024 * 1024,  # 10 MB
            'backupCount': 5,
        },
    },
    'loggers': {
        # The empty name addresses the root logger.
        '': {
            'handlers': ['console', 'file'],
            'level': 'DEBUG',
            'propagate': True,
        },
    },
}
logging.config.dictConfig(LOGGING_CONFIG)
# ============ 结构化日志 ============
logger = logging.getLogger(__name__)
# 使用 extra 添加额外字段
logger.info(
"User logged in",
extra={"user_id": 123, "ip": "192.168.1.1"}
)
# 或者使用格式化
logger.info("User %s logged in from %s", "alice", "192.168.1.1")
1.7 pathlib - 路径处理
from pathlib import Path
# ============ 创建路径 ============
p = Path("/home/user/documents")
p = Path.home() # 用户主目录
p = Path.cwd() # 当前工作目录
p = Path(__file__) # 当前脚本路径
# ============ 路径操作 ============
p = Path("/home/user/documents/file.txt")
print(p.name) # "file.txt"
print(p.stem) # "file"
print(p.suffix) # ".txt"
print(p.suffixes) # [".txt"] (多后缀时 [".tar", ".gz"])
print(p.parent) # Path("/home/user/documents")
print(p.parents[0]) # Path("/home/user/documents")
print(p.parents[1]) # Path("/home/user")
print(p.parts) # ('/', 'home', 'user', 'documents', 'file.txt')
# 路径拼接
new_path = p.parent / "subdir" / "new_file.txt"
# 修改路径
p.with_name("other.txt") # 改文件名
p.with_suffix(".md") # 改后缀
p.with_stem("other") # 改文件名(保留后缀,Python 3.9+)
# ============ 路径判断 ============
p.exists() # 是否存在
p.is_file() # 是否是文件
p.is_dir() # 是否是目录
p.is_symlink() # 是否是符号链接
p.is_absolute() # 是否是绝对路径
# ============ 文件操作 ============
# 读写文本
content = p.read_text(encoding="utf-8")
p.write_text("Hello World", encoding="utf-8")
# 读写二进制
data = p.read_bytes()
p.write_bytes(b"binary data")
# 创建目录
p.mkdir(parents=True, exist_ok=True)
# 删除
p.unlink() # 删除文件
p.rmdir() # 删除空目录
# 重命名/移动
p.rename(new_path)
p.replace(new_path) # 如果目标存在则覆盖
# ============ 遍历目录 ============
# 列出目录内容
for item in Path(".").iterdir():
print(item)
# glob 模式匹配
for py_file in Path(".").glob("*.py"):
print(py_file)
# 递归匹配
for py_file in Path(".").rglob("*.py"):
print(py_file)
# ============ 实用函数 ============
def ensure_dir(path: Path) -> Path:
    """Create ``path`` (and any missing parents) if needed and return it.

    Does nothing when the directory already exists.
    """
    path.mkdir(parents=True, exist_ok=True)
    return path
def find_files(directory: Path, pattern: str) -> list[Path]:
    """Return every path under ``directory`` (recursively) matching ``pattern``.

    The result is sorted so callers get a stable order regardless of how
    the file system happens to enumerate entries.
    """
    return sorted(directory.rglob(pattern))
def get_file_size_mb(path: Path) -> float:
    """Return the size of ``path`` in mebibytes (bytes / 1024 / 1024)."""
    return path.stat().st_size / (1024 * 1024)
1.8 re - 正则表达式
import re
# ============ 基础匹配 ============
text = "Hello, my email is alice@example.com and phone is 123-456-7890"
# match:从开头匹配
result = re.match(r"Hello", text)
print(result.group() if result else None) # "Hello"
# search:搜索第一个匹配
result = re.search(r"\d{3}-\d{3}-\d{4}", text)
print(result.group() if result else None) # "123-456-7890"
# findall:找所有匹配
emails = re.findall(r"[\w.-]+@[\w.-]+\.\w+", text)
print(emails) # ['alice@example.com']
# finditer:迭代所有匹配
for match in re.finditer(r"\d+", text):
print(f"Found {match.group()} at {match.start()}-{match.end()}")
# ============ 替换 ============
# sub:替换
result = re.sub(r"\d", "*", text) # 替换所有数字为 *
# 使用函数替换
def double_number(match):
    """Replacement callback for ``re.sub``: return the matched integer doubled."""
    return str(int(match.group()) * 2)
result = re.sub(r"\d+", double_number, "1 + 2 = 3")
print(result) # "2 + 4 = 6"
# ============ 分割 ============
result = re.split(r"[,\s]+", "a, b, c d")
print(result) # ['a', 'b', 'c', 'd']
# ============ 预编译(提高性能) ============
email_pattern = re.compile(r"[\w.-]+@[\w.-]+\.\w+")
emails = email_pattern.findall(text)
# ============ 分组 ============
pattern = r"(\d{3})-(\d{3})-(\d{4})"
match = re.search(pattern, text)
if match:
print(match.group()) # "123-456-7890"
print(match.group(1)) # "123"
print(match.group(2)) # "456"
print(match.groups()) # ('123', '456', '7890')
# 命名分组
pattern = r"(?P<area>\d{3})-(?P<exchange>\d{3})-(?P<number>\d{4})"
match = re.search(pattern, text)
if match:
print(match.group("area")) # "123"
print(match.groupdict()) # {'area': '123', 'exchange': '456', 'number': '7890'}
# ============ 常用正则表达式 ============
# Commonly used validation patterns. Non-capturing groups are written
# "(?:...)" with no space after the "?"; a space there is a regex syntax
# error.
patterns = {
    # E-mail address
    "email": r"^[\w.-]+@[\w.-]+\.\w+$",
    # Mobile phone number (mainland China)
    "phone_cn": r"^1[3-9]\d{9}$",
    # URL (unanchored, so usable with re.search / re.findall)
    "url": r"https?://[\w.-]+(?:/[\w./-]*)?",
    # IPv4 address, each octet limited to 0-255
    "ipv4": r"^(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)$",
    # Date in YYYY-MM-DD form (month 01-12, day 01-31; no per-month check)
    "date": r"^\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12]\d|3[01])$",
    # Password: at least 8 characters with a lowercase letter, an uppercase
    # letter and a digit
    "password": r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d).{8,}$",
}
# ============ 标志 ============
# re.IGNORECASE (re.I) - 忽略大小写
# re.MULTILINE (re.M) - 多行模式
# re.DOTALL (re.S) - 匹配换行符
# re.VERBOSE (re.X) - 允许注释
# Date pattern written in verbose mode: because of re.VERBOSE, the
# whitespace and the "#" comments inside the literal are not part of the
# match. It exposes the named groups "year", "month" and "day".
pattern = re.compile(r"""
^ # 开头
(?P<year>\d{4}) # 年
- # 分隔符
(?P<month>\d{2}) # 月
- # 分隔符
(?P<day>\d{2}) # 日
$ # 结尾
""", re.VERBOSE)
二、常用第三方库
2.1 requests - HTTP 客户端(续)
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Optional, Any
import logging
logger = logging.getLogger(__name__)
# ============ 基础请求 ============
# GET 请求
response = requests.get("https://api.example.com/users")
print(response.status_code) # 200
print(response.json()) # 解析 JSON
print(response.text) # 原始文本
print(response.headers) # 响应头
# 带查询参数
response = requests.get(
"https://api.example.com/users",
params={"page": 1, "limit": 10}
)
# POST JSON
response = requests.post(
"https://api.example.com/users",
json={"name": "Alice", "email": "alice@example.com"}
)
# POST 表单
response = requests.post(
"https://api.example.com/login",
data={"username": "alice", "password": "secret"}
)
# ============ 请求头和认证 ============
# 自定义请求头
response = requests.get(
"https://api.example.com/users",
headers={
"Authorization": "Bearer your_token",
"Content-Type": "application/json",
"User-Agent": "MyApp/1.0"
}
)
# Basic 认证
response = requests.get(
"https://api.example.com/users",
auth=("username", "password")
)
# ============ 超时和异常处理 ============
# Always pass a timeout, and handle the most specific exceptions first;
# RequestException is the common base class and therefore comes last.
try:
    response = requests.get(
        "https://api.example.com/users",
        timeout=(3.05, 27),  # (connect timeout, read timeout)
    )
    response.raise_for_status()  # raises HTTPError for status codes >= 400
except requests.exceptions.Timeout:
    logger.error("Request timed out")
except requests.exceptions.ConnectionError:
    logger.error("Connection error")
except requests.exceptions.HTTPError as e:
    # Lazy %-formatting: the message is only built if the record is emitted.
    logger.error("HTTP error: %s", e)
except requests.exceptions.RequestException as e:
    logger.error("Request failed: %s", e)
# ============ Session(连接复用) ============
# 推荐用于多次请求同一主机
session = requests.Session()
session.headers.update({"Authorization": "Bearer token"})
response = session.get("https://api.example.com/users")
response = session.get("https://api.example.com/posts")
session.close()
# 使用上下文管理器
with requests.Session() as session:
response = session.get("https://api.example.com/users")
# ============ 重试机制 ============
def create_session_with_retry(
    retries: int = 3,
    backoff_factor: float = 0.3,
    status_forcelist: tuple = (500, 502, 503, 504)
) -> requests.Session:
    """Return a ``requests.Session`` whose adapters apply a retry policy.

    Args:
        retries: Used for the total, read and connect retry limits alike.
        backoff_factor: Backoff factor handed to ``Retry``.
        status_forcelist: HTTP status codes handed to ``Retry`` as the
            statuses to retry on.
    """
    session = requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    # Mount the same adapter for both schemes so every request is covered.
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session
# ============ 文件上传/下载 ============
# 上传文件
with open("document.pdf", "rb") as f:
response = requests.post(
"https://api.example.com/upload",
files={"file": ("document.pdf", f, "application/pdf")}
)
# 下载文件(流式)
def download_file(
    url: str,
    path: str,
    chunk_size: int = 8192,
    timeout=(3.05, 27),
) -> None:
    """Stream ``url`` to the file at ``path`` without loading it into memory.

    Args:
        url: Address to download.
        path: Destination file; opened in binary write mode.
        chunk_size: Number of bytes requested per chunk.
        timeout: ``(connect, read)`` timeout passed to ``requests.get`` so
            an unresponsive server cannot block forever.

    Raises:
        requests.exceptions.HTTPError: For 4xx/5xx responses (raised
            before the destination file is opened).
    """
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(path, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                f.write(chunk)
# ============ 封装 HTTP 客户端 ============
class APIClient:
    """Small JSON API client on top of a retrying ``requests`` session.

    Usable as a context manager; the session is closed on exit.
    """

    def __init__(self, base_url: str, token: Optional[str] = None):
        """Store the base URL and, if given, install a Bearer token header."""
        self.base_url = base_url.rstrip("/")
        self.session = create_session_with_retry()
        if token:
            self.session.headers["Authorization"] = f"Bearer {token}"

    def _request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> dict[str, Any]:
        """Send a request and return the decoded JSON body.

        A 30 second timeout is applied unless the caller passes its own
        ``timeout``. Responses without a body (e.g. 204 No Content) yield
        an empty dict instead of a JSON decoding error.

        Raises:
            requests.exceptions.RequestException: Logged, then re-raised.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        # setdefault avoids "multiple values for keyword argument 'timeout'".
        kwargs.setdefault("timeout", 30)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            if response.status_code == 204 or not response.content:
                return {}
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"API request failed: {e}")
            raise

    def get(self, endpoint: str, params: dict = None) -> dict:
        """GET ``endpoint`` with optional query parameters."""
        return self._request("GET", endpoint, params=params)

    def post(self, endpoint: str, data: dict = None) -> dict:
        """POST ``data`` to ``endpoint`` as a JSON body."""
        return self._request("POST", endpoint, json=data)

    def put(self, endpoint: str, data: dict = None) -> dict:
        """PUT ``data`` to ``endpoint`` as a JSON body."""
        return self._request("PUT", endpoint, json=data)

    def delete(self, endpoint: str) -> dict:
        """DELETE ``endpoint``."""
        return self._request("DELETE", endpoint)

    def close(self):
        """Close the underlying session."""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
# 使用
with APIClient("https://api.example.com", token="xxx") as client:
users = client.get("/users", params={"page": 1})
new_user = client.post("/users", data={"name": "Alice"})
2.2 aiohttp - 异步 HTTP 客户端
import aiohttp
import asyncio
from typing import List, Dict, Any
import logging
logger = logging.getLogger(__name__)
# ============ 基础用法 ============
async def fetch(url: str) -> dict:
    """GET ``url`` with a throwaway session and return the decoded JSON body.

    Raises:
        aiohttp.ClientResponseError: For 4xx/5xx responses.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # Fail on error statuses instead of trying to decode an error page.
            response.raise_for_status()
            return await response.json()
# ============ 并发请求 ============
async def fetch_all(urls: List[str]) -> List[dict]:
    """Fetch all ``urls`` concurrently over one shared session.

    Results are in the same order as ``urls``. Because
    ``return_exceptions=True`` is used, a failed request appears in the
    result list as the exception object instead of aborting the batch.
    """
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
async def fetch_one(session: aiohttp.ClientSession, url: str) -> dict:
    """GET ``url`` on an existing session and return the decoded JSON body.

    Raises:
        aiohttp.ClientResponseError: For 4xx/5xx responses.
    """
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()
# ============ 封装异步 HTTP 客户端 ============
class AsyncAPIClient:
    """Async JSON API client with a cap on concurrent requests.

    Must be used as ``async with AsyncAPIClient(...) as client``; the
    aiohttp session exists only between ``__aenter__`` and ``__aexit__``.
    """

    def __init__(
        self,
        base_url: str,
        token: str = None,
        max_concurrent: int = 10,
        timeout: int = 30
    ):
        """Store settings; no network resources are created here.

        Args:
            base_url: Prefix for every endpoint (trailing "/" is removed).
            token: Optional Bearer token sent with every request.
            max_concurrent: Maximum number of in-flight requests.
            timeout: Total per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip("/")
        self.token = token
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: aiohttp.ClientSession = None

    async def __aenter__(self):
        headers = {}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        self._session = aiohttp.ClientSession(
            headers=headers,
            timeout=self._timeout
        )
        return self

    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()
            # Drop the closed session so later misuse gets the clear
            # RuntimeError from _request.
            self._session = None

    async def _request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> Dict[str, Any]:
        """Send one request under the concurrency limit and return its JSON.

        Raises:
            RuntimeError: If called outside the ``async with`` block.
            aiohttp.ClientError: Logged, then re-raised.
        """
        if self._session is None:
            raise RuntimeError(
                "AsyncAPIClient must be used inside 'async with'"
            )
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        async with self._semaphore:
            try:
                async with self._session.request(method, url, **kwargs) as resp:
                    resp.raise_for_status()
                    return await resp.json()
            except aiohttp.ClientError as e:
                logger.error(f"Request failed: {e}")
                raise

    async def get(self, endpoint: str, params: dict = None) -> dict:
        """GET ``endpoint`` with optional query parameters."""
        return await self._request("GET", endpoint, params=params)

    async def post(self, endpoint: str, data: dict = None) -> dict:
        """POST ``data`` to ``endpoint`` as a JSON body."""
        return await self._request("POST", endpoint, json=data)

    async def fetch_many(self, endpoints: List[str]) -> List[dict]:
        """GET several endpoints concurrently.

        Results keep the order of ``endpoints``; failed requests appear as
        exception objects in the list (``return_exceptions=True``).
        """
        tasks = [self.get(ep) for ep in endpoints]
        return await asyncio.gather(*tasks, return_exceptions=True)
# 使用示例
async def main():
    """Usage example: one single request followed by a concurrent batch."""
    async with AsyncAPIClient("https://api.example.com", token="xxx") as client:
        # Single request
        user = await client.get("/users/1")
        # Concurrent requests
        endpoints = [f"/users/{i}" for i in range(1, 11)]
        results = await client.fetch_many(endpoints)
        # fetch_many returns exceptions in place of failed results; count
        # only the requests that actually succeeded.
        users = [r for r in results if not isinstance(r, BaseException)]
        print(f"Fetched {len(users)} users")
# asyncio.run(main())
2.3 Pydantic - 数据验证
from pydantic import (
BaseModel, Field, validator, root_validator,
EmailStr, HttpUrl, constr, conint, confloat,
ValidationError
)
from typing import List, Optional
from datetime import datetime
from enum import Enum
# ============ 基础模型 ============
class User(BaseModel):
    """Basic Pydantic model: two optional fields and a generated timestamp."""

    id: int
    name: str
    email: EmailStr
    age: Optional[int] = None
    is_active: bool = True
    # default_factory is called per instance, so each model gets its own
    # creation time instead of one value fixed at class-definition time.
    created_at: datetime = Field(default_factory=datetime.now)
# 自动验证和转换
user = User(
id="123", # 自动转换为 int
name="Alice",
email="alice@example.com"
)
print(user.id) # 123 (int)
print(user.model_dump()) # 转换为字典
print(user.model_dump_json()) # 转换为 JSON 字符串
# 验证失败
try:
user = User(id="abc", name="Alice", email="invalid")
except ValidationError as e:
print(e.json())
# ============ 字段约束 ============
class Product(BaseModel):
    """Model demonstrating declarative field constraints."""

    name: constr(min_length=1, max_length=100)  # string length limits
    price: confloat(gt=0)  # float strictly greater than 0
    quantity: conint(ge=0)  # integer greater than or equal to 0
    tags: List[str] = Field(default_factory=list, max_length=10)
    description: Optional[str] = Field(None, max_length=1000)
# ============ 自定义验证器 ============
class UserCreate(BaseModel):
    """Sign-up payload with per-field and cross-field validation."""

    username: str
    password: str
    password_confirm: str
    email: EmailStr

    @validator("username")
    def username_alphanumeric(cls, v):
        """Reject usernames containing anything but letters and digits."""
        if not v.isalnum():
            raise ValueError("Username must be alphanumeric")
        return v

    @validator("password")
    def password_strength(cls, v):
        """Require at least 8 characters, one uppercase letter and one digit."""
        if len(v) < 8:
            raise ValueError("Password must be at least 8 characters")
        if not any(c.isupper() for c in v):
            raise ValueError("Password must contain uppercase letter")
        if not any(c.isdigit() for c in v):
            raise ValueError("Password must contain digit")
        return v

    # skip_on_failure=True is mandatory for a post-validation root
    # validator under Pydantic v2 (the API this file otherwise uses, e.g.
    # model_dump / model_config) and is also accepted by v1. The check is
    # skipped when an individual field has already failed validation.
    @root_validator(skip_on_failure=True)
    def passwords_match(cls, values):
        """Ensure ``password`` and ``password_confirm`` are identical."""
        pw = values.get("password")
        pw_confirm = values.get("password_confirm")
        if pw and pw_confirm and pw != pw_confirm:
            raise ValueError("Passwords do not match")
        return values
# ============ 嵌套模型 ============
class Address(BaseModel):
    """Postal address; ``country`` defaults to "China"."""

    street: str
    city: str
    country: str = "China"
    zip_code: Optional[str] = None
class Company(BaseModel):
    """Model with nested models: one ``Address`` and a list of ``User``."""

    name: str
    address: Address
    employees: List[User] = []
company = Company(
name="Tech Corp",
address={"street": "123 Main St", "city": "Beijing"},
employees=[
{"id": 1, "name": "Alice", "email": "alice@example.com"}
]
)
# ============ 枚举和联合类型 ============
class Status(str, Enum):
    """Lifecycle state; mixing in ``str`` makes members compare equal to their value."""

    PENDING = "pending"
    ACTIVE = "active"
    INACTIVE = "inactive"
class Order(BaseModel):
    """Model combining an enum field with a union-typed field."""

    id: int
    status: Status
    amount: float | int  # union type
# ============ 配置选项 ============
class StrictUser(BaseModel):
    """Model demonstrating model-wide configuration options."""

    model_config = {
        "str_strip_whitespace": True,  # strip surrounding whitespace from strings
        "str_min_length": 1,  # minimum string length
        "frozen": True,  # instances are immutable
        "extra": "forbid",  # reject unknown fields
    }

    name: str
    email: EmailStr
# ============ 从 ORM 对象创建 ============
class UserORM(BaseModel):
    """Schema that can be populated from an object's attributes (e.g. an ORM row)."""

    # from_attributes lets model_validate() read fields from attributes
    # rather than requiring a dict.
    model_config = {"from_attributes": True}

    id: int
    name: str
    email: str
# 假设有 SQLAlchemy 模型
# user_orm = session.query(UserModel).first()
# user = UserORM.model_validate(user_orm)
2.4 SQLAlchemy - ORM 框架
from sqlalchemy import (
create_engine, Column, Integer, String, DateTime,
ForeignKey, Boolean, Text, Float, Index,
select, update, delete, and_, or_, func
)
from sqlalchemy.orm import (
declarative_base, relationship, sessionmaker,
Session, selectinload, joinedload
)
from datetime import datetime
from typing import List, Optional
from contextlib import contextmanager
# ============ 数据库连接 ============
# SQLite
engine = create_engine("sqlite:///app.db", echo=True)
# MySQL
# engine = create_engine(
# "mysql+pymysql://user:password@localhost:3306/dbname",
# pool_size=10,
# max_overflow=20,
# pool_recycle=3600
# )
# PostgreSQL
# engine = create_engine(
# "postgresql://user:password@localhost:5432/dbname"
# )
Base = declarative_base()
SessionLocal = sessionmaker(bind=engine)
# ============ 定义模型 ============
class User(Base):
    """ORM model for the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(100), unique=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    # The callable (not its result) is passed, so it runs at insert/update time.
    # NOTE(review): datetime.utcnow yields naive timestamps and is
    # deprecated in recent Python versions — consider an aware default.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationship to the user's posts (mirrors Post.author).
    posts = relationship("Post", back_populates="author", lazy="selectin")

    # Composite index on (email, is_active).
    __table_args__ = (
        Index("idx_user_email_active", "email", "is_active"),
    )

    def __repr__(self):
        return f"<User(id={self.id}, username={self.username})>"
class Post(Base):
    """ORM model for the ``posts`` table; each post belongs to one user."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    content = Column(Text)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationship back to the owning user (mirrors User.posts).
    author = relationship("User", back_populates="posts")

    def __repr__(self):
        return f"<Post(id={self.id}, title={self.title})>"
# 创建表
Base.metadata.create_all(engine)
# ============ Session 管理 ============
@contextmanager
def get_session():
    """Yield a database session wrapped in a transaction.

    Commits when the ``with`` body finishes normally, rolls back and
    re-raises when it raises, and always closes the session.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
# ============ CRUD 操作 ============
class UserRepository:
    """Data-access layer for ``User`` rows.

    The repository never commits; transaction control stays with whoever
    owns the session.
    """

    def __init__(self, session: Session):
        self.session = session

    def create(self, username: str, email: str, password: str) -> User:
        """Add a new user and flush so that its ``id`` is populated."""
        user = User(
            username=username,
            email=email,
            hashed_password=password  # should be hashed in real code
        )
        self.session.add(user)
        self.session.flush()  # assigns the primary key without committing
        return user

    def get_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with this primary key, or ``None``."""
        return self.session.get(User, user_id)

    def get_by_username(self, username: str) -> Optional[User]:
        """Return the user with this username, or ``None``."""
        stmt = select(User).where(User.username == username)
        return self.session.scalar(stmt)

    def get_all(
        self,
        skip: int = 0,
        limit: int = 100,
        is_active: bool = None
    ) -> List[User]:
        """Return a page of users, optionally filtered by ``is_active``.

        ``is_active=None`` (the default) applies no filter.
        """
        stmt = select(User)
        if is_active is not None:
            stmt = stmt.where(User.is_active == is_active)
        stmt = stmt.offset(skip).limit(limit)
        return list(self.session.scalars(stmt))

    def update(self, user_id: int, **kwargs) -> Optional[User]:
        """Update columns of one user and return the updated row.

        NOTE(review): relies on UPDATE ... RETURNING — confirm the target
        database supports it.
        """
        stmt = (
            update(User)
            .where(User.id == user_id)
            .values(**kwargs)
            .returning(User)
        )
        return self.session.scalar(stmt)

    def delete(self, user_id: int) -> bool:
        """Delete one user; return ``True`` if a row was removed."""
        stmt = delete(User).where(User.id == user_id)
        result = self.session.execute(stmt)
        return result.rowcount > 0

    def search(self, keyword: str) -> List[User]:
        """Return users whose username or email contains ``keyword``.

        The match is case-insensitive. ``%``, ``_`` and the escape
        character itself are escaped, so they are matched literally
        instead of acting as LIKE wildcards.
        """
        escaped = (
            keyword.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        like = f"%{escaped}%"
        stmt = select(User).where(
            or_(
                User.username.ilike(like, escape="\\"),
                User.email.ilike(like, escape="\\")
            )
        )
        return list(self.session.scalars(stmt))
# ============ 复杂查询 ============
def complex_queries_example(session: Session):
    """Demonstrate aggregate, JOIN, eager-loading and subquery statements.

    The aggregate and eager-loading statements are executed and their rows
    discarded; the JOIN statement is only constructed. The rows of the final
    statement (each user with a correlated post count) are returned.
    """
    # Aggregate: number of users per is_active value.
    counts_stmt = select(
        User.is_active,
        func.count(User.id).label("count"),
    ).group_by(User.is_active)
    results = session.execute(counts_stmt).all()

    # JOIN: users paired with their posts (built but not executed).
    join_stmt = select(User, Post).join(Post, User.id == Post.author_id)
    join_stmt = join_stmt.where(User.is_active == True)

    # Eager loading of User.posts to avoid the N+1 query problem.
    eager_stmt = select(User).options(selectinload(User.posts))
    eager_stmt = eager_stmt.where(User.is_active == True)
    users = session.scalars(eager_stmt).all()

    # Correlated scalar subquery: post count per user.
    post_count = (
        select(func.count(Post.id))
        .where(Post.author_id == User.id)
        .correlate(User)
        .scalar_subquery()
    )
    final_stmt = select(User, post_count.label("post_count"))
    return session.execute(final_stmt).all()
# ============ Usage example ============
# Every call below runs inside the single session yielded by get_session().
with get_session() as session:
repo = UserRepository(session)
# Create a user
user = repo.create("alice", "alice@example.com", "hashed_password")
print(f"Created: {user}")
# Look the user up by username
user = repo.get_by_username("alice")
print(f"Found: {user}")
# Update the user's email
repo.update(user.id, email="alice_new@example.com")
# Search users by keyword
users = repo.search("alice")
print(f"Search results: {users}")
2.5 Redis 操作(redis-py)
import redis
from redis import Redis
from typing import Optional, Any, List
import json
import pickle
from datetime import timedelta
from contextlib import contextmanager
import logging
logger = logging.getLogger(__name__)
# ============ Connecting to Redis ============
# Standalone connection
redis_client = Redis(
host="localhost",
port=6379,
db=0,
password=None,
decode_responses=True, # decode replies to str automatically
socket_timeout=5,
socket_connect_timeout=5
)
# Connection pool
pool = redis.ConnectionPool(
host="localhost",
port=6379,
db=0,
max_connections=20,
decode_responses=True
)
# Rebinds redis_client: from here on the name refers to the pool-backed client.
redis_client = Redis(connection_pool=pool)
# ============ Basic operations ============
# Strings
redis_client.set("key", "value")
redis_client.set("key", "value", ex=3600) # expiry in seconds
redis_client.setex("key", 3600, "value") # same as above
redis_client.setnx("key", "value") # set only if the key does not exist
value = redis_client.get("key")
redis_client.delete("key")
redis_client.exists("key") # check whether the key exists
redis_client.expire("key", 3600) # set an expiry
redis_client.ttl("key") # remaining time to live
# Batch operations
redis_client.mset({"k1": "v1", "k2": "v2"})
values = redis_client.mget(["k1", "k2"])
# Counters
redis_client.incr("counter")
redis_client.incrby("counter", 10)
redis_client.decr("counter")
# ============ Hash ============
# Every call targets the same hash key, "user:1".
redis_client.hset("user:1", "name", "Alice")
redis_client.hset("user:1", mapping={"age": 30, "city": "Beijing"})
redis_client.hget("user:1", "name")
redis_client.hgetall("user:1")  # fetch all fields
redis_client.hdel("user:1", "city")
# Key fixed from "user: 1" (stray space), which addressed a different hash
# and left the "age" field of "user:1" untouched.
redis_client.hincrby("user:1", "age", 1)
# ============ List ============
redis_client.lpush("queue", "item1", "item2") # push on the left
redis_client.rpush("queue", "item3") # push on the right
redis_client.lpop("queue") # pop from the left
redis_client.rpop("queue") # pop from the right
redis_client.lrange("queue", 0, -1) # fetch all elements
redis_client.llen("queue") # length
# Blocking pop (message-queue style consumer)
redis_client.blpop("queue", timeout=5)
# ============ Set ============
redis_client.sadd("tags", "python", "redis", "database")
redis_client.smembers("tags") # all members
redis_client.sismember("tags", "python") # membership test
redis_client.srem("tags", "database") # remove a member
redis_client.sinter("tags1", "tags2") # intersection
redis_client.sunion("tags1", "tags2") # union
# ============ Sorted Set ============
redis_client.zadd("leaderboard", {"alice": 100, "bob": 85, "charlie": 92})
redis_client.zrange("leaderboard", 0, -1, withscores=True) # ascending by score
redis_client.zrevrange("leaderboard", 0, 2, withscores=True) # top 3
redis_client.zscore("leaderboard", "alice") # score of one member
redis_client.zincrby("leaderboard", 10, "bob") # increase a score
redis_client.zrank("leaderboard", "alice") # rank (0-based)
# ============ 封装 Redis 缓存 ============
class RedisCache:
    """Small JSON-based cache on top of a redis-py style client.

    Keys are namespaced as ``"<prefix>:<key>"``. Values are JSON-encoded on
    write and JSON-decoded on read, so strings, numbers, booleans, ``None``,
    lists and dicts all round-trip with their original type. ``bytes``-like
    values are stored untouched. Entries that are not valid JSON (for
    example plain strings written by other code) are returned as stored.
    """

    # Sentinel that lets get_or_set tell "key missing" from "cached None".
    _MISSING = object()
    # Number of keys deleted per DEL call in invalidate_pattern.
    _DELETE_BATCH = 500

    def __init__(self, client: "Redis", prefix: str = "cache"):
        self.client = client
        self.prefix = prefix

    def _make_key(self, key: str) -> str:
        """Return ``key`` with the cache prefix applied."""
        return f"{self.prefix}:{key}"

    def get(self, key: str, default: Any = None) -> Any:
        """Return the cached value, or ``default`` when the key is absent."""
        raw = self.client.get(self._make_key(key))
        if raw is None:
            return default
        try:
            return json.loads(raw)
        except ValueError:
            # Not JSON (or undecodable bytes): hand back the stored value.
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError.
            return raw

    def set(
        self,
        key: str,
        value: Any,
        expire: Optional[int] = None,
    ) -> bool:
        """Store ``value`` under ``key``.

        Args:
            key: cache key without the prefix.
            value: any JSON-serializable object, or a bytes-like object.
            expire: time to live in seconds; ``None`` means no expiry.

        Returns:
            ``True`` if the client reported the write as successful.

        Raises:
            TypeError: if ``value`` is not JSON-serializable.
        """
        if not isinstance(value, (bytes, bytearray, memoryview)):
            # Encoding every value (not only dict/list) keeps get() symmetric:
            # the string "123" comes back as "123", not as the int 123.
            value = json.dumps(value, ensure_ascii=False)
        return bool(self.client.set(self._make_key(key), value, ex=expire))

    def delete(self, key: str) -> bool:
        """Remove ``key``; return ``True`` if it existed."""
        return bool(self.client.delete(self._make_key(key)))

    def get_or_set(
        self,
        key: str,
        factory,
        expire: int = 3600,
    ) -> Any:
        """Return the cached value, computing and storing it on a miss.

        ``factory`` is called when it is callable, otherwise it is used as
        the value itself. A cached ``None`` counts as a hit.
        """
        value = self.get(key, self._MISSING)
        if value is self._MISSING:
            value = factory() if callable(factory) else factory
            self.set(key, value, expire)
        return value

    def invalidate_pattern(self, pattern: str) -> int:
        """Delete every key matching the prefixed glob ``pattern``.

        Keys are discovered incrementally with ``scan_iter`` (SCAN) rather
        than ``keys`` (KEYS), which blocks the server while it walks the
        whole keyspace. Returns the number of keys deleted.
        """
        removed = 0
        batch = []
        matches = self.client.scan_iter(
            match=self._make_key(pattern), count=self._DELETE_BATCH
        )
        for name in matches:
            batch.append(name)
            if len(batch) >= self._DELETE_BATCH:
                removed += self.client.delete(*batch)
                batch = []
        if batch:
            removed += self.client.delete(*batch)
        return removed
# ============ 分布式锁 ============
class RedisLock:
    """Distributed lock built on ``SET key token NX EX timeout``.

    Each acquisition stores a random token under ``"lock:<name>"``; release
    deletes the key only if it still holds that token, so a holder whose
    lock already expired cannot delete a lock now owned by someone else.

    Args:
        client: redis-py style client.
        name: resource name; the Redis key is ``"lock:<name>"``.
        timeout: lock expiry in seconds (passed as ``ex``).
        blocking: when ``False``, ``acquire`` makes a single attempt.
        blocking_timeout: maximum seconds to wait while blocking;
            ``None`` waits indefinitely, ``0`` makes a single attempt.
    """

    # Seconds to wait between acquisition attempts.
    _RETRY_INTERVAL = 0.1

    # Compare-and-delete must be atomic, hence a server-side Lua script.
    _RELEASE_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    def __init__(
        self,
        client: "Redis",
        name: str,
        timeout: int = 10,
        blocking: bool = True,
        blocking_timeout: Optional[float] = None,
    ):
        self.client = client
        self.name = f"lock:{name}"
        self.timeout = timeout
        self.blocking = blocking
        self.blocking_timeout = blocking_timeout
        self._token = None

    def acquire(self) -> bool:
        """Try to take the lock; return ``True`` on success."""
        import time
        import uuid

        token = str(uuid.uuid4())
        deadline = None
        # "is not None" so that blocking_timeout=0 means "do not wait" rather
        # than being mistaken for "no timeout".
        if self.blocking and self.blocking_timeout is not None:
            # Monotonic clock: unaffected by wall-clock adjustments.
            deadline = time.monotonic() + self.blocking_timeout

        while True:
            if self.client.set(self.name, token, nx=True, ex=self.timeout):
                self._token = token
                return True
            if not self.blocking:
                return False
            pause = self._RETRY_INTERVAL
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                # Never sleep past the deadline.
                pause = min(pause, remaining)
            time.sleep(pause)

    def release(self) -> bool:
        """Release the lock if this instance still owns it.

        Returns ``False`` when the lock was never acquired by this instance
        or when the key no longer holds this instance's token.
        """
        if self._token is None:
            return False
        result = self.client.eval(
            self._RELEASE_SCRIPT, 1, self.name, self._token
        )
        self._token = None
        return bool(result)

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError("Could not acquire lock")
        return self

    def __exit__(self, *args):
        self.release()
# Usage: the lock is acquired on entry and released on exit of the with block.
with RedisLock(redis_client, "my_resource", timeout=30):
# Do the work that needs mutual exclusion
pass
2.6 Celery - 异步任务队列
from celery import Celery, Task
from celery.schedules import crontab
from typing import Any
import logging
logger = logging.getLogger(__name__)
# ============ Create the Celery application ============
app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
# Configuration
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Asia/Shanghai",
enable_utc=True,
task_track_started=True,
task_time_limit=3600, # task time limit (seconds)
worker_prefetch_multiplier=1, # prefetch multiplier
task_acks_late=True, # acknowledge only after the task completes
task_reject_on_worker_lost=True, # reject the task if the worker is lost
)
# ============ 定义任务 ============
@app.task(bind=True, max_retries=3)
def send_email(self, to: str, subject: str, body: str) -> dict:
    """Send an e-mail (placeholder implementation).

    Returns a dict with ``status`` and ``to`` on success. On any exception
    the task is retried with exponential backoff: the delay in seconds is
    ``2 ** <retries so far>``.
    """
    try:
        logger.info(f"Sending email to {to}")
        # The real delivery code would go here.
        outcome = {"status": "sent", "to": to}
        return outcome
    except Exception as exc:
        logger.error(f"Failed to send email: {exc}")
        backoff = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=backoff)
@app.task
def process_image(image_path: str, operations: list) -> str:
    """Process an image (placeholder implementation).

    Logs the path and returns it prefixed with ``processed_``. The
    ``operations`` argument is accepted but not used by this placeholder.
    """
    logger.info(f"Processing image: {image_path}")
    # The real image processing would go here.
    output_name = f"processed_{image_path}"
    return output_name
@app.task
def long_running_task(data: list) -> int:
    """Return the sum of the squares of the items in ``data``.

    An empty ``data`` yields ``0``.
    """
    return sum(value ** 2 for value in data)
# ============ 任务链和组 ============
from celery import chain, group, chord
# Chain: run the tasks one after another.
# send_email is given as an immutable signature (.si): in a chain the
# previous task's return value is prepended to the next task's arguments,
# which would hand send_email four positional arguments instead of three.
workflow = chain(
    process_image.s("image.jpg", ["resize"]),
    send_email.si("user@example.com", "Image Ready", "Your image is processed"),
)
result = workflow.apply_async()
# Group: run the tasks in parallel.
job = group([
    process_image.s(f"image_{i}.jpg", ["resize"])
    for i in range(10)
])
result = job.apply_async()
# Chord: run the header tasks in parallel, then the callback.
# The callback is immutable for the same reason as above: a chord passes the
# list of header results as the callback's first argument.
callback = send_email.si("admin@example.com", "All Done", "")
workflow = chord(
    [process_image.s(f"image_{i}.jpg", ["resize"]) for i in range(10)],
    callback,
)
# ============ 自定义任务基类 ============
class BaseTask(Task):
    """Task base class that logs the outcome of every run.

    Success is logged at INFO, failure at ERROR and retry at WARNING.
    """

    def on_success(self, retval, task_id, args, kwargs):
        """Log the task id together with its return value."""
        message = f"Task {task_id} succeeded with result: {retval}"
        logger.info(message)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the task id together with the exception that failed it."""
        message = f"Task {task_id} failed: {exc}"
        logger.error(message)

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Log the task id together with the exception that caused a retry."""
        message = f"Task {task_id} retrying: {exc}"
        logger.warning(message)
@app.task(base=BaseTask, bind=True)
def my_task(self, x: int, y: int) -> int:
    """Return ``x + y``; registered with ``BaseTask`` as its base class."""
    total = x + y
    return total
# ============ Periodic tasks ============
app.conf.beat_schedule = {
# Run every minute
"send-report-every-minute": {
"task": "tasks.send_report",
"schedule": 60.0,
},
# Run every day at 08:00
"morning-task": {
"task": "tasks.daily_task",
"schedule": crontab(hour=8, minute=0),
},
# Run every Monday at 09:00
"weekly-cleanup": {
"task": "tasks.weekly_cleanup",
"schedule": crontab(hour=9, minute=0, day_of_week=1),
},
}
# ============ Calling tasks ============
# Asynchronous call
result = send_email.delay("user@example.com", "Hello", "World")
# Call with execution options
result = send_email.apply_async(
args=["user@example.com", "Hello", "World"],
countdown=60, # run 60 seconds from now
expires=3600, # expires after 1 hour
)
# Inspect the result
print(result.id) # task ID
print(result.status) # task state
print(result.ready()) # whether it has finished
print(result.get(timeout=10)) # fetch the result (blocking)
# ============ 监控任务 ============
from celery.result import AsyncResult
def get_task_status(task_id: str) -> dict:
    """Return a status snapshot for the task with the given id.

    The returned dict has the keys ``task_id``, ``status``, ``result``
    (``None`` until the task is ready) and ``traceback`` (``None`` unless
    the task failed).
    """
    outcome = AsyncResult(task_id, app=app)
    state = outcome.status

    payload = None
    if outcome.ready():
        payload = outcome.result

    trace = None
    if outcome.failed():
        trace = outcome.traceback

    return {
        "task_id": task_id,
        "status": state,
        "result": payload,
        "traceback": trace,
    }
2.7 pytest - 测试框架
import pytest
from typing import Generator
from unittest.mock import Mock, patch, MagicMock
import asyncio
# ============ 基础测试 ============
def add(a: int, b: int) -> int:
    """Return the sum of ``a`` and ``b``."""
    total = a + b
    return total
def test_add():
    """Check ``add`` on a positive, a mixed-sign and an all-zero pair."""
    cases = [
        ((1, 2), 3),
        ((-1, 1), 0),
        ((0, 0), 0),
    ]
    for operands, expected in cases:
        assert add(*operands) == expected
# 测试异常
def divide(a: int, b: int) -> float:
    """Return ``a / b``.

    Raises:
        ValueError: if ``b`` equals zero.
    """
    if b != 0:
        return a / b
    raise ValueError("Cannot divide by zero")
def test_divide_by_zero():
    """``divide`` must raise ValueError with its message when b is zero."""
    # match= searches the pattern in str(exception), replacing the separate
    # assertion on exc_info.value.
    with pytest.raises(ValueError, match="Cannot divide by zero"):
        divide(1, 0)
# ============ Fixtures ============
@pytest.fixture
def sample_data() -> dict:
"""Provide sample test data."""
return {"name": "Alice", "age": 30}
@pytest.fixture
def db_connection():
"""Database connection fixture."""
# NOTE(review): create_connection is not defined in the visible part of the file.
conn = create_connection()
yield conn # the test uses this connection
conn.close() # cleanup after the test finishes
def test_with_fixture(sample_data):
assert sample_data["name"] == "Alice"
# Fixture scope
@pytest.fixture(scope="module")
def expensive_resource():
"""Module-scoped fixture: created only once for the whole module."""
# NOTE(review): create_expensive_resource is not defined in the visible part of the file.
resource = create_expensive_resource()
yield resource
resource.cleanup()
# ============ Parametrized tests ============
# Each tuple is one test case, unpacked into (a, b, expected).
@pytest.mark.parametrize("a,b,expected", [
(1, 2, 3),
(-1, 1, 0),
(0, 0, 0),
(100, 200, 300),
])
def test_add_parametrized(a, b, expected):
assert add(a, b) == expected
# Each tuple is one test case, unpacked into (input_str, expected).
@pytest.mark.parametrize("input_str,expected", [
("hello", "HELLO"),
("World", "WORLD"),
("", ""),
])
def test_upper(input_str, expected):
assert input_str.upper() == expected
# ============ Mock 和 Patch ============
class UserService:
    """Thin service that fetches users through an injected API client.

    The client only needs a ``get(path)`` method; whatever it returns is
    passed straight back to the caller.
    """

    def __init__(self, api_client):
        self.api_client = api_client

    def get_user(self, user_id: int) -> dict:
        """Return the client's response for ``/users/<user_id>``."""
        endpoint = f"/users/{user_id}"
        return self.api_client.get(endpoint)
def test_user_service_with_mock():
# Create a Mock object that stands in for the API client
mock_client = Mock()
mock_client.get.return_value = {"id": 1, "name": "Alice"}
service = UserService(mock_client)
result = service.get_user(1)
assert result["name"] == "Alice"
mock_client.get.assert_called_once_with("/users/1")
# Using patch as a decorator
# NOTE(review): "module.external_api_call" and function_that_calls_api are placeholders not defined in the visible part of the file.
@patch("module.external_api_call")
def test_with_patch(mock_api):
mock_api.return_value = {"status": "success"}
result = function_that_calls_api()
assert result["status"] == "success"
mock_api.assert_called_once()
# Using patch as a context manager
# NOTE(review): "module.get_current_time" and function_using_time are placeholders not defined in the visible part of the file.
def test_with_patch_context():
with patch("module.get_current_time") as mock_time:
mock_time.return_value = "2024-01-01 00:00:00"
result = function_using_time()
assert "2024-01-01" in result
# ============ Async tests ============
# NOTE(review): async_fetch_data is not defined in the visible part of the file.
@pytest.mark.asyncio
async def test_async_function():
result = await async_fetch_data()
assert result is not None
# NOTE(review): presumably relies on the pytest-asyncio plugin; in its strict mode
# async fixtures need @pytest_asyncio.fixture instead - confirm the project's asyncio_mode.
@pytest.fixture
async def async_client():
"""Async fixture."""
client = await create_async_client()
yield client
await client.close()
# ============ Test classes ============
class TestUserService:
@pytest.fixture(autouse=True)
def setup(self):
"""Run before each test method: build a service around a fresh Mock client."""
self.service = UserService(Mock())
def test_get_user(self):
self.service.api_client.get.return_value = {"id": 1}
result = self.service.get_user(1)
assert result["id"] == 1
def test_get_user_not_found(self):
# The client returning None is passed through unchanged by get_user.
self.service.api_client.get.return_value = None
result = self.service.get_user(999)
assert result is None
# ============ Markers and skipping ============
@pytest.mark.slow
def test_slow_operation():
"""Marked as a slow test."""
pass
@pytest.mark.skip(reason="Not implemented yet")
def test_not_ready():
pass
@pytest.mark.skipif(
condition=True,
reason="Skip on this condition"
)
def test_conditional_skip():
pass
@pytest.mark.xfail(reason="Known bug")
def test_expected_failure():
assert False # expected to fail
# ============ conftest.py(共享 fixtures) ============
# conftest.py
"""
@pytest.fixture(scope="session")
def app():
return create_app("testing")
@pytest.fixture(scope="session")
def client(app):
return app.test_client()
@pytest.fixture(autouse=True)
def reset_db(app):
with app.app_context():
db.drop_all()
db.create_all()
yield
db.session.rollback()
"""
# ============ 运行测试 ============
"""
# 运行所有测试
pytest
# 运行特定文件
pytest test_module.py
# 运行特定测试
pytest test_module.py::test_function
# 显示详细输出
pytest -v
# 显示 print 输出
pytest -s
# 只运行标记的测试
pytest -m slow
# 停止在第一个失败
pytest -x
# 并行运行(需要 pytest-xdist)
pytest -n auto
# 生成覆盖率报告(需要 pytest-cov)
pytest --cov=mypackage --cov-report=html
"""
三、项目常用工具库汇总
3.1 按场景分类
"""
============ Web 开发 ============
- FastAPI: 现代异步 Web 框架
- Django: 全功能 Web 框架
- Flask: 轻量级 Web 框架
- Starlette: ASGI 框架
- uvicorn: ASGI 服务器
============ 数据验证 ============
- Pydantic: 数据验证和序列化
- marshmallow: 序列化/反序列化
- cerberus: 轻量级验证
============ 数据库 ============
- SQLAlchemy: ORM 框架
- databases: 异步数据库
- asyncpg: 异步 PostgreSQL
- pymysql: MySQL 驱动
- redis-py: Redis 客户端
- motor: 异步 MongoDB
============ HTTP 客户端 ============
- requests: 同步 HTTP
- aiohttp: 异步 HTTP
- httpx: 同步/异步 HTTP
============ 任务队列 ============
- Celery: 分布式任务队列
- rq: 简单任务队列
- dramatiq: 任务队列
============ 测试 ============
- pytest: 测试框架
- pytest-asyncio: 异步测试
- pytest-cov: 覆盖率
- faker: 假数据生成
- factory_boy: 测试工厂
============ 日志监控 ============
- loguru: 更好的日志
- sentry-sdk: 错误追踪
- prometheus-client: 指标
============ CLI 工具 ============
- click: 命令行工具
- typer: 现代 CLI
- rich: 终端美化
============ 数据处理 ============
- pandas: 数据分析
- numpy: 数值计算
- polars: 高性能 DataFrame
============ 配置管理 ============
- python-dotenv: 环境变量
- pydantic-settings: 配置验证
- dynaconf: 动态配置
============ 安全 ============
- passlib: 密码哈希
- python-jose: JWT
- cryptography: 加密
"""
📝 Part 6 总结
面试高频考点
| 知识点 | 面试频率 | 难度 |
|---|---|---|
| collections 模块 | ⭐⭐⭐⭐⭐ | ⭐⭐ |
| itertools 模块 | ⭐⭐⭐ | ⭐⭐⭐ |
| functools 装饰器 | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| datetime 处理 | ⭐⭐⭐⭐ | ⭐⭐ |
| json 序列化 | ⭐⭐⭐⭐ | ⭐⭐ |
| logging 配置 | ⭐⭐⭐ | ⭐⭐ |
| 正则表达式 | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| requests 使用 | ⭐⭐⭐⭐⭐ | ⭐⭐ |
| SQLAlchemy ORM | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| Redis 操作 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| pytest 测试 | ⭐⭐⭐⭐ | ⭐⭐ |
面试常见问题
- defaultdict 和普通 dict 有什么区别?
- lru_cache 的原理是什么?maxsize 设置多少合适?
- 如何处理 Python 中的时区问题?
- 如何自定义 JSON 序列化(处理 datetime 等类型)?
- 正则表达式中 match 和 search 的区别?
- 如何配置 logging 输出到文件和控制台?
- requests 和 aiohttp 的区别?什么时候用哪个?
- SQLAlchemy 中如何避免 N+1 查询问题?
- Redis 有哪些数据结构?各适合什么场景?
- 如何实现 Redis 分布式锁?
- pytest 中 fixture 的作用域有哪些?
- 如何 mock 外部 API 调用进行测试?
更多推荐


所有评论(0)