Python面试手册AI版——Part8 终章
Python面试手册AI版——Part8 设计模式 & 系统设计
·
Part 8: 设计模式 & 系统设计 & 软技能
一、常用设计模式
1.1 创建型模式
单例模式(Singleton)
"""
单例模式:确保一个类只有一个实例
适用场景:
- 数据库连接池
- 配置管理
- 日志记录器
- 线程池
"""
# ============ 方式1:模块级单例(推荐,最简单) ============
# config.py
class _Config:
def __init__(self):
self.debug = False
self.database_url = "mysql://localhost/db"
config = _Config() # 模块导入时创建,天然单例
# 使用
# from config import config
# ============ 方式2:装饰器实现 ============
from functools import wraps
from threading import Lock
def singleton(cls):
"""线程安全的单例装饰器"""
instances = {}
lock = Lock()
@wraps(cls)
def get_instance(*args, **kwargs):
if cls not in instances:
with lock:
if cls not in instances: # 双重检查
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance
@singleton
class Database:
def __init__(self):
print("Creating database connection...")
self.connection = "connected"
db1 = Database()
db2 = Database()
print(db1 is db2) # True
# ============ 方式3:__new__ 方法 ============
class Singleton:
_instance = None
_lock = Lock()
def __new__(cls, *args, **kwargs):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, value=None):
# 注意:__init__ 每次都会被调用
if not hasattr(self, 'initialized'):
self.value = value
self.initialized = True
# ============ 方式4:元类实现 ============
class SingletonMeta(type):
_instances = {}
_lock = Lock()
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
with cls._lock:
if cls not in cls._instances:
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
return cls._instances[cls]
class Logger(metaclass=SingletonMeta):
def __init__(self):
self.logs = []
def log(self, message):
self.logs.append(message)
print(f"[LOG] {message}")
工厂模式(Factory)
"""
工厂模式:将对象创建逻辑封装起来
适用场景:
- 创建逻辑复杂
- 需要根据条件创建不同类型的对象
- 解耦对象的创建和使用
"""
from abc import ABC, abstractmethod
from typing import Dict, Type
# ============ 简单工厂 ============
class PaymentProcessor(ABC):
@abstractmethod
def pay(self, amount: float) -> bool:
pass
class AlipayProcessor(PaymentProcessor):
def pay(self, amount: float) -> bool:
print(f"Paying {amount} via Alipay")
return True
class WechatPayProcessor(PaymentProcessor):
def pay(self, amount: float) -> bool:
print(f"Paying {amount} via WeChat Pay")
return True
class CreditCardProcessor(PaymentProcessor):
def pay(self, amount: float) -> bool:
print(f"Paying {amount} via Credit Card")
return True
# 简单工厂
class PaymentFactory:
"""支付处理器工厂"""
_processors: Dict[str, Type[PaymentProcessor]] = {
"alipay": AlipayProcessor,
"wechat": WechatPayProcessor,
"credit_card": CreditCardProcessor,
}
@classmethod
def create(cls, payment_type: str) -> PaymentProcessor:
processor_class = cls._processors.get(payment_type)
if not processor_class:
raise ValueError(f"Unknown payment type: {payment_type}")
return processor_class()
@classmethod
def register(cls, payment_type: str, processor_class: Type[PaymentProcessor]):
"""注册新的支付处理器"""
cls._processors[payment_type] = processor_class
# 使用
processor = PaymentFactory.create("alipay")
processor.pay(100.0)
# ============ 工厂方法模式 ============
class NotificationFactory(ABC):
"""通知工厂抽象类"""
@abstractmethod
def create_notification(self) -> "Notification":
pass
def send(self, message: str):
notification = self.create_notification()
notification.notify(message)
class Notification(ABC):
@abstractmethod
def notify(self, message: str):
pass
class EmailNotification(Notification):
def notify(self, message: str):
print(f"Sending email: {message}")
class SMSNotification(Notification):
def notify(self, message: str):
print(f"Sending SMS: {message}")
class EmailNotificationFactory(NotificationFactory):
def create_notification(self) -> Notification:
return EmailNotification()
class SMSNotificationFactory(NotificationFactory):
def create_notification(self) -> Notification:
return SMSNotification()
# 使用
factory = EmailNotificationFactory()
factory.send("Hello!")
# ============ 抽象工厂模式 ============
class UIFactory(ABC):
"""UI 组件抽象工厂"""
@abstractmethod
def create_button(self) -> "Button":
pass
@abstractmethod
def create_checkbox(self) -> "Checkbox":
pass
class Button(ABC):
@abstractmethod
def render(self):
pass
class Checkbox(ABC):
@abstractmethod
def render(self):
pass
# Windows 风格
class WindowsButton(Button):
def render(self):
print("Rendering Windows button")
class WindowsCheckbox(Checkbox):
def render(self):
print("Rendering Windows checkbox")
class WindowsUIFactory(UIFactory):
def create_button(self) -> Button:
return WindowsButton()
def create_checkbox(self) -> Checkbox:
return WindowsCheckbox()
# Mac 风格
class MacButton(Button):
def render(self):
print("Rendering Mac button")
class MacCheckbox(Checkbox):
def render(self):
print("Rendering Mac checkbox")
class MacUIFactory(UIFactory):
def create_button(self) -> Button:
return MacButton()
def create_checkbox(self) -> Checkbox:
return MacCheckbox()
建造者模式(Builder)
"""
建造者模式:分步骤构建复杂对象
适用场景:
- 对象构造参数很多
- 需要构建不同配置的对象
- 构建过程需要多个步骤
"""
from dataclasses import dataclass, field
from typing import List, Optional
# ============ 使用 dataclass + Builder ============
@dataclass
class HttpRequest:
method: str
url: str
headers: dict = field(default_factory=dict)
params: dict = field(default_factory=dict)
body: Optional[str] = None
timeout: int = 30
class HttpRequestBuilder:
"""HTTP 请求建造者"""
def __init__(self):
self._method = "GET"
self._url = ""
self._headers = {}
self._params = {}
self._body = None
self._timeout = 30
def method(self, method: str) -> "HttpRequestBuilder":
self._method = method
return self
def url(self, url: str) -> "HttpRequestBuilder":
self._url = url
return self
def header(self, key: str, value: str) -> "HttpRequestBuilder":
self._headers[key] = value
return self
def headers(self, headers: dict) -> "HttpRequestBuilder":
self._headers.update(headers)
return self
def param(self, key: str, value: str) -> "HttpRequestBuilder":
self._params[key] = value
return self
def body(self, body: str) -> "HttpRequestBuilder":
self._body = body
return self
def timeout(self, timeout: int) -> "HttpRequestBuilder":
self._timeout = timeout
return self
def build(self) -> HttpRequest:
if not self._url:
raise ValueError("URL is required")
return HttpRequest(
method=self._method,
url=self._url,
headers=self._headers,
params=self._params,
body=self._body,
timeout=self._timeout,
)
# 使用:链式调用
request = (
HttpRequestBuilder()
.method("POST")
.url("https://api.example.com/users")
.header("Content-Type", "application/json")
.header("Authorization", "Bearer token")
.body('{"name": "Alice"}')
.timeout(60)
.build()
)
# ============ 更 Pythonic 的方式:使用 __init__ 关键字参数 ============
@dataclass
class QueryBuilder:
"""SQL 查询建造者"""
_table: str = ""
_columns: List[str] = field(default_factory=lambda: ["*"])
_where: List[str] = field(default_factory=list)
_order_by: List[str] = field(default_factory=list)
_limit: Optional[int] = None
_offset: Optional[int] = None
def select(self, *columns: str) -> "QueryBuilder":
self._columns = list(columns) if columns else ["*"]
return self
def from_table(self, table: str) -> "QueryBuilder":
self._table = table
return self
def where(self, condition: str) -> "QueryBuilder":
self._where.append(condition)
return self
def order_by(self, column: str, desc: bool = False) -> "QueryBuilder":
order = f"{column} DESC" if desc else column
self._order_by.append(order)
return self
def limit(self, limit: int) -> "QueryBuilder":
self._limit = limit
return self
def offset(self, offset: int) -> "QueryBuilder":
self._offset = offset
return self
def build(self) -> str:
if not self._table:
raise ValueError("Table is required")
sql = f"SELECT {', '.join(self._columns)} FROM {self._table}"
if self._where:
sql += f" WHERE {' AND '.join(self._where)}"
if self._order_by:
sql += f" ORDER BY {', '.join(self._order_by)}"
if self._limit:
sql += f" LIMIT {self._limit}"
if self._offset:
sql += f" OFFSET {self._offset}"
return sql
# 使用
query = (
QueryBuilder()
.select("id", "name", "email")
.from_table("users")
.where("status = 1")
.where("age >= 18")
.order_by("created_at", desc=True)
.limit(10)
.offset(0)
.build()
)
print(query)
# SELECT id, name, email FROM users WHERE status = 1 AND age >= 18 ORDER BY created_at DESC LIMIT 10 OFFSET 0
1.2 结构型模式
装饰器模式(Decorator)
"""
装饰器模式:动态给对象添加职责
Python 的 @ 装饰器语法天然支持这种模式
"""
from abc import ABC, abstractmethod
from functools import wraps
from typing import Callable
import time
import logging
logger = logging.getLogger(__name__)
# ============ 函数装饰器(最常用) ============
def timer(func: Callable) -> Callable:
"""计时装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
logger.info(f"{func.__name__} took {elapsed:.4f}s")
return result
return wrapper
def retry(max_attempts: int = 3, delay: float = 1.0):
"""重试装饰器"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_attempts - 1:
time.sleep(delay)
raise last_exception
return wrapper
return decorator
def cache(ttl: int = 300):
"""缓存装饰器"""
def decorator(func: Callable) -> Callable:
cache_data = {}
@wraps(func)
def wrapper(*args, **kwargs):
key = str(args) + str(kwargs)
now = time.time()
if key in cache_data:
value, expire_time = cache_data[key]
if now < expire_time:
return value
result = func(*args, **kwargs)
cache_data[key] = (result, now + ttl)
return result
wrapper.cache_clear = lambda: cache_data.clear()
return wrapper
return decorator
@timer
@retry(max_attempts=3)
@cache(ttl=60)
def fetch_data(url: str) -> dict:
"""获取数据"""
pass
# ============ 类装饰器模式 ============
class DataSource(ABC):
@abstractmethod
def read_data(self) -> str:
pass
class FileDataSource(DataSource):
def __init__(self, filename: str):
self.filename = filename
def read_data(self) -> str:
with open(self.filename) as f:
return f.read()
class DataSourceDecorator(DataSource):
"""数据源装饰器基类"""
def __init__(self, source: DataSource):
self._source = source
def read_data(self) -> str:
return self._source.read_data()
class EncryptionDecorator(DataSourceDecorator):
"""加密装饰器"""
def read_data(self) -> str:
data = super().read_data()
return self._decrypt(data)
def _decrypt(self, data: str) -> str:
# 解密逻辑
return f"decrypted({data})"
class CompressionDecorator(DataSourceDecorator):
"""压缩装饰器"""
def read_data(self) -> str:
data = super().read_data()
return self._decompress(data)
def _decompress(self, data: str) -> str:
# 解压逻辑
return f"decompressed({data})"
# 使用:可以叠加多个装饰器
source = FileDataSource("data.txt")
source = CompressionDecorator(source)
source = EncryptionDecorator(source)
data = source.read_data()
适配器模式(Adapter)
"""
适配器模式:将一个接口转换成另一个接口
适用场景:
- 使用第三方库但接口不兼容
- 统一多个不同接口的类
"""
from abc import ABC, abstractmethod
from typing import Dict, Any
import json
import xml.etree.ElementTree as ET
# ============ 目标接口 ============
class DataParser(ABC):
@abstractmethod
def parse(self, data: str) -> Dict[str, Any]:
pass
# ============ 已有的类(接口不兼容) ============
class LegacyXMLParser:
"""旧的 XML 解析器"""
def parse_xml_string(self, xml_string: str) -> ET.Element:
return ET.fromstring(xml_string)
def get_element_value(self, element: ET.Element, tag: str) -> str:
child = element.find(tag)
return child.text if child is not None else ""
# ============ 适配器 ============
class XMLParserAdapter(DataParser):
"""XML 解析器适配器"""
def __init__(self):
self._legacy_parser = LegacyXMLParser()
def parse(self, data: str) -> Dict[str, Any]:
root = self._legacy_parser.parse_xml_string(data)
return self._element_to_dict(root)
def _element_to_dict(self, element: ET.Element) -> Dict[str, Any]:
result = {}
for child in element:
if len(child) == 0:
result[child.tag] = child.text
else:
result[child.tag] = self._element_to_dict(child)
return result
class JSONParserAdapter(DataParser):
"""JSON 解析器适配器"""
def parse(self, data: str) -> Dict[str, Any]:
return json.loads(data)
# ============ 使用 ============
def process_data(parser: DataParser, data: str):
"""统一的数据处理函数"""
result = parser.parse(data)
return result
xml_data = "<root><name>Alice</name><age>30</age></root>"
json_data = '{"name": "Alice", "age": 30}'
xml_parser = XMLParserAdapter()
json_parser = JSONParserAdapter()
print(process_data(xml_parser, xml_data))
print(process_data(json_parser, json_data))
1.3 行为型模式
策略模式(Strategy)
"""
策略模式:定义一系列算法,将每个算法封装起来,使它们可以互换
适用场景:
- 多种算法可以互换
- 避免大量 if-else
- 运行时选择算法
"""
from abc import ABC, abstractmethod
from typing import List, Callable
from dataclasses import dataclass
# ============ 策略接口 ============
class PricingStrategy(ABC):
@abstractmethod
def calculate_price(self, original_price: float) -> float:
pass
# ============ 具体策略 ============
class RegularPricing(PricingStrategy):
"""普通定价"""
def calculate_price(self, original_price: float) -> float:
return original_price
class VIPPricing(PricingStrategy):
"""VIP 定价:8 折"""
def calculate_price(self, original_price: float) -> float:
return original_price * 0.8
class PromotionPricing(PricingStrategy):
"""促销定价:满 100 减 20"""
def calculate_price(self, original_price: float) -> float:
if original_price >= 100:
return original_price - 20
return original_price
class CouponPricing(PricingStrategy):
"""优惠券定价"""
def __init__(self, discount: float):
self.discount = discount
def calculate_price(self, original_price: float) -> float:
return original_price - self.discount
# ============ 上下文 ============
class ShoppingCart:
def __init__(self, strategy: PricingStrategy = None):
self._strategy = strategy or RegularPricing()
self._items: List[float] = []
def set_strategy(self, strategy: PricingStrategy):
self._strategy = strategy
def add_item(self, price: float):
self._items.append(price)
def get_total(self) -> float:
original = sum(self._items)
return self._strategy.calculate_price(original)
# 使用
cart = ShoppingCart()
cart.add_item(50)
cart.add_item(80)
print(f"普通价格: {cart.get_total()}") # 130
cart.set_strategy(VIPPricing())
print(f"VIP 价格: {cart.get_total()}") # 104
cart.set_strategy(PromotionPricing())
print(f"促销价格: {cart.get_total()}") # 110
# ============ 更 Pythonic 的方式:使用函数 ============
@dataclass
class Order:
items: List[float]
pricing_strategy: Callable[[float], float] = lambda x: x
def get_total(self) -> float:
original = sum(self.items)
return self.pricing_strategy(original)
# 策略函数
def vip_pricing(price: float) -> float:
return price * 0.8
def promotion_pricing(price: float) -> float:
return price - 20 if price >= 100 else price
order = Order(items=[50, 80], pricing_strategy=vip_pricing)
print(order.get_total()) # 104
观察者模式(Observer)
"""
观察者模式:定义对象间一对多的依赖关系,当一个对象状态改变时,所有依赖它的对象都会收到通知
适用场景:
- 事件驱动系统
- 发布-订阅模式
- 状态变化通知
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Callable, Any
from dataclasses import dataclass, field
import asyncio
# ============ 经典实现 ============
class Observer(ABC):
@abstractmethod
def update(self, event: str, data: Any):
pass
class Subject:
def __init__(self):
self._observers: List[Observer] = []
def attach(self, observer: Observer):
self._observers.append(observer)
def detach(self, observer: Observer):
self._observers.remove(observer)
def notify(self, event: str, data: Any = None):
for observer in self._observers:
observer.update(event, data)
class OrderSubject(Subject):
def create_order(self, order_data: dict):
# 创建订单逻辑
print(f"Order created: {order_data}")
self.notify("order_created", order_data)
class EmailObserver(Observer):
def update(self, event: str, data: Any):
if event == "order_created":
print(f"Sending email for order: {data}")
class SMSObserver(Observer):
def update(self, event: str, data: Any):
if event == "order_created":
print(f"Sending SMS for order: {data}")
class InventoryObserver(Observer):
def update(self, event: str, data: Any):
if event == "order_created":
print(f"Updating inventory for order: {data}")
# 使用
order_subject = OrderSubject()
order_subject.attach(EmailObserver())
order_subject.attach(SMSObserver())
order_subject.attach(InventoryObserver())
order_subject.create_order({"id": 1, "product": "iPhone"})
# ============ 更 Pythonic 的事件系统 ============
class EventEmitter:
"""事件发射器"""
def __init__(self):
self._listeners: Dict[str, List[Callable]] = {}
def on(self, event: str, callback: Callable):
"""注册事件监听器"""
if event not in self._listeners:
self._listeners[event] = []
self._listeners[event].append(callback)
return self
def off(self, event: str, callback: Callable = None):
"""移除事件监听器"""
if event in self._listeners:
if callback:
self._listeners[event].remove(callback)
else:
del self._listeners[event]
return self
def emit(self, event: str, *args, **kwargs):
"""触发事件"""
if event in self._listeners:
for callback in self._listeners[event]:
callback(*args, **kwargs)
return self
def once(self, event: str, callback: Callable):
"""只监听一次"""
def wrapper(*args, **kwargs):
callback(*args, **kwargs)
self.off(event, wrapper)
self.on(event, wrapper)
return self
# 使用
emitter = EventEmitter()
def on_order_created(order):
print(f"Order created: {order}")
def send_email(order):
print(f"Sending email for order: {order['id']}")
emitter.on("order: created", on_order_created)
emitter.on("order:created", send_email)
emitter.emit("order:created", {"id": 1, "product": "iPhone"})
# ============ 异步事件系统 ============
class AsyncEventEmitter:
"""异步事件发射器"""
def __init__(self):
self._listeners: Dict[str, List[Callable]] = {}
def on(self, event: str, callback: Callable):
if event not in self._listeners:
self._listeners[event] = []
self._listeners[event].append(callback)
async def emit(self, event: str, *args, **kwargs):
if event in self._listeners:
tasks = []
for callback in self._listeners[event]:
if asyncio.iscoroutinefunction(callback):
tasks.append(callback(*args, **kwargs))
else:
callback(*args, **kwargs)
if tasks:
await asyncio.gather(*tasks)
# 使用
async def async_handler(data):
await asyncio.sleep(1)
print(f"Async handler: {data}")
emitter = AsyncEventEmitter()
emitter.on("data", async_handler)
# await emitter.emit("data", {"key": "value"})
责任链模式(Chain of Responsibility)
"""
责任链模式:将请求沿着处理者链传递,直到有一个处理者处理它
适用场景:
- 中间件
- 审批流程
- 过滤器链
"""
from abc import ABC, abstractmethod
from typing import Optional, Any
from dataclasses import dataclass
# ============ 请求对象 ============
@dataclass
class Request:
user_id: int
amount: float
request_type: str
# ============ 处理者抽象类 ============
class Handler(ABC):
def __init__(self):
self._next_handler: Optional[Handler] = None
def set_next(self, handler: "Handler") -> "Handler":
self._next_handler = handler
return handler
def handle(self, request: Request) -> Optional[str]:
result = self._process(request)
if result is not None:
return result
if self._next_handler:
return self._next_handler.handle(request)
return None
@abstractmethod
def _process(self, request: Request) -> Optional[str]:
pass
# ============ 具体处理者 ============
class AuthenticationHandler(Handler):
"""认证处理"""
def _process(self, request: Request) -> Optional[str]:
if request.user_id <= 0:
return "Authentication failed: invalid user"
print(f"Authentication passed for user {request.user_id}")
return None
class AuthorizationHandler(Handler):
"""授权处理"""
def _process(self, request: Request) -> Optional[str]:
if request.amount > 10000 and request.user_id != 1:
return "Authorization failed: amount too large"
print(f"Authorization passed for amount {request.amount}")
return None
class ValidationHandler(Handler):
"""验证处理"""
def _process(self, request: Request) -> Optional[str]:
if request.amount <= 0:
return "Validation failed: invalid amount"
print(f"Validation passed")
return None
class BusinessHandler(Handler):
"""业务处理"""
def _process(self, request: Request) -> Optional[str]:
print(f"Processing business logic for request")
return "Request processed successfully"
# 构建责任链
auth = AuthenticationHandler()
authz = AuthorizationHandler()
valid = ValidationHandler()
business = BusinessHandler()
auth.set_next(authz).set_next(valid).set_next(business)
# 处理请求
request = Request(user_id=1, amount=5000, request_type="payment")
result = auth.handle(request)
print(f"Result: {result}")
# ============ 中间件风格实现 ============
from typing import Callable, List
Middleware = Callable[[Request, Callable], Any]
class Pipeline:
"""中间件管道"""
def __init__(self):
self._middlewares: List[Middleware] = []
def use(self, middleware: Middleware) -> "Pipeline":
self._middlewares.append(middleware)
return self
def execute(self, request: Request) -> Any:
def create_next(index: int) -> Callable:
if index >= len(self._middlewares):
return lambda req: None
def next_func(req: Request) -> Any:
return self._middlewares[index](req, create_next(index + 1))
return next_func
return create_next(0)(request)
# 中间件函数
def logging_middleware(request: Request, next: Callable) -> Any:
print(f"[LOG] Processing request: {request}")
result = next(request)
print(f"[LOG] Request completed")
return result
def auth_middleware(request: Request, next: Callable) -> Any:
if request.user_id <= 0:
raise ValueError("Unauthorized")
return next(request)
def business_middleware(request: Request, next: Callable) -> Any:
return f"Processed: {request}"
# 使用
pipeline = Pipeline()
pipeline.use(logging_middleware)
pipeline.use(auth_middleware)
pipeline.use(business_middleware)
result = pipeline.execute(Request(user_id=1, amount=100, request_type="order"))
二、系统设计
2.1 系统设计面试思路
"""
============ 系统设计面试框架 ============
1. 需求澄清(5 分钟)
- 功能需求:核心功能是什么?
- 非功能需求:性能、可用性、扩展性
- 规模估算:DAU、QPS、数据量
2. 高层设计(10 分钟)
- 画出核心组件
- 确定数据流向
- 识别关键接口
3. 详细设计(15 分钟)
- 数据库设计
- API 设计
- 关键算法
4. 扩展性设计(10 分钟)
- 如何水平扩展
- 如何处理热点
- 如何保证高可用
5. 总结和权衡(5 分钟)
- 技术选型的理由
- 系统的不足和改进方向
"""
2.2 短链系统设计
"""
============ 需求分析 ============
功能需求:
1. 给定长 URL,生成短链
2. 访问短链,跳转到长 URL
3. (可选)自定义短链
4. (可选)链接有效期
非功能需求:
- 高可用:99.99%
- 低延迟:<100ms
- 高并发:10000 QPS 写,100000 QPS 读
规模估算:
- 每天新增短链:100万
- 短链长度:7 位 (62^7 ≈ 3.5 万亿)
- 读写比:100:1
"""
import hashlib
import time
from typing import Optional
from dataclasses import dataclass
import redis
from sqlalchemy import Column, String, BigInteger, DateTime
from sqlalchemy.ext.asyncio import AsyncSession
# ============ 数据模型 ============
class ShortUrl(Base):
__tablename__ = "short_urls"
id = Column(BigInteger, primary_key=True)
short_code = Column(String(10), unique=True, index=True, nullable=False)
long_url = Column(String(2048), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
expires_at = Column(DateTime, nullable=True)
click_count = Column(BigInteger, default=0)
# ============ 短码生成器 ============
class ShortCodeGenerator:
"""短码生成器"""
CHARS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
BASE = len(CHARS)
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
# 方式1:自增 ID + Base62 编码
async def generate_by_id(self) -> str:
"""使用自增 ID 生成短码"""
id = await self.redis.incr("short_url: id_counter")
return self._encode(id)
def _encode(self, num: int) -> str:
"""Base62 编码"""
if num == 0:
return self.CHARS[0]
result = []
while num:
result.append(self.CHARS[num % self.BASE])
num //= self.BASE
return ''.join(reversed(result))
def _decode(self, code: str) -> int:
"""Base62 解码"""
num = 0
for char in code:
num = num * self.BASE + self.CHARS.index(char)
return num
# 方式2:Hash + 冲突处理
def generate_by_hash(self, url: str, retry: int = 0) -> str:
"""使用 Hash 生成短码"""
data = f"{url}{retry}{time.time()}"
hash_value = hashlib.md5(data.encode()).hexdigest()
return hash_value[:7]
# ============ 短链服务 ============
class ShortUrlService:
"""短链服务"""
def __init__(
self,
db: AsyncSession,
redis: redis.Redis,
generator: ShortCodeGenerator,
):
self.db = db
self.redis = redis
self.generator = generator
self.cache_ttl = 86400 # 24 小时
async def create_short_url(
self,
long_url: str,
custom_code: str = None,
expires_at: datetime = None,
) -> str:
"""创建短链"""
# 1. 检查是否已存在
cached = await self.redis.get(f"url: long:{long_url}")
if cached:
return cached.decode()
# 2. 生成短码
if custom_code:
if await self._code_exists(custom_code):
raise ValueError("Custom code already exists")
short_code = custom_code
else:
short_code = await self._generate_unique_code()
# 3. 保存到数据库
short_url = ShortUrl(
short_code=short_code,
long_url=long_url,
expires_at=expires_at,
)
self.db.add(short_url)
await self.db.commit()
# 4. 写入缓存
await self.redis.setex(
f"url: short:{short_code}",
self.cache_ttl,
long_url
)
await self.redis.setex(
f"url:long:{long_url}",
self.cache_ttl,
short_code
)
return short_code
async def get_long_url(self, short_code: str) -> Optional[str]:
"""获取长链接"""
# 1. 查缓存
cached = await self.redis.get(f"url:short:{short_code}")
if cached:
# 异步更新点击次数
asyncio.create_task(self._increment_click(short_code))
return cached.decode()
# 2. 查数据库
result = await self.db.execute(
select(ShortUrl).where(ShortUrl.short_code == short_code)
)
short_url = result.scalar_one_or_none()
if not short_url:
return None
# 3. 检查是否过期
if short_url.expires_at and short_url.expires_at < datetime.utcnow():
return None
# 4. 写入缓存
await self.redis.setex(
f"url: short:{short_code}",
self.cache_ttl,
short_url.long_url
)
return short_url.long_url
async def _generate_unique_code(self) -> str:
"""生成唯一短码"""
for _ in range(10): # 最多重试 10 次
code = await self.generator.generate_by_id()
if not await self._code_exists(code):
return code
raise RuntimeError("Failed to generate unique code")
async def _code_exists(self, code: str) -> bool:
"""检查短码是否存在"""
# 先查缓存
if await self.redis.exists(f"url:short:{code}"):
return True
# 再查数据库
result = await self.db.execute(
select(ShortUrl.id).where(ShortUrl.short_code == code)
)
return result.scalar_one_or_none() is not None
async def _increment_click(self, short_code: str):
"""增加点击次数"""
await self.db.execute(
update(ShortUrl)
.where(ShortUrl.short_code == short_code)
.values(click_count=ShortUrl.click_count + 1)
)
await self.db.commit()
# ============ API 接口 ============
@app.post("/shorten")
async def create_short_url(
request: CreateShortUrlRequest,
service: ShortUrlService = Depends(get_short_url_service),
):
short_code = await service.create_short_url(
long_url=request.url,
custom_code=request.custom_code,
expires_at=request.expires_at,
)
return {"short_url": f"https://short.url/{short_code}"}
@app.get("/{short_code}")
async def redirect(
short_code: str,
service: ShortUrlService = Depends(get_short_url_service),
):
long_url = await service.get_long_url(short_code)
if not long_url:
raise HTTPException(status_code=404, detail="Short URL not found")
return RedirectResponse(url=long_url, status_code=301)
2.3 秒杀系统设计
"""
============ 需求分析 ============
功能需求:
1. 商品展示:秒杀商品列表和详情
2. 秒杀下单:高并发抢购
3. 订单处理:创建订单、支付
非功能需求:
- 高并发:100 万 QPS 瞬时流量
- 高可用:核心链路 99.99%
- 一致性:不能超卖
- 防作弊:限制刷单
核心挑战:
1. 瞬时高并发
2. 库存一致性
3. 防止超卖
"""
import asyncio
import time
from typing import Optional
from dataclasses import dataclass
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession
import uuid
# ============ 数据模型 ============
@dataclass
class SeckillProduct:
id: int
name: str
price: float
stock: int
start_time: datetime
end_time: datetime
@dataclass
class SeckillOrder:
id: str
user_id: int
product_id: int
price: float
status: str # pending, paid, cancelled
created_at: datetime
# ============ 秒杀服务 ============
class SeckillService:
"""秒杀服务"""
def __init__(self, redis: Redis, db: AsyncSession):
self.redis = redis
self.db = db
# ============ 1. 预热库存到 Redis ============
async def warm_up(self, product_id: int, stock: int):
"""秒杀开始前预热库存"""
key = f"seckill:stock:{product_id}"
await self.redis.set(key, stock)
# 设置秒杀标志
await self.redis.set(f"seckill:active:{product_id}", 1)
# ============ 2. 秒杀核心逻辑 ============
async def seckill(self, user_id: int, product_id: int) -> Optional[str]:
"""
秒杀入口
返回订单 ID 或 None
"""
# 1. 前置校验
if not await self._pre_check(user_id, product_id):
return None
# 2. 扣减库存(Redis)
if not await self._deduct_stock(product_id):
return None
# 3. 创建订单(异步)
order_id = await self._create_order(user_id, product_id)
return order_id
async def _pre_check(self, user_id: int, product_id: int) -> bool:
"""前置校验"""
# 1. 检查秒杀是否开始
if not await self.redis.get(f"seckill: active:{product_id}"):
return False
# 2. 检查用户是否已购买(一人一单)
if await self.redis.sismember(f"seckill:users:{product_id}", user_id):
return False
# 3. 检查库存(快速失败)
stock = await self.redis.get(f"seckill:stock:{product_id}")
if not stock or int(stock) <= 0:
return False
return True
async def _deduct_stock(self, product_id: int) -> bool:
"""扣减库存(Lua 脚本保证原子性)"""
script = """
local stock = tonumber(redis.call('get', KEYS[1]))
if stock and stock > 0 then
redis.call('decr', KEYS[1])
return 1
end
return 0
"""
result = await self.redis.eval(
script, 1, f"seckill:stock:{product_id}"
)
return bool(result)
async def _create_order(self, user_id: int, product_id: int) -> str:
"""创建订单"""
order_id = str(uuid.uuid4())
# 记录用户已购买
await self.redis.sadd(f"seckill:users:{product_id}", user_id)
# 发送到消息队列异步处理
order_data = {
"order_id": order_id,
"user_id": user_id,
"product_id": product_id,
"created_at": time.time(),
}
await self.redis.lpush("seckill:orders", json.dumps(order_data))
return order_id
# ============ 3. 订单消费者 ============
async def process_orders(self):
"""消费订单队列"""
while True:
# 阻塞获取订单
result = await self.redis.brpop("seckill:orders", timeout=5)
if not result:
continue
_, order_data = result
order = json.loads(order_data)
try:
await self._save_order_to_db(order)
except Exception as e:
# 处理失败,回滚库存
await self.redis.incr(f"seckill:stock:{order['product_id']}")
await self.redis.srem(
f"seckill:users:{order['product_id']}",
order['user_id']
)
logger.error(f"Failed to process order: {e}")
async def _save_order_to_db(self, order: dict):
"""保存订单到数据库"""
async with self.db.begin():
# 扣减数据库库存
result = await self.db.execute(
update(Product)
.where(
Product.id == order['product_id'],
Product.stock > 0
)
.values(stock=Product.stock - 1)
)
if result.rowcount == 0:
raise ValueError("Stock not enough")
# 创建订单
db_order = Order(
id=order['order_id'],
user_id=order['user_id'],
product_id=order['product_id'],
status='pending',
)
self.db.add(db_order)
# ============ 限流和防刷 ============
class RateLimiter:
"""限流器"""
def __init__(self, redis: Redis):
self.redis = redis
async def is_allowed(
self,
user_id: int,
limit: int = 10,
window: int = 1
) -> bool:
"""滑动窗口限流"""
key = f"rate_limit:{user_id}"
now = time.time()
pipe = self.redis.pipeline()
pipe.zremrangebyscore(key, 0, now - window)
pipe.zcard(key)
pipe.zadd(key, {str(now): now})
pipe.expire(key, window)
results = await pipe.execute()
current_count = results[1]
return current_count < limit
# ============ API 接口 ============
@app.post("/seckill/{product_id}")
async def seckill(
product_id: int,
user: User = Depends(get_current_user),
service: SeckillService = Depends(get_seckill_service),
limiter: RateLimiter = Depends(get_rate_limiter),
):
# 1. 限流检查
if not await limiter.is_allowed(user.id):
raise HTTPException(status_code=429, detail="Too many requests")
# 2. 执行秒杀
order_id = await service.seckill(user.id, product_id)
if not order_id:
raise HTTPException(status_code=400, detail="Seckill failed")
return {"order_id": order_id, "message": "Seckill success"}
2.4 常见系统设计题目
"""
============ 高频系统设计题目 ============
1. 短链系统(TinyURL)
- 短码生成算法
- 高并发读取
- 过期清理
2. 秒杀系统
- 库存一致性
- 防超卖
- 异步下单
3. 消息队列
- 消息持久化
- 消息确认机制
- 消息重试
4. 分布式缓存
- 缓存一致性
- 缓存穿透/击穿/雪崩
- 热点 Key
5. 分布式 ID 生成器
- 雪花算法
- 号段模式
- 高可用
6. 限流系统
- 令牌桶/漏桶
- 分布式限流
- 多维度限流
7. 搜索系统
- 倒排索引
- 分词
- 相关性排序
8. 推荐系统
- 协同过滤
- 实时推荐
- 冷启动
9. 分布式任务调度
- 任务分片
- 故障转移
- 任务依赖
10. 实时聊天系统
- WebSocket
- 消息存储
- 在线状态
"""
三、软技能与面试技巧
3.1 STAR 法则回答行为问题
"""
============ STAR 法则 ============
S - Situation(情境):描述背景和情境
T - Task(任务):你的任务和目标
A - Action(行动):你采取的具体行动
R - Result(结果):取得的成果和数据
============ 示例问题 ============
Q: 请描述一次你解决过的最有挑战性的技术问题
A:
【Situation】
在上一家公司,我们的订单系统在双十一期间遇到了严重的性能问题,
QPS 从平时的 1000 飙升到 50000,导致数据库连接池耗尽,系统响应时间超过 10 秒。
【Task】
作为后端负责人,我需要在 48 小时内找到问题根因并解决,
确保系统能够支撑预计的 100000 QPS 峰值流量。
【Action】
1. 首先通过 APM 工具定位到瓶颈在数据库层,慢查询占比 60%
2. 分析慢查询日志,发现主要是订单查询缺少复合索引
3. 添加了 (user_id, status, created_at) 复合索引
4. 引入 Redis 缓存热点数据,缓存命中率达到 85%
5. 将同步下单改为异步,使用消息队列削峰填谷
6. 增加了数据库读写分离,写库 1 主,读库 3 从
【Result】
- 系统 QPS 提升到 120000
- 平均响应时间从 10 秒降到 50 毫秒
- 双十一当天零故障,订单量同比增长 300%
- 这次优化经验被整理成内部文档,在团队推广
"""
3.2 常见行为面试问题
"""
============ 技术相关 ============
1. 描述你做过的最有挑战性的项目
2. 你是如何学习新技术的?
3. 你如何进行代码 review?
4. 描述一次你解决线上故障的经历
5. 你如何保证代码质量?
============ 团队协作 ============
6. 描述一次与他人意见不一致的情况
7. 你如何与产品经理沟通需求?
8. 你如何带新人?
9. 描述一次跨团队协作的经历
10. 你如何处理工作中的压力?
============ 个人发展 ============
11. 你为什么选择这个职位?
12. 你的职业规划是什么?
13. 你认为自己最大的优点和缺点是什么?
14. 你期望的工作环境是怎样的?
15. 你有什么想问我们的?
"""
3.3 技术面试应答技巧
"""
============ 回答问题的结构 ============
1. 先给结论,再展开解释
- 错误:背景介绍很长,结论放最后
- 正确:先说结论,再解释原因和细节
2. 分层次回答
- 第一层:核心概念(是什么)
- 第二层:原理解释(为什么)
- 第三层:实际应用(怎么用)
- 第四层:延伸思考(还有什么)
3. 适当使用类比
- 用熟悉的概念解释陌生的概念
- 让面试官更容易理解
============ 示例 ============
Q: 什么是 Redis 的分布式锁?
A:
【结论】
分布式锁是一种在分布式系统中实现互斥访问共享资源的机制,
Redis 可以通过 SET NX EX 命令实现。
【原理】
SET key value NX EX seconds
- NX:只在 key 不存在时设置(保证互斥)
- EX:设置过期时间(防止死锁)
【关键要点】
1. 锁需要设置唯一标识(防止误删)
2. 释放锁时要用 Lua 脚本保证原子性
3. 需要考虑锁续期问题(看门狗机制)
【实际应用】
在秒杀系统中,我们用 Redis 分布式锁来保证库存扣减的原子性...
【延伸】
当然,Redis 单点锁有可能在主从切换时失效,
如果对一致性要求更高,可以考虑 Redlock 算法或使用 ZooKeeper。
============ 不会的问题怎么回答 ============
1. 诚实承认不知道
"这个问题我不太了解,但我可以说一下我的理解..."
2. 说出相关知识
"这个我没用过,但我知道类似的 xxx..."
3. 展示学习能力
"这个我之前没接触过,但如果需要,我会从 xxx 开始学习..."
"""
3.4 提问环节
"""
============ 可以问的问题 ============
【关于团队】
1. 团队目前有多少人?技术栈是什么?
2. 团队的开发流程是怎样的?
3. 新人入职会有什么培训或导师制度?
【关于项目】
4. 这个岗位主要负责哪些项目?
5. 目前团队面临的最大技术挑战是什么?
6. 未来一年的技术规划是什么?
【关于成长】
7. 公司对技术人员的晋升路径是怎样的?
8. 有没有技术分享或学习的机会?
9. 这个岗位的绩效考核标准是什么?
【关于文化】
10. 团队的工作氛围是怎样的?
11. 加班情况怎么样?
12. 远程办公政策是怎样的?
============ 避免问的问题 ============
- 薪资福利(第一轮面试不要问)
- 可以在网上查到的基础信息
- 太过私人的问题
"""
📝 Part 8 总结
设计模式高频考点
| 模式 | 面试频率 | 应用场景 |
|---|---|---|
| 单例模式 | ⭐⭐⭐⭐⭐ | 配置管理、数据库连接池 |
| 工厂模式 | ⭐⭐⭐⭐ | 对象创建解耦 |
| 策略模式 | ⭐⭐⭐⭐ | 算法切换、支付方式 |
| 观察者模式 | ⭐⭐⭐⭐ | 事件系统、消息通知 |
| 装饰器模式 | ⭐⭐⭐⭐⭐ | 功能增强、日志、缓存 |
| 责任链模式 | ⭐⭐⭐ | 中间件、审批流程 |
| 建造者模式 | ⭐⭐⭐ | 复杂对象构建 |
| 适配器模式 | ⭐⭐⭐ | 接口转换 |
系统设计考点
| 题目 | 频率 | 核心考点 |
|---|---|---|
| 短链系统 | ⭐⭐⭐⭐⭐ | ID 生成、缓存、高并发读 |
| 秒杀系统 | ⭐⭐⭐⭐⭐ | 库存一致性、限流、异步 |
| 消息队列 | ⭐⭐⭐⭐ | 可靠性、顺序性、幂等 |
| 分布式缓存 | ⭐⭐⭐⭐ | 一致性、穿透击穿雪崩 |
| 搜索系统 | ⭐⭐⭐ | 倒排索引、分词、排序 |
面试技巧要点
- 回答问题要有结构:结论先行,分层展开
- 展示思考过程:不要直接给答案,展示推理过程
- 诚实面对不会的:承认不知道,但展示学习能力
- 准备好提问:展示你对公司和岗位的兴趣
- STAR 法则:用于回答行为面试问题
🎉 全部完成!
面试手册总结
| Part | 主题 | 重要程度 | 核心考点 |
|---|---|---|---|
| Part 1 | Python 基础 & 数据类型 | ⭐⭐⭐⭐⭐ | 可变/不可变、is vs ==、深浅拷贝、数据结构 |
| Part 2 | 函数 & 闭包 & 装饰器 | ⭐⭐⭐⭐⭐ | 参数传递、LEGB、闭包陷阱、装饰器原理 |
| Part 3 | 面向对象 & 魔术方法 | ⭐⭐⭐⭐⭐ | 类属性、MRO、描述符、上下文管理器 |
| Part 4 | 并发编程 | ⭐⭐⭐⭐⭐ | GIL、多线程/多进程/协程选择、锁 |
| Part 5 | 内存管理 & 性能优化 | ⭐⭐⭐⭐ | 引用计数、GC、循环引用、性能分析 |
| Part 6 | 常用库 | ⭐⭐⭐⭐ | collections、itertools、requests、pytest |
| Part 7 | Web & 数据库 & 缓存 | ⭐⭐⭐⭐⭐ | FastAPI、MySQL索引、Redis缓存问题 |
| Part 8 | 设计模式 & 系统设计 | ⭐⭐⭐⭐ | 单例、工厂、策略、短链/秒杀系统 |
必须掌握的 TOP 20 知识点
Python 基础(必问)
├── 1. 可变对象 vs 不可变对象
├── 2. is vs == 的区别
├── 3. 深拷贝 vs 浅拷贝
├── 4. *args 和 **kwargs
└── 5. 列表推导式 & 生成器
函数与面向对象(高频)
├── 6. 闭包原理与陷阱
├── 7. 装饰器(带参数、类装饰器)
├── 8. __new__ vs __init__
├── 9. MRO 与 super()
└── 10. 上下文管理器
并发编程(重点)
├── 11. GIL 是什么?影响?
├── 12. 多线程 vs 多进程 vs 协程
├── 13. 什么场景用什么并发模型
└── 14. 线程安全与锁
数据库 & 缓存(核心)
├── 15. MySQL 索引原理(B+树)
├── 16. 事务隔离级别
├── 17. 缓存穿透/击穿/雪崩
├── 18. Redis 数据类型与应用
└── 19. 分布式锁实现
系统设计(加分)
└── 20. 短链/秒杀系统设计思路
写在最后
到此,我们的Python面试手册AI生成版就结束了,希望大家看完之后能够有所收获,在这个高速发展的时代,有属于自己的技术底蕴与个人的思考,祝大家都能找到一份自己喜欢的工作,永远不用担心薪资不涨或者大裁员,不要有那么大的压力,能够过上自己想要的生活。祝好!
更多推荐



所有评论(0)