实现自己的AI视频监控系统-第三章-信息的推送与共享2
上一小节,我们简要介绍了基本的通信手段以实现数据推送与共享的方式,本小节将基于使用场景和使用环境,实现稳定、可靠的数据传输。基于HTTP的GET和POST方法是推送报警数据的常用手段之一,通常用于系统与服务器之间、或不同服务之间的轻量级数据交互。二者在语义、使用方式和适用场景上具有明显区别,合理选择方法对保证数据传输的可靠性和系统的性能具有重要意义。GET方法主要用于从服务器获取资源,其设计初衷是
文章目录
前言
上一小节,我们简要介绍了基本的通信手段以实现数据推送与共享的方式,本小节将基于使用场景和使用环境,实现稳定、可靠的数据传输。
一、Http方法:GET与POST
基于HTTP的GET和POST方法是推送报警数据的常用手段之一,通常用于系统与服务器之间、或不同服务之间的轻量级数据交互。二者在语义、使用方式和适用场景上具有明显区别,合理选择方法对保证数据传输的可靠性和系统的性能具有重要意义。
1. GET方法
GET方法主要用于从服务器获取资源,其设计初衷是幂等且安全的操作,即多次重复调用不应产生副作用。在报警数据推送中,GET方法可适用于以下场景:
-
查询报警状态:定期向服务器请求当前报警状态,如 GET /alarms?status=active。
-
获取历史报警记录:通过参数筛选历史数据,如 GET /alarms?start=2023-10-01&end=2123-10-01。
-
轻量级触发操作:某些简单报警可通过GET请求触发(需注意安全性)。
特点:
-
参数通过URL传递,可见性强但安全性低(需配合HTTPS加密)。
-
长度受浏览器和服务器限制(通常不超过2048字符)。
-
可缓存,适合重复获取相同数据的场景。
2. POST方法
POST方法主要用于向服务器提交数据,是非幂等的操作(多次调用可能产生不同结果)。在报警推送中,POST更适用于:
-
主动推送报警信息:将报警详情(如时间、级别、描述)通过请求体发送至服务器,如 POST /alarms。
-
批量上报数据:一次性传输多条报警记录(JSON/XML格式)。
-
执行敏感操作:如确认报警、下发控制指令等。
特点:
-
参数通过请求体传输,支持更大数据量(如JSON、二进制数据)。
-
安全性更高(结合HTTPS可加密内容)。
-
不可缓存,适合需要实时处理的场景。
3.案例代码
假设现在服务器端需要向客户端请求报警信息查询,同时客户端需要向服务器端推送相应的报警数据,我们就需要客户端与服务器端的双向通信。以下是使用Python Flask框架实现的服务器端和客户端参考代码:
- 服务器端代码 (使用Flask)
from flask import Flask, request, jsonify
from datetime import datetime
import threading
import time
app = Flask(__name__)
# 存储接收到的报警数据
alarms = []
@app.route('/alarms', methods=['GET'])
def get_alarms():
"""
处理GET请求:查询报警信息
可选参数:
- status: 过滤特定状态的报警(active/inactive)
- start_time: 查询起始时间
- end_time: 查询结束时间
- limit: 返回结果数量限制
"""
# 获取查询参数
status_filter = request.args.get('status')
start_time = request.args.get('start_time')
end_time = request.args.get('end_time')
limit = int(request.args.get('limit', 10))
# 过滤报警数据
filtered_alarms = alarms
if status_filter:
filtered_alarms = [a for a in filtered_alarms if a['status'] == status_filter]
if start_time:
start_dt = datetime.fromisoformat(start_time)
filtered_alarms = [a for a in filtered_alarms if datetime.fromisoformat(a['timestamp']) >= start_dt]
if end_time:
end_dt = datetime.fromisoformat(end_time)
filtered_alarms = [a for a in filtered_alarms if datetime.fromisoformat(a['timestamp']) <= end_dt]
# 限制返回数量
result = filtered_alarms[:limit]
return jsonify({
"count": len(result),
"alarms": result
})
@app.route('/alarms', methods=['POST'])
def receive_alarm():
"""
处理POST请求:接收报警数据推送
期望JSON格式:
{
"device_id": "设备标识",
"alarm_type": "报警类型",
"severity": "紧急程度",
"description": "报警描述",
"timestamp": "发生时间(ISO格式)"
}
"""
data = request.json
# 数据验证
required_fields = ['device_id', 'alarm_type', 'severity', 'description', 'timestamp']
if not all(field in data for field in required_fields):
return jsonify({"error": "Missing required fields"}), 400
# 添加状态字段和接收时间
alarm_data = {
**data,
"status": "active", # 默认状态为活跃
"received_at": datetime.now().isoformat(),
"acknowledged": False # 是否已被确认
}
# 存储报警数据
alarms.append(alarm_data)
# 这里可以添加额外的处理逻辑,如发送邮件、短信通知等
print(f"Received alarm: {alarm_data}")
return jsonify({"message": "Alarm received successfully", "id": len(alarms)-1}), 201
@app.route('/alarms/<int:alarm_id>', methods=['PUT'])
def update_alarm(alarm_id):
"""
更新报警信息(如确认报警、修改状态等)
"""
if alarm_id < 0 or alarm_id >= len(alarms):
return jsonify({"error": "Invalid alarm ID"}), 404
data = request.json
# 更新允许修改的字段
if 'status' in data:
alarms[alarm_id]['status'] = data['status']
if 'acknowledged' in data:
alarms[alarm_id]['acknowledged'] = data['acknowledged']
if 'notes' in data:
alarms[alarm_id]['notes'] = data['notes']
return jsonify({"message": "Alarm updated successfully"})
def run_server():
"""启动服务器"""
app.run(host='0.0.0.0', port=5000, debug=True)
if __name__ == '__main__':
run_server()
- 客户端代码
import requests
import json
from datetime import datetime, timedelta
import time
class AlarmClient:
def __init__(self, base_url):
self.base_url = base_url
def push_alarm(self, device_id, alarm_type, severity, description):
"""
向服务器推送报警信息
"""
alarm_data = {
"device_id": device_id,
"alarm_type": alarm_type,
"severity": severity,
"description": description,
"timestamp": datetime.now().isoformat()
}
try:
response = requests.post(
f"{self.base_url}/alarms",
json=alarm_data,
headers={'Content-Type': 'application/json'},
timeout=5
)
if response.status_code == 201:
print("Alarm pushed successfully")
return response.json()
else:
print(f"Failed to push alarm: {response.status_code} - {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"Error pushing alarm: {e}")
# 这里可以添加重试逻辑
return None
def query_alarms(self, status=None, start_time=None, end_time=None, limit=10):
"""
查询报警信息
"""
params = {}
if status:
params['status'] = status
if start_time:
params['start_time'] = start_time
if end_time:
params['end_time'] = end_time
if limit:
params['limit'] = limit
try:
response = requests.get(
f"{self.base_url}/alarms",
params=params,
timeout=5
)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to query alarms: {response.status_code} - {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"Error querying alarms: {e}")
return None
def acknowledge_alarm(self, alarm_id, notes=None):
"""
确认报警
"""
update_data = {
"acknowledged": True
}
if notes:
update_data["notes"] = notes
try:
response = requests.put(
f"{self.base_url}/alarms/{alarm_id}",
json=update_data,
headers={'Content-Type': 'application/json'},
timeout=5
)
if response.status_code == 200:
print("Alarm acknowledged successfully")
return response.json()
else:
print(f"Failed to acknowledge alarm: {response.status_code} - {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"Error acknowledging alarm: {e}")
return None
# 使用示例
if __name__ == "__main__":
# 初始化客户端
client = AlarmClient("http://localhost:5000")
# 推送报警示例
client.push_alarm(
device_id="sensor_001",
alarm_type="overheat",
severity="high",
description="Temperature exceeded threshold: 85°C"
)
# 等待片刻让服务器处理
time.sleep(1)
# 查询活跃报警示例
end_time = datetime.now().isoformat()
start_time = (datetime.now() - timedelta(hours=1)).isoformat()
result = client.query_alarms(
status="active",
start_time=start_time,
end_time=end_time,
limit=5
)
if result:
print(f"Found {result['count']} active alarms:")
for alarm in result['alarms']:
print(f" - {alarm['device_id']}: {alarm['description']}")
# 确认第一个报警
if result['count'] > 0:
client.acknowledge_alarm(0, "Handled by operator")
- 代码说明
-
服务器端功能:
-
GET /alarms: 查询报警信息,支持状态、时间范围和数量过滤
-
POST /alarms: 接收报警数据推送
-
PUT /alarms/: 更新报警状态(如确认报警)
-
-
客户端功能:
-
push_alarm(): 推送报警数据到服务器
-
query_alarms(): 从服务器查询报警信息
-
acknowledge_alarm(): 确认特定报警
-
-
二、socket自定义通信
基于Socket的自定义通信提供了更灵活、实时的数据传输方式,特别适合需要低延迟和高并发的报警数据处理场景。
1.设计思路
我们将设计一个简单的Socket服务器和客户端,使用TCP协议实现可靠的数据传输。通信协议设计如下:
-
数据格式:JSON格式的消息体
-
消息结构:
-
类型(type): 标识消息类型(alarm_push, alarm_query, alarm_ack等)
-
数据(data): 实际的消息内容
-
时间戳(timestamp): 消息创建时间
-
-
连接管理:保持长连接,支持心跳检测
2.完整案例代码
- 服务器端代码 (socket_server.py)
import socket
import json
import threading
import time
from datetime import datetime
from typing import Dict, List
class AlarmSocketServer:
def __init__(self, host='localhost', port=9999):
self.host = host
self.port = port
self.socket = None
self.clients = {} # 存储客户端连接
self.alarms = [] # 存储报警数据
self.running = False
self.next_alarm_id = 1
def start(self):
"""启动服务器"""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
self.socket.listen(5)
self.running = True
print(f"Alarm socket server started on {self.host}:{self.port}")
# 启动客户端接受线程
accept_thread = threading.Thread(target=self.accept_clients)
accept_thread.daemon = True
accept_thread.start()
# 启动心跳检测线程
heartbeat_thread = threading.Thread(target=self.heartbeat_check)
heartbeat_thread.daemon = True
heartbeat_thread.start()
try:
# 主线程保持运行
while self.running:
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down server...")
self.stop()
def stop(self):
"""停止服务器"""
self.running = False
if self.socket:
self.socket.close()
for client_id, client_info in self.clients.items():
client_info['socket'].close()
print("Server stopped")
def accept_clients(self):
"""接受客户端连接"""
while self.running:
try:
client_socket, client_address = self.socket.accept()
client_id = f"{client_address[0]}:{client_address[1]}"
print(f"New client connected: {client_id}")
# 存储客户端信息
self.clients[client_id] = {
'socket': client_socket,
'address': client_address,
'last_heartbeat': time.time()
}
# 为每个客户端启动处理线程
client_thread = threading.Thread(
target=self.handle_client,
args=(client_id, client_socket)
)
client_thread.daemon = True
client_thread.start()
except Exception as e:
if self.running:
print(f"Error accepting client: {e}")
def handle_client(self, client_id, client_socket):
"""处理客户端消息"""
buffer = ""
while self.running:
try:
data = client_socket.recv(1024).decode('utf-8')
if not data:
break
buffer += data
# 处理可能的多条消息
while '\n' in buffer:
message, buffer = buffer.split('\n', 1)
if message.strip():
self.process_message(client_id, message.strip())
except ConnectionResetError:
break
except Exception as e:
print(f"Error handling client {client_id}: {e}")
break
# 客户端断开连接
if client_id in self.clients:
del self.clients[client_id]
client_socket.close()
print(f"Client disconnected: {client_id}")
def process_message(self, client_id, message_str):
"""处理接收到的消息"""
try:
message = json.loads(message_str)
msg_type = message.get('type')
data = message.get('data', {})
# 更新心跳时间
if msg_type == 'heartbeat':
self.clients[client_id]['last_heartbeat'] = time.time()
return
print(f"Received {msg_type} from {client_id}")
# 处理不同类型的消息
if msg_type == 'alarm_push':
self.handle_alarm_push(data)
elif msg_type == 'alarm_query':
self.handle_alarm_query(client_id, data)
elif msg_type == 'alarm_ack':
self.handle_alarm_ack(client_id, data)
else:
print(f"Unknown message type: {msg_type}")
except json.JSONDecodeError:
print(f"Invalid JSON message from {client_id}: {message_str}")
except Exception as e:
print(f"Error processing message: {e}")
def handle_alarm_push(self, data):
"""处理报警推送"""
# 验证必要字段
required_fields = ['device_id', 'alarm_type', 'severity', 'description']
if not all(field in data for field in required_fields):
return
# 创建报警记录
alarm = {
'id': self.next_alarm_id,
'device_id': data['device_id'],
'alarm_type': data['alarm_type'],
'severity': data['severity'],
'description': data['description'],
'timestamp': datetime.now().isoformat(),
'status': 'active',
'acknowledged': False
}
self.next_alarm_id += 1
self.alarms.append(alarm)
print(f"New alarm received: {alarm['device_id']} - {alarm['description']}")
# 广播给所有客户端
self.broadcast_message({
'type': 'alarm_update',
'data': alarm,
'timestamp': datetime.now().isoformat()
})
def handle_alarm_query(self, client_id, data):
"""处理报警查询"""
# 过滤条件
status_filter = data.get('status')
device_filter = data.get('device_id')
severity_filter = data.get('severity')
limit = data.get('limit', 10)
# 过滤报警
filtered_alarms = self.alarms
if status_filter:
filtered_alarms = [a for a in filtered_alarms if a['status'] == status_filter]
if device_filter:
filtered_alarms = [a for a in filtered_alarms if a['device_id'] == device_filter]
if severity_filter:
filtered_alarms = [a for a in filtered_alarms if a['severity'] == severity_filter]
# 按时间倒序
filtered_alarms.sort(key=lambda x: x['timestamp'], reverse=True)
# 限制数量
result = filtered_alarms[:limit]
# 发送响应
self.send_message(client_id, {
'type': 'alarm_query_response',
'data': {
'count': len(result),
'total': len(filtered_alarms),
'alarms': result
},
'timestamp': datetime.now().isoformat()
})
def handle_alarm_ack(self, client_id, data):
"""处理报警确认"""
alarm_id = data.get('alarm_id')
notes = data.get('notes')
# 查找报警
alarm = next((a for a in self.alarms if a['id'] == alarm_id), None)
if not alarm:
self.send_message(client_id, {
'type': 'error',
'data': {'message': f'Alarm {alarm_id} not found'},
'timestamp': datetime.now().isoformat()
})
return
# 更新报警状态
alarm['acknowledged'] = True
alarm['status'] = 'inactive'
if notes:
alarm['notes'] = notes
print(f"Alarm {alarm_id} acknowledged by {client_id}")
# 广播更新
self.broadcast_message({
'type': 'alarm_update',
'data': alarm,
'timestamp': datetime.now().isoformat()
})
# 发送确认响应
self.send_message(client_id, {
'type': 'alarm_ack_response',
'data': {'success': True, 'alarm_id': alarm_id},
'timestamp': datetime.now().isoformat()
})
def send_message(self, client_id, message):
"""向指定客户端发送消息"""
if client_id not in self.clients:
return
try:
message_str = json.dumps(message) + '\n'
self.clients[client_id]['socket'].sendall(message_str.encode('utf-8'))
except Exception as e:
print(f"Error sending message to {client_id}: {e}")
def broadcast_message(self, message):
"""向所有客户端广播消息"""
message_str = json.dumps(message) + '\n'
disconnected_clients = []
for client_id, client_info in self.clients.items():
try:
client_info['socket'].sendall(message_str.encode('utf-8'))
except Exception as e:
print(f"Error broadcasting to {client_id}: {e}")
disconnected_clients.append(client_id)
# 移除断开连接的客户端
for client_id in disconnected_clients:
if client_id in self.clients:
del self.clients[client_id]
def heartbeat_check(self):
"""心跳检测,移除不活跃的客户端"""
while self.running:
time.sleep(30) # 每30秒检查一次
current_time = time.time()
disconnected_clients = []
for client_id, client_info in self.clients.items():
if current_time - client_info['last_heartbeat'] > 60: # 60秒无心跳
print(f"Client {client_id} timeout, disconnecting")
disconnected_clients.append(client_id)
# 移除不活跃的客户端
for client_id in disconnected_clients:
if client_id in self.clients:
self.clients[client_id]['socket'].close()
del self.clients[client_id]
if __name__ == '__main__':
server = AlarmSocketServer('localhost', 9999)
server.start()
- 客户端代码 (socket_client.py)
import socket
import json
import threading
import time
from datetime import datetime
class AlarmSocketClient:
def __init__(self, host='localhost', port=9999):
self.host = host
self.port = port
self.socket = None
self.connected = False
self.callbacks = {}
def connect(self):
"""连接到服务器"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
self.connected = True
print(f"Connected to server {self.host}:{self.port}")
# 启动接收线程
receive_thread = threading.Thread(target=self.receive_messages)
receive_thread.daemon = True
receive_thread.start()
# 启动心跳线程
heartbeat_thread = threading.Thread(target=self.send_heartbeat)
heartbeat_thread.daemon = True
heartbeat_thread.start()
return True
except Exception as e:
print(f"Failed to connect: {e}")
return False
def disconnect(self):
"""断开连接"""
self.connected = False
if self.socket:
self.socket.close()
print("Disconnected from server")
def send_message(self, message):
"""发送消息到服务器"""
if not self.connected:
print("Not connected to server")
return False
try:
message_str = json.dumps(message) + '\n'
self.socket.sendall(message_str.encode('utf-8'))
return True
except Exception as e:
print(f"Error sending message: {e}")
self.disconnect()
return False
def receive_messages(self):
"""接收服务器消息"""
buffer = ""
while self.connected:
try:
data = self.socket.recv(1024).decode('utf-8')
if not data:
break
buffer += data
# 处理可能的多条消息
while '\n' in buffer:
message, buffer = buffer.split('\n', 1)
if message.strip():
self.process_message(message.strip())
except ConnectionResetError:
break
except Exception as e:
print(f"Error receiving message: {e}")
break
self.disconnect()
def process_message(self, message_str):
"""处理接收到的消息"""
try:
message = json.loads(message_str)
msg_type = message.get('type')
data = message.get('data', {})
print(f"Received message: {msg_type}")
# 调用注册的回调函数
if msg_type in self.callbacks:
self.callbacks[msg_type](data)
else:
print(f"No handler for message type: {msg_type}")
except json.JSONDecodeError:
print(f"Invalid JSON message: {message_str}")
def register_callback(self, msg_type, callback):
"""注册消息回调函数"""
self.callbacks[msg_type] = callback
def send_heartbeat(self):
"""发送心跳包"""
while self.connected:
time.sleep(15) # 每15秒发送一次
self.send_message({
'type': 'heartbeat',
'timestamp': datetime.now().isoformat()
})
def push_alarm(self, device_id, alarm_type, severity, description):
"""推送报警信息"""
return self.send_message({
'type': 'alarm_push',
'data': {
'device_id': device_id,
'alarm_type': alarm_type,
'severity': severity,
'description': description
},
'timestamp': datetime.now().isoformat()
})
def query_alarms(self, status=None, device_id=None, severity=None, limit=10):
"""查询报警信息"""
query_data = {'limit': limit}
if status:
query_data['status'] = status
if device_id:
query_data['device_id'] = device_id
if severity:
query_data['severity'] = severity
return self.send_message({
'type': 'alarm_query',
'data': query_data,
'timestamp': datetime.now().isoformat()
})
def acknowledge_alarm(self, alarm_id, notes=None):
"""确认报警"""
ack_data = {'alarm_id': alarm_id}
if notes:
ack_data['notes'] = notes
return self.send_message({
'type': 'alarm_ack',
'data': ack_data,
'timestamp': datetime.now().isoformat()
})
# 使用示例
if __name__ == '__main__':
client = AlarmSocketClient('localhost', 9999)
# 注册消息处理回调
def handle_alarm_update(data):
print(f"Alarm update: {data['device_id']} - {data['description']}")
def handle_alarm_query_response(data):
print(f"Query response: {data['count']} alarms found")
for alarm in data['alarms']:
print(f" - {alarm['device_id']}: {alarm['description']}")
def handle_error(data):
print(f"Error: {data['message']}")
client.register_callback('alarm_update', handle_alarm_update)
client.register_callback('alarm_query_response', handle_alarm_query_response)
client.register_callback('error', handle_error)
# 连接到服务器
if client.connect():
try:
# 推送测试报警
client.push_alarm(
device_id="sensor_001",
alarm_type="temperature",
severity="high",
description="Temperature exceeded threshold: 85°C"
)
# 等待片刻
time.sleep(1)
# 查询报警
client.query_alarms(status="active", limit=5)
# 保持连接,等待服务器推送
print("Waiting for messages (press Ctrl+C to exit)...")
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Exiting...")
finally:
client.disconnect()
三、modbus通信
不论是基于HTTP的POST、GET请求还是基于Socket的实时通信,在工业设备端使用时都存在很大的弊端:
-
协议开销大:HTTP头部信息冗余,不适合资源受限的嵌入式设备
-
实时性不足:HTTP请求-响应模式无法满足工业控制对实时性的要求
-
兼容性问题:许多传统工业设备只支持Modbus等专用工业协议
-
电力消耗:复杂的协议栈会增加设备功耗
-
可靠性问题:工业环境需要更强的抗干扰能力和错误检测机制
针对以上面临的弊端,工业设备有一套属于自己的专属通信方式modbus。
1.Modbus协议简介
Modbus是一种串行通信协议,由Modicon公司(现施耐德电气)于1979年开发,用于PLC(可编程逻辑控制器)之间的通信。它已成为工业领域最常用的通信协议之一,具有简单、开放、易于实施的特点。
- Modbus通信模式
-
Modbus RTU - 二进制格式,使用串行通信(RS-232/RS-485)
-
Modbus ASCII - ASCII字符格式,使用串行通信
-
Modbus TCP - 基于以太网TCP/IP协议
-
2.Modbus工业设备通信模拟示例
由于大部分学习者对这块不是特别了解,这里就给了一个模拟界面的示例,以下是网页端模拟的工业通信面板示例,复制代码创建一个.html结尾的文件,将代码复制进去,双击运行即可。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Modbus通信模拟系统</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
font-family: 'Segoe UI', Arial, sans-serif;
}
body {
background: linear-gradient(135deg, #1a2a6c, #2a4b8c);
color: #333;
min-height: 100vh;
padding: 20px;
}
.container {
max-width: 1200px;
margin: 0 auto;
}
header {
text-align: center;
padding: 20px 0;
margin-bottom: 30px;
color: white;
text-shadow: 0 2px 4px rgba(0, 0, 0, 0.3);
}
h1 {
font-size: 2.5rem;
margin-bottom: 10px;
}
.subtitle {
font-size: 1.2rem;
opacity: 0.9;
}
.dashboard {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 20px;
margin-bottom: 30px;
}
.panel {
background: white;
border-radius: 10px;
box-shadow: 0 5px 15px rgba(0, 0, 0, 0.2);
padding: 20px;
overflow: hidden;
}
.panel-title {
font-size: 1.5rem;
color: #2a4b8c;
margin-bottom: 20px;
padding-bottom: 10px;
border-bottom: 2px solid #eaeaea;
}
.control-group {
margin-bottom: 20px;
}
.control-label {
display: block;
margin-bottom: 8px;
font-weight: 600;
color: #555;
}
.slider-container {
display: flex;
align-items: center;
}
.slider {
flex: 1;
height: 10px;
-webkit-appearance: none;
appearance: none;
background: #dde6f0;
outline: none;
border-radius: 5px;
}
.slider::-webkit-slider-thumb {
-webkit-appearance: none;
appearance: none;
width: 20px;
height: 20px;
border-radius: 50%;
background: #2a4b8c;
cursor: pointer;
}
.value-display {
width: 60px;
text-align: center;
font-weight: bold;
color: #2a4b8c;
margin-left: 15px;
}
.status-indicator {
display: inline-block;
width: 15px;
height: 15px;
border-radius: 50%;
margin-right: 10px;
}
.status-on {
background: #4CAF50;
box-shadow: 0 0 8px #4CAF50;
}
.status-off {
background: #f44336;
}
.button {
padding: 10px 20px;
border: none;
border-radius: 5px;
background: #2a4b8c;
color: white;
font-weight: bold;
cursor: pointer;
transition: background 0.3s;
}
.button:hover {
background: #3a6bc8;
}
.button:active {
transform: translateY(1px);
}
.data-table {
width: 100%;
border-collapse: collapse;
margin-top: 15px;
}
.data-table th, .data-table td {
padding: 12px 15px;
text-align: left;
border-bottom: 1px solid #eaeaea;
}
.data-table th {
background: #f5f8fc;
font-weight: 600;
color: #2a4b8c;
}
.data-table tr:hover {
background: #f9fbfd;
}
.communication-log {
background: #1e1e1e;
color: #00ff00;
font-family: monospace;
padding: 15px;
border-radius: 5px;
height: 200px;
overflow-y: auto;
margin-top: 20px;
}
.log-entry {
margin-bottom: 8px;
line-height: 1.4;
}
.log-time {
color: #888;
}
.log-master {
color: #4fa6ff;
}
.log-slave {
color: #ffa64f;
}
.connection-status {
display: flex;
align-items: center;
margin-bottom: 20px;
}
.connection-dot {
width: 12px;
height: 12px;
border-radius: 50%;
margin-right: 10px;
}
.connected {
background: #4CAF50;
box-shadow: 0 0 8px #4CAF50;
}
.disconnected {
background: #f44336;
}
@media (max-width: 768px) {
.dashboard {
grid-template-columns: 1fr;
}
}
</style>
</head>
<body>
<div class="container">
<header>
<h1>Modbus通信模拟系统</h1>
<div class="subtitle">工业设备通信与控制模拟界面</div>
</header>
<div class="dashboard">
<div class="panel">
<h2 class="panel-title">设备控制面板</h2>
<div class="connection-status">
<div class="connection-dot connected"></div>
<span>已连接到设备 (Modbus RTU - 从站地址: 1)</span>
</div>
<div class="control-group">
<label class="control-label">温度设定值 (°C)</label>
<div class="slider-container">
<input type="range" min="20" max="100" value="45" class="slider" id="tempSlider">
<span class="value-display" id="tempValue">45</span>
</div>
</div>
<div class="control-group">
<label class="control-label">压力设定值 (kPa)</label>
<div class="slider-container">
<input type="range" min="0" max="1000" value="350" class="slider" id="pressureSlider">
<span class="value-display" id="pressureValue">350</span>
</div>
</div>
<div class="control-group">
<label class="control-label">流量设定值 (L/min)</label>
<div class="slider-container">
<input type="range" min="0" max="500" value="120" class="slider" id="flowSlider">
<span class="value-display" id="flowValue">120</span>
</div>
</div>
<div class="control-group">
<label class="control-label">设备状态</label>
<div>
<button class="button" id="powerButton">启动设备</button>
<button class="button" id="resetButton">复位报警</button>
</div>
</div>
</div>
<div class="panel">
<h2 class="panel-title">设备状态监测</h2>
<table class="data-table">
<thead>
<tr>
<th>参数</th>
<th>值</th>
<th>状态</th>
</tr>
</thead>
<tbody>
<tr>
<td>实际温度</td>
<td id="actualTemp">45.2 °C</td>
<td><span class="status-indicator status-on"></span>正常</td>
</tr>
<tr>
<td>实际压力</td>
<td id="actualPressure">348.7 kPa</td>
<td><span class="status-indicator status-on"></span>正常</td>
</tr>
<tr>
<td>实际流量</td>
<td id="actualFlow">118.3 L/min</td>
<td><span class="status-indicator status-on"></span>正常</td>
</tr>
<tr>
<td>设备运行状态</td>
<td id="deviceStatus">运行中</td>
<td><span class="status-indicator status-on"></span>正常</td>
</tr>
<tr>
<td>报警状态</td>
<td id="alarmStatus">无报警</td>
<td><span class="status-indicator status-on"></span>正常</td>
</tr>
</tbody>
</table>
<h3 class="panel-title" style="margin-top: 25px;">Modbus通信日志</h3>
<div class="communication-log" id="commLog">
<div class="log-entry"><span class="log-time">10:23:45</span> <span class="log-master">主站</span>: 读取保持寄存器(4x) 40001-40005</div>
<div class="log-entry"><span class="log-time">10:23:45</span> <span class="log-slave">从站1</span>: 响应 5个寄存器值 [45.2, 348.7, 118.3, 1, 0]</div>
<div class="log-entry"><span class="log-time">10:23:47</span> <span class="log-master">主站</span>: 写入单个寄存器(4x) 40001 值: 45.0</div>
<div class="log-entry"><span class="log-time">10:23:47</span> <span class="log-slave">从站1</span>: 响应 写入成功</div>
</div>
</div>
</div>
</div>
<script>
// 更新滑块显示值
document.getElementById('tempSlider').addEventListener('input', function() {
document.getElementById('tempValue').textContent = this.value;
addLogEntry("主站", `写入单个寄存器(4x) 40001 值: ${this.value}.0`);
simulateResponse(`从站1`, "响应 写入成功");
});
document.getElementById('pressureSlider').addEventListener('input', function() {
document.getElementById('pressureValue').textContent = this.value;
addLogEntry("主站", `写入单个寄存器(4x) 40002 值: ${this.value}.0`);
simulateResponse(`从站1`, "响应 写入成功");
});
document.getElementById('flowSlider').addEventListener('input', function() {
document.getElementById('flowValue').textContent = this.value;
addLogEntry("主站", `写入单个寄存器(4x) 40003 值: ${this.value}.0`);
simulateResponse(`从站1`, "响应 写入成功");
});
// 设备控制按钮
let devicePoweredOn = true;
document.getElementById('powerButton').addEventListener('click', function() {
devicePoweredOn = !devicePoweredOn;
if (devicePoweredOn) {
this.textContent = "停止设备";
document.getElementById('deviceStatus').textContent = "运行中";
addLogEntry("主站", "写线圈(0x) 00000 值: 1 (启动设备)");
simulateResponse(`从站1`, "响应 写入成功");
} else {
this.textContent = "启动设备";
document.getElementById('deviceStatus').textContent = "已停止";
addLogEntry("主站", "写线圈(0x) 00000 值: 0 (停止设备)");
simulateResponse(`从站1`, "响应 写入成功");
}
});
document.getElementById('resetButton').addEventListener('click', function() {
document.getElementById('alarmStatus').textContent = "无报警";
addLogEntry("主站", "写线圈(0x) 00001 值: 1 (复位报警)");
simulateResponse(`从站1`, "响应 写入成功");
});
// 添加日志条目
function addLogEntry(source, message) {
const logElement = document.getElementById('commLog');
const time = new Date().toLocaleTimeString();
const entry = document.createElement('div');
entry.className = 'log-entry';
entry.innerHTML = `<span class="log-time">${time}</span> <span class="log-${source === '主站' ? 'master' : 'slave'}">${source}</span>: ${message}`;
logElement.appendChild(entry);
logElement.scrollTop = logElement.scrollHeight;
}
// 模拟从站响应
function simulateResponse(source, message) {
setTimeout(() => {
addLogEntry(source, message);
}, 300);
}
// 模拟定期数据读取
setInterval(() => {
if (devicePoweredOn) {
// 更新实际值,在设定值附近随机波动
const temp = (parseFloat(document.getElementById('tempSlider').value) + (Math.random() * 2 - 1)).toFixed(1);
const pressure = (parseFloat(document.getElementById('pressureSlider').value) + (Math.random() * 5 - 2.5)).toFixed(1);
const flow = (parseFloat(document.getElementById('flowSlider').value) + (Math.random() * 4 - 2)).toFixed(1);
document.getElementById('actualTemp').textContent = temp + " °C";
document.getElementById('actualPressure').textContent = pressure + " kPa";
document.getElementById('actualFlow').textContent = flow + " L/min";
addLogEntry("主站", "读取保持寄存器(4x) 40001-40005");
simulateResponse("从站1", `响应 5个寄存器值 [${temp}, ${pressure}, ${flow}, 1, 0]`);
}
}, 5000);
</script>
</body>
</html>
- 示例界面
3.模拟Modbus通信案例
由于实际Modbus设备不常见,且设备需要接线并配合专属模块,所以我们将模拟一个完整的Modbus TCP通信场景,包括服务器(从设备)和客户端(主设备)。
模拟场景描述
我们将模拟一个工业环境中的温度监控系统:
-
1个Modbus服务器模拟温度控制器
-
3个模拟温度传感器(地址0x0001-0x0003)
-
2个模拟继电器输出(地址0x0004-0x0005)
-
1个模拟报警状态寄存器(地址0x0006)
开始之前请安装指定版本的pymodbus
pip install pymodbus==3.6.5
服务器端代码 (modbus_server.py)
from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.device import ModbusDeviceIdentification
import random
import time
def run_server():
# 初始化数据存储
# 线圈: 2个继电器 (地址 0x0004-0x0005)
coils = ModbusSequentialDataBlock(0, [False] * 6) # 创建6个线圈,从地址0开始
# 保持寄存器: 3个温度传感器 (地址 0x0001-0x0003) 和1个报警状态 (地址 0x0006)
hr_data = [0] * 7 # 创建7个寄存器,从地址0开始
# 初始化温度值 (20-30度)
hr_data[0] = 220 # 地址0x0001: 22.0°C
hr_data[1] = 245 # 地址0x0002: 24.5°C
hr_data[2] = 190 # 地址0x0003: 19.0°C
holding_registers = ModbusSequentialDataBlock(0, hr_data)
# 创建从设备上下文
slave_context = ModbusSlaveContext(
co=coils,
hr=holding_registers
)
# 创建服务器上下文
context = ModbusServerContext(slaves=slave_context, single=True)
# 设置设备标识信息
identity = ModbusDeviceIdentification()
identity.VendorName = '模拟温度控制器'
identity.ProductCode = 'TC-100'
identity.VendorUrl = 'http://example.com'
identity.ProductName = '温度监控系统'
identity.ModelName = 'Modbus TCP模拟器'
identity.MajorMinorRevision = '1.0.0'
# 启动TCP服务器
print("启动Modbus TCP服务器在 localhost:5020")
StartTcpServer(context=context, identity=identity, address=("localhost", 5020))
if __name__ == "__main__":
run_server()
客户端代码 (modbus_client.py)
from pymodbus.client import ModbusTcpClient
import time
def read_temperature_sensors():
"""读取温度传感器数据"""
client = ModbusTcpClient('localhost', port=5020)
try:
client.connect()
# 读取保持寄存器 (温度传感器和报警状态)
# 地址 0x0001-0x0003: 温度传感器 (对应寄存器地址 0-2)
# 地址 0x0006: 报警状态 (对应寄存器地址 5)
result = client.read_holding_registers(address=0, count=6, slave=1)
if not result.isError():
print("=== 温度传感器读数 ===")
temperatures = [value / 10.0 for value in result.registers[:3]]
alarm_status = result.registers[5]
for i, temp in enumerate(temperatures, 1):
print(f"传感器 {i}: {temp}°C")
print(f"报警状态: {'正常' if alarm_status == 0 else '警告'}")
else:
print("读取寄存器错误:", result)
except Exception as e:
print("连接错误:", e)
finally:
client.close()
def read_relay_status():
"""读取继电器状态"""
client = ModbusTcpClient('localhost', port=5020)
try:
client.connect()
# 读取线圈 (继电器状态)
# 地址 0x0004-0x0005: 继电器 (对应线圈地址 3-4)
result = client.read_coils(address=3, count=2, slave=1)
if not result.isError():
print("\n=== 继电器状态 ===")
for i, status in enumerate(result.bits, 1):
print(f"继电器 {i}: {'开启' if status else '关闭'}")
else:
print("读取线圈错误:", result)
except Exception as e:
print("连接错误:", e)
finally:
client.close()
def toggle_relay(relay_index, state):
"""切换继电器状态"""
client = ModbusTcpClient('localhost', port=5020)
try:
client.connect()
# 写入线圈 (控制继电器)
# 继电器1: 地址 3, 继电器2: 地址 4
address = 3 + (relay_index - 1)
result = client.write_coil(address=address, value=state, slave=1)
if not result.isError():
print(f"\n继电器 {relay_index} 已{'开启' if state else '关闭'}")
else:
print("写入线圈错误:", result)
except Exception as e:
print("连接错误:", e)
finally:
client.close()
if __name__ == "__main__":
# 读取当前状态
read_temperature_sensors()
read_relay_status()
# 切换第一个继电器状态
toggle_relay(1, True)
# 再次读取状态以确认更改
time.sleep(0.5)
read_relay_status()
测试方法及说明
- 使用方法
-
首先启动Modbus服务器:
python modbus_server.py
-
然后运行客户端:
python modbus_client.py
-
客户端将连接到服务器并执行一系列Modbus操作:
-
读取温度传感器值
-
读取和控制继电器状态
-
-
- 运行示例:
=== 温度传感器读数 ===
传感器 1: 24.5°C
传感器 2: 19.0°C
传感器 3: 0.0°C
报警状态: 正常
=== 继电器状态 ===
继电器 1: 关闭
继电器 2: 关闭
继电器 3: 关闭
继电器 4: 关闭
继电器 5: 关闭
继电器 6: 关闭
继电器 7: 关闭
继电器 8: 关闭
继电器 1 已开启
=== 继电器状态 ===
继电器 1: 开启
继电器 2: 关闭
继电器 3: 关闭
继电器 4: 关闭
继电器 5: 关闭
继电器 6: 关闭
继电器 7: 关闭
继电器 8: 关闭
-
Modbus通信优势
-
简单高效:协议简单,开销小,适合资源受限的设备
-
实时性强:支持快速响应,适合工业控制场景
-
广泛兼容:几乎所有工业设备都支持Modbus协议
-
可靠性高:内置错误检测机制,CRC校验保证数据完整性
-
灵活部署:支持多种物理层(RS-485, TCP/IP等)
-
注意事项
- 对于专门的modbus设备,python等语言有pymodbus的高级接口,本案例为模拟案例,仅供参考和学习,对于深入的串口通信(如modbus RTU),可以采用pyserial等库实现实现基本的通信传输。
总结
本小节以代码的形式展示常见的信息推送和共享机制,涉及服务器端、系统内部、工业设备间的通信机制和基本实现,是AI视频监控系统接入生态圈必不可少的一部分。
下期预告
- RTSP推流与展示
更多推荐
所有评论(0)