前言

上一小节,我们简要介绍了基本的通信手段以实现数据推送与共享的方式,本小节将基于使用场景和使用环境,实现稳定、可靠的数据传输。


一、Http方法:GET与POST

基于HTTP的GET和POST方法是推送报警数据的常用手段之一,通常用于系统与服务器之间、或不同服务之间的轻量级数据交互。二者在语义、使用方式和适用场景上具有明显区别,合理选择方法对保证数据传输的可靠性和系统的性能具有重要意义。

1. GET方法

GET方法主要用于从服务器获取资源,其设计初衷是幂等且安全的操作,即多次重复调用不应产生副作用。在报警数据推送中,GET方法可适用于以下场景:

  • 查询报警状态:定期向服务器请求当前报警状态,如 GET /alarms?status=active。

  • 获取历史报警记录:通过参数筛选历史数据,如 GET /alarms?start=2023-10-01&end=2123-10-01。

  • 轻量级触发操作:某些简单报警可通过GET请求触发(需注意安全性)。

特点:

  • 参数通过URL传递,可见性强但安全性低(需配合HTTPS加密)。

  • 长度受浏览器和服务器限制(通常不超过2048字符)。

  • 可缓存,适合重复获取相同数据的场景。

2. POST方法

POST方法主要用于向服务器提交数据,是非幂等的操作(多次调用可能产生不同结果)。在报警推送中,POST更适用于:

  • 主动推送报警信息:将报警详情(如时间、级别、描述)通过请求体发送至服务器,如 POST /alarms。

  • 批量上报数据:一次性传输多条报警记录(JSON/XML格式)。

  • 执行敏感操作:如确认报警、下发控制指令等。

特点:

  • 参数通过请求体传输,支持更大数据量(如JSON、二进制数据)。

  • 安全性更高(结合HTTPS可加密内容)。

  • 不可缓存,适合需要实时处理的场景。

3.案例代码

假设现在服务器端需要向客户端请求报警信息查询,同时客户端需要向服务器端推送相应的报警数据,我们就需要客户端与服务器端的双向通信。以下是使用Python Flask框架实现的服务器端和客户端参考代码:

  • 服务器端代码 (使用Flask)
from flask import Flask, request, jsonify
from datetime import datetime
import threading
import time

app = Flask(__name__)

# 存储接收到的报警数据
alarms = []

@app.route('/alarms', methods=['GET'])
def get_alarms():
    """
    处理GET请求:查询报警信息
    可选参数:
    - status: 过滤特定状态的报警(active/inactive)
    - start_time: 查询起始时间
    - end_time: 查询结束时间
    - limit: 返回结果数量限制
    """
    # 获取查询参数
    status_filter = request.args.get('status')
    start_time = request.args.get('start_time')
    end_time = request.args.get('end_time')
    limit = int(request.args.get('limit', 10))
    
    # 过滤报警数据
    filtered_alarms = alarms
    
    if status_filter:
        filtered_alarms = [a for a in filtered_alarms if a['status'] == status_filter]
    
    if start_time:
        start_dt = datetime.fromisoformat(start_time)
        filtered_alarms = [a for a in filtered_alarms if datetime.fromisoformat(a['timestamp']) >= start_dt]
    
    if end_time:
        end_dt = datetime.fromisoformat(end_time)
        filtered_alarms = [a for a in filtered_alarms if datetime.fromisoformat(a['timestamp']) <= end_dt]
    
    # 限制返回数量
    result = filtered_alarms[:limit]
    
    return jsonify({
        "count": len(result),
        "alarms": result
    })

@app.route('/alarms', methods=['POST'])
def receive_alarm():
    """
    处理POST请求:接收报警数据推送
    期望JSON格式:
    {
        "device_id": "设备标识",
        "alarm_type": "报警类型",
        "severity": "紧急程度",
        "description": "报警描述",
        "timestamp": "发生时间(ISO格式)"
    }
    """
    data = request.json
    
    # 数据验证
    required_fields = ['device_id', 'alarm_type', 'severity', 'description', 'timestamp']
    if not all(field in data for field in required_fields):
        return jsonify({"error": "Missing required fields"}), 400
    
    # 添加状态字段和接收时间
    alarm_data = {
        **data,
        "status": "active",  # 默认状态为活跃
        "received_at": datetime.now().isoformat(),
        "acknowledged": False  # 是否已被确认
    }
    
    # 存储报警数据
    alarms.append(alarm_data)
    
    # 这里可以添加额外的处理逻辑,如发送邮件、短信通知等
    
    print(f"Received alarm: {alarm_data}")
    
    return jsonify({"message": "Alarm received successfully", "id": len(alarms)-1}), 201

@app.route('/alarms/<int:alarm_id>', methods=['PUT'])
def update_alarm(alarm_id):
    """
    更新报警信息(如确认报警、修改状态等)
    """
    if alarm_id < 0 or alarm_id >= len(alarms):
        return jsonify({"error": "Invalid alarm ID"}), 404
    
    data = request.json
    
    # 更新允许修改的字段
    if 'status' in data:
        alarms[alarm_id]['status'] = data['status']
    
    if 'acknowledged' in data:
        alarms[alarm_id]['acknowledged'] = data['acknowledged']
    
    if 'notes' in data:
        alarms[alarm_id]['notes'] = data['notes']
    
    return jsonify({"message": "Alarm updated successfully"})

