前言

在上两个小节,我们简单的介绍了报警信息推送的几种方式,包括基于HTTP的请求,基于socket的自定义消息,以及基于工业设备的MODBUS通信,这些机制使得我们的AI视频监控系统具备了强大的生态融合能力。不过需要注意的是,我们虽然具备了基本消息(如报警情况、设备运行状态等)的共享能力,但是缺少了直观的画面共享能力,即我们的AI分析结果没有做到很好的可视化展示,缺乏远程观察AI分析画面的能力。本小节,我们将基于这一问题进行深入,详细介绍一下基于gstreamer的分析画面推送


一、常见的画面推送方式

一个完整的AI视频监控系统,除了要将报警信号、分析结果等元数据(Metadata)及时上报外,将叠加了分析结果(如 bounding box、标签、轨迹等)的实时视频画面进行远程推送与共享,也至关重要。这为运营人员提供了最直观的态势感知能力,是实现远程复核、实时监看的关键。

在现代多媒体技术中,主要有以下几种常见的视频流推送方式:

  1. RTSP (Real Time Streaming Protocol)

    • 简介:RTSP 是一种网络控制协议,设计用于控制流媒体服务器。它通常与 RTP (Real-time Transport Protocol) 共同工作,用于实际传输音频视频数据。

    • 特点:低延迟、高实时性,是安防监控、IPC(网络摄像机)领域最主流、最通用的标准协议。几乎所有的专业播放器(如 VLC)、视频管理平台(VMS)、NVR(网络视频录像机)都原生支持 RTSP 流的拉取和播放。

    • 适用场景:局域网或高性能网络环境下的实时监控,对延迟要求极高的应用。

  2. RTMP (Real-Time Messaging Protocol)

    • 简介:最初由 Macromedia 为 Flash 播放器开发的一种实时音视频流传输协议。

    • 特点:采用 TCP 传输,稳定性好,但延迟相对 RTSP 略高。随着 Flash 的淘汰,其重要性有所下降,但在直播领域依然扮演着重要角色,常用于将流推送到 CDN 或流媒体服务器(如 SRS、Nginx-rtmp-module)。

    • 适用场景:互联网直播、事件直播推送,需要与现有 CDN 生态集成的场景。

  3. HLS (HTTP Live Streaming) / DASH (Dynamic Adaptive Streaming over HTTP)

    • 简介:基于 HTTP 的自适应码率流媒体传输协议。它将视频流切分成一系列小的 HTTP 文件(TS 片段或 MP4 片段)来下载播放。

    • 特点:高兼容性(穿透防火墙能力强)、支持自适应码率(可根据网络状况动态切换画质),但延迟非常高(通常在 10-30 秒以上),不适合真正的实时监控。

    • 适用场景:网络状况复杂多变的互联网点播与直播(如B站、YouTube),对延迟不敏感的监控回看。

  4. WebRTC (Web Real-Time Communication)

    • 简介:一个支持网页浏览器进行实时音视频通信的开源项目。

    • 特点:极低的端到端延迟,强大的 NAT 穿透能力(ICE/STUN/TURN),天生适合浏览器之间的直接通信,无需安装插件。

    • 适用场景:浏览器无插件化的实时视频通信、远程控制、对延迟要求极高的 Web 端监控画面展示。

对于我们的 AI 视频监控系统而言,让分析画面像一台标准IPC一样提供RTSP流输出,是集成到现有安防体系中最自然、兼容性最好的方式。这样,现有的 NVR、VMS 甚至手机 App 都可以直接通过一个 RTSP URL 来拉取并观看分析后的画面,无需任何额外的开发工作。

二、为什么选择 GStreamer 实现推流?

