Zephyr内核对象-数据传递之Message Queue

Creative Commons
本作品采用知识共享署名

本文简要说明Zephyr Message Queue的使用和实现。

Zephyr内核对象–数据传递对象简介一文中已经大概介绍了Message queue的特性,本文将继续说明F其使用和实现。

使用

API

Message queue的API有下面10个全部声明在kernel.h中,每个函数都有参数struct k_msgq msgq. 都是指该函数操作或者使用的msgq后面就不在单独列出说明
**void k_msgq_init(struct k_msgq
q, char buffer, size_t msg_size, u32_t max_msgs);**
作用:初始化一个msgq, 内存由使用者分配
buffer: msgq的buffer,需要由使用者分配,大小为msg_size
max_msgs
msg_size: msgq中每个message的大小
max_mags: msgq中最多容纳的message数量
__syscall int k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size, u32_t max_msgs);
作用:初始化一个msgq, 内存由msgq从线程池中分配
msg_size: msgq中每个message的大小
max_mags: msgq中最多容纳的message数量
int k_msgq_cleanup(struct k_msgq *msgq);
作用:释放k_msgq_alloc_init分配msgq内存
__syscall int k_msgq_put(struct k_msgq msgq, void data, s32_t timeout);
作用:将message放入到msgq
data:message数据
timeout: 等待时间,单位ms。K_NO_WAIT不等待, K_FOREVER一直等
返回值:放入成功返回0
__syscall int k_msgq_get(struct k_msgq msgq, void data, s32_t timeout);
作用:从msgq读出message
data:message数据
timeout: 等待时间,单位ms。K_NO_WAIT不等待, K_FOREVER一直等
返回值:读出成功返回0
__syscall int k_msgq_peek(struct k_msgq msgq, void data);
作用:peek msgq
data: peek到的message
返回: peek到数据返回0
__syscall void k_msgq_purge(struct k_msgq *msgq);
作用:清空msgq中的message
__syscall u32_t k_msgq_num_free_get(struct k_msgq *msgq);
作用:获取msgq还可以放多少个message
返回值:空闲数目
__syscall void k_msgq_get_attrs(struct k_msgq msgq, struct k_msgq_attrs attrs);
作用:获取msgq的信息,也就是message的大小,总数量和已使用数量,都放在struct k_msgq_attrs 内
__syscall u32_t k_msgq_num_used_get(struct k_msgq *msgq);
作用:获取msgq中有多少个message
返回值:message数目

使用说明

可以在ISR中put msgq.也可在ISR内get msgq,但不能等待。
msgq必须事先指定message的大小和个数。大小需要是2的幂对齐。
msgq用于异步传输小数据。msgq在读写时需要锁中断,因此不建议用来传输大数据。

初始化

初始化一个queue, 由用户分配内存

1
2
3
4
5
6
7
8
9
10
struct data_item_type {     //message的数据结构
u32_t field1;
u32_t field2;
u32_t field3;
};

char __aligned(4) my_msgq_buffer[10 * sizeof(data_item_type)];
struct k_msgq my_msgq;

k_msgq_init(&my_msgq, my_msgq_buffer, sizeof(data_item_type), 10);

由message queue自己在线程池中分配

1
k_msgq_alloc_init(&my_msgq, sizeof(data_item_type), 10);

写入message

运行在线程或者ISR中写入message,ISR中写入时不能发生等待。示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void producer_thread(void)
{
struct data_item_t data;

while (1) {
/* create data item to send (e.g. measurement, timestamp, ...) */
data = ...

/* send data to consumers */
while (k_msgq_put(&my_msgq, &data, K_NO_WAIT) != 0) {
/* message queue is full: purge old data & try again */
//这里purge并不是必要步骤,是否需要进行purge根据实际应用由用户自己选择
k_msgq_purge(&my_msgq);
}

/* data item was successfully added to message queue */
}
}

读message

可以将message从msgq读出, 之后msgq中不再有该message

1
2
3
4
5
6
7
8
9
10
11
12
void consumer_thread(void)
{
struct data_item_t data;

while (1) {
/* get a data item */
k_msgq_get(&my_msgq, &data, K_FOREVER);

/* process data item */
...
}
}

也可以只是peek,该message任然保留在msgq中

1
2
3
4
5
6
7
8
9
10
11
12
void consumer_thread(void)
{
struct data_item_t data;

while (1) {
/* read a data item by peeking into the queue */
if(0 == k_msgq_peek(&my_msgq, &data)){
/* process data item */
}
...
}
}

