Support open and fstatfs for anonymous pipes

This commit is contained in:
vvsv 2025-12-05 16:22:12 +00:00 committed by Tate, Hongliang Tian
parent 3b44a638f4
commit f4a51e1ce9
7 changed files with 221 additions and 197 deletions

View File

@ -3,17 +3,24 @@
use core::{
fmt::Display,
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
use inherit_methods_macro::inherit_methods;
use crate::{
events::IoEvents,
fs::{
file_handle::FileLike,
file_table::FdFlags,
notify::FsEventPublisher,
path::RESERVED_MOUNT_ID,
pipe::common::{PipeReader, PipeWriter},
pseudofs::{PseudoInode, pipefs_singleton},
utils::{AccessMode, CreationFlags, Inode, InodeType, StatusFlags, mkmod},
pipe::{named_pipe::NamedPipeHandle, NamedPipe},
pseudofs::{pipefs_singleton, PseudoInode},
utils::{
mkmod, AccessMode, CreationFlags, FileSystem, Inode, InodeIo, InodeMode, InodeType,
Metadata, StatusFlags,
},
},
prelude::*,
process::{
@ -22,62 +29,121 @@ use crate::{
},
};
const DEFAULT_PIPE_BUF_SIZE: usize = 65536;
/// Creates a pair of connected pipe file handles with the default capacity.
pub fn new_file_pair() -> Result<(Arc<PipeReaderFile>, Arc<PipeWriterFile>)> {
new_file_pair_with_capacity(DEFAULT_PIPE_BUF_SIZE)
pub fn new_file_pair() -> Result<(Arc<AnonPipeFile>, Arc<AnonPipeFile>)> {
let pipe_inode = Arc::new(AnonPipeInode::new());
let reader = AnonPipeFile::open(
pipe_inode.clone(),
AccessMode::O_RDONLY,
StatusFlags::empty(),
)?;
let writer = AnonPipeFile::open(pipe_inode, AccessMode::O_WRONLY, StatusFlags::empty())?;
Ok((Arc::new(reader), Arc::new(writer)))
}
fn new_file_pair_with_capacity(
capacity: usize,
) -> Result<(Arc<PipeReaderFile>, Arc<PipeWriterFile>)> {
let (reader, writer) = super::common::new_pair_with_capacity(capacity);
let pseudo_inode = {
Arc::new(PseudoInode::new(
0,
InodeType::NamedPipe,
mkmod!(u+rw),
Uid::new_root(),
Gid::new_root(),
aster_block::BLOCK_SIZE,
Arc::downgrade(pipefs_singleton()),
))
};
Ok((
PipeReaderFile::new(reader, StatusFlags::empty(), pseudo_inode.clone())?,
PipeWriterFile::new(writer, StatusFlags::empty(), pseudo_inode)?,
))
}
/// A file handle for reading from a pipe.
pub struct PipeReaderFile {
reader: PipeReader,
/// An anonymous pipe file.
pub struct AnonPipeFile {
/// The opened pipe handle. `None` if the file is opened as a path.
handle: Option<Box<NamedPipeHandle>>,
pipe_inode: Arc<dyn Inode>,
status_flags: AtomicU32,
pseudo_inode: Arc<dyn Inode>,
}
impl PipeReaderFile {
fn new(
reader: PipeReader,
impl AnonPipeFile {
pub fn open(
pipe_inode: Arc<AnonPipeInode>,
access_mode: AccessMode,
status_flags: StatusFlags,
pseudo_inode: Arc<PseudoInode>,
) -> Result<Arc<Self>> {
) -> Result<Self> {
check_status_flags(status_flags)?;
Ok(Arc::new(Self {
reader,
let handle = if !status_flags.contains(StatusFlags::O_PATH) {
let handle = pipe_inode
.pipe
.open_handle(access_mode, status_flags, false)?;
Some(handle)
} else {
None
};
Ok(Self {
handle,
pipe_inode,
status_flags: AtomicU32::new(status_flags.bits()),
pseudo_inode,
}))
})
}
}
impl Pollable for PipeReaderFile {
impl Pollable for AnonPipeFile {
fn poll(&self, mask: IoEvents, poller: Option<&mut PollHandle>) -> IoEvents {
self.reader.poll(mask, poller)
if let Some(handle) = &self.handle {
handle.poll(mask, poller)
} else {
IoEvents::NVAL
}
}
}
impl FileLike for AnonPipeFile {
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
let Some(handle) = &self.handle else {
return_errno_with_message!(Errno::EBADF, "the file is opened as a path");
};
if !handle.access_mode().is_readable() {
return_errno_with_message!(Errno::EBADF, "the file is not opened readable");
}
handle.read_at(0, writer, self.status_flags())
}
fn write(&self, reader: &mut VmReader) -> Result<usize> {
let Some(handle) = &self.handle else {
return_errno_with_message!(Errno::EBADF, "the file is opened as a path");
};
if !handle.access_mode().is_writable() {
return_errno_with_message!(Errno::EBADF, "the file is not opened writable");
}
handle.write_at(0, reader, self.status_flags())
}
fn status_flags(&self) -> StatusFlags {
StatusFlags::from_bits_truncate(self.status_flags.load(Ordering::Relaxed))
}
fn set_status_flags(&self, new_flags: StatusFlags) -> Result<()> {
check_status_flags(new_flags)?;
self.status_flags.store(new_flags.bits(), Ordering::Relaxed);
Ok(())
}
fn access_mode(&self) -> AccessMode {
if let Some(handle) = &self.handle {
handle.access_mode()
} else {
// The file is opened with `O_PATH`. We follow Linux to report `O_RDONLY` here.
AccessMode::O_RDONLY
}
}
fn inode(&self) -> &Arc<dyn Inode> {
&self.pipe_inode
}
fn dump_proc_fdinfo(self: Arc<Self>, fd_flags: FdFlags) -> Box<dyn Display> {
let mut flags = self.status_flags().bits() | self.access_mode() as u32;
if fd_flags.contains(FdFlags::CLOEXEC) {
flags |= CreationFlags::O_CLOEXEC.bits();
}
Box::new(FdInfo {
flags,
ino: self.inode().ino(),
})
}
}
@ -96,135 +162,6 @@ impl Display for FdInfo {
}
}
impl FileLike for PipeReaderFile {
fn read(&self, writer: &mut VmWriter) -> Result<usize> {
if !writer.has_avail() {
// Even the peer endpoint (`PipeWriter`) has been closed, reading an empty buffer is
// still fine.
return Ok(0);
}
if self.status_flags().contains(StatusFlags::O_NONBLOCK) {
self.reader.try_read(writer)
} else {
self.wait_events(IoEvents::IN, None, || self.reader.try_read(writer))
}
}
fn status_flags(&self) -> StatusFlags {
StatusFlags::from_bits_truncate(self.status_flags.load(Ordering::Relaxed))
}
fn set_status_flags(&self, new_flags: StatusFlags) -> Result<()> {
check_status_flags(new_flags)?;
self.status_flags.store(new_flags.bits(), Ordering::Relaxed);
Ok(())
}
fn access_mode(&self) -> AccessMode {
AccessMode::O_RDONLY
}
fn inode(&self) -> &Arc<dyn Inode> {
&self.pseudo_inode
}
fn dump_proc_fdinfo(self: Arc<Self>, fd_flags: FdFlags) -> Box<dyn Display> {
let mut flags = self.status_flags().bits() | self.access_mode() as u32;
if fd_flags.contains(FdFlags::CLOEXEC) {
flags |= CreationFlags::O_CLOEXEC.bits();
}
Box::new(FdInfo {
flags,
ino: self.inode().ino(),
})
}
}
impl Drop for PipeReaderFile {
fn drop(&mut self) {
self.reader.peer_shutdown();
}
}
/// A file handle for writing to a pipe.
pub struct PipeWriterFile {
writer: PipeWriter,
status_flags: AtomicU32,
pseudo_inode: Arc<dyn Inode>,
}
impl PipeWriterFile {
fn new(
writer: PipeWriter,
status_flags: StatusFlags,
pseudo_inode: Arc<PseudoInode>,
) -> Result<Arc<Self>> {
check_status_flags(status_flags)?;
Ok(Arc::new(Self {
writer,
status_flags: AtomicU32::new(status_flags.bits()),
pseudo_inode,
}))
}
}
impl Pollable for PipeWriterFile {
fn poll(&self, mask: IoEvents, poller: Option<&mut PollHandle>) -> IoEvents {
self.writer.poll(mask, poller)
}
}
impl FileLike for PipeWriterFile {
fn write(&self, reader: &mut VmReader) -> Result<usize> {
if !reader.has_remain() {
// Even the peer endpoint (`PipeReader`) has been closed, writing an empty buffer is
// still fine.
return Ok(0);
}
if self.status_flags().contains(StatusFlags::O_NONBLOCK) {
self.writer.try_write(reader)
} else {
self.wait_events(IoEvents::OUT, None, || self.writer.try_write(reader))
}
}
fn status_flags(&self) -> StatusFlags {
StatusFlags::from_bits_truncate(self.status_flags.load(Ordering::Relaxed))
}
fn set_status_flags(&self, new_flags: StatusFlags) -> Result<()> {
check_status_flags(new_flags)?;
self.status_flags.store(new_flags.bits(), Ordering::Relaxed);
Ok(())
}
fn access_mode(&self) -> AccessMode {
AccessMode::O_WRONLY
}
fn inode(&self) -> &Arc<dyn Inode> {
&self.pseudo_inode
}
fn dump_proc_fdinfo(self: Arc<Self>, fd_flags: FdFlags) -> Box<dyn Display> {
let mut flags = self.status_flags().bits() | self.access_mode() as u32;
if fd_flags.contains(FdFlags::CLOEXEC) {
flags |= CreationFlags::O_CLOEXEC.bits();
}
Box::new(FdInfo {
flags,
ino: self.inode().ino(),
})
}
}
fn check_status_flags(status_flags: StatusFlags) -> Result<()> {
if status_flags.contains(StatusFlags::O_DIRECT) {
// "O_DIRECT .. Older kernels that do not support this flag will indicate this via an
@ -240,12 +177,70 @@ fn check_status_flags(status_flags: StatusFlags) -> Result<()> {
Ok(())
}
impl Drop for PipeWriterFile {
fn drop(&mut self) {
self.writer.shutdown();
/// An anonymous pipe inode.
pub struct AnonPipeInode {
/// The underlying named pipe backend.
pipe: NamedPipe,
pseudo_inode: PseudoInode,
}
impl AnonPipeInode {
fn new() -> Self {
let pipe = NamedPipe::new();
let pseudo_inode = PseudoInode::new(
0,
InodeType::NamedPipe,
mkmod!(u+rw),
Uid::new_root(),
Gid::new_root(),
aster_block::BLOCK_SIZE,
Arc::downgrade(pipefs_singleton()),
);
Self { pipe, pseudo_inode }
}
}
#[inherit_methods(from = "self.pseudo_inode")]
impl InodeIo for AnonPipeInode {
fn read_at(
&self,
_offset: usize,
_writer: &mut VmWriter,
_status: StatusFlags,
) -> Result<usize>;
fn write_at(
&self,
_offset: usize,
_reader: &mut VmReader,
_status: StatusFlags,
) -> Result<usize>;
}
#[inherit_methods(from = "self.pseudo_inode")]
impl Inode for AnonPipeInode {
fn size(&self) -> usize;
fn resize(&self, _new_size: usize) -> Result<()>;
fn metadata(&self) -> Metadata;
fn fs_event_publisher(&self) -> &FsEventPublisher;
fn ino(&self) -> u64;
fn type_(&self) -> InodeType;
fn mode(&self) -> Result<InodeMode>;
fn set_mode(&self, mode: InodeMode) -> Result<()>;
fn owner(&self) -> Result<Uid>;
fn set_owner(&self, uid: Uid) -> Result<()>;
fn group(&self) -> Result<Gid>;
fn set_group(&self, gid: Gid) -> Result<()>;
fn atime(&self) -> Duration;
fn set_atime(&self, time: Duration);
fn mtime(&self) -> Duration;
fn set_mtime(&self, time: Duration);
fn ctime(&self) -> Duration;
fn set_ctime(&self, time: Duration);
fn fs(&self) -> Arc<dyn FileSystem>;
}
#[cfg(ktest)]
mod test {
use alloc::sync::Arc;
@ -264,10 +259,10 @@ mod test {
fn test_blocking<W, R>(write: W, read: R, ordering: Ordering)
where
W: FnOnce(Arc<PipeWriterFile>) + Send + 'static,
R: FnOnce(Arc<PipeReaderFile>) + Send + 'static,
W: FnOnce(Arc<AnonPipeFile>) + Send + 'static,
R: FnOnce(Arc<AnonPipeFile>) + Send + 'static,
{
let (reader, writer) = new_file_pair_with_capacity(2).unwrap();
let (reader, writer) = new_file_pair().unwrap();
let signal_writer = Arc::new(AtomicBool::new(false));
let signal_reader = signal_writer.clone();

View File

@ -15,7 +15,10 @@ use crate::{
util::ring_buffer::{RbConsumer, RbProducer, RingBuffer},
};
#[cfg(not(ktest))]
const DEFAULT_PIPE_BUF_SIZE: usize = 65536;
#[cfg(ktest)]
const DEFAULT_PIPE_BUF_SIZE: usize = 2;
/// Maximum number of bytes guaranteed to be written to a pipe atomically.
///
@ -34,7 +37,7 @@ pub(super) fn new_pair() -> (PipeReader, PipeWriter) {
new_pair_with_capacity(DEFAULT_PIPE_BUF_SIZE)
}
pub(super) fn new_pair_with_capacity(capacity: usize) -> (PipeReader, PipeWriter) {
fn new_pair_with_capacity(capacity: usize) -> (PipeReader, PipeWriter) {
let (producer, consumer) = RingBuffer::new(capacity).split();
let (producer_state, consumer_state) =
Endpoint::new_pair(EndpointState::default(), EndpointState::default());

View File

@ -4,7 +4,7 @@
//!
//! This module provides both anonymous and named pipes for inter-process communication.
pub use anony_pipe::new_file_pair;
pub use anony_pipe::{new_file_pair, AnonPipeFile, AnonPipeInode};
pub use named_pipe::NamedPipe;
mod anony_pipe;

View File

@ -22,12 +22,16 @@ use crate::{
///
/// Once a handle for a `NamedPipe` exists, the corresponding pipe object will
/// not be dropped.
struct NamedPipeHandle {
pub(super) struct NamedPipeHandle {
inner: Arc<PipeObj>,
access_mode: AccessMode,
}
impl NamedPipeHandle {
pub(super) fn access_mode(&self) -> AccessMode {
self.access_mode
}
fn new(inner: Arc<PipeObj>, access_mode: AccessMode) -> Box<Self> {
Box::new(Self { inner, access_mode })
}
@ -169,10 +173,20 @@ impl NamedPipe {
access_mode: AccessMode,
status_flags: StatusFlags,
) -> Result<Box<dyn FileIo>> {
Ok(self.open_handle(access_mode, status_flags, true)?)
}
/// Opens the pipe and returns a `NamedPipeHandle`.
pub(super) fn open_handle(
&self,
access_mode: AccessMode,
status_flags: StatusFlags,
is_named_pipe: bool,
) -> Result<Box<NamedPipeHandle>> {
let mut pipe = self.pipe.lock();
let pipe_obj = pipe.get_or_create_pipe_obj();
let handle: Box<dyn FileIo> = match access_mode {
let handle = match access_mode {
AccessMode::O_RDONLY => {
pipe.read_count += 1;
@ -185,7 +199,8 @@ impl NamedPipe {
let has_writer = pipe_obj.num_writer.load(Ordering::Relaxed) > 0;
let handle = NamedPipeHandle::new(pipe_obj, access_mode);
if !status_flags.contains(StatusFlags::O_NONBLOCK) && !has_writer {
// Reference: <https://elixir.bootlin.com/linux/v6.16.5/source/fs/pipe.c#L1166-L1175>
if is_named_pipe && !status_flags.contains(StatusFlags::O_NONBLOCK) && !has_writer {
let old_write_count = pipe.write_count;
drop(pipe);
self.wait_queue.pause_until(|| {
@ -207,7 +222,8 @@ impl NamedPipe {
let has_reader = pipe_obj.num_reader.load(Ordering::Relaxed) > 0;
let handle = NamedPipeHandle::new(pipe_obj, access_mode);
if !has_reader {
// Reference: <https://elixir.bootlin.com/linux/v6.16.5/source/fs/pipe.c#L1184-L1195>
if is_named_pipe && !has_reader {
if status_flags.contains(StatusFlags::O_NONBLOCK) {
return_errno_with_message!(Errno::ENXIO, "no reader is present");
}

View File

@ -297,7 +297,7 @@ impl MemfdFile {
})
}
pub fn open_from_inode(inode: Arc<MemfdInode>, open_args: OpenArgs) -> Result<Self> {
pub fn open(inode: Arc<MemfdInode>, open_args: OpenArgs) -> Result<Self> {
let inode: Arc<dyn Inode> = inode;
let status_flags = open_args.status_flags;
let access_mode = open_args.access_mode;

View File

@ -8,6 +8,7 @@ use crate::{
file_table::{FdFlags, FileDesc},
fs_resolver::{AT_FDCWD, FsPath, FsResolver, LookupResult, PathOrInode},
inode_handle::InodeHandle,
pipe::{AnonPipeFile, AnonPipeInode},
ramfs::memfd::{MemfdFile, MemfdInode},
utils::{AccessMode, CreationFlags, InodeMode, InodeType, OpenArgs, StatusFlags},
},
@ -92,11 +93,17 @@ fn do_open(
LookupResult::Resolved(target) => match target {
PathOrInode::Path(path) => Arc::new(path.open(open_args)?),
PathOrInode::Inode(inode) => {
// TODO: Support re-opening anonymous pipes.
let memfd_inode = Arc::downcast::<MemfdInode>(inode).map_err(|_| {
Error::with_message(Errno::ENXIO, "the inode is not re-openable")
})?;
Arc::new(MemfdFile::open_from_inode(memfd_inode.clone(), open_args)?)
if let Ok(memfd_inode) = Arc::downcast::<MemfdInode>(inode.clone()) {
Arc::new(MemfdFile::open(memfd_inode, open_args)?)
} else if let Ok(pipe_inode) = Arc::downcast::<AnonPipeInode>(inode) {
Arc::new(AnonPipeFile::open(
pipe_inode,
open_args.access_mode,
open_args.status_flags,
)?)
} else {
return_errno_with_message!(Errno::ENXIO, "the inode is not re-openable")
}
}
},
LookupResult::AtParent(result) => {

View File

@ -20,16 +20,19 @@ pub fn sys_statfs(path_ptr: Vaddr, statfs_buf_ptr: Vaddr, ctx: &Context) -> Resu
path_name, statfs_buf_ptr,
);
let path = {
let fs = {
let path_name = path_name.to_string_lossy();
let fs_path = FsPath::try_from(path_name.as_ref())?;
ctx.thread_local
let path_or_inode = ctx
.thread_local
.borrow_fs()
.resolver()
.read()
.lookup(&fs_path)?
.lookup_inode(&fs_path)?;
path_or_inode.inode().fs()
};
let statfs = Statfs::from(path.fs().sb());
let statfs = Statfs::from(fs.sb());
user_space.write_val(statfs_buf_ptr, &statfs)?;
Ok(SyscallReturn::Return(0))
}
@ -40,7 +43,7 @@ pub fn sys_fstatfs(fd: FileDesc, statfs_buf_ptr: Vaddr, ctx: &Context) -> Result
let fs = {
let mut file_table = ctx.thread_local.borrow_file_table_mut();
let file = get_file_fast!(&mut file_table, fd);
file.as_inode_handle_or_err()?.path().fs()
file.inode().fs()
};
let statfs = Statfs::from(fs.sb());