GStreamer 是一个功能极其强大的开源多媒体框架。它采用基于管道(Pipeline)的架构,通过将各种功能单一的元素(Element)链接起来,可以轻松地构建复杂的多媒体处理流程。选择它来实现推流功能,主要有以下优势:

  • **功能全面:**从视频采集、格式转换、缩放、编码(软编/硬编)、分析(通过 GstInference 或自定义插件)、叠加(OSD),到最终的网络推流(RTSP/RTP/RTMP/SRT/HLS)等一系列流程,都可以在一个 Pipeline 中完成。

  • 高度模块化与灵活性:其插件生态系统非常丰富,你可以像搭积木一样组合不同的元素来应对不同的需求(例如,更换不同的视频编码器以适应不同的硬件平台)。

  • 卓越的性能:支持零拷贝(Zero-copy)机制,减少内存拷贝开销。更重要的是,它能够非常方便地利用各种硬件加速资源(如 NVIDIA 的 NVENC/NVDEC、Intel 的 QSV、Jetson 平台的硬件编解码器等),极大地降低 CPU 占用,提升并行处理能力。

  • 成熟的 RTSP 服务器支持:GStreamer 的 gst-rtsp-server 库是一个高性能的库,可以轻松地基于 GStreamer pipeline 构建一个完整的 RTSP 服务器,将内存中的视频帧(GstBuffer)直接封装成 RTP 包并发布出去,无需落盘再推流,效率极高。

写到这里,大家伙可能会有疑惑,为什么不采用基于ffmpeg的推流方式,而采用gstreamer实现该功能。这里给大家做一个简单的类比:ffmpeg类似于的pytorch 1.0,而gstreamer类似于的tensorflow 1.0。ffmpeg有很大的灵活性,我们可以通过其提供的接口方法,在每一步自定义插入想要的内容,实现更多的功能需要。而gstreamer通过搭载好管道(pipline),实现高效的运行,我们无需要在推流阶段做很多自定义的任务,所以选择"类似于静态图"搭载的gstreamer便是一个很好的选择。

三、环境的安装与搭载

3.1 实验环境

  • windows 11
  • 其它硬件暂无明确需要

这次着重强调windows系统是因为gstreamer的组件在windows上安装会比较麻烦,我们针对这点开始,进行详细的介绍,所有的代码和流程都已通过测试。

3.2 gstreamer组件的安装(很耗时)

由于windows安装python-gstreamer的绑定需要先安装gstreamer的组件,但是windows系统安装gstreamer的组件会比较麻烦,所以我们采用mysy2进行python环境和gstreamer环境的搭建。(我们在使用python-GTK的时候也是类似的安装流程)

  • 参考的流程示意图如下
Windows系统
安装MSYS2
更新MSYS2: pacman -Syu
安装编译工具: base-devel
安装Python: python python-pip
安装GStreamer核心及插件
安装Python绑定: pip install pygobject
验证安装
环境搭建成功
安装失败
检查错误并重新安装GStreamer

mysy2的安装

  1. 点击mysy2的下载链接,依据系统版本选择对应的安装包下载即可:
    在这里插入图片描述
  2. 选择合适的目录进行安装
    在这里插入图片描述
  3. 安装好后在对应目录查看执行程序
    在这里插入图片描述

安装gstreamer必要组件

  1. 在mysy2终端输入以下指令安装gstreamer组件
    pacman -S mingw-w64-x86_64-gst-plugins-ugly mingw-w64-x86_64-gst-plugins-good mingw-w64-x86_64-gst-plugins-base mingw-w64-x86_64-gst-rtsp-server mingw-w64-x86_64-gst-plugins-bad mingw-w64-x86_64-gst-plugins-rs mingw-w64-x86_64-gst-devtools mingw-w64-x86_64-gst-python mingw-w64-x86_64-gstreamer mingw-w64-x86_64-gst-libav
    
  2. 执行过程如下:
    在这里插入图片描述
  3. 这里说个小技巧,由于下载速度问题,大家可以在下载速度变慢的时候手动ctrl+c终止,然后再执行指令

安装python及必要组件

  1. python安装

    pacman -S mingw-w64-x86_64-python mingw-w64-x86_64-python-pip
    
  2. 虽然可以安装pip但是终端不推荐使用pip,如下所示
    在这里插入图片描述

  3. mysy2安装剩下的组件(numpy pillow pyobject等)

    pacman -S mingw-w64-x86_64-python-gobject
    pacman -S mingw-w64-x86_64-python-matplotlib
    pacman -S mingw-w64-x86_64-python-imageio
    pacman -S mingw-w64-x86_64-python-pillow
    

