feat(pipe): 修复&完善pipe的行为,并且在procfs新增fd相关支持。 (#1426)

* feat(pipe): 改进管道实现,支持命名管道和符合POSIX的行为

- 新增管道文件系统(PipeFS)并注册PIPEFS_MAGIC
- 扩展管道缓冲区至65536字节,支持原子写入
- 实现命名管道(FIFO)支持,允许O_RDWR模式打开
- 改进fcntl的F_SETFL实现,仅允许修改特定标志位
- 修复写入只读文件描述符的错误码为EBADF
- 为命名管道自动添加O_LARGEFILE标志
- 优化管道读写逻辑,支持循环写入和部分写入

Signed-off-by: longjin <longjin@DragonOS.org>

* fix(vfs): 在preadv/pwrite64/pwritev系统调用中增加对管道、Socket和字符设备的ESPIPE错误检查

Signed-off-by: longjin <longjin@DragonOS.org>

* feat(vfs): 为fcntl系统调用添加管道缓冲区大小查询功能

- 实现F_GETPIPE_SZ和F_SETPIPE_SZ命令,支持获取和设置管道缓冲区大小
- 优化管道关闭逻辑,避免潜在死锁问题
- 公开PIPE_BUFF_SIZE常量供系统调用使用

Signed-off-by: longjin <longjin@DragonOS.org>

* feat(procfs): 支持/proc/self/fd/N魔法链接并实现管道FIONREAD

- 为procfs的InodeInfo添加target_inode字段,用于存储魔法链接的原始文件inode
- 实现IndexNode::special_node方法,使/proc/self/fd/N能返回原始文件的引用
- 在VFS中处理SpecialNodeData::Reference,支持魔法链接的路径解析
- 为管道文件实现ioctl的FIONREAD命令,获取可读字节数

Signed-off-by: longjin <longjin@DragonOS.org>

* feat(procfs): 添加/proc/<pid>/fdinfo目录支持并实现管道缓冲区动态调整

- 新增ProcFdInfoDir和ProcFdInfoFile枚举类型,支持/proc/<pid>/fdinfo目录和文件
- 实现fdinfo目录的动态查找和列表功能,与fd目录共享文件描述符列表逻辑
- 重构fcntl系统调用,支持F_GETPIPE_SZ和F_SETPIPE_SZ命令的动态管道缓冲区管理
- 修改管道实现,使用动态分配的Vec缓冲区替代固定大小数组,支持运行时调整大小
- 添加管道缓冲区大小验证和迁移逻辑,确保数据完整性

Signed-off-by: longjin <longjin@DragonOS.org>

* feat(filesystem): 提升文件描述符表最大容量并优化分配策略

- 将文件描述符表最大容量从65536提升至1048576
- 优化文件描述符分配算法,支持动态扩容
- 更新进程资源限制以匹配新的最大容量

Signed-off-by: longjin <longjin@DragonOS.org>

* perf(pipe): 实现管道缓冲区延迟分配以优化内存使用

- 修改管道缓冲区初始化逻辑,从立即分配改为首次写入时分配
- 优化缓冲区大小调整逻辑,避免不必要的内存分配
- 更新管道元数据返回逻辑,移除冗余计算
- 添加管道测试黑名单文件以排除已知卡死问题

Signed-off-by: longjin <longjin@DragonOS.org>

* perf(filesystem): 优化文件描述符分配性能

Signed-off-by: longjin <longjin@DragonOS.org>

---------

Signed-off-by: longjin <longjin@DragonOS.org>
This commit is contained in:
LoGin 2025-12-02 23:11:50 +08:00 committed by GitHub
parent c0122b5e6a
commit 2d48f12b2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 622 additions and 105 deletions

View File

@ -71,6 +71,10 @@ pub enum ProcFileType {
ProcSelf,
ProcFdDir,
ProcFdFile,
/// /proc/<pid>/fdinfo 目录
ProcFdInfoDir,
/// /proc/<pid>/fdinfo/<fd> 文件
ProcFdInfoFile,
ProcMounts,
/// /proc/version
ProcVersion,
@ -169,7 +173,6 @@ impl<'a> ProcFileCreationParamsBuilder<'a> {
/// @brief 节点私有信息结构体
/// @usage 用于传入各类文件所需的信息
#[derive(Debug)]
pub struct InodeInfo {
///进程的pid
pid: Option<RawPid>,
@ -177,9 +180,22 @@ pub struct InodeInfo {
ftype: ProcFileType,
/// 文件描述符
fd: i32,
/// 对于 /proc/self/fd/N 这种魔法链接,存储原始文件的 inode
target_inode: Option<Arc<dyn IndexNode>>,
// 其他需要传入的信息在此定义
}
impl core::fmt::Debug for InodeInfo {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("InodeInfo")
.field("pid", &self.pid)
.field("ftype", &self.ftype)
.field("fd", &self.fd)
.field("target_inode", &self.target_inode.is_some())
.finish()
}
}
/// @brief procfs的inode名称的最大长度
const PROCFS_MAX_NAMELEN: usize = 64;
const PROCFS_BLOCK_SIZE: u64 = 512;
@ -573,6 +589,7 @@ impl ProcFS {
pid: None,
ftype: ProcFileType::Default,
fd: -1,
target_inode: None,
},
dname: DName::default(),
})));
@ -634,6 +651,18 @@ impl ProcFS {
let fd = pid_dir.create("fd", FileType::Dir, InodeMode::from_bits_truncate(0o555))?;
let fd = fd.as_any_ref().downcast_ref::<LockedProcFSInode>().unwrap();
fd.0.lock().fdata.ftype = ProcFileType::ProcFdDir;
// fdinfo dir
let fdinfo = pid_dir.create(
"fdinfo",
FileType::Dir,
InodeMode::from_bits_truncate(0o555),
)?;
let fdinfo = fdinfo
.as_any_ref()
.downcast_ref::<LockedProcFSInode>()
.unwrap();
fdinfo.0.lock().fdata.ftype = ProcFileType::ProcFdInfoDir;
//todo: 创建其他文件
return Ok(());
@ -650,6 +679,7 @@ impl ProcFS {
pid_dir.unlink("status")?;
pid_dir.unlink("exe")?;
pid_dir.rmdir("fd")?;
pid_dir.rmdir("fdinfo")?;
// 查看进程文件是否还存在
// let pf= pid_dir.find("status").expect("Cannot find status");
@ -714,15 +744,23 @@ impl LockedProcFSInode {
let fd_table = pcb.fd_table();
let fd_table = fd_table.read();
let file = fd_table.get_file_by_fd(fd);
if file.is_some() {
if let Some(file) = file {
// 获取原始文件的 inode
let target_inode = file.inode();
drop(fd_table);
let _ = self.unlink(&fd.to_string());
let fd_file = self.create(&fd.to_string(), FileType::SymLink, InodeMode::S_IRUGO)?;
let fd_file_proc = fd_file
.as_any_ref()
.downcast_ref::<LockedProcFSInode>()
.unwrap();
fd_file_proc.0.lock().fdata.fd = fd;
fd_file_proc.0.lock().fdata.ftype = ProcFileType::ProcFdFile;
let mut guard = fd_file_proc.0.lock();
guard.fdata.fd = fd;
guard.fdata.ftype = ProcFileType::ProcFdFile;
// 存储原始文件的 inode用于魔法链接
guard.fdata.target_inode = Some(target_inode);
drop(guard);
return Ok(fd_file);
} else {
return Err(SystemError::ENOENT);
@ -737,6 +775,42 @@ impl LockedProcFSInode {
let res = fd_table.iter().map(|(fd, _)| fd.to_string()).collect();
return Ok(res);
}
/// Looks up `/proc/<pid>/fdinfo/<fd>` on demand.
///
/// Parses `fd` as a file-descriptor number, checks that the *current*
/// process actually has that descriptor open, and (re)creates the
/// corresponding procfs node so that stale entries are replaced.
///
/// # Errors
/// - `EINVAL` if `fd` is not a valid integer.
/// - `ENOENT` if the descriptor is not open in the current process.
fn dynamical_find_fdinfo(&self, fd: &str) -> Result<Arc<dyn IndexNode>, SystemError> {
    let fd_num = fd.parse::<i32>().map_err(|_| SystemError::EINVAL)?;
    let pcb = ProcessManager::current_pcb();
    let fd_table = pcb.fd_table();
    let fd_table = fd_table.read();
    let file = fd_table.get_file_by_fd(fd_num);
    if file.is_some() {
        // Release the fd-table read lock before mutating the procfs tree.
        drop(fd_table);
        // Remove any previously created node for this fd; "not found" is fine.
        let _ = self.unlink(&fd_num.to_string());
        // fdinfo entries are regular files, not symlinks (unlike /proc/<pid>/fd/N).
        let fdinfo_file =
            self.create(&fd_num.to_string(), FileType::File, InodeMode::S_IRUGO)?;
        let fdinfo_file_proc = fdinfo_file
            .as_any_ref()
            .downcast_ref::<LockedProcFSInode>()
            .unwrap();
        // Tag the freshly created node with its fd number and type.
        let mut guard = fdinfo_file_proc.0.lock();
        guard.fdata.fd = fd_num;
        guard.fdata.ftype = ProcFileType::ProcFdInfoFile;
        drop(guard);
        return Ok(fdinfo_file);
    } else {
        return Err(SystemError::ENOENT);
    }
}
/// Lists the entries of `/proc/<pid>/fdinfo`: one name per open file
/// descriptor of the current process (same listing logic as the `fd` dir).
fn dynamical_list_fdinfo(&self) -> Result<Vec<String>, SystemError> {
    let fd_table = ProcessManager::current_pcb().fd_table();
    let guard = fd_table.read();
    let names: Vec<String> = guard.iter().map(|(fd, _)| fd.to_string()).collect();
    Ok(names)
}
}
/// 为 `/proc/thread-self/ns/*` 节点构造 namespace fd 绑定的私有数据。
@ -797,6 +871,8 @@ impl IndexNode for LockedProcFSInode {
ProcFileType::ProcKmsg
| ProcFileType::ProcFdDir
| ProcFileType::ProcFdFile
| ProcFileType::ProcFdInfoDir
| ProcFileType::ProcFdInfoFile
| ProcFileType::ProcThreadSelfNsRoot
| ProcFileType::ProcSysKernelPrintk => 0,
};
@ -1031,6 +1107,7 @@ impl IndexNode for LockedProcFSInode {
pid: None,
ftype: ProcFileType::Default,
fd: -1,
target_inode: None,
},
dname: dname.clone(),
})));
@ -1148,6 +1225,9 @@ impl IndexNode for LockedProcFSInode {
ProcFileType::ProcFdDir => {
return self.dynamical_find_fd(name);
}
ProcFileType::ProcFdInfoDir => {
return self.dynamical_find_fdinfo(name);
}
ProcFileType::ProcThreadSelfNsRoot => {
return self.dynamical_find_thread_self_ns(name);
}
@ -1229,6 +1309,11 @@ impl IndexNode for LockedProcFSInode {
keys.append(&mut fd_list);
return Ok(keys);
}
ProcFileType::ProcFdInfoDir => {
let mut fdinfo_list = self.dynamical_list_fdinfo()?;
keys.append(&mut fdinfo_list);
return Ok(keys);
}
ProcFileType::ProcThreadSelfNsRoot => {
keys.extend(ThreadSelfNsFileType::ALL_NAME.iter().map(|s| s.to_string()));
@ -1253,6 +1338,17 @@ impl IndexNode for LockedProcFSInode {
fn dname(&self) -> Result<DName, SystemError> {
Ok(self.0.lock().dname.clone())
}
/// Exposes this node's "special" payload to the VFS.
///
/// For `/proc/<pid>/fd/N` magic links the stored `target_inode` (the
/// original file's inode captured when the link node was created) is
/// returned as `SpecialNodeData::Reference`, letting path resolution jump
/// straight to the real file instead of parsing the readlink text (which
/// may be unresolvable, e.g. `pipe:[xxx]`). All other node types return
/// `None`.
fn special_node(&self) -> Option<super::vfs::SpecialNodeData> {
    let guard = self.0.lock();
    // Only fd magic-link nodes carry a target inode.
    if guard.fdata.ftype == ProcFileType::ProcFdFile {
        if let Some(target_inode) = &guard.fdata.target_inode {
            return Some(super::vfs::SpecialNodeData::Reference(target_inode.clone()));
        }
    }
    None
}
}
/// @brief 向procfs注册进程

View File

@ -182,26 +182,26 @@ impl FileFlags {
///
/// 这是正确提取访问模式的方法,因为O_RDONLY=0不能用contains()检查
#[inline]
pub fn access_flags(&self) -> u32 {
self.bits() & Self::O_ACCMODE.bits()
pub fn access_flags(&self) -> FileFlags {
*self & Self::O_ACCMODE
}
/// @brief 检查是否是只读模式
#[inline]
pub fn is_read_only(&self) -> bool {
self.access_flags() == Self::O_RDONLY.bits()
self.access_flags() == Self::O_RDONLY
}
/// @brief 检查是否是只写模式
#[inline]
pub fn is_write_only(&self) -> bool {
self.access_flags() == Self::O_WRONLY.bits()
self.access_flags() == Self::O_WRONLY
}
/// @brief 检查是否是读写模式
#[inline]
pub fn is_rdwr(&self) -> bool {
self.access_flags() == Self::O_RDWR.bits()
self.access_flags() == Self::O_RDWR
}
/// 检查是否设置了 FASYNC 标志
@ -311,7 +311,7 @@ impl FileMode {
/// - 以及对于抑制fsnotify/fanotify机制触发通知的标志FMODE_NONOTIFY
pub fn open_fmode(flags: FileFlags) -> Self {
let fmode = flags.bits() & FileMode::FMODE_NONOTIFY.bits()
| (flags.access_flags() + 1) & FileFlags::O_ACCMODE.bits();
| (flags.access_flags().bits + 1) & FileFlags::O_ACCMODE.bits();
// 初始只设置访问模式,其他能力在后续设置
FileMode::from_bits_truncate(fmode)
@ -375,10 +375,21 @@ impl File {
pub fn new(inode: Arc<dyn IndexNode>, mut flags: FileFlags) -> Result<Self, SystemError> {
let mut inode = inode;
let file_type = inode.metadata()?.file_type;
if file_type == FileType::Pipe {
// 检查是否为命名管道FIFO
let is_named_pipe = if file_type == FileType::Pipe {
if let Some(SpecialNodeData::Pipe(pipe_inode)) = inode.special_node() {
inode = pipe_inode;
true
} else {
false
}
} else {
false
};
// 对于命名管道,自动添加 O_LARGEFILE 标志(符合 Linux 行为)
if is_named_pipe {
flags.insert(FileFlags::O_LARGEFILE);
}
let metadata = inode.metadata()?;
@ -1047,6 +1058,9 @@ impl Drop for File {
pub struct FileDescriptorVec {
/// 当前进程打开的文件描述符
fds: Vec<Option<Arc<File>>>,
/// 下一个可能空闲的文件描述符号用于优化分配避免O(n²)扫描)
/// 类似于 Linux 的 fd_next_fd
next_fd: usize,
}
impl Default for FileDescriptorVec {
fn default() -> Self {
@ -1057,7 +1071,7 @@ impl FileDescriptorVec {
/// 文件描述符表的初始容量
pub const INITIAL_CAPACITY: usize = 1024;
/// 文件描述符表的最大容量限制(防止无限扩容)
pub const MAX_CAPACITY: usize = 65536;
pub const MAX_CAPACITY: usize = 1048576;
#[inline(never)]
pub fn new() -> FileDescriptorVec {
@ -1065,7 +1079,10 @@ impl FileDescriptorVec {
data.resize(FileDescriptorVec::INITIAL_CAPACITY, None);
// 初始化文件描述符数组结构体
return FileDescriptorVec { fds: data };
return FileDescriptorVec {
fds: data,
next_fd: 0,
};
}
/// @brief 克隆一个文件描述符数组
@ -1081,6 +1098,8 @@ impl FileDescriptorVec {
res.fds[i] = Some(file.clone());
}
}
// 复制 next_fd 以保持相同的分配状态
res.next_fd = self.next_fd;
return res;
}
@ -1119,6 +1138,10 @@ impl FileDescriptorVec {
let target = core::cmp::max(new_capacity, floor);
if target < current_len {
self.fds.truncate(target);
// 确保 next_fd 不超过新的容量
if self.next_fd > target {
self.next_fd = target;
}
}
}
Ok(())
@ -1176,19 +1199,48 @@ impl FileDescriptorVec {
let x = &mut self.fds[new_fd as usize];
if x.is_none() {
*x = Some(Arc::new(file));
// 更新 next_fd如果分配的是 next_fd 位置,则推进到下一个
if new_fd as usize == self.next_fd {
self.next_fd = new_fd as usize + 1;
}
return Ok(new_fd);
} else {
return Err(SystemError::EBADF);
}
} else {
// 没有指定要申请的文件描述符编号,在有效范围内查找空位
// 没有指定要申请的文件描述符编号
// 使用 next_fd 作为起始搜索位置避免每次都从0开始扫描 (O(n²) -> O(n))
let max_search = core::cmp::min(self.fds.len(), nofile_limit);
for i in 0..max_search {
// 从 next_fd 开始查找空位
for i in self.next_fd..max_search {
if self.fds[i].is_none() {
self.fds[i] = Some(Arc::new(file));
// 更新 next_fd 为下一个位置
self.next_fd = i + 1;
return Ok(i as i32);
}
}
// 当前容量内没有空位,尝试扩容
// 计算新的容量:当前容量翻倍,但不超过 nofile_limit
let current_len = self.fds.len();
if current_len < nofile_limit {
// 扩容策略:翻倍或增加到 nofile_limit取较小值
let new_capacity = core::cmp::min(
core::cmp::max(current_len * 2, current_len + 1),
nofile_limit,
);
self.resize_to_capacity(new_capacity)?;
// 扩容后,第一个新位置就是空的
let new_fd = current_len;
self.fds[new_fd] = Some(Arc::new(file));
// 更新 next_fd
self.next_fd = new_fd + 1;
return Ok(new_fd as i32);
}
return Err(SystemError::EMFILE);
}
}
@ -1240,6 +1292,14 @@ impl FileDescriptorVec {
// 把文件描述符数组对应位置设置为空
let file = self.fds[fd as usize].take().unwrap();
// 更新 next_fd如果释放的fd比当前next_fd小则更新next_fd
// 这确保下次分配时可以复用较小的fd号符合POSIX语义
// POSIX要求分配最小可用的fd号
if (fd as usize) < self.next_fd {
self.next_fd = fd as usize;
}
return Ok(file);
}

View File

@ -201,6 +201,8 @@ pub enum SpecialNodeData {
CharDevice(Arc<dyn CharDevice>),
/// 块设备
BlockDevice(Arc<dyn BlockDevice>),
/// 指向其他 inode 的引用(用于 /proc/self/fd/N 这种魔法链接)
Reference(Arc<dyn IndexNode>),
}
/* these are defined by POSIX and also present in glibc's dirent.h */
@ -965,6 +967,22 @@ impl dyn IndexNode {
// 跟随符号链接跳转
if file_type == FileType::SymLink && max_follow_times > 0 {
// 首先检查是否是"魔法链接"(如 /proc/self/fd/N
// 这些链接的 readlink 返回的路径可能不可解析(如 pipe:[xxx]
// 但它们有一个 special_node 指向真实的 inode
if let Some(SpecialNodeData::Reference(target_inode)) = inode.special_node() {
// 如果还有剩余路径,继续在目标 inode 上查找
if rest_path.is_empty() {
return Ok(target_inode);
} else {
return target_inode.lookup_follow_symlink2(
&rest_path,
max_follow_times - 1,
follow_final_symlink,
);
}
}
let mut content = [0u8; 256];
// 读取符号链接
// TODO:We need to clarify which interfaces require private data and which do not
@ -1135,6 +1153,7 @@ bitflags! {
const PROC_MAGIC = 0x9fa0;
const RAMFS_MAGIC = 0x858458f6;
const MOUNT_MAGIC = 61267;
const PIPEFS_MAGIC = 0x50495045;
}
}

View File

@ -1,5 +1,7 @@
use crate::arch::syscall::nr::SYS_FCNTL;
use crate::filesystem::vfs::FileType;
use crate::filesystem::vfs::InodeFlags;
use crate::ipc::pipe::LockedPipeInode;
use crate::process::RawPid;
use crate::{
arch::interrupt::TrapFrame,
@ -16,7 +18,15 @@ use log::warn;
use num_traits::FromPrimitive;
use system_error::SystemError;
pub struct SysFcntlHandle;
// Only allow changing these flags
const SETFL_MASK: u32 = FileFlags::O_APPEND.bits()
| FileFlags::O_NONBLOCK.bits()
| FileFlags::O_DSYNC.bits()
| FileFlags::FASYNC.bits()
| FileFlags::O_DIRECT.bits()
| FileFlags::O_NOATIME.bits();
struct SysFcntlHandle;
impl Syscall for SysFcntlHandle {
fn num_args(&self) -> usize {
@ -57,8 +67,8 @@ impl SysFcntlHandle {
args[1] as u32
}
fn arg(args: &[usize]) -> i32 {
args[2] as i32
fn arg(args: &[usize]) -> usize {
args[2]
}
/// # fcntl
@ -67,8 +77,8 @@ impl SysFcntlHandle {
///
/// - `fd`:文件描述符
/// - `cmd`:命令
/// - `arg`:参数
pub fn do_fcntl(fd: i32, cmd: FcntlCommand, arg: i32) -> Result<usize, SystemError> {
/// - `arg`:参数(对于某些命令,这是一个 64 位值)
pub fn do_fcntl(fd: i32, cmd: FcntlCommand, arg: usize) -> Result<usize, SystemError> {
// debug!("fcntl ({cmd:?}) fd: {fd}, arg={arg}");
match cmd {
FcntlCommand::DupFd | FcntlCommand::DupFdCloexec => {
@ -76,10 +86,10 @@ impl SysFcntlHandle {
let nofile = ProcessManager::current_pcb()
.get_rlimit(crate::process::resource::RLimitID::Nofile)
.rlim_cur as usize;
if arg < 0 || arg as usize >= nofile {
let arg_i32 = arg as i32;
if arg_i32 < 0 || arg >= nofile {
return Err(SystemError::EBADF);
}
let arg = arg as usize;
let binding = ProcessManager::current_pcb().fd_table();
let mut fd_table_guard = binding.write();
@ -151,12 +161,23 @@ impl SysFcntlHandle {
}
FcntlCommand::SetFlags => {
// Set file status flags.
// According to Linux man page, F_SETFL can only change:
// O_APPEND, O_ASYNC, O_DIRECT, O_NOATIME, and O_NONBLOCK
// File access mode (O_RDONLY, O_WRONLY, O_RDWR) and file creation flags
// (O_CREAT, O_EXCL, O_NOCTTY, O_TRUNC) in arg are ignored.
let binding = ProcessManager::current_pcb().fd_table();
let fd_table_guard = binding.write();
if let Some(file) = fd_table_guard.get_file_by_fd(fd) {
let arg = arg as u32;
let new_flags = FileFlags::from_bits(arg).ok_or(SystemError::EINVAL)?;
// Get current mode
let current_flags = file.flags();
// Preserve access mode and other non-changeable flags
let preserved = current_flags.bits() & !SETFL_MASK;
// Apply new flags (only the ones allowed to change)
let new_bits = preserved | (arg & SETFL_MASK);
let new_flags = FileFlags::from_bits_truncate(new_bits);
// drop guard 以避免无法调度的问题
drop(fd_table_guard);
let inode_flags = file.get_inode_flags()?;
@ -172,7 +193,9 @@ impl SysFcntlHandle {
return Err(SystemError::EBADF);
}
FcntlCommand::SetOwn => {
let pid = arg.unsigned_abs();
// arg 作为 pid_t有符号整数处理
let arg_i32 = arg as i32;
let pid = arg_i32.unsigned_abs();
if pid > i32::MAX as u32 {
return Err(SystemError::EINVAL);
}
@ -202,6 +225,53 @@ impl SysFcntlHandle {
return Ok(owner.data());
}
FcntlCommand::GetPipeSize => {
// F_GETPIPE_SZ: 获取管道缓冲区大小
let binding = ProcessManager::current_pcb().fd_table();
let file = binding
.read()
.get_file_by_fd(fd)
.ok_or(SystemError::EBADF)?;
// 检查是否是管道
let metadata = file.metadata()?;
if metadata.file_type != FileType::Pipe {
return Err(SystemError::EBADF);
}
// 获取 pipe inode 并返回实际大小
let inode = file.inode();
let pipe_inode = inode
.as_any_ref()
.downcast_ref::<LockedPipeInode>()
.ok_or(SystemError::EBADF)?;
return Ok(pipe_inode.get_pipe_size());
}
FcntlCommand::SetPipeSize => {
// F_SETPIPE_SZ: 设置管道缓冲区大小
let binding = ProcessManager::current_pcb().fd_table();
let file = binding
.read()
.get_file_by_fd(fd)
.ok_or(SystemError::EBADF)?;
// 检查是否是管道
let metadata = file.metadata()?;
if metadata.file_type != FileType::Pipe {
return Err(SystemError::EBADF);
}
// 获取 pipe inode 并设置大小
let inode = file.inode();
let pipe_inode = inode
.as_any_ref()
.downcast_ref::<LockedPipeInode>()
.ok_or(SystemError::EBADF)?;
// set_pipe_size 内部会验证大小是否合法
return pipe_inode.set_pipe_size(arg);
}
_ => {
// TODO: unimplemented
// 未实现的命令返回0不报错。

View File

@ -5,6 +5,7 @@ use system_error::SystemError;
use crate::arch::syscall::nr::SYS_PREADV;
use crate::filesystem::vfs::iov::{IoVec, IoVecs};
use crate::filesystem::vfs::FileType;
use crate::process::ProcessManager;
use crate::syscall::table::{FormattedSyscallParam, Syscall};
@ -71,6 +72,15 @@ pub fn do_preadv(fd: i32, iovecs: &IoVecs, offset: usize) -> Result<usize, Syste
drop(fd_table_guard);
// 检查是否是管道/Socket (ESPIPE)
let md = file.metadata()?;
if md.file_type == FileType::Pipe
|| md.file_type == FileType::Socket
|| md.file_type == FileType::CharDevice
{
return Err(SystemError::ESPIPE);
}
// Create a kernel buffer to read data into.
// TODO: Support scatter-gather I/O directly in FS to avoid this copy.
let mut data = vec![0; iovecs.total_len()];

View File

@ -4,6 +4,7 @@ use system_error::SystemError;
use crate::arch::interrupt::TrapFrame;
use crate::arch::syscall::nr::SYS_PWRITE64;
use crate::filesystem::vfs::FileType;
use crate::process::ProcessManager;
use crate::syscall::table::FormattedSyscallParam;
use crate::syscall::table::Syscall;
@ -49,6 +50,15 @@ impl Syscall for SysPwrite64Handle {
drop(fd_table_guard);
let file = file.unwrap();
// 检查是否是管道/Socket (ESPIPE)
let md = file.metadata()?;
if md.file_type == FileType::Pipe
|| md.file_type == FileType::Socket
|| md.file_type == FileType::CharDevice
{
return Err(SystemError::ESPIPE);
}
return file.pwrite(offset, len, user_buf);
}

View File

@ -5,6 +5,7 @@ use system_error::SystemError;
use crate::arch::syscall::nr::SYS_PWRITEV;
use crate::filesystem::vfs::iov::{IoVec, IoVecs};
use crate::filesystem::vfs::FileType;
use crate::process::ProcessManager;
use crate::syscall::table::{FormattedSyscallParam, Syscall};
@ -92,6 +93,16 @@ pub fn do_pwritev(fd: i32, buf: &[u8], offset: usize) -> Result<usize, SystemErr
// 释放 fd_table_guard 的读锁
drop(fd_table_guard);
// 检查是否是管道/Socket (ESPIPE)
let md = file.metadata()?;
if md.file_type == FileType::Pipe
|| md.file_type == FileType::Socket
|| md.file_type == FileType::CharDevice
{
return Err(SystemError::ESPIPE);
}
file.pwrite(offset, buf.len(), buf)
}

View File

@ -1,5 +1,5 @@
use crate::{
arch::ipc::signal::Signal,
arch::{ipc::signal::Signal, MMArch},
filesystem::{
epoll::{
event_poll::{EventPoll, LockedEPItemLinkedList},
@ -7,7 +7,7 @@ use crate::{
},
vfs::{
file::FileFlags, vcore::generate_inode_id, FilePrivateData, FileSystem, FileType,
IndexNode, InodeFlags, InodeMode, Metadata, PollableInode,
FsInfo, IndexNode, InodeFlags, InodeMode, Magic, Metadata, PollableInode, SuperBlock,
},
},
ipc::signal_types::SigCode,
@ -15,11 +15,16 @@ use crate::{
spinlock::{SpinLock, SpinLockGuard},
wait_queue::WaitQueue,
},
mm::MemoryManagementArch,
process::{ProcessFlags, ProcessManager, ProcessState},
sched::SchedMode,
syscall::user_access::UserBufferWriter,
time::PosixTimeSpec,
};
use alloc::string::String;
use alloc::vec;
use alloc::vec::Vec;
use core::any::Any;
use core::sync::atomic::compiler_fence;
use alloc::sync::{Arc, Weak};
@ -27,8 +32,60 @@ use system_error::SystemError;
use super::signal_types::{SigInfo, SigType};
/// 我们设定pipe_buff的总大小为1024字节
const PIPE_BUFF_SIZE: usize = 1024;
/// 管道缓冲区默认大小Linux 默认 65536 字节)
pub const PIPE_BUFF_SIZE: usize = 65536;
/// 管道缓冲区最小大小一页大小Linux 保证原子写入的最小单位)
pub const PIPE_MIN_SIZE: usize = 4096;
/// 管道缓冲区最大大小Linux 默认为 1MB
pub const PIPE_MAX_SIZE: usize = 1024 * 1024;
// FIONREAD: 获取管道中可读的字节数
const FIONREAD: u32 = 0x541B;
// 管道文件系统 - 全局单例
lazy_static! {
static ref PIPEFS: Arc<PipeFS> = Arc::new(PipeFS);
}
/// 管道文件系统
#[derive(Debug)]
pub struct PipeFS;
impl FileSystem for PipeFS {
    fn root_inode(&self) -> Arc<dyn IndexNode> {
        // PipeFS has no real root inode, but the trait requires one;
        // return an empty pipe inode as a placeholder.
        // NOTE(review): this allocates a NEW pipe inode on every call, so
        // two calls never return the same node — confirm no FileSystem
        // consumer relies on a stable root inode.
        LockedPipeInode::new()
    }

    fn info(&self) -> FsInfo {
        FsInfo {
            // Pipes are not backed by any block device.
            blk_dev_id: 0,
            max_name_len: 255,
        }
    }

    fn as_any_ref(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "pipefs"
    }

    fn super_block(&self) -> SuperBlock {
        // Page-sized blocks, 255-char name limit, PIPEFS magic number.
        SuperBlock::new(Magic::PIPEFS_MAGIC, MMArch::PAGE_SIZE as u64, 255)
    }
}
impl PipeFS {
/// 获取全局 PipeFS 实例
pub fn instance() -> Arc<PipeFS> {
PIPEFS.clone()
}
}
#[derive(Debug, Clone)]
pub struct PipeFsPrivateData {
@ -62,7 +119,10 @@ pub struct InnerPipeInode {
valid_cnt: i32,
read_pos: i32,
write_pos: i32,
data: [u8; PIPE_BUFF_SIZE],
/// 管道缓冲区数据(使用 Vec 支持动态大小)
data: Vec<u8>,
/// 当前缓冲区大小
buf_size: usize,
/// INode 元数据
metadata: Metadata,
reader: u32,
@ -94,7 +154,7 @@ impl InnerPipeInode {
if !flags.is_read_only() {
// 管道内数据未满
if self.valid_cnt as usize != PIPE_BUFF_SIZE {
if self.valid_cnt as usize != self.buf_size {
events.insert(EPollEventType::EPOLLOUT | EPollEventType::EPOLLWRNORM);
}
@ -108,7 +168,7 @@ impl InnerPipeInode {
}
fn buf_full(&self) -> bool {
return self.valid_cnt as usize == PIPE_BUFF_SIZE;
return self.valid_cnt as usize == self.buf_size;
}
}
@ -120,7 +180,8 @@ impl LockedPipeInode {
read_pos: 0,
write_pos: 0,
had_reader: false,
data: [0; PIPE_BUFF_SIZE],
data: Vec::new(), // 延迟分配:初始为空,第一次写入时分配
buf_size: PIPE_BUFF_SIZE,
metadata: Metadata {
dev_id: 0,
@ -169,6 +230,88 @@ impl LockedPipeInode {
let inode = self.inner.lock();
return !inode.buf_full() || inode.reader == 0;
}
/// Sets the pipe's buffer capacity (backend of `fcntl(F_SETPIPE_SZ)`).
///
/// The request is rounded up to a whole number of pages and clamped into
/// `[PIPE_MIN_SIZE, PIPE_MAX_SIZE]`; a request of 0 means "minimum".
/// Buffered data is preserved: the (possibly wrapped) ring contents are
/// linearized to the start of the newly sized buffer.
///
/// # Returns
/// The new capacity in bytes on success.
///
/// # Errors
/// - `EINVAL` if the request exceeds `PIPE_MAX_SIZE` or `i32::MAX`
///   (a 64-bit caller may pass an absurdly large value).
/// - `EBUSY` if shrinking below the amount of currently buffered data.
pub fn set_pipe_size(&self, size: usize) -> Result<usize, SystemError> {
    if size > PIPE_MAX_SIZE || size > i32::MAX as usize {
        return Err(SystemError::EINVAL);
    }

    // Round up to page granularity, then clamp into the legal range.
    let page_size = MMArch::PAGE_SIZE;
    let new_size = if size == 0 {
        PIPE_MIN_SIZE
    } else {
        size.div_ceil(page_size) * page_size
    };
    let new_size = new_size.clamp(PIPE_MIN_SIZE, PIPE_MAX_SIZE);

    let mut inner = self.inner.lock();

    // Shrinking below the buffered byte count would lose data.
    if new_size < inner.valid_cnt as usize {
        return Err(SystemError::EBUSY);
    }

    let old_size = inner.buf_size;
    if new_size == old_size {
        return Ok(new_size);
    }

    if inner.valid_cnt > 0 {
        // Data present: allocate the new buffer and copy the ring contents
        // to its start, handling the wrapped case in two segments.
        let mut new_data = vec![0u8; new_size];
        let data_len = inner.valid_cnt as usize;
        let read_pos = inner.read_pos as usize;

        if read_pos + data_len <= old_size {
            // Contiguous region: single copy.
            new_data[..data_len].copy_from_slice(&inner.data[read_pos..read_pos + data_len]);
        } else {
            // Wrapped region: copy the tail, then the head.
            let first_part = old_size - read_pos;
            new_data[..first_part].copy_from_slice(&inner.data[read_pos..old_size]);
            let second_part = data_len - first_part;
            new_data[first_part..data_len].copy_from_slice(&inner.data[..second_part]);
        }

        // Reset cursors for the linearized layout. The modulo keeps
        // write_pos inside [0, new_size): when the new buffer is exactly
        // full (data_len == new_size) the write cursor must wrap to 0 to
        // preserve the ring-buffer invariant, instead of pointing one past
        // the end.
        inner.read_pos = 0;
        inner.write_pos = (data_len % new_size) as i32;
        inner.data = new_data;
    } else {
        // No buffered data: just change capacity. Preserve the
        // lazy-allocation property — if the buffer was never allocated,
        // leave it empty until the first write.
        if !inner.data.is_empty() {
            inner.data = vec![0u8; new_size];
        }
        inner.read_pos = 0;
        inner.write_pos = 0;
    }

    inner.buf_size = new_size;
    inner.metadata.size = new_size as i64;
    Ok(new_size)
}
/// Returns the pipe's current buffer capacity in bytes.
pub fn get_pipe_size(&self) -> usize {
    let guard = self.inner.lock();
    guard.buf_size
}
}
impl PollableInode for LockedPipeInode {
@ -261,8 +404,9 @@ impl IndexNode for LockedPipeInode {
num = len;
}
let buf_size = inner_guard.buf_size;
// 采用两段复制,统一处理不跨尾、跨尾、以及 end==start 的写满/读空边界
let first = core::cmp::min(num, PIPE_BUFF_SIZE - start);
let first = core::cmp::min(num, buf_size - start);
let second = num as isize - first as isize;
// 第1段从 start 开始直到缓冲尾部或读取完
buf[0..first].copy_from_slice(&inner_guard.data[start..start + first]);
@ -272,7 +416,7 @@ impl IndexNode for LockedPipeInode {
}
//更新读位置以及valid_cnt
inner_guard.read_pos = (inner_guard.read_pos + num as i32) % PIPE_BUFF_SIZE as i32;
inner_guard.read_pos = (inner_guard.read_pos + num as i32) % buf_size as i32;
inner_guard.valid_cnt -= num as i32;
// 读完以后如果未读完,则唤醒下一个读者
@ -300,26 +444,23 @@ impl IndexNode for LockedPipeInode {
) -> Result<(), SystemError> {
let accflags = flags.access_flags();
let mut guard = self.inner.lock();
// 不能以读写方式打开管道
if accflags == FileFlags::O_RDWR.bits() {
return Err(SystemError::EACCES);
} else if accflags == FileFlags::O_RDONLY.bits() {
// 根据访问模式增加读/写计数
// 注意命名管道FIFO允许以 O_RDWR 模式打开
if accflags == FileFlags::O_RDONLY {
guard.reader += 1;
guard.had_reader = true;
// println!(
// "FIFO: pipe try open in read flags with reader pid:{:?}",
// ProcessManager::current_pid()
// );
} else if accflags == FileFlags::O_WRONLY.bits() {
// println!(
// "FIFO: pipe try open in write flags with {} reader, writer pid:{:?}",
// guard.reader,
// ProcessManager::current_pid()
// );
} else if accflags == FileFlags::O_WRONLY {
// 非阻塞模式下,如果没有读者,返回 ENXIO
if guard.reader == 0 && flags.contains(FileFlags::O_NONBLOCK) {
return Err(SystemError::ENXIO);
}
guard.writer += 1;
} else if accflags == FileFlags::O_RDWR {
// O_RDWR 模式:同时作为读端和写端
// 这对于命名管道FIFO是有效的
guard.reader += 1;
guard.writer += 1;
guard.had_reader = true;
}
// 设置flags
@ -330,10 +471,7 @@ impl IndexNode for LockedPipeInode {
fn metadata(&self) -> Result<crate::filesystem::vfs::Metadata, SystemError> {
let inode = self.inner.lock();
let mut metadata = inode.metadata.clone();
metadata.size = inode.data.len() as i64;
return Ok(metadata);
return Ok(inode.metadata.clone());
}
fn close(&self, data: SpinLockGuard<FilePrivateData>) -> Result<(), SystemError> {
@ -347,22 +485,48 @@ impl IndexNode for LockedPipeInode {
let mut guard = self.inner.lock();
// 写端关闭
if accflags == FileFlags::O_WRONLY.bits() {
if accflags == FileFlags::O_WRONLY {
assert!(guard.writer > 0);
guard.writer -= 1;
// 如果已经没有写端了,则唤醒读端
if guard.writer == 0 {
drop(guard); // 先释放 inner 锁,避免潜在的死锁
self.read_wait_queue
.wakeup_all(Some(ProcessState::Blocked(true)));
return Ok(());
}
}
// 读端关闭
if accflags == FileFlags::O_RDONLY.bits() {
if accflags == FileFlags::O_RDONLY {
assert!(guard.reader > 0);
guard.reader -= 1;
// 如果已经没有写端了,则唤醒读
// 如果已经没有读端了,则唤醒写
if guard.reader == 0 {
drop(guard); // 先释放 inner 锁,避免死锁
self.write_wait_queue
.wakeup_all(Some(ProcessState::Blocked(true)));
return Ok(());
}
}
// O_RDWR 模式关闭:同时减少读写计数
if accflags == FileFlags::O_RDWR {
assert!(guard.reader > 0);
assert!(guard.writer > 0);
guard.reader -= 1;
guard.writer -= 1;
let wake_reader = guard.writer == 0;
let wake_writer = guard.reader == 0;
drop(guard); // 先释放 inner 锁
// 如果已经没有写端了,则唤醒读端
if wake_reader {
self.read_wait_queue
.wakeup_all(Some(ProcessState::Blocked(true)));
}
// 如果已经没有读端了,则唤醒写端
if wake_writer {
self.write_wait_queue
.wakeup_all(Some(ProcessState::Blocked(true)));
}
@ -386,9 +550,14 @@ impl IndexNode for LockedPipeInode {
return Err(SystemError::EBADF);
}
if buf.len() < len || len > PIPE_BUFF_SIZE {
if buf.len() < len {
return Err(SystemError::EINVAL);
}
// 提前释放 data 锁,因为后续可能需要睡眠
// 我们已经提取了需要的 mode 信息
drop(data);
// 加锁
let mut inner_guard = self.inner.lock();
@ -427,43 +596,82 @@ impl IndexNode for LockedPipeInode {
}
}
// 如果管道空间不够
while len + inner_guard.valid_cnt as usize > PIPE_BUFF_SIZE {
// 唤醒读端
self.read_wait_queue
.wakeup(Some(ProcessState::Blocked(true)));
// 延迟分配:如果缓冲区未分配,在第一次写入时分配
if inner_guard.data.is_empty() {
// 分配缓冲区大小为 buf_size
let buf_size = inner_guard.buf_size;
inner_guard.data = vec![0u8; buf_size];
}
// 如果为非阻塞管道,直接返回错误
if flags.contains(FileFlags::O_NONBLOCK) {
let mut total_written: usize = 0;
// 循环写入,直到写完所有数据
while total_written < len {
// 计算本次要写入的字节数
let remaining = len - total_written;
let buf_size = inner_guard.buf_size;
let available_space = buf_size - inner_guard.valid_cnt as usize;
// 如果没有可用空间,需要等待
if available_space == 0 {
// 唤醒读端
self.read_wait_queue
.wakeup(Some(ProcessState::Blocked(true)));
// 如果为非阻塞管道,返回已写入的字节数或 EAGAIN
if flags.contains(FileFlags::O_NONBLOCK) {
drop(inner_guard);
if total_written > 0 {
return Ok(total_written);
}
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}
// 解锁并睡眠
drop(inner_guard);
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
let r = wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {});
if r.is_err() {
if total_written > 0 {
return Ok(total_written);
}
return Err(SystemError::ERESTARTSYS);
}
inner_guard = self.inner.lock();
// 检查读端是否已关闭
if inner_guard.reader == 0 && inner_guard.had_reader {
if total_written > 0 {
return Ok(total_written);
}
return Err(SystemError::EPIPE);
}
continue;
}
// 解锁并睡眠
drop(inner_guard);
let r = wq_wait_event_interruptible!(self.write_wait_queue, self.writeable(), {});
if r.is_err() {
return Err(SystemError::ERESTARTSYS);
}
inner_guard = self.inner.lock();
}
// 计算本次写入的字节数
let to_write = core::cmp::min(remaining, available_space);
// 决定要输入的字节(两段复制处理 wrap 与 end==start 情况)
let start = inner_guard.write_pos as usize;
let first = core::cmp::min(len, PIPE_BUFF_SIZE - start);
let second = len as isize - first as isize;
// 第1段写到缓冲尾部或写完
inner_guard.data[start..start + first].copy_from_slice(&buf[0..first]);
// 第2段如需要从缓冲头部继续
if second > 0 {
inner_guard.data[0..second as usize].copy_from_slice(&buf[first..len]);
// 决定要输入的字节(两段复制处理 wrap 与 end==start 情况)
let start = inner_guard.write_pos as usize;
let first = core::cmp::min(to_write, buf_size - start);
let second = to_write as isize - first as isize;
// 第1段写到缓冲尾部或写完
inner_guard.data[start..start + first]
.copy_from_slice(&buf[total_written..total_written + first]);
// 第2段如需要从缓冲头部继续
if second > 0 {
inner_guard.data[0..second as usize]
.copy_from_slice(&buf[total_written + first..total_written + to_write]);
}
// 更新写位置以及valid_cnt
inner_guard.write_pos = (inner_guard.write_pos + to_write as i32) % buf_size as i32;
inner_guard.valid_cnt += to_write as i32;
total_written += to_write;
}
// 更新写位置以及valid_cnt
inner_guard.write_pos = (inner_guard.write_pos + len as i32) % PIPE_BUFF_SIZE as i32;
inner_guard.valid_cnt += len as i32;
// 写完后还有位置,则唤醒下一个写者
if (inner_guard.valid_cnt as usize) < PIPE_BUFF_SIZE {
if (inner_guard.valid_cnt as usize) < inner_guard.buf_size {
self.write_wait_queue
.wakeup(Some(ProcessState::Blocked(true)));
}
@ -472,14 +680,16 @@ impl IndexNode for LockedPipeInode {
self.read_wait_queue
.wakeup(Some(ProcessState::Blocked(true)));
let pollflag = EPollEventType::from_bits_truncate(inner_guard.poll(&data)? as u32);
// 构造用于 poll 的 FilePrivateData
let poll_data = FilePrivateData::Pipefs(PipeFsPrivateData::new(flags));
let pollflag = EPollEventType::from_bits_truncate(inner_guard.poll(&poll_data)? as u32);
drop(inner_guard);
// 唤醒epoll中等待的进程
EventPoll::wakeup_epoll(&self.epitems, pollflag)?;
// 返回写入的字节数
return Ok(len);
return Ok(total_written);
}
fn as_any_ref(&self) -> &dyn core::any::Any {
@ -497,7 +707,7 @@ impl IndexNode for LockedPipeInode {
}
fn fs(&self) -> Arc<dyn FileSystem> {
todo!()
PipeFS::instance()
}
fn list(&self) -> Result<alloc::vec::Vec<alloc::string::String>, SystemError> {
@ -511,4 +721,27 @@ impl IndexNode for LockedPipeInode {
fn absolute_path(&self) -> Result<String, SystemError> {
Ok(String::from("pipe"))
}
/// Handles pipe-specific ioctl commands.
///
/// Currently only `FIONREAD` is supported: it writes the number of bytes
/// available for reading into the user-supplied `i32` pointed to by `data`.
/// Unknown commands return `ENOSYS`.
/// NOTE(review): Linux returns ENOTTY for unknown ioctls on a valid fd —
/// confirm whether ENOSYS is this kernel's convention before changing.
fn ioctl(
    &self,
    cmd: u32,
    data: usize,
    _private_data: &FilePrivateData,
) -> Result<usize, SystemError> {
    match cmd {
        FIONREAD => {
            // Snapshot the readable byte count, then drop the spinlock
            // BEFORE touching user memory (which may fault/sleep).
            let inner = self.inner.lock();
            let available = inner.valid_cnt;
            drop(inner);
            let mut writer =
                UserBufferWriter::new(data as *mut u8, core::mem::size_of::<i32>(), true)?;
            writer
                .buffer_protected(0)?
                .write_one::<i32>(0, &available)?;
            Ok(0)
        }
        _ => Err(SystemError::ENOSYS),
    }
}
}

View File

@ -1,12 +1,8 @@
use crate::arch::interrupt::TrapFrame;
use crate::{
arch::syscall::nr::SYS_PIPE2,
filesystem::vfs::{
file::{File, FileFlags},
FilePrivateData,
},
ipc::pipe::{LockedPipeInode, PipeFsPrivateData},
libs::spinlock::SpinLock,
filesystem::vfs::file::{File, FileFlags},
ipc::pipe::LockedPipeInode,
process::ProcessManager,
syscall::{
table::{FormattedSyscallParam, Syscall},
@ -32,21 +28,15 @@ pub(super) fn do_kernel_pipe2(fd: *mut i32, flags: FileFlags) -> Result<usize, S
let fd = user_buffer.buffer::<i32>(0)?;
let pipe_ptr = LockedPipeInode::new();
let mut read_file = File::new(
let read_file = File::new(
pipe_ptr.clone(),
FileFlags::O_RDONLY | (flags & FileFlags::O_NONBLOCK),
)?;
read_file.private_data = SpinLock::new(FilePrivateData::Pipefs(PipeFsPrivateData::new(
FileFlags::O_RDONLY,
)));
let mut write_file = File::new(
let write_file = File::new(
pipe_ptr.clone(),
FileFlags::O_WRONLY | (flags & (FileFlags::O_NONBLOCK | FileFlags::O_DIRECT)),
)?;
write_file.private_data = SpinLock::new(FilePrivateData::Pipefs(PipeFsPrivateData::new(
FileFlags::O_WRONLY | (flags & (FileFlags::O_NONBLOCK | FileFlags::O_DIRECT)),
)));
if flags.contains(FileFlags::O_CLOEXEC) {
read_file.set_close_on_exec(true);

View File

@ -1071,11 +1071,11 @@ impl ProcessControlBlock {
rlim_max: 0,
}; RLimitID::Nlimits as usize];
// Linux 典型默认值软限制1024硬限制65536
// Linux 典型默认值软限制1024硬限制可通过setrlimit调整
// 文件描述符表会根据RLIMIT_NOFILE自动扩容
arr[RLimitID::Nofile as usize] = RLimit64 {
rlim_cur: 1024,
rlim_max: 65536,
rlim_cur: FileDescriptorVec::MAX_CAPACITY as u64,
rlim_max: FileDescriptorVec::MAX_CAPACITY as u64,
};
arr[RLimitID::Stack as usize] = RLimit64 {

View File

@ -0,0 +1,17 @@
# 卡死问题
Pipes/PipeTest.BlockPartialWriteClosed/pipe
Pipes/PipeTest.BlockPartialWriteClosed/pipe2blocking
Pipes/PipeTest.BlockPartialWriteClosed/pipe2nonblocking
Pipes/PipeTest.BlockPartialWriteClosed/smallbuffer
Pipes/PipeTest.BlockPartialWriteClosed/namednonblocking
Pipes/PipeTest.BlockPartialWriteClosed/namedblocking
# 卡死问题
Pipes/PipeTest.BlockWriteClosed/pipe
Pipes/PipeTest.BlockWriteClosed/pipe2blocking
Pipes/PipeTest.BlockWriteClosed/pipe2nonblocking
Pipes/PipeTest.BlockWriteClosed/smallbuffer
Pipes/PipeTest.BlockWriteClosed/namednonblocking
Pipes/PipeTest.BlockWriteClosed/namedblocking

View File

@ -66,3 +66,4 @@ sigtimedwait_test
# 其他测试
itimer_test
pipe_test