【AI总结】python连接MySQL(4)- 轻量级多数据库配置管理方案
配置集中化所有数据库配置集中在单一文件,一目了然开箱即用预置多种数据库连接器,无需重复造轮子即插即用新增数据库只需添加配置和连接器资源高效连接池和连接缓存减少开销环境友好轻松支持多环境配置切换类型清晰分类管理不同数据库配置,避免混淆业务解耦服务层无需关心具体连接细节。
·
目录
轻量级多数据库配置管理方案
摘要:本文将提供一个简洁高效的多数据库配置管理方案,通过单一常量类集中管理所有数据库配置,并支持MySQL、MongoDB、Redis、DuckDB等多种数据库的连接管理。
一、极简配置目录结构
project_root/
├── configs/ # 配置中心
│ └── db_config.py # 所有数据库配置常量
├── connectors/ # 数据库连接器
│ ├── mysql_connector.py
│ ├── mongo_connector.py
│ ├── redis_connector.py
│ └── duckdb_connector.py
└── services/ # 业务服务
└── user_service.py
二、数据库配置常量类
# configs/db_config.py
"""
数据库配置中心 - 所有数据库配置集中在此
"""
class MySQLConfig:
"""MySQL数据库配置"""
# 主业务库
MAIN_DB = {
"host": "localhost",
"port": 3306,
"user": "app_user",
"password": "secure_password",
"database": "main_db",
"pool_size": 5
}
# 日志库
LOG_DB = {
"host": "log.db.example.com",
"port": 3307,
"user": "log_user",
"password": "log_password",
"database": "app_logs"
}
class MongoDBConfig:
"""MongoDB数据库配置"""
# 用户档案库
USER_PROFILE = {
"uri": "mongodb://user:pass@cluster0.example.com:27017",
"db_name": "user_profiles",
"timeout": 5000 # 毫秒
}
# 产品目录库
PRODUCT_CATALOG = {
"uri": "mongodb://catalog:pass@catalog-db.example.com:27017",
"db_name": "product_db"
}
class RedisConfig:
"""Redis缓存配置"""
SESSION_CACHE = {
"host": "cache.example.com",
"port": 6379,
"db": 0,
"password": "redis_pass",
"decode_responses": True # 自动解码为字符串
}
MESSAGE_QUEUE = {
"host": "mq.example.com",
"port": 6379,
"db": 1,
"password": "mq_pass"
}
class DuckDBConfig:
"""DuckDB分析数据库配置"""
ANALYTICS_DB = {
"path": "/data/analytics.duckdb", # 文件路径
"read_only": False
}
TEMP_DB = {
"path": ":memory:", # 内存数据库
"config": {
"threads": 4 # 使用4个线程
}
}
三、数据库连接器实现
1. MySQL连接器
# connectors/mysql_connector.py
import mysql.connector
from mysql.connector import pooling
from configs.db_config import MySQLConfig
class MySQLConnector:
"""MySQL数据库连接器"""
_pools = {} # 连接池缓存
@classmethod
def get_connection(cls, config_name: str):
"""获取数据库连接"""
# 获取配置
config = getattr(MySQLConfig, config_name).copy()
# 创建或获取连接池
if config_name not in cls._pools:
pool_config = {
"pool_name": f"{config_name}_pool",
"pool_size": config.pop("pool_size", 5),
**config
}
cls._pools[config_name] = pooling.MySQLConnectionPool(**pool_config)
return cls._pools[config_name].get_connection()
@classmethod
def execute_query(cls, config_name: str, sql: str, params=None):
"""执行查询并返回结果"""
with cls.get_connection(config_name) as conn:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(sql, params)
return cursor.fetchall()
@classmethod
def execute_command(cls, config_name: str, sql: str, params=None, commit=True):
"""执行写操作"""
with cls.get_connection(config_name) as conn:
with conn.cursor() as cursor:
cursor.execute(sql, params)
if commit:
conn.commit()
return cursor.rowcount
2. MongoDB连接器
# connectors/mongo_connector.py
from pymongo import MongoClient
from configs.db_config import MongoDBConfig
class MongoDBConnector:
"""MongoDB数据库连接器"""
_clients = {} # 客户端缓存
@classmethod
def get_client(cls, config_name: str):
"""获取MongoDB客户端"""
if config_name not in cls._clients:
config = getattr(MongoDBConfig, config_name)
cls._clients[config_name] = MongoClient(
config["uri"],
serverSelectionTimeoutMS=config.get("timeout", 3000)
return cls._clients[config_name]
@classmethod
def get_database(cls, config_name: str):
"""获取数据库实例"""
config = getattr(MongoDBConfig, config_name)
return cls.get_client(config_name)[config["db_name"]]
@classmethod
def get_collection(cls, config_name: str, collection_name: str):
"""获取集合对象"""
return cls.get_database(config_name)[collection_name]
3. Redis连接器
# connectors/redis_connector.py
import redis
from configs.db_config import RedisConfig
class RedisConnector:
"""Redis连接器"""
_connections = {} # 连接缓存
@classmethod
def get_connection(cls, config_name: str):
"""获取Redis连接"""
if config_name not in cls._connections:
config = getattr(RedisConfig, config_name)
cls._connections[config_name] = redis.Redis(**config)
return cls._connections[config_name]
@classmethod
def set_value(cls, config_name: str, key: str, value, ex=None):
"""设置键值"""
conn = cls.get_connection(config_name)
conn.set(key, value, ex=ex)
@classmethod
def get_value(cls, config_name: str, key: str):
"""获取键值"""
conn = cls.get_connection(config_name)
return conn.get(key)
4. DuckDB连接器
# connectors/duckdb_connector.py
import duckdb
from configs.db_config import DuckDBConfig
class DuckDBConnector:
"""DuckDB连接器"""
_connections = {} # 连接缓存
@classmethod
def get_connection(cls, config_name: str):
"""获取DuckDB连接"""
if config_name not in cls._connections:
config = getattr(DuckDBConfig, config_name)
# 内存数据库特殊处理
if config["path"] == ":memory:":
conn = duckdb.connect(database=':memory:')
else:
conn = duckdb.connect(
database=config["path"],
read_only=config.get("read_only", False)
)
# 应用额外配置
for key, value in config.get("config", {}).items():
conn.execute(f"SET {key} = {value}")
cls._connections[config_name] = conn
return cls._connections[config_name]
@classmethod
def execute_query(cls, config_name: str, sql: str):
"""执行查询并返回DataFrame"""
conn = cls.get_connection(config_name)
return conn.execute(sql).fetchdf()
@classmethod
def execute_command(cls, config_name: str, sql: str):
"""执行DDL/DML语句"""
conn = cls.get_connection(config_name)
conn.execute(sql)
四、业务层使用示例
用户服务(使用多种数据库)
# services/user_service.py
from connectors.mysql_connector import MySQLConnector
from connectors.mongo_connector import MongoDBConnector
from connectors.redis_connector import RedisConnector
class UserService:
"""用户服务 - 使用多种数据库"""
def get_user(self, user_id: int) -> dict:
"""从MySQL获取用户基本信息"""
sql = "SELECT * FROM users WHERE id = %s"
result = MySQLConnector.execute_query(
"MAIN_DB",
sql,
(user_id,)
)
return result[0] if result else None
def get_user_profile(self, user_id: int) -> dict:
"""从MongoDB获取用户档案"""
collection = MongoDBConnector.get_collection(
"USER_PROFILE",
"user_profiles"
)
return collection.find_one({"user_id": user_id})
def cache_user_session(self, session_id: str, user_data: dict):
"""缓存用户会话到Redis"""
RedisConnector.set_value(
"SESSION_CACHE",
f"session:{session_id}",
user_data,
ex=3600 # 1小时过期
)
def update_user_activity(self, user_id: int):
"""更新用户活动日志(MySQL日志库)"""
sql = "INSERT INTO user_activity (user_id, last_active) VALUES (%s, NOW())"
MySQLConnector.execute_command(
"LOG_DB",
sql,
(user_id,)
)
数据分析服务(使用DuckDB)
# services/analytics_service.py
from connectors.duckdb_connector import DuckDBConnector
import pandas as pd
class AnalyticsService:
"""数据分析服务"""
def generate_user_report(self) -> pd.DataFrame:
"""生成用户分析报告"""
sql = """
SELECT
country,
COUNT(*) as user_count,
AVG(age) as avg_age,
SUM(CASE WHEN is_active THEN 1 ELSE 0 END) as active_users
FROM users
GROUP BY country
ORDER BY user_count DESC
"""
return DuckDBConnector.execute_query("ANALYTICS_DB", sql)
def create_temporary_dataset(self, data: pd.DataFrame):
"""创建临时数据集"""
# 连接到内存数据库
conn = DuckDBConnector.get_connection("TEMP_DB")
# 创建临时表
conn.execute("CREATE TEMP TABLE temp_data AS SELECT * FROM data")
# 执行分析
return conn.execute("""
SELECT
category,
SUM(sales) as total_sales
FROM temp_data
GROUP BY category
""").fetchdf()
五、配置切换技巧
环境敏感配置处理
# configs/db_config.py 顶部添加
import os
# 环境检测
ENV = os.getenv("APP_ENV", "dev").lower()
# 根据环境覆盖配置
if ENV == "prod":
MySQLConfig.MAIN_DB["host"] = "prod-mysql.example.com"
MongoDBConfig.USER_PROFILE["uri"] = "mongodb://prod_user:prod_pass@prod-cluster.example.com"
# 其他生产环境配置...
elif ENV == "staging":
MySQLConfig.MAIN_DB["host"] = "staging-mysql.example.com"
# 其他预发环境配置...
动态配置加载
# 在连接器中添加动态配置支持
class MySQLConnector:
@classmethod
def set_config(cls, config_name: str, config: dict):
"""动态更新配置"""
if config_name in cls._pools:
# 关闭现有连接池
cls._pools[config_name].close()
del cls._pools[config_name]
# 更新配置
setattr(MySQLConfig, config_name, config)
多环境配置示例
# 启动应用时指定环境
APP_ENV=prod python app.py
# Docker环境变量注入
docker run -e "APP_ENV=staging" my-app
六、方案优势总结
-
配置集中化
所有数据库配置集中在单一文件,一目了然 -
开箱即用
预置多种数据库连接器,无需重复造轮子 -
即插即用
新增数据库只需添加配置和连接器 -
资源高效
连接池和连接缓存减少开销 -
环境友好
轻松支持多环境配置切换 -
类型清晰
分类管理不同数据库配置,避免混淆 -
业务解耦
服务层无需关心具体连接细节
七、扩展建议
-
添加配置版本控制
# 在db_config.py中添加 CONFIG_VERSION = "1.1" # 配置版本标识
-
增加健康检查方法
class MySQLConnector: @classmethod def check_health(cls, config_name: str) -> bool: try: with cls.get_connection(config_name) as conn: conn.cmd_ping() return True except Exception: return False
-
支持异步操作
# 异步MySQL连接器示例 import aiomysql class AsyncMySQLConnector: @classmethod async def get_connection(cls, config_name: str): config = getattr(MySQLConfig, config_name) return await aiomysql.connect(**config)
-
添加配置加密
# 简单配置加密示例 from cryptography.fernet import Fernet class SecureConfig: KEY = b'your_encryption_key_here' @staticmethod def encrypt(value: str) -> bytes: cipher = Fernet(SecureConfig.KEY) return cipher.encrypt(value.encode()) @staticmethod def decrypt(encrypted: bytes) -> str: cipher = Fernet(SecureConfig.KEY) return cipher.decrypt(encrypted).decode() # 在配置中使用 MySQLConfig.MAIN_DB["password"] = SecureConfig.encrypt("my_password")
结语:这个轻量级方案完美解决了多数据库配置管理问题,既保持了简单性,又提供了足够的灵活性。通过分类管理不同数据库的配置和连接逻辑,你的代码将更加整洁、可维护性更强!
推荐工具:
最佳实践:
- 将
db_config.py
加入.gitignore
模板 - 为每个环境创建
db_config_template.py
示例 - 使用lint工具检查配置格式
- 定期轮换数据库凭证
- 关键操作添加详细日志记录
关键词:
Python多数据库配置
、MySQL连接池
、MongoDB连接管理
、Redis配置
、DuckDB集成
更多推荐
所有评论(0)