Part 8: 设计模式 & 系统设计 & 软技能


一、常用设计模式

1.1 创建型模式

单例模式(Singleton)
"""
单例模式:确保一个类只有一个实例

适用场景:
- 数据库连接池
- 配置管理
- 日志记录器
- 线程池
"""

# ============ 方式1:模块级单例(推荐,最简单) ============
# config.py
class _Config:
    def __init__(self):
        self.debug = False
        self.database_url = "mysql://localhost/db"

config = _Config()  # 模块导入时创建,天然单例

# 使用
# from config import config

# ============ 方式2:装饰器实现 ============
from functools import wraps
from threading import Lock

def singleton(cls):
    """线程安全的单例装饰器"""
    instances = {}
    lock = Lock()
    
    @wraps(cls)
    def get_instance(*args, **kwargs):
        if cls not in instances: 
            with lock: 
                if cls not in instances:  # 双重检查
                    instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    
    return get_instance

@singleton
class Database:
    def __init__(self):
        print("Creating database connection...")
        self.connection = "connected"

db1 = Database()
db2 = Database()
print(db1 is db2)  # True

# ============ 方式3:__new__ 方法 ============
class Singleton:
    _instance = None
    _lock = Lock()
    
    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self, value=None):
        # 注意:__init__ 每次都会被调用
        if not hasattr(self, 'initialized'):
            self.value = value
            self.initialized = True

# ============ 方式4:元类实现 ============
class SingletonMeta(type):
    _instances = {}
    _lock = Lock()
    
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            with cls._lock:
                if cls not in cls._instances:
                    instance = super().__call__(*args, **kwargs)
                    cls._instances[cls] = instance
        return cls._instances[cls]

class Logger(metaclass=SingletonMeta):
    def __init__(self):
        self.logs = []
    
    def log(self, message):
        self.logs.append(message)
        print(f"[LOG] {message}")
工厂模式(Factory)
"""
工厂模式:将对象创建逻辑封装起来

适用场景:
- 创建逻辑复杂
- 需要根据条件创建不同类型的对象
- 解耦对象的创建和使用
"""

from abc import ABC, abstractmethod
from typing import Dict, Type

# ============ 简单工厂 ============
class PaymentProcessor(ABC):
    @abstractmethod
    def pay(self, amount:  float) -> bool:
        pass

class AlipayProcessor(PaymentProcessor):
    def pay(self, amount: float) -> bool:
        print(f"Paying {amount} via Alipay")
        return True

class WechatPayProcessor(PaymentProcessor):
    def pay(self, amount:  float) -> bool:
        print(f"Paying {amount} via WeChat Pay")
        return True

class CreditCardProcessor(PaymentProcessor):
    def pay(self, amount: float) -> bool:
        print(f"Paying {amount} via Credit Card")
        return True

# 简单工厂
class PaymentFactory: 
    """支付处理器工厂"""
    
    _processors:  Dict[str, Type[PaymentProcessor]] = {
        "alipay": AlipayProcessor,
        "wechat": WechatPayProcessor,
        "credit_card": CreditCardProcessor,
    }
    
    @classmethod
    def create(cls, payment_type: str) -> PaymentProcessor:
        processor_class = cls._processors.get(payment_type)
        if not processor_class:
            raise ValueError(f"Unknown payment type: {payment_type}")
        return processor_class()
    
    @classmethod
    def register(cls, payment_type: str, processor_class: Type[PaymentProcessor]):
        """注册新的支付处理器"""
        cls._processors[payment_type] = processor_class

# 使用
processor = PaymentFactory.create("alipay")
processor.pay(100.0)

# ============ 工厂方法模式 ============
class NotificationFactory(ABC):
    """通知工厂抽象类"""
    
    @abstractmethod
    def create_notification(self) -> "Notification":
        pass
    
    def send(self, message: str):
        notification = self.create_notification()
        notification.notify(message)

class Notification(ABC):
    @abstractmethod
    def notify(self, message: str):
        pass

class EmailNotification(Notification):
    def notify(self, message: str):
        print(f"Sending email: {message}")

class SMSNotification(Notification):
    def notify(self, message: str):
        print(f"Sending SMS:  {message}")

class EmailNotificationFactory(NotificationFactory):
    def create_notification(self) -> Notification:
        return EmailNotification()

class SMSNotificationFactory(NotificationFactory):
    def create_notification(self) -> Notification:
        return SMSNotification()

# 使用
factory = EmailNotificationFactory()
factory.send("Hello!")

# ============ 抽象工厂模式 ============
class UIFactory(ABC):
    """UI 组件抽象工厂"""
    
    @abstractmethod
    def create_button(self) -> "Button": 
        pass
    
    @abstractmethod
    def create_checkbox(self) -> "Checkbox":
        pass

class Button(ABC):
    @abstractmethod
    def render(self):
        pass

class Checkbox(ABC):
    @abstractmethod
    def render(self):
        pass

# Windows 风格
class WindowsButton(Button):
    def render(self):
        print("Rendering Windows button")

class WindowsCheckbox(Checkbox):
    def render(self):
        print("Rendering Windows checkbox")

class WindowsUIFactory(UIFactory):
    def create_button(self) -> Button:
        return WindowsButton()
    
    def create_checkbox(self) -> Checkbox:
        return WindowsCheckbox()

# Mac 风格
class MacButton(Button):
    def render(self):
        print("Rendering Mac button")

class MacCheckbox(Checkbox):
    def render(self):
        print("Rendering Mac checkbox")

class MacUIFactory(UIFactory):
    def create_button(self) -> Button:
        return MacButton()
    
    def create_checkbox(self) -> Checkbox:
        return MacCheckbox()
建造者模式(Builder)
"""
建造者模式:分步骤构建复杂对象

适用场景:
- 对象构造参数很多
- 需要构建不同配置的对象
- 构建过程需要多个步骤
"""