pycharm等编译器选择mysy的python环境

  1. 打开pycharm编辑器,选择添加环境
    在这里插入图片描述
  2. 选择mysy的python环境,参考上图所示(在mysy对应的安装目录寻找)。
  3. 点击确定即可。

四、编码推流与流媒体

我们已经在第一章节详细介绍了rtsp的基本信息,不明白的小伙伴可以回头去查看,这里主要简述RTSP编码推流的过程和实现

4.1 编码推流基本流程

编码推流是将原始音视频数据转化为可网络传输的流媒体数据,并通过流媒体服务器分发给观众的全过程。其核心目标是在保证画质和实时性的前提下,实现高效传输与大规模分发。以下是详细流程:

  1. 原始数据采集
    • 操作:通过摄像头、麦克风等设备捕获原始音视频数据(如YUV格式视频帧、PCM音频数据)。
    • 关键点:确保数据源稳定,避免采集卡顿或丢帧。
  2. 预处理(可选但推荐)
    • 操作
      • 视频:分辨率调整(如1080p→720p)、帧率控制(如60fps→30fps)、色彩空间转换(YUV→RGB)、滤镜/美颜处理。
      • 音频:降噪、混音、音量均衡。
    • 目的:降低编码复杂度,优化后续压缩效率。
  3. 编码(核心压缩步骤)
  • 操作
    • 视频编码:使用H.264/AVC、H.265/HEVC或AV1等编码器,将视频帧压缩为编码数据(如NALU单元)。
    • 音频编码:使用AAC、Opus等编码器压缩音频数据。
      关键参数
    • 码率(Bitrate):控制数据量(如2000 Kbps)。
    • GOP(Group of Pictures):设置关键帧间隔(如2秒一个I帧)。
    • 编码Profile:平衡兼容性与压缩率(如H.264 Baseline/High)。
  • 目的:大幅减少数据体积(压缩率可达50:1),适应有限带宽。
  1. 封装与切片
  • 操作
    • 封装:将编码后的音视频数据打包成容器格式(如H264、FLV、TS、MP4)。
    • 切片(分片传输):
      • 将连续流切割为小片段(如HLS的.ts文件,DASH的.m4s文件)。
      • 生成索引文件(如HLS的.m3u8播放列表)。
    • 协议适配
      • RTSP、RTMP、HLS/DASH等
  • 目的:适配不同传输协议,实现断点续传和自适应播放。
  1. 推流至流媒体服务器
  • 操作
    • 通过协议(RTSP、SRT、WebRTC)将封装/切片数据推送到流媒体服务器(如Nginx-RTMP、Wowza、AWS MediaLive)。
  • 关键步骤
    • 建立连接(如RTSP握手)。
    • 持续发送数据包(含时间戳、序列号)。
    • 处理网络抖动(通过缓冲区或FEC前向纠错)。
  • 目的:将数据上传至分发中心,为大规模观众服务做准备。

以下内容可由流媒体实现!

  1. 流媒体服务器处理
  • 操作

    • 转码:生成多码率版本(如1080p/720p/480p),支持ABR。
    • 转封装:将输入协议(如RTMP)转换为输出协议(如HLS)。
    • 分发:通过CDN边缘节点将流推送到全球观众。
  • 目的:实现协议转换、负载均衡、全球加速。

  1. 观众拉流播放
  • 操作
    • 观众通过播放器(如VLC、HLS.js、DASH.js)向流媒体服务器请求流数据。
    • 播放器根据网络状况自动选择合适码率(ABR)。
    • 解封装→解码→渲染播放。
  • 目的:观察实时推送的画面

4.2 流媒体-mediamtx的安装与运行(原rtsp-simple-server)

由于流媒体技术是一项复杂的技术,我们可以采用开源的方案如mediamtx或者liveNVR(第一章有提及)等实现我们的流转发功能。我们只需要向指定的端口写入我们的数据包即可。

  1. 我们打开mediamtx的下载页面,根据自己系统版本选择即可、
    在这里插入图片描述

  2. 解压下载的文件包,双击运行即可
    在这里插入图片描述

五、基于gstreamer的rtsp编码推流

写在最前面,我们没有在mysy2环境安装opencv,所以提前准备一些图片,我们把图片推送为rtsp视频流进行最终展示

5.1 案例代码

