【Linux 物联网网关主控系统-Linux主控部分(六)】
pthread_main.c 作为“指令分发中枢”,核心流程是初始化消息队列→阻塞接收外部指令→按指令类型分发(唤醒对应业务线程),依赖消息队列实现跨进程/线程的指令接收,通过互斥锁+条件变量保证指令分发的线程安全。pthread_camera.c 作为摄像头控制专属线程,核心流程是初始化摄像头设备→死循环等待拍照指令→接收到指令后执行指定次数的拍照操作,依赖互斥锁和条件变量实现与线程的指令同步,
Linux 物联网网关主控系统-Linux主控部分(六)
一、核心头文件 + 数据结构操作文件(链表 / 缓存)
| 类型 | 文件名 | 核心作用 |
|---|---|---|
| 核心头文件 | data_global.h | 全局宏定义(设备路径、消息类型、长度等)、核心结构体(环境数据/消息结构)、所有线程函数声明 |
| 核心头文件 | link_list.h | 环境数据链表的结构体(link_datatype/linknode)及链表操作函数声明 |
| 核心头文件 | uart_cache.h | ZigBee 命令缓存链表的结构体(uart_cache_node)及缓存操作函数声明 |
| 核心头文件 | sem.h | 信号量操作(初始化、P/V 操作)的函数声明和联合体定义 |
| 数据结构实现文件 | link_list.c | 实现环境数据链表的基础操作(创建空链表、判空、取节点、插入节点) |
| 数据结构实现文件 | uart_cache.c | 实现 ZigBee 命令缓存链表的基础操作(创建空缓存、取缓存节点、插入缓存节点) |
| 全局数据实现文件 | data_global.c | 全局变量定义(条件变量、互斥锁、设备文件描述符、消息队列 ID 等)、消息发送封装函数 |
二、核心功能实现文件(按线程 / 功能模块)
| 模块分类 | 文件名 | 对应线程 / 功能 | 核心功能 |
|---|---|---|---|
| 串口 / ZigBee 通信 | pthread_zigbee_rcv.c | ZigBee 接收线程 | 初始化串口、解析 ZigBee 上报的环境数据、将数据插入环境链表、触发刷新信号 |
| 串口 / ZigBee 通信 | pthread_uart_send.c | ZigBee 发送线程 | 从命令缓存链表取指令、通过串口发送给 ZigBee、释放缓存节点 |
| 消息 / 线程调度 | pthread_main.c | 主线程 | 创建消息队列、接收外部消息(摄像头 / ZigBee 指令)、分发到对应模块 / 线程 |
| 外设控制 | pthread_camera.c | 摄像头线程 | 打开摄像头设备、等待拍照信号、执行拍照指令(通过系统命令触发) |
| 共享内存同步 | pthread_refresh.c | 共享内存刷新线程 | 解析环境链表数据、更新全局状态、将数据写入共享内存(通过信号量互斥) |
| 程序入口 / 资源管理 | main.c | 程序主入口 | 初始化同步原语(互斥锁 / 条件变量)、创建所有业务线程、注册资源释放信号处理函数 |
main.c
#include <stdio.h>
#include <pthread.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/ipc.h>
#include "data_global.h"
extern pthread_cond_t cond_zigbee_rcv;
extern pthread_cond_t cond_uart_cmd;
extern pthread_cond_t cond_main_thread;
extern pthread_cond_t cond_camera;
extern pthread_cond_t cond_refresh;
extern pthread_cond_t cond_refresh_updata;
extern pthread_mutex_t mutex_slinklist;
extern pthread_mutex_t mutex_zigbee_rcv;
extern pthread_mutex_t mutex_uart_cmd;
extern pthread_mutex_t mutex_main_thread;
extern pthread_mutex_t mutex_camera;
extern pthread_mutex_t mutex_refresh;
extern pthread_mutex_t mutex_refresh_updata;
extern pthread_mutex_t mutex_global;
extern pthread_mutex_t mutex_linklist;
extern int dev_camera_fd;
extern int dev_uart_fd;
extern int msgid;
extern int shmid;
extern int semid;
extern struct env_info_array all_info_RT;
pthread_t id_zigbee_rcv,
id_uart_cmd,
id_main_thread,
id_camera,
id_refresh;
void ReleaseResource (int signo)
{
pthread_mutex_destroy (&mutex_linklist);
pthread_mutex_destroy (&mutex_global);
pthread_mutex_destroy (&mutex_refresh_updata);
pthread_mutex_destroy (&mutex_refresh);
pthread_mutex_destroy (&mutex_camera);
pthread_mutex_destroy (&mutex_main_thread);
pthread_mutex_destroy (&mutex_uart_cmd);
pthread_mutex_destroy (&mutex_zigbee_rcv);
pthread_mutex_destroy (&mutex_slinklist);
pthread_cond_destroy (&cond_refresh_updata);
pthread_cond_destroy (&cond_refresh);
pthread_cond_destroy (&cond_camera);
pthread_cond_destroy (&cond_main_thread);
pthread_cond_destroy (&cond_uart_cmd);
msgctl (msgid, IPC_RMID, NULL);
shmctl (shmid, IPC_RMID, NULL);
semctl (semid, 1, IPC_RMID, NULL);
pthread_cancel (id_refresh);
pthread_cancel (id_camera);
pthread_cancel (id_main_thread);
pthread_cancel (id_uart_cmd);
pthread_cancel (id_zigbee_rcv);
close (dev_camera_fd);
close (dev_uart_fd);
printf ("All quit\n");
exit(0);
}
int main(int argc, char **argv)
{
pthread_mutex_init (&mutex_slinklist, NULL);
pthread_mutex_init (&mutex_uart_cmd, NULL);
pthread_mutex_init (&mutex_main_thread, NULL);
pthread_mutex_init (&mutex_camera, NULL);
pthread_mutex_init (&mutex_refresh, NULL);
pthread_mutex_init (&mutex_refresh_updata, NULL);
pthread_mutex_init (&mutex_global, NULL);
pthread_mutex_init (&mutex_linklist, NULL);
pthread_cond_init (&cond_zigbee_rcv, NULL);
pthread_cond_init (&cond_uart_cmd, NULL);
pthread_cond_init (&cond_main_thread, NULL);
pthread_cond_init (&cond_camera, NULL);
pthread_cond_init (&cond_refresh, NULL);
pthread_cond_init (&cond_refresh_updata, NULL);
signal (SIGINT, ReleaseResource);
pthread_create (&id_zigbee_rcv, 0, pthread_zigbee_rcv, NULL);
sleep (1);
pthread_create (&id_uart_cmd, 0, pthread_uart_send, NULL);
pthread_create (&id_main_thread, 0, pthread_main, NULL);
pthread_create (&id_camera, 0, pthread_camera, NULL);
pthread_create (&id_refresh, 0, pthread_refresh, NULL);
pthread_join (id_zigbee_rcv, NULL);
printf ("g1\n");
pthread_join (id_uart_cmd, NULL);
printf ("g2\n");
pthread_join (id_main_thread, NULL);
printf ("g3\n");
pthread_join (id_camera, NULL);
printf ("g4\n");
pthread_join (id_refresh, NULL);
printf ("g5\n");
return 0;
}
阶段 1:初始化线程同步资源(程序启动第一步)
调用pthread_mutex_init依次初始化 9 个互斥锁:mutex_slinklist、mutex_uart_cmd、mutex_main_thread、mutex_camera、mutex_refresh、mutex_refresh_updata、mutex_global、mutex_linklist(为多线程临界资源访问提供互斥保障);
调用pthread_cond_init依次初始化 6 个条件变量:cond_zigbee_rcv、cond_uart_cmd、cond_main_thread、cond_camera、cond_refresh、cond_refresh_updata(实现线程间 “等待 - 唤醒” 通信)。
阶段 2:注册程序退出资源释放逻辑
调用signal(SIGINT, ReleaseResource)注册 SIGINT 信号(Ctrl+C)的处理函数ReleaseResource,确保程序被手动终止时能释放所有资源,避免泄漏。
阶段 3:创建核心业务线程(功能模块启动)
调用pthread_create创建 ZigBee 数据接收线程,关联线程函数pthread_zigbee_rcv,线程 ID 为id_zigbee_rcv;
调用sleep(1)等待串口初始化完成;
依次调用pthread_create创建剩余 4 个线程:
- ZigBee 指令发送线程:关联
pthread_uart_send,线程 IDid_uart_cmd; - 主线程(消息队列监听 / 指令分发):关联
pthread_main,线程 IDid_main_thread; - 摄像头控制线程:关联
pthread_camera,线程 IDid_camera; - 共享内存刷新线程:关联
pthread_refresh,线程 IDid_refresh。
阶段 4:阻塞等待线程退出(维持程序运行)
依次调用pthread_join,传入各线程 ID(id_zigbee_rcv、id_uart_cmd、id_main_thread、id_camera、id_refresh),阻塞主线程,确保所有业务线程持续运行(正常场景下不会执行到返回,因业务线程均为死循环)。
阶段 5:信号触发的资源释放(程序退出清理)
当接收到 SIGINT 信号时,执行ReleaseResource函数,流程如下:
- 调用
pthread_mutex_destroy销毁所有已初始化的互斥锁; - 调用
pthread_cond_destroy销毁所有已初始化的条件变量; - 调用
msgctl、shmctl、semctl分别删除消息队列、共享内存、信号量; - 调用
pthread_cancel终止所有已创建的业务线程; - 调用
close关闭摄像头、串口设备文件描述符; - 调用
exit(0)退出程序。
pthread_main.c
#include "data_global.h"
#include "uart_cache.h"
extern unsigned char dev_led_mask;
extern unsigned char dev_camera_mask;
extern unsigned char dev_buzzer_mask;
extern unsigned char dev_uart_mask;
extern pthread_cond_t cond_camera;
extern pthread_cond_t cond_refresh;
extern pthread_cond_t cond_uart_cmd;
extern pthread_cond_t cond_sqlite;
extern pthread_mutex_t mutex_global;
extern pthread_mutex_t mutex_uart_cmd;
extern pthread_mutex_t mutex_camera;
extern pthread_mutex_t mutex_slinklist;
extern int msgid;
extern struct env_info_array all_info_RT;
extern uart_cache_list zigbee_cache_head, zigbee_cache_tail;
void *pthread_main (void *arg)
{
key_t key;
ssize_t msgsize;
struct msg msgbuf;
if ((key = ftok ("/app", 'g')) < 0)
{
perror ("ftok msgqueue");
exit (-1);
}
if ((msgid = msgget (key, IPC_CREAT | IPC_EXCL | 0666)) < 0)
{
if(errno == EEXIST)
{
msgid = msgget (key,0666);
return 0;
}
else
{
perror ("msgget msgid");
exit (-1);
}
}
zigbee_cache_head = CreateEmptyCacheList ();
zigbee_cache_tail = zigbee_cache_head;
unsigned char *zigbee_temp;
printf ("pthread_main is ok\n");
while (1)
{
bzero (&msgbuf, sizeof (msgbuf));
printf ("\nwait for the msg\n");
msgsize = msgrcv (msgid, &msgbuf, sizeof (msgbuf) - sizeof (long), 1L, 0);
printf (" Get %ldL msg [%ld]\n", msgbuf.msgtype,msgsize);
printf (" text[0] = %#x\n", msgbuf.text[0]);
switch (msgbuf.msgtype)
{
case MSG_CAMERA:
{//发送给camera控制线程
pthread_mutex_lock (&mutex_camera);
dev_camera_mask = msgbuf.text[0];
pthread_cond_signal (&cond_camera);
pthread_mutex_unlock (&mutex_camera);
break;
}
case MSG_ZIGBEE:
{//zigbee发送命令
//usleep (200000);
pthread_mutex_lock (&mutex_uart_cmd);
zigbee_temp = (unsigned char *)malloc (sizeof (unsigned char));
*zigbee_temp = msgbuf.text[0];
printf(" msgbuf.text = %x\n",msgbuf.text[0]);
InsertCacheNode (&zigbee_cache_tail, zigbee_temp);
//dev_uart_mask = msgbuf.text[0];
pthread_mutex_unlock (&mutex_uart_cmd);
pthread_cond_signal (&cond_uart_cmd);
break;
}
default :
break;
}
}
}
阶段1:消息队列初始化(线程启动核心准备)
- 调用
ftok("/app", 'g')生成消息队列的键值key,用于标识唯一消息队列; - 调用
msgget尝试创建新的消息队列(参数IPC_CREAT | IPC_EXCL | 0666):- 若创建失败且错误码为
EEXIST(队列已存在),则调用msgget以0666权限获取已存在的消息队列ID; - 若为其他错误,打印错误信息并退出线程;
- 若创建失败且错误码为
- 调用
CreateEmptyCacheList创建空的ZigBee指令缓存链表,初始化zigbee_cache_head和zigbee_cache_tail(链表头尾指针)。
阶段2:消息队列监听循环(线程核心逻辑)
- 进入死循环,首先调用
bzero清空消息缓冲区msgbuf; - 调用
msgrcv阻塞监听消息队列,接收类型为1L的消息,将消息存入msgbuf; - 解析消息:打印消息类型、长度及指令内容(
msgbuf.text[0]),根据msgbuf.msgtype分支处理:- 分支1:MSG_CAMERA(摄像头指令)
① 调用pthread_mutex_lock(&mutex_camera)加摄像头互斥锁;
② 将指令内容赋值给全局变量dev_camera_mask;
③ 调用pthread_cond_signal(&cond_camera)唤醒摄像头控制线程;
④ 调用pthread_mutex_unlock(&mutex_camera)释放互斥锁; - 分支2:MSG_ZIGBEE(ZigBee指令)
① 调用pthread_mutex_lock(&mutex_uart_cmd)加ZigBee指令互斥锁;
② 调用malloc分配内存存储指令内容,赋值给临时变量zigbee_temp;
③ 调用InsertCacheNode将指令节点插入ZigBee缓存链表(zigbee_cache_tail);
④ 调用pthread_mutex_unlock(&mutex_uart_cmd)释放互斥锁;
⑤ 调用pthread_cond_signal(&cond_uart_cmd)唤醒ZigBee指令发送线程; - 默认分支:无处理,直接跳出。
- 分支1:MSG_CAMERA(摄像头指令)
核心总结
pthread_main.c 作为“指令分发中枢”,核心流程是初始化消息队列→阻塞接收外部指令→按指令类型分发(唤醒对应业务线程),依赖消息队列实现跨进程/线程的指令接收,通过互斥锁+条件变量保证指令分发的线程安全。
pthread_camera.c
#include "data_global.h"
extern char dev_camera_mask;
extern int dev_camera_fd;
extern pthread_mutex_t mutex_camera;
extern pthread_cond_t cond_camera;
void *pthread_camera (void *arg)
{
unsigned char picture = 0;
#if 1
if ((dev_camera_fd = open (DEV_CAMERA, O_RDWR | O_NOCTTY | O_NDELAY)) < 0)
{
printf ("Cann't open file %s\n",DEV_CAMERA);
// exit (-1);
}
printf ("pthread_camera is ok\n");
#endif
while (1)
{
pthread_mutex_lock (&mutex_camera);
pthread_cond_wait (&cond_camera, &mutex_camera);
picture = dev_camera_mask;
pthread_mutex_unlock (&mutex_camera);
#if 1
for (; picture > 0; picture--)
{
//write (dev_camera_fd, "one", 3); //无效
system("echo one > /tmp/webcom"); //ok
printf("picture = %d\n", picture);
sleep(4);
}
#endif
}
}
阶段1:摄像头设备初始化(线程启动准备)
- 调用
open函数打开摄像头设备文件DEV_CAMERA(路径为/dev/video0),参数为O_RDWR | O_NOCTTY | O_NDELAY,获取设备文件描述符dev_camera_fd; - 若打开失败,打印错误提示(
Cann't open file /dev/video0),但不退出线程;若成功,打印pthread_camera is ok标识初始化完成。
阶段2:摄像头指令监听循环(线程核心逻辑)
- 进入死循环,首先调用
pthread_mutex_lock(&mutex_camera)加摄像头互斥锁; - 调用
pthread_cond_wait(&cond_camera, &mutex_camera)阻塞等待条件变量cond_camera唤醒(由pthread_main线程触发); - 被唤醒后,将全局变量
dev_camera_mask(摄像头指令,代表拍照数量)赋值给局部变量picture; - 调用
pthread_mutex_unlock(&mutex_camera)释放摄像头互斥锁; - 进入拍照逻辑循环(
picture > 0时执行):- 调用
system("echo one > /tmp/webcom")触发摄像头拍照动作; - 打印当前剩余拍照次数
picture = %d; - 调用
sleep(4)等待4秒,完成一次拍照间隔; picture自减,直至为0时退出该循环。
- 调用
核心总结
pthread_camera.c 作为摄像头控制专属线程,核心流程是初始化摄像头设备→死循环等待拍照指令→接收到指令后执行指定次数的拍照操作,依赖互斥锁mutex_camera和条件变量cond_camera实现与pthread_main线程的指令同步,保证拍照动作的线程安全执行。
phread_uart_send.c
#include "data_global.h"
extern unsigned char dev_uart_mask;
extern int dev_uart_fd;
extern pthread_cond_t cond_uart_cmd;
extern pthread_mutex_t mutex_uart_cmd;
extern uart_cache_list zigbee_cache_head, zigbee_cache_tail;
void *pthread_uart_send (void *arg)
{
unsigned char *uart_p = NULL;
uart_cache_list uart_cache_p = NULL;
printf ("pthread_uart_send is ok\n");
while (1)
{
pthread_mutex_lock (&mutex_uart_cmd);
pthread_cond_wait (&cond_uart_cmd, &mutex_uart_cmd);
while ((uart_cache_p = GetCacheNode (zigbee_cache_head, &zigbee_cache_tail)) != NULL)
{
pthread_mutex_unlock (&mutex_uart_cmd);
uart_p = (unsigned char *)uart_cache_p->data;
dev_uart_mask = *uart_p;
write (dev_uart_fd, &dev_uart_mask, 1);
printf("\tuart:m0 cmd = %x\n", dev_uart_mask);
free (uart_p);
uart_p = NULL;
free (uart_cache_p);
uart_cache_p = NULL;
usleep (200000);
pthread_mutex_lock (&mutex_uart_cmd);
}
pthread_mutex_unlock (&mutex_uart_cmd);
}
}
阶段1:线程启动初始化(无显式初始化,直接进入循环)
- 打印
pthread_uart_send is ok标识线程启动成功; - 直接进入死循环,核心逻辑为等待ZigBee指令并发送。
阶段2:ZigBee指令等待与处理(线程核心逻辑)
- 调用
pthread_mutex_lock(&mutex_uart_cmd)加ZigBee指令互斥锁; - 调用
pthread_cond_wait(&cond_uart_cmd, &mutex_uart_cmd)阻塞等待条件变量cond_uart_cmd唤醒(由pthread_main线程触发); - 被唤醒后,进入子循环处理指令缓存链表:
- 调用
GetCacheNode(zigbee_cache_head, &zigbee_cache_tail)从缓存链表获取指令节点uart_cache_p; - 若节点为空,退出子循环;若不为空:
① 调用pthread_mutex_unlock(&mutex_uart_cmd)临时释放互斥锁;
② 提取节点数据:uart_p = (unsigned char *)uart_cache_p->data,赋值给全局变量dev_uart_mask;
③ 调用write(dev_uart_fd, &dev_uart_mask, 1)将指令写入ZigBee串口设备(/dev/ttyUSB0);
④ 打印指令内容(uart:m0 cmd = %x),释放uart_p和uart_cache_p内存;
⑤ 调用usleep(200000)休眠200ms,避免指令发送过快;
⑥ 重新调用pthread_mutex_lock(&mutex_uart_cmd)加锁,继续子循环;
- 调用
- 子循环结束后,调用
pthread_mutex_unlock(&mutex_uart_cmd)释放互斥锁,回到外层死循环等待下一次唤醒。
核心总结
pthread_uart_send.c 作为“ZigBee指令发送器”,核心流程是加锁等待指令唤醒→从缓存链表读取指令→串口发送指令→释放资源→循环等待,依赖mutex_uart_cmd保证链表操作线程安全,通过cond_uart_cmd接收pthread_main的指令分发信号,完成最终的硬件指令下发。
pthread_zigbee_rcv.c
#include "link_list.h"
#include "data_global.h"
extern int dev_uart_fd;
extern linklist envlinkHead;
extern pthread_mutex_t mutex_linklist;
extern pthread_cond_t cond_refresh;
void serial_init(int fd)
{
struct termios options;
tcgetattr(fd, &options);
options.c_cflag |= ( CLOCAL | CREAD );
options.c_cflag &= ~CSIZE;
options.c_cflag &= ~CRTSCTS;
options.c_cflag |= CS8;
options.c_cflag &= ~CSTOPB;
options.c_iflag |= IGNPAR;
options.c_iflag &= ~(ICRNL | IXON);
options.c_oflag = 0;
options.c_lflag = 0;
cfsetispeed(&options, B115200);
cfsetospeed(&options, B115200);
tcsetattr(fd,TCSANOW,&options);
}
void *pthread_zigbee_rcv (void *arg)
{
int i = 0, len;
char flag = 0, check;
link_datatype buf;
envlinkHead = CreateEmptyLinklist ();
#if 1
if ((dev_uart_fd = open (DEV_ZIGBEE, O_RDWR)) < 0)
{
perror ("open ttyUSB0 fail");
// exit (-1);
// return -1;
}
serial_init (dev_uart_fd);
printf ("pthread_transfer is ok\n");
#endif
while (1)
{
memset (&buf, 0, sizeof (link_datatype));
read (dev_uart_fd, &check, 1);
/* if (check == 'c')
{
sendMsgQueue(MSG_ZIGBEE,MSG_CONNECT_SUCCESS);
}*/
if (check == 's')
{
check = 0;
read (dev_uart_fd, &check, 1);
if (check == 't')
{
check = 0;
read (dev_uart_fd, &check, 1);
if (check == ':')
{
check = 0;
read (dev_uart_fd, &check, 1);
if (check == 'e')
{
buf.msg_type = 'e';
usleep(1);
if ((len = read (dev_uart_fd, buf.text, LEN_ENV)) != LEN_ENV)
{
for (i = len; i < LEN_ENV; i++)
{
read (dev_uart_fd, buf.text+i, 1);
}
}
flag = 1;
}
}
}
}
if (1 == flag)
{
pthread_mutex_lock (&mutex_linklist);
//接收到的额数据加入到链表中
if ((InsertLinknode (buf)) == -1)
{
pthread_mutex_unlock (&mutex_linklist);
printf ("NONMEM\n");
}
pthread_mutex_unlock (&mutex_linklist);
flag = 0;
pthread_cond_signal (&cond_refresh);
}
}
return 0;
}
阶段1:串口初始化与链表创建(线程启动准备)
- 调用
CreateEmptyLinklist()创建空的环境数据链表,初始化全局链表头envlinkHead; - 调用
open(DEV_ZIGBEE, O_RDWR)打开ZigBee串口设备(/dev/ttyUSB0),获取文件描述符dev_uart_fd;- 若打开失败,打印
open ttyUSB0 fail但不退出线程; - 若成功,调用
serial_init(dev_uart_fd)配置串口参数:- 波特率设置为115200,数据位8位、停止位1位,禁用奇偶校验、流控;
- 配置
termios结构体(CLOCAL | CREAD开启本地连接/读使能,IGNPAR忽略奇偶错误等);
- 若打开失败,打印
- 打印
pthread_transfer is ok标识串口初始化完成。
阶段2:ZigBee数据接收解析循环(线程核心逻辑)
- 进入死循环,首先调用
memset(&buf, 0, sizeof(link_datatype))清空数据缓冲区; - 调用
read(dev_uart_fd, &check, 1)逐字节读取串口数据,进行帧头校验:- 校验帧头为
st:e(依次检测s→t→:→e),校验通过则标记flag = 1; - 若帧头校验通过,读取环境数据段(长度
LEN_ENV=20):- 若单次
read读取长度不足LEN_ENV,循环补读至满足长度; - 将数据存入
buf.text,并标记buf.msg_type = 'e'(环境数据类型);
- 若单次
- 校验帧头为
- 帧数据校验通过后(
flag=1):- 调用
pthread_mutex_lock(&mutex_linklist)加链表互斥锁; - 调用
InsertLinknode(buf)将解析后的环境数据插入全局链表envlinkHead;- 若插入失败(内存不足),打印
NONMEM并释放互斥锁;
- 若插入失败(内存不足),打印
- 调用
pthread_mutex_unlock(&mutex_linklist)释放互斥锁; - 重置
flag=0,调用pthread_cond_signal(&cond_refresh)唤醒共享内存刷新线程。
- 调用
核心总结
pthread_zigbee_rcv.c 作为“ZigBee数据接收解析器”,核心流程是初始化串口+创建数据链表→死循环读取串口数据→帧头校验+数据补全→线程安全插入链表→唤醒刷新线程,依赖mutex_linklist保证链表操作的线程安全,通过cond_refresh触发共享内存数据更新,完成环境感知数据的接收与中转。
pthread_refresh.c
#include <arpa/inet.h>
#include "sem.h"
#include "data_global.h"
#include "link_list.h"
#define N 1024
extern pthread_mutex_t mutex_refresh;
extern pthread_mutex_t mutex_refresh_updata;
extern pthread_mutex_t mutex_global;
extern pthread_mutex_t mutex_slinklist;
extern pthread_cond_t cond_refresh;
extern pthread_cond_t cond_refresh_updata;
extern pthread_mutex_t mutex_linklist;
extern char qt_status;
extern int shmid;
extern int semid;
extern struct env_info_array all_info_RT;
extern linklist envlinkHead, envlinkTail;
struct shm_addr
{
char cgi_status;
char qt_status;
struct env_info_array rt_status;
};
struct getEnvMsg
{
unsigned char tem[2];
unsigned char hum[2];
unsigned char ep_no;
unsigned char x;
unsigned char y;
unsigned char z;
unsigned short hongwai;
unsigned short lux; //光照 2个字节
unsigned short rsv2;
unsigned short gas; //烟雾
unsigned short rsv3;
unsigned short adc;
};
void getEnvPackage (link_datatype *buf)
{
struct getEnvMsg pack;
float temh;//,teml;
int ep_no;
memcpy (&pack, buf->text, LEN_ENV);
ep_no = pack.ep_no;
pthread_mutex_lock (&mutex_global);
struct env_info current = all_info_RT.env_no[ep_no];
pthread_mutex_unlock (&mutex_global);
current.storage_status = 1;
current.x = pack.x;
current.y = pack.y;
current.z = pack.z;
temh = pack.tem[1];
//teml = pack.tem[0];
current.temperature = temh;
temh = pack.hum[1];
//teml = pack.hum[0];
current.humidity = temh;
current.illumination = (unsigned short)pack.lux;//光强
current.gas = (unsigned short)pack.gas;//烟雾
current.hongwai = pack.hongwai;
printf ("ep_no = %d tem = %0.2f hum = %0.2f ill = %d gas = %d hongwai=%d\n",
ep_no,current.temperature, current.humidity, current.illumination, current.gas,current.hongwai);
//checkEnv (sto_no, ¤t);
pthread_mutex_lock (&mutex_global);
all_info_RT.env_no[ep_no] = current;
pthread_mutex_unlock (&mutex_global);
return ;
}
void *pthread_refresh (void *arg)
{
key_t key_info;
int shmid, semid;
linklist node;
link_datatype buf;
struct shm_addr *shm_buf;
if ((key_info = ftok ("/app", 'i')) < 0)
{
perror ("ftok info");
exit (-1);
}
if ((semid = semget (key_info, 1, IPC_CREAT | IPC_EXCL |0666)) < 0)
{
if (errno == EEXIST)
{
semid = semget (key_info, 1, 0666);
}
else
{
perror ("semget");
exit (-1);
}
}
else
{
init_sem (semid, 0, 1);
}
if ((shmid = shmget (key_info, N, IPC_CREAT | IPC_EXCL | 0666)) < 0)
{
if (errno == EEXIST)
{
shmid = shmget (key_info, N, 0666);
shm_buf = (struct shm_addr *)shmat (shmid, NULL, 0);
}
else
{
perror ("shmget");
exit (-1);
}
}
else
{
if ((shm_buf = (struct shm_addr *)shmat (shmid, NULL, 0)) == (void *)-1)
{
perror ("shmat");
exit (-1);
}
}
bzero (shm_buf, sizeof (struct shm_addr));
printf ("pthread_refresh is ok\n");
while (1)
{
pthread_mutex_lock (&mutex_refresh);
pthread_cond_wait (&cond_refresh, &mutex_refresh);
while (1)
{
pthread_mutex_lock (&mutex_linklist);
if ((node = GetLinknode (envlinkHead)) == NULL)
{
pthread_mutex_unlock (&mutex_linklist);
break;
}
buf = node->data;
free (node);
pthread_mutex_unlock (&mutex_linklist);
if ('e' == buf.msg_type)
{
getEnvPackage (&buf);
}
}
//共享内存shm_buf需要由进程信号量来控制互斥访问
sem_p (semid, 0);
pthread_mutex_lock (&mutex_global);
shm_buf->rt_status = all_info_RT;
pthread_mutex_unlock (&mutex_global);
sem_v (semid, 0);
pthread_mutex_unlock (&mutex_refresh);
}
return 0;
}
阶段1:共享内存与信号量初始化(线程启动核心准备)
- 调用
ftok("/app", 'i')生成共享内存/信号量的键值key_info; - 调用
semget创建/获取信号量集(参数IPC_CREAT | IPC_EXCL | 0666):- 若创建失败且错误码为
EEXIST,则获取已存在的信号量ID; - 若为新创建的信号量,调用
init_sem初始化信号量值为1;
- 若创建失败且错误码为
- 调用
shmget创建/获取共享内存(大小N=1024,权限IPC_CREAT | IPC_EXCL | 0666):- 若创建失败且错误码为
EEXIST,则获取已存在的共享内存ID并调用shmat映射; - 若为新创建的共享内存,调用
shmat完成内存映射,失败则打印错误并退出;
- 若创建失败且错误码为
- 调用
bzero清空共享内存缓冲区shm_buf,打印pthread_refresh is ok标识初始化完成。
阶段2:共享内存刷新循环(线程核心逻辑)
- 进入死循环,首先调用
pthread_mutex_lock(&mutex_refresh)加刷新互斥锁; - 调用
pthread_cond_wait(&cond_refresh, &mutex_refresh)阻塞等待条件变量cond_refresh唤醒(由pthread_zigbee_rcv线程触发); - 被唤醒后,进入子循环处理环境数据链表:
- 调用
pthread_mutex_lock(&mutex_linklist)加链表互斥锁; - 调用
GetLinknode(envlinkHead)从链表获取数据节点node,若为空则释放锁并退出子循环; - 提取节点数据
buf = node->data,释放节点内存,调用pthread_mutex_unlock(&mutex_linklist)释放链表锁; - 若
buf.msg_type = 'e'(环境数据),调用getEnvPackage(&buf)解析数据:
① 从buf.text拷贝数据到getEnvMsg结构体,解析仓库编号ep_no;
② 加mutex_global锁读取全局环境数据all_info_RT.env_no[ep_no],解锁后赋值解析后的温湿度、光照、烟雾等数据;
③ 重新加mutex_global锁,将解析后的数据写回all_info_RT.env_no[ep_no],解锁;
- 调用
- 子循环结束后,更新共享内存:
- 调用
sem_p(semid, 0)获取信号量(进程级互斥); - 加
mutex_global锁,将全局数据all_info_RT拷贝到共享内存shm_buf->rt_status; - 释放
mutex_global锁,调用sem_v(semid, 0)释放信号量;
- 调用
- 调用
pthread_mutex_unlock(&mutex_refresh)释放刷新互斥锁,回到外层死循环等待下一次唤醒。
核心总结
pthread_refresh.c 作为“数据同步中枢”,核心流程是初始化共享内存/信号量→死循环等待数据唤醒→解析环境数据链表→线程安全更新全局数据→进程安全同步到共享内存,依赖mutex_linklist保证链表操作安全、mutex_global保证全局数据安全、信号量保证共享内存的跨进程互斥访问,完成ZigBee采集数据到共享内存的同步。
三、测试文件
| 文件名 | 测试目标 |
|---|---|
| test_zigbee.c | 测试 ZigBee 指令发送:构造消息队列消息,向主线程发送 LED / 蜂鸣器控制指令 |
| test_cam.c | 测试摄像头控制:构造消息队列消息,向摄像头线程发送拍照指令 |
| test_shm.c | 测试共享内存读取:挂载共享内存、通过信号量互斥读取环境数据并打印 |
四、工作流程梳理
一、整体工作流程(核心逻辑)
该项目是无线传感网主控程序,核心实现:ZigBee 采集环境数据→主线程接收外部控制指令→分发指令到外设 / 通信模块→共享内存同步数据→测试程序验证功能,整体基于 Linux 多线程 + IPC(消息队列 / 共享内存 / 信号量)实现,流程如下:
1.程序初始化(main.c)
- 初始化所有互斥锁 / 条件变量(如mutex_camera/cond_uart_cmd等),用于线程间同步;
- 注册SIGINT信号处理函数ReleaseResource,保证程序退出时释放资源(销毁锁 / 删除消息队列 / 关闭设备等);
- 创建 5 个核心业务线程:
pthread_zigbee_rcv:ZigBee 数据接收线程;
pthread_uart_send:ZigBee 指令发送线程;
pthread_main:主线程(指令分发);
pthread_camera:摄像头控制线程;
pthread_refresh:共享内存数据刷新线程;
2. 数据采集(pthread_zigbee_rcv.c)
- 初始化串口(/dev/ttyUSB0),配置波特率 115200 等参数;
- 循环读取 ZigBee 上报的环境数据,解析以st:e为标识的环境数据包;
- 将解析后的环境数据(温度 / 湿度 / 光照 / 烟雾等)封装到link_datatype结构体,插入envlinkHead链表(环境数据链表);
- 插入完成后触发cond_refresh条件变量,通知pthread_refresh线程刷新共享内存。
3. 指令分发(pthread_main.c)
- 创建 / 获取消息队列(ftok(“/app”, ‘g’)),作为外部指令(测试程序 / Qt/CGI)与主控程序的通信通道;
- 初始化 ZigBee 指令缓存链表zigbee_cache_head/zigbee_cache_tail;
- 循环监听消息队列,接收type=1L的消息,根据msgtype分发指令:
–MSG_CAMERA(3L):设置dev_camera_mask(拍照数量),触发cond_camera通知摄像头线程执行拍照;
–MSG_ZIGBEE(4L):将指令封装到缓存链表zigbee_cache,触发cond_uart_cmd通知串口发送线程执行指令。
4. 外设控制
- 摄像头控制(pthread_camera.c):
阻塞等待cond_camera信号,收到后根据dev_camera_mask(拍照数量)执行拍照(通过system(“echo one > /tmp/webcom”)触发); - ZigBee 指令发送(pthread_uart_send.c):
阻塞等待cond_uart_cmd信号,收到后从zigbee_cache链表取出指令,通过串口发送给 ZigBee(控制 LED / 蜂鸣器等),发送完成后释放链表节点内存。
5. 数据同步(pthread_refresh.c)
- 阻塞等待cond_refresh信号(由 ZigBee 接收线程触发);
- 从环境数据链表envlinkHead取出数据,解析为env_info结构体(温度 / 湿度等),更新全局变量all_info_RT;
- 通过信号量(sem.h) 保证互斥访问共享内存,将all_info_RT写入共享内存(ftok(“/app”, ‘i’)),供外部程序(如 test_shm.c)读取。
6. 功能测试(test_*.c)
- test_cam.c:构造消息队列消息(msgtype=3L),向主线程发送拍照指令;
- test_zigbee.c:构造 ZigBee 控制指令(LED / 蜂鸣器开关),通过消息队列发送给主线程;
- test_shm.c:挂载共享内存,通过信号量互斥读取环境数据并打印,验证数据同步效果。
7. 程序退出
- 按下Ctrl+C触发SIGINT信号,执行ReleaseResource:
销毁所有互斥锁 / 条件变量;
删除消息队列 / 共享内存 / 信号量;
取消所有线程,关闭串口 / 摄像头设备文件。
更多推荐



所有评论(0)