原理简介
Zephyr的邮箱(mailbox)可以看作是升级版的消息队列(msgq),与msgq不一样的是:邮箱的消息可以指定发送者和接收者,邮箱消息的大小可以不固定,长度也没有对齐的要求。此外邮箱只能用于线程间交换消息,不能用于ISR内。
邮箱构成和特性
Zephyr允许定义任意数量的邮箱,最大数量只由可用内存的量决定。
一个邮箱主要是由发送队列和消息队列两个wait_q组才,如下定义:1
2
3
4
5struct k_mbox {
_wait_q_t tx_msg_queue;
_wait_q_t rx_msg_queue;
struct k_spinlock lock;
};
- 发送队列:用于保存已发送但还未被接收的消息
- 接收队列:用于保存正在等待消息的线程
当发送线程发送消息到邮箱后会阻塞等待(同步发送)接收线程完成消息接收才退出等待,或是不阻塞而是等待接收线程完成消息接收后通过信号量通知(异步发送)。
邮箱支持:
- 一对一: A发到邮箱的消息只能B收, B只收A的消息
- 一对多: A发到邮箱的消息任意线程都可以收,注意不是广播。
- 多对一:B线程可以收任意线程发到邮箱中消息。
邮箱不支持广播,邮箱中的消息只能被一个线程使用。
消息
消息描述符
消息是由消息描述符描述,指定消息的内存地址,并描述消息如何被邮箱处理。发送线程和接收线程在访问邮箱时都需要提供消息描述符,消息描述符相互匹配才能传递消息。
消息描述符的结构如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14struct k_mbox_msg {
uint32_t _mailbox;
size_t size;
uint32_t info;
void *tx_data;
void *_rx_data;
struct k_mem_block tx_block;
k_tid_t rx_source_thread;
k_tid_t tx_target_thread;
k_tid_t _syncing_thread;
#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)
struct k_sem *_async_sem;
#endif
};
_mailbox
和_rx_data
这两个字段已经不再使用_syncing_thread
用于通知发送者发送的消息已经被接收,_async_sem
用于异步发送后通知消息已经完成,这两个字段都是内部使用。
消息描述符中容纳消息的方式有两种:
- 缓冲: 给定一段buffer,消息数据放于其中
- 内存块:申请的一片内存块,消息数据放于其中
实际的消息发送二选一,如果二者都为NULL,则表示发送空消息。tx_data
发送缓冲消息的内存指针,如果是内存块消息或者空消息该字段为NULL,接收者不初始化该字段tx_block
内存块id,发送内存块消息时使用该字段。如果是缓冲消息或是空消息该字段为NULL,接收者不初始化该字段。size
消息大小,允许为0,为0时表示空消息. 接收时表示允许接收消息的最大size, 如果收到的消息不是想要的发送方的size将为设为0,如果消息被接收,发送方的size将被更新为实际接收的大小。info
发送者和接收者在消息接收时双向交换字段,32bit大小tx_target_thread
接收消息的线程id,接收者消息不初始化该字段,接收者接收该消息后邮箱会将该字段更新为为实际接收者的id。当指定为K_ANY时该消息可以被任意线程接收。rx_source_thread
发送消息的线程id,发送者消息不初始化该字段,接收者接收该消息后邮箱会将该字段更新为为实际发送者的id。当指定为K_ANY时该线程可接收任意消息。
消息的生命周期
创建消息:发送线程将消息发送到邮箱
消息保存在邮箱
删除消息:接收线程从邮箱接收消息,并将其消耗
使用
API
#define K_MBOX_DEFINE(name)
作用:定义一个k_mbox,并初始化
name: k_mbox name
void k_mbox_init(struct k_mbox *mbox)
作用:初始化k_mbox
mbox: 要初始化的mbox
返回值: 0标示初始化成功
int k_mbox_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg, k_timeout_t timeout)
作用:同步发送消息到邮箱,会等到有线程从邮箱接收,或是超时才退出。如果发送超时消息将不会被保存在邮箱中
mbox: 邮箱
tx_msg: 发送消息的描述符,包含了接收者,消息信息,见前文说明
timeout: 等待时间,单位ms。K_NO_WAIT不等待, K_FOREVER一直等
返回值:0发送成果,-ENOMSG不等待发送失败,-EAGAIN 等待超时
void k_mbox_async_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg, struct k_sem *sem)
作用:异步发送消息到邮箱,发送后立即返回,有线程从邮箱接收该消息时使用sem通知。
mbox: 邮箱
tx_msg: 发送消息的描述符,包含了接收者,消息信息,见前文说明
sem:接收消息通知信号量
int k_mbox_get(struct k_mbox *mbox, struct k_mbox_msg *rx_msg, void *buffer, k_timeout_t timeout)
作用:从邮箱接收消息。
mbox: 邮箱
rx_msg: 接收消息的描述符,指定发送者,接收消息的信息,见前文说明
buffer: 接收消息数据存放buffer,如果为NULL,邮箱将保存该消息,消息的信息保存在rx_msg中,直到使用k_mbox_data_get消耗该消息才会从邮箱中删除
timeout: 等待时间,单位ms。K_NO_WAIT不等待, K_FOREVER一直等
返回值:0发送成果,-ENOMSG不等待发送失败,-EAGAIN 等待超时
void k_mbox_data_get(struct k_mbox_msg *rx_msg, void *buffer)
作用:将消息数据搬运到buffer中,并从邮箱中删除该消息。
rx_msg: 接收消息的描述符,包含了要接收消息的信息
buffer: 接收消息数据存放buffer,如果为NULL将直接丢弃该消息
使用说明
初始化
先定义初始化一个邮箱,下面两种方式的效果是一样的
使用宏1
K_MBOX_DEFINE(my_mailbox);
使用函数1
2struct k_mbox my_mailbox;
k_mbox_init(&my_mailbox);
发送消息
同步1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35void producer_thread(void)
{
char buffer[100];
int buffer_bytes_used;
struct k_mbox_msg send_msg;
while (1) {
//准备要发送的消息数据
...
buffer_bytes_used = ... ;
memcpy(buffer, source, buffer_bytes_used);
//准备消息描述符
send_msg.info = 123;
send_msg.size = buffer_bytes_used;
send_msg.tx_data = buffer;
send_msg.tx_block.data = NULL;
send_msg.tx_target_thread = consumer_thread_id;
//发送消息到mailbox,并等待被接收
k_mbox_put(&my_mailbox, &send_msg, K_FOREVER);
//接收完毕退出等待,检查info, size, tx_target_thread 的更新
//info和接收者交换会变为456
//size变为实际接收的30
/* verify that message data was fully received */
if (send_msg.size < buffer_bytes_used) {
printf("some message data dropped during transfer!");
printf("receiver only had room for %d bytes", send_msg.info);
}
}
}
异步,异步发送函数k_mbox_async_put
在配置了CONFIG_NUM_MBOX_ASYNC_MSGS=y
后才会有效1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41void producer_thread(void)
{
char buffer[100];
int buffer_bytes_used = 100;
struct k_mbox_msg send_msg;
struct k_sem rev_sem;
k_sem_init(&rev_sem, 0, 10);
while (1) {
//准备消息描述符
...
buffer_bytes_used = ... ;
memcpy(buffer, source, buffer_bytes_used);
/* prepare to send message */
send_msg.info = 123;
send_msg.size = buffer_bytes_used;
send_msg.tx_data = buffer;
send_msg.tx_block.data = NULL;
send_msg.tx_target_thread = consumer_thread_id;
//发送消息到mailbox
k_mbox_async_put(&my_mailbox, &send_msg, &rev_sem);
//等待消息被接收通知信号量
k_sem_take(&rev_sem, K_FOREVER);
//接收完毕退出等待,检查info, size, tx_target_thread 的更新
//info和接收者交换会变为456
//size变为实际接收的30
/* verify that message data was fully received */
if (send_msg.size < buffer_bytes_used) {
printf("some message data dropped during transfer!");
printf("receiver only had room for %d bytes", send_msg.info);
}
}
}
接收消息
1 | void consumer_thread(void) |
实现
该小节通过对邮箱内核代码的分析,理解Zephyr是如何实现以上描述的功能特性.
mailbox的实现代码在kernel\mailbox.c中
初始化
前文提到过mailbox的核心就是两个wait_q,初始化则是对发送和接收的wait_q进行初始话,并初始化一个lock用于操作wait_q时锁调度1
2
3
4
5
6void k_mbox_init(struct k_mbox *mbox)
{
z_waitq_init(&mbox->tx_msg_queue);
z_waitq_init(&mbox->rx_msg_queue);
mbox->lock = (struct k_spinlock) {};
}
发送消息
这里主要分析同步发送,k_mbox_put->mbox_message_put
1
2
3
4
5
6
7
8
9
10int k_mbox_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg,
k_timeout_t timeout)
{
//发送消息的线程在消息被接收前会被放入tx_msg_queue中挂起,这里将当前thread保存在消息中的_syncing_thread,方便消息被接收后对_syncing_thread进行恢复
tx_msg->_syncing_thread = _current;
int ret = mbox_message_put(mbox, tx_msg, timeout);
return ret;
}
消息发送的主要流程位于mbox_message_put
中,有下面两种情况:
- 遍历邮箱
rx_msg_queue
中等待消息的线程,如果找到匹配的接收线程,让接收线程变为就绪,挂起当前线程。 - 如果没找到匹配的接收线程,将消息放入
tx_msg_queue
,挂起当前线程并等待超时。
代码详细分析如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69static int mbox_message_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg,
k_timeout_t timeout)
{
struct k_thread *sending_thread;
struct k_thread *receiving_thread;
struct k_mbox_msg *rx_msg;
k_spinlock_key_t key;
//设置描述符中发送线程id为当前线程
tx_msg->rx_source_thread = _current;
//发送线程要交换的数据被设置为消息描述符
sending_thread = tx_msg->_syncing_thread;
sending_thread->base.swap_data = tx_msg;
//锁调度,开始处理消息发送
key = k_spin_lock(&mbox->lock);
//检查rx_msg_queue中是否有线程在等待接收邮箱消息
_WAIT_Q_FOR_EACH(&mbox->rx_msg_queue, receiving_thread) {
//获取接收消息描述符
rx_msg = (struct k_mbox_msg *)receiving_thread->base.swap_data;
//将收发消息描述进行匹配
if (mbox_message_match(tx_msg, rx_msg) == 0) {
//该消息有线程可匹配,让接收线程从挂起恢复
z_unpend_thread(receiving_thread);
//pend返回设置为0,表示有正常拿到消息
arch_thread_return_value_set(receiving_thread, 0);
//让接收线程处于就绪态,下一次调度的时候接收线程就会取得消息进行处理
z_ready_thread(receiving_thread);
#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)
//异步发送流程,接收线程处理消失时不做阻塞
if ((sending_thread->base.thread_state & _THREAD_DUMMY)
!= 0U) {
z_reschedule(&mbox->lock, key);
return 0;
}
#endif
//同步发送流程,异步线程处理消息时,发送消息线程被直接挂起,异步线程处理完后会通过线程交换数据中保存的tx_msg->_syncing_thread让发送线程进入就绪继续运行
int ret = z_pend_curr(&mbox->lock, key, NULL, K_FOREVER);
return ret;
}
}
//没有匹配的接收线程,同时也不等待消息发送,就立即退出
if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
k_spin_unlock(&mbox->lock, key);
return -ENOMSG;
}
#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)
//异步发送时会用一个dummy thread来进行等待消息接收完成,真正的发送线程会立即退出
if ((sending_thread->base.thread_state & _THREAD_DUMMY) != 0U) {
//这里是在发送线程中执行,而挂起的dummy thread,因此不会卡在这里
z_pend_thread(sending_thread, &mbox->tx_msg_queue, K_FOREVER);
k_spin_unlock(&mbox->lock, key);
return 0;
}
#endif
//同步发送,如果没有匹配的线程接收,当前线程被加入tx_msg_queue中,等待超时。因为tx_msg是放到当前线程的交换数据内,这个操作就相当于将消息放入了邮箱
int ret = z_pend_curr(&mbox->lock, key, &mbox->tx_msg_queue, timeout);
return ret;
}
接收
消息接收的流程和发送的流程是对称的,有下面两种情况:
- 遍历邮箱
tx_msg_queue
中查看是否有消息,如果找到匹配的消息,接收线程接收处理该消息后通知发送线程恢复执行。 - 如果没找到匹配的消息,将接收线程放入
rx_msg_queue
,挂起当前线程并等待接收消息超时。
代码详细分析如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50int k_mbox_get(struct k_mbox *mbox, struct k_mbox_msg *rx_msg, void *buffer,
k_timeout_t timeout)
{
struct k_thread *sending_thread;
struct k_mbox_msg *tx_msg;
k_spinlock_key_t key;
int result;
//设置描述符中接收线程id为当前线程
rx_msg->tx_target_thread = _current;
//锁调度,开始处理消息接收
key = k_spin_lock(&mbox->lock);
//检查邮箱的tx_msg_queue中是否有可用消息
_WAIT_Q_FOR_EACH(&mbox->tx_msg_queue, sending_thread) {
//从线程交换数据中获取发送消息描述符
tx_msg = (struct k_mbox_msg *)sending_thread->base.swap_data;
//将收发消息描述进行匹配
if (mbox_message_match(tx_msg, rx_msg) == 0) {
//该消息和接收线程匹配,将发送消息的线程从邮箱tx_msg_queue中取出
z_unpend_thread(sending_thread);
k_spin_unlock(&mbox->lock, key);
//处理数据,数据处理完后会让发送线程就绪并重新调度。如果这里buffer为空,会将数据保留,同时发送线程仍然处理挂起状态。
result = mbox_message_data_check(rx_msg, buffer);
return result;
}
}
//没有匹配的消息,同时也不等待,退出本次接收
if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
k_spin_unlock(&mbox->lock, key);
return -ENOMSG;
}
//如果没有匹配的消息,当前线程被加入rx_msg_queue中,等待超时
_current->base.swap_data = rx_msg;
result = z_pend_curr(&mbox->lock, key, &mbox->rx_msg_queue, timeout);
//接收到消息进行消息处理,数据处理完后会让发送线程就绪并重新调度。如果这里buffer为空,会将数据保留,同时发送线程仍然处理挂起状态。
if (result == 0) {
result = mbox_message_data_check(rx_msg, buffer);
}
return result;
}
消息匹配
从前面的分析看到消息匹配是使用mbox_message_match
对收发消息描述符进行比较,比较过程中主要有如下事项:
- 比较收发的thread id是否匹配
- 进行thread id交换
- 进行info交换
- 更新实际可以接收的size
- 将消息的数据地址更新到接收消息描述符内
- 将发送线程id更新到接收消息描述符中
1 | static int mbox_message_match(struct k_mbox_msg *tx_msg, |
消息处理
邮箱消息数据的处理有内部函数mbox_message_data_check
和外部函数k_mbox_data_ge
t两个。在k_mbox_get
传入的参数buffer=NULL
时,mbox_message_data_check
检查到时消息会被保留在邮箱。之后可以再通过k_mbox_data_get
来读取该消息,如果此时buffer=NULL
该消息就会从邮箱中取出丢弃。
代码分析如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32static int mbox_message_data_check(struct k_mbox_msg *rx_msg, void *buffer)
{
if (buffer != NULL) {
//buffer不为NULL将数据读到buffer中
k_mbox_data_get(rx_msg, buffer);
} else if (rx_msg->size == 0U) {
//buffer为空,且size为0,表示为空消息无需接收数据,直接将消息从邮箱中取出丢弃
mbox_message_dispose(rx_msg);
} else {
//buffer为NULL且不是空消息,表示应用后续会用k_mbox_data_get取数据
}
return 0;
}
void k_mbox_data_get(struct k_mbox_msg *rx_msg, void *buffer)
{
//buffer为空,不需要消息数据,直接丢弃该消息
if (buffer == NULL) {
rx_msg->size = 0;
mbox_message_dispose(rx_msg);
return;
}
//不是空消息,将消息数据copy到buffer中
if ((rx_msg->tx_data != NULL) && (rx_msg->size > 0U)) {
(void)memcpy(buffer, rx_msg->tx_data, rx_msg->size);
}
//空消息,无需接收数据,直接将消息从邮箱中取出丢弃
mbox_message_dispose(rx_msg);
}
mbox_message_dispose再来看取出消息丢弃的流程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42static void mbox_message_dispose(struct k_mbox_msg *rx_msg)
{
struct k_thread *sending_thread;
struct k_mbox_msg *tx_msg;
// 消息被接收被通知的thread会变为NULL,因此不用再丢弃,这季节返回
if (rx_msg->_syncing_thread == NULL) {
return;
}
if (rx_msg->tx_block.data != NULL) {
rx_msg->tx_block.data = NULL;
}
//通过将被通知thread设置为NULL,表示该消息已经被接收,并从邮箱丢弃
sending_thread = rx_msg->_syncing_thread;
rx_msg->_syncing_thread = NULL;
//更新实际接收数据的大小
tx_msg = (struct k_mbox_msg *)sending_thread->base.swap_data;
tx_msg->size = rx_msg->size;
#if (CONFIG_NUM_MBOX_ASYNC_MSGS > 0)
//异步发送,消息接收完毕,邮箱中已删除该消息,使用信号量进行通知
if ((sending_thread->base.thread_state & _THREAD_DUMMY) != 0U) {
struct k_sem *async_sem = tx_msg->_async_sem;
mbox_async_free((struct k_mbox_async *)sending_thread);
if (async_sem != NULL) {
k_sem_give(async_sem);
}
return;
}
#endif
//同步发送,消息接收完毕,邮箱中已删除该消息,通知发送者退出等待状态,发送线程变为ready,并重新调度
arch_thread_return_value_set(sending_thread, 0);
z_mark_thread_as_not_pending(sending_thread);
z_ready_thread(sending_thread);
z_reschedule_unlocked();
}
异步支持
邮箱支持异步,在发送者发送消息后可以立即退出不用等到被接收后才退出。通过下面手段实现异步支持:
- 初始化时创建指定数量的异步消息,里面包含一个发送消息描述符和一个用于等待消息被接收的dummy thread
- 发送异步消息时,将发送描述符存放在异步消息内,并将等待的同步_syncing_thread设置为dummy thread,并指定通知接收完成的信号量
- 接收消息完成后通过信号量通知接收已经完成
异步消息支持的数量受CONFIG_NUM_MBOX_ASYNC_MSGS
配置限制。
这里我们主要分析异步消息初始化和发送的流程,其它都已经在前文分析的代码中进行了注释
异步消息
异步消息的结构如下1
2
3
4struct k_mbox_async {
struct _thread_base thread; //dummy thread
struct k_mbox_msg tx_msg; //发送消息描述符
};
初始化
初始化建立异步消息的stack,并将声明好的异步消息放入stack1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22static int init_mbox_module(const struct device *dev)
{
ARG_UNUSED(dev);
//声明CONFIG_NUM_MBOX_ASYNC_MSGS个异步消息
static struct k_mbox_async __noinit async_msg[CONFIG_NUM_MBOX_ASYNC_MSGS];
int i;
for (i = 0; i < CONFIG_NUM_MBOX_ASYNC_MSGS; i++) {
//将异步消息内的thread设置为dummy状态
z_init_thread_base(&async_msg[i].thread, 0, _THREAD_DUMMY, 0);
//将异步消息压入堆栈内
k_stack_push(&async_msg_free, (stack_data_t)&async_msg[i]);
}
return 0;
}
//在Zephyr初始化的PRE_KERNEL_1阶段会调用init_mbox_module
SYS_INIT(init_mbox_module, PRE_KERNEL_1, CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
异步发送
异步发送和同步发送的主要流程一致,差别就是使用异步消息和dummy thread进行等待,的代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21void k_mbox_async_put(struct k_mbox *mbox, struct k_mbox_msg *tx_msg,
struct k_sem *sem)
{
struct k_mbox_async *async;
//分配异步消息
mbox_async_alloc(&async);
//初始化dummy thread的优先级
async->thread.prio = _current->base.prio;
//将发送消息存放在异步消息内
async->tx_msg = *tx_msg;
//将dummy_thread做为被通知thread进行等待,当前thread调用k_mbox_async_put可立即返回
async->tx_msg._syncing_thread = (struct k_thread *)&async->thread;
//将通知信号量保存在异步信息中
async->tx_msg._async_sem = sem;
//发送消息
(void)mbox_message_put(mbox, &async->tx_msg, K_FOREVER);
}
上面出现了对异步消息的堆栈的处理函数,其实就是对初始化时异步消息堆栈的处理,发送时pop出一个空的异步消息,接收完后将异步消息又加入到堆栈中1
2
3
4
5
6
7
8
9static inline void mbox_async_alloc(struct k_mbox_async **async)
{
(void)k_stack_pop(&async_msg_free, (stack_data_t *)async, K_FOREVER);
}
static inline void mbox_async_free(struct k_mbox_async *async)
{
k_stack_push(&async_msg_free, (stack_data_t)async);
}
从前面的分析我们知道初始化时空闲的异步消息只有CONFIG_NUM_MBOX_ASYNC_MSGS
个。当空闲的异步消息用完后stack就为空了,此时再发送异步消息就会因为stack pop的特性发生阻塞,k_mbox_async_put
被挂起,直到有异步消息被接收后push回stack。
关于stack的特性可以参考Zephyr内核对象-数据传递之Stack
参考
https://docs.zephyrproject.org/latest/reference/kernel/data_passing/mailboxes.html