from dataclasses import dataclass, field
from typing import List, Optional

# ============ 使用 dataclass + Builder ============
@dataclass
class HttpRequest:
    method: str
    url: str
    headers: dict = field(default_factory=dict)
    params: dict = field(default_factory=dict)
    body: Optional[str] = None
    timeout: int = 30

class HttpRequestBuilder:
    """HTTP 请求建造者"""
    
    def __init__(self):
        self._method = "GET"
        self._url = ""
        self._headers = {}
        self._params = {}
        self._body = None
        self._timeout = 30
    
    def method(self, method:  str) -> "HttpRequestBuilder":
        self._method = method
        return self
    
    def url(self, url: str) -> "HttpRequestBuilder":
        self._url = url
        return self
    
    def header(self, key: str, value:  str) -> "HttpRequestBuilder":
        self._headers[key] = value
        return self
    
    def headers(self, headers:  dict) -> "HttpRequestBuilder":
        self._headers.update(headers)
        return self
    
    def param(self, key: str, value: str) -> "HttpRequestBuilder":
        self._params[key] = value
        return self
    
    def body(self, body:  str) -> "HttpRequestBuilder":
        self._body = body
        return self
    
    def timeout(self, timeout:  int) -> "HttpRequestBuilder":
        self._timeout = timeout
        return self
    
    def build(self) -> HttpRequest:
        if not self._url:
            raise ValueError("URL is required")
        
        return HttpRequest(
            method=self._method,
            url=self._url,
            headers=self._headers,
            params=self._params,
            body=self._body,
            timeout=self._timeout,
        )

# 使用:链式调用
request = (
    HttpRequestBuilder()
    .method("POST")
    .url("https://api.example.com/users")
    .header("Content-Type", "application/json")
    .header("Authorization", "Bearer token")
    .body('{"name": "Alice"}')
    .timeout(60)
    .build()
)

# ============ 更 Pythonic 的方式:使用 __init__ 关键字参数 ============
@dataclass
class QueryBuilder:
    """SQL 查询建造者"""
    
    _table: str = ""
    _columns:  List[str] = field(default_factory=lambda: ["*"])
    _where: List[str] = field(default_factory=list)
    _order_by: List[str] = field(default_factory=list)
    _limit: Optional[int] = None
    _offset: Optional[int] = None
    
    def select(self, *columns: str) -> "QueryBuilder":
        self._columns = list(columns) if columns else ["*"]
        return self
    
    def from_table(self, table: str) -> "QueryBuilder": 
        self._table = table
        return self
    
    def where(self, condition: str) -> "QueryBuilder":
        self._where.append(condition)
        return self
    
    def order_by(self, column:  str, desc: bool = False) -> "QueryBuilder": 
        order = f"{column} DESC" if desc else column
        self._order_by.append(order)
        return self
    
    def limit(self, limit: int) -> "QueryBuilder": 
        self._limit = limit
        return self
    
    def offset(self, offset: int) -> "QueryBuilder":
        self._offset = offset
        return self
    
    def build(self) -> str:
        if not self._table:
            raise ValueError("Table is required")
        
        sql = f"SELECT {', '.join(self._columns)} FROM {self._table}"
        
        if self._where:
            sql += f" WHERE {' AND '.join(self._where)}"
        
        if self._order_by: 
            sql += f" ORDER BY {', '.join(self._order_by)}"
        
        if self._limit:
            sql += f" LIMIT {self._limit}"
        
        if self._offset:
            sql += f" OFFSET {self._offset}"
        
        return sql

# 使用
query = (
    QueryBuilder()
    .select("id", "name", "email")
    .from_table("users")
    .where("status = 1")
    .where("age >= 18")
    .order_by("created_at", desc=True)
    .limit(10)
    .offset(0)
    .build()
)
print(query)
# SELECT id, name, email FROM users WHERE status = 1 AND age >= 18 ORDER BY created_at DESC LIMIT 10 OFFSET 0

1.2 结构型模式

装饰器模式(Decorator)
"""
装饰器模式:动态给对象添加职责

Python 的 @ 装饰器语法天然支持这种模式
"""

from abc import ABC, abstractmethod
from functools import wraps
from typing import Callable
import time
import logging

logger = logging.getLogger(__name__)

# ============ 函数装饰器(最常用) ============
def timer(func:  Callable) -> Callable:
    """计时装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        logger.info(f"{func.__name__} took {elapsed:.4f}s")
        return result
    return wrapper

def retry(max_attempts:  int = 3, delay: float = 1.0):
    """重试装饰器"""
    def decorator(func: Callable) -> Callable: 
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try: 
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator

def cache(ttl: int = 300):
    """缓存装饰器"""
    def decorator(func:  Callable) -> Callable:
        cache_data = {}
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(kwargs)
            now = time.time()
            
            if key in cache_data: 
                value, expire_time = cache_data[key]
                if now < expire_time: 
                    return value
            
            result = func(*args, **kwargs)
            cache_data[key] = (result, now + ttl)
            return result
        
        wrapper.cache_clear = lambda: cache_data.clear()
        return wrapper
    return decorator

@timer
@retry(max_attempts=3)
@cache(ttl=60)
def fetch_data(url:  str) -> dict:
    """获取数据"""
    pass

# ============ 类装饰器模式 ============
class DataSource(ABC):
    @abstractmethod
    def read_data(self) -> str:
        pass

class FileDataSource(DataSource):
    def __init__(self, filename: str):
        self.filename = filename
    
    def read_data(self) -> str:
        with open(self.filename) as f:
            return f.read()

class DataSourceDecorator(DataSource):
    """数据源装饰器基类"""
    
    def __init__(self, source: DataSource):
        self._source = source
    
    def read_data(self) -> str:
        return self._source.read_data()

class EncryptionDecorator(DataSourceDecorator):
    """加密装饰器"""
    
    def read_data(self) -> str:
        data = super().read_data()
        return self._decrypt(data)
    
    def _decrypt(self, data:  str) -> str:
        # 解密逻辑
        return f"decrypted({data})"

class CompressionDecorator(DataSourceDecorator):
    """压缩装饰器"""
    
    def read_data(self) -> str:
        data = super().read_data()
        return self._decompress(data)
    
    def _decompress(self, data: str) -> str:
        # 解压逻辑
        return f"decompressed({data})"

# 使用:可以叠加多个装饰器
source = FileDataSource("data.txt")
source = CompressionDecorator(source)
source = EncryptionDecorator(source)
data = source.read_data()
适配器模式(Adapter)
"""
适配器模式:将一个接口转换成另一个接口

