高级数据库配置与连接器设计

摘要:本文将展示一个基于继承的数据库配置架构,通过基础配置类和专用连接器实现多数据库的统一管理,解决配置复用和命名一致性问题。


一、优化后的配置架构设计

目录结构

project_root/
├── configs/  
│   ├── base_config.py      # 基础配置类  
│   ├── mysql_config.py     # MySQL专用配置  
│   ├── mongo_config.py     # MongoDB专用配置  
│   ├── redis_config.py     # Redis专用配置  
│   └── duckdb_config.py    # DuckDB专用配置  
├── connectors/  
│   ├── base_connector.py   # 连接器基类  
│   ├── mysql_connector.py  # MySQL连接器  
│   ├── mongo_connector.py  # MongoDB连接器  
│   ├── redis_connector.py  # Redis连接器  
│   └── duckdb_connector.py # DuckDB连接器  
└── services/  
    └── ...                 # 业务服务  

二、基础配置与连接器设计

1. 基础配置类

# configs/base_config.py
class BaseDBConfig:
    """数据库基础配置"""
    
    def __init__(self):
        # 通用连接参数
        self.timeout = 10  # 秒
        self.max_retries = 3
        self.auto_reconnect = True
        
        # 连接池参数
        self.pool_enabled = True
        self.pool_size = 5
        self.pool_timeout = 30  # 秒
        
        # 监控参数
        self.monitor_enabled = True
        self.monitor_interval = 60  # 秒
        
    def get_connection_params(self) -> dict:
        """获取连接参数(子类必须实现)"""
        raise NotImplementedError("子类必须实现此方法")

2. 基础连接器类

# connectors/base_connector.py
from abc import ABC, abstractmethod
import logging
from configs.base_config import BaseDBConfig

logger = logging.getLogger(__name__)

class BaseDBConnector(ABC):
    """数据库连接器基类"""
    
    def __init__(self, config: BaseDBConfig):
        """
        初始化连接器
        :param config: 数据库配置实例
        """
        self.config = config
        self._connection = None
        self._connections = {}  # 多实例连接缓存
        logger.info(f"初始化 {self.__class__.__name__} 连接器")
        
    @abstractmethod
    def connect(self, config_name: str = "default"):
        """建立数据库连接(子类实现)"""
        pass
        
    @abstractmethod
    def close(self, config_name: str = "default"):
        """关闭数据库连接(子类实现)"""
        pass
        
    @abstractmethod
    def execute_query(self, query: str, params=None, config_name: str = "default"):
        """执行查询操作(子类实现)"""
        pass
        
    @abstractmethod
    def execute_command(self, command: str, params=None, config_name: str = "default"):
        """执行写操作(子类实现)"""
        pass
        
    def health_check(self, config_name: str = "default") -> bool:
        """健康检查(默认实现,子类可覆盖)"""
        try:
            self.connect(config_name)
            return True
        except Exception as e:
            logger.error(f"数据库健康检查失败: {e}")
            return False
        finally:
            self.close(config_name)
            
    def __enter__(self):
        """上下文管理器支持"""
        self.connect()
        return self
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        """退出时自动关闭连接"""
        self.close()

三、MySQL专用配置与连接器

1. MySQL配置类

# configs/mysql_config.py
from configs.base_config import BaseDBConfig

class MySQLConfig(BaseDBConfig):
    """MySQL数据库专用配置"""
    
    def __init__(self):
        super().__init__()
        # MySQL特有参数
        self.charset = "utf8mb4"
        self.autocommit = False
        self.client_flag = 0
        self.ssl_disabled = False
        
    def get_connection_params(self) -> dict:
        """获取MySQL连接参数"""
        return {
            "host": self.host,
            "port": self.port,
            "user": self.user,
            "password": self.password,
            "database": self.database,
            "charset": self.charset,
            "autocommit": self.autocommit,
            "ssl_disabled": self.ssl_disabled
        }