def run_server():
    """启动服务器"""
    app.run(host='0.0.0.0', port=5000, debug=True)

if __name__ == '__main__':
    run_server()
  • 客户端代码
import requests
import json
from datetime import datetime, timedelta
import time

class AlarmClient:
    def __init__(self, base_url):
        self.base_url = base_url
    
    def push_alarm(self, device_id, alarm_type, severity, description):
        """
        向服务器推送报警信息
        """
        alarm_data = {
            "device_id": device_id,
            "alarm_type": alarm_type,
            "severity": severity,
            "description": description,
            "timestamp": datetime.now().isoformat()
        }
        
        try:
            response = requests.post(
                f"{self.base_url}/alarms",
                json=alarm_data,
                headers={'Content-Type': 'application/json'},
                timeout=5
            )
            
            if response.status_code == 201:
                print("Alarm pushed successfully")
                return response.json()
            else:
                print(f"Failed to push alarm: {response.status_code} - {response.text}")
                return None
                
        except requests.exceptions.RequestException as e:
            print(f"Error pushing alarm: {e}")
            # 这里可以添加重试逻辑
            return None
    
    def query_alarms(self, status=None, start_time=None, end_time=None, limit=10):
        """
        查询报警信息
        """
        params = {}
        if status:
            params['status'] = status
        if start_time:
            params['start_time'] = start_time
        if end_time:
            params['end_time'] = end_time
        if limit:
            params['limit'] = limit
        
        try:
            response = requests.get(
                f"{self.base_url}/alarms",
                params=params,
                timeout=5
            )
            
            if response.status_code == 200:
                return response.json()
            else:
                print(f"Failed to query alarms: {response.status_code} - {response.text}")
                return None
                
        except requests.exceptions.RequestException as e:
            print(f"Error querying alarms: {e}")
            return None
    
    def acknowledge_alarm(self, alarm_id, notes=None):
        """
        确认报警
        """
        update_data = {
            "acknowledged": True
        }
        
        if notes:
            update_data["notes"] = notes
        
        try:
            response = requests.put(
                f"{self.base_url}/alarms/{alarm_id}",
                json=update_data,
                headers={'Content-Type': 'application/json'},
                timeout=5
            )
            
            if response.status_code == 200:
                print("Alarm acknowledged successfully")
                return response.json()
            else:
                print(f"Failed to acknowledge alarm: {response.status_code} - {response.text}")
                return None
                
        except requests.exceptions.RequestException as e:
            print(f"Error acknowledging alarm: {e}")
            return None

# 使用示例
if __name__ == "__main__":
    # 初始化客户端
    client = AlarmClient("http://localhost:5000")
    
    # 推送报警示例
    client.push_alarm(
        device_id="sensor_001",
        alarm_type="overheat",
        severity="high",
        description="Temperature exceeded threshold: 85°C"
    )
    
    # 等待片刻让服务器处理
    time.sleep(1)
    
    # 查询活跃报警示例
    end_time = datetime.now().isoformat()
    start_time = (datetime.now() - timedelta(hours=1)).isoformat()
    
    result = client.query_alarms(
        status="active",
        start_time=start_time,
        end_time=end_time,
        limit=5
    )
    
    if result:
        print(f"Found {result['count']} active alarms:")
        for alarm in result['alarms']:
            print(f" - {alarm['device_id']}: {alarm['description']}")
            
        # 确认第一个报警
        if result['count'] > 0:
            client.acknowledge_alarm(0, "Handled by operator")
  • 代码说明
    • 服务器端功能:

      1. GET /alarms: 查询报警信息,支持状态、时间范围和数量过滤

      2. POST /alarms: 接收报警数据推送

      3. PUT /alarms/: 更新报警状态(如确认报警)

    • 客户端功能:

      1. push_alarm(): 推送报警数据到服务器

      2. query_alarms(): 从服务器查询报警信息

      3. acknowledge_alarm(): 确认特定报警

二、socket自定义通信

基于Socket的自定义通信提供了更灵活、实时的数据传输方式,特别适合需要低延迟高并发的报警数据处理场景。

1.设计思路

我们将设计一个简单的Socket服务器和客户端,使用TCP协议实现可靠的数据传输。通信协议设计如下:

  • 数据格式:JSON格式的消息体

  • 消息结构:

    • 类型(type): 标识消息类型(alarm_push, alarm_query, alarm_ack等)

    • 数据(data): 实际的消息内容

    • 时间戳(timestamp): 消息创建时间

  • 连接管理:保持长连接,支持心跳检测

2.完整案例代码

  • 服务器端代码 (socket_server.py)
import socket
import json
import threading
import time
from datetime import datetime
from typing import Dict, List

