当前位置: 首页 > news >正文

进程通信之消息队列

文章目录

  • 消息队列
    • 消息队列VS管道
  • System V 消息队列
    • 系统管理命令
    • 核心函数
      • 创建/获取消息队列
      • 发送消息
      • 接收消息
      • 控制操作
    • 消息队列通信
  • POSIX 消息队列
    • 特点
    • 核心函数
      • 创建/打开队列
      • 发送消息
      • 接收消息
      • 关闭与删除
    • 文件系统集成
      • 查看消息配置
    • 异常处理
    • 消息队列通信
  • System V vs POSIX 对比

消息队列

  • 内核维护的进程间通信(IPC)机制
  • 以消息为单位进行数据交换(非字节流)
  • 支持不同类型消息的分类处理
  • 消息队列独立于创建进程存在

消息队列VS管道

特性管道消息队列
数据类型字节流结构化消息
通信方式先进先出支持消息类型过滤
持久性进程结束即消失显式删除才消失
容量固定缓冲区可配置消息数量

System V 消息队列

系统管理命令

# 查看所有IPC对象ipcs# 创建消息队列(键值12)ipcmk -Q12# 删除消息队列(ID为1)ipcrm -q1

核心函数

创建/获取消息队列

#include<sys/msg.h>intmsgget(key_tkey,intmsgflg);
  • key:消息队列键值(IPC_PRIVATE 或 ftok() 生成)
    • 可以使用ftok()函数生成,也可以指定为IPC_PRIVATE创建一个新的消息队列
  • msgflg:标志位(IPC_CREAT | 0666),
    • IPC_CREAT创建消息队列,IPC_EXCLIPC_CREAT一起使用确保创建新的消息队列
  • 返回:成功返回消息队列标识符 (msqid),失败返回 -1

发送消息

intmsgsnd(intmsqid,constvoid*msgp,size_tmsgsz,intmsgflg);
  • msqid:消息队列ID
  • msgp:消息结构体指针(必须包含long mtype)
  • msgsz:消息数据长度(不含mtype)
  • msgflg:标志位(0 或 IPC_NOWAIT)

接收消息

ssize_tmsgrcv(intmsqid,void*msgp,size_tmsgsz,longmsgtyp,intmsgflg);
  • msqid: 消息队列标识符
  • msgp: 指向消息结构的指针
  • msgsz: 消息缓冲区的最大长度
  • msgtyp:消息类型选择:
    • 0:接收第一个消息
    • >0:接收指定类型的第一个消息
    • <0:接收类型≤|msgtyp|的最小类型的消息
  • msgflg: 标志位,例如IPC_NOWAIT非阻塞接收
  • 返回值: 成功返回接收到的消息数据的长度,失败返回 -1

控制操作

intmsgctl(intmsqid,intcmd,structmsqid_ds*buf);
  • msqid: 消息队列标识符
  • cmd
    • IPC_RMID:删除队列
    • IPC_STAT:获取状态
    • IPC_SET:设置属性
  • buf: 指向msqid_ds结构的指针
  • 返回值: 成功返回 0,失败返回 -1

消息队列通信

  • 发送端
#include<sys/msg.h>#include<string.h>#include<stdio.h>structmsgbuf{longmtype;// 消息类型(必须 > 0)charmtext[100];// 消息数据};intmain(){key_tkey=ftok(".",'a');// 生成键值intmsqid=msgget(key,IPC_CREAT|0666);structmsgbufmsg;msg.mtype=1;// 设置消息类型strcpy(msg.mtext,"Hello from sender");//msgsnd(msqid, &msg, strlen(msg.mtext) + 1, 0);if(msgsnd(msqid,&msg,sizeof(msg.mtext),0)==-1){perror("msgsnd");exit(1);}printf("Message sent: %s\n",msg.mtext);return0;}
  • 接收端
#include<sys/msg.h>#include<stdio.h>structmsgbuf{longmtype;charmtext[100];};intmain(){key_tkey=ftok(".",'a');intmsqid=msgget(key,0666);structmsgbufmsg;// msgrcv(msqid, &msg, sizeof(msg.mtext), 1, 0);if(msgrcv(msqid,&msg,sizeof(msg.mtext),1,0)==-1){perror("msgrcv");exit(1);}printf("Received: %s\n",msg.mtext);msgctl(msqid,IPC_RMID,NULL);// 删除消息队列return0;}

POSIX 消息队列

特点

  • 更现代的API设计
  • 支持优先级消息
  • 与文件系统集成
  • 提供更多控制选项

核心函数

创建/打开队列