这里直接给出完整可行的代码(只要环境没问题):

import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import time
from PIL import Image

Gst.init(None)
print("GStreamer Python 绑定安装成功!")
import numpy as np
import os
import threading


class RTSPPushPipeline:
    def __init__(self, cap_id, name, cap_ip, fps=15, input_size=(1920, 1080), output_size=(1280, 720),
                 mediamtx_host="localhost", mediamtx_port=8554):
        """
        创建推送到mediamtx的GStreamer管道
        """
        self.cap_id = cap_id
        self.name = name
        self.cap_ip = cap_ip
        self.fps = fps
        self.input_size = input_size  # 输入图像尺寸
        self.output_size = output_size  # 输出图像尺寸
        self.mediamtx_host = mediamtx_host
        self.mediamtx_port = mediamtx_port
        self.number_frames = 0
        self.running = True

        # 添加线程安全的图像共享机制
        self.lock = threading.Lock()
        self.push_img_share = None

        self.last_push_time = time.time()

        # 缩放缓冲区
        self.scaled_buffer = None
        self.scaled_buffer_size = 0
        print(f"[Thread push stream] 推流内存初始化成功: cam{self.cap_id}")

        # 创建错误图像 (使用输出尺寸)
        self.error_img = np.zeros((output_size[1], output_size[0], 3), dtype=np.uint8)

        # 创建管道
        self.pipeline = Gst.Pipeline.new(f"pipeline-{cap_id}")

        # 创建元素
        self.appsrc = Gst.ElementFactory.make("appsrc", "source")
        self.videoconvert1 = Gst.ElementFactory.make("videoconvert", "convert")  # BGR转通用格式
        self.videoscale = Gst.ElementFactory.make("videoscale", "scale")  # 添加缩放元素
        self.capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter")  # 强制转换为I420
        self.x264enc = Gst.ElementFactory.make("x264enc", "encoder")
        self.h264parse = Gst.ElementFactory.make("h264parse", "h264parse")
        self.rtspsink = Gst.ElementFactory.make("rtspclientsink", "rtspsink")

        # 检查元素是否创建成功
        if not all([self.appsrc, self.videoconvert1, self.videoscale, self.capsfilter,
                    self.x264enc, self.h264parse, self.rtspsink]):
            missing = []
            if not self.appsrc: missing.append("appsrc")
            if not self.videoconvert1: missing.append("videoconvert1")
            if not self.videoscale: missing.append("videoscale")
            if not self.capsfilter: missing.append("capsfilter")
            if not self.x264enc: missing.append("x264enc")
            if not self.h264parse: missing.append("h264parse")
            if not self.rtspsink: missing.append("rtspsink")
            raise RuntimeError(f"无法创建GStreamer元素: {', '.join(missing)}")

        # 配置appsrc
        self.appsrc.set_property("is-live", True)
        self.appsrc.set_property("format", Gst.Format.TIME)
        self.appsrc.set_property("do-timestamp", True)
        self.appsrc.set_property("block", True)
        self.appsrc.set_property("stream-type", 0)  # GST_APP_STREAM_TYPE_STREAM

        # 设置appsrc的caps为BGR格式 (使用输出尺寸)
        caps = Gst.Caps.from_string(
            f"video/x-raw,format=BGR,width={output_size[0]},height={output_size[1]},framerate={fps}/1"
        )
        self.appsrc.set_property("caps", caps)

        # 配置capsfilter强制转换为I420
        caps_i420 = Gst.Caps.from_string(
            f"video/x-raw,format=I420,width={output_size[0]},height={output_size[1]},framerate={fps}/1"
        )
        self.capsfilter.set_property("caps", caps_i420)

        # 配置编码器
        self.x264enc.set_property("speed-preset", "ultrafast")
        self.x264enc.set_property("tune", "zerolatency")
        self.x264enc.set_property("bitrate", 1024)  # 降低到1Mbps以适应720p

        # 配置h264parse
        self.h264parse.set_property("config-interval", 1)

        # 配置RTSP接收器
        location = f"rtsp://{self.mediamtx_host}:{self.mediamtx_port}/{self.name}"
        self.rtspsink.set_property("location", location)
        self.rtspsink.set_property("latency", 500)

        # 添加元素到管道
        for element in [self.appsrc, self.videoconvert1, self.videoscale, self.capsfilter,
                        self.x264enc, self.h264parse, self.rtspsink]:
            self.pipeline.add(element)

        # 链接元素:
        # appsrc -> videoconvert1 -> videoscale -> capsfilter -> x264enc -> h264parse -> rtspsink
        if not self.appsrc.link(self.videoconvert1):
            raise RuntimeError("无法链接appsrc到videoconvert1")
        if not self.videoconvert1.link(self.videoscale):
            raise RuntimeError("无法链接videoconvert1到videoscale")
        if not self.videoscale.link(self.capsfilter):
            raise RuntimeError("无法链接videoscale到capsfilter")
        if not self.capsfilter.link(self.x264enc):
            raise RuntimeError("无法链接capsfilter到x264enc")
        if not self.x264enc.link(self.h264parse):
            raise RuntimeError("无法链接x264enc到h264parse")
        if not self.h264parse.link(self.rtspsink):
            raise RuntimeError("无法链接h264parse到rtspsink")

        # 连接appsrc信号
        self.appsrc.connect("need-data", self.on_need_data)

        # 启动管道
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            raise RuntimeError("无法启动管道")
        elif ret == Gst.StateChangeReturn.ASYNC:
            # 等待状态变化完成
            ret = self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
            if ret[1] != Gst.State.PLAYING:
                raise RuntimeError(f"无法切换到PLAYING状态: {ret[1].value_name}")

        print(f"启动推流到: {location}")
        self.log_state()
        # 启动状态监控线程
        self.monitor_thread = threading.Thread(target=self.monitor_pipeline, daemon=True)
        self.monitor_thread.start()

    def log_state(self):
        """记录管道元素状态"""
        print("管道元素状态:")
        # 获取整个管道的状态
        pipeline_state = self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
        print(f"管道整体状态: {pipeline_state[1].value_name}")

        # 遍历所有元素并记录状态
        elements = [self.appsrc, self.videoconvert1, self.videoscale, self.x264enc, self.h264parse, self.rtspsink]
        for element in elements:
            state = element.get_state(Gst.CLOCK_TIME_NONE)
            print(f"  {element.name}: {state[1].value_name}")

    def on_need_data(self, src, length):
        """当appsrc需要数据时调用"""
        if not self.running:
            return Gst.FlowReturn.EOS

        try:
            # 从共享内存获取图像
            with self.lock:
                if self.push_img_share is None:
                    # 如果没有图像可用,使用错误图像
                    img_draw = self.error_img.copy()
                else:
                    img_draw = self.push_img_share.copy()

            # 转换为字节数据
            img_bytes = img_draw.tobytes()

            # 创建或调整缓冲区大小
            if self.scaled_buffer is None or len(img_bytes) != self.scaled_buffer_size:
                self.scaled_buffer = Gst.Buffer.new_allocate(None, len(img_bytes), None)
                self.scaled_buffer_size = len(img_bytes)

            # 填充缓冲区
            self.scaled_buffer.fill(0, img_bytes)

            # 设置时间戳
            timestamp = Gst.util_uint64_scale(self.number_frames, Gst.SECOND, self.fps)
            self.scaled_buffer.pts = self.scaled_buffer.dts = timestamp
            self.scaled_buffer.duration = Gst.util_uint64_scale(1, Gst.SECOND, self.fps)
            self.number_frames += 1

            # 更新最后推送时间
            self.last_push_time = time.time()

            # 推送缓冲区
            return src.emit("push-buffer", self.scaled_buffer)
        except Exception as e:
            print(f"处理数据时出错: {e}")
            # 使用错误图像作为后备
            try:
                if self.scaled_buffer is None or self.error_img.nbytes != self.scaled_buffer_size:
                    self.scaled_buffer = Gst.Buffer.new_allocate(None, self.error_img.nbytes, None)
                    self.scaled_buffer_size = self.error_img.nbytes

                self.scaled_buffer.fill(0, self.error_img.tobytes())
                timestamp = Gst.util_uint64_scale(self.number_frames, Gst.SECOND, self.fps)
                self.scaled_buffer.pts = self.scaled_buffer.dts = timestamp
                self.scaled_buffer.duration = Gst.util_uint64_scale(1, Gst.SECOND, self.fps)
                self.number_frames += 1

                return src.emit("push-buffer", self.scaled_buffer)
            except Exception as inner_e:
                print(f"推送错误图像时出错: {inner_e}")
                return Gst.FlowReturn.ERROR

    def set_image(self, image):
        """设置要推送的图像(线程安全)"""
        with self.lock:
            self.push_img_share = image

    def monitor_pipeline(self):
        """监控管道状态"""
        bus = self.pipeline.get_bus()
        while self.running:
            try:
                # 等待消息,超时1秒
                msg = bus.timed_pop_filtered(1000 * 1000 * 1000,  # 1秒,单位纳秒
                                             Gst.MessageType.ERROR |
                                             Gst.MessageType.EOS |
                                             Gst.MessageType.STATE_CHANGED)

                if msg is None:
                    # 超时,检查是否仍在运行
                    continue

                if msg.type == Gst.MessageType.ERROR:
                    err, debug = msg.parse_error()
                    print(f"管道 {self.cap_id} 错误: {err.message}")
                    print(f"调试信息: {debug}")
                    self.stop()

                elif msg.type == Gst.MessageType.EOS:
                    print(f"管道 {self.cap_id} 达到流结束状态")
                    self.stop()

                elif msg.type == Gst.MessageType.STATE_CHANGED:
                    if msg.src == self.pipeline:
                        old_state, new_state, pending = msg.parse_state_changed()
                        print(f"管道 {self.cap_id} 状态变化: {old_state.value_name} -> {new_state.value_name}")

            except Exception as e:
                print(f"监控管道时出错: {e}")
                time.sleep(1)

        print(f"管道 {self.cap_id} 监控线程退出")

    def stop(self):
        """停止推流管道"""
        if not self.running:
            return

        self.running = False

        try:
            print(f"正在停止管道 {self.cap_id}...")

            # 发送EOS信号
            if self.appsrc:
                self.appsrc.emit("end-of-stream")

            # 设置管道状态为NULL
            if self.pipeline:
                self.pipeline.set_state(Gst.State.NULL)

                # 等待状态变化完成
                ret = self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
                while ret[1] != Gst.State.NULL:
                    print(f"等待管道停止...当前状态: {ret[1].value_name}")
                    time.sleep(0.1)
                    ret = self.pipeline.get_state(Gst.CLOCK_TIME_NONE)

            print(f"管道 {self.cap_id} 已完全停止")

        except Exception as e:
            print(f"停止管道时出错: {e}")
        finally:
            # 清理资源
            self.pipeline = None
            self.appsrc = None
            self.scaled_buffer = None


