Zephyr线程阻塞和超时机制分析

Creative Commons
本作品采用知识共享署名

本文分析Zephyr是如何实现对线程阻塞和超时机制。

概述

从Zephyr的代码结构上来看阻塞和超时的实现都放在sched.c中,但为了更容易的理解Zephyr内核的调度,本文将阻塞和超时抽取出来,分析Zephyr的同步/数据传递对线程的阻塞和超时实现,同时还分析k_sleep是如何让thread进入sleep并从sleep中恢复。

基础概念

在分析具体的阻塞实现和超时前先介绍以下基本概念:
zephyr kernel有一个ready_q,所有就绪可以被调度的thread都放在这个ready_q中,其数据结构在include/kernel_structs.h中
ready_q的根据配置的调度方式可以是链表/红黑树/queue,本文我们只需要记住任务就绪时被放入到ready_q中即可。

阻塞等待

Zephyr内核对象–同步之信号量等文章(sem,msgq,poll,queue,stack,msgq)中我们可以看到,当这些内核对象都是依赖wait_q来实现对thread的等待.
可以在对应的数据结构struct k_mutex, struct k_sem, struct k_queue, struct k_stack,struct k_msgq,内都可以找到_wait_q_t wait_q, k_poll比较特殊,在Zephyr内核对象–同步之轮询一文中已经介绍poll的wait_q用的是局部变量,但等待的方式还是wait_q。
无论上面哪种内核对象最后都通过下面的代码等待内核对象就绪,大家可以自行查看相关内核对象的代码。

等待

这里以sem为例进行说明,阻塞等待timeout=K_FOREVER

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int z_impl_k_sem_take(struct k_sem *sem, s32_t timeout)
{
int ret = 0;

k_spinlock_key_t key = k_spin_lock(&lock);

if (likely(sem->count > 0U)) {
sem->count--;
k_spin_unlock(&lock, key);
ret = 0;
goto out;
}

if (timeout == K_NO_WAIT) {
k_spin_unlock(&lock, key);
ret = -EBUSY;
goto out;
}

//等待sem的wait_q
ret = z_pend_curr(&lock, key, &sem->wait_q, timeout);


return ret;
}

等待的机制如下:

  1. 将thread从ready_q中移除,之后调度不会再调度到该thread
  2. 将thread加入到wait_q
  3. 重新调度,重调度将从该thread切到其它thread运行,该thread就保持在z_swap的上下文而被阻塞
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    int z_pend_curr(struct k_spinlock *lock, k_spinlock_key_t key,
    _wait_q_t *wait_q, s32_t timeout)
    {
    //将当前的thread加入到wait_q中
    pend(_current, wait_q, timeout);

    //进行调度
    return z_swap(lock, key);
    }

    static void pend(struct k_thread *thread, _wait_q_t *wait_q, s32_t timeout)
    {
    LOCKED(&sched_spinlock) {
    add_to_waitq_locked(thread, wait_q);
    }

    //阻塞等待时timeout=K_FOREVER,add_thread_timeout_ms退化为不做任何动作
    add_thread_timeout_ms(thread, timeout);
    }

    static void add_to_waitq_locked(struct k_thread *thread, _wait_q_t *wait_q)
    {
    //将thread从ready_q中移除
    unready_thread(thread);
    z_mark_thread_as_pending(thread);

    if (wait_q != NULL) {
    //更新thread等待的wait_q
    thread->base.pended_on = wait_q;
    //将thread放入wait_q
    z_priq_wait_add(&wait_q->waitq, thread);
    }
    }

z_swap进行重调度z_swap->z_swap_irqlock->arch_swap对于arm 32位arch来说如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int arch_swap(unsigned int key)
{
_current->arch.basepri = key;
_current->arch.swap_return_value = _k_neg_eagain;

//触发pendsv,将在这里进入pendsv exception
//在exception中保存该thread上下文,然后切到其它thread运行
SCB->ICSR |= SCB_ICSR_PENDSVSET_Msk;

//当该thread重新被调度时,会从这里继续执行,然后pend返回

/* clear mask or enable all irqs to take a pendsv */
irq_unlock(0);

//恢复调度后的返回值,将会在解除等待时设置
return _current->arch.swap_return_value;
}

