实验程序:

talker.cc 

#include <std_msgs/Int32.h>
#include <ros/ros.h>

int main(int argc, char **argv) {
    ros::init(argc, argv, "talker");
    ros::NodeHandle nh;
    std_msgs::Int32 data;
    data.data = 0;
 
    ros::Publisher pub = nh.advertise<std_msgs::Int32>("alan_topic", 1);
    ros::Rate loop_rate(1.0); // 1hz
    while (ros::ok()) {
        data.data++;
        pub.publish(data);
        std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] publish " << data.data << std::endl;
        loop_rate.sleep();
    }
    return 0;
}

listener.cc

#include <ros/ros.h>
#include <std_msgs/Int32.h>

void alan_callback(const std_msgs::Int32& data){
    std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
    std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
    return;
}

int main(int argc, char ** argv) {
    ros::init(argc, argv, "listener");
    ros::NodeHandle nh;
    ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
    ros::spin();
    return 0;
}

实时 


listener.cc

#include <unistd.h>
#include <ros/ros.h>
#include <std_msgs/Int32.h>

void alan_callback(const std_msgs::Int32& data){
    std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
    std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
    sleep(5); // 5秒
    return;
}

int main(int argc, char ** argv) {
    ros::init(argc, argv, "listener");
    ros::NodeHandle nh;
    ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
    ros::spin();
    return 0;
}

回调函数处理时间长导致数据丢失(队列长度为1,处理回调函数时间过长时时,队列里旧数据不断被新数据顶掉,处理完第2帧时队列里数据为7,丢失数据的帧数跟回调处理时间有关,是动态变化的)


listener.cc

#include <unistd.h>
#include <ros/ros.h>
#include <std_msgs/Int32.h>

void alan_callback(const std_msgs::Int32& data){
    std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
    std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
    sleep(5); // 5秒
    return;
}

int main(int argc, char ** argv) {
    ros::init(argc, argv, "listener");
    ros::NodeHandle nh;
    ros::Subscriber sub = nh.subscribe("alan_topic", 5, alan_callback);
    ros::spin();
    return 0;
}

只扩大回调队列长度;处理有延时,且有数据丢失

处理第3帧是因为一开始队列为空,第2帧处理完时队列刚满(3,4,5,6,7),队首为3,还没被顶掉

处理第14,15,16,17帧是因为talker在publish第17帧后终止,此时队列为(13,14,15,16,17),listener会依次处理直到队列为空


listener.cc

#include <unistd.h>
#include <ros/ros.h>
#include <std_msgs/Int32.h>

void alan_callback(const std_msgs::Int32& data){
    std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
    if (data.data % 5 == 0) {
        std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " long time" << std::endl;
        sleep(5); // 5秒
    }
    return;
}

int main(int argc, char ** argv) {
    ros::init(argc, argv, "listener");
    ros::NodeHandle nh;
    ros::Subscriber sub = nh.subscribe("alan_topic", 5, alan_callback);
    ros::spin();
    return 0;
}

扩大回调队列长度(队列长度至少设为回调函数最长处理时间 / 订阅的topic的频率),并且做跳帧处理;无数据丢失,且在跳帧处实时


另起一个后端处理线程

listener.cc

#include <unistd.h>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <ros/ros.h>
#include <std_msgs/Int32.h>

std::queue<int> q;
std::mutex m;
std::condition_variable cv;
bool data_ready = false;

void Process() {
    while(ros::ok()) {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return data_ready;});

        std::cout << "queue size: " << q.size() << std::endl;
        if (q.empty()) {
            continue;
        }

        int data = q.front();
        data_ready = false;
        q.pop();
        lk.unlock();
        std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data << " long time" << std::endl;
        sleep(5); // 5秒
    }
}

void alan_callback(const std_msgs::Int32& data){
    std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
    {
        std::lock_guard<std::mutex> lk(m);
        q.push(data.data);
        data_ready = true;
    }
    cv.notify_one();
    return;
}

int main(int argc, char ** argv) {
    std::thread t(Process);
    ros::init(argc, argv, "listener");
    ros::NodeHandle nh;
    ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
    ros::spin();
    return 0;
}

前端callback实时预处理,后端处理线程延时处理,问题是队列q会不断加长,后端延迟越来越大


listener.cc

#include <unistd.h>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <ros/ros.h>
#include <std_msgs/Int32.h>

std::queue<int> q;
std::mutex m;
std::condition_variable cv;
bool data_ready = false;

void Process() {
    int cnt = 0;
    std::queue<int> q2;
    while(ros::ok()) {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return data_ready;});

        std::cout << "queue size: " << q.size() << std::endl;
        q2 = q;
        std::queue<int> empty_q;
        std::swap(empty_q, q); // 清空队列
        data_ready = false;
        lk.unlock();

        while (!q2.empty()) {
            int data = q2.front();
            cnt++;
            q2.pop();
            if (cnt % 5 == 0) {
                std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data << " long time" << std::endl;
                sleep(5); // 5秒
            }
        }        
    }
}