# 具体数据库实例配置
class MainMySQLConfig(MySQLConfig):
    """主业务数据库配置"""
    
    def __init__(self):
        super().__init__()
        self.host = "db-main.example.com"
        self.port = 3306
        self.user = "app_user"
        self.password = "secure_pass"
        self.database = "main_db"
        self.pool_size = 10  # 更大的连接池

class LogMySQLConfig(MySQLConfig):
    """日志数据库配置"""
    
    def __init__(self):
        super().__init__()
        self.host = "log-db.example.com"
        self.port = 3307
        self.user = "log_user"
        self.password = "log_pass"
        self.database = "app_logs"
        self.autocommit = True  # 日志自动提交

2. MySQL连接器

# connectors/mysql_connector.py
import mysql.connector
from mysql.connector import pooling, Error
from configs.mysql_config import MySQLConfig
from connectors.base_connector import BaseDBConnector

class MySQLConnector(BaseDBConnector):
    """MySQL数据库连接器"""
    
    def __init__(self, config: MySQLConfig):
        super().__init__(config)
        self._pools = {}  # 连接池缓存
        
    def connect(self, config_name: str = "default"):
        """建立数据库连接"""
        if config_name not in self._pools:
            # 创建新连接池
            params = self.config.get_connection_params()
            pool_config = {
                "pool_name": f"{config_name}_pool",
                "pool_size": self.config.pool_size,
                **params
            }
            
            try:
                self._pools[config_name] = pooling.MySQLConnectionPool(**pool_config)
                self._connections[config_name] = self._pools[config_name].get_connection()
            except Error as e:
                raise ConnectionError(f"MySQL连接失败: {e}")
                
        return self._connections[config_name]
    
    def close(self, config_name: str = "default"):
        """关闭数据库连接"""
        if config_name in self._connections:
            conn = self._connections.pop(config_name)
            conn.close()
            
    def execute_query(self, query: str, params=None, config_name: str = "default"):
        """执行查询操作"""
        conn = self.connect(config_name)
        try:
            with conn.cursor(dictionary=True) as cursor:
                cursor.execute(query, params)
                return cursor.fetchall()
        except Error as e:
            raise RuntimeError(f"查询执行失败: {e}")
    
    def execute_command(self, command: str, params=None, config_name: str = "default"):
        """执行写操作"""
        conn = self.connect(config_name)
        try:
            with conn.cursor() as cursor:
                cursor.execute(command, params)
                affected = cursor.rowcount
                if not self.config.autocommit:
                    conn.commit()
                return affected
        except Error as e:
            conn.rollback()
            raise RuntimeError(f"命令执行失败: {e}")

四、其他数据库配置与连接器

1. MongoDB配置与连接器

# configs/mongo_config.py
from configs.base_config import BaseDBConfig

class MongoDBConfig(BaseDBConfig):
    """MongoDB专用配置"""
    
    def __init__(self):
        super().__init__()
        self.uri = ""
        self.db_name = ""
        self.tls = False
        self.replica_set = ""
        
    def get_connection_params(self) -> dict:
        return {
            "host": self.uri,
            "tls": self.tls,
            "replicaSet": self.replica_set
        }

class UserProfileMongoConfig(MongoDBConfig):
    """用户档案数据库配置"""
    
    def __init__(self):
        super().__init__()
        self.uri = "mongodb://user:pass@cluster0.example.com:27017"
        self.db_name = "user_profiles"
        self.tls = True
# connectors/mongo_connector.py
from pymongo import MongoClient
from configs.mongo_config import MongoDBConfig
from connectors.base_connector import BaseDBConnector

