轻量级多数据库配置管理方案

摘要:本文将提供一个简洁高效的多数据库配置管理方案,通过单一常量类集中管理所有数据库配置,并支持MySQL、MongoDB、Redis、DuckDB等多种数据库的连接管理。


一、极简配置目录结构

project_root/
├── configs/                # 配置中心
│   └── db_config.py        # 所有数据库配置常量
├── connectors/             # 数据库连接器
│   ├── mysql_connector.py
│   ├── mongo_connector.py
│   ├── redis_connector.py
│   └── duckdb_connector.py
└── services/               # 业务服务
    └── user_service.py

二、数据库配置常量类

# configs/db_config.py
"""
数据库配置中心 - 所有数据库配置集中在此
"""

class MySQLConfig:
    """MySQL数据库配置"""
    # 主业务库
    MAIN_DB = {
        "host": "localhost",
        "port": 3306,
        "user": "app_user",
        "password": "secure_password",
        "database": "main_db",
        "pool_size": 5
    }
    
    # 日志库
    LOG_DB = {
        "host": "log.db.example.com",
        "port": 3307,
        "user": "log_user",
        "password": "log_password",
        "database": "app_logs"
    }


class MongoDBConfig:
    """MongoDB数据库配置"""
    # 用户档案库
    USER_PROFILE = {
        "uri": "mongodb://user:pass@cluster0.example.com:27017",
        "db_name": "user_profiles",
        "timeout": 5000  # 毫秒
    }
    
    # 产品目录库
    PRODUCT_CATALOG = {
        "uri": "mongodb://catalog:pass@catalog-db.example.com:27017",
        "db_name": "product_db"
    }


class RedisConfig:
    """Redis缓存配置"""
    SESSION_CACHE = {
        "host": "cache.example.com",
        "port": 6379,
        "db": 0,
        "password": "redis_pass",
        "decode_responses": True  # 自动解码为字符串
    }
    
    MESSAGE_QUEUE = {
        "host": "mq.example.com",
        "port": 6379,
        "db": 1,
        "password": "mq_pass"
    }


class DuckDBConfig:
    """DuckDB分析数据库配置"""
    ANALYTICS_DB = {
        "path": "/data/analytics.duckdb",  # 文件路径
        "read_only": False
    }
    
    TEMP_DB = {
        "path": ":memory:",  # 内存数据库
        "config": {
            "threads": 4  # 使用4个线程
        }
    }

三、数据库连接器实现

1. MySQL连接器

# connectors/mysql_connector.py
import mysql.connector
from mysql.connector import pooling
from configs.db_config import MySQLConfig

class MySQLConnector:
    """MySQL数据库连接器"""
    
    _pools = {}  # 连接池缓存
    
    @classmethod
    def get_connection(cls, config_name: str):
        """获取数据库连接"""
        # 获取配置
        config = getattr(MySQLConfig, config_name).copy()
        
        # 创建或获取连接池
        if config_name not in cls._pools:
            pool_config = {
                "pool_name": f"{config_name}_pool",
                "pool_size": config.pop("pool_size", 5),
                **config
            }
            cls._pools[config_name] = pooling.MySQLConnectionPool(**pool_config)
        
        return cls._pools[config_name].get_connection()
    
    @classmethod
    def execute_query(cls, config_name: str, sql: str, params=None):
        """执行查询并返回结果"""
        with cls.get_connection(config_name) as conn:
            with conn.cursor(dictionary=True) as cursor:
                cursor.execute(sql, params)
                return cursor.fetchall()
    
    @classmethod
    def execute_command(cls, config_name: str, sql: str, params=None, commit=True):
        """执行写操作"""
        with cls.get_connection(config_name) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                if commit:
                    conn.commit()
                return cursor.rowcount

2. MongoDB连接器

# connectors/mongo_connector.py
from pymongo import MongoClient
from configs.db_config import MongoDBConfig

