第一章:为什么需要边缘智能?

1.1 云计算的局限

问题 说明
  • 高延迟 | 云端往返 >500ms,无法满足实时控制(如机器人避障)
  • 带宽瓶颈 | 1000 台摄像头 × 2Mbps = 2Gbps 上行带宽
  • 单点故障 | 网络中断 → 全系统瘫痪
  • 隐私风险 | 敏感数据(如人脸)上传云端

1.2 边缘计算优势

  • 低延迟:本地处理 <50ms
  • 带宽节省:仅上传摘要/告警(压缩率 >90%)
  • 高可用:断网仍可执行预设规则
  • 合规性:敏感数据不出厂

典型架构
设备 → 边缘网关 → 云(分层智能)


第二章:边缘网关架构设计

2.1 软件栈选型(轻量化)

功能 技术 说明
  • Web 框架 | Flask(非异步) | 内存占用低,启动快
  • 消息总线 | ZeroMQ(inproc + tcp) | 无代理,低开销
  • 本地存储 | SQLite(WAL 模式) | 单文件,ACID,支持并发读
  • AI 推理 | ONNX Runtime(CPU 版) | 跨框架模型部署
  • 前端 | Vue 3 + Vite(静态资源) | 构建后仅 500KB

为何不用 FastAPI?:ASGI 在低端设备上内存开销更高,Flask 足够满足边缘 API 需求。

2.2 进程模型

[主进程: Flask]
    │
    ├── [子进程: MQTT Client] ←→ 云 / 设备
    ├── [子进程: CoAP Server] ←→ 老旧传感器
    ├── [子进程: Rule Engine] ←→ 本地自动化
    └── [子进程: Model Inference] ←→ ONNX 模型
          ↑
      ZeroMQ IPC 通信

隔离性:任一子进程崩溃,主进程可重启它。


第三章:协议转换 —— 接入异构设备

3.1 支持协议

协议 适用设备 特点
  • MQTT | 新型 IoT 设备 | 轻量、发布/订阅
  • CoAP | 资源受限传感器 | UDP、RESTful
  • Modbus RTU | 工业 PLC | 串口、主从架构

3.2 CoAP 服务端(aiocoap)

# protocols/coap_server.py
import asyncio
from aiocoap import Context, Message, resource

class SensorResource(resource.Resource):
    def __init__(self):
        super().__init__()
        self.value = b"0"

    async def render_get(self, request):
        return Message(payload=self.value)

    async def render_put(self, request):
        self.value = request.payload
        # 触发 ZeroMQ 消息
        zmq_pub.send(b"coap_update", request.payload)
        return Message(code=CHANGED)

async def start_coap_server():
    root = resource.Site()
    root.add_resource(['sensor', 'temp'], SensorResource())
    await Context.create_server_context(root, bind=('::', 5683))
    await asyncio.get_event_loop().create_future()  # run forever

3.3 Modbus 串口监听

# protocols/modbus_reader.py
from pymodbus.client import ModbusSerialClient

def read_plc_registers():
    client = ModbusSerialClient(method='rtu', port='/dev/ttyUSB0', baudrate=9600)
    if client.connect():
        result = client.read_holding_registers(address=0, count=10, slave=1)
        client.close()
        return result.registers
    return None

统一数据模型:所有协议数据转为 JSON 格式:

{
  "device_id": "plc_01",
  "timestamp": 1705650000,
  "metrics": {
    "vibration": 0.85,
    "temperature": 42.3
  }
}

第四章:本地 AI 推理 —— ONNX Runtime

4.1 模型准备

  • 训练模型(PyTorch/TensorFlow) → 导出为 ONNX
  • 优化:使用 onnx-simplifier 减小体积
pip install onnxruntime
pip install onnx-simplifier

python -m onnxsim model.onnx model_optimized.onnx

4.2 边缘推理服务

# services/inference.py
import onnxruntime as ort
import numpy as np