# 示例使用代码
def inference_example():
    """推理和推流示例"""
    # 创建推流管道
    pipeline = RTSPPushPipeline(
        cap_id=0,
        name="camera1",
        cap_ip="test",  # 摄像头IP地址
        fps=15,
        input_size=(1920, 1080),
        output_size=(1280, 720),
        mediamtx_host="127.0.0.1",
        mediamtx_port=8554
    )

    img_path = r"path/to/image_folder"
    img_dir = os.listdir(img_path)
    try:
        new_size = (1280, 720)  # 直接指定宽高
        # 处理每一帧
        while pipeline.running:
            for img in img_dir:
                cur_path = os.path.join(img_path, img)
                image = Image.open(cur_path)  # 替换为你的图片路径

                # 获取原始尺寸
                width, height = image.size
                print(f"原始尺寸: {width}x{height}")
                # 调整帧大小以适应推流尺寸

                frame = image.resize(new_size)

                # 设置图像到推流管道
                pipeline.set_image(frame)

                # 控制帧率
                time.sleep(5)  # 15 FPS

    except KeyboardInterrupt:
        print("用户中断")
    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        pipeline.stop()
        print("推流已停止")


if __name__ == "__main__":
    # 运行示例
    inference_example()

5.2 代码详解

我们实现了一个基于GStreamer的RTSP视频推流管道,可以将处理后的视频帧推送到RTSP服务器(mediamtx)。代码主要包含一个RTSPPushPipeline类,该类封装了GStreamer管道的创建、配置、运行和监控功能。

