【AI总结】python连接MySQL(5)- 高级数据库配置与连接器设计
本文将展示一个基于继承的数据库配置架构,通过基础配置类和专用连接器实现多数据库的统一管理,解决配置复用和命名一致性问题。
·
目录
高级数据库配置与连接器设计
摘要:本文将展示一个基于继承的数据库配置架构,通过基础配置类和专用连接器实现多数据库的统一管理,解决配置复用和命名一致性问题。
一、优化后的配置架构设计
目录结构
project_root/
├── configs/
│ ├── base_config.py # 基础配置类
│ ├── mysql_config.py # MySQL专用配置
│ ├── mongo_config.py # MongoDB专用配置
│ ├── redis_config.py # Redis专用配置
│ └── duckdb_config.py # DuckDB专用配置
├── connectors/
│ ├── base_connector.py # 连接器基类
│ ├── mysql_connector.py # MySQL连接器
│ ├── mongo_connector.py # MongoDB连接器
│ ├── redis_connector.py # Redis连接器
│ └── duckdb_connector.py # DuckDB连接器
└── services/
└── ... # 业务服务
二、基础配置与连接器设计
1. 基础配置类
# configs/base_config.py
class BaseDBConfig:
"""数据库基础配置"""
def __init__(self):
# 通用连接参数
self.timeout = 10 # 秒
self.max_retries = 3
self.auto_reconnect = True
# 连接池参数
self.pool_enabled = True
self.pool_size = 5
self.pool_timeout = 30 # 秒
# 监控参数
self.monitor_enabled = True
self.monitor_interval = 60 # 秒
def get_connection_params(self) -> dict:
"""获取连接参数(子类必须实现)"""
raise NotImplementedError("子类必须实现此方法")
2. 基础连接器类
# connectors/base_connector.py
from abc import ABC, abstractmethod
import logging
from configs.base_config import BaseDBConfig
logger = logging.getLogger(__name__)
class BaseDBConnector(ABC):
"""数据库连接器基类"""
def __init__(self, config: BaseDBConfig):
"""
初始化连接器
:param config: 数据库配置实例
"""
self.config = config
self._connection = None
self._connections = {} # 多实例连接缓存
logger.info(f"初始化 {self.__class__.__name__} 连接器")
@abstractmethod
def connect(self, config_name: str = "default"):
"""建立数据库连接(子类实现)"""
pass
@abstractmethod
def close(self, config_name: str = "default"):
"""关闭数据库连接(子类实现)"""
pass
@abstractmethod
def execute_query(self, query: str, params=None, config_name: str = "default"):
"""执行查询操作(子类实现)"""
pass
@abstractmethod
def execute_command(self, command: str, params=None, config_name: str = "default"):
"""执行写操作(子类实现)"""
pass
def health_check(self, config_name: str = "default") -> bool:
"""健康检查(默认实现,子类可覆盖)"""
try:
self.connect(config_name)
return True
except Exception as e:
logger.error(f"数据库健康检查失败: {e}")
return False
finally:
self.close(config_name)
def __enter__(self):
"""上下文管理器支持"""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出时自动关闭连接"""
self.close()
三、MySQL专用配置与连接器
1. MySQL配置类
# configs/mysql_config.py
from configs.base_config import BaseDBConfig
class MySQLConfig(BaseDBConfig):
"""MySQL数据库专用配置"""
def __init__(self):
super().__init__()
# MySQL特有参数
self.charset = "utf8mb4"
self.autocommit = False
self.client_flag = 0
self.ssl_disabled = False
def get_connection_params(self) -> dict:
"""获取MySQL连接参数"""
return {
"host": self.host,
"port": self.port,
"user": self.user,
"password": self.password,
"database": self.database,
"charset": self.charset,
"autocommit": self.autocommit,
"ssl_disabled": self.ssl_disabled
}
# 具体数据库实例配置
class MainMySQLConfig(MySQLConfig):
"""主业务数据库配置"""
def __init__(self):
super().__init__()
self.host = "db-main.example.com"
self.port = 3306
self.user = "app_user"
self.password = "secure_pass"
self.database = "main_db"
self.pool_size = 10 # 更大的连接池
class LogMySQLConfig(MySQLConfig):
"""日志数据库配置"""
def __init__(self):
super().__init__()
self.host = "log-db.example.com"
self.port = 3307
self.user = "log_user"
self.password = "log_pass"
self.database = "app_logs"
self.autocommit = True # 日志自动提交
2. MySQL连接器
# connectors/mysql_connector.py
import mysql.connector
from mysql.connector import pooling, Error
from configs.mysql_config import MySQLConfig
from connectors.base_connector import BaseDBConnector
class MySQLConnector(BaseDBConnector):
"""MySQL数据库连接器"""
def __init__(self, config: MySQLConfig):
super().__init__(config)
self._pools = {} # 连接池缓存
def connect(self, config_name: str = "default"):
"""建立数据库连接"""
if config_name not in self._pools:
# 创建新连接池
params = self.config.get_connection_params()
pool_config = {
"pool_name": f"{config_name}_pool",
"pool_size": self.config.pool_size,
**params
}
try:
self._pools[config_name] = pooling.MySQLConnectionPool(**pool_config)
self._connections[config_name] = self._pools[config_name].get_connection()
except Error as e:
raise ConnectionError(f"MySQL连接失败: {e}")
return self._connections[config_name]
def close(self, config_name: str = "default"):
"""关闭数据库连接"""
if config_name in self._connections:
conn = self._connections.pop(config_name)
conn.close()
def execute_query(self, query: str, params=None, config_name: str = "default"):
"""执行查询操作"""
conn = self.connect(config_name)
try:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(query, params)
return cursor.fetchall()
except Error as e:
raise RuntimeError(f"查询执行失败: {e}")
def execute_command(self, command: str, params=None, config_name: str = "default"):
"""执行写操作"""
conn = self.connect(config_name)
try:
with conn.cursor() as cursor:
cursor.execute(command, params)
affected = cursor.rowcount
if not self.config.autocommit:
conn.commit()
return affected
except Error as e:
conn.rollback()
raise RuntimeError(f"命令执行失败: {e}")
四、其他数据库配置与连接器
1. MongoDB配置与连接器
# configs/mongo_config.py
from configs.base_config import BaseDBConfig
class MongoDBConfig(BaseDBConfig):
"""MongoDB专用配置"""
def __init__(self):
super().__init__()
self.uri = ""
self.db_name = ""
self.tls = False
self.replica_set = ""
def get_connection_params(self) -> dict:
return {
"host": self.uri,
"tls": self.tls,
"replicaSet": self.replica_set
}
class UserProfileMongoConfig(MongoDBConfig):
"""用户档案数据库配置"""
def __init__(self):
super().__init__()
self.uri = "mongodb://user:pass@cluster0.example.com:27017"
self.db_name = "user_profiles"
self.tls = True
# connectors/mongo_connector.py
from pymongo import MongoClient
from configs.mongo_config import MongoDBConfig
from connectors.base_connector import BaseDBConnector
class MongoDBConnector(BaseDBConnector):
"""MongoDB数据库连接器"""
def __init__(self, config: MongoDBConfig):
super().__init__(config)
self._clients = {} # 客户端缓存
def connect(self, config_name: str = "default"):
"""建立数据库连接"""
if config_name not in self._clients:
params = self.config.get_connection_params()
try:
self._clients[config_name] = MongoClient(**params)
self._connections[config_name] = self._clients[config_name][self.config.db_name]
except Exception as e:
raise ConnectionError(f"MongoDB连接失败: {e}")
return self._connections[config_name]
def get_collection(self, collection_name: str, config_name: str = "default"):
"""获取集合对象"""
db = self.connect(config_name)
return db[collection_name]
# MongoDB不需要传统SQL查询方法
def execute_query(self, query: dict, collection_name: str, config_name: str = "default"):
"""执行MongoDB查询"""
collection = self.get_collection(collection_name, config_name)
return list(collection.find(query))
def execute_command(self, command: dict, collection_name: str, config_name: str = "default"):
"""执行MongoDB写操作"""
# 这里简化处理,实际需要区分insert/update/delete
collection = self.get_collection(collection_name, config_name)
if command.get("insert"):
return collection.insert_many(command["data"])
elif command.get("update"):
return collection.update_many(command["filter"], command["update"])
elif command.get("delete"):
return collection.delete_many(command["filter"])
else:
raise ValueError("无效的MongoDB命令")
2. Redis配置与连接器
# configs/redis_config.py
from configs.base_config import BaseDBConfig
class RedisConfig(BaseDBConfig):
"""Redis专用配置"""
def __init__(self):
super().__init__()
self.host = "localhost"
self.port = 6379
self.db = 0
self.decode_responses = True
self.ssl = False
def get_connection_params(self) -> dict:
return {
"host": self.host,
"port": self.port,
"db": self.db,
"decode_responses": self.decode_responses,
"ssl": self.ssl
}
class SessionRedisConfig(RedisConfig):
"""会话缓存配置"""
def __init__(self):
super().__init__()
self.host = "cache.example.com"
self.db = 1
self.decode_responses = False # 原始字节数据
# connectors/redis_connector.py
import redis
from configs.redis_config import RedisConfig
from connectors.base_connector import BaseDBConnector
class RedisConnector(BaseDBConnector):
"""Redis数据库连接器"""
def __init__(self, config: RedisConfig):
super().__init__(config)
def connect(self, config_name: str = "default"):
"""建立Redis连接"""
if config_name not in self._connections:
params = self.config.get_connection_params()
try:
self._connections[config_name] = redis.Redis(**params)
# 测试连接
self._connections[config_name].ping()
except Exception as e:
raise ConnectionError(f"Redis连接失败: {e}")
return self._connections[config_name]
# Redis不需要传统SQL查询方法
def execute_query(self, key: str, config_name: str = "default"):
"""获取键值"""
conn = self.connect(config_name)
return conn.get(key)
def execute_command(self, command: str, *args, config_name: str = "default"):
"""执行Redis命令"""
conn = self.connect(config_name)
return conn.execute_command(command, *args)
3. DuckDB配置与连接器
# configs/duckdb_config.py
from configs.base_config import BaseDBConfig
class DuckDBConfig(BaseDBConfig):
"""DuckDB专用配置"""
def __init__(self):
super().__init__()
self.path = ":memory:" # 默认内存数据库
self.read_only = False
self.config = {} # 额外配置
def get_connection_params(self) -> dict:
return {
"database": self.path,
"read_only": self.read_only,
"config": self.config
}
class AnalyticsDuckDBConfig(DuckDBConfig):
"""分析数据库配置"""
def __init__(self):
super().__init__()
self.path = "/data/analytics.duckdb"
self.config = {"threads": 8} # 使用8线程
# connectors/duckdb_connector.py
import duckdb
from configs.duckdb_config import DuckDBConfig
from connectors.base_connector import BaseDBConnector
class DuckDBConnector(BaseDBConnector):
"""DuckDB数据库连接器"""
def __init__(self, config: DuckDBConfig):
super().__init__(config)
def connect(self, config_name: str = "default"):
"""建立DuckDB连接"""
if config_name not in self._connections:
params = self.config.get_connection_params()
try:
self._connections[config_name] = duckdb.connect(**params)
# 应用额外配置
for key, value in params["config"].items():
self._connections[config_name].execute(f"SET {key} = {value}")
except Exception as e:
raise ConnectionError(f"DuckDB连接失败: {e}")
return self._connections[config_name]
def execute_query(self, query: str, config_name: str = "default"):
"""执行查询返回DataFrame"""
conn = self.connect(config_name)
return conn.execute(query).fetchdf()
def execute_command(self, command: str, config_name: str = "default"):
"""执行DDL/DML语句"""
conn = self.connect(config_name)
return conn.execute(command)
五、业务层统一使用接口
数据库管理器
# connectors/db_manager.py
from configs import mysql_config, mongo_config, redis_config, duckdb_config
from connectors import mysql_connector, mongo_connector, redis_connector, duckdb_connector
class DBManager:
"""多数据库统一管理"""
_instances = {} # 连接器实例缓存
# 配置实例
MYSQL_MAIN = mysql_config.MainMySQLConfig()
MYSQL_LOG = mysql_config.LogMySQLConfig()
MONGO_PROFILE = mongo_config.UserProfileMongoConfig()
REDIS_SESSION = redis_config.SessionRedisConfig()
DUCKDB_ANALYTICS = duckdb_config.AnalyticsDuckDBConfig()
@classmethod
def get_mysql(cls, config: MySQLConfig = MYSQL_MAIN) -> mysql_connector.MySQLConnector:
"""获取MySQL连接器"""
key = f"mysql_{id(config)}"
if key not in cls._instances:
cls._instances[key] = mysql_connector.MySQLConnector(config)
return cls._instances[key]
@classmethod
def get_mongo(cls, config: MongoDBConfig = MONGO_PROFILE) -> mongo_connector.MongoDBConnector:
"""获取MongoDB连接器"""
key = f"mongo_{id(config)}"
if key not in cls._instances:
cls._instances[key] = mongo_connector.MongoDBConnector(config)
return cls._instances[key]
@classmethod
def get_redis(cls, config: RedisConfig = REDIS_SESSION) -> redis_connector.RedisConnector:
"""获取Redis连接器"""
key = f"redis_{id(config)}"
if key not in cls._instances:
cls._instances[key] = redis_connector.RedisConnector(config)
return cls._instances[key]
@classmethod
def get_duckdb(cls, config: DuckDBConfig = DUCKDB_ANALYTICS) -> duckdb_connector.DuckDBConnector:
"""获取DuckDB连接器"""
key = f"duckdb_{id(config)}"
if key not in cls._instances:
cls._instances[key] = duckdb_connector.DuckDBConnector(config)
return cls._instances[key]
业务服务使用示例
# services/user_service.py
from connectors.db_manager import DBManager
class UserService:
"""用户服务 - 多数据库操作"""
def __init__(self):
# 初始化各数据库连接器
self.mysql = DBManager.get_mysql()
self.mongo = DBManager.get_mongo()
self.redis = DBManager.get_redis()
def get_user_data(self, user_id: int):
"""获取用户完整数据"""
# MySQL获取基本信息
sql = "SELECT * FROM users WHERE id = %s"
user_info = self.mysql.execute_query(sql, (user_id,))[0]
# MongoDB获取档案
user_profile = self.mongo.execute_query(
{"user_id": user_id},
collection_name="profiles"
)
# Redis获取会话状态
session_state = self.redis.execute_query(f"session:{user_id}")
return {
"info": user_info,
"profile": user_profile,
"session": session_state
}
def update_user(self, user_id: int, data: dict):
"""更新用户信息"""
# MySQL更新基本信息
update_sql = """
UPDATE users
SET name = %s, email = %s
WHERE id = %s
"""
self.mysql.execute_command(
update_sql,
(data["name"], data["email"], user_id)
)
# MongoDB更新档案
self.mongo.execute_command(
{"update": True},
collection_name="profiles",
command={
"filter": {"user_id": user_id},
"update": {"$set": data["profile"]}
}
)
# Redis更新缓存
self.redis.execute_command(
"SET",
f"user:{user_id}",
data["cache_data"],
"EX", 3600
)
六、架构优势与最佳实践
设计优势
- 统一接口:所有数据库连接器继承自同一基类,提供一致API
- 配置继承:基础配置复用,专用配置扩展
- 类型安全:强类型配置和连接器,减少错误
- 连接复用:管理器确保连接器单例
- 多实例支持:同一数据库类型支持多个配置实例
- 开箱即用:预置主流数据库实现
- 可扩展性:轻松添加新数据库支持
命名规范建议
-
配置类:
[数据库类型]Config
+[用途]Config
(e.g.,MySQLConfig
,MainMySQLConfig
) -
连接器类:
[数据库类型]Connector
(e.g.,MySQLConnector
,MongoDBConnector
) -
配置实例:全大写描述性名称
(e.g.,MYSQL_MAIN
,REDIS_SESSION
) -
方法命名:
connect()
: 建立连接close()
: 关闭连接execute_query()
: 执行查询execute_command()
: 执行写操作
最佳实践
-
环境区分:使用环境变量切换配置
class MainMySQLConfig(MySQLConfig): def __init__(self): super().__init__() env = os.getenv("ENV", "dev") self.host = f"db-{env}.example.com"
-
配置加密:敏感数据加密存储
from cryptography.fernet import Fernet class SecureConfig: @staticmethod def decrypt(encrypted: str) -> str: cipher = Fernet(os.getenv("CONFIG_KEY")) return cipher.decrypt(encrypted.encode()).decode() # 在配置中使用 self.password = SecureConfig.decrypt(ENCRYPTED_PASSWORD)
-
连接监控:定期检查连接健康
import threading import time def monitor_connections(): while True: for name, connector in DBManager._instances.items(): status = connector.health_check() print(f"{name} 状态: {'OK' if status else 'FAIL'}") time.sleep(DBManager.MYSQL_MAIN.monitor_interval) threading.Thread(target=monitor_connections, daemon=True).start()
-
资源清理:应用退出时关闭所有连接
import atexit @atexit.register def cleanup(): for connector in DBManager._instances.values(): connector.close_all()
结语:通过这种基于继承的配置架构和统一命名的连接器设计,我们实现了多数据库的高效管理。这种设计既保持了配置的灵活性,又提供了代码的一致性,是中型到大型项目的理想选择。
扩展方向:
- 添加异步IO支持
- 集成分布式配置中心
- 实现数据库迁移工具
- 添加ORM集成层
- 开发Web管理界面
安全提示:
- 生产环境禁用默认配置
- 定期轮换数据库凭证
- 使用网络隔离限制数据库访问
- 加密敏感配置数据
- 实施最小权限原则
关键词:
Python数据库架构
、多数据库管理
、配置继承模式
、数据库连接器设计
、企业级Python开发
更多推荐
所有评论(0)