class MongoDBConnector:
    """MongoDB数据库连接器"""
    
    _clients = {}  # 客户端缓存
    
    @classmethod
    def get_client(cls, config_name: str):
        """获取MongoDB客户端"""
        if config_name not in cls._clients:
            config = getattr(MongoDBConfig, config_name)
            cls._clients[config_name] = MongoClient(
                config["uri"],
                serverSelectionTimeoutMS=config.get("timeout", 3000)
        return cls._clients[config_name]
    
    @classmethod
    def get_database(cls, config_name: str):
        """获取数据库实例"""
        config = getattr(MongoDBConfig, config_name)
        return cls.get_client(config_name)[config["db_name"]]
    
    @classmethod
    def get_collection(cls, config_name: str, collection_name: str):
        """获取集合对象"""
        return cls.get_database(config_name)[collection_name]

3. Redis连接器

# connectors/redis_connector.py
import redis
from configs.db_config import RedisConfig

class RedisConnector:
    """Redis连接器"""
    
    _connections = {}  # 连接缓存
    
    @classmethod
    def get_connection(cls, config_name: str):
        """获取Redis连接"""
        if config_name not in cls._connections:
            config = getattr(RedisConfig, config_name)
            cls._connections[config_name] = redis.Redis(**config)
        return cls._connections[config_name]
    
    @classmethod
    def set_value(cls, config_name: str, key: str, value, ex=None):
        """设置键值"""
        conn = cls.get_connection(config_name)
        conn.set(key, value, ex=ex)
    
    @classmethod
    def get_value(cls, config_name: str, key: str):
        """获取键值"""
        conn = cls.get_connection(config_name)
        return conn.get(key)

4. DuckDB连接器

# connectors/duckdb_connector.py
import duckdb
from configs.db_config import DuckDBConfig

class DuckDBConnector:
    """DuckDB连接器"""
    
    _connections = {}  # 连接缓存
    
    @classmethod
    def get_connection(cls, config_name: str):
        """获取DuckDB连接"""
        if config_name not in cls._connections:
            config = getattr(DuckDBConfig, config_name)
            
            # 内存数据库特殊处理
            if config["path"] == ":memory:":
                conn = duckdb.connect(database=':memory:')
            else:
                conn = duckdb.connect(
                    database=config["path"],
                    read_only=config.get("read_only", False)
                )
            
            # 应用额外配置
            for key, value in config.get("config", {}).items():
                conn.execute(f"SET {key} = {value}")
                
            cls._connections[config_name] = conn
            
        return cls._connections[config_name]
    
    @classmethod
    def execute_query(cls, config_name: str, sql: str):
        """执行查询并返回DataFrame"""
        conn = cls.get_connection(config_name)
        return conn.execute(sql).fetchdf()
    
    @classmethod
    def execute_command(cls, config_name: str, sql: str):
        """执行DDL/DML语句"""
        conn = cls.get_connection(config_name)
        conn.execute(sql)

四、业务层使用示例

用户服务(使用多种数据库)

# services/user_service.py
from connectors.mysql_connector import MySQLConnector
from connectors.mongo_connector import MongoDBConnector
from connectors.redis_connector import RedisConnector

class UserService:
    """用户服务 - 使用多种数据库"""
    
    def get_user(self, user_id: int) -> dict:
        """从MySQL获取用户基本信息"""
        sql = "SELECT * FROM users WHERE id = %s"
        result = MySQLConnector.execute_query(
            "MAIN_DB", 
            sql, 
            (user_id,)
        )
        return result[0] if result else None
    
    def get_user_profile(self, user_id: int) -> dict:
        """从MongoDB获取用户档案"""
        collection = MongoDBConnector.get_collection(
            "USER_PROFILE", 
            "user_profiles"
        )
        return collection.find_one({"user_id": user_id})
    
    def cache_user_session(self, session_id: str, user_data: dict):
        """缓存用户会话到Redis"""
        RedisConnector.set_value(
            "SESSION_CACHE", 
            f"session:{session_id}", 
            user_data, 
            ex=3600  # 1小时过期
        )
    
    def update_user_activity(self, user_id: int):
        """更新用户活动日志(MySQL日志库)"""
        sql = "INSERT INTO user_activity (user_id, last_active) VALUES (%s, NOW())"
        MySQLConnector.execute_command(
            "LOG_DB", 
            sql, 
            (user_id,)
        )

数据分析服务(使用DuckDB)

# services/analytics_service.py
from connectors.duckdb_connector import DuckDBConnector
import pandas as pd

class AnalyticsService:
    """数据分析服务"""
    
    def generate_user_report(self) -> pd.DataFrame:
        """生成用户分析报告"""
        sql = """
        SELECT 
            country, 
            COUNT(*) as user_count,
            AVG(age) as avg_age,
            SUM(CASE WHEN is_active THEN 1 ELSE 0 END) as active_users
        FROM users
        GROUP BY country
        ORDER BY user_count DESC
        """
        return DuckDBConnector.execute_query("ANALYTICS_DB", sql)
    
    def create_temporary_dataset(self, data: pd.DataFrame):
        """创建临时数据集"""
        # 连接到内存数据库
        conn = DuckDBConnector.get_connection("TEMP_DB")
        
        # 创建临时表
        conn.execute("CREATE TEMP TABLE temp_data AS SELECT * FROM data")
        
        # 执行分析
        return conn.execute("""
            SELECT 
                category, 
                SUM(sales) as total_sales 
            FROM temp_data 
            GROUP BY category
        """).fetchdf()

五、配置切换技巧

环境敏感配置处理

# configs/db_config.py 顶部添加
import os

# 环境检测
ENV = os.getenv("APP_ENV", "dev").lower()

# 根据环境覆盖配置
if ENV == "prod":
    MySQLConfig.MAIN_DB["host"] = "prod-mysql.example.com"
    MongoDBConfig.USER_PROFILE["uri"] = "mongodb://prod_user:prod_pass@prod-cluster.example.com"
    # 其他生产环境配置...
elif ENV == "staging":
    MySQLConfig.MAIN_DB["host"] = "staging-mysql.example.com"
    # 其他预发环境配置...

动态配置加载

# 在连接器中添加动态配置支持
class MySQLConnector:
    @classmethod
    def set_config(cls, config_name: str, config: dict):
        """动态更新配置"""
        if config_name in cls._pools:
            # 关闭现有连接池
            cls._pools[config_name].close()
            del cls._pools[config_name]
        
        # 更新配置
        setattr(MySQLConfig, config_name, config)

多环境配置示例

# 启动应用时指定环境
APP_ENV=prod python app.py

# Docker环境变量注入
docker run -e "APP_ENV=staging" my-app

六、方案优势总结

  1. 配置集中化
    所有数据库配置集中在单一文件,一目了然

  2. 开箱即用
    预置多种数据库连接器,无需重复造轮子

  3. 即插即用
    新增数据库只需添加配置和连接器

  4. 资源高效
    连接池和连接缓存减少开销

  5. 环境友好
    轻松支持多环境配置切换

  6. 类型清晰
    分类管理不同数据库配置,避免混淆

  7. 业务解耦
    服务层无需关心具体连接细节


七、扩展建议

  1. 添加配置版本控制

    # 在db_config.py中添加
    CONFIG_VERSION = "1.1"  # 配置版本标识
    
  2. 增加健康检查方法

    class MySQLConnector:
        @classmethod
        def check_health(cls, config_name: str) -> bool:
            try:
                with cls.get_connection(config_name) as conn:
                    conn.cmd_ping()
                    return True
            except Exception:
                return False
    
  3. 支持异步操作

    # 异步MySQL连接器示例
    import aiomysql
    
    class AsyncMySQLConnector:
        @classmethod
        async def get_connection(cls, config_name: str):
            config = getattr(MySQLConfig, config_name)
            return await aiomysql.connect(**config)
    
  4. 添加配置加密

    # 简单配置加密示例
    from cryptography.fernet import Fernet
    
    class SecureConfig:
        KEY = b'your_encryption_key_here'
        
        @staticmethod
        def encrypt(value: str) -> bytes:
            cipher = Fernet(SecureConfig.KEY)
            return cipher.encrypt(value.encode())
        
        @staticmethod
        def decrypt(encrypted: bytes) -> str:
            cipher = Fernet(SecureConfig.KEY)
            return cipher.decrypt(encrypted).decode()
    
    # 在配置中使用
    MySQLConfig.MAIN_DB["password"] = SecureConfig.encrypt("my_password")
    

结语:这个轻量级方案完美解决了多数据库配置管理问题,既保持了简单性,又提供了足够的灵活性。通过分类管理不同数据库的配置和连接逻辑,你的代码将更加整洁、可维护性更强!

推荐工具

最佳实践

  1. db_config.py加入.gitignore模板
  2. 为每个环境创建db_config_template.py示例
  3. 使用lint工具检查配置格式
  4. 定期轮换数据库凭证
  5. 关键操作添加详细日志记录

关键词:Python多数据库配置MySQL连接池MongoDB连接管理Redis配置DuckDB集成

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