class EdgeInference:
    def __init__(self, model_path: str):
        self.session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
        self.input_name = self.session.get_inputs()[0].name
        self.output_name = self.session.get_outputs()[0].name

    def predict(self, input_data: np.ndarray) -> np.ndarray:
        return self.session.run([self.output_name], {self.input_name: input_data})[0]

# 全局实例(避免重复加载)
vibration_model = EdgeInference('/models/vibration_anomaly.onnx')

4.3 工厂设备预测性维护

# rules/predictive_maintenance.py
def check_vibration(data: dict):
    if 'vibration' in data['metrics']:
        # 预处理:滑动窗口标准化
        window = get_last_10_vibrations(data['device_id'])
        features = np.array(window).reshape(1, -1).astype(np.float32)
        
        # 推理
        anomaly_score = vibration_model.predict(features)[0][0]
        
        if anomaly_score > 0.9:
            trigger_local_alert(f"设备 {data['device_id']} 振动异常!")
            # 仅上传告警,不上传原始数据
            cloud_uploader.enqueue({
                "alert": "vibration_anomaly",
                "device": data['device_id'],
                "score": float(anomaly_score)
            })

效果

  • 原始数据(10KB/s/设备) → 告警(<1KB/小时)
  • 告警延迟 <100ms

第五章:断网自治 —— 本地规则引擎

5.1 规则定义(YAML)

# rules/home_automation.yaml
- name: "夜间自动关灯"
  condition: 
    time: "22:00-06:00"
    sensor.light: ">300"
  action: 
    command: "turn_off"
    target: "light_living_room"

- name: "温度过高开空调"
  condition:
    sensor.temperature: ">28"
  action:
    command: "set_mode"
    target: "ac_bedroom"
    params: {"mode": "cool", "temp": 26}

5.2 规则引擎执行

# services/rule_engine.py
import yaml
from datetime import datetime

class LocalRuleEngine:
    def __init__(self, rules_file: str):
        with open(rules_file) as f:
            self.rules = yaml.safe_load(f)

    def evaluate(self, sensor_data: dict):
        current_time = datetime.now().strftime("%H:%M")
        for rule in self.rules:
            if self._check_condition(rule['condition'], sensor_data, current_time):
                self._execute_action(rule['action'])

    def _check_condition(self, cond: dict, data: dict, time_str: str) -> bool:
        # 时间条件
        if 'time' in cond:
            start, end = cond['time'].split('-')
            if not (start <= time_str <= end):
                return False
        # 传感器条件
        for key, expr in cond.items():
            if key.startswith('sensor.'):
                metric = key.split('.')[1]
                if metric not in data['metrics']:
                    return False
                value = data['metrics'][metric]
                # 简单表达式解析(如 ">300")
                op, threshold = expr[0], float(expr[1:])
                if op == '>' and not (value > threshold): return False
                if op == '<' and not (value < threshold): return False
        return True

    def _execute_action(self, action: dict):
        # 通过 ZeroMQ 发送控制命令
        zmq_control.send_json(action)

断网时:规则引擎继续运行,控制本地设备。


第六章:数据同步 —— 断网续传

6.1 本地队列设计

# services/cloud_uploader.py
import sqlite3
import threading

class EdgeQueue:
    def __init__(self, db_path="/data/edge_queue.db"):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.execute("CREATE TABLE IF NOT EXISTS queue (id INTEGER PRIMARY KEY, payload TEXT)")
        self.lock = threading.Lock()

    def enqueue(self, payload: dict):
        with self.lock:
            self.conn.execute("INSERT INTO queue (payload) VALUES (?)", (json.dumps(payload),))
            self.conn.commit()

    def dequeue_batch(self, limit=100):
        with self.lock:
            cur = self.conn.cursor()
            cur.execute("SELECT id, payload FROM queue LIMIT ?", (limit,))
            items = cur.fetchall()
            if items:
                ids = [item[0] for item in items]
                self.conn.execute(f"DELETE FROM queue WHERE id IN ({','.join('?'*len(ids))})", ids)
                self.conn.commit()
            return [json.loads(item[1]) for item in items]