class MongoDBConnector(BaseDBConnector):
    """MongoDB数据库连接器"""
    
    def __init__(self, config: MongoDBConfig):
        super().__init__(config)
        self._clients = {}  # 客户端缓存
        
    def connect(self, config_name: str = "default"):
        """建立数据库连接"""
        if config_name not in self._clients:
            params = self.config.get_connection_params()
            try:
                self._clients[config_name] = MongoClient(**params)
                self._connections[config_name] = self._clients[config_name][self.config.db_name]
            except Exception as e:
                raise ConnectionError(f"MongoDB连接失败: {e}")
        return self._connections[config_name]
    
    def get_collection(self, collection_name: str, config_name: str = "default"):
        """获取集合对象"""
        db = self.connect(config_name)
        return db[collection_name]
    
    # MongoDB不需要传统SQL查询方法
    def execute_query(self, query: dict, collection_name: str, config_name: str = "default"):
        """执行MongoDB查询"""
        collection = self.get_collection(collection_name, config_name)
        return list(collection.find(query))
    
    def execute_command(self, command: dict, collection_name: str, config_name: str = "default"):
        """执行MongoDB写操作"""
        # 这里简化处理,实际需要区分insert/update/delete
        collection = self.get_collection(collection_name, config_name)
        if command.get("insert"):
            return collection.insert_many(command["data"])
        elif command.get("update"):
            return collection.update_many(command["filter"], command["update"])
        elif command.get("delete"):
            return collection.delete_many(command["filter"])
        else:
            raise ValueError("无效的MongoDB命令")

2. Redis配置与连接器

# configs/redis_config.py
from configs.base_config import BaseDBConfig

class RedisConfig(BaseDBConfig):
    """Redis专用配置"""
    
    def __init__(self):
        super().__init__()
        self.host = "localhost"
        self.port = 6379
        self.db = 0
        self.decode_responses = True
        self.ssl = False
        
    def get_connection_params(self) -> dict:
        return {
            "host": self.host,
            "port": self.port,
            "db": self.db,
            "decode_responses": self.decode_responses,
            "ssl": self.ssl
        }

class SessionRedisConfig(RedisConfig):
    """会话缓存配置"""
    
    def __init__(self):
        super().__init__()
        self.host = "cache.example.com"
        self.db = 1
        self.decode_responses = False  # 原始字节数据
# connectors/redis_connector.py
import redis
from configs.redis_config import RedisConfig
from connectors.base_connector import BaseDBConnector

class RedisConnector(BaseDBConnector):
    """Redis数据库连接器"""
    
    def __init__(self, config: RedisConfig):
        super().__init__(config)
        
    def connect(self, config_name: str = "default"):
        """建立Redis连接"""
        if config_name not in self._connections:
            params = self.config.get_connection_params()
            try:
                self._connections[config_name] = redis.Redis(**params)
                # 测试连接
                self._connections[config_name].ping()
            except Exception as e:
                raise ConnectionError(f"Redis连接失败: {e}")
        return self._connections[config_name]
    
    # Redis不需要传统SQL查询方法
    def execute_query(self, key: str, config_name: str = "default"):
        """获取键值"""
        conn = self.connect(config_name)
        return conn.get(key)
    
    def execute_command(self, command: str, *args, config_name: str = "default"):
        """执行Redis命令"""
        conn = self.connect(config_name)
        return conn.execute_command(command, *args)

3. DuckDB配置与连接器

# configs/duckdb_config.py
from configs.base_config import BaseDBConfig

class DuckDBConfig(BaseDBConfig):
    """DuckDB专用配置"""
    
    def __init__(self):
        super().__init__()
        self.path = ":memory:"  # 默认内存数据库
        self.read_only = False
        self.config = {}  # 额外配置
        
    def get_connection_params(self) -> dict:
        return {
            "database": self.path,
            "read_only": self.read_only,
            "config": self.config
        }

class AnalyticsDuckDBConfig(DuckDBConfig):
    """分析数据库配置"""
    
    def __init__(self):
        super().__init__()
        self.path = "/data/analytics.duckdb"
        self.config = {"threads": 8}  # 使用8线程
# connectors/duckdb_connector.py
import duckdb
from configs.duckdb_config import DuckDBConfig
from connectors.base_connector import BaseDBConnector