解除等待

解除等待的机制如下

  1. 从wait_q中选取一个thread,将thread从wait_q中移除
  2. 设置thread 恢复时swap的换回值
  3. 将thread放回ready_q, 之后将会被kernel调度恢复运行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    void z_impl_k_sem_give(struct k_sem *sem)
    {
    k_spinlock_key_t key = k_spin_lock(&lock);

    //从wait_q中取出一个thread
    struct k_thread *thread = z_unpend_first_thread(&sem->wait_q);

    if (thread != NULL) {
    //为恢复的thread设置swap返回值
    arch_thread_return_value_set(thread, 0);

    //将thread放入ready_q等待被调度
    z_ready_thread(thread);
    } else {
    sem->count += (sem->count != sem->limit) ? 1U : 0U;
    handle_poll_events(sem);
    }

    z_reschedule(&lock, key);
    }

超时等待

等待

和阻塞等待的流程一样,但是会建立一个timer

1
2
3
4
5
6
7
8
9
static void pend(struct k_thread *thread, _wait_q_t *wait_q, s32_t timeout)
{
LOCKED(&sched_spinlock) {
add_to_waitq_locked(thread, wait_q);
}

//建立超时timer
add_thread_timeout_ms(thread, timeout);
}

建立的timer的代码如下,最后使用的z_add_timeout,详情可参考Zephyr内核Timeout模块简介

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static void add_thread_timeout_ms(struct k_thread *thread, s32_t timeout)
{
if (timeout != K_FOREVER) {
s32_t ticks;

if (timeout < 0) {
timeout = 0;
}
//计算timer要等待的tick数
ticks = _TICK_ALIGN + k_ms_to_ticks_ceil32(timeout);

//建立超时timer, tick后超时
z_add_thread_timeout(thread, ticks);
}
}

static inline void z_add_thread_timeout(struct k_thread *th, s32_t ticks)
{
//thread ticks超时后会调用z_thread_timeout
z_add_timeout(&th->base.timeout, z_thread_timeout, ticks);
}

被其它thread解除等待

被其它thread解除等待的流程和阻塞式的大致一样,因为在超时前已经解除等待,因此timer的timeout不再需要,在thread从wait_q中取出thread时,会删除timer

1
2
3
4
5
6
7
8
9
10
11
struct k_thread *z_unpend_first_thread(_wait_q_t *wait_q)
{
struct k_thread *thread = z_unpend1_no_timeout(wait_q);

if (thread != NULL) {
//将thread的timer移除
(void)z_abort_thread_timeout(thread);
}

return thread;
}

超时解除等待

在等待时注册了timer,该timer超时后会调用z_thread_timeout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void z_thread_timeout(struct _timeout *timeout)
{
struct k_thread *thread = CONTAINER_OF(timeout,
struct k_thread, base.timeout);

//超时发生时尚未被其它thread解除等待pended_on将不为NULL
if (thread->base.pended_on != NULL) {
//将thread从wait_q中移除
z_unpend_thread_no_timeout(thread);
}
z_mark_thread_as_started(thread);
z_mark_thread_as_not_suspended(thread);
//将thread放入ready_q
z_ready_thread(thread);
}

此时thread因为超时发生被放入了ready_q,kernel将对其进行调度运行,当thread重新被调度时pend将会返回-EAGAIN表示timeout了
下面看下pend是如何返回-EAGAIN, 从前面的分析可以知道pend最后是调用arch_swap,进入pending状态,恢复调度后又从arch_swap继续运行并返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int arch_swap(unsigned int key)
{
_current->arch.basepri = key;
//在进入pending前会个swap_return_value设置为_k_neg_eagain,该值就是-EAGAIN
_current->arch.swap_return_value = _k_neg_eagain;

//触发pendsv,将在这里进入pendsv exception
//在exception中保存该thread上下文,然后切到其它thread运行
SCB->ICSR |= SCB_ICSR_PENDSVSET_Msk;

//当该thread重新被调度时,会从这里继续执行,由于超时恢复调度,然后pend返回

/* clear mask or enable all irqs to take a pendsv */
irq_unlock(0);

//恢复调度后的返回值,如果是超时解除等待swap_return_value不会被其它thread用arch_thread_return_value_set设置
//因此这里讲返回_k_neg_eagain,也就是-EAGAIN
return _current->arch.swap_return_value;
}