6.2 云同步服务

# services/sync_to_cloud.py
import requests

def sync_loop():
    while True:
        if is_network_available():
            batch = edge_queue.dequeue_batch()
            if batch:
                try:
                    requests.post(CLOUD_ENDPOINT, json=batch, timeout=10)
                except Exception as e:
                    # 重入队列
                    for item in batch:
                        edge_queue.enqueue(item)
        time.sleep(5)  # 每 5 秒尝试同步

可靠性:SQLite WAL 模式确保断电不丢数据。


第七章:前端边缘管理(Vue)

7.1 边缘节点状态面板

<template>
  <div class="edge-node">
    <h3>{{ node.name }}</h3>
    <div class="status-grid">
      <MetricCard label="CPU" :value="node.cpu_usage + '%'" />
      <MetricCard label="内存" :value="node.mem_usage + 'MB'" />
      <MetricCard label="网络" :value="node.net_status" />
      <MetricCard label="存储" :value="node.disk_free + 'GB'" />
    </div>
    <button @click="deployFunction">部署边缘函数</button>
  </div>
</template>

<script setup>
const props = defineProps({
  node: Object // { name, cpu_usage, mem_usage, ... }
})

const deployFunction = async () => {
  const file = await openFilePicker() // 用户选择 .py 或 .onnx
  await fetch(`/api/edge/${props.node.id}/deploy`, {
    method: 'POST',
    body: file
  })
  alert('部署成功!')
}
</script>

7.2 远程函数热加载

# routes/edge_management.py
@app.post('/api/edge/<node_id>/deploy')
def deploy_edge_function():
    file = request.files['file']
    if file.filename.endswith('.py'):
        # 保存并 reload 规则模块
        file.save(f"/rules/custom_{secure_filename(file.filename)}")
        importlib.reload(custom_rules)
        return jsonify({"status": "success"})
    elif file.filename.endswith('.onnx'):
        # 替换模型
        file.save("/models/custom.onnx")
        global custom_model
        custom_model = EdgeInference("/models/custom.onnx")
        return jsonify({"status": "success"})

运维价值:无需物理接触设备,远程更新 AI 模型或业务逻辑。


第八章:性能与资源优化

8.1 内存控制

  • Flask 多进程禁用app.run(threaded=True, processes=1)
  • ZeroMQ 高水位:限制内存队列长度
  • SQLite PRAGMA
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;  -- 平衡安全与性能

8.2 启动加速

  • 懒加载模型:首次推理时才加载 ONNX
  • 预编译字节码python -m compileall /app

实测(树莓派 4B)

  • 内存占用:85MB
  • 启动时间:2.3s
  • CoAP 响应延迟:<10ms

第九章:安全设计

9.1 边缘安全

  • TLS 双向认证:边缘 ↔ 云
  • 固件签名:防止恶意代码部署
  • 最小权限:Flask 运行于非 root 用户

9.2 数据隐私

  • 本地脱敏:人脸/车牌在边缘模糊后再上传
  • 加密存储:SQLite 使用 SQLCipher 加密

第十章:场景总结

10.1 工厂预测性维护

  • 输入:设备振动、温度传感器
  • 边缘动作:实时异常检测 → 本地停机 + 云告警
  • 收益:减少非计划停机 30%

10.2 智能家居

  • 输入:温湿度、光照、人体红外
  • 边缘动作:断网时仍可执行“人来开灯、人走关灯”
  • 收益:用户体验不依赖互联网

10.3 精准农业

  • 输入:土壤湿度、气象站数据
  • 边缘动作:融合多源数据 → 自动灌溉决策
  • 收益:节水 25%,减少云流量 95%

总结:智能下沉,价值上升

边缘不是云的延伸,而是智能的新前线。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