void alan_callback(const std_msgs::Int32& data){
    std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec() << "] process " << data.data << " short time" << std::endl;
    {
        std::lock_guard<std::mutex> lk(m);
        q.push(data.data);
        data_ready = true;
    }
    cv.notify_one();
    return;
}

int main(int argc, char ** argv) {
    std::thread t(Process);
    ros::init(argc, argv, "listener");
    ros::NodeHandle nh;
    ros::Subscriber sub = nh.subscribe("alan_topic", 1, alan_callback);
    ros::spin();
    return 0;
}

前端callback实时预处理,后端处理线程跳帧处理,队列q保持一定长度,后端在跳帧处实时处理


CMakeLists.txt

cmake_minimum_required(VERSION 2.8.3)
project(ros_test)

set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread")

find_package(catkin REQUIRED COMPONENTS
  roscpp
  std_msgs
)

include_directories(
  ${catkin_INCLUDE_DIRS}
)

add_executable(talker src/talker.cc)
target_link_libraries(talker ${catkin_LIBRARIES})
add_executable(listener src/listener.cc)
target_link_libraries(listener ${catkin_LIBRARIES})

note:

回调函数的处理时间不应太长,可只做一些简单的数据处理,然后把数据存到待处理队列,另外起一个后端线程独立处理队列中的数据;callback函数只负责push数据到队列,后端线程pop出数据并进行完整处理(在多线程中访问同一队列记得加锁mutex, condition_variable)


spinOnce原理

调用spinOnce时,首先检查当前队列(如queue1长度为4,里面有2个元素x1,x2,queue2长度为2,里面有1个元素y1),那么将会从queue1队首弹出2个元素,并执行对应的callback函数;从queue2队首弹出1个元素,并执行对应的callback函数;至于顺序,可能queue1,queue2,queue1,

或queue2,queue1,queue1等不固定。如果执行某个回调函数时间过长,队列里的元素可能被顶掉,如第一次弹出queue2的队首y1并处理y1的回调,如果y1回调处理时间较长,在处理这个回调期间,queue1来了3个数据x3,x4,x5,那么此时queue1的元素为x2,x3,x4,x5,x1被顶掉,y1的回调处理完后,弹出queue1的队首元素x2,并处理x2的回调,如果x2回调函数执行时间短,下一次弹出queue1的队首x3;如果x2回调函数执行时间长,下一次queue1弹出的队首就不一定是x3,可能是x4,x5,x6等,依次类推。

总之,调用spinOnce时,每个队列有几个元素就执行几次回调,每次都取队首的元素处理。先处理哪个队列顺序不一定。在这期间,队列中的元素还有可能会被刷新。

测试代码:

talker.cc

#include <ros/ros.h>
#include <std_msgs/Int32.h>

int main(int argc, char **argv) {
  ros::init(argc, argv, "talker");
  ros::NodeHandle nh;
  std_msgs::Int32 data;
  data.data = 0;

  ros::Publisher odd_pub = nh.advertise<std_msgs::Int32>("odd_topic", 1);
  ros::Publisher even_pub = nh.advertise<std_msgs::Int32>("even_topic", 1);
  ros::Rate loop_rate(1.0);  // 1hz
  while (ros::ok()) {
    data.data++;
    if (data.data % 2 == 0) {
      even_pub.publish(data);
    } else {
      odd_pub.publish(data);
    }
    std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec()
              << "] publish " << data.data << std::endl;
    loop_rate.sleep();
  }
  return 0;
}

listener.cc

#include <ros/ros.h>
#include <std_msgs/Int32.h>

void odd_callback(const std_msgs::Int32& data) {
  std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec()
            << "] process " << data.data << std::endl;
  return;
}

void even_callback(const std_msgs::Int32& data) {
  std::cout << std::setprecision(12) << "[" << ros::Time::now().toSec()
            << "] process " << data.data << std::endl;
  ros::Duration(4).sleep();
  return;
}

int main(int argc, char** argv) {
  ros::init(argc, argv, "listener");
  ros::NodeHandle nh;
  ros::Subscriber odd_sub =
      nh.subscribe("odd_topic", 4, odd_callback);  // 队列长度为4
  ros::Subscriber even_sub =
      nh.subscribe("even_topic", 2, even_callback);  // 队列长度为2
  ros::Duration(10).sleep(); // 保证队列全满,6个元素
  ros::spinOnce();
  std::cout << "--------------" << std::endl;
  ros::spinOnce();
  std::cout << "--------------" << std::endl;
  ros::spinOnce();

  return 0;
}

输出:

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