k_sleep

Thread可以通过k_sleep进入睡眠,通过k_wakeup唤醒睡眠的thread. k_sleep的本质和pending一样,就是将thread从ready_q中移除,让kernel不对齐进行调度,当timeout后或者其它thread对其进行wakeup时,再将该thread放回ready_q.

超时睡眠

流程代码分析如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
s32_t z_impl_k_sleep(int ms)
{
s32_t ticks;
//ISR中不能做sleep
__ASSERT(!arch_is_in_isr(), "");

//如果是永远sleep,不用创建timer,直接挂起当前thread
if (ms == K_FOREVER) {
k_thread_suspend(_current);
return K_FOREVER;
}

//超时睡眠
ticks = k_ms_to_ticks_ceil32(ms);
ticks = z_tick_sleep(ticks);
return k_ticks_to_ms_floor64(ticks);
}

static s32_t z_tick_sleep(s32_t ticks)
{

u32_t expected_wakeup_time;

__ASSERT(!arch_is_in_isr(), "");


//如果是sleep 0,就是k_yield
if (ticks == 0) {
k_yield();
return 0;
}

ticks += _TICK_ALIGN;
expected_wakeup_time = ticks + z_tick_get_32();


struct k_spinlock local_lock = {};
k_spinlock_key_t key = k_spin_lock(&local_lock);

//将thread从ready_q中移除
z_remove_thread_from_ready_q(_current);

//为thread添加timer,ticks后超时,会将thread恢复到ready_q中
//这个机制和超时等待的一样,不再展开分析
z_add_thread_timeout(_current, ticks);
z_mark_thread_as_suspended(_current);

(void)z_swap(&local_lock, key);

__ASSERT(!z_is_thread_state_set(_current, _THREAD_SUSPENDED), "");

ticks = expected_wakeup_time - z_tick_get_32();
if (ticks > 0) {
return ticks;
}

return 0;
}

可以看到k_sleep和超时等待的机制一样,但k_sleep不需要wait_q,thread sleep时将thread从ready_q中移除,将自己加入到timer中,timer到了后又将thread放入ready_q.

永久睡眠

从前面分析看永久睡眠调用的是k_thread_suspend->z_impl_k_thread_suspend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void z_impl_k_thread_suspend(struct k_thread *thread)
{
//如果thread正在等待超时,放弃等待
(void)z_abort_thread_timeout(thread);

LOCKED(&sched_spinlock) {
if (z_is_thread_queued(thread)) {
//将thread从ready_q中移除
_priq_run_remove(&_kernel.ready_q.runq, thread);
z_mark_thread_as_not_queued(thread);
}
z_mark_thread_as_suspended(thread);
update_cache(thread == _current);
}

if (thread == _current) {
z_reschedule_unlocked();
}
}

唤醒

在thread被睡眠时可以通过wakeup唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void z_impl_k_wakeup(k_tid_t thread)
{
if (z_is_thread_pending(thread)) {
return;
}

//如果是timeout的,先删除timer
if (z_abort_thread_timeout(thread) < 0) {
/* Might have just been sleeping forever */
if (thread->base.thread_state != _THREAD_SUSPENDED) {
return;
}
}

//将thread放入ready_q
z_mark_thread_as_not_suspended(thread);
z_ready_thread(thread);


if (!arch_is_in_isr()) {
z_reschedule_unlocked();
}
}

总结

前面说了这么多,可能有点混乱,下面一张图总结一下
waitq
kernel只有一个ready_q,可被调度的thread都被放到ready_q中
每个需要等待的内核对象都有一个wait_q, 当thread等待该内核对象时,该thread会从ready_q中移除,放入到等待内核对象的wait_q中
当内核对象有效时,会将thread从wait_q中移除又放回ready_q
进行k_sleep的thread,没有wait_q,会将thread从read_q中移除,并加入到timeout_list中,当timeout到了后,timeout callback又会将thread加入到ready_q中

关于wait_q/ready_q

wait_q和ready_q的管理和调度有关,不同的调度配置会有不同的实现方式,主要是涉及thread加入q和移除q的方法,本文就不再展开。