Linux futex快速用户态互斥futex_wait与futex_wake
Linux futex快速用户态互斥futex_wait与futex_wake
futex(Fast Userspace muTEX)是Linux中用户态同步的核心机制,其设计思想是将同步操作尽量保持在用户态完成,仅在需要时才通过系统调用进入内核.这种设计避免了绝大多数场景下的系统调用开销.
一、futex核心设计
futex基于一个用户态的32位整数值(uaddr)和内核中的等待队列:
```
用户态: atomic操作在uaddr上忙等
内核态: futex_wait(如果uaddr==val)阻塞进程
futex_wake(uaddr) 唤醒等待者
```
基本使用模式:
```c
/* 用户态互斥锁实现 */
struct futex_lock {
atomic_t val; /* 0: unlocked, 1: locked, 2: contended */
};
void futex_lock(struct futex_lock *lock)
{
/* 快路径: 尝试从0到1的原子CAS */
if (atomic_cmpxchg(&lock->val, 0, 1) == 0)
return;
/* 慢路径: 需要内核介入 */
while (1) {
/* 标记锁有竞争者 */
if (atomic_xchg(&lock->val, 2) != 0) {
/* 值不是0, 调用futex_wait睡眠 */
futex_wait(&lock->val, 2);
}
/* 被唤醒后尝试获取锁 */
if (atomic_cmpxchg(&lock->val, 0, 2) == 0)
return;
}
}
void futex_unlock(struct futex_lock *lock)
{
/* 将值从1减到0, 检查是否有等待者 */
if (atomic_dec_and_test(&lock->val))
return; /* 无等待者 */
/* 有等待者, 设置0并唤醒 */
atomic_set(&lock->val, 0);
futex_wake(&lock->val, 1);
}
```
二、futex_wait系统调用的内核实现
```c
/* kernel/futex/core.c */
SYSCALL_DEFINE3(futex_wait, u32 __user *, uaddr, u32, val,
struct __kernel_timespec __user *, timeout)
{
return futex_wait_setup(uaddr, val, timeout, NULL, 0);
}
static int futex_wait_setup(u32 __user *uaddr, u32 val,
struct hrtimer_sleeper *timeout,
struct futex_q *q, int flags)
{
u32 uval;
int ret;
retry:
/* 从用户态读取uaddr的值 */
if (get_user(uval, uaddr))
return -EFAULT;
/*
* 核心检查: 当前用户态值是否等于预期值val?
* 如果不是, 说明锁已经被释放, 无需等待, 直接返回
*/
if (uval != val)
return -EWOULDBLOCK;
/* 将当前任务加入futex的等待队列 */
queue_me(q, uaddr);
/* 再次检查用户态值, 防止在queue_me期间值被修改 */
if (get_user(uval, uaddr))
ret = -EFAULT;
if (ret || uval != val) {
/* 值已经改变, 取消入队并返回 */
unqueue_me(q);
return ret ? ret : -EWOULDBLOCK;
}
/* 如果设置了超时, 启动定时器 */
if (timeout) {
hrtimer_start(&timeout->timer, *timeout, HRTIMER_MODE_ABS);
if (!hrtimer_is_queued(&timeout->timer))
return -ETIMEDOUT;
}
/* 将任务置为TASK_INTERRUPTIBLE并调度 */
freezable_schedule();
return 0;
}
```
queue_me是futex的哈希队列操作:
```c
static void queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
{
/* 将q加入哈希桶的链表 */
plist_node_init(&q->list, q->prio);
plist_add(&q->list, &hb->chain);
q->hb = hb;
}
/* futex哈希桶 */
struct futex_hash_bucket {
atomic_t waiters;
spinlock_t lock;
struct plist_head chain; /* 按优先级排序的等待者列表 */
} ____cacheline_aligned_in_smp;
/* 全局futex哈希表: 4096个桶 */
static struct futex_hash_bucket futex_queues[1 << FUTEX_HASHBITS];
```
三、futex_wake系统调用
```c
SYSCALL_DEFINE2(futex_wake, u32 __user *, uaddr, int, nr_wake)
{
struct futex_hash_bucket *hb;
struct futex_q *this, *next;
struct plist_head *head;
union futex_key key;
int ret;
/* 根据用户地址计算futex key */
ret = get_futex_key(uaddr, &key);
/* 定位哈希桶 */
hb = hash_futex(&key);
spin_lock(&hb->lock);
/* 遍历该桶的等待队列 */
head = &hb->chain;
plist_for_each_entry_safe(this, next, head, list) {
if (match_futex(&this->key, &key)) {
/* 找到匹配的futex等待者 */
if (this->pi_state || this->rt_waiter) {
/* PI futex的处理 */
ret = -EINVAL;
break;
}
/* 唤醒该任务 */
wake_futex(this);
if (++ret >= nr_wake)
break;
}
}
spin_unlock(&hb->lock);
return ret;
}
static void wake_futex(struct futex_q *q)
{
struct task_struct *p = q->task;
/* 将任务从等待队列中移除 */
get_task_struct(p);
plist_del(&q->list, &q->hb->chain);
WRITE_ONCE(q->lock_ptr, NULL);
/*
* 将任务状态设置为TASK_RUNNING并放入运行队列
* 这是真正唤醒线程的操作
*/
wake_up_state(p, TASK_NORMAL);
put_task_struct(p);
}
```
四、futex key的生成
futex通过key来标识futex对象,key基于用户态地址或内核的offsetof映射:
```c
union futex_key {
struct {
unsigned long pgoff; /* 页内偏移 */
struct inode *inode; /* 文件的inode(基于文件的futex) */
int offset; /* 页内偏移 */
} shared;
struct {
unsigned long address; /* 进程虚拟地址 */
struct mm_struct *mm; /* 进程内存描述符(私有futex) */
int offset;
} private;
};
```
futex key支持两种类型:
- 私有futex(FUTEX_PRIVATE): 仅在同一个进程的线程间使用,key包含mm+地址
- 共享futex(默认): 跨进程使用,key包含inode+offset
私有futex因为省略了inode查找,性能更优.
五、futex底层状态转换
futex值的三状态模型:
```c
/*
* val == 0: UNLOCKED - 锁未被持有
* val == 1: LOCKED - 锁被持有,无竞争
* val == 2: CONTENDED - 锁被持有,有等待者在内核中睡眠
*/
/* glibc低级别锁(low-level lock)实现 */
#define FUTEX_WAITERS 2
#define FUTEX_OWNER_DIED 1
int __lll_lock(int *futex, int private)
{
/* 尝试获取锁: cmpxchg 0->1 */
if (atomic_compare_exchange_acquire(futex, 0, 1))
return 0; /* 成功 */
/* 慢路径 */
return __lll_lock_wait(futex, private);
}
int __lll_lock_wait(int *futex, int private)
{
/* 原子交换: 将值设为FUTEX_WAITERS(2) */
int val = atomic_exchange_acquire(futex, FUTEX_WAITERS);
while (val != 0) {
/* 使用futex_wait系统调用等待 */
futex_wait(futex, FUTEX_WAITERS, private);
val = atomic_exchange_acquire(futex, FUTEX_WAITERS);
}
return 0;
}
```
六、futex内部状态转换图
在futex的哈希桶中,每个等待者(futex_q)包含一个task_struct指针和一个plist节点.当futex_wake被调用时,从哈希桶中匹配对应的futex_q并唤醒其task.
关键点:
- futex_wait是"检查-睡眠"的原子操作:检查用户态值,若未变则加入队列睡眠;若值已变更则直接返回
- futex_wake仅唤醒,不做值检查,值的修改由用户态完成
- 所有futex_q通过plist串联,按优先级排序,确保高优先级等待者先被唤醒
七、futex性能优化: hash bucket锁竞争
当大量线程操作不同futex但哈希到同一个桶时,spin_lock(&hb->lock)会成为瓶颈.内核引入的优化措施包括:
```c
/* 增大哈希表(FUTEX_HASHBITS从6变成12) */
#define FUTEX_HASHBITS 12 /* 4096个桶, 降低冲突率 */
/* 使用rwlock替代spinlock (某些场景) */
/* 锁拆分: 每个futex操作拆分为lookup和wait两个阶段 */
```
八、REQUEUE和CMP_REQUEUE
futex提供高级操作来减少系统调用次数:
```c
/*
* futex_cmp_requeue: 将等待在uaddr1上的部分等待者
* 转移到uaddr2的等待队列
*
* 用于PI futex和NPTL屏障的实现
*/
long futex_cmp_requeue(u32 __user *uaddr1, u32 val1,
u32 __user *uaddr2, int nr_wake, int nr_requeue);
```
典型应用场景:glibc的pthread_cond_broadcast将等待在条件变量上的线程直接转移到互斥锁上,避免线程被唤醒后又立即睡眠在互斥锁上(即"惊群"优化).
futex的核心理念是用户态原子操作与内核等待队列的结合.绝大多数同步操作在用户态通过原子指令完成,仅有真正的锁争用才通过futex_wait/futex_wake系统调用进入内核,从而实现了接近纯用户态同步的性能.
