若该文为原创文章,转载请注明原文出处。

目的是实现视频的传输,只是个demo.

程序分为两部分,视频接收端和视频发送端。

一、视频接收端流程分析

主要流程:

  1. 初始化配置

    • 设置UDP端口(5001)和缓冲区大小
    • 初始化帧缓冲队列(最大15帧)
    • 设置目标帧率(25fps)和显示窗口大小(640x480)
  2. 创建UDP套接字

    • 绑定到本地端口,设置8MB接收缓冲区
    • 设置10ms超时以避免阻塞
  3. 帧数据管理

    • 使用defaultdict(dict)存储帧分片数据
    • 使用frame_segment_counts跟踪每个帧的分片接收情况
  4. 数据包处理(process_packet函数):

    • 解析包头信息(帧ID、分片索引、总分片数)
    • 验证CRC32校验和
    • 重组完整帧数据
    • 清理过期帧数据
  5. 视频显示循环

    • 每次尝试接收多个数据包填充缓冲区
    • 按目标帧率从缓冲区取出帧显示
    • 缓冲区不足时重复显示上一帧
    • 定期输出统计信息(接收/显示帧率、缓冲区状态等)

二、视频发送端流程分析

主要流程:

  1. 初始化配置

    • 设置目标IP地址和端口(5001)
    • 设置发送帧率(20fps)和JPEG压缩质量(70)
    • 创建UDP套接字并设置4MB发送缓冲区
  2. 视频源设置

    • 从test.mp4文件读取视频(而非实际摄像头)
    • 设置帧率和分辨率(640x480)
  3. 帧发送函数(send_frame):

    • 将帧数据分片(每片8192字节)
    • 为每个分片添加包头信息(帧ID、分片索引、总分片数)
    • 添加CRC32校验和
    • 最后一个分片添加结束标记
  4. 主发送循环

    • 控制发送帧率
    • 读取视频帧并调整尺寸
    • JPEG编码并控制压缩质量
    • 调用send_frame发送帧数据
    • 定期输出统计信息

三、关键思路

数据传输协议:

  • 分片传输:大帧被分割成多个UDP数据包发送
  • 包结构:帧ID(4字节) + 分片索引(2字节) + 总分片数(2字节) + 数据 + CRC32校验(4字节) + 结束标记(最后分片)
  • 帧重组:接收端根据帧ID和分片索引重新组装完整帧

优化策略:

  1. 缓冲机制:发送端降低帧率和压缩质量,接收端使用帧缓冲减少卡顿
  2. 错误处理:CRC校验保证数据完整性,缓冲区管理避免内存泄漏
  3. 性能平衡:通过调整分片大小、发送速率和压缩质量平衡传输效率和网络压力

四、源码

1、rk3568_video.py

import cv2
import socket
import time
import struct
import zlib
 
# 配置
SERVER_IP = '192.168.50.84'  # 上位机的IP地址
PORT = 5001  # 端口号
FPS = 20  # 降低帧率以减轻网络压力
COMPRESSION_QUALITY = 70  # 降低JPEG压缩质量,减小数据量
 
# 创建一个socket对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4194304)  # 4MB发送缓冲区
 
# 相机连接参数
CAMERA_CONFIG = {
    'ip': '192.168.50.84',
    'port': 6501,
    'username': 'admin',
    'password': 'a1234567'
}
 
# RTSP流地址
 
print("正在连接摄像头...")
# 打开摄像头
cap = cv2.VideoCapture("test.mp4")
 
if not cap.isOpened():
    print("无法打开摄像头")
    exit()
 
print("摄像头连接成功")
 
# 尝试设置摄像头参数
cap.set(cv2.CAP_PROP_FPS, FPS)  # 设置帧率
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)  # 设置宽度
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)  # 设置高度
 
frame_count = 0
segment_size = 8192  # 减小分片大小,降低单包丢失影响
last_frame_time = time.time()
start_time = time.time()
 