class AlarmSocketServer:
    def __init__(self, host='localhost', port=9999):
        self.host = host
        self.port = port
        self.socket = None
        self.clients = {}  # 存储客户端连接
        self.alarms = []   # 存储报警数据
        self.running = False
        self.next_alarm_id = 1
        
    def start(self):
        """启动服务器"""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind((self.host, self.port))
        self.socket.listen(5)
        self.running = True
        
        print(f"Alarm socket server started on {self.host}:{self.port}")
        
        # 启动客户端接受线程
        accept_thread = threading.Thread(target=self.accept_clients)
        accept_thread.daemon = True
        accept_thread.start()
        
        # 启动心跳检测线程
        heartbeat_thread = threading.Thread(target=self.heartbeat_check)
        heartbeat_thread.daemon = True
        heartbeat_thread.start()
        
        try:
            # 主线程保持运行
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Shutting down server...")
            self.stop()
    
    def stop(self):
        """停止服务器"""
        self.running = False
        if self.socket:
            self.socket.close()
        for client_id, client_info in self.clients.items():
            client_info['socket'].close()
        print("Server stopped")
    
    def accept_clients(self):
        """接受客户端连接"""
        while self.running:
            try:
                client_socket, client_address = self.socket.accept()
                client_id = f"{client_address[0]}:{client_address[1]}"
                
                print(f"New client connected: {client_id}")
                
                # 存储客户端信息
                self.clients[client_id] = {
                    'socket': client_socket,
                    'address': client_address,
                    'last_heartbeat': time.time()
                }
                
                # 为每个客户端启动处理线程
                client_thread = threading.Thread(
                    target=self.handle_client, 
                    args=(client_id, client_socket)
                )
                client_thread.daemon = True
                client_thread.start()
                
            except Exception as e:
                if self.running:
                    print(f"Error accepting client: {e}")
    
    def handle_client(self, client_id, client_socket):
        """处理客户端消息"""
        buffer = ""
        while self.running:
            try:
                data = client_socket.recv(1024).decode('utf-8')
                if not data:
                    break
                
                buffer += data
                
                # 处理可能的多条消息
                while '\n' in buffer:
                    message, buffer = buffer.split('\n', 1)
                    if message.strip():
                        self.process_message(client_id, message.strip())
                        
            except ConnectionResetError:
                break
            except Exception as e:
                print(f"Error handling client {client_id}: {e}")
                break
        
        # 客户端断开连接
        if client_id in self.clients:
            del self.clients[client_id]
        client_socket.close()
        print(f"Client disconnected: {client_id}")
    
    def process_message(self, client_id, message_str):
        """处理接收到的消息"""
        try:
            message = json.loads(message_str)
            msg_type = message.get('type')
            data = message.get('data', {})
            
            # 更新心跳时间
            if msg_type == 'heartbeat':
                self.clients[client_id]['last_heartbeat'] = time.time()
                return
            
            print(f"Received {msg_type} from {client_id}")
            
            # 处理不同类型的消息
            if msg_type == 'alarm_push':
                self.handle_alarm_push(data)
            elif msg_type == 'alarm_query':
                self.handle_alarm_query(client_id, data)
            elif msg_type == 'alarm_ack':
                self.handle_alarm_ack(client_id, data)
            else:
                print(f"Unknown message type: {msg_type}")
                
        except json.JSONDecodeError:
            print(f"Invalid JSON message from {client_id}: {message_str}")
        except Exception as e:
            print(f"Error processing message: {e}")
    
    def handle_alarm_push(self, data):
        """处理报警推送"""
        # 验证必要字段
        required_fields = ['device_id', 'alarm_type', 'severity', 'description']
        if not all(field in data for field in required_fields):
            return
        
        # 创建报警记录
        alarm = {
            'id': self.next_alarm_id,
            'device_id': data['device_id'],
            'alarm_type': data['alarm_type'],
            'severity': data['severity'],
            'description': data['description'],
            'timestamp': datetime.now().isoformat(),
            'status': 'active',
            'acknowledged': False
        }
        
        self.next_alarm_id += 1
        self.alarms.append(alarm)
        
        print(f"New alarm received: {alarm['device_id']} - {alarm['description']}")
        
        # 广播给所有客户端
        self.broadcast_message({
            'type': 'alarm_update',
            'data': alarm,
            'timestamp': datetime.now().isoformat()
        })
    
    def handle_alarm_query(self, client_id, data):
        """处理报警查询"""
        # 过滤条件
        status_filter = data.get('status')
        device_filter = data.get('device_id')
        severity_filter = data.get('severity')
        limit = data.get('limit', 10)
        
        # 过滤报警
        filtered_alarms = self.alarms
        
        if status_filter:
            filtered_alarms = [a for a in filtered_alarms if a['status'] == status_filter]
        
        if device_filter:
            filtered_alarms = [a for a in filtered_alarms if a['device_id'] == device_filter]
        
        if severity_filter:
            filtered_alarms = [a for a in filtered_alarms if a['severity'] == severity_filter]
        
        # 按时间倒序
        filtered_alarms.sort(key=lambda x: x['timestamp'], reverse=True)
        
        # 限制数量
        result = filtered_alarms[:limit]
        
        # 发送响应
        self.send_message(client_id, {
            'type': 'alarm_query_response',
            'data': {
                'count': len(result),
                'total': len(filtered_alarms),
                'alarms': result
            },
            'timestamp': datetime.now().isoformat()
        })
    
    def handle_alarm_ack(self, client_id, data):
        """处理报警确认"""
        alarm_id = data.get('alarm_id')
        notes = data.get('notes')
        
        # 查找报警
        alarm = next((a for a in self.alarms if a['id'] == alarm_id), None)
        if not alarm:
            self.send_message(client_id, {
                'type': 'error',
                'data': {'message': f'Alarm {alarm_id} not found'},
                'timestamp': datetime.now().isoformat()
            })
            return
        
        # 更新报警状态
        alarm['acknowledged'] = True
        alarm['status'] = 'inactive'
        if notes:
            alarm['notes'] = notes
        
        print(f"Alarm {alarm_id} acknowledged by {client_id}")
        
        # 广播更新
        self.broadcast_message({
            'type': 'alarm_update',
            'data': alarm,
            'timestamp': datetime.now().isoformat()
        })
        
        # 发送确认响应
        self.send_message(client_id, {
            'type': 'alarm_ack_response',
            'data': {'success': True, 'alarm_id': alarm_id},
            'timestamp': datetime.now().isoformat()
        })
    
    def send_message(self, client_id, message):
        """向指定客户端发送消息"""
        if client_id not in self.clients:
            return
        
        try:
            message_str = json.dumps(message) + '\n'
            self.clients[client_id]['socket'].sendall(message_str.encode('utf-8'))
        except Exception as e:
            print(f"Error sending message to {client_id}: {e}")
    
    def broadcast_message(self, message):
        """向所有客户端广播消息"""
        message_str = json.dumps(message) + '\n'
        disconnected_clients = []
        
        for client_id, client_info in self.clients.items():
            try:
                client_info['socket'].sendall(message_str.encode('utf-8'))
            except Exception as e:
                print(f"Error broadcasting to {client_id}: {e}")
                disconnected_clients.append(client_id)
        
        # 移除断开连接的客户端
        for client_id in disconnected_clients:
            if client_id in self.clients:
                del self.clients[client_id]
    
    def heartbeat_check(self):
        """心跳检测,移除不活跃的客户端"""
        while self.running:
            time.sleep(30)  # 每30秒检查一次
            current_time = time.time()
            disconnected_clients = []
            
            for client_id, client_info in self.clients.items():
                if current_time - client_info['last_heartbeat'] > 60:  # 60秒无心跳
                    print(f"Client {client_id} timeout, disconnecting")
                    disconnected_clients.append(client_id)
            
            # 移除不活跃的客户端
            for client_id in disconnected_clients:
                if client_id in self.clients:
                    self.clients[client_id]['socket'].close()
                    del self.clients[client_id]

