Zephyr内核对象-数据传递之FIFO/LIFO

Creative Commons
本作品采用知识共享署名

本文简要说明Zephyr FIFO/LIFO的使用和实现。

Zephyr内核对象–数据传递对象简介一文中已经大概介绍了FIFO/LIFO的特性,从这篇文章也知道FIFO和LIFO都是通过queue实现,这也就是为什么要把FIFO/LIFO放在同一篇文章中分析的原因。本文将继续说明FIFO/LILO的使用和实现。

使用

API

相似API

FIFO和LIFO API都是用宏包装queue的API实现,它们有一组类似的发送接收API,这里放在一起介绍
#define k_fifo_init(fifo)
#define k_lifo_init(lifo)
作用:初始化一个struct k_fifo/strut k_lifo
fifo/lifo: struct k_fifo/strut k_lifo

#define k_fifo_put(fifo, data)
#define k_lifo_put(lifo, data)
作用:将数据放入fifo/lifo
fifo/lifo:数据要放入的fifo/lifo
data: 要放入fifo/lifo的数据

#define k_fifo_alloc_put(fifo, data)
#define k_lifo_alloc_put(lifo, data)
作用:fifo/lifo分配一个空间,将数据放入
fifo/lifo:数据要放入的fifo/lifo
data: 要放入fifo/lifo的数据

#define k_fifo_get(fifo, timeout)
#define k_lifo_get(lifo, timeout)
作用:从fifo/lifo读出数据
fifo/lifo:要读的fifo/lifo
data: 读出的数据

FIFO特殊API

FIFO比LIFO多一些API
#define k_fifo_cancel_wait(fifo)
作用:放弃等待,让第一个等待的fifo的thread退出pending,k_fifo_get的数据将是NULL
fifo: 操作的fifo

#define k_fifo_put_list(fifo, head, tail)
作用:将一个单链表加入到fifo中
fifo: 操作的fifo
head: 单链表的第一个节点
tail: 单链表的最后一个节点

#define k_fifo_put_slist(fifo, list)
作用:将一个单链表加入到fifo中,这里单链表对象为sys_slist_t
fifo: 操作的fifo
list: sys_slist_t 单链表

#define k_fifo_is_empty(fifo)
作用:检查fifo是否为空
fifo: 要检查的fifo

#define k_fifo_peek_head(fifo)
作用:peek fifo 头部上的节点(下个将会被read的数据),但不会将数据从fifo删除
fifo: 被peek的fifo
返回:返回peek的数据

#define k_fifo_peek_tail(fifo)
作用:peek fifo 尾部上的节点(FIFO中最后进入的数据),但不会将数据从fifo删除
fifo: 被peek的fifo
返回:返回peek的数据

使用说明

可以在ISR中put fifo/lifo.也可在ISR内get fifo/fifo,但不能等待。
fifo/lifo不限制数据项的多少。

初始化

经过初始化的fifo/lifo才能被使用,下面两种方法是一样的,显示定义

1
2
3
4
5
struct k_fifo my_fifo;
struct k_lifo my_lifo;

k_fifo_init(&my_fifo);
k_lifo_init(&my_lifo);

隐式定义:

1
2
K_FIFO_DEFINE(my_fifo);
K_LIFO_DEFINE(my_lifo);

写数据

这里用fifo演示put,用Lifo演示alloc_put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//使用put发送数据,最开始的word必须保留,用于存放queue,这也就是为什么要求4对其的原因
//使用alloc_put没有这个限制
struct data_item_t {
void *fifo_reserved; /* 1st word reserved for use by fifo */
...
};

struct data_item_t fifo_tx_data;
char lifo_tx_data[32];

void producer_thread(int unused1, int unused2, int unused3)
{
while (1) {
/* create data item to send */
//fifo_tx_data使用的是put,因此最开始的4个字节不能使用
fifo_tx_data = ...
lifo_tx_data = ...

/* send data to consumers */
k_fifo_put(&my_fifo, &fifo_tx_data);
//alloc_put时,queue会从线程池中分配出内存存放lifo,所以不需要保留开始4个字节
//在get时,queue会自动将其释放掉
k_lifo_alloc_put(&my_lifo, &lifo_tx_data);

...
}
}