def send_frame(data, frame_id):
    """发送一帧数据,使用可靠的分片方法"""
    # 计算分片数量
    total_segments = (len(data) + segment_size - 1) // segment_size
    
    # 如果分片过多,跳过此帧
    if total_segments > 100:
        print(f"帧太大,分片数量: {total_segments},跳过")
        return False
    
    # 为每个分片添加头信息:帧ID(4字节) + 分片索引(2字节) + 总分片数(2字节) + CRC32(4字节)
    for i in range(total_segments):
        # 计算当前分片数据
        start_pos = i * segment_size
        end_pos = min(start_pos + segment_size, len(data))
        segment_data = data[start_pos:end_pos]
        
        # 计算数据校验和
        checksum = zlib.crc32(segment_data)
        
        # 创建头信息
        header = struct.pack(">IHH", frame_id, i, total_segments)
        
        # 如果是最后一个分片,添加结束标记
        if i == total_segments - 1:
            # 发送分片 (头信息 + 数据 + 校验和 + 结束标记)
            packet = header + segment_data + struct.pack(">I", checksum) + b'\xff\xff'
        else:
            # 发送分片 (头信息 + 数据 + 校验和)
            packet = header + segment_data + struct.pack(">I", checksum)
        
        # 发送数据
        client_socket.sendto(packet, (SERVER_IP, PORT))
        
        # 控制发送速率,避免网络拥塞
        if total_segments > 10:
            time.sleep(0.001)
    
    return True
 
try:
    print("开始发送视频流...")
    while True:
        # 控制帧率
        current_time = time.time()
        if current_time - last_frame_time < 1.0/FPS:
            time.sleep(0.001)  # 短暂休眠避免CPU占用过高
            continue
            
        # 读取摄像头帧
        ret, frame = cap.read()
        if not ret:
            print("无法读取帧,尝试重新连接...")
            # 尝试重新连接摄像头
            cap.release()
            time.sleep(1)
            cap = cv2.VideoCapture("test.mp4")
            print("重新打开视频...")
            if not cap.isOpened():
                print("重新连接失败,退出程序")
                break
            continue
        
        last_frame_time = current_time
        
        # 调整图像尺寸,减小传输数据量
        frame = cv2.resize(frame, (640, 480))
     
        # 对帧进行编码(控制压缩质量)
        encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), COMPRESSION_QUALITY]
        encoded, buffer = cv2.imencode('.jpg', frame, encode_param)
     
        if not encoded:
            print("编码帧失败")
            continue
     
        # 获取编码后的数据
        data = buffer.tobytes()
        data_len = len(data)
        
        # 发送当前帧
        frame_id = frame_count % 10000  # 循环使用的帧ID
        success = send_frame(data, frame_id)
        
        if success:
            frame_count += 1
            
            # 每5秒显示一次统计信息
            if frame_count % 100 == 0:
                elapsed = time.time() - start_time
                fps = frame_count / elapsed
                print(f"已发送 {frame_count} 帧,平均发送速率: {fps:.1f} fps,平均帧大小: {data_len/1024:.1f} KB")
        
except KeyboardInterrupt:
    print("程序被用户中断")
except Exception as e:
    print(f"发生错误: {e}")
finally:
    # 释放资源
    print(f"程序结束,共发送 {frame_count} 帧")
    cap.release()
    client_socket.close()

2、pc_display_video.py

import cv2
import socket
import numpy as np
import time
import struct
import zlib
from collections import deque, defaultdict
 
# 配置
PORT = 5001  # 端口号
BUFFER_SIZE = 15  # 帧缓冲区大小
TARGET_FPS = 25  # 目标帧率
DISPLAY_WIDTH = 640  # 显示窗口宽度
DISPLAY_HEIGHT = 480  # 显示窗口高度
 
# 创建一个socket对象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('0.0.0.0', PORT))
# 设置socket更大的接收缓冲区
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 8388608)  # 8MB缓冲区
# 设置socket超时,避免一直阻塞
server_socket.settimeout(0.01)
 
# 创建帧缓冲队列
frame_buffer = deque(maxlen=BUFFER_SIZE)
last_display_time = time.time()
last_frame = None
 
# 数据接收缓冲区,格式: {frame_id: {segment_idx: segment_data, ...}}
frame_segments = defaultdict(dict)
# 帧片段计数,格式: {frame_id: (received_segments, total_segments)}
frame_segment_counts = {}
 
# 创建固定大小的显示窗口
cv2.namedWindow('Video Stream', cv2.WINDOW_NORMAL)
cv2.resizeWindow('Video Stream', DISPLAY_WIDTH, DISPLAY_HEIGHT)
 
# 统计信息变量
frame_received = 0
frame_displayed = 0
corrupted_frames = 0
start_time = time.time()
 
print("开始接收视频流...")
 