实现

msgq的实现代码在zephyr/kernel/msg_q.c中,msgq是以ringbuffer的模式进行管理,在初始化的时候建立ringbuffer,读写数据时都是以固定的单位大小从ringbuffer内读写数据。
msgq的数据结构如下

1
2
3
4
5
6
7
8
9
10
11
12
struct k_msgq {
_wait_q_t wait_q; //wait_q用于控制msgq的等待
struct k_spinlock lock; //msgq多线程保护锁
size_t msg_size; // message的大小
u32_t max_msgs; // msgq最大容纳message的个数
char *buffer_start; //msgq ringbuffer的开始地址
char *buffer_end; //msgq ringbuffer的结束地址
char *read_ptr; //msgq ringbuffer的读指针
char *write_ptr; //msgq ringbuffer的写指针
u32_t used_msgs; //msgq中有效message的个数
u8_t flags; //msgq的ringbuffer从线程池分配标志
};

示意图 如下,每读或者写一个message,读写指针就向前移动msg_size
msgq

初始化/释放

初始化msgq就是对struct_msgq中的各成员进行初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
u32_t max_msgs)
{
//初始化各成员
msgq->msg_size = msg_size;
msgq->max_msgs = max_msgs;
msgq->buffer_start = buffer;
msgq->buffer_end = buffer + (max_msgs * msg_size);
msgq->read_ptr = buffer;
msgq->write_ptr = buffer;
msgq->used_msgs = 0;
msgq->flags = 0; // msgq ringbuffer是由使用者分配,这里设为0
z_waitq_init(&msgq->wait_q); //初始化msgq的wait_q
msgq->lock = (struct k_spinlock) {};

z_object_init(msgq);
}

k_msgq_alloc_init->z_impl_k_msgq_alloc_init, msgq内存从线程池中分配,再使用k_msgq_init初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
u32_t max_msgs)
{
void *buffer;
int ret;
size_t total_size;

//计算msg_size乘max_msgs,并检查是否溢出,实际调用的是__builtin_mul_overflow
if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
ret = -EINVAL;
} else {
//从线程池中分配msgq的ringbuffer 内存
buffer = z_thread_malloc(total_size);
if (buffer != NULL) {
//初始化各变量
k_msgq_init(msgq, buffer, msg_size, max_msgs);
//使用K_MSGQ_FLAG_ALLOC在flags中标识ringbuffer是从线程池中分配
msgq->flags = K_MSGQ_FLAG_ALLOC;
ret = 0;
} else {
ret = -ENOMEM;
}
}

return ret;
}

如果msgq是从线程池中分配的内存,可以使用k_msgq_cleanup将其释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int k_msgq_cleanup(struct k_msgq *msgq)
{
//如果还有thread在等待msgq,说明不能释放,退出
CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
return -EBUSY;
}

//判断alloc标志,并释放内存
if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0) {
k_free(msgq->buffer_start);
msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
}
return 0;
}

mssage操作

写msgq

k_msgq_put->z_impl_k_msgq_put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
int z_impl_k_msgq_put(struct k_msgq *msgq, void *data, s32_t timeout)
{
//isr内写msgq不能等
__ASSERT(!arch_is_in_isr() || timeout == K_NO_WAIT, "");

struct k_thread *pending_thread;
k_spinlock_key_t key;
int result;

key = k_spin_lock(&msgq->lock);


if (msgq->used_msgs < msgq->max_msgs) {
//msgq中ringbuffer有空间

//检查是否有thread在等待读取msgq的message
pending_thread = z_unpend_first_thread(&msgq->wait_q);
if (pending_thread != NULL) {
//有线程在等message,直接将该数据提供给等待线程
(void)memcpy(pending_thread->base.swap_data, data,
msgq->msg_size);
//等待线程拿到数据后,让等待线程ready,并重新调度
arch_thread_return_value_set(pending_thread, 0);
z_ready_thread(pending_thread);
z_reschedule(&msgq->lock, key);
return 0;
} else {
//没有线程需要数据,则将数据放入ringbuffer
(void)memcpy(msgq->write_ptr, data, msgq->msg_size);
msgq->write_ptr += msgq->msg_size;
if (msgq->write_ptr == msgq->buffer_end) {
msgq->write_ptr = msgq->buffer_start;
}
//更新msgq剩余的message数
msgq->used_msgs++;
}
result = 0;
} else if (timeout == K_NO_WAIT) {
//msgq ringbuffer满,且不等待就立即退出
result = -ENOMSG;
} else {
//msgq 满,将message放入swap_data,等待其它thread来读
_current->base.swap_data = data;
return z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
}

k_spin_unlock(&msgq->lock, key);

return result;
}