适用场景:
- 使用第三方库但接口不兼容
- 统一多个不同接口的类
"""

from abc import ABC, abstractmethod
from typing import Dict, Any
import json
import xml.etree.ElementTree as ET

# ============ 目标接口 ============
class DataParser(ABC):
    @abstractmethod
    def parse(self, data: str) -> Dict[str, Any]:
        pass

# ============ 已有的类(接口不兼容) ============
class LegacyXMLParser: 
    """旧的 XML 解析器"""
    
    def parse_xml_string(self, xml_string: str) -> ET.Element:
        return ET.fromstring(xml_string)
    
    def get_element_value(self, element:  ET.Element, tag: str) -> str:
        child = element.find(tag)
        return child.text if child is not None else ""

# ============ 适配器 ============
class XMLParserAdapter(DataParser):
    """XML 解析器适配器"""
    
    def __init__(self):
        self._legacy_parser = LegacyXMLParser()
    
    def parse(self, data: str) -> Dict[str, Any]:
        root = self._legacy_parser.parse_xml_string(data)
        return self._element_to_dict(root)
    
    def _element_to_dict(self, element: ET.Element) -> Dict[str, Any]: 
        result = {}
        for child in element:
            if len(child) == 0:
                result[child.tag] = child.text
            else:
                result[child.tag] = self._element_to_dict(child)
        return result

class JSONParserAdapter(DataParser):
    """JSON 解析器适配器"""
    
    def parse(self, data: str) -> Dict[str, Any]:
        return json.loads(data)

# ============ 使用 ============
def process_data(parser:  DataParser, data:  str):
    """统一的数据处理函数"""
    result = parser.parse(data)
    return result

xml_data = "<root><name>Alice</name><age>30</age></root>"
json_data = '{"name":  "Alice", "age": 30}'

xml_parser = XMLParserAdapter()
json_parser = JSONParserAdapter()

print(process_data(xml_parser, xml_data))
print(process_data(json_parser, json_data))

1.3 行为型模式

策略模式(Strategy)
"""
策略模式:定义一系列算法,将每个算法封装起来,使它们可以互换

适用场景:
- 多种算法可以互换
- 避免大量 if-else
- 运行时选择算法
"""

from abc import ABC, abstractmethod
from typing import List, Callable
from dataclasses import dataclass

# ============ 策略接口 ============
class PricingStrategy(ABC):
    @abstractmethod
    def calculate_price(self, original_price: float) -> float:
        pass

# ============ 具体策略 ============
class RegularPricing(PricingStrategy):
    """普通定价"""
    def calculate_price(self, original_price:  float) -> float:
        return original_price

class VIPPricing(PricingStrategy):
    """VIP 定价:8 折"""
    def calculate_price(self, original_price:  float) -> float:
        return original_price * 0.8

class PromotionPricing(PricingStrategy):
    """促销定价:满 100 减 20"""
    def calculate_price(self, original_price: float) -> float:
        if original_price >= 100:
            return original_price - 20
        return original_price

class CouponPricing(PricingStrategy):
    """优惠券定价"""
    def __init__(self, discount:  float):
        self.discount = discount
    
    def calculate_price(self, original_price: float) -> float:
        return original_price - self.discount

# ============ 上下文 ============
class ShoppingCart:
    def __init__(self, strategy: PricingStrategy = None):
        self._strategy = strategy or RegularPricing()
        self._items:  List[float] = []
    
    def set_strategy(self, strategy: PricingStrategy):
        self._strategy = strategy
    
    def add_item(self, price: float):
        self._items.append(price)
    
    def get_total(self) -> float:
        original = sum(self._items)
        return self._strategy.calculate_price(original)

# 使用
cart = ShoppingCart()
cart.add_item(50)
cart.add_item(80)

print(f"普通价格: {cart.get_total()}")  # 130

cart.set_strategy(VIPPricing())
print(f"VIP 价格: {cart.get_total()}")  # 104

cart.set_strategy(PromotionPricing())
print(f"促销价格:  {cart.get_total()}")  # 110

# ============ 更 Pythonic 的方式:使用函数 ============
@dataclass
class Order:
    items:  List[float]
    pricing_strategy:  Callable[[float], float] = lambda x: x
    
    def get_total(self) -> float:
        original = sum(self.items)
        return self.pricing_strategy(original)

# 策略函数
def vip_pricing(price: float) -> float:
    return price * 0.8

def promotion_pricing(price:  float) -> float:
    return price - 20 if price >= 100 else price

order = Order(items=[50, 80], pricing_strategy=vip_pricing)
print(order.get_total())  # 104
观察者模式(Observer)
"""
观察者模式:定义对象间一对多的依赖关系,当一个对象状态改变时,所有依赖它的对象都会收到通知

