Zephyr zbus - 4 原理与实现

Creative Commons
本作品采用知识共享署名
- zephyr
- zbus
- v3.3.0

本文分析说明zbus实现原理。

架构说明

从逻辑架构上看zbus包括下面三个部分:

  • 通道:由唯一标识符、控制数据和消息三者组成
  • VDED(Virtual Distributed Event Dispatcher)虚拟分布式事件调度器: 负责向观察者发送通知.VDED的执行逻辑在发布者的线程中,这可以看作是总线在分布式执行。当线程发布消息到通道时,也会通知观察者。
  • 订阅者线程和监听者回调接收来自总线的通知。
    下图是官方提供的zbus的内部细节图如下:

    发布者将消息丢到zbus的通道内,监听者的callback在发布者线程中执行,同时通知订阅者线程通道内有消息。
    订阅者线程在字节的上下文接收消息,并处理。

zbus的代码在zephyr/subsys/zbus/下, 头文件和管理结构在zephyr/include/zephyr/zbus/zbus.h

1
2
3
4
5
6
7
.
├── CMakeLists.txt
├── Kconfig
├── zbus.c //zbus基本功能/架构实现
├── zbus.ld
├── zbus_iterable_sections.c //zbus遍历器实现
└── zbus_runtime_observers.c //zbus运行时观察者实现

实现分析

观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
struct zbus_observer {
#if defined(CONFIG_ZBUS_OBSERVER_NAME) || defined(__DOXYGEN__)
/** 观察者名称. */
const char *const name;
#endif
/** 观察者启用标记. */
bool enabled;
/** 观察者消息队列. 发布者通过该队列通知订阅者*/
struct k_msgq *const queue;

/** 观察者回调. 发布者调用该监听者callback. */
void (*const callback)(const struct zbus_channel *chan);
};

观察者有订阅者和监听者两种:

订阅者

订阅者由下面宏进行定义,静态定义了一个消息队列和一个struct zbus_obsserver结构体变量,并用消息队列初始化该struct zbus_obsserver,对于订阅者来说不需要callback

