Linux rcu_expedited快速GP与IPI加速同步
synchronize_rcu_expedited通过IPI强制所有CPU报告QS,将GP等待时间从毫秒级降至微秒级,代价是额外的CPU中断开销和能耗,适用于卸载设备、文件系统同步等对延迟敏感的路径。
synchronize_rcu_expedited顶层实现
```c
// kernel/rcu/tree_exp.h
void synchronize_rcu_expedited(void)
{
unsigned long flags;
struct rcu_state *rsp = &rcu_state;
unsigned long s = rcu_exp_gp_seq_snap(rsp);
// 如果当前有未完成的expedited GP,直接等待
if (rcu_exp_gp_seq_done(rsp, s))
return;
// 请求一个expedited GP
rcu_exp_gp_seq_start(rsp);
rcu_exp_need_qs = true;
// 向所有CPU发送IPI强制QS
synchronize_sched_expedited_wait(rsp);
// 完成GP
rcu_exp_gp_seq_end(rsp);
}
}
核心流程:记录当前expedited GP序列号 -> 启动新序列 -> 通过IPI强制QS -> 等待所有CPU响应 -> 完成序列。rcu_exp_gp_seq_snap返回一个快照,用于检测是否已经有完成序列覆盖了当前请求。
IPI分发:sync_rcu_exp_select_node_cpus
```c
static void sync_rcu_exp_select_node_cpus(struct rcu_node *rnp)
{
int cpu;
unsigned long mask = rnp->expmask;
struct rcu_data *rdp;
// 遍历节点下所有需要IPI的CPU
for_each_leaf_node_cpu_mask(rnp, cpu, mask) {
unsigned long flags;
rdp = per_cpu_ptr(&rcu_data, cpu);
raw_spin_lock_irqsave(&rdp->exp_lock, flags);
if (rdp->exp_deferred_qs) {
// 该CPU有延迟QS,跳过
raw_spin_unlock_irqrestore(&rdp->exp_lock, flags);
continue;
}
rdp->exp_deferred_qs = true;
raw_spin_unlock_irqrestore(&rdp->exp_lock, flags);
// 发送IPI
ret = smp_call_function_single(cpu, rcu_exp_handler, NULL, 0);
if (ret) {
// CPU可能已离线,直接清除其mask
rcu_report_exp_rdp(this_cpu_ptr(&rcu_data));
}
}
}
}
遍历叶子rcu_node下所有CPU,对每个需要QS的CPU设置exp_deferred_qs标记并发送IPI。smp_call_function_single异步发送,不等待执行完成。如果CPU离线,直接报告其QS。
IPI处理函数:rcu_exp_handler
```c
static void rcu_exp_handler(void *unused)
{
struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
unsigned long flags;
struct task_struct *t = current;
if (t->rcu_read_lock_nesting > 0) {
// 当前任务在RCU读侧临界区内
// 设置exp_need_qs标志,在unlock时处理
WRITE_ONCE(t->rcu_read_unlock_special.b.exp_need_qs, 1);
local_irq_save(flags);
// 确保unlock路径看到此标志
smp_mb();
local_irq_restore(flags);
return;
}
// 不在RCU临界区内,直接报告QS
raw_spin_lock(&rdp->exp_lock);
if (rdp->exp_deferred_qs) {
rdp->exp_deferred_qs = false;
raw_spin_unlock(&rdp->exp_lock);
rcu_report_exp_rdp(rdp);
} else {
raw_spin_unlock(&rdp->exp_lock);
}
}
```
IPI处理逻辑分两支:若接收CPU当前在RCU读侧临界区内(rcu_read_lock_nesting > 0),设置exp_need_qs标志等待unlock时处理;若不在临界区内,直接通过rcu_report_exp_rdp向rcu_node树报告QS。这种设计避免在临界区内强行抢占,同时保证QS的及时上报。
等待所有CPU响应
```c
static void synchronize_sched_expedited_wait(struct rcu_state *rsp)
{
int cpu;
unsigned long jiffies_stall;
unsigned long mask;
struct rcu_node *rnp = rcu_get_root(rsp);
jiffies_stall = rcu_jiffies_stall_exp = jiffies + 60 * HZ;
for (;;) {
mask = READ_ONCE(rnp->expmask);
if (mask == 0)
break; // 所有CPU已响应
// 超时检测,打印告警
if (time_after(jiffies, jiffies_stall)) {
rcu_dump_cpu_exp_stalls(rsp, rnp);
jiffies_stall = jiffies + 60 * HZ;
}
// 等待短时间再检查
schedule_timeout_uninterruptible(1);
}
}
}
循环检测根节点的expmask是否清零。expmask的每个比特位代表一个尚未响应IPI的CPU。60秒超时后会输出告警信息帮助诊断。由于IPI通常微秒级完成,schedule_timeout(1)的短暂睡眠保证了较低的CPU占用。
expedited与正常GP的协同
```c
// 正常GP也会检测exp_need_qs
void rcu_report_qs_rdp(struct rcu_data *rdp)
{
struct rcu_node *rnp = rdp->mynode;
raw_spin_lock_irqsave(&rnp->lock, flags);
// 如果expedited GP在等待此CPU
if (READ_ONCE(rdp->exp_deferred_qs)) {
rdp->exp_deferred_qs = false;
if (!rdp->cpu_no_qs.b.norm) {
// 同时清除正常GP的QS标记
rdp->cpu_no_qs.b.norm = false;
rcu_report_qs_rnp(rnp, rdp->grpmask, &flags);
}
// 报告expedited QS
rcu_report_exp_rdp(rdp);
}
raw_spin_unlock_irqrestore(&rnp->lock, flags);
}
}
正常QS报告路径中检查exp_deferred_qs标记,若存在则一并处理。这意味着如果CPU在正常GP过程中经过QS,自动也满足了expedited GP的要求,避免了重复的IPI开销。当实时任务无法容忍IPI延迟时,这种协同设计保证了GP推进的平滑性。