适用场景:
- 事件驱动系统
- 发布-订阅模式
- 状态变化通知
"""

from abc import ABC, abstractmethod
from typing import List, Dict, Callable, Any
from dataclasses import dataclass, field
import asyncio

# ============ 经典实现 ============
class Observer(ABC):
    @abstractmethod
    def update(self, event: str, data:  Any):
        pass

class Subject: 
    def __init__(self):
        self._observers: List[Observer] = []
    
    def attach(self, observer: Observer):
        self._observers.append(observer)
    
    def detach(self, observer: Observer):
        self._observers.remove(observer)
    
    def notify(self, event: str, data:  Any = None):
        for observer in self._observers:
            observer.update(event, data)

class OrderSubject(Subject):
    def create_order(self, order_data: dict):
        # 创建订单逻辑
        print(f"Order created: {order_data}")
        self.notify("order_created", order_data)

class EmailObserver(Observer):
    def update(self, event: str, data: Any):
        if event == "order_created":
            print(f"Sending email for order: {data}")

class SMSObserver(Observer):
    def update(self, event: str, data:  Any):
        if event == "order_created":
            print(f"Sending SMS for order: {data}")

class InventoryObserver(Observer):
    def update(self, event:  str, data: Any):
        if event == "order_created":
            print(f"Updating inventory for order: {data}")

# 使用
order_subject = OrderSubject()
order_subject.attach(EmailObserver())
order_subject.attach(SMSObserver())
order_subject.attach(InventoryObserver())

order_subject.create_order({"id": 1, "product": "iPhone"})

# ============ 更 Pythonic 的事件系统 ============
class EventEmitter:
    """事件发射器"""
    
    def __init__(self):
        self._listeners: Dict[str, List[Callable]] = {}
    
    def on(self, event: str, callback: Callable):
        """注册事件监听器"""
        if event not in self._listeners:
            self._listeners[event] = []
        self._listeners[event].append(callback)
        return self
    
    def off(self, event: str, callback:  Callable = None):
        """移除事件监听器"""
        if event in self._listeners:
            if callback:
                self._listeners[event].remove(callback)
            else:
                del self._listeners[event]
        return self
    
    def emit(self, event: str, *args, **kwargs):
        """触发事件"""
        if event in self._listeners:
            for callback in self._listeners[event]: 
                callback(*args, **kwargs)
        return self
    
    def once(self, event:  str, callback:  Callable):
        """只监听一次"""
        def wrapper(*args, **kwargs):
            callback(*args, **kwargs)
            self.off(event, wrapper)
        self.on(event, wrapper)
        return self

# 使用
emitter = EventEmitter()

def on_order_created(order):
    print(f"Order created: {order}")

def send_email(order):
    print(f"Sending email for order: {order['id']}")

emitter.on("order: created", on_order_created)
emitter.on("order:created", send_email)

emitter.emit("order:created", {"id": 1, "product": "iPhone"})

# ============ 异步事件系统 ============
class AsyncEventEmitter:
    """异步事件发射器"""
    
    def __init__(self):
        self._listeners: Dict[str, List[Callable]] = {}
    
    def on(self, event: str, callback: Callable):
        if event not in self._listeners:
            self._listeners[event] = []
        self._listeners[event].append(callback)
    
    async def emit(self, event: str, *args, **kwargs):
        if event in self._listeners:
            tasks = []
            for callback in self._listeners[event]:
                if asyncio.iscoroutinefunction(callback):
                    tasks.append(callback(*args, **kwargs))
                else: 
                    callback(*args, **kwargs)
            if tasks:
                await asyncio.gather(*tasks)

# 使用
async def async_handler(data):
    await asyncio.sleep(1)
    print(f"Async handler: {data}")

emitter = AsyncEventEmitter()
emitter.on("data", async_handler)

# await emitter.emit("data", {"key": "value"})
责任链模式(Chain of Responsibility)
"""
责任链模式:将请求沿着处理者链传递,直到有一个处理者处理它

适用场景:
- 中间件
- 审批流程
- 过滤器链
"""

from abc import ABC, abstractmethod
from typing import Optional, Any
from dataclasses import dataclass

# ============ 请求对象 ============
@dataclass
class Request:
    user_id: int
    amount: float
    request_type: str

# ============ 处理者抽象类 ============
class Handler(ABC):
    def __init__(self):
        self._next_handler:  Optional[Handler] = None
    
    def set_next(self, handler: "Handler") -> "Handler":
        self._next_handler = handler
        return handler
    
    def handle(self, request:  Request) -> Optional[str]:
        result = self._process(request)
        if result is not None: 
            return result
        if self._next_handler:
            return self._next_handler.handle(request)
        return None
    
    @abstractmethod
    def _process(self, request: Request) -> Optional[str]: 
        pass

# ============ 具体处理者 ============
class AuthenticationHandler(Handler):
    """认证处理"""
    def _process(self, request: Request) -> Optional[str]: 
        if request.user_id <= 0:
            return "Authentication failed:  invalid user"
        print(f"Authentication passed for user {request.user_id}")
        return None

class AuthorizationHandler(Handler):
    """授权处理"""
    def _process(self, request: Request) -> Optional[str]:
        if request.amount > 10000 and request.user_id != 1:
            return "Authorization failed:  amount too large"
        print(f"Authorization passed for amount {request.amount}")
        return None

class ValidationHandler(Handler):
    """验证处理"""
    def _process(self, request: Request) -> Optional[str]: 
        if request.amount <= 0:
            return "Validation failed: invalid amount"
        print(f"Validation passed")
        return None

class BusinessHandler(Handler):
    """业务处理"""
    def _process(self, request: Request) -> Optional[str]:
        print(f"Processing business logic for request")
        return "Request processed successfully"

# 构建责任链
auth = AuthenticationHandler()
authz = AuthorizationHandler()
valid = ValidationHandler()
business = BusinessHandler()

auth.set_next(authz).set_next(valid).set_next(business)

# 处理请求
request = Request(user_id=1, amount=5000, request_type="payment")
result = auth.handle(request)
print(f"Result: {result}")

# ============ 中间件风格实现 ============
from typing import Callable, List

Middleware = Callable[[Request, Callable], Any]

class Pipeline:
    """中间件管道"""
    
    def __init__(self):
        self._middlewares: List[Middleware] = []
    
    def use(self, middleware:  Middleware) -> "Pipeline": 
        self._middlewares.append(middleware)
        return self
    
    def execute(self, request:  Request) -> Any:
        def create_next(index: int) -> Callable:
            if index >= len(self._middlewares):
                return lambda req: None
            
            def next_func(req: Request) -> Any:
                return self._middlewares[index](req, create_next(index + 1))
            
            return next_func
        
        return create_next(0)(request)

# 中间件函数
def logging_middleware(request: Request, next:  Callable) -> Any:
    print(f"[LOG] Processing request: {request}")
    result = next(request)
    print(f"[LOG] Request completed")
    return result

def auth_middleware(request:  Request, next:  Callable) -> Any:
    if request.user_id <= 0:
        raise ValueError("Unauthorized")
    return next(request)

def business_middleware(request:  Request, next:  Callable) -> Any:
    return f"Processed:  {request}"

# 使用
pipeline = Pipeline()
pipeline.use(logging_middleware)
pipeline.use(auth_middleware)
pipeline.use(business_middleware)

result = pipeline.execute(Request(user_id=1, amount=100, request_type="order"))

二、系统设计

2.1 系统设计面试思路

"""
============ 系统设计面试框架 ============