1
2
3
4
5
6
7
8
9
#define ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size)                                                 \
//定义定义订阅者用的消息列的
K_MSGQ_DEFINE(_zbus_observer_queue_##_name, sizeof(const struct zbus_channel *), \
_queue_size, sizeof(const struct zbus_channel *)); \
//定义struct zbus_observer结构体变量,并对齐进行初始化
_ZBUS_STRUCT_DECLARE(zbus_observer, \
_name) = {ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.enabled = true, \
.queue = &_zbus_observer_queue_##_name, .callback = NULL}

因此当定义如下一个订阅者时

1
ZBUS_SUBSCRIBER_DEFINE(test_obs, 4);

得到的是如下结构体,它有一个大小为4的消息队列

1
2
3
4
5
6
struct zbus_observer test_obs = {
.name = "test_obs",
.enable = true
.queue = &_zbus_observer_queue_test_name,
.callback = NULL
}

监听者

监听者由下面宏定义, 它也是一个struct zbus_observer结构体变量,相反的是它没有消息队列,只有一个callback

1
2
3
4
5
#define ZBUS_LISTENER_DEFINE(_name, _cb)                                                           \
_ZBUS_STRUCT_DECLARE(zbus_observer, \
_name) = {ZBUS_OBSERVER_NAME_INIT(_name) /* Name field */ \
.enabled = true, \
.queue = NULL, .callback = (_cb)}

因此当定义如下一个监听者时

1
ZBUS_LISTENER_DEFINE(test_lis, test_listener_cb);

得到的是

1
2
3
4
5
6
struct zbus_observer test_lis = {
.name = "test_lis",
.enable = true
.queue = NULL,
.callback = test_listener_cb
}

从上面分析可以看到,订阅者和监听者同属于订阅者,使用相同的数据结构,因此在定义时不能使用相同的名称。

通道

通道的的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct zbus_channel {
#if defined(CONFIG_ZBUS_CHANNEL_NAME) || defined(__DOXYGEN__)
/** 通道名. */
const char *const name;
#endif
/** 通道的消息大小,和消息结构体一致. */
const uint16_t message_size;

/** 用户自定义数据*/
void *const user_data;

/** 消息引用,指向实际贡献内存区域的消息 */
void *const message;

/** 消息验证器函数,返回false表示验证失败,如果为NULL表示不需要验证 */
bool (*const validator)(const void *msg, size_t msg_size);

/** 通道访问锁 */
struct k_mutex *mutex;
#if (CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0) || defined(__DOXYGEN__)
/** 运行时订阅者链表*/
sys_slist_t *runtime_observers;
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */

/** 静态定义订阅者数组指针/
const struct zbus_observer *const *observers;
};

定义通道使用下面宏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#define ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val)        \
static _type _CONCAT(_zbus_message_, _name) = _init_val; //定义消息共享内存 \
static K_MUTEX_DEFINE(_CONCAT(_zbus_mutex_, _name)); //定义通道锁 \
ZBUS_RUNTIME_OBSERVERS_LIST_DECL(_CONCAT(_runtime_observers_, _name)); //定义运行时订阅者链表 \
FOR_EACH_NONEMPTY_TERM(_ZBUS_OBS_EXTERN, (;), _observers) //订阅者可能和通道不在同一个文件定义,因此外部引用订阅者 \
//定义一个订阅者数组,将所有订阅者加入到数组中
static const struct zbus_observer *const _CONCAT(_zbus_observers_, _name)[] = { \
FOR_EACH_NONEMPTY_TERM(ZBUS_REF, (,), _observers) NULL}; \

//定义通道结构体变量`struct zbus_channel`
const _ZBUS_STRUCT_DECLARE(zbus_channel, _name) = { \
ZBUS_CHANNEL_NAME_INIT(_name) /* 通道名称 */ \
.message_size = sizeof(_type), /* 通道消息大小 */ \
.user_data = _user_data, /* 用户数据 */ \
.message = &_CONCAT(_zbus_message_, _name), /* 指向消息共享内存 */\
.validator = (_validator), /* 验证器函数 */ \
.mutex = &_CONCAT(_zbus_mutex_, _name), /* 初始化通道锁 */ \
ZBUS_RUNTIME_OBSERVERS_LIST_INIT( \
_CONCAT(_runtime_observers_, _name)) /* 初始化运行时订阅者链表 */ \
.observers = _CONCAT(_zbus_observers_, _name)} /* 静态订阅者数组 */

当定义如下一个通道时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct test_msg{
int x,
int y
};

static bool test_chan_validator(const void *msg, size_t msg_size)
{
return true;
}
ZBUS_CHAN_DEFINE(test_chan,
struct test_msg,
test_chan_validator,
test_user_data,
ZBUS_OBSERVERS(test_obs, test_lis),
{.x=0, .y=0}
);

得到的是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static struct test_msg _zbus_message_test_chan = {.x=0, .y=0};
static K_MUTEX_DEFINE(_zbus_mutex_test_chan);
static sys_slist_t _runtime_observers_test_chan;
extern struct zbus_observer test_obs;
extern struct zbus_observer test_lis;
static const struct zbus_observer *const _zbus_observers_test_chan[]={
&test_obs,
&test_lis,
NULL
};
struct zbus_channel test_chan = {
.name = "test_chan"
.message_size = sizeof(struct test_msg),
.user_data = test_user_data,
.message = &_zbus_message_test_chan,
.validator = (test_chan_validator),
.mutex = &_zbus_mutex_test_chan,
.runtime_observers = &_runtime_observers_test_chan,
.observers = _zbus_observers_test_chan}
}

发布消息

使用zbus_chan_pub发布消息,具体流程如下分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
{
int err;
uint64_t end_ticks = sys_clock_timeout_end_calc(timeout);

//不能在ISR中执行
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(msg != NULL, "msg is required");

//执行验证器,验证消息
if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) {
return -ENOMSG;
}

//通道上锁
err = k_mutex_lock(chan->mutex, timeout);
if (err) {
return err;
}

//将消息拷贝到共享消息内存中
memcpy(chan->message, msg, chan->message_size);

//通知观察者处理
err = _zbus_notify_observers(chan, end_ticks);

//通道解锁
k_mutex_unlock(chan->mutex);

return err;
}

static int _zbus_notify_observers(const struct zbus_channel *chan, uint64_t end_ticks)
{
int last_error = 0, err;
//遍历静态观察者列表,如果是监听者,调用监听者回调
for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) {
//订阅者没有被禁用的情况下,进行回调
if ((*obs)->enabled && ((*obs)->callback != NULL)) {
(*obs)->callback(chan);
}
}

//处理动态监听者,后文说明
#if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
_zbus_notify_runtime_listeners(chan);
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */

