基于物联网嵌入式的智能家居使用新大陆云和本地API
├─ main.js# 应用入口:注册 Vue、Pinia、API、全局组件。││└─ devices.vue# 设备管理(传感器/执行器 CRUD)# 其余按需引入的 uni 生态插件。││└─ search.vue# 全局搜索(用户+设备关键字)││└─ index.vue# 首页(欢迎页/快速入口)
一、系统架构

二、uniapp项目结构
smart-campus\ # 项目根
├─ api\ # 接口封装
│ └─ index.js # 统一导出 axios 实例与业务 API
├─ pages\ # 页面级 vue 组件
│ ├─ devices\
│ │ └─ devices.vue # 设备管理(传感器/执行器 CRUD)
│ ├─ home-data\
│ │ └─ home-data.vue # 家居环境数据实时展示
│ ├─ index\
│ │ └─ index.vue # 首页(欢迎页/快速入口)
│ ├─ login\
│ │ └─ login.vue # 登录(账号+人脸二次认证)
│ ├─ profile\
│ │ └─ profile.vue # 个人中心(密码修改/统计)
│ ├─ register\
│ │ └─ register.vue # 用户注册
│ ├─ search\
│ │ └─ search.vue # 全局搜索(用户+设备关键字)
│ └─ users\
│ └─ users.vue # 用户管理(仅管理员可见)
├─ static\ # 纯静态资源(图片、字体、图标)
│ └─ logo.png # 项目 Logo(App 启动图、导航栏)
├─ uni_modules\ # 插件市场下载的扩展包
│ ├─ uni-icons\ # 图标组件(可能目录)
│ ├─ uni-popup\ # 弹窗组件
│ └─ … # 其余按需引入的 uni 生态插件
├─ unpackage\ # 编译产出(可 git-ignore)
│ ├─ dist\
│ │ ├─ build\ # H5/小程序/App 各端产物
│ │ └─ dev\ # 开发模式临时文件
│ └─ cache\ # IDE 缓存
├─ App.vue # 根组件:全局样式、生命周期
├─ index.html # H5 入口模板(可注入 CDN)
├─ main.js # 应用入口:注册 Vue、Pinia、API、全局组件
├─ manifest.json # 跨端配置:AppID、图标、权限、模块
├─ pages.json # 页面路由、导航栏、tabBar、分包
├─ package.json # npm 依赖 & 脚本
├─ package-lock.json # 依赖锁版本
├─ uni.promisify.adaptor.js # 把 uni 回调式 API 转 Promise
└─ uni.scss # 全局 SCSS 变量、主题色、Mixin
三、数据表
表2.1用户表(tb_username)
|
名称 |
类型 |
长度 |
小数点 |
允许null |
键 |
注释 |
|
id |
int |
11 |
True |
1 |
||
|
username |
archar |
50 |
True |
|||
|
password |
varchar |
255 |
True |
|||
|
login_count |
int |
11 |
False |
|||
|
last_login_time |
datetime |
False |
||||
|
register_time |
datetime |
False |
||||
|
permission |
enum |
False |
||||
|
status |
tinyint |
4 |
False |
1:正常 0:禁用 |
表1.2人脸识别表(tb_confirm)
|
名称 |
类型 |
长度 |
小数点 |
允许null |
键 |
注释 |
|
userid |
int |
11 |
True |
1 |
||
|
usercode |
varchar |
8 |
True |
|||
|
username |
varchar |
8 |
True |
|||
|
pwd |
varchar |
10 |
True |
|||
|
loginnum |
int |
11 |
False |
|||
|
lastlogintime |
datetime |
False |
||||
|
registertime |
datetime |
False |
||||
|
Result |
varchar |
20 |
False |
|||
|
success |
tinyint |
4 |
False |
|||
|
snapshot_path |
text |
False |
四、API接口
表3.1 本地自定义API接口
|
功能 |
方式 |
接口 |
|
登录 |
POST |
http://localhost:3000//api/auth/login |
|
注册 |
POST |
http://localhost:3000//api/auth/register |
|
获取用户列表(管理员) |
GET |
http://localhost:3000//api/users |
|
更改用户权限(管理员) |
PUT |
http://localhost:3000//api/users/:id/permission |
|
删除用户(管理员) |
DELETE |
http://localhost:3000//api/users/:id |
|
启动和禁用用户(管理员) |
PUT |
http://localhost:3000//api/users/:id/status |
|
统计用户 |
GET |
http://localhost:3000//api/users |
表3.2 新大陆云平台的API接口
|
功能 |
方式 |
接口 |
|
获取新大陆token |
POST |
http://api.nlecloud.com//Users/Login |
|
获取实时数据 |
GET |
http://api.nlecloud.com//Projects/{PROJECT_ID}/SensorsRealTimeData |
|
删除设备 |
DELETE |
http://api.nlecloud.com/Devices/{deviceId} |
|
删除传感器 |
DELETE |
http://api.nlecloud.com/devices/{deviceId}/Sensors/{apiTag} |
|
发送命令控制设备 |
POST |
http://api.nlecloud.com/Cmds |
五、人脸识别流程图

六、人脸识别效果