1. 需求澄清(5 分钟)
   - 功能需求:核心功能是什么?
   - 非功能需求:性能、可用性、扩展性
   - 规模估算:DAU、QPS、数据量

2. 高层设计(10 分钟)
   - 画出核心组件
   - 确定数据流向
   - 识别关键接口

3. 详细设计(15 分钟)
   - 数据库设计
   - API 设计
   - 关键算法

4. 扩展性设计(10 分钟)
   - 如何水平扩展
   - 如何处理热点
   - 如何保证高可用

5. 总结和权衡(5 分钟)
   - 技术选型的理由
   - 系统的不足和改进方向
"""

2.2 短链系统设计

"""
============ 需求分析 ============

功能需求:
1. 给定长 URL,生成短链
2. 访问短链,跳转到长 URL
3. (可选)自定义短链
4. (可选)链接有效期

非功能需求:
- 高可用:99.99%
- 低延迟:<100ms
- 高并发:10000 QPS 写,100000 QPS 读

规模估算:
- 每天新增短链:100万
- 短链长度:7 位 (62^7 ≈ 3.5 万亿)
- 读写比:100:1
"""

import hashlib
import time
from typing import Optional
from dataclasses import dataclass
import redis
from sqlalchemy import Column, String, BigInteger, DateTime
from sqlalchemy.ext.asyncio import AsyncSession

# ============ 数据模型 ============
class ShortUrl(Base):
    __tablename__ = "short_urls"
    
    id = Column(BigInteger, primary_key=True)
    short_code = Column(String(10), unique=True, index=True, nullable=False)
    long_url = Column(String(2048), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    expires_at = Column(DateTime, nullable=True)
    click_count = Column(BigInteger, default=0)

# ============ 短码生成器 ============
class ShortCodeGenerator:
    """短码生成器"""
    
    CHARS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    BASE = len(CHARS)
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    # 方式1:自增 ID + Base62 编码
    async def generate_by_id(self) -> str:
        """使用自增 ID 生成短码"""
        id = await self.redis.incr("short_url: id_counter")
        return self._encode(id)
    
    def _encode(self, num: int) -> str:
        """Base62 编码"""
        if num == 0:
            return self.CHARS[0]
        
        result = []
        while num: 
            result.append(self.CHARS[num % self.BASE])
            num //= self.BASE
        
        return ''.join(reversed(result))
    
    def _decode(self, code: str) -> int:
        """Base62 解码"""
        num = 0
        for char in code: 
            num = num * self.BASE + self.CHARS.index(char)
        return num
    
    # 方式2:Hash + 冲突处理
    def generate_by_hash(self, url: str, retry:  int = 0) -> str:
        """使用 Hash 生成短码"""
        data = f"{url}{retry}{time.time()}"
        hash_value = hashlib.md5(data.encode()).hexdigest()
        return hash_value[:7]

# ============ 短链服务 ============
class ShortUrlService:
    """短链服务"""
    
    def __init__(
        self,
        db:  AsyncSession,
        redis:  redis.Redis,
        generator: ShortCodeGenerator,
    ):
        self.db = db
        self.redis = redis
        self.generator = generator
        self.cache_ttl = 86400  # 24 小时
    
    async def create_short_url(
        self,
        long_url: str,
        custom_code: str = None,
        expires_at: datetime = None,
    ) -> str:
        """创建短链"""
        # 1. 检查是否已存在
        cached = await self.redis.get(f"url: long:{long_url}")
        if cached: 
            return cached.decode()
        
        # 2. 生成短码
        if custom_code: 
            if await self._code_exists(custom_code):
                raise ValueError("Custom code already exists")
            short_code = custom_code
        else:
            short_code = await self._generate_unique_code()
        
        # 3. 保存到数据库
        short_url = ShortUrl(
            short_code=short_code,
            long_url=long_url,
            expires_at=expires_at,
        )
        self.db.add(short_url)
        await self.db.commit()
        
        # 4. 写入缓存
        await self.redis.setex(
            f"url: short:{short_code}",
            self.cache_ttl,
            long_url
        )
        await self.redis.setex(
            f"url:long:{long_url}",
            self.cache_ttl,
            short_code
        )
        
        return short_code
    
    async def get_long_url(self, short_code: str) -> Optional[str]:
        """获取长链接"""
        # 1. 查缓存
        cached = await self.redis.get(f"url:short:{short_code}")
        if cached: 
            # 异步更新点击次数
            asyncio.create_task(self._increment_click(short_code))
            return cached.decode()
        
        # 2. 查数据库
        result = await self.db.execute(
            select(ShortUrl).where(ShortUrl.short_code == short_code)
        )
        short_url = result.scalar_one_or_none()
        
        if not short_url:
            return None
        
        # 3. 检查是否过期
        if short_url.expires_at and short_url.expires_at < datetime.utcnow():
            return None
        
        # 4. 写入缓存
        await self.redis.setex(
            f"url: short:{short_code}",
            self.cache_ttl,
            short_url.long_url
        )
        
        return short_url.long_url
    
    async def _generate_unique_code(self) -> str:
        """生成唯一短码"""
        for _ in range(10):  # 最多重试 10 次
            code = await self.generator.generate_by_id()
            if not await self._code_exists(code):
                return code
        raise RuntimeError("Failed to generate unique code")
    
    async def _code_exists(self, code: str) -> bool:
        """检查短码是否存在"""
        # 先查缓存
        if await self.redis.exists(f"url:short:{code}"):
            return True
        
        # 再查数据库
        result = await self.db.execute(
            select(ShortUrl.id).where(ShortUrl.short_code == code)
        )
        return result.scalar_one_or_none() is not None
    
    async def _increment_click(self, short_code: str):
        """增加点击次数"""
        await self.db.execute(
            update(ShortUrl)
            .where(ShortUrl.short_code == short_code)
            .values(click_count=ShortUrl.click_count + 1)
        )
        await self.db.commit()

# ============ API 接口 ============
@app.post("/shorten")
async def create_short_url(
    request: CreateShortUrlRequest,
    service: ShortUrlService = Depends(get_short_url_service),
):
    short_code = await service.create_short_url(
        long_url=request.url,
        custom_code=request.custom_code,
        expires_at=request.expires_at,
    )
    return {"short_url": f"https://short.url/{short_code}"}

@app.get("/{short_code}")
async def redirect(
    short_code: str,
    service: ShortUrlService = Depends(get_short_url_service),
):
    long_url = await service.get_long_url(short_code)
    if not long_url: 
        raise HTTPException(status_code=404, detail="Short URL not found")
    return RedirectResponse(url=long_url, status_code=301)

2.3 秒杀系统设计

"""
============ 需求分析 ============