读数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void consumer_fifo_thread(int unused1, int unused2, int unused3)
{
struct data_item_t *rx_data;

while (1) {
rx_data = k_fifo_get(&my_fifo, K_FOREVER);

/* process fifo data item */
...
}
}

void consumer_lifo_thread(int unused1, int unused2, int unused3)
{
struct data_item_t *rx_data;

while (1) {
rx_data = k_lifo_get(&my_lifo, K_FOREVER);

/* process lifo data item */
...
}
}

实现

前面已经提到过fifo/lifo都是使用queue实现,fifo/lifo是直接包queue的API ,每一个fifo/lifo API都对应一个queue的API,可以说fifo/lifo就是queue的使用方法不一样,先看结构体就可以看出使用的就是queue

FIFO/LIFO

1
2
3
4
5
6
7
struct k_fifo {
struct k_queue _queue;
};

struct k_lifo {
struct k_queue _queue;
};

每个fifo/lifo都有一个对应的queue API,这里列出来,更详细的可以看include/kernel.h,下面斜体加粗的就是fifo/lifo区别所在
k_fifo_init->k_queue_init
k_fifo_cancel_wait->k_queue_cancel_wait
k_fifo_put->k_queue_append
k_fifo_alloc_put->k_queue_alloc_append
k_fifo_put_list->k_queue_append_list
k_fifo_put_slist->k_queue_merge_slist
k_fifo_get->k_queue_get
k_fifo_is_empty->k_queue_is_empty
k_fifo_peek_head->k_queue_peek_head
k_fifo_peek_tail->k_queue_peek_tail
k_lifo_init->k_queue_init
k_lifo_put->k_queue_prepend
k_lifo_alloc_put->k_queue_alloc_prepend
k_lifo_get->k_queue_get

Queue

从上一节可以看出FIFO就是用queue实现的,因此我们继续分析queue

Queue初始化

初始化:建立一个wait_q,建立一个单项标志链表

添加数据到queue

添加数据到queue基本都是通过queue_insert完成
k_queue_append->queue_insert: 在链表尾部插入
k_queue_insert->queue_insert: 在指定节点后插入
k_queue_prepend->queue_insert:在链表头部插入
z_impl_k_queue_alloc_append->queue_insert:从thread pool中分配节点,并插入到尾部
z_impl_k_queue_alloc_prepend->queue_insert:从thread pool中分配节点,并插入到头部
z_impl_k_queue_peek_head -> z_queue_node_peek: 从链表中读取头,但不删除该节点
z_impl_k_queue_peek_tail -> z_queue_node_peek: 从链表中读取尾,但不删除该节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
static s32_t queue_insert(struct k_queue *queue, void *prev, void *data,
bool alloc)
{
k_spinlock_key_t key = k_spin_lock(&queue->lock);
#if !defined(CONFIG_POLL)
struct k_thread *first_pending_thread;

//获取等待queue的thread,如果有的话,将数据提供给该thread,引发调度,直接返回
first_pending_thread = z_unpend_first_thread(&queue->wait_q);

if (first_pending_thread != NULL) {
prepare_thread_to_run(first_pending_thread, data);
z_reschedule(&queue->lock, key);
return 0;
}
#endif /* !CONFIG_POLL */

/* Only need to actually allocate if no threads are pending */
if (alloc) {
//要alloc节点空间
struct alloc_node *anode;
//从线程池中alloc节点空间
//并初始化数据
anode = z_thread_malloc(sizeof(*anode));
if (anode == NULL) {
k_spin_unlock(&queue->lock, key);
return -ENOMEM;
}
anode->data = data;
//初始化节点的时候,设置标志为1,表示是alloc的,在get的时候就需要释放
sys_sfnode_init(&anode->node, 0x1);
data = anode;
} else {
//如果data中自带节点,则无需alloc
sys_sfnode_init(data, 0x0);
}
//将节点插入到指定节点之后
//prev如果为NULL,data节点会被插入到单链表的最开始
sys_sflist_insert(&queue->data_q, prev, data);

//如果配置了poll,会通知queue条件已经满足
#if defined(CONFIG_POLL)
handle_poll_events(queue, K_POLL_STATE_DATA_AVAILABLE);
#endif /* CONFIG_POLL */

//引发调度
z_reschedule(&queue->lock, key);
return 0;
}