七、人脸识别代码(Python)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
智能人脸识别安全监控系统(全功能 + MySQL 日志归档 + 识别成败落库)
"""
# ==========================================================
# 依赖导入
# ==========================================================
import subprocess
import threading
import queue
import winsound
import tempfile
import face_recognition
import cv2
import numpy as np
import os
import pickle
import time
import pyttsx3
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
from PIL import Image, ImageDraw, ImageFont, ImageTk
import platform
import json
from datetime import datetime
import logging
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from collections import deque, defaultdict
import ctypes
import sys
from typing import Dict, List, Tuple, Any, Optional, Union
import shutil
import pymysql
from dbutils.pooled_db import PooledDB
# 日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('face_recognition.log', encoding='utf-8'),
logging.StreamHandler()
]
)
# 高DPI支持(Windows)
if platform.system() == "Windows":
try:
ctypes.windll.shcore.SetProcessDpiAwareness(1)
except Exception:
pass
# ==========================================================
# 主类:FaceRecognitionSystem
# ==========================================================
class FaceRecognitionSystem:
def __init__(self, root: tk.Tk) -> None:
self.root = root
self.root.title("智能人脸识别安全监控系统")
self.root.geometry("1300x850")
self.root.configure(bg='#f0f0f0')
self.root.update_idletasks()
w = self.root.winfo_width()
h = self.root.winfo_height()
sw = self.root.winfo_screenwidth()
sh = self.root.winfo_screenheight()
self.root.geometry(f'+{(sw - w) // 2}+{(sh - h) // 2}')
self.root.deiconify()
self.root.lift()
# 颜色主题
self.colors: Dict[str, str] = {
'primary': '#2c3e50',
'secondary': '#3498db',
'success': '#2ecc71',
'warning': '#f39c12',
'danger': '#e74c3c',
'light': '#ecf0f1',
'dark': '#34495e'
}
# 基础路径
self.dataset_path = os.path.join(os.path.expanduser("~"), "FaceRecognition", "face_database")
self.encodings_path = os.path.join(os.path.expanduser("~"), "FaceRecognition", "face_encodings.pkl")
self.names_path = os.path.join(os.path.expanduser("~"), "FaceRecognition", "face_names.pkl")
self.config_path = os.path.join(os.path.expanduser("~"), "FaceRecognition", "config.json")
self.alert_log_path = os.path.join(os.path.expanduser("~"), "FaceRecognition", "alert_logs.txt")
os.makedirs(self.dataset_path, exist_ok=True)
os.makedirs(os.path.join(os.path.expanduser("~"), "FaceRecognition", "alerts"), exist_ok=True)
os.makedirs(os.path.join(os.path.expanduser("~"), "FaceRecognition", "logs"), exist_ok=True)
os.makedirs(os.path.join(os.path.expanduser("~"), "FaceRecognition", "auth_frames"), exist_ok=True)
# 配置
self.config: Dict[str, Any] = self._load_config()
# 人脸数据
self.known_face_encodings: List[np.ndarray] = []
self.known_face_names: List[str] = []
self.known_face_ids: List[int] = []
self._load_face_data()
# 授权用户
self.authorized_users: Dict[int, Dict[str, Any]] = {}
self._load_authorized_users()
# 摄像头参数
self.camera_width = self.config.get('camera_width', 640)
self.camera_height = self.config.get('camera_height', 480)
self.frame_skip = self.config.get('frame_skip', 2)
self.tolerance = self.config.get('tolerance', 0.6)
# 字体
self.font = self._setup_font()
self.small_font = self._setup_font(size=12)
self.large_font = self._setup_font(size=20)
# 语音
self.voice_engine = pyttsx3.init()
self.voice_engine.setProperty('rate', self.config.get('voice_rate', 130))
self.voice_queue: queue.Queue[str] = queue.Queue(maxsize=3)
self._start_voice_thread()
# 安全监控
self.alert_enabled = self.config.get('alert_enabled', True)
self.unknown_alert_threshold = self.config.get('unknown_alert_threshold', 3)
self.unauthorized_alert_threshold = self.config.get('unauthorized_alert_threshold', 2)
self.alert_cooldown = self.config.get('alert_cooldown', 3)
self.last_alert_time = 0.0
self.unknown_face_count = 0
self.unauthorized_face_count = 0
self.alert_lock = threading.Lock()
# 精细化计数
self.current_persons: Dict[str, Dict[str, Any]] = {}
self.person_id_counter = 0
self.detection_interval = 1.0
self.disappear_threshold = 3.0
self.tracked_faces: Dict[str, Tuple[float, float]] = {}
# 历史统计
self.authorized_detections = 0
self.unauthorized_detections = 0
self.unknown_detections = 0
self.count_lock = threading.Lock()
# 播报
self.last_announcement_time = 0.0
self.announcement_queue: queue.Queue[bool] = queue.Queue(maxsize=1)
# 性能
self.processing = False
self.frame_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=1)
self.result_queue: queue.Queue[
Tuple[List[Tuple[int, int, int, int]], List[Tuple[str, str, Tuple[int, int, int]]]]] = queue.Queue(
maxsize=1)
self.processing_lock = threading.Lock()
# 状态
self.last_recognized: Optional[str] = None
self.last_recognized_time = 0.0
self.current_frame: Optional[np.ndarray] = None
self.recognition_active = False
self.camera_active = False
self.camera_stopped = False
self.cap: Optional[cv2.VideoCapture] = None
self.detection_history = deque(maxlen=100)
# 线程
self.process_thread = threading.Thread(target=self._process_frames, daemon=True)
self.process_thread.start()
self.announcement_thread = threading.Thread(target=self._periodic_announcement, daemon=True)
self.announcement_thread.start()
self.cleanup_thread = threading.Thread(target=self._cleanup_disappeared_persons, daemon=True)
self.cleanup_thread.start()
# GUI
self._create_gui()
self._update_camera_preview()
logging.info("人脸识别系统初始化完成!")
logging.info(f"已加载{len(self.known_face_encodings)}个已注册人脸")
# 窗口居中
self.root.update_idletasks()
w = self.root.winfo_width()
h = self.root.winfo_height()
sw = self.root.winfo_screenwidth()
sh = self.root.winfo_screenheight()
self.root.geometry(f'+{(sw - w) // 2}+{(sh - h) // 2}')
self.root.deiconify()
self.root.lift()
# ===== 新增:MySQL 连接池(懒连接) =====
self._mysql_pool = None
self._mysql_cfg = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "root",
"database": "school_db",
"charset": "utf8mb4",
"autocommit": True
}
self._ensure_table()
# -------------------- 字体设置 --------------------
def _setup_font(self, size: int = 18) -> ImageFont.FreeTypeFont:
font_paths = []
if platform.system() == "Windows":
font_paths.extend([
os.path.join(os.environ.get("WINDIR", ""), "Fonts", "simhei.ttf"),
os.path.join(os.environ.get("WINDIR", ""), "Fonts", "msyh.ttc"),
os.path.join(os.environ.get("WINDIR", ""), "Fonts", "simsun.ttc"),
])
elif platform.system() == "Darwin":
font_paths.extend(["/System/Library/Fonts/PingFang.ttc"])
else:
font_paths.extend(["/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc"])
for path in font_paths:
if os.path.exists(path):
try:
return ImageFont.truetype(path, size, index=0) if path.endswith('.ttc') else ImageFont.truetype(
path, size)
except Exception as e:
logging.warning(f"加载字体 {path} 失败: {e}")
return ImageFont.load_default()
# -------------------- 配置 --------------------
def _load_config(self) -> Dict[str, Any]:
default_config = {
'camera_width': 640, 'camera_height': 480, 'tolerance': 0.6,
'voice_rate': 130, 'alert_enabled': True, 'unknown_alert_threshold': 3,
'unauthorized_alert_threshold': 2, 'alert_cooldown': 3, 'frame_skip': 2,
'email_notification': False, 'smtp_server': 'smtp.example.com',
'smtp_port': 587, 'smtp_user': 'your_email@example.com',
'smtp_password': 'your_password', 'alert_recipient': 'recipient@example.com'
}
if os.path.exists(self.config_path):
try:
with open(self.config_path, 'r', encoding='utf-8') as f:
default_config.update(json.load(f))
except Exception as e:
logging.error(f"加载配置失败: {e}")
return default_config
def _save_config(self) -> None:
try:
with open(self.config_path, 'w', encoding='utf-8') as f:
json.dump(self.config, f, indent=4, ensure_ascii=False)
except Exception as e:
logging.error(f"保存配置失败: {e}")
# -------------------- 识别成败落库(新增)--------------------
logging.info("[DEBUG] 准备调用 _write_confirm_result_ex,user_id=%s, user_type=%s")
def _write_confirm_result_ex(self, user_id: int, usercode: str,
username: str, pwd: str, result: str,
success: int = 0,
snapshot_path: str = "") -> None:
def _worker():
logging.info("[DEBUG] _worker 线程启动,准备写入数据库...")
logging.info("[INSERT] 准备写入 tb_confirm:uid=%s,ucode=%s,uname=%s,res=%s,success=%s",
user_id, usercode, username, result, success)
conn = self._get_mysql_conn()
if not conn:
logging.warning("[INSERT] 拿不到连接,直接返回")
return
try:
with conn.cursor() as cur:
cur.execute("""
INSERT INTO tb_confirm
(userid, usercode, username, pwd, Result,
lastlogintime, success, snapshot_path)
VALUES (%s,%s,%s,%s,%s, NOW(),%s,%s)
ON DUPLICATE KEY UPDATE
usercode = VALUES(usercode),
username = VALUES(username),
pwd = VALUES(pwd),
Result = VALUES(Result),
lastlogintime = VALUES(lastlogintime),
success = VALUES(success),
snapshot_path = VALUES(snapshot_path)
""", (user_id, usercode, username, pwd, result, success, snapshot_path))
logging.info("[INSERT] 写入成功")
self.speak("数据插入成功")
except Exception as e:
logging.exception("[INSERT] SQL 异常:%s", e)
except Exception as e:
logging.exception("[INSERT] SQL 异常:%s", e) # 详细堆栈
finally:
conn.close()
threading.Thread(target=_worker, daemon=True).start()
# -------------------- 人脸数据 --------------------
def _load_face_data(self) -> None:
if os.path.exists(self.encodings_path) and os.path.exists(self.names_path):
try:
with open(self.encodings_path, 'rb') as f:
data = pickle.load(f)
self.known_face_encodings = data.get('encodings', [])
self.known_face_ids = data.get('ids', [])
with open(self.names_path, 'rb') as f:
self.known_face_names = pickle.load(f)
except Exception as e:
logging.error(f"加载人脸数据失败: {e}")
self.known_face_encodings, self.known_face_names, self.known_face_ids = [], [], []
def _save_face_data(self) -> None:
try:
with open(self.encodings_path, 'wb') as f:
pickle.dump({'encodings': self.known_face_encodings, 'ids': self.known_face_ids}, f)
with open(self.names_path, 'wb') as f:
pickle.dump(self.known_face_names, f)
except Exception as e:
logging.error(f"保存人脸数据失败: {e}")
# -------------------- 授权用户 --------------------
def _load_authorized_users(self) -> None:
auth_path = os.path.join(os.path.expanduser("~"), "FaceRecognition", "authorized_users.pkl")
if os.path.exists(auth_path):
try:
with open(auth_path, 'rb') as f:
self.authorized_users = pickle.load(f)
except Exception as e:
logging.error(f"加载授权用户失败: {e}")
self.authorized_users = {}
else:
self.authorized_users = {1: {"name": "管理员", "authorized": True},
2: {"name": "访客", "authorized": False}}
self._save_authorized_users()
def _save_authorized_users(self) -> None:
auth_path = os.path.join(os.path.expanduser("~"), "FaceRecognition", "authorized_users.pkl")
try:
with open(auth_path, 'wb') as f:
pickle.dump(self.authorized_users, f)
except Exception as e:
logging.error(f"保存授权用户失败: {e}")
# -------------------- GUI --------------------
def _create_gui(self) -> None:
main_container = ttk.Frame(self.root, style='Light.TFrame')
main_container.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
self._create_top_status_bar(main_container)
content_frame = ttk.Frame(main_container, style='Light.TFrame')
content_frame.pack(fill=tk.BOTH, expand=True, pady=10)
left_frame = ttk.LabelFrame(content_frame, text="实时监控", padding=10)
left_frame.grid(row=0, column=0, rowspan=2, sticky=tk.NSEW, padx=(0, 10))
self.camera_frame = ttk.Frame(left_frame)
self.camera_frame.pack(fill=tk.BOTH, expand=True)
self.camera_label = ttk.Label(self.camera_frame, relief=tk.SUNKEN)
self.camera_label.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
control_frame = ttk.Frame(left_frame)
control_frame.pack(fill=tk.X, pady=10)
self.start_button = ttk.Button(control_frame, text="启动监控", command=self._start_recognition)
self.start_button.grid(row=0, column=0, padx=5)
self.stop_button = ttk.Button(control_frame, text="停止监控", command=self._stop_recognition, state=tk.DISABLED)
self.stop_button.grid(row=0, column=1, padx=5)
self.reset_count_button = ttk.Button(control_frame, text="重置计数", command=self._reset_counts)
self.reset_count_button.grid(row=0, column=2, padx=5)
self.alert_var = tk.BooleanVar(value=self.alert_enabled)
self.alert_check = ttk.Checkbutton(control_frame, text="启用警报", variable=self.alert_var,
command=self._toggle_alert)
self.alert_check.grid(row=0, column=3, padx=10)
right_frame = ttk.Frame(content_frame, style='Light.TFrame')
right_frame.grid(row=0, column=1, sticky=tk.NSEW)
self._create_user_management_panel(right_frame)
bottom_right_frame = ttk.Frame(content_frame, style='Light.TFrame')
bottom_right_frame.grid(row=1, column=1, sticky=tk.NSEW, pady=(10, 0))
self._create_system_info_panel(bottom_right_frame)
content_frame.columnconfigure(0, weight=7)
content_frame.columnconfigure(1, weight=3)
content_frame.rowconfigure(0, weight=1)
content_frame.rowconfigure(1, weight=1)
self.ui_update_interval = 30
def _create_top_status_bar(self, parent: ttk.Frame) -> None:
status_bar = ttk.Frame(parent, height=40, style='Primary.TFrame')
status_bar.pack(fill=tk.X, pady=(0, 10))
status_bar.pack_propagate(False)
system_info = ttk.Label(status_bar, text="智能人脸识别安全监控系统", background=self.colors['primary'],
foreground='white', font=('SimHei', 12, 'bold'))
system_info.pack(side=tk.LEFT, padx=20)
self.status_indicator = ttk.Label(status_bar, text="状态: 就绪", background=self.colors['primary'],
foreground='white', font=('SimHei', 10))
self.status_indicator.pack(side=tk.LEFT, padx=20)
self.stats_label = ttk.Label(status_bar, text="授权: 0 | 未授权: 0 | 未知: 0 | 当前: 0",
background=self.colors['primary'], foreground='white', font=('SimHei', 10))
self.stats_label.pack(side=tk.RIGHT, padx=20)
def _create_user_management_panel(self, parent: ttk.Frame) -> None:
user_frame = ttk.LabelFrame(parent, text="用户管理", padding=10)
user_frame.pack(fill=tk.BOTH, expand=True)
list_frame = ttk.Frame(user_frame)
list_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
user_scroll = ttk.Scrollbar(list_frame)
user_scroll.pack(side=tk.RIGHT, fill=tk.Y)
self.user_tree = ttk.Treeview(list_frame, columns=("ID", "Name", "Status"), show="headings",
yscrollcommand=user_scroll.set, height=8)
self.user_tree.heading("ID", text="用户ID")
self.user_tree.heading("Name", text="姓名")
self.user_tree.heading("Status", text="授权状态")
self.user_tree.column("ID", width=80, anchor=tk.CENTER)
self.user_tree.column("Name", width=150, anchor=tk.W)
self.user_tree.column("Status", width=100, anchor=tk.CENTER)
self.user_tree.pack(fill=tk.BOTH, expand=True)
user_scroll.config(command=self.user_tree.yview)
self._update_user_list()
button_frame = ttk.Frame(user_frame)
button_frame.pack(fill=tk.X)
ttk.Button(button_frame, text="添加用户", command=self._add_user).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="编辑用户", command=self._edit_user).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="删除用户", command=self._delete_user).pack(side=tk.LEFT, padx=5)
def _create_system_info_panel(self, parent: ttk.Frame) -> None:
status_frame = ttk.LabelFrame(parent, text="系统状态", padding=10)
status_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
self._create_status_cards(status_frame)
log_frame = ttk.LabelFrame(parent, text="警报日志", padding=10)
log_frame.pack(fill=tk.BOTH, expand=True)
log_container = ttk.Frame(log_frame)
log_container.pack(fill=tk.BOTH, expand=True)
log_scroll = ttk.Scrollbar(log_container)
log_scroll.pack(side=tk.RIGHT, fill=tk.Y)
self.log_text = tk.Text(log_container, height=8, yscrollcommand=log_scroll.set, state=tk.DISABLED, bg='white',
fg='black', font=('SimHei', 9))
self.log_text.pack(fill=tk.BOTH, expand=True)
log_scroll.config(command=self.log_text.yview)
self._load_logs()
def _create_status_cards(self, parent: ttk.Frame) -> None:
card_container = ttk.Frame(parent)
card_container.pack(fill=tk.BOTH, expand=True)
authorized_card = self._create_status_card(card_container, "累计授权", "0", self.colors['success'])
authorized_card.grid(row=0, column=0, padx=5, pady=5, sticky=tk.NSEW)
unauthorized_card = self._create_status_card(card_container, "累计未授权", "0", self.colors['warning'])
unauthorized_card.grid(row=0, column=1, padx=5, pady=5, sticky=tk.NSEW)
unknown_card = self._create_status_card(card_container, "累计未知", "0", self.colors['danger'])
unknown_card.grid(row=0, column=2, padx=5, pady=5, sticky=tk.NSEW)
current_card = self._create_status_card(card_container, "当前检测", "0", self.colors['secondary'])
current_card.grid(row=1, column=0, columnspan=3, padx=5, pady=5, sticky=tk.NSEW)
self.status_cards: Dict[str, ttk.Frame] = {
'authorized': authorized_card,
'unauthorized': unauthorized_card,
'unknown': unknown_card,
'current': current_card
}
card_container.columnconfigure(0, weight=1)
card_container.columnconfigure(1, weight=1)
card_container.columnconfigure(2, weight=1)
card_container.rowconfigure(0, weight=1)
card_container.rowconfigure(1, weight=1)
def _create_status_card(self, parent: ttk.Frame, title: str, value: str, color: str) -> ttk.Frame:
card = ttk.Frame(parent)
card.configure(relief=tk.RAISED, borderwidth=1)
title_label = ttk.Label(card, text=title, font=('SimHei', 10, 'bold'))
title_label.pack(pady=(5, 2))
value_label = ttk.Label(card, text=value, font=('SimHei', 16, 'bold'), foreground=color)
value_label.pack(pady=2)
setattr(card, 'value_label', value_label)
return card
def _update_status_cards(self) -> None:
try:
with self.count_lock:
authorized = self.authorized_detections
unauthorized = self.unauthorized_detections
unknown = self.unknown_detections
current = len(self.current_persons)
getattr(self.status_cards['authorized'], 'value_label').config(text=str(authorized))
getattr(self.status_cards['unauthorized'], 'value_label').config(text=str(unauthorized))
getattr(self.status_cards['unknown'], 'value_label').config(text=str(unknown))
getattr(self.status_cards['current'], 'value_label').config(text=str(current))
self.stats_label.config(
text=f"授权: {authorized} | 未授权: {unauthorized} | 未知: {unknown} | 当前: {current}")
except Exception as e:
logging.error(f"更新状态卡片错误: {e}")
def _reset_counts(self) -> None:
with self.count_lock:
self.authorized_detections = 0
self.unauthorized_detections = 0
self.unknown_detections = 0
self.current_persons.clear()
self._update_status_cards()
self._add_log("计数已重置")
def _cleanup_disappeared_persons(self) -> None:
while True:
try:
if not self.recognition_active:
time.sleep(1)
continue
current_time = time.time()
to_remove = []
for pid, info in self.current_persons.items():
if current_time - info['last_seen'] > self.disappear_threshold:
to_remove.append(pid)
for pid in to_remove:
del self.current_persons[pid]
if pid in self.tracked_faces:
del self.tracked_faces[pid]
logging.debug(f"人员 {pid} 已离开,移除计数")
if to_remove:
self.root.after(0, self._update_status_cards)
time.sleep(1)
except Exception as e:
logging.error(f"清理人员状态错误: {e}")
time.sleep(1)
# -------------------- 帧处理 --------------------
def _process_frames(self) -> None:
frame_count = 0
while True:
try:
if not self.recognition_active:
time.sleep(0.1)
continue
try:
frame = self.frame_queue.get(timeout=0.5)
except queue.Empty:
continue
frame_count += 1
if frame_count % (self.frame_skip + 1) != 0:
continue
small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
rgb_small_frame = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
face_locations = face_recognition.face_locations(rgb_small_frame, model="hog")
face_encodings = face_recognition.face_encodings(rgb_small_frame, face_locations,
num_jitters=1) if face_locations else []
results = []
current_time = time.time()
detected_persons: Dict[str, Dict[str, Any]] = {}
max_faces = 5
face_encodings = face_encodings[:max_faces]
face_locations = face_locations[:max_faces]
for i, (face_encoding, face_location) in enumerate(zip(face_encodings, face_locations)):
matches = face_recognition.compare_faces(self.known_face_encodings, face_encoding,
tolerance=self.tolerance)
name = "未知用户"
status = "未知"
color = (0, 0, 255)
user_type = 'unknown'
user_id: Optional[int] = None
if self.known_face_encodings and any(matches):
face_distances = face_recognition.face_distance(self.known_face_encodings, face_encoding)
best_match_index = np.argmin(face_distances)
if matches[best_match_index]:
name = self.known_face_names[best_match_index]
user_id = self.known_face_ids[best_match_index]
auth_info = self.authorized_users.get(user_id, {"authorized": False})
if auth_info.get("authorized", False):
status = "已授权"
color = (0, 255, 0)
user_type = 'authorized'
else:
status = "未授权"
color = (0, 165, 255)
user_type = 'unauthorized'
# 匹配追踪
matched_pid: Optional[str] = None
if user_id is not None:
for pid, pinfo in self.current_persons.items():
if pinfo.get('user_id') == user_id and current_time - pinfo[
'last_seen'] < self.detection_interval:
matched_pid = pid
break
if matched_pid is None and user_type == 'unknown':
face_center = ((face_location[1] + face_location[3]) / 2,
(face_location[0] + face_location[2]) / 2)
for pid, pinfo in self.current_persons.items():
if pinfo['type'] == 'unknown' and current_time - pinfo[
'last_seen'] < self.detection_interval:
if pid in self.tracked_faces:
tracked_center = self.tracked_faces[pid]
distance = np.sqrt((face_center[0] - tracked_center[0]) ** 2 + (
face_center[1] - tracked_center[1]) ** 2)
if distance < 50:
matched_pid = pid
break
if matched_pid is None:
self.person_id_counter += 1
matched_pid = f"p{self.person_id_counter}"
self.current_persons[matched_pid] = {
'name': name, 'type': user_type, 'user_id': user_id,
'last_seen': current_time, 'first_seen': current_time,
'count_added': False, 'face_location': face_location
}
if user_type == 'unknown':
self.tracked_faces[matched_pid] = face_center
else:
self.current_persons[matched_pid]['last_seen'] = current_time
self.current_persons[matched_pid]['face_location'] = face_location
if user_type == 'unknown':
self.tracked_faces[matched_pid] = face_center
detected_persons[matched_pid] = self.current_persons[matched_pid]
results.append((name, status, color))
# ===== 识别结果落库(修正位置和作用域)=====
alert_image_path = ""
if user_id is not None: # 已识别用户
# 生成截图路径
alert_image_path = self._trigger_alert(frame,
"认证成功" if user_type == 'authorized' else "认证失败")
usercode = str(user_id).zfill(8)
username = name
pwd = "12345678" # 本地默认密码,可根据实际情况修改
if user_type == 'authorized':
self._write_confirm_result_ex(
user_id, usercode, username, pwd, "认证成功",
success=1, snapshot_path=alert_image_path)
self.speak(f"认证成功,{username}")
else: # 未授权用户
self._write_confirm_result_ex(
user_id, usercode, username, pwd, "认证失败",
success=0, snapshot_path=alert_image_path)
self.speak("非法入侵,请联系管理")
else: # 未知人员
alert_image_path = self._trigger_alert(frame, "检测到未知人员入侵")
self._write_confirm_result_ex(
-1, "unknown", name, "", "认证失败",
success=0, snapshot_path=alert_image_path)
self.speak("非法入侵,请联系管理")
# 计数更新
for pid, pinfo in detected_persons.items():
if not pinfo['count_added'] and current_time - pinfo['first_seen'] > 1.0:
with self.count_lock:
if pinfo['type'] == 'authorized':
self.authorized_detections += 1
elif pinfo['type'] == 'unauthorized':
self.unauthorized_detections += 1
else:
self.unknown_detections += 1
self.current_persons[pid]['count_added'] = True
logging.debug(f"新增计数 - {pinfo['type']}: {pinfo['name']}")
# 更新UI
self.root.after(0, self._update_status_cards)
# 警报处理
if self.alert_enabled:
unknown_detected = any(p['type'] == 'unknown' for p in detected_persons.values())
unauthorized_detected = any(p['type'] == 'unauthorized' for p in detected_persons.values())
if unknown_detected:
self._handle_unknown_alert(frame)
if unauthorized_detected:
unauthorized_names = [p['name'] for p in detected_persons.values() if
p['type'] == 'unauthorized']
self._handle_unauthorized_alert(frame, unauthorized_names)
if not self.result_queue.full():
self.result_queue.put((face_locations, results))
except Exception as e:
logging.error(f"处理帧错误: {e}")
time.sleep(0.1)
# -------------------- 警报处理 --------------------
def _handle_unknown_alert(self, frame: np.ndarray) -> None:
try:
with self.alert_lock:
current_time = time.time()
if current_time - self.last_alert_time < self.alert_cooldown:
return
self.unknown_face_count += 1
if self.unknown_face_count >= self.unknown_alert_threshold:
self._trigger_alert(frame, "检测到未知人员入侵")
self.unknown_face_count = 0
self.last_alert_time = current_time
except Exception as e:
logging.error(f"处理未知警报错误: {e}")
def _handle_unauthorized_alert(self, frame: np.ndarray, unauthorized_names: List[str]) -> None:
try:
with self.alert_lock:
current_time = time.time()
if current_time - self.last_alert_time < self.alert_cooldown:
return
self.unauthorized_face_count += 1
if self.unauthorized_face_count >= self.unauthorized_alert_threshold:
alert_msg = f"检测到未授权用户: {', '.join(unauthorized_names)}" if unauthorized_names else "检测到未授权用户"
self._trigger_alert(frame, alert_msg)
self.unauthorized_face_count = 0
self.last_alert_time = current_time
except Exception as e:
logging.error(f"处理未授权警报错误: {e}")
def _trigger_alert(self, frame: np.ndarray, alert_type: str) -> str:
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
alert_image_path = os.path.join(os.path.expanduser("~"), "FaceRecognition", "alerts",
f"alert_{timestamp}.jpg")
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 70]
cv2.imwrite(alert_image_path, frame, encode_param)
alert_msg = f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - {alert_type}"
with open(self.alert_log_path, 'a', encoding='utf-8') as f:
f.write(alert_msg + "\n")
self.root.after(0, self._add_log, alert_msg)
self.speak(f"警告!{alert_type}")
# ===== 新增:MySQL 落库 + 本地存图 =====
if "未知" in alert_type:
self._write_auth_log("未知人员", "unknown", frame)
elif "未授权" in alert_type:
name = alert_type.split("未授权用户: ")[-1].split(" ")[0] if "未授权用户: " in alert_type else "未授权用户"
self._write_auth_log(name, "unauthorized", frame)
if self.config.get('email_notification', False):
threading.Thread(target=self._send_alert_email, args=(alert_image_path, alert_type),
daemon=True).start()
self.root.after(0, self._show_alert_status, alert_type)
logging.warning(f"安全警报: {alert_type}")
return alert_image_path
except Exception as e:
logging.error(f"触发警报错误: {e}")
return ""
def _show_alert_status(self, alert_type: str) -> None:
try:
self.status_indicator.config(text=f"警报: {alert_type}", foreground='red')
self.root.after(5000, lambda: self.status_indicator.config(
text=f"状态: {'监控中' if self.recognition_active else '就绪'}", foreground='white'))
except Exception as e:
logging.error(f"显示警报状态错误: {e}")
# -------------------- MySQL 相关 --------------------
# 导入DBUtils的连接池
from dbutils.pooled_db import PooledDB
def _get_mysql_conn(self):
if self._mysql_pool is None:
try:
self._mysql_pool = PooledDB(
creator=pymysql,
host=self._mysql_cfg['host'],
port=self._mysql_cfg['port'],
user=self._mysql_cfg['user'],
password=self._mysql_cfg['password'],
database=self._mysql_cfg['database'],
charset=self._mysql_cfg['charset'],
autocommit=self._mysql_cfg['autocommit'],
maxconnections=3 # 最大连接数
)
except Exception as e:
logging.warning("MySQL 连接池创建失败:%s", e)
return None
try:
return self._mysql_pool.connection()
except Exception as e:
logging.warning("MySQL 获取连接失败:%s", e)
return None
def _ensure_table(self) -> None:
"""建库建表(仅首次)"""
conn = self._get_mysql_conn()
if not conn:
return
try:
with conn.cursor() as cur:
cur.execute(
f"CREATE DATABASE IF NOT EXISTS `{self._mysql_cfg['database']}` DEFAULT CHARACTER SET utf8mb4")
conn.database = self._mysql_cfg['database']
cur.execute("""
CREATE TABLE IF NOT EXISTS `face_auth_log` (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
person_name VARCHAR(64) NOT NULL,
person_type ENUM('authorized','unauthorized','unknown') NOT NULL,
camera_frame TEXT NOT NULL,
UNIQUE KEY uniq_ts_name (timestamp, person_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_confirm` (
userid INT(11) NOT NULL,
usercode VARCHAR(8) NOT NULL,
username VARCHAR(8) NOT NULL,
pwd VARCHAR(10) NOT NULL,
loginnum INT(11) DEFAULT 0,
lastlogintime DATETIME DEFAULT NULL,
registertime DATETIME DEFAULT CURRENT_TIMESTAMP,
Result VARCHAR(20) DEFAULT NULL,
success TINYINT(1) DEFAULT 0 COMMENT '1-成功 0-失败',
snapshot_path TEXT DEFAULT NULL,
PRIMARY KEY (userid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
self.speak("数据库连接成功")
except Exception as e:
logging.warning("MySQL 建表失败:%s", e)
finally:
conn.close()
def _write_auth_log(self, person_name: str, person_type: str, frame: np.ndarray) -> None:
def _worker():
now = datetime.now()
sub_dir = now.strftime("%Y-%m/%d")
save_dir = os.path.join(os.path.expanduser("~"), "FaceRecognition", "auth_frames", sub_dir)
os.makedirs(save_dir, exist_ok=True)
file_name = now.strftime("%H%M%S_%f")[:-3] + ".jpg"
file_path = os.path.join(save_dir, file_name)
cv2.imwrite(file_path, frame)
conn = self._get_mysql_conn()
if not conn:
return
try:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO face_auth_log (person_name, person_type, camera_frame) VALUES (%s,%s,%s) "
"ON DUPLICATE KEY UPDATE camera_frame=VALUES(camera_frame)",
(person_name, person_type, file_path)
)
except Exception as e:
logging.warning("MySQL 写入失败:%s", e)
finally:
conn.close()
threading.Thread(target=_worker, daemon=True).start()
# -------------------- 语音 --------------------
def _start_voice_thread(self) -> None:
def worker():
while True:
try:
text = self.voice_queue.get(timeout=1)
if text is None:
break
cmd = f'PowerShell -Command "Add-Type -AssemblyName System.Speech; $synth = New-Object System.Speech.Synthesis.SpeechSynthesizer; $synth.Speak(\'{text}\')"'
subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except queue.Empty:
continue
except Exception as e:
logging.error("语音异常:%s", e)
threading.Thread(target=worker, daemon=True).start()
def speak(self, text: str) -> None:
try:
self.voice_queue.put_nowait(text)
except queue.Full:
pass
def _periodic_announcement(self) -> None:
while True:
try:
if self.recognition_active and self.alert_enabled:
current_time = time.time()
if current_time - self.last_announcement_time >= 5 and self.current_persons:
if not self.announcement_queue.full():
self.announcement_queue.put(True)
self._make_announcement()
self.last_announcement_time = current_time
time.sleep(1)
except Exception as e:
logging.error(f"播报线程错误: {e}")
time.sleep(1)
def _make_announcement(self) -> None:
try:
authorized_users = []
unauthorized_users = []
unknown_count = 0
for pinfo in self.current_persons.values():
if pinfo['type'] == 'authorized':
authorized_users.append(pinfo['name'])
elif pinfo['type'] == 'unauthorized':
unauthorized_users.append(pinfo['name'])
else:
unknown_count += 1
announcement = ""
if authorized_users:
announcement += f"检测到授权用户: {', '.join(authorized_users)}。"
if unauthorized_users:
announcement += f"检测到未授权用户: {', '.join(unauthorized_users)}。"
if unknown_count > 0:
announcement += f"检测到{unknown_count}名未知人员。"
if not announcement:
announcement = f"当前监控区域共有{len(self.current_persons)}人。"
self.speak(announcement)
except Exception as e:
logging.error(f"生成播报内容错误: {e}")
# -------------------- 摄像头 --------------------
def _update_camera_preview(self) -> None:
if not self.camera_active:
self.cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
if self.cap.isOpened():
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.camera_width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.camera_height)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.cap.set(cv2.CAP_PROP_FPS, 15)
self.camera_active = True
self.camera_stopped = False
try:
if self.cap and self.cap.isOpened():
ret, frame = self.cap.read()
if ret:
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_img = Image.fromarray(rgb_frame)
draw = ImageDraw.Draw(pil_img)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
draw.text((10, 10), timestamp, font=self.font, fill=(255, 255, 255))
user_count_text = f"已注册用户: {len(self.known_face_encodings)} | 当前人员: {len(self.current_persons)}"
draw.text((10, 40), user_count_text, font=self.font, fill=(0, 255, 0))
frame = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
if self.recognition_active:
try:
if self.frame_queue.empty():
self.frame_queue.put_nowait(frame)
except Exception:
pass
try:
face_locations, results = self.result_queue.get_nowait()
display_frame = frame.copy()
display_pil = Image.fromarray(cv2.cvtColor(display_frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(display_pil)
for (top, right, bottom, left), (name, status, color) in zip(face_locations, results):
top = int(top * 4)
right = int(right * 4)
bottom = int(bottom * 4)
left = int(left * 4)
draw.rectangle([(left, top), (right, bottom)], outline=color, width=2)
info_height = 30
draw.rectangle([(left, bottom - info_height), (right, bottom)], fill=color)
text = f"{name} ({status})"
text_bbox = draw.textbbox((0, 0), text, font=self.small_font)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_x = left + (right - left - text_width) // 2
text_y = bottom - info_height + (info_height - text_height) // 2
draw.text((text_x, text_y), text, font=self.small_font, fill=(255, 255, 255))
frame = cv2.cvtColor(np.array(display_pil), cv2.COLOR_RGB2BGR)
except queue.Empty:
pass
else:
frame = np.zeros((self.camera_height, self.camera_width, 3), dtype=np.uint8)
cv2.putText(frame, "摄像头未连接", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
else:
frame = np.zeros((self.camera_height, self.camera_width, 3), dtype=np.uint8)
cv2.putText(frame, "无法打开摄像头", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
if not self.recognition_active:
pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(pil_img)
text = "点击启动监控开始人脸识别"
text_bbox = draw.textbbox((0, 0), text, font=self.font)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_x = (self.camera_width - text_width) // 2
text_y = (self.camera_height + text_height) // 2
overlay = Image.new('RGBA', pil_img.size, (0, 0, 0, 128))
overlay_draw = ImageDraw.Draw(overlay)
overlay_draw.rectangle([
(text_x - 10, text_y - text_height - 5),
(text_x + text_width + 10, text_y + 5)
], fill=(0, 0, 0, 128))
pil_img = Image.alpha_composite(pil_img.convert('RGBA'), overlay).convert('RGB')
draw = ImageDraw.Draw(pil_img)
draw.text((text_x, text_y), text, font=self.font, fill=(0, 255, 0))
frame = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
label_width = self.camera_label.winfo_width()
label_height = self.camera_label.winfo_height()
if label_width > 1 and label_height > 1:
h, w = frame.shape[:2]
aspect_ratio = w / h
if label_width / label_height > aspect_ratio:
new_height = label_height
new_width = int(new_height * aspect_ratio)
else:
new_width = label_width
new_height = int(new_width / aspect_ratio)
frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
img = Image.fromarray(frame)
imgtk = ImageTk.PhotoImage(image=img)
self.camera_label.imgtk = imgtk
self.camera_label.configure(image=imgtk)
except Exception as e:
logging.error(f"更新摄像头预览错误: {e}")
if not self.camera_stopped:
self.root.after(self.ui_update_interval, self._update_camera_preview)
# -------------------- 控制 --------------------
def _start_recognition(self) -> None:
try:
self.recognition_active = True
self.status_indicator.config(text="状态: 监控中", foreground='white')
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
threading.Thread(target=self.speak, args=("人脸识别监控系统已启动",), daemon=True).start()
logging.info("人脸识别监控已启动")
except Exception as e:
logging.error(f"启动识别错误: {e}")
def _stop_recognition(self) -> None:
try:
self.recognition_active = False
self.current_persons.clear()
self.tracked_faces.clear()
self.status_indicator.config(text="状态: 就绪", foreground='white')
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
threading.Thread(target=self.speak, args=("人脸识别监控系统已停止",), daemon=True).start()
logging.info("人脸识别监控已停止")
except Exception as e:
logging.error(f"停止识别错误: {e}")
def _toggle_alert(self) -> None:
try:
self.alert_enabled = self.alert_var.get()
self.config['alert_enabled'] = self.alert_enabled
self._save_config()
self._add_log(f"警报功能{'开启' if self.alert_enabled else '关闭'}")
except Exception as e:
logging.error(f"切换警报状态错误: {e}")
def _add_log(self, message: str) -> None:
try:
self.log_text.config(state=tk.NORMAL)
timestamp = datetime.now().strftime("%H:%M:%S")
self.log_text.insert(tk.END, f"[{timestamp}] {message}\n")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
except Exception as e:
logging.error(f"添加日志错误: {e}")
def _load_logs(self) -> None:
self.log_text.config(state=tk.NORMAL)
self.log_text.delete(1.0, tk.END)
if os.path.exists(self.alert_log_path):
try:
with open(self.alert_log_path, 'r', encoding='utf-8') as f:
logs = f.readlines()[-10:]
for log in logs:
self.log_text.insert(tk.END, log)
except Exception as e:
logging.error(f"加载日志失败: {e}")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
# -------------------- 用户管理 --------------------
def _update_user_list(self) -> None:
for item in self.user_tree.get_children():
self.user_tree.delete(item)
for user_id, user_name in zip(self.known_face_ids, self.known_face_names):
auth_info = self.authorized_users.get(user_id, {})
status = "已授权" if auth_info.get("authorized", False) else "未授权"
self.user_tree.insert("", "end", values=(user_id, user_name, status))
def _add_user(self) -> None:
add_window = tk.Toplevel(self.root)
add_window.title("添加用户")
add_window.geometry("400x300")
add_window.transient(self.root)
add_window.grab_set()
add_window.configure(bg='#f0f0f0')
ttk.Label(add_window, text="用户ID:", font=('SimHei', 10)).grid(row=0, column=0, padx=20, pady=20, sticky=tk.W)
user_id_entry = ttk.Entry(add_window)
user_id_entry.grid(row=0, column=1, padx=20, pady=20)
ttk.Label(add_window, text="用户名:", font=('SimHei', 10)).grid(row=1, column=0, padx=20, pady=10, sticky=tk.W)
name_entry = ttk.Entry(add_window)
name_entry.grid(row=1, column=1, padx=20, pady=10)
auth_var = tk.BooleanVar(value=True)
ttk.Checkbutton(add_window, text="授权用户", variable=auth_var).grid(row=2, column=0, columnspan=2, padx=20,
pady=10)
button_frame = ttk.Frame(add_window)
button_frame.grid(row=3, column=0, columnspan=2, pady=20)
ttk.Button(button_frame, text="确定",
command=lambda: self._register_user(add_window, user_id_entry, name_entry, auth_var)).pack(
side=tk.LEFT, padx=10)
ttk.Button(button_frame, text="取消", command=add_window.destroy).pack(side=tk.LEFT, padx=10)
def _register_user(self, window: tk.Toplevel, user_id_entry: ttk.Entry, name_entry: ttk.Entry,
auth_var: tk.BooleanVar) -> None:
try:
user_id_text = user_id_entry.get().strip()
if not user_id_text:
messagebox.showerror("错误", "用户ID不能为空")
return
user_id = int(user_id_text)
user_name = name_entry.get().strip()
authorized = auth_var.get()
if not user_name:
messagebox.showerror("错误", "用户名不能为空")
return
if user_id in self.known_face_ids:
if not messagebox.askyesno("确认", f"用户ID {user_id}已存在,是否覆盖?"):
return
window.destroy()
self._collect_face_samples(user_id, user_name, authorized)
except ValueError:
messagebox.showerror("错误", "用户ID必须是数字")
def _collect_face_samples(self, user_id: int, user_name: str, authorized: bool) -> None:
collect_window = tk.Toplevel(self.root)
collect_window.title(f"采集 {user_name} 的人脸样本")
collect_window.geometry("700x550")
collect_window.transient(self.root)
collect_window.grab_set()
collect_window.configure(bg='#f0f0f0')
title_label = ttk.Label(collect_window, text=f"请面对摄像头,正在采集 {user_name} 的人脸样本...",
font=('SimHei', 12, 'bold'))
title_label.pack(pady=10)
cam_frame = ttk.Frame(collect_window, relief=tk.SUNKEN, borderwidth=2)
cam_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=10)
self.collect_cam_label = ttk.Label(cam_frame)
self.collect_cam_label.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
progress_frame = ttk.Frame(collect_window)
progress_frame.pack(fill=tk.X, padx=20, pady=10)
self.collect_status_label = ttk.Label(progress_frame, text="准备采集...", font=('SimHei', 10))
self.collect_status_label.pack(pady=5)
self.progress_var = tk.IntVar()
self.progress_bar = ttk.Progressbar(progress_frame, variable=self.progress_var, maximum=5)
self.progress_bar.pack(fill=tk.X, pady=5)
button_frame = ttk.Frame(collect_window)
button_frame.pack(pady=10)
self.cancel_button = ttk.Button(button_frame, text="取消",
command=lambda: self._cancel_collection(collect_window))
self.cancel_button.pack(padx=10)
self._init_collection(user_id, user_name, authorized, collect_window)
def _init_collection(self, user_id: int, user_name: str, authorized: bool, window: tk.Toplevel) -> None:
self.collect_cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
if self.collect_cap.isOpened():
self.collect_cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
self.collect_cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
self.collect_face_encodings: List[np.ndarray] = []
self.collect_samples = 0
self.collect_max_samples = 5
self.collect_user_id = user_id
self.collect_user_name = user_name
self.collect_authorized = authorized
self.collect_window = window
self.collect_stopped = False
self._update_collection_preview()
def _update_collection_preview(self) -> None:
if self.collect_stopped:
return
try:
if self.collect_cap and self.collect_cap.isOpened():
ret, frame = self.collect_cap.read()
if ret:
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
face_locations = face_recognition.face_locations(rgb_frame)
if face_locations:
encodings = face_recognition.face_encodings(rgb_frame, face_locations)
if encodings and self.collect_samples < self.collect_max_samples:
self.collect_face_encodings.append(encodings[0])
self.collect_samples += 1
self.progress_var.set(self.collect_samples)
self.collect_status_label.config(
text=f"已采集 {self.collect_samples}/{self.collect_max_samples} 个样本")
if self.collect_samples >= self.collect_max_samples:
self._finish_collection()
return
for (top, right, bottom, left) in face_locations:
cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(pil_img)
draw.text((left, top - 30), "请保持姿势", font=self.font, fill=(0, 255, 0))
frame = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(pil_img)
draw.text((10, 10), "请面对摄像头,保持光线充足", font=self.font, fill=(255, 255, 255))
frame = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
else:
frame = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(frame, "摄像头未连接", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
else:
frame = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(frame, "无法打开摄像头", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
img = Image.fromarray(frame)
label_width = self.collect_cam_label.winfo_width()
label_height = self.collect_cam_label.winfo_height()
if label_width > 1 and label_height > 1:
img = img.resize((label_width, label_height), Image.Resampling.BILINEAR)
imgtk = ImageTk.PhotoImage(image=img)
self.collect_cam_label.imgtk = imgtk
self.collect_cam_label.configure(image=imgtk)
except Exception as e:
logging.error(f"采集预览错误: {e}")
if not self.collect_stopped:
self.collect_window.after(30, self._update_collection_preview)
def _finish_collection(self) -> None:
self.collect_stopped = True
if self.collect_cap:
self.collect_cap.release()
self.collect_window.destroy()
if len(self.collect_face_encodings) > 0:
avg_encoding = np.mean(self.collect_face_encodings, axis=0)
if self.collect_user_id in self.known_face_ids:
idx = self.known_face_ids.index(self.collect_user_id)
self.known_face_encodings[idx] = avg_encoding
self.known_face_names[idx] = self.collect_user_name
else:
self.known_face_encodings.append(avg_encoding)
self.known_face_names.append(self.collect_user_name)
self.known_face_ids.append(self.collect_user_id)
self.authorized_users[self.collect_user_id] = {
"name": self.collect_user_name,
"authorized": self.collect_authorized
}
self._save_face_data()
self._save_authorized_users()
self._update_user_list()
messagebox.showinfo("成功", f"{self.collect_user_name}注册成功!")
self._add_log(f"用户 {self.collect_user_name} (ID: {self.collect_user_id}) 注册成功")
else:
messagebox.showerror("错误", "未能采集到有效样本")
def _cancel_collection(self, window: tk.Toplevel) -> None:
self.collect_stopped = True
if hasattr(self, 'collect_cap') and self.collect_cap:
self.collect_cap.release()
window.destroy()
def _edit_user(self) -> None:
selected = self.user_tree.selection()
if not selected:
messagebox.showwarning("警告", "请选择要编辑的用户")
return
try:
item = self.user_tree.item(selected[0])
user_id = int(item['values'][0])
user_name = item['values'][1]
current_auth = item['values'][2] == "已授权"
edit_window = tk.Toplevel(self.root)
edit_window.title("编辑用户")
edit_window.geometry("400x250")
edit_window.transient(self.root)
edit_window.grab_set()
edit_window.configure(bg='#f0f0f0')
ttk.Label(edit_window, text=f"用户ID: {user_id} (不可修改)", font=('SimHei', 10)).grid(row=0, column=0,
columnspan=2,
padx=20, pady=15)
ttk.Label(edit_window, text="用户名:", font=('SimHei', 10)).grid(row=1, column=0, padx=20, pady=10,
sticky=tk.W)
name_entry = ttk.Entry(edit_window)
name_entry.insert(0, user_name)
name_entry.grid(row=1, column=1, padx=20, pady=10)
auth_var = tk.BooleanVar(value=current_auth)
ttk.Checkbutton(edit_window, text="授权用户", variable=auth_var).grid(row=2, column=0, columnspan=2,
padx=20, pady=10)
button_frame = ttk.Frame(edit_window)
button_frame.grid(row=3, column=0, columnspan=2, pady=20)
ttk.Button(button_frame, text="保存",
command=lambda: self._save_user_edit(user_id, name_entry, auth_var, edit_window)).pack(
side=tk.LEFT, padx=10)
ttk.Button(button_frame, text="取消", command=edit_window.destroy).pack(side=tk.LEFT, padx=10)
except (ValueError, IndexError) as e:
logging.error(f"编辑用户错误: {e}")
messagebox.showerror("错误", "无法获取用户信息")
def _save_user_edit(self, user_id: int, name_entry: ttk.Entry, auth_var: tk.BooleanVar,
window: tk.Toplevel) -> None:
new_name = name_entry.get().strip()
new_auth = auth_var.get()
if not new_name:
messagebox.showerror("错误", "用户名不能为空")
return
try:
idx = self.known_face_ids.index(user_id)
old_name = self.known_face_names[idx]
self.known_face_names[idx] = new_name
self.authorized_users[user_id]["name"] = new_name
self.authorized_users[user_id]["authorized"] = new_auth
self._save_face_data()
self._save_authorized_users()
self._update_user_list()
window.destroy()
self._add_log(
f"用户 {old_name} (ID: {user_id}) 已更新为 {new_name},授权状态: {'已授权' if new_auth else '未授权'}")
except ValueError as e:
logging.error(f"保存用户编辑错误: {e}")
messagebox.showerror("错误", "用户不存在")
def _delete_user(self) -> None:
selected = self.user_tree.selection()
if not selected:
messagebox.showwarning("警告", "请选择要删除的用户")
return
try:
item = self.user_tree.item(selected[0])
user_id = int(item['values'][0])
user_name = item['values'][1]
if messagebox.askyesno("确认", f"确定要删除用户 {user_name} (ID: {user_id})?"):
idx = self.known_face_ids.index(user_id)
del self.known_face_encodings[idx]
del self.known_face_names[idx]
del self.known_face_ids[idx]
if user_id in self.authorized_users:
del self.authorized_users[user_id]
user_dir = os.path.join(self.dataset_path, f"user_{user_id}")
if os.path.exists(user_dir):
shutil.rmtree(user_dir)
self._save_face_data()
self._save_authorized_users()
self._update_user_list()
self._add_log(f"用户 {user_name} (ID: {user_id}) 已删除")
except (ValueError, IndexError) as e:
logging.error(f"删除用户错误: {e}")
messagebox.showerror("错误", "无法删除用户")
# -------------------- 邮件 --------------------
def _send_alert_email(self, image_path: str, alert_type: str) -> None:
try:
msg = MIMEMultipart()
msg['From'] = self.config['smtp_user']
msg['To'] = self.config['alert_recipient']
msg['Subject'] = f"人脸识别警报 - {alert_type} - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
body = f"安全警报: {alert_type}\n时间: {datetime.now()}\n\n系统已自动记录此事件。"
msg.attach(MIMEText(body, 'plain'))
if os.path.exists(image_path):
with open(image_path, 'rb') as f:
img_data = f.read()
image = MIMEImage(img_data, name=os.path.basename(image_path))
msg.attach(image)
server = smtplib.SMTP(self.config['smtp_server'], self.config['smtp_port'], timeout=10)
server.starttls()
server.login(self.config['smtp_user'], self.config['smtp_password'])
server.send_message(msg)
server.quit()
logging.info("警报邮件已发送")
except Exception as e:
logging.error(f"发送警报邮件失败: {e}")
self._add_log(f"邮件发送失败: {str(e)[:50]}...")
# -------------------- 析构 --------------------
def __del__(self) -> None:
try:
self.camera_stopped = True
if self.cap:
self.cap.release()
except Exception:
pass
# ==========================================================
# 主程序入口
# ==========================================================
if __name__ == "__main__":
try:
sys.excepthook = lambda exc_type, exc_value, exc_traceback: logging.error(
"未捕获的异常", exc_info=(exc_type, exc_value, exc_traceback)
)
root = tk.Tk()
app = FaceRecognitionSystem(root)
def on_closing() -> None:
try:
app.camera_stopped = True
if app.cap:
app.cap.release()
except Exception:
pass
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
except ImportError as e:
print("face_recognition库未安装,正在使用简化版界面...")
print("建议安装: pip install face_recognition opencv-python pillow pyttsx3 mysql-connector-python")
root = tk.Tk()
root.title("人脸识别系统")
root.geometry("500x300")
root.configure(bg='#f0f0f0')
frame = ttk.Frame(root, padding=30)
frame.pack(fill=tk.BOTH, expand=True)
ttk.Label(frame, text="错误: face_recognition库未安装", font=("SimHei", 16, 'bold')).pack(pady=20)
ttk.Label(frame, text="请安装必要的依赖库:", font=("SimHei", 12)).pack(pady=10)
cmd_frame = ttk.Frame(frame)
cmd_frame.pack(pady=10)
cmd_label = tk.Label(cmd_frame,
text="pip install face_recognition opencv-python pillow pyttsx3 mysql-connector-python",
font=("Consolas", 10), bg='#f8f8f8', relief=tk.SUNKEN, padx=10, pady=5)
cmd_label.pack()
ttk.Button(frame, text="退出", command=root.quit).pack(pady=20)
root.mainloop()
except Exception as e:
logging.error(f"系统启动失败: {e}")
print(f"系统启动失败: {e}")
八、新大陆API使用(仅给出执行器页面的js代码)
<script setup>
import { ref, computed, onMounted, onUnmounted } from 'vue'
/* ====== 常量 ====== */
const ACCESS_TOKEN = "4A0B91D658F1D953030891D787B608454B696681AC529330F0277191DAE01BD50E7D6159E67A81D466E5F081CDD3A011F3C91420C964544CD97AE658C4DDA9526F32106EE8D5C1E543D29A92937B6A14830F38574F704EEA42BBB8F0A2FC58980AB37BAFBB6D43CD6C26E12C36B3B2B4A569BAAAF4B842EC4B4E4495974DB6096D84E606D8F71EC5AB40210526B991F7EBB1D660E942BA292A790E2AA7C8F2D1A1326DB95D6AAB25F4F4AD3BCBDCD9AA34FF28829EE321C67BAC555CD9D94C4BC1864DFCE69B436D30C3DECD280805B0E0A3EDB8BB7168DA0526E0561D27F9E8"
const PROJECT_ID = 1077052
const BASE_URL = 'https://api.nlecloud.com' // ← 去掉了多余空格
/* ========== 数据 ========== */
const devices = ref([])
const searchKeyword = ref('')
const filterStatus = ref('all')
const editingDevice = ref(null)
const editingSensor = ref(null)
const editingActuator = ref(null)
const form = ref({ device_name:'',device_tag:'',project_id:PROJECT_ID })
const subForm = ref({ api_tag:'',name:'',unit:'',operType:1,min:0,max:100,step:1 })
const modal = ref(null)
const modalSensor = ref(null)
const modalActuator = ref(null)
const operTypeOpts = [1,4]
let timer = null
/* ========== 网络 ========== */
const request = (url, method='GET', data={}) => new Promise((resolve,reject)=>{
uni.request({
url:BASE_URL+url,
method,
data,
header:{ 'AccessToken':ACCESS_TOKEN,'Content-Type':'application/json' },
success:res=>{
if(res.statusCode===200 && res.data.Status===0) resolve({code:200,data:res.data.ResultObj})
else { uni.showToast({title:res.data.Msg||'请求失败',icon:'none'}); reject(res.data) }
},
fail:err=>{ uni.showToast({title:'网络错误',icon:'none'}); reject(err) }
})
})
/* 主列表:slient=true 时后台静默刷新,不弹 loading */
const getDevices = async (slient = false) => {
if(!slient) uni.showLoading({title:'加载中'})
try{
const {data} = await request(`/Projects/${PROJECT_ID}/SensorsRealTimeData`)
if(!data || !data.length){ devices.value=[]; return }
const map={}
data.forEach(s=>{
const d=map[s.DeviceId]||={
id:s.DeviceId,
device_name:s.DeviceId==1380445?'NEWLab-ZigBee网关':'虚拟设备',
device_tag:'',
online_status:'在线',
sensors:[],
actuators:[]
}
const isAct=['m_fan','nl_fan','zbefwwzebvgn','ddwyegyjlmou'].includes(s.ApiTag)
const item={ api_tag:s.ApiTag,name:s.Name,value:s.Value,unit:s.Unit||'' }
if(isAct){
item.operType=1; item.min=0; item.max=100; item.step=1
d.actuators.push(item)
}else{
d.sensors.push(item)
}
map[s.DeviceId]=d
})
devices.value=Object.values(map)
}finally{
if(!slient) uni.hideLoading()
}
}
/* =============== 设备增删改 =============== */
const saveDevice = async () => {
if(!form.value.device_name||!form.value.device_tag) return uni.showToast({title:'请补全',icon:'none'})
uni.showLoading({title:'保存中'})
try{
if(editingDevice.value){
await request(`/devices/${editingDevice.value.id}`,'PUT',form.value)
}else{
await request('/devices','POST',form.value)
}
uni.showToast({title:'保存成功',icon:'success'})
closeModal(); getDevices()
}finally{ uni.hideLoading() }
}
const deleteDevice = (d) => {
uni.showModal({
title:'提示',content:`删除设备「${d.device_name}」?`,
success:async ({confirm})=>{
if(!confirm)return
uni.showLoading({title:'删除中'})
try{
await request(`/devices/${d.id}`,'DELETE')
uni.showToast({title:'已删除',icon:'success'})
getDevices()
}finally{ uni.hideLoading() }
}
})
}
const openAdd=()=>{ editingDevice.value=null; form.value={device_name:'',device_tag:'',project_id:PROJECT_ID}; modal.value.open() }
const openEdit=(d)=>{ editingDevice.value=d; Object.assign(form.value,d); modal.value.open() }
const closeModal=()=>{ modal.value.close(); editingDevice.value=null; form.value={device_name:'',device_tag:'',project_id:PROJECT_ID} }
/* =============== 传感器增删改 =============== */
const openAddSensor = (d) => {
editingSensor.value=null; subForm.value={api_tag:'',name:'',unit:'',operType:0}
modalSensor.value.open()
}
const saveSensor = async () => {
if(!subForm.value.api_tag||!subForm.value.name) return uni.showToast({title:'请补全',icon:'none'})
uni.showLoading({title:'保存中'})
try{
const device=devices.value.find(v=>v.sensors.some(s=>s.api_tag===subForm.value.api_tag))||devices.value[0]
const body={Name:subForm.value.name,ApiTag:subForm.value.api_tag,Unit:subForm.value.unit,TransType:0,DataType:0}
if(editingSensor.value){
await request(`/devices/${device.id}/sensors/${editingSensor.value.api_tag}`,'PUT',body)
}else{
await request(`/devices/${device.id}/sensors`,'POST',body)
}
uni.showToast({title:'保存成功',icon:'success'}); modalSensor.value.close(); getDevices()
}finally{ uni.hideLoading() }
}
const deleteSensor = (device,s) => {
uni.showModal({
title:'提示',content:`删除传感器「${s.name}」?`,
success:async ({confirm})=>{
if(!confirm)return
uni.showLoading({title:'删除中'})
try{
await request(`/devices/${device.id}/sensors/${s.api_tag}`,'DELETE')
uni.showToast({title:'已删除',icon:'success'}); getDevices()
}finally{ uni.hideLoading() }
}
})
}
/* =============== 执行器增删改 =============== */
const openAddActuator = (d) => {
editingActuator.value=null; subForm.value={api_tag:'',name:'',operType:1,min:0,max:100,step:1}
modalActuator.value.open()
}
const saveActuator = async () => {
if(!subForm.value.api_tag||!subForm.value.name) return uni.showToast({title:'请补全',icon:'none'})
uni.showLoading({title:'保存中'})
try{
const device=devices.value.find(v=>v.actuators.some(a=>a.api_tag===subForm.value.api_tag))||devices.value[0]
const body={
Name:subForm.value.name,
ApiTag:subForm.value.api_tag,
TransType:1,
DataType:2,
OperType:subForm.value.operType,
OperTypeAttrs:JSON.stringify({MinRange:subForm.value.min,MaxRange:subForm.value.max,Step:subForm.value.step})
}
if(editingActuator.value){
await request(`/devices/${device.id}/sensors/${editingActuator.value.api_tag}`,'PUT',body)
}else{
await request(`/devices/${device.id}/sensors`,'POST',body)
}
uni.showToast({title:'保存成功',icon:'success'}); modalActuator.value.close(); getDevices()
}finally{ uni.hideLoading() }
}
const deleteActuator = (device,a) => {
uni.showModal({
title:'提示',content:`删除执行器「${a.name}」?`,
success:async ({confirm})=>{
if(!confirm)return
uni.showLoading({title:'删除中'})
try{
await request(`/devices/${device.id}/sensors/${a.api_tag}`,'DELETE')
uni.showToast({title:'已删除',icon:'success'}); getDevices()
}finally{ uni.hideLoading() }
}
})
}
/* =============== 执行器控制 =============== */
const sendCmd = async (deviceId, apiTag, val) => {
const params = new URLSearchParams({ deviceId, apiTag }) // ← 拼出 URL 参数
try {
await request(`/Cmds?${params}`, 'POST', { value: String(val) }) // ← 真正发指令
uni.showToast({ title: '操作成功', icon: 'success' })
} catch (e) {
uni.showToast({ title: e?.Msg || '设备不在线或指令失败', icon: 'none' })
throw e // 让外层回滚 UI
}
}
const toggleActuator = (dev, act) => {
const next = act.value === '1' ? '0' : '1'
act.value = next
sendCmd(dev.id, act.api_tag, next).catch(() => act.value = act.value === '1' ? '0' : '1')
}
const setActuator = (dev, act, v) => {
act.value = String(v)
sendCmd(dev.id, act.api_tag, v)
}
/* =============== 搜索 & 筛选 =============== */
const filteredDevices = computed(() => {
let list = devices.value
if (filterStatus.value !== 'all') list = list.filter(d => d.online_status === filterStatus.value)
if (searchKeyword.value) {
const kw = searchKeyword.value.toLowerCase()
list = list.filter(d => d.device_name.toLowerCase().includes(kw) || d.device_tag.toLowerCase().includes(kw))
}
return list
})
const handleSearch = () => { }
const loadMore = () => { }
/* =============== 生命周期 =============== */
onMounted(() => {
getDevices() // 首次带 loading
timer = setInterval(() => getDevices(true), 5000) // 后台静默刷新
})
onUnmounted(() => clearInterval(timer))
</script>
九、总结
结语:朝乾夕惕,功不唐捐,玉汝于成
更多推荐

所有评论(0)