功能需求:
1. 商品展示:秒杀商品列表和详情
2. 秒杀下单:高并发抢购
3. 订单处理:创建订单、支付

非功能需求:
- 高并发:100 万 QPS 瞬时流量
- 高可用:核心链路 99.99%
- 一致性:不能超卖
- 防作弊:限制刷单

核心挑战:
1. 瞬时高并发
2. 库存一致性
3. 防止超卖
"""

import asyncio
import time
from typing import Optional
from dataclasses import dataclass
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession
import uuid

# ============ 数据模型 ============
@dataclass
class SeckillProduct:
    id: int
    name: str
    price: float
    stock:  int
    start_time: datetime
    end_time: datetime

@dataclass
class SeckillOrder: 
    id: str
    user_id: int
    product_id:  int
    price: float
    status: str  # pending, paid, cancelled
    created_at: datetime

# ============ 秒杀服务 ============
class SeckillService:
    """秒杀服务"""
    
    def __init__(self, redis: Redis, db: AsyncSession):
        self.redis = redis
        self.db = db
    
    # ============ 1. 预热库存到 Redis ============
    async def warm_up(self, product_id: int, stock: int):
        """秒杀开始前预热库存"""
        key = f"seckill:stock:{product_id}"
        await self.redis.set(key, stock)
        
        # 设置秒杀标志
        await self.redis.set(f"seckill:active:{product_id}", 1)
    
    # ============ 2. 秒杀核心逻辑 ============
    async def seckill(self, user_id: int, product_id: int) -> Optional[str]:
        """
        秒杀入口
        返回订单 ID 或 None
        """
        # 1. 前置校验
        if not await self._pre_check(user_id, product_id):
            return None
        
        # 2. 扣减库存(Redis)
        if not await self._deduct_stock(product_id):
            return None
        
        # 3. 创建订单(异步)
        order_id = await self._create_order(user_id, product_id)
        
        return order_id
    
    async def _pre_check(self, user_id:  int, product_id: int) -> bool:
        """前置校验"""
        # 1. 检查秒杀是否开始
        if not await self.redis.get(f"seckill: active:{product_id}"):
            return False
        
        # 2. 检查用户是否已购买(一人一单)
        if await self.redis.sismember(f"seckill:users:{product_id}", user_id):
            return False
        
        # 3. 检查库存(快速失败)
        stock = await self.redis.get(f"seckill:stock:{product_id}")
        if not stock or int(stock) <= 0:
            return False
        
        return True
    
    async def _deduct_stock(self, product_id: int) -> bool:
        """扣减库存(Lua 脚本保证原子性)"""
        script = """
        local stock = tonumber(redis.call('get', KEYS[1]))
        if stock and stock > 0 then
            redis.call('decr', KEYS[1])
            return 1
        end
        return 0
        """
        result = await self.redis.eval(
            script, 1, f"seckill:stock:{product_id}"
        )
        return bool(result)
    
    async def _create_order(self, user_id:  int, product_id: int) -> str:
        """创建订单"""
        order_id = str(uuid.uuid4())
        
        # 记录用户已购买
        await self.redis.sadd(f"seckill:users:{product_id}", user_id)
        
        # 发送到消息队列异步处理
        order_data = {
            "order_id": order_id,
            "user_id": user_id,
            "product_id":  product_id,
            "created_at": time.time(),
        }
        await self.redis.lpush("seckill:orders", json.dumps(order_data))
        
        return order_id
    
    # ============ 3. 订单消费者 ============
    async def process_orders(self):
        """消费订单队列"""
        while True:
            # 阻塞获取订单
            result = await self.redis.brpop("seckill:orders", timeout=5)
            if not result:
                continue
            
            _, order_data = result
            order = json.loads(order_data)
            
            try:
                await self._save_order_to_db(order)
            except Exception as e: 
                # 处理失败,回滚库存
                await self.redis.incr(f"seckill:stock:{order['product_id']}")
                await self.redis.srem(
                    f"seckill:users:{order['product_id']}", 
                    order['user_id']
                )
                logger.error(f"Failed to process order:  {e}")
    
    async def _save_order_to_db(self, order: dict):
        """保存订单到数据库"""
        async with self.db.begin():
            # 扣减数据库库存
            result = await self.db.execute(
                update(Product)
                .where(
                    Product.id == order['product_id'],
                    Product.stock > 0
                )
                .values(stock=Product.stock - 1)
            )
            
            if result.rowcount == 0:
                raise ValueError("Stock not enough")
            
            # 创建订单
            db_order = Order(
                id=order['order_id'],
                user_id=order['user_id'],
                product_id=order['product_id'],
                status='pending',
            )
            self.db.add(db_order)

# ============ 限流和防刷 ============
class RateLimiter:
    """限流器"""
    
    def __init__(self, redis:  Redis):
        self.redis = redis
    
    async def is_allowed(
        self, 
        user_id:  int, 
        limit: int = 10, 
        window: int = 1
    ) -> bool:
        """滑动窗口限流"""
        key = f"rate_limit:{user_id}"
        now = time.time()
        
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, now - window)
        pipe.zcard(key)
        pipe.zadd(key, {str(now): now})
        pipe.expire(key, window)
        
        results = await pipe.execute()
        current_count = results[1]
        
        return current_count < limit

# ============ API 接口 ============
@app.post("/seckill/{product_id}")
async def seckill(
    product_id: int,
    user:  User = Depends(get_current_user),
    service: SeckillService = Depends(get_seckill_service),
    limiter: RateLimiter = Depends(get_rate_limiter),
):
    # 1. 限流检查
    if not await limiter.is_allowed(user.id):
        raise HTTPException(status_code=429, detail="Too many requests")
    
    # 2. 执行秒杀
    order_id = await service.seckill(user.id, product_id)
    
    if not order_id:
        raise HTTPException(status_code=400, detail="Seckill failed")
    
    return {"order_id": order_id, "message": "Seckill success"}

2.4 常见系统设计题目

"""
============ 高频系统设计题目 ============

