Refactor `try_read`

This commit is contained in:
Ruihan Li 2026-01-03 11:44:27 +08:00 committed by Tate, Hongliang Tian
parent 4a93b34e3d
commit 2155869920
1 changed files with 36 additions and 24 deletions

View File

@ -41,15 +41,17 @@ struct SubscriberEntry {
/// `InotifyFile` accepts events from multiple inotify subscribers (watches) on different inodes.
/// Users should read events from this file to receive notifications about filesystem changes.
pub struct InotifyFile {
// Next watch descriptor to allocate.
// The next watch descriptor to allocate.
next_wd: AtomicU32,
// A map from watch descriptors to subscriber entries.
watch_map: SpinLock<HashMap<u32, SubscriberEntry>>,
// A mutex to synchronize `read()` operations.
read_mutex: Mutex<()>,
// Whether the file is opened in non-blocking mode.
is_nonblocking: AtomicBool,
// Bounded queue of inotify events.
// A bounded queue of inotify events.
event_queue: SpinLock<VecDeque<InotifyEvent>>,
// Maximum capacity of the event queue.
// The maximum capacity of the event queue.
queue_capacity: usize,
// A pollable object for this inotify file.
pollee: Pollee,
@ -95,6 +97,7 @@ impl InotifyFile {
// Reference: <https://elixir.bootlin.com/linux/v6.17/source/fs/notify/inotify/inotify_user.c#L402>
next_wd: AtomicU32::new(1),
watch_map: SpinLock::new(HashMap::new()),
read_mutex: Mutex::new(()),
is_nonblocking: AtomicBool::new(is_nonblocking),
event_queue: SpinLock::new(VecDeque::new()),
queue_capacity: DEFAULT_MAX_QUEUED_EVENTS,
@ -212,16 +215,14 @@ impl InotifyFile {
let wd = subscriber.wd();
let new_event = InotifyEvent::new(wd, event, 0, name);
{
'notify: {
let mut event_queue = self.event_queue.lock();
if let Some(last_event) = event_queue.back()
&& can_merge_events(last_event, &new_event)
{
event_queue.pop_back();
event_queue.push_back(new_event);
// New or merged event makes the file readable
self.pollee.notify(IoEvents::IN);
return;
break 'notify;
}
// If the queue is full, drop the event.
@ -232,13 +233,21 @@ impl InotifyFile {
event_queue.push_back(new_event);
}
// The new event or the merged event makes the file readable.
self.pollee.notify(IoEvents::IN);
}
/// Pops an event from the notification queue.
fn pop_event(&self) -> Option<InotifyEvent> {
let mut event_queue = self.event_queue.lock();
event_queue.pop_front()
let event = event_queue.pop_front();
// Invalidate when the queue is empty.
if event_queue.is_empty() {
self.pollee.invalidate();
}
event
}
/// Gets the total size of all events in the notification queue.
@ -255,6 +264,10 @@ impl InotifyFile {
return_errno_with_message!(Errno::EINVAL, "buffer is too small");
}
// This ensures that we report continuous events even when the user program attempts to
// call `read()` concurrently.
let _guard = self.read_mutex.lock();
let mut size = 0;
let mut consumed_events = 0;
@ -264,28 +277,32 @@ impl InotifyFile {
size += event_size;
consumed_events += 1;
}
Err(e) => {
Err(err) => {
// This won't reorder events due to `_guard`.
self.event_queue.lock().push_front(event);
if consumed_events == 0 {
return Err(e);
return Err(err);
}
break;
return Ok(size);
}
}
}
if consumed_events == 0 {
return_errno_with_message!(Errno::EAGAIN, "no inotify events available");
return_errno_with_message!(Errno::EAGAIN, "no inotify events are available");
}
// Only invalidate if the queue is empty after reading
let queue_empty = self.event_queue.lock().is_empty();
if queue_empty {
self.pollee.invalidate();
}
Ok(size)
}
fn check_io_events(&self) -> IoEvents {
if self.event_queue.lock().is_empty() {
IoEvents::empty()
} else {
IoEvents::IN
}
}
fn this(&self) -> Arc<InotifyFile> {
self.this.upgrade().unwrap()
}
@ -293,13 +310,8 @@ impl InotifyFile {
impl Pollable for InotifyFile {
fn poll(&self, mask: IoEvents, poller: Option<&mut PollHandle>) -> IoEvents {
self.pollee.poll_with(mask, poller, || {
if self.event_queue.lock().is_empty() {
IoEvents::empty()
} else {
IoEvents::IN
}
})
self.pollee
.poll_with(mask, poller, || self.check_io_events())
}
}