执行逻辑概述

  1. 初始化阶段

    • 创建GStreamer管道,包含appsrc(应用数据源)、videoconvert(格式转换)、videoscale(缩放)、capsfilter(格式过滤)、x264enc(H.264编码)、h264parse(H.264解析)和rtspclientsink(RTSP客户端输出)
  • 配置各个元素的参数和属性

  • 链接所有元素形成完整的处理管道

  • 启动管道并开始监控状态

  1. 数据处理阶段
  • 通过set_image()方法接收要推送的图像帧

  • 当appsrc需要数据时,调用on_need_data()回调函数

  • 将图像数据转换为GStreamer缓冲区并推送到管道中

  • 编码后的视频流通过RTSP协议推送到指定服务器

  1. 监控与清理阶段
  • 使用独立线程监控管道状态,处理错误和异常

  • 提供stop()方法用于优雅停止管道并释放资源


  • 类结构图
RTSPPushPipeline
-cap_id: int
-name: str
-cap_ip: str
-fps: int
-input_size: tuple
-output_size: tuple
-mediamtx_host: str
-mediamtx_port: int
-number_frames: int
-running: bool
-lock: threading.Lock
-push_img_share: ndarray
-last_push_time: float
-scaled_buffer: Gst.Buffer
-scaled_buffer_size: int
-error_img: ndarray
-pipeline: Gst.Pipeline
-appsrc: Gst.Element
-videoconvert1: Gst.Element
-videoscale: Gst.Element
-capsfilter: Gst.Element
-x264enc: Gst.Element
-h264parse: Gst.Element
-rtspsink: Gst.Element
-monitor_thread: threading.Thread
+__init__(cap_id, name, cap_ip, fps, input_size, output_size, mediamtx_host, mediamtx_port)
+log_state()
+on_need_data(src, length)
+set_image(image)
+monitor_pipeline()
+stop()

  • 管道元素连接图