1. 短链系统(TinyURL)
   - 短码生成算法
   - 高并发读取
   - 过期清理

2. 秒杀系统
   - 库存一致性
   - 防超卖
   - 异步下单

3. 消息队列
   - 消息持久化
   - 消息确认机制
   - 消息重试

4. 分布式缓存
   - 缓存一致性
   - 缓存穿透/击穿/雪崩
   - 热点 Key

5. 分布式 ID 生成器
   - 雪花算法
   - 号段模式
   - 高可用

6. 限流系统
   - 令牌桶/漏桶
   - 分布式限流
   - 多维度限流

7. 搜索系统
   - 倒排索引
   - 分词
   - 相关性排序

8. 推荐系统
   - 协同过滤
   - 实时推荐
   - 冷启动

9. 分布式任务调度
   - 任务分片
   - 故障转移
   - 任务依赖

10. 实时聊天系统
    - WebSocket
    - 消息存储
    - 在线状态
"""

三、软技能与面试技巧

3.1 STAR 法则回答行为问题

"""
============ STAR 法则 ============

S - Situation(情境):描述背景和情境
T - Task(任务):你的任务和目标
A - Action(行动):你采取的具体行动
R - Result(结果):取得的成果和数据

============ 示例问题 ============

Q: 请描述一次你解决过的最有挑战性的技术问题

A: 
【Situation】
在上一家公司,我们的订单系统在双十一期间遇到了严重的性能问题,
QPS 从平时的 1000 飙升到 50000,导致数据库连接池耗尽,系统响应时间超过 10 秒。

【Task】
作为后端负责人,我需要在 48 小时内找到问题根因并解决,
确保系统能够支撑预计的 100000 QPS 峰值流量。

【Action】
1. 首先通过 APM 工具定位到瓶颈在数据库层,慢查询占比 60%
2. 分析慢查询日志,发现主要是订单查询缺少复合索引
3. 添加了 (user_id, status, created_at) 复合索引
4. 引入 Redis 缓存热点数据,缓存命中率达到 85%
5. 将同步下单改为异步,使用消息队列削峰填谷
6. 增加了数据库读写分离,写库 1 主,读库 3 从

【Result】
- 系统 QPS 提升到 120000
- 平均响应时间从 10 秒降到 50 毫秒
- 双十一当天零故障,订单量同比增长 300%
- 这次优化经验被整理成内部文档,在团队推广
"""

3.2 常见行为面试问题

"""
============ 技术相关 ============

1. 描述你做过的最有挑战性的项目
2. 你是如何学习新技术的?
3. 你如何进行代码 review?
4. 描述一次你解决线上故障的经历
5. 你如何保证代码质量?

============ 团队协作 ============

6. 描述一次与他人意见不一致的情况
7. 你如何与产品经理沟通需求?
8. 你如何带新人?
9. 描述一次跨团队协作的经历
10. 你如何处理工作中的压力?

============ 个人发展 ============

11. 你为什么选择这个职位?
12. 你的职业规划是什么?
13. 你认为自己最大的优点和缺点是什么?
14. 你期望的工作环境是怎样的?
15. 你有什么想问我们的?
"""

3.3 技术面试应答技巧

"""
============ 回答问题的结构 ============

1. 先给结论,再展开解释
   - 错误:背景介绍很长,结论放最后
   - 正确:先说结论,再解释原因和细节

2. 分层次回答
   - 第一层:核心概念(是什么)
   - 第二层:原理解释(为什么)
   - 第三层:实际应用(怎么用)
   - 第四层:延伸思考(还有什么)

3. 适当使用类比
   - 用熟悉的概念解释陌生的概念
   - 让面试官更容易理解

============ 示例 ============

Q: 什么是 Redis 的分布式锁?

A:  
【结论】
分布式锁是一种在分布式系统中实现互斥访问共享资源的机制,
Redis 可以通过 SET NX EX 命令实现。