if __name__ == '__main__':
    server = AlarmSocketServer('localhost', 9999)
    server.start()
  • 客户端代码 (socket_client.py)
import socket
import json
import threading
import time
from datetime import datetime

class AlarmSocketClient:
    def __init__(self, host='localhost', port=9999):
        self.host = host
        self.port = port
        self.socket = None
        self.connected = False
        self.callbacks = {}
        
    def connect(self):
        """连接到服务器"""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((self.host, self.port))
            self.connected = True
            
            print(f"Connected to server {self.host}:{self.port}")
            
            # 启动接收线程
            receive_thread = threading.Thread(target=self.receive_messages)
            receive_thread.daemon = True
            receive_thread.start()
            
            # 启动心跳线程
            heartbeat_thread = threading.Thread(target=self.send_heartbeat)
            heartbeat_thread.daemon = True
            heartbeat_thread.start()
            
            return True
        except Exception as e:
            print(f"Failed to connect: {e}")
            return False
    
    def disconnect(self):
        """断开连接"""
        self.connected = False
        if self.socket:
            self.socket.close()
        print("Disconnected from server")
    
    def send_message(self, message):
        """发送消息到服务器"""
        if not self.connected:
            print("Not connected to server")
            return False
        
        try:
            message_str = json.dumps(message) + '\n'
            self.socket.sendall(message_str.encode('utf-8'))
            return True
        except Exception as e:
            print(f"Error sending message: {e}")
            self.disconnect()
            return False
    
    def receive_messages(self):
        """接收服务器消息"""
        buffer = ""
        while self.connected:
            try:
                data = self.socket.recv(1024).decode('utf-8')
                if not data:
                    break
                
                buffer += data
                
                # 处理可能的多条消息
                while '\n' in buffer:
                    message, buffer = buffer.split('\n', 1)
                    if message.strip():
                        self.process_message(message.strip())
                        
            except ConnectionResetError:
                break
            except Exception as e:
                print(f"Error receiving message: {e}")
                break
        
        self.disconnect()
    
    def process_message(self, message_str):
        """处理接收到的消息"""
        try:
            message = json.loads(message_str)
            msg_type = message.get('type')
            data = message.get('data', {})
            
            print(f"Received message: {msg_type}")
            
            # 调用注册的回调函数
            if msg_type in self.callbacks:
                self.callbacks[msg_type](data)
            else:
                print(f"No handler for message type: {msg_type}")
                
        except json.JSONDecodeError:
            print(f"Invalid JSON message: {message_str}")
    
    def register_callback(self, msg_type, callback):
        """注册消息回调函数"""
        self.callbacks[msg_type] = callback
    
    def send_heartbeat(self):
        """发送心跳包"""
        while self.connected:
            time.sleep(15)  # 每15秒发送一次
            self.send_message({
                'type': 'heartbeat',
                'timestamp': datetime.now().isoformat()
            })
    
    def push_alarm(self, device_id, alarm_type, severity, description):
        """推送报警信息"""
        return self.send_message({
            'type': 'alarm_push',
            'data': {
                'device_id': device_id,
                'alarm_type': alarm_type,
                'severity': severity,
                'description': description
            },
            'timestamp': datetime.now().isoformat()
        })
    
    def query_alarms(self, status=None, device_id=None, severity=None, limit=10):
        """查询报警信息"""
        query_data = {'limit': limit}
        if status:
            query_data['status'] = status
        if device_id:
            query_data['device_id'] = device_id
        if severity:
            query_data['severity'] = severity
            
        return self.send_message({
            'type': 'alarm_query',
            'data': query_data,
            'timestamp': datetime.now().isoformat()
        })
    
    def acknowledge_alarm(self, alarm_id, notes=None):
        """确认报警"""
        ack_data = {'alarm_id': alarm_id}
        if notes:
            ack_data['notes'] = notes
            
        return self.send_message({
            'type': 'alarm_ack',
            'data': ack_data,
            'timestamp': datetime.now().isoformat()
        })