//遍历静态观察者列表,通知订阅者
for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) {
//订阅者没有被禁用的情况下,进行通知
if ((*obs)->enabled && ((*obs)->queue != NULL)) {
//通过订阅者的消息列队将通道结构发送过去
err = k_msgq_put((*obs)->queue, &chan, _zbus_timeout_remainder(end_ticks));
_ZBUS_ASSERT(err == 0, "could not deliver notification to observer %s.",
_ZBUS_OBS_NAME(*obs));
if (err) {
LOG_ERR("Observer %s at %p could not be notified. Error code %d",
_ZBUS_OBS_NAME(*obs), *obs, err);
last_error = err;
}
}
}

//处理动态订阅者,后文说明
#if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
err = _zbus_notify_runtime_subscribers(chan, end_ticks);
if (err) {
last_error = err;
}
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
return last_error;
}

发布消息总结如下:

  1. 不能在ISR中执行,因为使用了k_mutex_lock
  2. 消息会被拷贝到通道的共享内存中
  3. 通道上监听器处理时通道被上锁,其它发布者只能等待
  4. 一个消息的处理顺序是:静态监听者处理->通知静态订阅者->动态监听者处理->通知动态订阅者
  5. 在发布消息的线程中执行监听者的回调函数

订阅消息

订阅者使用zbus_sub_wait等待发布者通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
k_timeout_t timeout)
{
//ISR中不能等待消息
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(sub != NULL, "sub is required");
_ZBUS_ASSERT(chan != NULL, "chan is required");

if (sub->queue == NULL) {
return -EINVAL;
}

//通过订阅者的消息队列等待
return k_msgq_get(sub->queue, chan, timeout);
}

当订阅者的消息队列中通知时使用zbus_chan_read读消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
{
int err;
//ISR中不能读消息
_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
_ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(msg != NULL, "msg is required");

//通道上锁
err = k_mutex_lock(chan->mutex, timeout);
if (err) {
return err;
}
//从通道内的消息共享内存中将消息拷贝出来
memcpy(msg, chan->message, chan->message_size);

//通道解锁
return k_mutex_unlock(chan->mutex);
}

订阅收消息总结如下:

  • 不能在ISR中等待和读取消息,因为这二者的实现分别使用了k_msgq_getk_mutex_lock进行等待
  • zbus_chan_read读取通道时,通道会被上锁

动态观察者

在配置了CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE后zbus允许将观察者动态的加入到通道内,该部分具体的实现代码在zephyr/subsys/zbus/zbus_runtime_observers.c,首先定义一个slab内存池_zbus_runtime_obs_pool,其大小决定了同时允许多少个动态观察者加入到通道中

1
2
K_MEM_SLAB_DEFINE_STATIC(_zbus_runtime_obs_pool, sizeof(struct zbus_observer_node),
CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE, 4);

添加观察者到通道

使用zbus_chan_add_obs将观察者加入到通道中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
k_timeout_t timeout)
{
int err;
struct zbus_observer_node *obs_nd, *tmp;
uint64_t end_ticks = sys_clock_timeout_end_calc(timeout);

//不能再ISR中添加
_ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked");
_ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(obs != NULL, "obs is required");

//如果被添加观察者已经是静态观察者,就不能再添加
for (const struct zbus_observer *const *static_obs = chan->observers; *static_obs != NULL;
++static_obs) {
if (*static_obs == obs) {
return -EEXIST;
}
}

//通道上锁
err = k_mutex_lock(chan->mutex, timeout);
if (err) {
return err;
}

//如果该观察者已经被加入到该通道,就不能再添加
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
if (obs_nd->obs == obs) {
k_mutex_unlock(chan->mutex);

return -EALREADY;
}
}

//从观察者节点池中分配一个节点`struct zbus_observer_node`用于保存观察者信息
err = k_mem_slab_alloc(&_zbus_runtime_obs_pool, (void **)&obs_nd,
_zbus_timeout_remainder(end_ticks));

if (err) {
LOG_ERR("Could not allocate memory on runtime observers pool\n");

k_mutex_unlock(chan->mutex);

return err;
}

obs_nd->obs = obs;

//将观察者节点加入到运行时观察者动态链表
sys_slist_append(chan->runtime_observers, &obs_nd->node);

//通道解锁
k_mutex_unlock(chan->mutex);

return 0;
}

从通道移除观察者