【原理】
SET key value NX EX seconds
- NX:只在 key 不存在时设置(保证互斥)
- EX:设置过期时间(防止死锁)

【关键要点】
1. 锁需要设置唯一标识(防止误删)
2. 释放锁时要用 Lua 脚本保证原子性
3. 需要考虑锁续期问题(看门狗机制)

【实际应用】
在秒杀系统中,我们用 Redis 分布式锁来保证库存扣减的原子性... 

【延伸】
当然,Redis 单点锁有可能在主从切换时失效,
如果对一致性要求更高,可以考虑 Redlock 算法或使用 ZooKeeper。

============ 不会的问题怎么回答 ============

1. 诚实承认不知道
   "这个问题我不太了解,但我可以说一下我的理解..."

2. 说出相关知识
   "这个我没用过,但我知道类似的 xxx..."

3. 展示学习能力
   "这个我之前没接触过,但如果需要,我会从 xxx 开始学习..."
"""

3.4 提问环节

"""
============ 可以问的问题 ============

【关于团队】
1. 团队目前有多少人?技术栈是什么?
2. 团队的开发流程是怎样的?
3. 新人入职会有什么培训或导师制度?

【关于项目】
4. 这个岗位主要负责哪些项目?
5. 目前团队面临的最大技术挑战是什么?
6. 未来一年的技术规划是什么?

【关于成长】
7. 公司对技术人员的晋升路径是怎样的?
8. 有没有技术分享或学习的机会?
9. 这个岗位的绩效考核标准是什么?

【关于文化】
10. 团队的工作氛围是怎样的?
11. 加班情况怎么样?
12. 远程办公政策是怎样的?

============ 避免问的问题 ============

- 薪资福利(第一轮面试不要问)
- 可以在网上查到的基础信息
- 太过私人的问题
"""

📝 Part 8 总结

设计模式高频考点

模式 面试频率 应用场景
单例模式 ⭐⭐⭐⭐⭐ 配置管理、数据库连接池
工厂模式 ⭐⭐⭐⭐ 对象创建解耦
策略模式 ⭐⭐⭐⭐ 算法切换、支付方式
观察者模式 ⭐⭐⭐⭐ 事件系统、消息通知
装饰器模式 ⭐⭐⭐⭐⭐ 功能增强、日志、缓存
责任链模式 ⭐⭐⭐ 中间件、审批流程
建造者模式 ⭐⭐⭐ 复杂对象构建
适配器模式 ⭐⭐⭐ 接口转换

系统设计考点

题目 频率 核心考点
短链系统 ⭐⭐⭐⭐⭐ ID 生成、缓存、高并发读
秒杀系统 ⭐⭐⭐⭐⭐ 库存一致性、限流、异步
消息队列 ⭐⭐⭐⭐ 可靠性、顺序性、幂等
分布式缓存 ⭐⭐⭐⭐ 一致性、穿透击穿雪崩
搜索系统 ⭐⭐⭐ 倒排索引、分词、排序

面试技巧要点

  1. 回答问题要有结构:结论先行,分层展开
  2. 展示思考过程:不要直接给答案,展示推理过程
  3. 诚实面对不会的:承认不知道,但展示学习能力
  4. 准备好提问:展示你对公司和岗位的兴趣
  5. STAR 法则:用于回答行为面试问题

🎉 全部完成!

面试手册总结

Part 主题 重要程度 核心考点
Part 1 Python 基础 & 数据类型 ⭐⭐⭐⭐⭐ 可变/不可变、is vs ==、深浅拷贝、数据结构
Part 2 函数 & 闭包 & 装饰器 ⭐⭐⭐⭐⭐ 参数传递、LEGB、闭包陷阱、装饰器原理
Part 3 面向对象 & 魔术方法 ⭐⭐⭐⭐⭐ 类属性、MRO、描述符、上下文管理器
Part 4 并发编程 ⭐⭐⭐⭐⭐ GIL、多线程/多进程/协程选择、锁
Part 5 内存管理 & 性能优化 ⭐⭐⭐⭐ 引用计数、GC、循环引用、性能分析
Part 6 常用库 ⭐⭐⭐⭐ collections、itertools、requests、pytest
Part 7 Web & 数据库 & 缓存 ⭐⭐⭐⭐⭐ FastAPI、MySQL索引、Redis缓存问题
Part 8 设计模式 & 系统设计 ⭐⭐⭐⭐ 单例、工厂、策略、短链/秒杀系统

必须掌握的 TOP 20 知识点

Python 基础(必问)
├── 1. 可变对象 vs 不可变对象
├── 2. is vs == 的区别
├── 3. 深拷贝 vs 浅拷贝
├── 4. *args 和 **kwargs
└── 5. 列表推导式 & 生成器

函数与面向对象(高频)
├── 6. 闭包原理与陷阱
├── 7. 装饰器(带参数、类装饰器)
├── 8. __new__ vs __init__
├── 9. MRO 与 super()
└── 10. 上下文管理器

并发编程(重点)
├── 11. GIL 是什么?影响?
├── 12. 多线程 vs 多进程 vs 协程
├── 13. 什么场景用什么并发模型
└── 14. 线程安全与锁

数据库 & 缓存(核心)
├── 15. MySQL 索引原理(B+树)
├── 16. 事务隔离级别
├── 17. 缓存穿透/击穿/雪崩
├── 18. Redis 数据类型与应用
└── 19. 分布式锁实现

系统设计(加分)
└── 20. 短链/秒杀系统设计思路

写在最后

到此,我们的Python面试手册AI生成版就结束了,希望大家看完之后能够有所收获,在这个高速发展的时代,有属于自己的技术底蕴与个人的思考,祝大家都能找到一份自己喜欢的工作,永远不用担心薪资不涨或者大裁员,不要有那么大的压力,能够过上自己想要的生活。祝好!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