Zephyr zbus - 3 使用方法

Creative Commons
本作品采用知识共享署名

本文举例说明zbus的使用方法。

在上篇文章中列举了zbus API 的使用方法,本文以实例说明如何使用zbus。

基本使用方法

本文基于esp32c3进行测试,这个实例演示的内容如下

  • 通道:trigger_chan, temp_data_chan
  • 观察者:trigger_sub, temp_lis, temp_sub
  • 发布者:shell, sensor thread
    工作流程如下
  1. shell命令发送trigger消息到trigger_chan
  2. sensor thread通过trigger_sub获取到该消息后从芯片内部温度传感器读出温度
  3. sensor thread将读出的温度封装为消息发送到temp_data_chan
  4. 监听者temp_lis先收到温度数据
  5. display task通过temp_sub获取到温度消息后发送到串口上

配置

使用zbus前首先要先配置启用zbus, 由于要使用sensor,因此也要配置启用。

1
2
CONFIG_ZBUS=y
CONFIG_SENSOR=y

代码

定义zbus的元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#define CPU_TEMP    0
//trigger消息类型
typedef struct{
int8_t type;
}trigger_msg_t;

//温度消息类型
typedef struct{
int32_t temp; //温度值
uint32_t time; //读取传感器的时间
}display_msg_t;

//验证器
static bool trigger_chan_validator(const void *msg, size_t msg_size)
{
ARG_UNUSED(msg_size);

const trigger_msg_t *trigger = msg;
//验证trigger_msg_t消息,不是获取CPU_TEMP的消息不予处理
if (trigger->type == CPU_TEMP) {
LOG_INF("Validator pass");
return true;
}

LOG_ERR("Validator fail No support trigger type %d", trigger->type);

return false;
}

//监听者回调函数
static void trigger_listener_callback(const struct zbus_channel *chan)
{
const trigger_msg_t *trigger = zbus_chan_const_msg(chan);

LOG_INF("From listener -> trigger %d", trigger->type);
}

//监听者
ZBUS_LISTENER_DEFINE(temp_lis, temp_listener_callback);

//订阅者
ZBUS_SUBSCRIBER_DEFINE(trigger_sub, 4);
ZBUS_SUBSCRIBER_DEFINE(temp_sub, 4);

// trigger通道
ZBUS_CHAN_DEFINE(trigger_chan, /* 通道名trigger_chan */
trigger_msg_t, /* 消息类型 */

trigger_chan_validator, /* 指定验证器 */
NULL, /* 没有用户数据 */
ZBUS_OBSERVERS(trigger_sub), /* 指定观察者为订阅者trigger_sub */
ZBUS_MSG_INIT(.type = 0) /* 初始化内部消息 */
);

ZBUS_CHAN_DEFINE(temp_data_chan, /* 通道名temp_data_chan */
display_msg_t, /* 消息类型 */
NULL, /* 不指定验证器 */
NULL, /* 没有用户数据 */
ZBUS_OBSERVERS(temp_lis, temp_sub), /* 指定观察者为订阅者temp_sub和监听者temp_lis */
ZBUS_MSG_INIT(.temp = 0, .time = 0) /* 初始化内部消息 */
);

shell发布trigger消息
当在shell执行zbus trigger时会调用zbus_cmd_trigger发布trigger消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static int zbus_cmd_trigger(const struct shell *shell, size_t argc, char *argv[])
{
trigger_msg_t trigger;
//初始化要发布的消息为获取CPU温度
trigger.type = CPU_TEMP;
//发布消息到trigger_chan
zbus_chan_pub(&trigger_chan, &trigger, K_SECONDS(1));
return 0;
}

//注册shell命令
SHELL_STATIC_SUBCMD_SET_CREATE(sub_zbus,
SHELL_CMD(trigger, NULL, "trigger sensor thread work", zbus_cmd_trigger),
SHELL_SUBCMD_SET_END
);