#include<mqueue.h>// 仅打开已存在的消息队列mqd_tmq_open(constchar*name,intoflag);// 打开或创建新的消息队列,带属性配置mqd_tmq_open(constchar*name,intoflag,mode_tmode,structmq_attr*attr);
  • name:队列名称(以/开头,如/myqueue, 且后续不能再包含其他<font style="color:rgb(31, 35, 41);background-color:rgba(0, 0, 0, 0);">/</font>),系统级可见的,不同进程通过相同 name 访问同一个消息队列
  • oflag:O_CREAT | O_RDONLY | O_WRONLY | O_RDWR
  • mode:仅当 oflag 包含 O_CREAT 时有效,用于指定新消息队列的访问权限,取值与 open() 函数的权限一致(例如 0644:所有者读写、同组读、其他读),最终权限会被进程的 umask 掩码修正(实际权限 = mode & ~umask)
  • attr:仅当 oflag 包含 O_CREAT 时有效,用于指定新消息队列的属性(若为 NULL,则使用系统默认属性),struct mq_attr 核心成员包括:
    • mq_maxmsg:队列最大可容纳的消息数量
    • mq_msgsize:队列中单个消息的最大字节长度
    • mq_curmsgs:队列当前的消息数量(仅用于查询,创建时设置无效)
  • 返回值:
    • 成功:返回一个有效的 消息队列描述符(mqd_t 类型),后续所有消息队列操(mq_send/mq_receive 等)均依赖该描述符
    • 失败:返回 (mqd_t)-1,同时设置全局变量 errno 指示错误原因(例如 EEXIST:O_CREAT|O_EXCL 时队列已存在;ENOENT:队列不存在且未指定 O_CREAT)

发送消息

#include<mqueue.h>intmq_send(mqd_tmqdes,constchar*msg_ptr,size_tmsg_len,unsignedintmsg_prio);
  • mqdes:由 mq_open() 成功返回的消息队列描述符,标识要操作的目标消息队列
  • msg_ptr:指向要发送的消息数据缓冲区的指针,消息数据可以是任意二进制数据(无格式要求)
  • msg_len: 指定msg_ptr指向的消息的长度,此长度必须小于或等于队列的mq_msgsize属性,允许使用零长度的消息,即0 < msg_len ≤ 消息队列的 mq_msgsize 属性
  • msg_prio:优先级(0最低,数值越大优先级越高,越先被接收),若无需优先级,可设为 0(默认优先级)
  • 返回值:
    • 成功:返回 0,表示消息已成功写入消息队列
    • 失败:返回 -1,同时设置 errno 指示错误原因(例如 EAGAIN:非阻塞模式下队列已满;EBADF:无效的消息队列描述符或无写权限)

接收消息

  • mq_receive()从消息队列描述符 mqdes 引用的消息队列中删除优先级最高的最早消息,并将其放置在msg_ptr指向的缓冲区中
  • 如果队列为空,则默认情况下,mq_receive()会阻塞,直到消息可用,或者调用被信号处理程序中断
  • 接收最高优先级的最早消息
ssize_tmq_receive(mqd_tmqdes,char*msg_ptr,size_tmsg_len,unsignedint*msg_prio);
  • mqdes:由 mq_open() 成功返回的消息队列描述符,标识要操作的目标消息队列
  • msg_ptr:指向用于接收消息数据的缓冲区的指针,需提前分配足够大小的内存空间
  • msg_len: 指定 msg_ptr 指向的缓冲区的大小,必须大于或等于消息队列的 mq_msgsize 属性
  • msg_prio:指向用于存储接收消息优先级的变量指针,接收成功后,消息的优先级会被写入该变量,若不关心消息优先级,可传入 NULL
  • 返回值:
    • 成功:返回实际接收到的消息数据的字节长度(ssize_t 类型,非负整数)
    • 失败:返回 -1,同时设置 errno 指示错误原因(例如 EAGAIN:非阻塞模式下队列为空;EMSGSIZE:接收缓冲区大小 msg_len < mq_msgsize)

关闭与删除

intmq_close(mqd_tmqdes);// 关闭队列描述符intmq_unlink(constchar*name);// 删除队列文件
  • mqdes: 由 mq_open() 成功返回的消息队列描述符,标识要关闭的消息队列
  • name:与 mq_open() 中一致的消息队列唯一标识名(必须以 / 开头,格式合法),标识要删除的目标消息队列
  • 返回值:
    • 成功:返回 0,表示消息队列描述符已成功关闭/消息队列的链接已被移除
    • 失败:返回 -1,同时设置 errno 指示错误原因

文件系统集成

  • 在 Linux 上,消息队列是在虚拟文件系统中创建的
  • 挂载文件系统后,可以使用通常用于文件的命令(例如,ls 和 rm )来查看和作系统上的消息队列