class DuckDBConnector(BaseDBConnector):
    """DuckDB数据库连接器"""
    
    def __init__(self, config: DuckDBConfig):
        super().__init__(config)
        
    def connect(self, config_name: str = "default"):
        """建立DuckDB连接"""
        if config_name not in self._connections:
            params = self.config.get_connection_params()
            try:
                self._connections[config_name] = duckdb.connect(**params)
                
                # 应用额外配置
                for key, value in params["config"].items():
                    self._connections[config_name].execute(f"SET {key} = {value}")
                    
            except Exception as e:
                raise ConnectionError(f"DuckDB连接失败: {e}")
        return self._connections[config_name]
    
    def execute_query(self, query: str, config_name: str = "default"):
        """执行查询返回DataFrame"""
        conn = self.connect(config_name)
        return conn.execute(query).fetchdf()
    
    def execute_command(self, command: str, config_name: str = "default"):
        """执行DDL/DML语句"""
        conn = self.connect(config_name)
        return conn.execute(command)

五、业务层统一使用接口

数据库管理器

# connectors/db_manager.py
from configs import mysql_config, mongo_config, redis_config, duckdb_config
from connectors import mysql_connector, mongo_connector, redis_connector, duckdb_connector

class DBManager:
    """多数据库统一管理"""
    
    _instances = {}  # 连接器实例缓存
    
    # 配置实例
    MYSQL_MAIN = mysql_config.MainMySQLConfig()
    MYSQL_LOG = mysql_config.LogMySQLConfig()
    MONGO_PROFILE = mongo_config.UserProfileMongoConfig()
    REDIS_SESSION = redis_config.SessionRedisConfig()
    DUCKDB_ANALYTICS = duckdb_config.AnalyticsDuckDBConfig()
    
    @classmethod
    def get_mysql(cls, config: MySQLConfig = MYSQL_MAIN) -> mysql_connector.MySQLConnector:
        """获取MySQL连接器"""
        key = f"mysql_{id(config)}"
        if key not in cls._instances:
            cls._instances[key] = mysql_connector.MySQLConnector(config)
        return cls._instances[key]
    
    @classmethod
    def get_mongo(cls, config: MongoDBConfig = MONGO_PROFILE) -> mongo_connector.MongoDBConnector:
        """获取MongoDB连接器"""
        key = f"mongo_{id(config)}"
        if key not in cls._instances:
            cls._instances[key] = mongo_connector.MongoDBConnector(config)
        return cls._instances[key]
    
    @classmethod
    def get_redis(cls, config: RedisConfig = REDIS_SESSION) -> redis_connector.RedisConnector:
        """获取Redis连接器"""
        key = f"redis_{id(config)}"
        if key not in cls._instances:
            cls._instances[key] = redis_connector.RedisConnector(config)
        return cls._instances[key]
    
    @classmethod
    def get_duckdb(cls, config: DuckDBConfig = DUCKDB_ANALYTICS) -> duckdb_connector.DuckDBConnector:
        """获取DuckDB连接器"""
        key = f"duckdb_{id(config)}"
        if key not in cls._instances:
            cls._instances[key] = duckdb_connector.DuckDBConnector(config)
        return cls._instances[key]

业务服务使用示例

# services/user_service.py
from connectors.db_manager import DBManager