当传入参数data是净数据时(没有在data的最开始预留node位置),那么在insert的使用需要用alloc指定要分配节点

从Queue中读取数据

使用k_queue_get->z_impl_k_queue_get从queue中读取数据,读取数据时数据从queue中删除,实际的删除动作就是从链表中将节点移除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void *z_impl_k_queue_get(struct k_queue *queue, s32_t timeout)
{
k_spinlock_key_t key = k_spin_lock(&queue->lock);
void *data;

if (likely(!sys_sflist_is_empty(&queue->data_q))) {
sys_sfnode_t *node;
//如果链表不为空,从链表中取出头节点
node = sys_sflist_get_not_empty(&queue->data_q);
//从节点中取出数据,如果node是从线程池中分配的,将其释放
data = z_queue_node_peek(node, true);
k_spin_unlock(&queue->lock, key);
//取到数据后立即返回
return data;
}

if (timeout == K_NO_WAIT) {
k_spin_unlock(&queue->lock, key);
return NULL;
}

#if defined(CONFIG_POLL)
k_spin_unlock(&queue->lock, key);

return k_queue_poll(queue, timeout);

#else
//没有取到数据,进入等待
//可以结合发送看,如果有线程在等待queue数据,那么新进入queue的数据是不会进入链表的
int ret = z_pend_curr(&queue->lock, key, &queue->wait_q, timeout);

return (ret != 0) ? NULL : _current->base.swap_data;
#endif /* CONFIG_POLL */
}

从node中提取数据

从链表中peek或者移出的node,需要使用下面API从node中提取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void *z_queue_node_peek(sys_sfnode_t *node, bool needs_free)
{
void *ret;

if ((node != NULL) && (sys_sfnode_flags_get(node) != (u8_t)0)) {
//检查到flag 不为0,说明加入链表的node是从线程池中分配
struct alloc_node *anode;

//这里取出数据后释放node
anode = CONTAINER_OF(node, struct alloc_node, node);
ret = anode->data;
if (needs_free) {
k_free(anode);
}
} else {
//不是从线程池分配的node,直接返回
ret = (void *)node;
}

return ret;
}

如果是移出的node在Insert的时候是从线程池alloc出来的,那么在这里需要needs_free=true指定要free空间。如果是Peek的node数据时,则只用提取数据,例如:

1
2
3
4
5
6
7
8
9
static inline void *z_impl_k_queue_peek_head(struct k_queue *queue)
{
return z_queue_node_peek(sys_sflist_peek_head(&queue->data_q), false);
}

static inline void *z_impl_k_queue_peek_tail(struct k_queue *queue)
{
return z_queue_node_peek(sys_sflist_peek_tail(&queue->data_q), false);
}

放弃等待数据

k_queue_get的时候如果queue内没有数据,允许进行等待。等待期间可以使用k_queue_cancel_wait放弃等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void z_impl_k_queue_cancel_wait(struct k_queue *queue)
{
k_spinlock_key_t key = k_spin_lock(&queue->lock);
#if !defined(CONFIG_POLL)
struct k_thread *first_pending_thread;
//获取正在等待的第一个线程
first_pending_thread = z_unpend_first_thread(&queue->wait_q);

//让该线程退出等待
if (first_pending_thread != NULL) {
prepare_thread_to_run(first_pending_thread, NULL);
}
#else
handle_poll_events(queue, K_POLL_STATE_CANCELLED);
#endif /* !CONFIG_POLL */
//重调度
z_reschedule(&queue->lock, key);
}