读msgq

k_msgq_get->z_impl_k_msgq_get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, s32_t timeout)
{
//isr内读msgq不能等
__ASSERT(!arch_is_in_isr() || timeout == K_NO_WAIT, "");

k_spinlock_key_t key;
struct k_thread *pending_thread;
int result;

key = k_spin_lock(&msgq->lock);

if (msgq->used_msgs > 0) {
//ringbuffer中有数据,直接从ringbuffer中读出message
(void)memcpy(data, msgq->read_ptr, msgq->msg_size);
msgq->read_ptr += msgq->msg_size;
if (msgq->read_ptr == msgq->buffer_end) {
msgq->read_ptr = msgq->buffer_start;
}

//更新msgq剩余的message数
msgq->used_msgs--;

//此时ringbuffer有空闲空间,如果有thead在等待写msgq,则在这里写入
pending_thread = z_unpend_first_thread(&msgq->wait_q);
if (pending_thread != NULL) {
/* add thread's message to queue */
(void)memcpy(msgq->write_ptr, pending_thread->base.swap_data,
msgq->msg_size);
msgq->write_ptr += msgq->msg_size;
if (msgq->write_ptr == msgq->buffer_end) {
msgq->write_ptr = msgq->buffer_start;
}
//更新msgq剩余的message数
msgq->used_msgs++;

//等待写入msgq的thread在写入msgq后变为ready,并重新调度
arch_thread_return_value_set(pending_thread, 0);
z_ready_thread(pending_thread);
z_reschedule(&msgq->lock, key);
return 0;
}
result = 0;
} else if (timeout == K_NO_WAIT) {
/msgq ringbuffer空,且不等待就立即退出
result = -ENOMSG;
} else {
//msgq 空,将message放入swap_data,等待其它thread来写
_current->base.swap_data = data;
return z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
}

k_spin_unlock(&msgq->lock, key);

return result;
}

peek msgq

也可以通过peek读message,该方式不会将message从msgq的ringbuffer中删除
k_msgq_peek->z_impl_k_msgq_peek

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
{
k_spinlock_key_t key;
int result;

key = k_spin_lock(&msgq->lock);

if (msgq->used_msgs > 0) {
//ringbuffer中有数据直接copy出去
(void)memcpy(data, msgq->read_ptr, msgq->msg_size);
result = 0;
} else {
//ringbuffer中有无数据返回错误
result = -ENOMSG;
}

k_spin_unlock(&msgq->lock, key);

return result;
}

清空msgq

当不需要msgq中的数据时可以使用k_msgq_purge清空
k_msgq_purge->z_impl_k_msgq_purge

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void z_impl_k_msgq_purge(struct k_msgq *msgq)
{
k_spinlock_key_t key;
struct k_thread *pending_thread;

key = k_spin_lock(&msgq->lock);

//清空ringbuffer前,会先让等待读msgq的thead将message读走
while ((pending_thread = z_unpend_first_thread(&msgq->wait_q)) != NULL) {
arch_thread_return_value_set(pending_thread, -ENOMSG);
z_ready_thread(pending_thread);
}

//复位 ringbuffer
msgq->used_msgs = 0;
msgq->read_ptr = msgq->write_ptr;

z_reschedule(&msgq->lock, key);
}

获取msgq信息

获取msgq的信息函数实现很简单结合前面k_msgq的结构体很容易理解,这里就不再注释分析了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
{
attrs->msg_size = msgq->msg_size;
attrs->max_msgs = msgq->max_msgs;
attrs->used_msgs = msgq->used_msgs;
}

static inline u32_t z_impl_k_msgq_num_free_get(struct k_msgq *msgq)
{
return msgq->max_msgs - msgq->used_msgs;
}

static inline u32_t z_impl_k_msgq_num_used_get(struct k_msgq *msgq)
{
return msgq->used_msgs;
}

参考

https://gcc.gnu.org/onlinedocs/gcc/Integer-Overflow-Builtins.html
https://docs.zephyrproject.org/latest/reference/kernel/data_passing/message_queues.html