def process_packet(packet):
    """处理接收到的数据包,返回完整的帧或None"""
    # 数据包太小,无法包含头信息
    if len(packet) < 12:  # 4(frame_id) + 2(segment_idx) + 2(total_segments) + 4(crc32)
        return None
        
    try:
        # 提取头信息:帧ID + 分片索引 + 总分片数
        header = packet[:8]
        frame_id, segment_idx, total_segments = struct.unpack(">IHH", header)
        
        # 检查是否是结束标记
        has_end_marker = False
        if packet[-2:] == b'\xff\xff':
            # 去掉结束标记
            data = packet[8:-6]  # 去掉头部8字节和尾部6字节(4字节CRC + 2字节结束标记)
            checksum_bytes = packet[-6:-2]  # 取出校验和
            has_end_marker = True
        else:
            # 没有结束标记
            data = packet[8:-4]  # 去掉头部8字节和尾部4字节校验和
            checksum_bytes = packet[-4:]  # 取出校验和
        
        # 验证校验和
        received_checksum = struct.unpack(">I", checksum_bytes)[0]
        calculated_checksum = zlib.crc32(data)
        
        if received_checksum != calculated_checksum:
            print(f"校验和错误: 帧ID={frame_id}, 分片={segment_idx}, 收到={received_checksum}, 计算={calculated_checksum}")
            return None
            
        # 更新帧分片信息
        frame_segments[frame_id][segment_idx] = data
        if frame_id not in frame_segment_counts:
            frame_segment_counts[frame_id] = [1, total_segments]
        else:
            frame_segment_counts[frame_id][0] += 1
            
        # 检查帧是否完整
        if frame_segment_counts[frame_id][0] == total_segments:
            # 按顺序拼接所有分片
            full_data = b''
            for i in range(total_segments):
                if i not in frame_segments[frame_id]:
                    print(f"错误: 帧ID={frame_id}缺少分片{i}")
                    return None
                full_data += frame_segments[frame_id][i]
            
            # 清理缓存
            del frame_segments[frame_id]
            del frame_segment_counts[frame_id]
            
            # 清理过期的帧数据(超过100个不同的帧ID时)
            if len(frame_segments) > 100:
                oldest_frame_id = min(frame_segments.keys())
                del frame_segments[oldest_frame_id]
                if oldest_frame_id in frame_segment_counts:
                    del frame_segment_counts[oldest_frame_id]
            
            return full_data
        return None
    except Exception as e:
        print(f"处理数据包错误: {e}")
        return None
 
while True:
    try:
        # 接收和处理帧的循环
        for _ in range(30):  # 每次显示前尝试接收更多包以填充缓冲区
            try:
                # 接收数据包
                packet, _ = server_socket.recvfrom(65535)
                full_data = process_packet(packet)
                
                if full_data is not None:
                    # 解码
                    np_data = np.frombuffer(full_data, dtype=np.uint8)
                    frame = cv2.imdecode(np_data, cv2.IMREAD_COLOR)
                    
                    if frame is not None:
                        # 调整帧大小以确保一致性
                        frame = cv2.resize(frame, (DISPLAY_WIDTH, DISPLAY_HEIGHT))
                        # 将解码成功的帧添加到缓冲区
                        frame_buffer.append(frame)
                        frame_received += 1
                    else:
                        corrupted_frames += 1
                        print("解码帧失败")
            except socket.timeout:
                # 超时继续循环
                break
            except Exception as e:
                print(f"接收/解码错误: {e}")
                break
        
        # 控制显示帧率并从缓冲区取帧显示
        current_time = time.time()
        time_elapsed = current_time - last_display_time
        
        # 按照目标帧率显示
        if time_elapsed >= 1.0/TARGET_FPS:
            # 缓冲区有足够帧时才消耗
            if len(frame_buffer) > BUFFER_SIZE // 3:
                display_frame = frame_buffer.popleft()
                last_frame = display_frame.copy()  # 保存该帧用于后续可能的填充
                cv2.imshow('Video Stream', display_frame)
                frame_displayed += 1
            # 缓冲区不足但有上一帧时显示上一帧
            elif last_frame is not None:
                cv2.imshow('Video Stream', last_frame)
            
            last_display_time = current_time
            
            # 每5秒显示一次统计信息
            if current_time - start_time > 5:
                fps_received = frame_received / (current_time - start_time)
                fps_displayed = frame_displayed / (current_time - start_time)
                buffer_status = len(frame_buffer)
                print(f"接收帧率: {fps_received:.1f} fps, 显示帧率: {fps_displayed:.1f} fps, 缓冲区: {buffer_status}/{BUFFER_SIZE}, 损坏帧: {corrupted_frames}")
                # 重置统计
                frame_received = 0
                frame_displayed = 0
                corrupted_frames = 0
                start_time = current_time
     
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    except Exception as e:
        print(f"主循环错误: {e}")
        continue
 
# 释放资源
server_socket.close()
cv2.destroyAllWindows()

测试是正常的。后续想在rk3568上实现可视对讲功能。

如有侵权,或需要完整代码,请及时联系博主。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