appsrc
数据源
videoconvert
格式转换
videoscale
缩放
capsfilter
格式过滤
x264enc
H.264编码
h264parse
H.264解析
rtspclientsink
RTSP输出

  • 执行流程图
Main Thread RTSPPushPipeline appsrc GStreamer Pipeline RTSP Server __init__() 创建并配置管道 启动管道 连接RTSP服务器 连接确认 管道运行中 set_image(image) 存储图像到共享变量 loop [图像处理循环] need-data信号 从共享变量获取图像 转换为GStreamer缓冲区 push-buffer 推送数据到管道 处理数据(转换→缩放→编码) 发送编码后的RTSP流 loop [数据推送循环] stop() 发送EOS信号 设置状态为NULL 断开连接 管道已停止 Main Thread RTSPPushPipeline appsrc GStreamer Pipeline RTSP Server

  • 监控状态执行图
ERROR
EOS
STATE_CHANGED
启动监控线程
等待GStreamer消息
收到消息?
消息类型
记录错误并停止管道
记录EOS并停止管道
记录状态变化
结束监控

5.3 执行结果展示

  • 我们默认的推流地址是:rtsp://localhost:8554/camera1

  • 运行代码后会在终端出现以下信息:
    在这里插入图片描述

  • 我们采用vlc打开rtsp地址,可以 观察到对应的图片已经被推送为视频流
    在这里插入图片描述

  • 终端也会显示出我们对应的查看记录
    在这里插入图片描述

  • 至此,我们的推流管道基本完成

总结

gstreamer的代码总的来说比较抽象,且理解起来不容易,不过基于gstreamer的AI系统确实各大公司的主流(如NVIDIA的deepstream、intel 的DL streamer等)。我建议大家可以抽时间学习一下提供的示例代码,因为这将是我们AI视频监控系统的重要组成部分,接下来,我们将基于第一章和第二章的内容,将我们的AI视频监控系统融入信息的推送和共享模块,使之成为真正信息化、智能化时代的AI视频监控系统。
需要注意的是,windows上采用python进行gstreamer的开发存在诸多阻力,所以后续我们将逐步将运行环境和代码转到linux系统上。这儿就不在描述系统之间的差异,我们将在后面的内容进行对比

下期预告

  1. 代码整合和推流模块的添加
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