class UserService:
    """用户服务 - 多数据库操作"""
    
    def __init__(self):
        # 初始化各数据库连接器
        self.mysql = DBManager.get_mysql()
        self.mongo = DBManager.get_mongo()
        self.redis = DBManager.get_redis()
    
    def get_user_data(self, user_id: int):
        """获取用户完整数据"""
        # MySQL获取基本信息
        sql = "SELECT * FROM users WHERE id = %s"
        user_info = self.mysql.execute_query(sql, (user_id,))[0]
        
        # MongoDB获取档案
        user_profile = self.mongo.execute_query(
            {"user_id": user_id}, 
            collection_name="profiles"
        )
        
        # Redis获取会话状态
        session_state = self.redis.execute_query(f"session:{user_id}")
        
        return {
            "info": user_info,
            "profile": user_profile,
            "session": session_state
        }
    
    def update_user(self, user_id: int, data: dict):
        """更新用户信息"""
        # MySQL更新基本信息
        update_sql = """
        UPDATE users 
        SET name = %s, email = %s 
        WHERE id = %s
        """
        self.mysql.execute_command(
            update_sql, 
            (data["name"], data["email"], user_id)
        )
        
        # MongoDB更新档案
        self.mongo.execute_command(
            {"update": True},
            collection_name="profiles",
            command={
                "filter": {"user_id": user_id},
                "update": {"$set": data["profile"]}
            }
        )
        
        # Redis更新缓存
        self.redis.execute_command(
            "SET", 
            f"user:{user_id}", 
            data["cache_data"], 
            "EX", 3600
        )

六、架构优势与最佳实践

设计优势

  1. 统一接口:所有数据库连接器继承自同一基类,提供一致API
  2. 配置继承:基础配置复用,专用配置扩展
  3. 类型安全:强类型配置和连接器,减少错误
  4. 连接复用:管理器确保连接器单例
  5. 多实例支持:同一数据库类型支持多个配置实例
  6. 开箱即用:预置主流数据库实现
  7. 可扩展性:轻松添加新数据库支持

命名规范建议

  1. 配置类[数据库类型]Config + [用途]Config
    (e.g., MySQLConfig, MainMySQLConfig)

  2. 连接器类[数据库类型]Connector
    (e.g., MySQLConnector, MongoDBConnector)

  3. 配置实例:全大写描述性名称
    (e.g., MYSQL_MAIN, REDIS_SESSION)

  4. 方法命名

    • connect(): 建立连接
    • close(): 关闭连接
    • execute_query(): 执行查询
    • execute_command(): 执行写操作

最佳实践

  1. 环境区分:使用环境变量切换配置

    class MainMySQLConfig(MySQLConfig):
        def __init__(self):
            super().__init__()
            env = os.getenv("ENV", "dev")
            self.host = f"db-{env}.example.com"
    
  2. 配置加密:敏感数据加密存储

    from cryptography.fernet import Fernet
    
    class SecureConfig:
        @staticmethod
        def decrypt(encrypted: str) -> str:
            cipher = Fernet(os.getenv("CONFIG_KEY"))
            return cipher.decrypt(encrypted.encode()).decode()
    
    # 在配置中使用
    self.password = SecureConfig.decrypt(ENCRYPTED_PASSWORD)
    
  3. 连接监控:定期检查连接健康

    import threading
    import time
    
    def monitor_connections():
        while True:
            for name, connector in DBManager._instances.items():
                status = connector.health_check()
                print(f"{name} 状态: {'OK' if status else 'FAIL'}")
            time.sleep(DBManager.MYSQL_MAIN.monitor_interval)
    
    threading.Thread(target=monitor_connections, daemon=True).start()
    
  4. 资源清理:应用退出时关闭所有连接

    import atexit
    
    @atexit.register
    def cleanup():
        for connector in DBManager._instances.values():
            connector.close_all()
    

结语:通过这种基于继承的配置架构和统一命名的连接器设计,我们实现了多数据库的高效管理。这种设计既保持了配置的灵活性,又提供了代码的一致性,是中型到大型项目的理想选择。

扩展方向

  1. 添加异步IO支持
  2. 集成分布式配置中心
  3. 实现数据库迁移工具
  4. 添加ORM集成层
  5. 开发Web管理界面

安全提示

  • 生产环境禁用默认配置
  • 定期轮换数据库凭证
  • 使用网络隔离限制数据库访问
  • 加密敏感配置数据
  • 实施最小权限原则

关键词:Python数据库架构多数据库管理配置继承模式数据库连接器设计企业级Python开发

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