fix(pipe): 改进 FIFO 打开逻辑,引入读写端计数器 (#1546)

- 新增`r_counter`和`w_counter`字段,用于 FIFO 打开时的阻塞等待逻辑
- 重构`has_writer`和`has_reader`方法为`w_counter_changed`和`r_counter_changed`
,采用 Linux 内核设计,通过计数器变化判断对端是否打开
- 在`open`
方法中,读写端打开时递增对应计数器,并在阻塞等待时检查计数器变化,避免竞态条件
- 优化中断处理逻辑,被信号中断时仅回滚`reader`或`writer`计数,不重置`had_reader`
且不减少计数器(计数器只增不减)

Signed-off-by: longjin <longjin@DragonOS.org>
This commit is contained in:
LoGin 2025-12-24 11:04:52 +08:00 committed by GitHub
parent f914ae00f4
commit 79d54c9a8b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 53 additions and 20 deletions

View File

@ -132,6 +132,11 @@ pub struct InnerPipeInode {
/// 是否为命名管道FIFO
/// 只有 FIFO 才需要在 open 时阻塞等待另一端
is_fifo: bool,
/// 读端打开计数器(只增不减,用于 FIFO 等待逻辑)
/// 采用 Linux 内核的设计:等待计数器变化而非检查 reader > 0
r_counter: u32,
/// 写端打开计数器(只增不减,用于 FIFO 等待逻辑)
w_counter: u32,
}
impl InnerPipeInode {
@ -208,6 +213,8 @@ impl LockedPipeInode {
reader: 0,
writer: 0,
is_fifo: false, // 默认为匿名管道
r_counter: 0, // 初始化读端计数器
w_counter: 0, // 初始化写端计数器
};
let result = Arc::new(Self {
inner: SpinLock::new(inner),
@ -248,14 +255,29 @@ impl LockedPipeInode {
return !inode.buf_full() || inode.reader == 0;
}
/// 检查是否有写端(用于 FIFO O_RDONLY 阻塞等待)
fn has_writer(&self) -> bool {
self.inner.lock().writer > 0
/// 检查写端计数器是否已变化(用于 FIFO O_RDONLY 阻塞等待)
/// 采用 Linux 内核的设计:等待计数器变化而非检查 writer > 0
///
/// 为了处理计数器溢出回绕的极端情况,采用双重检查:
/// 1. 计数器是否变化(主要条件)
/// 2. 当前是否有写端存在(兜底条件,处理回绕)
fn w_counter_changed(&self, old: u32) -> bool {
let guard = self.inner.lock();
// 条件 1计数器变化正常情况
// 条件 2当前有写端处理极端的计数器回绕情况
guard.w_counter != old || guard.writer > 0
}
/// 检查是否有读端(用于 FIFO O_WRONLY 阻塞等待)
fn has_reader(&self) -> bool {
self.inner.lock().reader > 0
/// 检查读端计数器是否已变化(用于 FIFO O_WRONLY 阻塞等待)
///
/// 为了处理计数器溢出回绕的极端情况,采用双重检查:
/// 1. 计数器是否变化(主要条件)
/// 2. 当前是否有读端存在(兜底条件,处理回绕)
fn r_counter_changed(&self, old: u32) -> bool {
let guard = self.inner.lock();
// 条件 1计数器变化正常情况
// 条件 2当前有读端处理极端的计数器回绕情况
guard.r_counter != old || guard.reader > 0
}
/// 设置管道缓冲区大小
@ -484,9 +506,11 @@ impl IndexNode for LockedPipeInode {
if accflags == FileFlags::O_RDONLY {
// 读端打开
let mut guard = self.inner.lock();
guard.r_counter += 1; // 增加读端计数器(永不减少)
guard.reader += 1;
guard.had_reader = true;
let has_writer = guard.writer > 0;
let writers = guard.writer;
let cur_w_counter = guard.w_counter; // 记录当前写端计数器
drop(guard);
// 只有 FIFO 才需要处理阻塞等待
@ -499,19 +523,21 @@ impl IndexNode for LockedPipeInode {
return Ok(());
}
// 阻塞模式:等待写端打开
if !has_writer {
// 阻塞模式:等待写端计数器变化(采用 Linux 内核的设计)
if writers == 0 {
// 在睡眠前必须释放 data 锁
drop(data);
let r =
wq_wait_event_interruptible!(self.open_wait_queue, self.has_writer(), {});
let r = wq_wait_event_interruptible!(
self.open_wait_queue,
self.w_counter_changed(cur_w_counter),
{}
);
if r.is_err() {
// 被信号中断,需要回滚 reader 计数
// 注意:不要回滚 r_counter它只增不减
// 注意:不要重置 had_reader即使 reader 变为 0
let mut guard = self.inner.lock();
guard.reader -= 1;
if guard.reader == 0 {
guard.had_reader = false;
}
drop(guard);
return Err(SystemError::EINTR);
}
@ -527,30 +553,34 @@ impl IndexNode for LockedPipeInode {
if guard.reader == 0 {
return Err(SystemError::ENXIO);
}
guard.w_counter += 1; // 增加写端计数器(永不减少)
guard.writer += 1;
drop(guard);
} else {
// 阻塞模式:先增加 writer 计数,再等待读端
// 这样可以避免竞态条件:读端在唤醒写端时能看到 writer > 0
// 采用 Linux 内核的设计:等待计数器变化
let mut guard = self.inner.lock();
guard.w_counter += 1; // 增加写端计数器(永不减少)
guard.writer += 1;
let has_reader = guard.reader > 0;
let readers = guard.reader;
let cur_r_counter = guard.r_counter; // 记录当前读端计数器
drop(guard);
// 唤醒可能在等待写端的读者(在增加 writer 之后立即唤醒)
// 唤醒可能在等待写端的读者(在增加 w_counter 之后立即唤醒)
self.open_wait_queue.wakeup_all(None);
if !has_reader {
if readers == 0 {
// 在睡眠前必须释放 data 锁
drop(data);
// 等待读端打开
// 等待读端计数器变化
let r = wq_wait_event_interruptible!(
self.open_wait_queue,
self.has_reader(),
self.r_counter_changed(cur_r_counter),
{}
);
if r.is_err() {
// 被信号中断,需要回滚 writer 计数
// 注意:不要回滚 w_counter它只增不减
let mut guard = self.inner.lock();
guard.writer -= 1;
drop(guard);
@ -566,12 +596,15 @@ impl IndexNode for LockedPipeInode {
} else {
// 匿名管道:直接增加写端计数
let mut guard = self.inner.lock();
guard.w_counter += 1;
guard.writer += 1;
drop(guard);
}
} else if accflags == FileFlags::O_RDWR {
// O_RDWR 模式:同时作为读端和写端,不阻塞
let mut guard = self.inner.lock();
guard.r_counter += 1; // 增加读端计数器
guard.w_counter += 1; // 增加写端计数器
guard.reader += 1;
guard.writer += 1;
guard.had_reader = true;