使用zbus_chan_rm_obs将观察者从通道中移除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs,
k_timeout_t timeout)
{
int err;
struct zbus_observer_node *obs_nd, *tmp;
struct zbus_observer_node *prev_obs_nd = NULL;
//不能再ISR中删除
_ZBUS_ASSERT(!k_is_in_isr(), "ISR blocked");
_ZBUS_ASSERT(chan != NULL, "chan is required");
_ZBUS_ASSERT(obs != NULL, "obs is required");

//通道上锁
err = k_mutex_lock(chan->mutex, timeout);
if (err) {
return err;
}

//遍历通道的运行时观察者链表,如果有匹配到观察者,从链表中删除,并释放节点内存
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
if (obs_nd->obs == obs) {
sys_slist_remove(chan->runtime_observers, &prev_obs_nd->node,
&obs_nd->node);

k_mem_slab_free(&_zbus_runtime_obs_pool, (void **)&obs_nd);

k_mutex_unlock(chan->mutex);

return 0;
}

prev_obs_nd = obs_nd;
}
//通道解锁
k_mutex_unlock(chan->mutex);

return -ENODATA;
}

动态观察者消息处理

动态观察者的消息处理分别是:

  • 发布者的线程中处理动态监听者,_zbus_notify_runtime_listeners
  • 发布者的线程中通过消息队列通知订阅者, _zbus_notify_runtime_subscribers
    二者的实现都是遍历动态观察者的链表取出观察者分别调用callback或是发送消息,具体代码不再列出。

弱点分析

下面借用Zephyr官方文档说明VDED的图例进行说明:

为了说明VDED执行,上图例子中有四个线程,按优先级升序排列:T1,T2,T3和T4(最高优先级);两个监听器L1和L2;以及通道A。L1、L2、T2、T3和T4观察通道A。

上图中a-i字母表示与VDED执行相关操作。X轴表示时间,Y轴表示线程的优先级。通道A的消息共享内存用对话框表示,每个通道只有一份。当T1发布消息到通道A时会按a-i的顺序发生下面动作:
a. T1发布消息到通道A
b. VDED过程开始,VDED对通道A进行上锁
c. VDED将消息拷贝到通道A的消息共享内存中
d,e. VDED执行L1和L2,回调中可以通过zbus_chan_const_msg对通道 A 的消息共享内存的直接常量引用
f,g,h. VDED依次将通知消息推送到T2、T3和T4的队列中。线程在接收到通知后立即准备执行。由于T1还拿到通道锁T2,T3,T4将进入待处理状态。此时,T1线程会因为T4的等待发生优先级翻转,T1的优先级被升到和T4一样,这确保了T1将尽快完成VDED执行。
i. VDED结束消息发布,解锁通道
j,k 当通道A未被锁定时,最后优先级T4获取通道锁,进入CPU并开始执行,从通道A拷贝出消息,解锁,处理,并释放CPU。
l,m,n 结合T3和T2按优先级依次从通道A读取数据,过程同T4
对于上面的过程可以看出,通道中只有一个消息的共享内容,在zbus_chan_pub发布完后,通道的锁已经被释放,此时订阅者的线程并不一定会马上被调度,如果此时有更高优先级的线程向通道发布消息,原来通道中的消息将会被覆盖,这将导致老的消息丢失,而新的消息被收到两次。例如图中T5在此刻也发布了消息 共享内存中原来T1的消息将被覆盖,对此官方文档也有如下说明:

Zbus always delivers the messages to the listeners. However, there are no message delivery guarantees for subscribers because zbus only sends the notification, but the message reading depends on the subscriber’s implementation. This is because channels have a mutex protected singleton objects for which message transfer is used. In other words, it can be seen as a single size queue where publishers always overwrite if queue is full.

并给出了建议:

  • Keep the listeners quick-as-possible (deal with them as ISRs). If some processing is needed, consider submitting a work to a work-queue;
  • Try to give producers a high priority to avoid losses;
  • Leave spare CPU for observers to consume data produced;
  • Consider using message queues or pipes for intensive byte transfers.

但这并不能解决前面提到T5这种更高优先级发布产生的问题,这只能要求消费者的优先级高于生产者才能避免。总结目前zbus的这种实现可能会导致的问题:

  1. 发布者的消息丢失
  2. 订阅者可能多次收到相同的消息内容
  3. 订阅者和监听者观察到的消息不一致
  4. 消息被覆盖时发布者不知道
    zbus的用户需要在一开始的设计就要关注单消息共享内存的限制,在流程或优先级上做仔细的设计,以上问题可能造成zbus的使用者困扰,也不利于zbus模块的封闭性,增加模块间解耦过程中额外的考虑。
    如果将通道中的单消息共享内存修改为FIFO,允许通道保留多个消息,在FIFO满的情况下允许发布进行超时等待,如果等待失败发布者也能知道本次消息发布失败。
    该问题已提交讨论:https://github.com/zephyrproject-rtos/zephyr/issues/57898

参考

https://docs.zephyrproject.org/3.3.0/services/zbus/index.html