本文简要说明Zephyr Message Queue的使用和实现。
在Zephyr内核对象–数据传递对象简介一文中已经大概介绍了Message queue的特性,本文将继续说明F其使用和实现。
使用
API
Message queue的API有下面10个全部声明在kernel.h中,每个函数都有参数struct k_msgq msgq. 都是指该函数操作或者使用的msgq后面就不在单独列出说明
**void k_msgq_init(struct k_msgq q, char buffer, size_t msg_size, u32_t max_msgs);**
作用:初始化一个msgq, 内存由使用者分配
buffer: msgq的buffer,需要由使用者分配,大小为msg_sizemax_msgs
msg_size: msgq中每个message的大小
max_mags: msgq中最多容纳的message数量
__syscall int k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size, u32_t max_msgs);
作用:初始化一个msgq, 内存由msgq从线程池中分配
msg_size: msgq中每个message的大小
max_mags: msgq中最多容纳的message数量
int k_msgq_cleanup(struct k_msgq *msgq);
作用:释放k_msgq_alloc_init分配msgq内存
__syscall int k_msgq_put(struct k_msgq msgq, void data, s32_t timeout);
作用:将message放入到msgq
data:message数据
timeout: 等待时间,单位ms。K_NO_WAIT不等待, K_FOREVER一直等
返回值:放入成功返回0
__syscall int k_msgq_get(struct k_msgq msgq, void data, s32_t timeout);
作用:从msgq读出message
data:message数据
timeout: 等待时间,单位ms。K_NO_WAIT不等待, K_FOREVER一直等
返回值:读出成功返回0
__syscall int k_msgq_peek(struct k_msgq msgq, void data);
作用:peek msgq
data: peek到的message
返回: peek到数据返回0
__syscall void k_msgq_purge(struct k_msgq *msgq);
作用:清空msgq中的message
__syscall u32_t k_msgq_num_free_get(struct k_msgq *msgq);
作用:获取msgq还可以放多少个message
返回值:空闲数目
__syscall void k_msgq_get_attrs(struct k_msgq msgq, struct k_msgq_attrs attrs);
作用:获取msgq的信息,也就是message的大小,总数量和已使用数量,都放在struct k_msgq_attrs 内
__syscall u32_t k_msgq_num_used_get(struct k_msgq *msgq);
作用:获取msgq中有多少个message
返回值:message数目
使用说明
可以在ISR中put msgq.也可在ISR内get msgq,但不能等待。
msgq必须事先指定message的大小和个数。大小需要是2的幂对齐。
msgq用于异步传输小数据。msgq在读写时需要锁中断,因此不建议用来传输大数据。
初始化
初始化一个queue, 由用户分配内存1
2
3
4
5
6
7
8
9
10struct data_item_type { //message的数据结构
u32_t field1;
u32_t field2;
u32_t field3;
};
char __aligned(4) my_msgq_buffer[10 * sizeof(data_item_type)];
struct k_msgq my_msgq;
k_msgq_init(&my_msgq, my_msgq_buffer, sizeof(data_item_type), 10);
由message queue自己在线程池中分配1
k_msgq_alloc_init(&my_msgq, sizeof(data_item_type), 10);
写入message
运行在线程或者ISR中写入message,ISR中写入时不能发生等待。示例如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18void producer_thread(void)
{
struct data_item_t data;
while (1) {
/* create data item to send (e.g. measurement, timestamp, ...) */
data = ...
/* send data to consumers */
while (k_msgq_put(&my_msgq, &data, K_NO_WAIT) != 0) {
/* message queue is full: purge old data & try again */
//这里purge并不是必要步骤,是否需要进行purge根据实际应用由用户自己选择
k_msgq_purge(&my_msgq);
}
/* data item was successfully added to message queue */
}
}
读message
可以将message从msgq读出, 之后msgq中不再有该message1
2
3
4
5
6
7
8
9
10
11
12void consumer_thread(void)
{
struct data_item_t data;
while (1) {
/* get a data item */
k_msgq_get(&my_msgq, &data, K_FOREVER);
/* process data item */
...
}
}
也可以只是peek,该message任然保留在msgq中1
2
3
4
5
6
7
8
9
10
11
12void consumer_thread(void)
{
struct data_item_t data;
while (1) {
/* read a data item by peeking into the queue */
if(0 == k_msgq_peek(&my_msgq, &data)){
/* process data item */
}
...
}
}
实现
msgq的实现代码在zephyr/kernel/msg_q.c中,msgq是以ringbuffer的模式进行管理,在初始化的时候建立ringbuffer,读写数据时都是以固定的单位大小从ringbuffer内读写数据。
msgq的数据结构如下1
2
3
4
5
6
7
8
9
10
11
12struct k_msgq {
_wait_q_t wait_q; //wait_q用于控制msgq的等待
struct k_spinlock lock; //msgq多线程保护锁
size_t msg_size; // message的大小
u32_t max_msgs; // msgq最大容纳message的个数
char *buffer_start; //msgq ringbuffer的开始地址
char *buffer_end; //msgq ringbuffer的结束地址
char *read_ptr; //msgq ringbuffer的读指针
char *write_ptr; //msgq ringbuffer的写指针
u32_t used_msgs; //msgq中有效message的个数
u8_t flags; //msgq的ringbuffer从线程池分配标志
};
示意图 如下,每读或者写一个message,读写指针就向前移动msg_size
初始化/释放
初始化msgq就是对struct_msgq中的各成员进行初始化
1 | void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size, |
k_msgq_alloc_init->z_impl_k_msgq_alloc_init, msgq内存从线程池中分配,再使用k_msgq_init初始化1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
u32_t max_msgs)
{
void *buffer;
int ret;
size_t total_size;
//计算msg_size乘max_msgs,并检查是否溢出,实际调用的是__builtin_mul_overflow
if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
ret = -EINVAL;
} else {
//从线程池中分配msgq的ringbuffer 内存
buffer = z_thread_malloc(total_size);
if (buffer != NULL) {
//初始化各变量
k_msgq_init(msgq, buffer, msg_size, max_msgs);
//使用K_MSGQ_FLAG_ALLOC在flags中标识ringbuffer是从线程池中分配
msgq->flags = K_MSGQ_FLAG_ALLOC;
ret = 0;
} else {
ret = -ENOMEM;
}
}
return ret;
}
如果msgq是从线程池中分配的内存,可以使用k_msgq_cleanup将其释放1
2
3
4
5
6
7
8
9
10
11
12
13
14int k_msgq_cleanup(struct k_msgq *msgq)
{
//如果还有thread在等待msgq,说明不能释放,退出
CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
return -EBUSY;
}
//判断alloc标志,并释放内存
if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0) {
k_free(msgq->buffer_start);
msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
}
return 0;
}
mssage操作
写msgq
k_msgq_put->z_impl_k_msgq_put1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50int z_impl_k_msgq_put(struct k_msgq *msgq, void *data, s32_t timeout)
{
//isr内写msgq不能等
__ASSERT(!arch_is_in_isr() || timeout == K_NO_WAIT, "");
struct k_thread *pending_thread;
k_spinlock_key_t key;
int result;
key = k_spin_lock(&msgq->lock);
if (msgq->used_msgs < msgq->max_msgs) {
//msgq中ringbuffer有空间
//检查是否有thread在等待读取msgq的message
pending_thread = z_unpend_first_thread(&msgq->wait_q);
if (pending_thread != NULL) {
//有线程在等message,直接将该数据提供给等待线程
(void)memcpy(pending_thread->base.swap_data, data,
msgq->msg_size);
//等待线程拿到数据后,让等待线程ready,并重新调度
arch_thread_return_value_set(pending_thread, 0);
z_ready_thread(pending_thread);
z_reschedule(&msgq->lock, key);
return 0;
} else {
//没有线程需要数据,则将数据放入ringbuffer
(void)memcpy(msgq->write_ptr, data, msgq->msg_size);
msgq->write_ptr += msgq->msg_size;
if (msgq->write_ptr == msgq->buffer_end) {
msgq->write_ptr = msgq->buffer_start;
}
//更新msgq剩余的message数
msgq->used_msgs++;
}
result = 0;
} else if (timeout == K_NO_WAIT) {
//msgq ringbuffer满,且不等待就立即退出
result = -ENOMSG;
} else {
//msgq 满,将message放入swap_data,等待其它thread来读
_current->base.swap_data = data;
return z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
}
k_spin_unlock(&msgq->lock, key);
return result;
}
读msgq
k_msgq_get->z_impl_k_msgq_get1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, s32_t timeout)
{
//isr内读msgq不能等
__ASSERT(!arch_is_in_isr() || timeout == K_NO_WAIT, "");
k_spinlock_key_t key;
struct k_thread *pending_thread;
int result;
key = k_spin_lock(&msgq->lock);
if (msgq->used_msgs > 0) {
//ringbuffer中有数据,直接从ringbuffer中读出message
(void)memcpy(data, msgq->read_ptr, msgq->msg_size);
msgq->read_ptr += msgq->msg_size;
if (msgq->read_ptr == msgq->buffer_end) {
msgq->read_ptr = msgq->buffer_start;
}
//更新msgq剩余的message数
msgq->used_msgs--;
//此时ringbuffer有空闲空间,如果有thead在等待写msgq,则在这里写入
pending_thread = z_unpend_first_thread(&msgq->wait_q);
if (pending_thread != NULL) {
/* add thread's message to queue */
(void)memcpy(msgq->write_ptr, pending_thread->base.swap_data,
msgq->msg_size);
msgq->write_ptr += msgq->msg_size;
if (msgq->write_ptr == msgq->buffer_end) {
msgq->write_ptr = msgq->buffer_start;
}
//更新msgq剩余的message数
msgq->used_msgs++;
//等待写入msgq的thread在写入msgq后变为ready,并重新调度
arch_thread_return_value_set(pending_thread, 0);
z_ready_thread(pending_thread);
z_reschedule(&msgq->lock, key);
return 0;
}
result = 0;
} else if (timeout == K_NO_WAIT) {
/msgq ringbuffer空,且不等待就立即退出
result = -ENOMSG;
} else {
//msgq 空,将message放入swap_data,等待其它thread来写
_current->base.swap_data = data;
return z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
}
k_spin_unlock(&msgq->lock, key);
return result;
}
peek msgq
也可以通过peek读message,该方式不会将message从msgq的ringbuffer中删除
k_msgq_peek->z_impl_k_msgq_peek1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
{
k_spinlock_key_t key;
int result;
key = k_spin_lock(&msgq->lock);
if (msgq->used_msgs > 0) {
//ringbuffer中有数据直接copy出去
(void)memcpy(data, msgq->read_ptr, msgq->msg_size);
result = 0;
} else {
//ringbuffer中有无数据返回错误
result = -ENOMSG;
}
k_spin_unlock(&msgq->lock, key);
return result;
}
清空msgq
当不需要msgq中的数据时可以使用k_msgq_purge清空
k_msgq_purge->z_impl_k_msgq_purge1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19void z_impl_k_msgq_purge(struct k_msgq *msgq)
{
k_spinlock_key_t key;
struct k_thread *pending_thread;
key = k_spin_lock(&msgq->lock);
//清空ringbuffer前,会先让等待读msgq的thead将message读走
while ((pending_thread = z_unpend_first_thread(&msgq->wait_q)) != NULL) {
arch_thread_return_value_set(pending_thread, -ENOMSG);
z_ready_thread(pending_thread);
}
//复位 ringbuffer
msgq->used_msgs = 0;
msgq->read_ptr = msgq->write_ptr;
z_reschedule(&msgq->lock, key);
}
获取msgq信息
获取msgq的信息函数实现很简单结合前面k_msgq的结构体很容易理解,这里就不再注释分析了1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
{
attrs->msg_size = msgq->msg_size;
attrs->max_msgs = msgq->max_msgs;
attrs->used_msgs = msgq->used_msgs;
}
static inline u32_t z_impl_k_msgq_num_free_get(struct k_msgq *msgq)
{
return msgq->max_msgs - msgq->used_msgs;
}
static inline u32_t z_impl_k_msgq_num_used_get(struct k_msgq *msgq)
{
return msgq->used_msgs;
}
参考
https://gcc.gnu.org/onlinedocs/gcc/Integer-Overflow-Builtins.html
https://docs.zephyrproject.org/latest/reference/kernel/data_passing/message_queues.html