关于poll

一般情况下queue等待数据是通过让其thread 等待queue的wait_q实现,一旦配置poll后将不再使用该机制,而是通过poll,queue get使用k_queue_poll等待queue有数据,因此可能和其它thread等待poll条件发生冲突

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static void *k_queue_poll(struct k_queue *queue, s32_t timeout)
{
struct k_poll_event event;
int err, elapsed = 0, done = 0;
k_spinlock_key_t key;
void *val;
u32_t start;
//初始化要等待的条件
k_poll_event_init(&event, K_POLL_TYPE_FIFO_DATA_AVAILABLE,
K_POLL_MODE_NOTIFY_ONLY, queue);

if (timeout != K_FOREVER) {
start = k_uptime_get_32();
}

do {
event.state = K_POLL_STATE_NOT_READY;
//开始poll
err = k_poll(&event, 1, timeout - elapsed);

if (err && err != -EAGAIN) {
return NULL;
}
//通知后去链表拿数据
key = k_spin_lock(&queue->lock);
val = z_queue_node_peek(sys_sflist_get(&queue->data_q), true);
k_spin_unlock(&queue->lock, key);

//如果没拿到数据,说明可能被其它thread将数据拿走,需要重新等待
if ((val == NULL) && (timeout != K_FOREVER)) {
elapsed = k_uptime_get_32() - start;
done = elapsed > timeout;
}
} while (!val && !done);

return val;
}

插入链表数据

queue也提供2个API将数据以链表的形式插入到queue中,区别是k_queue_append_list插入后不会影响被插入链表,而k_queue_merge_slist插入链表后会把链表清空

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int k_queue_append_list(struct k_queue *queue, void *head, void *tail)
{
/* invalid head or tail of list */
CHECKIF(head == NULL || tail == NULL) {
return -EINVAL;
}

k_spinlock_key_t key = k_spin_lock(&queue->lock);
#if !defined(CONFIG_POLL)
struct k_thread *thread = NULL;
//有等待的thread全部取出来消化这个链表
//注意,消化完后原来的链表还在
if (head != NULL) {
thread = z_unpend_first_thread(&queue->wait_q);
}

while ((head != NULL) && (thread != NULL)) {
prepare_thread_to_run(thread, head);
head = *(void **)head;
thread = z_unpend_first_thread(&queue->wait_q);
}

//没有消化完的加入到queue中
if (head != NULL) {
sys_sflist_append_list(&queue->data_q, head, tail);
}

#else
sys_sflist_append_list(&queue->data_q, head, tail);
handle_poll_events(queue, K_POLL_STATE_DATA_AVAILABLE);
#endif /* !CONFIG_POLL */

z_reschedule(&queue->lock, key);

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int k_queue_merge_slist(struct k_queue *queue, sys_slist_t *list)
{
int ret;

/* list must not be empty */
CHECKIF(sys_slist_is_empty(list)) {
return -EINVAL;
}

//和k_queue_append_list功能一样
ret = k_queue_append_list(queue, list->head, list->tail);
CHECKIF(ret != 0) {
return ret;
}
//但消化完链表后,原有链表被清空
sys_slist_init(list);

return 0;
}

图例

下面用一张图简单说明FIFO/LIFO和queue操作之间的关系
QUEUE
可以看到FIFO是通过k_queue_appen将数据加入到queue的链表尾,LIFO通过k_queue_prepend将数据加入到queue的链表头,而FIFO/LIFO读数据都是用k_queue_get从链表头读数据,所以达到了FIFO先进先出,LIFO后进先出的目的。

参考

https://docs.zephyrproject.org/latest/reference/kernel/data_passing/fifos.html
https://docs.zephyrproject.org/latest/reference/kernel/data_passing/lifos.html