# 使用示例
if __name__ == '__main__':
    client = AlarmSocketClient('localhost', 9999)
    
    # 注册消息处理回调
    def handle_alarm_update(data):
        print(f"Alarm update: {data['device_id']} - {data['description']}")
    
    def handle_alarm_query_response(data):
        print(f"Query response: {data['count']} alarms found")
        for alarm in data['alarms']:
            print(f"  - {alarm['device_id']}: {alarm['description']}")
    
    def handle_error(data):
        print(f"Error: {data['message']}")
    
    client.register_callback('alarm_update', handle_alarm_update)
    client.register_callback('alarm_query_response', handle_alarm_query_response)
    client.register_callback('error', handle_error)
    
    # 连接到服务器
    if client.connect():
        try:
            # 推送测试报警
            client.push_alarm(
                device_id="sensor_001",
                alarm_type="temperature",
                severity="high",
                description="Temperature exceeded threshold: 85°C"
            )
            
            # 等待片刻
            time.sleep(1)
            
            # 查询报警
            client.query_alarms(status="active", limit=5)
            
            # 保持连接,等待服务器推送
            print("Waiting for messages (press Ctrl+C to exit)...")
            while True:
                time.sleep(1)
                
        except KeyboardInterrupt:
            print("Exiting...")
        finally:
            client.disconnect()

三、modbus通信

不论是基于HTTP的POST、GET请求还是基于Socket的实时通信,在工业设备端使用时都存在很大的弊端

  1. 协议开销大:HTTP头部信息冗余,不适合资源受限的嵌入式设备

  2. 实时性不足:HTTP请求-响应模式无法满足工业控制对实时性的要求

  3. 兼容性问题:许多传统工业设备只支持Modbus等专用工业协议

  4. 电力消耗:复杂的协议栈会增加设备功耗

  5. 可靠性问题:工业环境需要更强的抗干扰能力和错误检测机制

针对以上面临的弊端,工业设备有一套属于自己的专属通信方式modbus

1.Modbus协议简介

Modbus是一种串行通信协议,由Modicon公司(现施耐德电气)于1979年开发,用于PLC(可编程逻辑控制器)之间的通信。它已成为工业领域最常用的通信协议之一,具有简单、开放、易于实施的特点。

  • Modbus通信模式
    • Modbus RTU - 二进制格式,使用串行通信(RS-232/RS-485)

    • Modbus ASCII - ASCII字符格式,使用串行通信

    • Modbus TCP - 基于以太网TCP/IP协议

2.Modbus工业设备通信模拟示例