SHELL_CMD_REGISTER(zbus, &sub_zbus, "zbus test commands", NULL);

sensor接收消息并处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static void sensor_subscriber_task(void)
{
const struct zbus_channel *chan;
const struct device *const dev = DEVICE_DT_GET_ONE(espressif_esp32_temp);
struct sensor_value val;

// sensor thread等待订阅者trigger_sub的消息
while (!zbus_sub_wait(&trigger_sub, &chan, K_FOREVER)) {
trigger_msg_t msg;
//只处理trigger_chan上的消息
if (&trigger_chan == chan) {
//从trigger_chan上读取消息
zbus_chan_read(&trigger_chan, &msg, K_MSEC(500));
LOG_INF("From trigger_sub -> Trigger Msg type=%d", msg.type);

//处理要读取CPU温度的消息
if(msg.type == CPU_TEMP){
//从传感器上读处CPU的温度
sensor_sample_fetch(dev);
sensor_channel_get(dev, SENSOR_CHAN_DIE_TEMP, &val);
LOG_INF("Current temperature: %d °C\n", val.val1);

//将CPU的温度封装为display消息
display_msg_t display_msg;
display_msg.temp = val.val1;
display_msg.time = k_cycle_get_32();
//将display的消息发布到通道temp_data_chan上
zbus_chan_pub(&temp_data_chan, &display_msg, K_SECONDS(1));
}
}
}
}

//静态创建sensor线程
K_THREAD_DEFINE(temp_task_id, CONFIG_MAIN_STACK_SIZE,
sensor_subscriber_task, NULL, NULL, NULL, 3, 0, 0);

display接收消息并处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void sensor_display_task(void)
{
const struct zbus_channel *chan;
const struct device *const dev = DEVICE_DT_GET_ONE(espressif_esp32_temp);
struct sensor_value val;

// display thread等待订阅者temp_sub的消息
while (!zbus_sub_wait(&temp_sub, &chan, K_FOREVER)) {
display_msg_t msg;
//只处理temp_data_chan上的消息
if (&temp_data_chan == chan) {
//读取温度消息
zbus_chan_read(&temp_data_chan, &msg, K_MSEC(500));
//送到串口显示
LOG_INF("From temp_sub[%u] -> temperature %d", msg.time, msg.temp);
}
}
}

//静态创建display线程
K_THREAD_DEFINE(display_task_id, CONFIG_MAIN_STACK_SIZE,
sensor_display_task, NULL, NULL, NULL, 3, 0, 0);

执行结果

1
2
3
4
5
6
7
uart:~$ zbus trigger
[00:01:11.567,000] <inf> zbus_sample: Validator pass
[00:01:11.567,000] <inf> zbus_sensor: From trigger_sub -> Trigger Msg type=0
[00:01:11.567,000] <inf> zbus_sensor: Current temperature: 34 °C

[00:01:11.567,000] <inf> zbus_sample: From listener -> Temp 34
[00:01:11.567,000] <inf> zbus_display: From temp_sub[1145146557] -> temperature 34

运行时观察者

运行时观察者仍然需要静态定义,只是在运行时加入到通道。

配置

要使用运行时观察者需要首先配置支持运行时观察者pool大小,默认该配置为0,配置为2就表示最大支持2个运行时观察者

1
CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE=2

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static void trigger_listener_callback(const struct zbus_channel *chan)
{
const trigger_msg_t *trigger = zbus_chan_const_msg(chan);

LOG_INF("From listener -> trigger %d", trigger->type);
}

//runtime时的监听者
ZBUS_LISTENER_DEFINE(runtime_trigger_lis, trigger_listener_callback);

//runtime时的订阅者
ZBUS_SUBSCRIBER_DEFINE(runtime_trigger_sub, 4);

//等待订阅者runtime_trigger_sub的消息
static void check_subscriber_task(void)
{
const struct zbus_channel *chan;

while (!zbus_sub_wait(&runtime_trigger_sub, &chan, K_FOREVER)) {
trigger_msg_t msg;

if (&trigger_chan == chan) {
zbus_chan_read(&trigger_chan, &msg, K_MSEC(500));
LOG_INF("From runtime_trigger_sub -> Trigger Msg type=%d", msg.type);
}
}
}