# 挂载消息队列文件系统mkdir/dev/mqueuemount-t mqueue none /dev/mqueue# 查看队列信息ls/dev/mqueue/cat/dev/mqueue/myqueue# 目录中每个文件的内容都由一行组成,其中包含有关队列的信息:# 输出:QSIZE:129 NOTIFY:2 SIGNO:0 NOTIFY_PID:8260
  • QSIZE: 队列中所有消息的数据字节数
  • NOTIFY_PID: 如果该值为非零,则具有此 PID 的进程已使用**mq_notify()**来注册异步消息通知,其余字段描述通知的发生方式
  • NOTIFY: 通知方式:
    • 0SIGEV_SIGNAL
    • 1表示SIGEV_NONE
    • 2SIGEV_THREAD
  • SIGNO: 用于SIGEV_SIGNALSIGNO信号

查看消息配置

# 默认消息大小 cat/proc/sys/fs/mqueue/msgsize_default # 默认8192字节 # 最大消息大小 cat/proc/sys/fs/mqueue/msgsize_max # 限制值

异常处理

// 检查消息队列是否已满if(msgsnd(msqid,&msg,size,IPC_NOWAIT)==-1){if(errno==EAGAIN){// 队列已满处理逻辑}}// POSIX 非阻塞接收if(mq_receive(mqdes,buf,size,&prio)==-1){if(errno==EAGAIN){// 队列为空}}

消息队列通信

  • 发送端
#include<mqueue.h>#include<string.h>#include<fcntl.h>#defineQUEUE_NAME"/my_queue"#defineMSG_PRIORITY1intmain(){mqd_tmqdes=mq_open(QUEUE_NAME,O_WRONLY|O_CREAT,0666,NULL);charmessage[]="Hello, POSIX!";mq_send(mqdes,message,strlen(message),MSG_PRIORITY);mq_close(mqdes);return0;}
  • 接收端
#include<mqueue.h>#include<stdio.h>#defineQUEUE_NAME"/my_queue"#defineMAX_MSG_SIZE8192intmain(){mqd_tmqdes=mq_open(QUEUE_NAME,O_RDONLY);charbuffer[MAX_MSG_SIZE];unsignedintpriority;ssize_tlen=mq_receive(mqdes,buffer,MAX_MSG_SIZE,&priority);buffer[len]='\0';printf("[优先级:%u] %s\n",priority,buffer);mq_close(mqdes);// mq_unlink(QUEUE_NAME); // 删除队列return0;}

System V vs POSIX 对比

特性System VPOSIX
API风格较旧较新,类似文件操作
命名方式数字键值路径名
消息优先级
阻塞控制有限更灵活
文件系统集成有(/dev/mqueue)
可移植性广泛较新系统
默认限制系统配置可调节
http://www.jsqmd.com/news/226953/

相关文章:

  • RabbitMQ之交换机
  • hal_uart_transmit驱动开发全流程:初始化到发送一文说清
  • 通信协议仿真:通信协议基础_(9).通信协议仿真案例分析
  • 物理公式学习神器:免费无广含多分支助记忆
  • QoS质量配置
  • Spark大数据ETL实战:数据清洗与转换最佳实践
  • 【教程4>第10章>第20节】基于FPGA的图像sobel锐化算法开发——图像sobel锐化仿真测试以及MATLAB辅助验证
  • python的sql解析库-sqlparse
  • 数字频率计共阴极数码管驱动电路实战
  • STM32CubeMX安装步骤系统学习:配套工具链配置
  • Java Web 教学资源库系统源码-SpringBoot2+Vue3+MyBatis-Plus+MySQL8.0【含文档】
  • Python爬虫完整代码拿走不谢
  • 系统管理工具,多功能隐私清理文件粉碎工具
  • SpringBoot+Vue 智能推荐卫生健康系统平台完整项目源码+SQL脚本+接口文档【Java Web毕设】
  • 【踩坑记】WSL1 下 Docker 报错 iptables: No chain/target/match by that name 排查实录
  • MPC5634 Bootloader
  • autosar软件开发中诊断协议栈配置实践案例
  • RabbitMQ 集群部署方案
  • 无线网络仿真:5G网络仿真_(3).5G关键技术和性能指标
  • 洗衣店订单管理系统信息管理系统源码-SpringBoot后端+Vue前端+MySQL【可直接运行】
  • WSL Ubuntu 安装 Docker 操作指南
  • Python高级之操作Mysql
  • cruise仿真模型,四轮驱动。 轮毂电机,轮边电机驱动cruise动力性经济性仿真模型,ba...
  • 35 岁职场危机?网络安全这行为啥越老越吃香?
  • SpringBoot+Vue 课程答疑系统管理平台源码【适合毕设/课设/学习】Java+MySQL
  • 从零实现framebuffer显示:裸机环境下简单图形输出教程
  • 前后端分离BB平台系统|SpringBoot+Vue+MyBatis+MySQL完整源码+部署教程
  • 安全副业指南:漏洞挖掘 / 技术博客 / 竞赛奖金实战,哪个方向更适合你?
  • STM32新手必看:Keil5代码自动补全设置手把手教程
  • Java Web 购物推荐网站系统源码-SpringBoot2+Vue3+MyBatis-Plus+MySQL8.0【含文档】