由于大部分学习者对这块不是特别了解,这里就给了一个模拟界面的示例,以下是网页端模拟的工业通信面板示例,复制代码创建一个.html结尾的文件,将代码复制进去,双击运行即可。

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Modbus通信模拟系统</title>
    <style>
        * {
            margin: 0;
            padding: 0;
            box-sizing: border-box;
            font-family: 'Segoe UI', Arial, sans-serif;
        }
        
        body {
            background: linear-gradient(135deg, #1a2a6c, #2a4b8c);
            color: #333;
            min-height: 100vh;
            padding: 20px;
        }
        
        .container {
            max-width: 1200px;
            margin: 0 auto;
        }
        
        header {
            text-align: center;
            padding: 20px 0;
            margin-bottom: 30px;
            color: white;
            text-shadow: 0 2px 4px rgba(0, 0, 0, 0.3);
        }
        
        h1 {
            font-size: 2.5rem;
            margin-bottom: 10px;
        }
        
        .subtitle {
            font-size: 1.2rem;
            opacity: 0.9;
        }
        
        .dashboard {
            display: grid;
            grid-template-columns: 1fr 1fr;
            gap: 20px;
            margin-bottom: 30px;
        }
        
        .panel {
            background: white;
            border-radius: 10px;
            box-shadow: 0 5px 15px rgba(0, 0, 0, 0.2);
            padding: 20px;
            overflow: hidden;
        }
        
        .panel-title {
            font-size: 1.5rem;
            color: #2a4b8c;
            margin-bottom: 20px;
            padding-bottom: 10px;
            border-bottom: 2px solid #eaeaea;
        }
        
        .control-group {
            margin-bottom: 20px;
        }
        
        .control-label {
            display: block;
            margin-bottom: 8px;
            font-weight: 600;
            color: #555;
        }
        
        .slider-container {
            display: flex;
            align-items: center;
        }
        
        .slider {
            flex: 1;
            height: 10px;
            -webkit-appearance: none;
            appearance: none;
            background: #dde6f0;
            outline: none;
            border-radius: 5px;
        }
        
        .slider::-webkit-slider-thumb {
            -webkit-appearance: none;
            appearance: none;
            width: 20px;
            height: 20px;
            border-radius: 50%;
            background: #2a4b8c;
            cursor: pointer;
        }
        
        .value-display {
            width: 60px;
            text-align: center;
            font-weight: bold;
            color: #2a4b8c;
            margin-left: 15px;
        }
        
        .status-indicator {
            display: inline-block;
            width: 15px;
            height: 15px;
            border-radius: 50%;
            margin-right: 10px;
        }
        
        .status-on {
            background: #4CAF50;
            box-shadow: 0 0 8px #4CAF50;
        }
        
        .status-off {
            background: #f44336;
        }
        
        .button {
            padding: 10px 20px;
            border: none;
            border-radius: 5px;
            background: #2a4b8c;
            color: white;
            font-weight: bold;
            cursor: pointer;
            transition: background 0.3s;
        }
        
        .button:hover {
            background: #3a6bc8;
        }
        
        .button:active {
            transform: translateY(1px);
        }
        
        .data-table {
            width: 100%;
            border-collapse: collapse;
            margin-top: 15px;
        }
        
        .data-table th, .data-table td {
            padding: 12px 15px;
            text-align: left;
            border-bottom: 1px solid #eaeaea;
        }
        
        .data-table th {
            background: #f5f8fc;
            font-weight: 600;
            color: #2a4b8c;
        }
        
        .data-table tr:hover {
            background: #f9fbfd;
        }
        
        .communication-log {
            background: #1e1e1e;
            color: #00ff00;
            font-family: monospace;
            padding: 15px;
            border-radius: 5px;
            height: 200px;
            overflow-y: auto;
            margin-top: 20px;
        }
        
        .log-entry {
            margin-bottom: 8px;
            line-height: 1.4;
        }
        
        .log-time {
            color: #888;
        }
        
        .log-master {
            color: #4fa6ff;
        }
        
        .log-slave {
            color: #ffa64f;
        }
        
        .connection-status {
            display: flex;
            align-items: center;
            margin-bottom: 20px;
        }
        
        .connection-dot {
            width: 12px;
            height: 12px;
            border-radius: 50%;
            margin-right: 10px;
        }
        
        .connected {
            background: #4CAF50;
            box-shadow: 0 0 8px #4CAF50;
        }
        
        .disconnected {
            background: #f44336;
        }
        
        @media (max-width: 768px) {
            .dashboard {
                grid-template-columns: 1fr;
            }
        }
    </style>
</head>
<body>
    <div class="container">
        <header>
            <h1>Modbus通信模拟系统</h1>
            <div class="subtitle">工业设备通信与控制模拟界面</div>
        </header>
        
        <div class="dashboard">
            <div class="panel">
                <h2 class="panel-title">设备控制面板</h2>
                
                <div class="connection-status">
                    <div class="connection-dot connected"></div>
                    <span>已连接到设备 (Modbus RTU - 从站地址: 1)</span>
                </div>
                
                <div class="control-group">
                    <label class="control-label">温度设定值 (°C)</label>
                    <div class="slider-container">
                        <input type="range" min="20" max="100" value="45" class="slider" id="tempSlider">
                        <span class="value-display" id="tempValue">45</span>
                    </div>
                </div>
                
                <div class="control-group">
                    <label class="control-label">压力设定值 (kPa)</label>
                    <div class="slider-container">
                        <input type="range" min="0" max="1000" value="350" class="slider" id="pressureSlider">
                        <span class="value-display" id="pressureValue">350</span>
                    </div>
                </div>
                
                <div class="control-group">
                    <label class="control-label">流量设定值 (L/min)</label>
                    <div class="slider-container">
                        <input type="range" min="0" max="500" value="120" class="slider" id="flowSlider">
                        <span class="value-display" id="flowValue">120</span>
                    </div>
                </div>
                
                <div class="control-group">
                    <label class="control-label">设备状态</label>
                    <div>
                        <button class="button" id="powerButton">启动设备</button>
                        <button class="button" id="resetButton">复位报警</button>
                    </div>
                </div>
            </div>
            
            <div class="panel">
                <h2 class="panel-title">设备状态监测</h2>
                
                <table class="data-table">
                    <thead>
                        <tr>
                            <th>参数</th>
                            <th></th>
                            <th>状态</th>
                        </tr>
                    </thead>
                    <tbody>
                        <tr>
                            <td>实际温度</td>
                            <td id="actualTemp">45.2 °C</td>
                            <td><span class="status-indicator status-on"></span>正常</td>
                        </tr>
                        <tr>
                            <td>实际压力</td>
                            <td id="actualPressure">348.7 kPa</td>
                            <td><span class="status-indicator status-on"></span>正常</td>
                        </tr>
                        <tr>
                            <td>实际流量</td>
                            <td id="actualFlow">118.3 L/min</td>
                            <td><span class="status-indicator status-on"></span>正常</td>
                        </tr>
                        <tr>
                            <td>设备运行状态</td>
                            <td id="deviceStatus">运行中</td>
                            <td><span class="status-indicator status-on"></span>正常</td>
                        </tr>
                        <tr>
                            <td>报警状态</td>
                            <td id="alarmStatus">无报警</td>
                            <td><span class="status-indicator status-on"></span>正常</td>
                        </tr>
                    </tbody>
                </table>
                
                <h3 class="panel-title" style="margin-top: 25px;">Modbus通信日志</h3>
                <div class="communication-log" id="commLog">
                    <div class="log-entry"><span class="log-time">10:23:45</span> <span class="log-master">主站</span>: 读取保持寄存器(4x) 40001-40005</div>
                    <div class="log-entry"><span class="log-time">10:23:45</span> <span class="log-slave">从站1</span>: 响应 5个寄存器值 [45.2, 348.7, 118.3, 1, 0]</div>
                    <div class="log-entry"><span class="log-time">10:23:47</span> <span class="log-master">主站</span>: 写入单个寄存器(4x) 40001 值: 45.0</div>
                    <div class="log-entry"><span class="log-time">10:23:47</span> <span class="log-slave">从站1</span>: 响应 写入成功</div>
                </div>
            </div>
        </div>
    </div>

    <script>
        // 更新滑块显示值
        document.getElementById('tempSlider').addEventListener('input', function() {
            document.getElementById('tempValue').textContent = this.value;
            addLogEntry("主站", `写入单个寄存器(4x) 40001 值: ${this.value}.0`);
            simulateResponse(`从站1`, "响应 写入成功");
        });
        
        document.getElementById('pressureSlider').addEventListener('input', function() {
            document.getElementById('pressureValue').textContent = this.value;
            addLogEntry("主站", `写入单个寄存器(4x) 40002 值: ${this.value}.0`);
            simulateResponse(`从站1`, "响应 写入成功");
        });
        
        document.getElementById('flowSlider').addEventListener('input', function() {
            document.getElementById('flowValue').textContent = this.value;
            addLogEntry("主站", `写入单个寄存器(4x) 40003 值: ${this.value}.0`);
            simulateResponse(`从站1`, "响应 写入成功");
        });
        
        // 设备控制按钮
        let devicePoweredOn = true;
        document.getElementById('powerButton').addEventListener('click', function() {
            devicePoweredOn = !devicePoweredOn;
            if (devicePoweredOn) {
                this.textContent = "停止设备";
                document.getElementById('deviceStatus').textContent = "运行中";
                addLogEntry("主站", "写线圈(0x) 00000 值: 1 (启动设备)");
                simulateResponse(`从站1`, "响应 写入成功");
            } else {
                this.textContent = "启动设备";
                document.getElementById('deviceStatus').textContent = "已停止";
                addLogEntry("主站", "写线圈(0x) 00000 值: 0 (停止设备)");
                simulateResponse(`从站1`, "响应 写入成功");
            }
        });
        
        document.getElementById('resetButton').addEventListener('click', function() {
            document.getElementById('alarmStatus').textContent = "无报警";
            addLogEntry("主站", "写线圈(0x) 00001 值: 1 (复位报警)");
            simulateResponse(`从站1`, "响应 写入成功");
        });
        
        // 添加日志条目
        function addLogEntry(source, message) {
            const logElement = document.getElementById('commLog');
            const time = new Date().toLocaleTimeString();
            const entry = document.createElement('div');
            entry.className = 'log-entry';
            entry.innerHTML = `<span class="log-time">${time}</span> <span class="log-${source === '主站' ? 'master' : 'slave'}">${source}</span>: ${message}`;
            logElement.appendChild(entry);
            logElement.scrollTop = logElement.scrollHeight;
        }
        
        // 模拟从站响应
        function simulateResponse(source, message) {
            setTimeout(() => {
                addLogEntry(source, message);
            }, 300);
        }
        
        // 模拟定期数据读取
        setInterval(() => {
            if (devicePoweredOn) {
                // 更新实际值,在设定值附近随机波动
                const temp = (parseFloat(document.getElementById('tempSlider').value) + (Math.random() * 2 - 1)).toFixed(1);
                const pressure = (parseFloat(document.getElementById('pressureSlider').value) + (Math.random() * 5 - 2.5)).toFixed(1);
                const flow = (parseFloat(document.getElementById('flowSlider').value) + (Math.random() * 4 - 2)).toFixed(1);
                
                document.getElementById('actualTemp').textContent = temp + " °C";
                document.getElementById('actualPressure').textContent = pressure + " kPa";
                document.getElementById('actualFlow').textContent = flow + " L/min";
                
                addLogEntry("主站", "读取保持寄存器(4x) 40001-40005");
                simulateResponse("从站1", `响应 5个寄存器值 [${temp}, ${pressure}, ${flow}, 1, 0]`);
            }
        }, 5000);
    </script>
</body>
</html>
  • 示例界面
    在这里插入图片描述

3.模拟Modbus通信案例

由于实际Modbus设备不常见且设备需要接线并配合专属模块,所以我们将模拟一个完整的Modbus TCP通信场景,包括服务器(从设备)和客户端(主设备)。

模拟场景描述

我们将模拟一个工业环境中的温度监控系统:

  • 1个Modbus服务器模拟温度控制器

  • 3个模拟温度传感器(地址0x0001-0x0003)

  • 2个模拟继电器输出(地址0x0004-0x0005)

  • 1个模拟报警状态寄存器(地址0x0006)

开始之前请安装指定版本的pymodbus

pip install pymodbus==3.6.5

服务器端代码 (modbus_server.py)

from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.device import ModbusDeviceIdentification
import random
import time

def run_server():
    # 初始化数据存储
    # 线圈: 2个继电器 (地址 0x0004-0x0005)
    coils = ModbusSequentialDataBlock(0, [False] * 6)  # 创建6个线圈,从地址0开始
    
    # 保持寄存器: 3个温度传感器 (地址 0x0001-0x0003) 和1个报警状态 (地址 0x0006)
    hr_data = [0] * 7  # 创建7个寄存器,从地址0开始
    # 初始化温度值 (20-30度)
    hr_data[0] = 220  # 地址0x0001: 22.0°C
    hr_data[1] = 245  # 地址0x0002: 24.5°C
    hr_data[2] = 190  # 地址0x0003: 19.0°C
    holding_registers = ModbusSequentialDataBlock(0, hr_data)
    
    # 创建从设备上下文
    slave_context = ModbusSlaveContext(
        co=coils,
        hr=holding_registers
    )
    
    # 创建服务器上下文
    context = ModbusServerContext(slaves=slave_context, single=True)
    
    # 设置设备标识信息
    identity = ModbusDeviceIdentification()
    identity.VendorName = '模拟温度控制器'
    identity.ProductCode = 'TC-100'
    identity.VendorUrl = 'http://example.com'
    identity.ProductName = '温度监控系统'
    identity.ModelName = 'Modbus TCP模拟器'
    identity.MajorMinorRevision = '1.0.0'
    
    # 启动TCP服务器
    print("启动Modbus TCP服务器在 localhost:5020")
    StartTcpServer(context=context, identity=identity, address=("localhost", 5020))

if __name__ == "__main__":
    run_server()

客户端代码 (modbus_client.py)

from pymodbus.client import ModbusTcpClient
import time


def read_temperature_sensors():
    """读取温度传感器数据"""
    client = ModbusTcpClient('localhost', port=5020)

    try:
        client.connect()

        # 读取保持寄存器 (温度传感器和报警状态)
        # 地址 0x0001-0x0003: 温度传感器 (对应寄存器地址 0-2)
        # 地址 0x0006: 报警状态 (对应寄存器地址 5)
        result = client.read_holding_registers(address=0, count=6, slave=1)

        if not result.isError():
            print("=== 温度传感器读数 ===")
            temperatures = [value / 10.0 for value in result.registers[:3]]
            alarm_status = result.registers[5]

            for i, temp in enumerate(temperatures, 1):
                print(f"传感器 {i}: {temp}°C")

            print(f"报警状态: {'正常' if alarm_status == 0 else '警告'}")
        else:
            print("读取寄存器错误:", result)

    except Exception as e:
        print("连接错误:", e)
    finally:
        client.close()


def read_relay_status():
    """读取继电器状态"""
    client = ModbusTcpClient('localhost', port=5020)

    try:
        client.connect()

        # 读取线圈 (继电器状态)
        # 地址 0x0004-0x0005: 继电器 (对应线圈地址 3-4)
        result = client.read_coils(address=3, count=2, slave=1)

        if not result.isError():
            print("\n=== 继电器状态 ===")
            for i, status in enumerate(result.bits, 1):
                print(f"继电器 {i}: {'开启' if status else '关闭'}")
        else:
            print("读取线圈错误:", result)

    except Exception as e:
        print("连接错误:", e)
    finally:
        client.close()


def toggle_relay(relay_index, state):
    """切换继电器状态"""
    client = ModbusTcpClient('localhost', port=5020)

    try:
        client.connect()

        # 写入线圈 (控制继电器)
        # 继电器1: 地址 3, 继电器2: 地址 4
        address = 3 + (relay_index - 1)
        result = client.write_coil(address=address, value=state, slave=1)

        if not result.isError():
            print(f"\n继电器 {relay_index}{'开启' if state else '关闭'}")
        else:
            print("写入线圈错误:", result)

    except Exception as e:
        print("连接错误:", e)
    finally:
        client.close()


if __name__ == "__main__":
    # 读取当前状态
    read_temperature_sensors()
    read_relay_status()

    # 切换第一个继电器状态
    toggle_relay(1, True)

    # 再次读取状态以确认更改
    time.sleep(0.5)
    read_relay_status()

测试方法及说明

  1. 使用方法
    • 首先启动Modbus服务器:

      python modbus_server.py
      
    • 然后运行客户端:

      python modbus_client.py
      
    • 客户端将连接到服务器并执行一系列Modbus操作:

      • 读取温度传感器值

      • 读取和控制继电器状态

  • 运行示例
=== 温度传感器读数 ===
传感器 1: 24.5°C
传感器 2: 19.0°C
传感器 3: 0.0°C
报警状态: 正常

=== 继电器状态 ===
继电器 1: 关闭
继电器 2: 关闭
继电器 3: 关闭
继电器 4: 关闭
继电器 5: 关闭
继电器 6: 关闭
继电器 7: 关闭
继电器 8: 关闭

继电器 1 已开启

=== 继电器状态 ===
继电器 1: 开启
继电器 2: 关闭
继电器 3: 关闭
继电器 4: 关闭
继电器 5: 关闭
继电器 6: 关闭
继电器 7: 关闭
继电器 8: 关闭

  1. Modbus通信优势

    • 简单高效:协议简单,开销小,适合资源受限的设备

    • 实时性强:支持快速响应,适合工业控制场景

    • 广泛兼容:几乎所有工业设备都支持Modbus协议

    • 可靠性高:内置错误检测机制,CRC校验保证数据完整性

    • 灵活部署:支持多种物理层(RS-485, TCP/IP等)

注意事项

  • 对于专门的modbus设备,python等语言有pymodbus的高级接口,本案例为模拟案例,仅供参考和学习,对于深入的串口通信(如modbus RTU),可以采用pyserial等库实现实现基本的通信传输。

总结

本小节以代码的形式展示常见的信息推送和共享机制,涉及服务器端、系统内部、工业设备间的通信机制和基本实现,是AI视频监控系统接入生态圈必不可少的一部分。

下期预告

  • RTSP推流与展示
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