K_THREAD_DEFINE(check_task_id, CONFIG_MAIN_STACK_SIZE,
check_subscriber_task, NULL, NULL, NULL, 3, 0, 0);

static int zbus_cmd_add_runtimeobs(const struct shell *shell, size_t argc, char *argv[])
{
//将观察者加入到trigger_chan
zbus_chan_add_obs(&trigger_chan, &runtime_trigger_lis, K_MSEC(200));
zbus_chan_add_obs(&trigger_chan, &runtime_trigger_sub, K_MSEC(200));
return 0;
}

static int zbus_cmd_rm_runtimeobs(const struct shell *shell, size_t argc, char *argv[])
{
//将观察者从trigger_chan移除
zbus_chan_rm_obs(&trigger_chan, &runtime_trigger_lis, K_MSEC(200));
zbus_chan_rm_obs(&trigger_chan, &runtime_trigger_sub, K_MSEC(200));
return 0;
}

运行结果

在执行zbus_cmd_add_runtimeobs后,在执行shell命令zbus trigger可以看到多了动态加入的观察者收到消息

1
2
3
4
5
6
7
8
9
uart:~$ zbus trigger
[00:55:33.718,000] <inf> zbus_sample: Validator pass
[00:55:33.718,000] <inf> zbus_sample: From listener -> trigger 0
[00:55:33.718,000] <inf> zbus_sensor: From trigger_sub -> Trigger Msg type=0
[00:55:33.718,000] <inf> zbus_sensor: Current temperature: 35 °C

[00:55:33.718,000] <inf> zbus_sample: From listener -> Temp 35
[00:55:33.718,000] <inf> zbus_sample: From runtime_trigger_sub -> Trigger Msg type=0
[00:55:33.718,000] <inf> zbus_display: From temp_sub[1801485261] -> temperature 35

观察者被动态加入到trigger_chan后,有消息出现在chan后观察者会收到消息,一旦调用zbus_cmd_rm_runtimeobs将观察者从trigger_chan移除后就不会收到消息。对于动态的观察者可以通过添加和删除进行消息接收管控,对于静态的可以使用下面方式启用或禁用观察者收到消息

1
2
3
4
5
//禁用观察者temp_lis
zbus_obs_set_enable(temp_lis, 0);

//启用观察者temp_lis
zbus_obs_set_enable(temp_lis, 1);

遍历

通过下面方法可以遍历zbus中所有的观察者和通过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//观察者遍历执行函数
bool iterator_obs_show(const struct zbus_observer *obs)
{
//显示观察者名
LOG_INF("obs:%s", zbus_obs_name(obs));
return true;
}

//通道遍历执行函数
bool iterator_chan_show(const struct zbus_channel *chan)
{
//显示通道名
LOG_INF("chan:%s", zbus_chan_name(chan));
return true;
}


static int zbus_cmd_list(const struct shell *shell, size_t argc, char *argv[])
{
//遍历所有观察者,遍历时执行iterator_obs_show
zbus_iterate_over_observers(iterator_obs_show);

//遍历所有通道,遍历时执行iterator_chan_show
zbus_iterate_over_channels(iterator_chan_show);
return 0;
}

需要注意的是遍历执行函数如果返回false将会导致退出遍历,让遍历提前结束。
zbus_iterate_over_observerszbus_iterate_over_channels有效,需要开启CONFIG_ZBUS_STRUCTS_ITERABLE_ACCESS=y
zbus_obs_name有效,需要开启CONFIG_ZBUS_OBSERVER_NAME=y
zbus_chan_name有效,需要开启CONFIG_ZBUS_CHANNEL_NAME=y

参考

https://docs.zephyrproject.org/3.3.0/services/zbus/index.html