Refactor the pipe modules to fit semantics
This commit is contained in:
parent
6e081b1043
commit
2dbcd1194b
|
|
@ -23,7 +23,7 @@ use crate::{
|
|||
inode_handle::FileIo,
|
||||
notify::FsEventPublisher,
|
||||
path::{is_dot, is_dot_or_dotdot, is_dotdot},
|
||||
pipe::NamedPipe,
|
||||
pipe::Pipe,
|
||||
utils::{
|
||||
AccessMode, Extension, FallocMode, Inode as _, InodeMode, Metadata, Permission,
|
||||
StatusFlags, XattrName, XattrNamespace, XattrSetFlags,
|
||||
|
|
@ -697,7 +697,7 @@ impl Inode {
|
|||
) -> Result<Box<dyn FileIo>> {
|
||||
let inner = self.inner.read();
|
||||
let named_pipe = inner.named_pipe.as_ref().unwrap();
|
||||
named_pipe.open(access_mode, status_flags)
|
||||
named_pipe.open_named(access_mode, status_flags)
|
||||
}
|
||||
|
||||
pub fn read_at(&self, offset: usize, writer: &mut VmWriter) -> Result<usize> {
|
||||
|
|
@ -974,14 +974,14 @@ struct InodeInner {
|
|||
page_cache: PageCache,
|
||||
// This corresponds to the `i_pipe` field in `struct inode` in Linux.
|
||||
// Reference: <https://elixir.bootlin.com/linux/v6.17/source/include/linux/fs.h#L771>.
|
||||
named_pipe: Option<NamedPipe>,
|
||||
named_pipe: Option<Pipe>,
|
||||
}
|
||||
|
||||
impl InodeInner {
|
||||
pub fn new(desc: Dirty<InodeDesc>, weak_self: Weak<Inode>, fs: Weak<Ext2>) -> Self {
|
||||
let num_page_bytes = desc.num_page_bytes();
|
||||
let named_pipe = if desc.type_ == InodeType::NamedPipe {
|
||||
Some(NamedPipe::new())
|
||||
Some(Pipe::new())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
|
|
|||
|
|
@ -15,11 +15,11 @@ use crate::{
|
|||
file_table::FdFlags,
|
||||
notify::FsEventPublisher,
|
||||
path::RESERVED_MOUNT_ID,
|
||||
pipe::{named_pipe::NamedPipeHandle, NamedPipe},
|
||||
pseudofs::{pipefs_singleton, PseudoInode},
|
||||
pipe::{Pipe, common::PipeHandle},
|
||||
pseudofs::{PseudoInode, pipefs_singleton},
|
||||
utils::{
|
||||
mkmod, AccessMode, CreationFlags, FileSystem, Inode, InodeIo, InodeMode, InodeType,
|
||||
Metadata, StatusFlags,
|
||||
AccessMode, CreationFlags, FileSystem, Inode, InodeIo, InodeMode, InodeType, Metadata,
|
||||
StatusFlags, mkmod,
|
||||
},
|
||||
},
|
||||
prelude::*,
|
||||
|
|
@ -45,7 +45,7 @@ pub fn new_file_pair() -> Result<(Arc<AnonPipeFile>, Arc<AnonPipeFile>)> {
|
|||
/// An anonymous pipe file.
|
||||
pub struct AnonPipeFile {
|
||||
/// The opened pipe handle. `None` if the file is opened as a path.
|
||||
handle: Option<Box<NamedPipeHandle>>,
|
||||
handle: Option<Box<PipeHandle>>,
|
||||
pipe_inode: Arc<dyn Inode>,
|
||||
status_flags: AtomicU32,
|
||||
}
|
||||
|
|
@ -59,9 +59,7 @@ impl AnonPipeFile {
|
|||
check_status_flags(status_flags)?;
|
||||
|
||||
let handle = if !status_flags.contains(StatusFlags::O_PATH) {
|
||||
let handle = pipe_inode
|
||||
.pipe
|
||||
.open_handle(access_mode, status_flags, false)?;
|
||||
let handle = pipe_inode.pipe.open_anon(access_mode, status_flags)?;
|
||||
Some(handle)
|
||||
} else {
|
||||
None
|
||||
|
|
@ -179,14 +177,14 @@ fn check_status_flags(status_flags: StatusFlags) -> Result<()> {
|
|||
|
||||
/// An anonymous pipe inode.
|
||||
pub struct AnonPipeInode {
|
||||
/// The underlying named pipe backend.
|
||||
pipe: NamedPipe,
|
||||
/// The underlying pipe backend.
|
||||
pipe: Pipe,
|
||||
pseudo_inode: PseudoInode,
|
||||
}
|
||||
|
||||
impl AnonPipeInode {
|
||||
fn new() -> Self {
|
||||
let pipe = NamedPipe::new();
|
||||
let pipe = Pipe::new();
|
||||
|
||||
let pseudo_inode = PseudoInode::new(
|
||||
0,
|
||||
|
|
@ -1,8 +1,18 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use core::{
|
||||
num::Wrapping,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use ostd::sync::WaitQueue;
|
||||
|
||||
use crate::{
|
||||
events::IoEvents,
|
||||
fs::utils::{Endpoint, EndpointState},
|
||||
fs::{
|
||||
inode_handle::FileIo,
|
||||
utils::{AccessMode, Endpoint, EndpointState, InodeIo, StatusFlags},
|
||||
},
|
||||
prelude::*,
|
||||
process::{
|
||||
posix_thread::AsPosixThread,
|
||||
|
|
@ -15,6 +25,295 @@ use crate::{
|
|||
util::ring_buffer::{RbConsumer, RbProducer, RingBuffer},
|
||||
};
|
||||
|
||||
/// A handle for a pipe that implements `FileIo`.
|
||||
///
|
||||
/// Once a handle for a `Pipe` exists, the corresponding pipe object will
|
||||
/// not be dropped.
|
||||
pub(super) struct PipeHandle {
|
||||
inner: Arc<PipeObj>,
|
||||
access_mode: AccessMode,
|
||||
}
|
||||
|
||||
impl PipeHandle {
|
||||
pub(super) fn access_mode(&self) -> AccessMode {
|
||||
self.access_mode
|
||||
}
|
||||
|
||||
fn new(inner: Arc<PipeObj>, access_mode: AccessMode) -> Box<Self> {
|
||||
Box::new(Self { inner, access_mode })
|
||||
}
|
||||
|
||||
fn try_read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
// `InodeHandle` checks the access mode before calling methods in `FileIo`.
|
||||
debug_assert!(self.access_mode.is_readable());
|
||||
|
||||
self.inner.reader.try_read(writer)
|
||||
}
|
||||
|
||||
fn try_write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
// `InodeHandle` checks the access mode before calling methods in `FileIo`.
|
||||
debug_assert!(self.access_mode.is_writable());
|
||||
|
||||
self.inner.writer.try_write(reader)
|
||||
}
|
||||
}
|
||||
|
||||
impl Pollable for PipeHandle {
|
||||
fn poll(&self, mask: IoEvents, mut poller: Option<&mut PollHandle>) -> IoEvents {
|
||||
let mut events = IoEvents::empty();
|
||||
|
||||
if self.access_mode.is_readable() {
|
||||
events |= self.inner.reader.poll(mask, poller.as_deref_mut());
|
||||
}
|
||||
|
||||
if self.access_mode.is_writable() {
|
||||
events |= self.inner.writer.poll(mask, poller);
|
||||
}
|
||||
|
||||
events
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PipeHandle {
|
||||
fn drop(&mut self) {
|
||||
if self.access_mode.is_readable() {
|
||||
let old_value = self.inner.num_reader.fetch_sub(1, Ordering::Relaxed);
|
||||
if old_value == 1 {
|
||||
self.inner.reader.peer_shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
if self.access_mode.is_writable() {
|
||||
let old_value = self.inner.num_writer.fetch_sub(1, Ordering::Relaxed);
|
||||
if old_value == 1 {
|
||||
self.inner.writer.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InodeIo for PipeHandle {
|
||||
fn read_at(
|
||||
&self,
|
||||
_offset: usize,
|
||||
writer: &mut VmWriter,
|
||||
status_flags: StatusFlags,
|
||||
) -> Result<usize> {
|
||||
if !writer.has_avail() {
|
||||
// Even the peer endpoint has been closed, reading an empty buffer is
|
||||
// still fine.
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
if status_flags.contains(StatusFlags::O_NONBLOCK) {
|
||||
self.try_read(writer)
|
||||
} else {
|
||||
self.wait_events(IoEvents::IN, None, || self.try_read(writer))
|
||||
}
|
||||
}
|
||||
|
||||
fn write_at(
|
||||
&self,
|
||||
_offset: usize,
|
||||
reader: &mut VmReader,
|
||||
status_flags: StatusFlags,
|
||||
) -> Result<usize> {
|
||||
if !reader.has_remain() {
|
||||
// Even the peer endpoint has been closed, writing an empty buffer is
|
||||
// still fine.
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
if status_flags.contains(StatusFlags::O_NONBLOCK) {
|
||||
self.try_write(reader)
|
||||
} else {
|
||||
self.wait_events(IoEvents::OUT, None, || self.try_write(reader))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FileIo for PipeHandle {
|
||||
fn check_seekable(&self) -> Result<()> {
|
||||
return_errno_with_message!(Errno::ESPIPE, "the inode is a FIFO file")
|
||||
}
|
||||
|
||||
fn is_offset_aware(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// A pipe (FIFO) that provides inter-process communication.
|
||||
///
|
||||
/// Pipes are special files that appear in the filesystem and provide a
|
||||
/// communication channel between processes. It can be opened multiple times
|
||||
/// for reading, writing, or both.
|
||||
///
|
||||
/// A `Pipe` will maintain exactly one **pipe object** that provides actual pipe
|
||||
/// functionalities when there is at least one handle opened on it.
|
||||
pub struct Pipe {
|
||||
pipe: Mutex<PipeInner>,
|
||||
wait_queue: WaitQueue,
|
||||
}
|
||||
|
||||
impl Pipe {
|
||||
/// Creates a new pipe.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
pipe: Mutex::new(PipeInner::default()),
|
||||
wait_queue: WaitQueue::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Opens the named pipe with the specified access mode and status flags.
|
||||
///
|
||||
/// Returns a handle that implements `FileIo` for performing I/O operations.
|
||||
///
|
||||
/// The open behavior follows POSIX semantics:
|
||||
/// - Opening for read-only blocks until a writer opens the pipe.
|
||||
/// - Opening for write-only blocks until a reader opens the pipe.
|
||||
/// - Opening for read-write never blocks.
|
||||
///
|
||||
/// If no handle of this named pipe has existed, the method will create a new pipe object.
|
||||
/// Otherwise, it will return a handle that works on the existing pipe object.
|
||||
pub fn open_named(
|
||||
&self,
|
||||
access_mode: AccessMode,
|
||||
status_flags: StatusFlags,
|
||||
) -> Result<Box<dyn FileIo>> {
|
||||
Ok(self.open_handle(access_mode, status_flags, true)?)
|
||||
}
|
||||
|
||||
/// Opens the anonymous pipe with the specified access mode and status flags.
|
||||
pub(super) fn open_anon(
|
||||
&self,
|
||||
access_mode: AccessMode,
|
||||
status_flags: StatusFlags,
|
||||
) -> Result<Box<PipeHandle>> {
|
||||
self.open_handle(access_mode, status_flags, false)
|
||||
}
|
||||
|
||||
/// Opens the pipe and returns a `PipeHandle`.
|
||||
fn open_handle(
|
||||
&self,
|
||||
access_mode: AccessMode,
|
||||
status_flags: StatusFlags,
|
||||
is_named_pipe: bool,
|
||||
) -> Result<Box<PipeHandle>> {
|
||||
let mut pipe = self.pipe.lock();
|
||||
let pipe_obj = pipe.get_or_create_pipe_obj();
|
||||
|
||||
let handle = match access_mode {
|
||||
AccessMode::O_RDONLY => {
|
||||
pipe.read_count += 1;
|
||||
|
||||
let old_value = pipe_obj.num_reader.fetch_add(1, Ordering::Relaxed);
|
||||
if old_value == 0 {
|
||||
pipe_obj.reader.peer_activate();
|
||||
self.wait_queue.wake_all();
|
||||
}
|
||||
|
||||
let has_writer = pipe_obj.num_writer.load(Ordering::Relaxed) > 0;
|
||||
let handle = PipeHandle::new(pipe_obj, access_mode);
|
||||
|
||||
// Reference: <https://elixir.bootlin.com/linux/v6.16.5/source/fs/pipe.c#L1166-L1175>
|
||||
if is_named_pipe && !status_flags.contains(StatusFlags::O_NONBLOCK) && !has_writer {
|
||||
let old_write_count = pipe.write_count;
|
||||
drop(pipe);
|
||||
self.wait_queue.pause_until(|| {
|
||||
(old_write_count != self.pipe.lock().write_count).then_some(())
|
||||
})?;
|
||||
}
|
||||
|
||||
handle
|
||||
}
|
||||
AccessMode::O_WRONLY => {
|
||||
pipe.write_count += 1;
|
||||
|
||||
let old_num_writer = pipe_obj.num_writer.fetch_add(1, Ordering::Relaxed);
|
||||
if old_num_writer == 0 {
|
||||
pipe_obj.writer.activate();
|
||||
self.wait_queue.wake_all();
|
||||
}
|
||||
|
||||
let has_reader = pipe_obj.num_reader.load(Ordering::Relaxed) > 0;
|
||||
let handle = PipeHandle::new(pipe_obj, access_mode);
|
||||
|
||||
// Reference: <https://elixir.bootlin.com/linux/v6.16.5/source/fs/pipe.c#L1184-L1195>
|
||||
if is_named_pipe && !has_reader {
|
||||
if status_flags.contains(StatusFlags::O_NONBLOCK) {
|
||||
return_errno_with_message!(Errno::ENXIO, "no reader is present");
|
||||
}
|
||||
|
||||
let old_read_count = pipe.read_count;
|
||||
drop(pipe);
|
||||
self.wait_queue.pause_until(|| {
|
||||
(old_read_count != self.pipe.lock().read_count).then_some(())
|
||||
})?;
|
||||
}
|
||||
|
||||
handle
|
||||
}
|
||||
AccessMode::O_RDWR => {
|
||||
pipe.read_count += 1;
|
||||
pipe.write_count += 1;
|
||||
|
||||
let old_num_reader = pipe_obj.num_reader.fetch_add(1, Ordering::Relaxed);
|
||||
let old_num_writer = pipe_obj.num_writer.fetch_add(1, Ordering::Relaxed);
|
||||
if old_num_reader == 0 || old_num_writer == 0 {
|
||||
self.wait_queue.wake_all();
|
||||
pipe_obj.writer.activate();
|
||||
}
|
||||
|
||||
PipeHandle::new(pipe_obj, access_mode)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
}
|
||||
|
||||
struct PipeObj {
|
||||
reader: PipeReader,
|
||||
writer: PipeWriter,
|
||||
num_reader: AtomicUsize,
|
||||
num_writer: AtomicUsize,
|
||||
}
|
||||
|
||||
impl PipeObj {
|
||||
fn new() -> Arc<Self> {
|
||||
let (reader, writer) = super::common::new_pair();
|
||||
Arc::new(Self {
|
||||
reader,
|
||||
writer,
|
||||
num_reader: AtomicUsize::new(0),
|
||||
num_writer: AtomicUsize::new(0),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct PipeInner {
|
||||
// Holding a weak reference here ensures that the pipe object (including
|
||||
// the buffer data) will be dropped when no handle is open on the pipe.
|
||||
// This is consistent with Linux behavior.
|
||||
pipe_obj: Weak<PipeObj>,
|
||||
read_count: Wrapping<usize>,
|
||||
write_count: Wrapping<usize>,
|
||||
}
|
||||
|
||||
impl PipeInner {
|
||||
fn get_or_create_pipe_obj(&mut self) -> Arc<PipeObj> {
|
||||
if let Some(pipe_obj) = self.pipe_obj.upgrade() {
|
||||
return pipe_obj;
|
||||
}
|
||||
|
||||
let pipe_obj = PipeObj::new();
|
||||
self.pipe_obj = Arc::downgrade(&pipe_obj);
|
||||
|
||||
pipe_obj
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(ktest))]
|
||||
const DEFAULT_PIPE_BUF_SIZE: usize = 65536;
|
||||
#[cfg(ktest)]
|
||||
|
|
@ -33,7 +332,7 @@ const PIPE_BUF: usize = 4096;
|
|||
#[cfg(ktest)]
|
||||
const PIPE_BUF: usize = 2;
|
||||
|
||||
pub(super) fn new_pair() -> (PipeReader, PipeWriter) {
|
||||
fn new_pair() -> (PipeReader, PipeWriter) {
|
||||
new_pair_with_capacity(DEFAULT_PIPE_BUF_SIZE)
|
||||
}
|
||||
|
||||
|
|
@ -48,7 +347,7 @@ fn new_pair_with_capacity(capacity: usize) -> (PipeReader, PipeWriter) {
|
|||
)
|
||||
}
|
||||
|
||||
pub(super) struct PipeReader {
|
||||
struct PipeReader {
|
||||
consumer: Mutex<RbConsumer<u8>>,
|
||||
state: Endpoint<EndpointState>,
|
||||
}
|
||||
|
|
@ -61,7 +360,7 @@ impl PipeReader {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) fn try_read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
fn try_read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
let read = || {
|
||||
let mut consumer = self.consumer.lock();
|
||||
consumer.read_fallible(writer)
|
||||
|
|
@ -70,11 +369,11 @@ impl PipeReader {
|
|||
self.state.read_with(read)
|
||||
}
|
||||
|
||||
pub(super) fn peer_shutdown(&self) {
|
||||
fn peer_shutdown(&self) {
|
||||
self.state.peer_shutdown();
|
||||
}
|
||||
|
||||
pub(super) fn peer_activate(&self) {
|
||||
fn peer_activate(&self) {
|
||||
self.state.peer_activate();
|
||||
}
|
||||
|
||||
|
|
@ -97,7 +396,7 @@ impl Pollable for PipeReader {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) struct PipeWriter {
|
||||
struct PipeWriter {
|
||||
producer: Mutex<RbProducer<u8>>,
|
||||
state: Endpoint<EndpointState>,
|
||||
}
|
||||
|
|
@ -110,7 +409,7 @@ impl PipeWriter {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) fn try_write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
fn try_write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
let write = || {
|
||||
let mut producer = self.producer.lock();
|
||||
if reader.remain() <= PIPE_BUF && producer.free_len() < reader.remain() {
|
||||
|
|
@ -135,11 +434,11 @@ impl PipeWriter {
|
|||
res
|
||||
}
|
||||
|
||||
pub(super) fn shutdown(&self) {
|
||||
fn shutdown(&self) {
|
||||
self.state.shutdown();
|
||||
}
|
||||
|
||||
pub(super) fn activate(&self) {
|
||||
fn activate(&self) {
|
||||
self.state.activate();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,9 +4,8 @@
|
|||
//!
|
||||
//! This module provides both anonymous and named pipes for inter-process communication.
|
||||
|
||||
pub use anony_pipe::{new_file_pair, AnonPipeFile, AnonPipeInode};
|
||||
pub use named_pipe::NamedPipe;
|
||||
pub use anon_pipe::{AnonPipeFile, AnonPipeInode, new_file_pair};
|
||||
pub use common::Pipe;
|
||||
|
||||
mod anony_pipe;
|
||||
mod anon_pipe;
|
||||
mod common;
|
||||
mod named_pipe;
|
||||
|
|
|
|||
|
|
@ -1,299 +0,0 @@
|
|||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use core::{
|
||||
num::Wrapping,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use ostd::sync::WaitQueue;
|
||||
|
||||
use super::common::{PipeReader, PipeWriter};
|
||||
use crate::{
|
||||
events::IoEvents,
|
||||
fs::{
|
||||
inode_handle::FileIo,
|
||||
utils::{AccessMode, InodeIo, StatusFlags},
|
||||
},
|
||||
prelude::*,
|
||||
process::signal::{PollHandle, Pollable},
|
||||
};
|
||||
|
||||
/// A handle for a named pipe that implements `FileIo`.
|
||||
///
|
||||
/// Once a handle for a `NamedPipe` exists, the corresponding pipe object will
|
||||
/// not be dropped.
|
||||
pub(super) struct NamedPipeHandle {
|
||||
inner: Arc<PipeObj>,
|
||||
access_mode: AccessMode,
|
||||
}
|
||||
|
||||
impl NamedPipeHandle {
|
||||
pub(super) fn access_mode(&self) -> AccessMode {
|
||||
self.access_mode
|
||||
}
|
||||
|
||||
fn new(inner: Arc<PipeObj>, access_mode: AccessMode) -> Box<Self> {
|
||||
Box::new(Self { inner, access_mode })
|
||||
}
|
||||
|
||||
fn try_read(&self, writer: &mut VmWriter) -> Result<usize> {
|
||||
// `InodeHandle` checks the access mode before calling methods in `FileIo`.
|
||||
debug_assert!(self.access_mode.is_readable());
|
||||
|
||||
self.inner.reader.try_read(writer)
|
||||
}
|
||||
|
||||
fn try_write(&self, reader: &mut VmReader) -> Result<usize> {
|
||||
// `InodeHandle` checks the access mode before calling methods in `FileIo`.
|
||||
debug_assert!(self.access_mode.is_writable());
|
||||
|
||||
self.inner.writer.try_write(reader)
|
||||
}
|
||||
}
|
||||
|
||||
impl Pollable for NamedPipeHandle {
|
||||
fn poll(&self, mask: IoEvents, mut poller: Option<&mut PollHandle>) -> IoEvents {
|
||||
let mut events = IoEvents::empty();
|
||||
|
||||
if self.access_mode.is_readable() {
|
||||
events |= self.inner.reader.poll(mask, poller.as_deref_mut());
|
||||
}
|
||||
|
||||
if self.access_mode.is_writable() {
|
||||
events |= self.inner.writer.poll(mask, poller);
|
||||
}
|
||||
|
||||
events
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for NamedPipeHandle {
|
||||
fn drop(&mut self) {
|
||||
if self.access_mode.is_readable() {
|
||||
let old_value = self.inner.num_reader.fetch_sub(1, Ordering::Relaxed);
|
||||
if old_value == 1 {
|
||||
self.inner.reader.peer_shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
if self.access_mode.is_writable() {
|
||||
let old_value = self.inner.num_writer.fetch_sub(1, Ordering::Relaxed);
|
||||
if old_value == 1 {
|
||||
self.inner.writer.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InodeIo for NamedPipeHandle {
|
||||
fn read_at(
|
||||
&self,
|
||||
_offset: usize,
|
||||
writer: &mut VmWriter,
|
||||
status_flags: StatusFlags,
|
||||
) -> Result<usize> {
|
||||
if !writer.has_avail() {
|
||||
// Even the peer endpoint has been closed, reading an empty buffer is
|
||||
// still fine.
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
if status_flags.contains(StatusFlags::O_NONBLOCK) {
|
||||
self.try_read(writer)
|
||||
} else {
|
||||
self.wait_events(IoEvents::IN, None, || self.try_read(writer))
|
||||
}
|
||||
}
|
||||
|
||||
fn write_at(
|
||||
&self,
|
||||
_offset: usize,
|
||||
reader: &mut VmReader,
|
||||
status_flags: StatusFlags,
|
||||
) -> Result<usize> {
|
||||
if !reader.has_remain() {
|
||||
// Even the peer endpoint has been closed, writing an empty buffer is
|
||||
// still fine.
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
if status_flags.contains(StatusFlags::O_NONBLOCK) {
|
||||
self.try_write(reader)
|
||||
} else {
|
||||
self.wait_events(IoEvents::OUT, None, || self.try_write(reader))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FileIo for NamedPipeHandle {
|
||||
fn check_seekable(&self) -> Result<()> {
|
||||
return_errno_with_message!(Errno::ESPIPE, "the inode is a FIFO file")
|
||||
}
|
||||
|
||||
fn is_offset_aware(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// A named pipe (FIFO) that provides inter-process communication.
|
||||
///
|
||||
/// Named pipes are special files that appear in the filesystem and provide
|
||||
/// a communication channel between processes. It can be opened multiple times
|
||||
/// for reading, writing, or both.
|
||||
///
|
||||
/// A `NamedPipe` will maintain exactly one **pipe object** that provides actual pipe
|
||||
/// functionalities when there is at least one handle opened on it.
|
||||
pub struct NamedPipe {
|
||||
pipe: Mutex<NamedPipeInner>,
|
||||
wait_queue: WaitQueue,
|
||||
}
|
||||
|
||||
impl NamedPipe {
|
||||
/// Creates a new named pipe.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
pipe: Mutex::new(NamedPipeInner::default()),
|
||||
wait_queue: WaitQueue::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Opens the named pipe with the specified access mode and status flags.
|
||||
///
|
||||
/// Returns a handle that implements `FileIo` for performing I/O operations.
|
||||
///
|
||||
/// The open behavior follows POSIX semantics:
|
||||
/// - Opening for read-only blocks until a writer opens the pipe.
|
||||
/// - Opening for write-only blocks until a reader opens the pipe.
|
||||
/// - Opening for read-write never blocks.
|
||||
///
|
||||
/// If no handle of this named pipe has existed, the method will create a new pipe object.
|
||||
/// Otherwise, it will return a handle that works on the existing pipe object.
|
||||
pub fn open(
|
||||
&self,
|
||||
access_mode: AccessMode,
|
||||
status_flags: StatusFlags,
|
||||
) -> Result<Box<dyn FileIo>> {
|
||||
Ok(self.open_handle(access_mode, status_flags, true)?)
|
||||
}
|
||||
|
||||
/// Opens the pipe and returns a `NamedPipeHandle`.
|
||||
pub(super) fn open_handle(
|
||||
&self,
|
||||
access_mode: AccessMode,
|
||||
status_flags: StatusFlags,
|
||||
is_named_pipe: bool,
|
||||
) -> Result<Box<NamedPipeHandle>> {
|
||||
let mut pipe = self.pipe.lock();
|
||||
let pipe_obj = pipe.get_or_create_pipe_obj();
|
||||
|
||||
let handle = match access_mode {
|
||||
AccessMode::O_RDONLY => {
|
||||
pipe.read_count += 1;
|
||||
|
||||
let old_value = pipe_obj.num_reader.fetch_add(1, Ordering::Relaxed);
|
||||
if old_value == 0 {
|
||||
pipe_obj.reader.peer_activate();
|
||||
self.wait_queue.wake_all();
|
||||
}
|
||||
|
||||
let has_writer = pipe_obj.num_writer.load(Ordering::Relaxed) > 0;
|
||||
let handle = NamedPipeHandle::new(pipe_obj, access_mode);
|
||||
|
||||
// Reference: <https://elixir.bootlin.com/linux/v6.16.5/source/fs/pipe.c#L1166-L1175>
|
||||
if is_named_pipe && !status_flags.contains(StatusFlags::O_NONBLOCK) && !has_writer {
|
||||
let old_write_count = pipe.write_count;
|
||||
drop(pipe);
|
||||
self.wait_queue.pause_until(|| {
|
||||
(old_write_count != self.pipe.lock().write_count).then_some(())
|
||||
})?;
|
||||
}
|
||||
|
||||
handle
|
||||
}
|
||||
AccessMode::O_WRONLY => {
|
||||
pipe.write_count += 1;
|
||||
|
||||
let old_num_writer = pipe_obj.num_writer.fetch_add(1, Ordering::Relaxed);
|
||||
if old_num_writer == 0 {
|
||||
pipe_obj.writer.activate();
|
||||
self.wait_queue.wake_all();
|
||||
}
|
||||
|
||||
let has_reader = pipe_obj.num_reader.load(Ordering::Relaxed) > 0;
|
||||
let handle = NamedPipeHandle::new(pipe_obj, access_mode);
|
||||
|
||||
// Reference: <https://elixir.bootlin.com/linux/v6.16.5/source/fs/pipe.c#L1184-L1195>
|
||||
if is_named_pipe && !has_reader {
|
||||
if status_flags.contains(StatusFlags::O_NONBLOCK) {
|
||||
return_errno_with_message!(Errno::ENXIO, "no reader is present");
|
||||
}
|
||||
|
||||
let old_read_count = pipe.read_count;
|
||||
drop(pipe);
|
||||
self.wait_queue.pause_until(|| {
|
||||
(old_read_count != self.pipe.lock().read_count).then_some(())
|
||||
})?;
|
||||
}
|
||||
|
||||
handle
|
||||
}
|
||||
AccessMode::O_RDWR => {
|
||||
pipe.read_count += 1;
|
||||
pipe.write_count += 1;
|
||||
|
||||
let old_num_reader = pipe_obj.num_reader.fetch_add(1, Ordering::Relaxed);
|
||||
let old_num_writer = pipe_obj.num_writer.fetch_add(1, Ordering::Relaxed);
|
||||
if old_num_reader == 0 || old_num_writer == 0 {
|
||||
self.wait_queue.wake_all();
|
||||
pipe_obj.writer.activate();
|
||||
}
|
||||
|
||||
NamedPipeHandle::new(pipe_obj, access_mode)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
}
|
||||
|
||||
struct PipeObj {
|
||||
reader: PipeReader,
|
||||
writer: PipeWriter,
|
||||
num_reader: AtomicUsize,
|
||||
num_writer: AtomicUsize,
|
||||
}
|
||||
|
||||
impl PipeObj {
|
||||
fn new() -> Arc<Self> {
|
||||
let (reader, writer) = super::common::new_pair();
|
||||
Arc::new(Self {
|
||||
reader,
|
||||
writer,
|
||||
num_reader: AtomicUsize::new(0),
|
||||
num_writer: AtomicUsize::new(0),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct NamedPipeInner {
|
||||
// Holding a weak reference here ensures that the pipe object (including
|
||||
// the buffer data) will be dropped when no handle is open on the named
|
||||
// pipe. This is consistent with Linux behavior.
|
||||
pipe_obj: Weak<PipeObj>,
|
||||
read_count: Wrapping<usize>,
|
||||
write_count: Wrapping<usize>,
|
||||
}
|
||||
|
||||
impl NamedPipeInner {
|
||||
fn get_or_create_pipe_obj(&mut self) -> Arc<PipeObj> {
|
||||
if let Some(pipe_obj) = self.pipe_obj.upgrade() {
|
||||
return pipe_obj;
|
||||
}
|
||||
|
||||
let pipe_obj = PipeObj::new();
|
||||
self.pipe_obj = Arc::downgrade(&pipe_obj);
|
||||
|
||||
pipe_obj
|
||||
}
|
||||
}
|
||||
|
|
@ -23,7 +23,7 @@ use crate::{
|
|||
inode_handle::FileIo,
|
||||
notify::FsEventPublisher,
|
||||
path::{is_dot, is_dot_or_dotdot, is_dotdot},
|
||||
pipe::NamedPipe,
|
||||
pipe::Pipe,
|
||||
registry::{FsProperties, FsType},
|
||||
utils::{
|
||||
AccessMode, CStr256, CachePage, DirentVisitor, Extension, FallocMode, FileSystem,
|
||||
|
|
@ -132,7 +132,7 @@ enum Inner {
|
|||
BlockDevice(u64),
|
||||
CharDevice(u64),
|
||||
Socket,
|
||||
NamedPipe(NamedPipe),
|
||||
NamedPipe(Pipe),
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
|
|
@ -161,7 +161,7 @@ impl Inner {
|
|||
}
|
||||
|
||||
pub(self) fn new_named_pipe() -> Self {
|
||||
Self::NamedPipe(NamedPipe::new())
|
||||
Self::NamedPipe(Pipe::new())
|
||||
}
|
||||
|
||||
pub(self) fn new_file_in_memfd(this: Weak<MemfdInode>) -> Self {
|
||||
|
|
@ -228,7 +228,7 @@ impl Inner {
|
|||
|
||||
Some(device.open())
|
||||
}
|
||||
Self::NamedPipe(pipe) => Some(pipe.open(access_mode, status_flags)),
|
||||
Self::NamedPipe(pipe) => Some(pipe.open_named(access_mode, status_flags)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue