From bdf38a8dc2cb8e92dfd1f69f55b4d75b7679fba2 Mon Sep 17 00:00:00 2001 From: LoGin Date: Thu, 5 Feb 2026 11:24:18 +0800 Subject: [PATCH] =?UTF-8?q?feat(kernel):=20=E6=94=B9=E8=BF=9B=E5=86=85?= =?UTF-8?q?=E6=A0=B8=E7=BA=BF=E7=A8=8B=E7=AE=A1=E7=90=86=E5=92=8C=E7=BD=91?= =?UTF-8?q?=E7=BB=9C=E5=91=BD=E5=90=8D=E7=A9=BA=E9=97=B4=E8=BD=AE=E8=AF=A2?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=20(#1764)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(kernel): 改进内核线程管理和网络命名空间轮询线程 - 新增`KernelThreadMechanism::request_stop`方法,支持请求内核线程退出 - 新增`KernelThreadMechanism::reap_zombie_kthreads`方法,用于回收僵尸内核线程 - 重构网络命名空间轮询线程为`NetnsPoller`结构,支持线程生命周期管理 - 网络命名空间销毁时自动停止轮询线程,避免资源泄漏 - 修复`ProcessControlBlock::is_kthread`方法的实现 - 优化内核线程退出时的唤醒逻辑,确保父进程能及时回收 Signed-off-by: longjin --- kernel/src/process/kthread.rs | 51 ++++ kernel/src/process/mod.rs | 7 +- kernel/src/process/namespace/net_namespace.rs | 285 +++++++++++------- 3 files changed, 233 insertions(+), 110 deletions(-) diff --git a/kernel/src/process/kthread.rs b/kernel/src/process/kthread.rs index 5aa8b80c3..7a2ec20de 100644 --- a/kernel/src/process/kthread.rs +++ b/kernel/src/process/kthread.rs @@ -412,6 +412,32 @@ impl KernelThreadMechanism { } } + /// 请求一个内核线程退出(不等待其真正退出) + #[allow(dead_code)] + pub fn request_stop(pcb: &Arc) -> Result<(), SystemError> { + if !pcb.flags().contains(ProcessFlags::KTHREAD) { + panic!("Cannt stop a non-kthread process"); + } + + let mut worker_private = pcb.worker_private(); + assert!( + worker_private.is_some(), + "kthread request_stop: worker_private is none, pid: {:?}", + pcb.raw_pid() + ); + worker_private + .as_mut() + .unwrap() + .kernel_thread_mut() + .expect("Error type of worker private") + .flags + .insert(KernelThreadFlags::SHOULD_STOP); + + drop(worker_private); + ProcessManager::wakeup(pcb).ok(); + Ok(()) + } + /// 判断一个内核线程是否应当停止 /// /// ## 参数 @@ -458,6 +484,7 @@ impl KernelThreadMechanism { } // 设置为kthread current_pcb.flags().insert(ProcessFlags::KTHREAD); + let kthreadd_pcb = current_pcb.clone(); drop(current_pcb); loop { @@ -478,12 +505,36 @@ impl KernelThreadMechanism { } drop(list); + Self::reap_zombie_kthreads(&kthreadd_pcb); + let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() }; ProcessManager::mark_sleep(true).ok(); drop(irq_guard); schedule(SchedMode::SM_NONE); } } + + fn reap_zombie_kthreads(current_pcb: &Arc) { + let child_pids = { + let guard = current_pcb.children_read_irqsave(); + guard.clone() + }; + + for pid in child_pids { + let Some(task) = ProcessManager::find_task_by_vpid(pid) else { + continue; + }; + if !task.is_kthread() || !task.is_zombie() { + continue; + } + if task.try_mark_dead_from_zombie() { + unsafe { + ProcessManager::release(task.raw_pid()); + } + } + drop(task); + } + } } /// 内核线程启动的第二阶段 diff --git a/kernel/src/process/mod.rs b/kernel/src/process/mod.rs index de4b00ae6..c9153b845 100644 --- a/kernel/src/process/mod.rs +++ b/kernel/src/process/mod.rs @@ -589,6 +589,11 @@ impl ProcessManager { .wait_queue .wakeup_all(Some(ProcessState::Blocked(true))); + // kthread 退出时显式唤醒 kthreadd,使其回收 zombie + if current.is_kthread() { + let _ = ProcessManager::wakeup(&parent_pcb); + } + // 根据 Linux wait 语义,线程组中的任何线程都可以等待同一线程组中任何线程创建的子进程。 // 由于子进程被添加到线程组 leader 的 children 列表中, // 因此还需要唤醒线程组 leader 的 wait_queue(如果 leader 不是 parent_pcb 本身)。 @@ -1213,7 +1218,7 @@ impl ProcessControlBlock { /// /// 若进程是内核进程则返回true 否则返回false pub fn is_kthread(&self) -> bool { - return matches!(self.flags(), &mut ProcessFlags::KTHREAD); + self.flags().contains(ProcessFlags::KTHREAD) } #[inline(never)] diff --git a/kernel/src/process/namespace/net_namespace.rs b/kernel/src/process/namespace/net_namespace.rs index 6c792b321..feb555004 100644 --- a/kernel/src/process/namespace/net_namespace.rs +++ b/kernel/src/process/namespace/net_namespace.rs @@ -11,7 +11,8 @@ use crate::net::socket::netlink::table::{ use crate::net::socket::unix::ns::UnixAbstractTable; use crate::process::fork::CloneFlags; use crate::process::kthread::{KernelThreadClosure, KernelThreadMechanism}; -use crate::process::namespace::{NamespaceOps, NamespaceType}; +use crate::process::namespace::{nsproxy::NsProxy, NamespaceOps, NamespaceType}; +use crate::process::ProcessControlBlock; use crate::process::ProcessManager; use crate::time::{Duration, Instant}; use crate::{ @@ -64,13 +65,9 @@ pub struct NetNamespace { self_ref: Weak, _user_ns: Arc, inner: RwLock, - /// # 用于唤醒网络轮询线程的等待队列 - /// 使用 WaitQueue 的 Waiter/Waker 机制避免唤醒丢失 - poll_wait_queue: WaitQueue, - /// # 标记是否有待处理的网络事件 - /// 用于避免唤醒丢失:当 poll 线程正在 poll 时收到的唤醒请求会设置此标志, - /// poll 线程在进入等待前会检查此标志 - poll_pending: AtomicBool, + /// # 轮询线程控制器 + /// 使用弱引用避免 poll 线程持有 netns 强引用,阻止 Drop + poller: Arc, /// # 当前网络命名空间下所有网络接口的列表 /// 该列表仅应在 **进程上下文** 使用(可睡眠),避免在 hardirq 上下文遍历/加锁。 /// hardirq 应仅做 `napi_schedule()`(见 `driver/net/irq_handle.rs`)。 @@ -79,7 +76,7 @@ pub struct NetNamespace { /// 因此这里使用可睡眠的 `RwSem`,避免自旋锁 + schedule 的组合导致崩溃。 device_list: RwSem>>, ///当前网络命名空间下的桥接设备列表 - bridge_list: RwLock>>, + bridge_list: RwSem>>, // -- Netlink -- /// # 当前网络命名空间下的 Netlink 套接字表 @@ -87,7 +84,7 @@ pub struct NetNamespace { netlink_socket_table: NetlinkSocketTable, /// # 当前网络命名空间下的 Netlink 内核套接字 /// 负责接收并处理 Netlink 消息 - netlink_kernel_socket: RwLock>>, + netlink_kernel_socket: RwSem>>, /// AF_UNIX abstract namespace table (scoped to this netns). unix_abstract_table: Arc, @@ -105,6 +102,160 @@ pub struct InnerNetNamespace { default_iface: Option>, } +#[derive(Debug)] +struct NetnsPoller { + netns: Weak, + /// # 用于唤醒网络轮询线程的等待队列 + /// 使用 WaitQueue 的 Waiter/Waker 机制避免唤醒丢失 + wait_queue: WaitQueue, + /// # 标记是否有待处理的网络事件 + /// 用于避免唤醒丢失:当 poll 线程正在 poll 时收到的唤醒请求会设置此标志, + /// poll 线程在进入等待前会检查此标志 + poll_pending: AtomicBool, + /// # 轮询线程的 PCB(用于 stop) + thread: RwSem>>, +} + +impl NetnsPoller { + fn new(netns: Weak) -> Arc { + Arc::new(Self { + netns, + wait_queue: WaitQueue::default(), + poll_pending: AtomicBool::new(false), + thread: RwSem::new(None), + }) + } + + fn start(self: &Arc, name: String) { + let poller = self.clone(); + let closure: Box i32 + Send + Sync> = Box::new(move || { + poller.polling(); + 0 + }); + let pcb = KernelThreadMechanism::create_and_run( + KernelThreadClosure::EmptyClosure((closure, ())), + name, + ) + .expect("create net_poll thread for net namespace failed"); + // 避免轮询线程通过 nsproxy 持有 netns 强引用导致无法释放 + pcb.set_nsproxy(NsProxy::new_root()); + *self.thread.write() = Some(pcb); + } + + fn stop(&self) { + let pcb = self.thread.write().take(); + if let Some(pcb) = pcb { + // 唤醒等待中的 poll 线程,确保其能看到 should_stop 标志。 + // + // 重要:stop 可能由 poller 线程自身触发(例如 poller 线程释放最后一个 netns Arc, + // 进入 NetNamespace::drop)。此时也必须设置 pending 并唤醒/自唤醒,避免在 timeout=None + // 的 wait_event 上永久睡眠。 + self.poll_pending.store(true, Ordering::Release); + self.wait_queue.wake_all(); + let _ = KernelThreadMechanism::request_stop(&pcb); + } + } + + fn polling(&self) { + loop { + if KernelThreadMechanism::should_stop(&ProcessManager::current_pcb()) { + break; + } + + let netns = match self.netns.upgrade() { + Some(netns) => netns, + None => { + log::info!("netns poller exit: netns dropped"); + break; + } + }; + + let nsid = netns.ns_common.nsid.data(); + let now_us = Instant::now().total_micros() as u64; + + // 处理“已到期的定时事件”:到期则 schedule NAPI 推进一次。 + // 同时计算下一次最早到期时间点,用于设置 sleep 超时。 + let mut next_us: Option = None; + let mut had_due = false; + for (_, iface) in netns.device_list.read().iter() { + if let Some(us) = iface.common().poll_at_us() { + if us <= now_us { + had_due = true; + if let Some(napi) = iface.napi_struct() { + napi_schedule(napi); + } else { + // 兜底:若未配置 NAPI,则仍调用一次 poll 推进(可能无界)。 + let _ = iface.poll(); + } + continue; + } + + next_us = Some(match next_us { + Some(cur) => core::cmp::min(cur, us), + None => us, + }); + } + } + + // sleep 超时: + // - 若刚处理了 due timer:小睡一会儿,避免在 NAPI 尚未推进/更新时间戳前重复 schedule 形成忙等 + // - 否则按最早 deadline 精确睡眠 + let timeout = if had_due { + Some(Duration::from_micros(200)) + } else { + next_us.map(|us| { + let delta = us.saturating_sub(now_us); + Duration::from_micros(core::cmp::max(1, delta)) + }) + }; + + log::trace!( + "netns scheduler sleep: nsid={} timeout_us={:?}", + nsid, + timeout.map(|d| d.total_micros()) + ); + + // 释放 netns 引用再进入等待,避免 poll 线程长期持有 netns 阻止 Drop。 + drop(netns); + + // 等待事件唤醒(IRQ/lo Tx 等)或 timeout(smoltcp timer deadline)。 + // cond 使用 swap(false) 原子消费一次 pending,避免丢唤醒。 + let woke_by_event = match self.wait_queue.wait_event_uninterruptible_timeout( + || self.poll_pending.swap(false, Ordering::AcqRel), + timeout, + ) { + Ok(()) => true, + Err(SystemError::EAGAIN_OR_EWOULDBLOCK) => false, + Err(e) => { + log::warn!("netns scheduler sleep error: {:?}", e); + false + } + }; + + if woke_by_event { + if KernelThreadMechanism::should_stop(&ProcessManager::current_pcb()) { + break; + } + let netns = match self.netns.upgrade() { + Some(netns) => netns, + None => { + break; + } + }; + + // 事件驱动:尽量只 schedule 一次即可,由 NAPI 线程以 bounded poll 推进。 + for (_, iface) in netns.device_list.read().iter() { + if let Some(napi) = iface.napi_struct() { + napi_schedule(napi); + } else { + let _ = iface.poll(); + } + } + } + } + } +} + impl InnerNetNamespace { pub fn router(&self) -> &Arc { &self.router @@ -132,12 +283,11 @@ impl NetNamespace { self_ref: self_ref.clone(), _user_ns: crate::process::namespace::user_namespace::INIT_USER_NAMESPACE.clone(), inner: RwLock::new(inner), - poll_wait_queue: WaitQueue::default(), - poll_pending: AtomicBool::new(false), + poller: NetnsPoller::new(self_ref.clone()), device_list: RwSem::new(BTreeMap::new()), - bridge_list: RwLock::new(BTreeMap::new()), + bridge_list: RwSem::new(BTreeMap::new()), netlink_socket_table: NetlinkSocketTable::default(), - netlink_kernel_socket: RwLock::new(generate_supported_netlink_kernel_sockets()), + netlink_kernel_socket: RwSem::new(generate_supported_netlink_kernel_sockets()), unix_abstract_table: unix_abstract_table.clone(), local_port_range: AtomicU32::new( crate::net::socket::inet::common::port::DEFAULT_LOCAL_PORT_RANGE, @@ -167,12 +317,11 @@ impl NetNamespace { self_ref: self_ref.clone(), _user_ns: user_ns, inner: RwLock::new(inner), - poll_wait_queue: WaitQueue::default(), - poll_pending: AtomicBool::new(false), + poller: NetnsPoller::new(self_ref.clone()), device_list: RwSem::new(BTreeMap::new()), - bridge_list: RwLock::new(BTreeMap::new()), + bridge_list: RwSem::new(BTreeMap::new()), netlink_socket_table: NetlinkSocketTable::default(), - netlink_kernel_socket: RwLock::new(generate_supported_netlink_kernel_sockets()), + netlink_kernel_socket: RwSem::new(generate_supported_netlink_kernel_sockets()), unix_abstract_table: unix_abstract_table.clone(), local_port_range: AtomicU32::new( crate::net::socket::inet::common::port::DEFAULT_LOCAL_PORT_RANGE, @@ -301,8 +450,8 @@ impl NetNamespace { /// 使用原子标志确保即使 poll 线程正在执行也不会丢失唤醒请求 pub fn wakeup_poll_thread(&self) { // 先设置 pending 标志,再唤醒:避免“先唤后睡/睡前漏信号”。 - let was_pending = self.poll_pending.swap(true, Ordering::AcqRel); - let woken = self.poll_wait_queue.wake_all(); + let was_pending = self.poller.poll_pending.swap(true, Ordering::AcqRel); + let woken = self.poller.wait_queue.wake_all(); // 事件驱动:对齐 Linux,尽量在事件发生后立刻 schedule NAPI(由 NAPI 线程 bounded poll 推进)。 // 只在从“未 pending -> pending”这一跳触发一次,避免中断风暴下重复 schedule。 if !was_pending { @@ -315,96 +464,8 @@ impl NetNamespace { } } - /// # 网络命名空间的轮询线程 - /// 该线程负责“调度网络推进”,对齐 Linux 的“事件驱动 + 定时器驱动”模型: - /// - 事件(IRQ/lo Tx 等)到来:唤醒本线程 -> schedule NAPI - /// - smoltcp 定时器到期(poll_at):超时唤醒 -> schedule NAPI - /// - /// 注意:实际的协议栈推进工作由 NAPI 线程执行(bounded poll),避免本线程扫描+无界处理导致卡顿。 - /// 注意: 此方法仅可在初始化当前net namespace时创建进程使用 - fn polling(&self) { - loop { - let now_us = Instant::now().total_micros() as u64; - - // 处理“已到期的定时事件”:到期则 schedule NAPI 推进一次。 - // 同时计算下一次最早到期时间点,用于设置 sleep 超时。 - let mut next_us: Option = None; - let mut had_due = false; - for (_, iface) in self.device_list.read().iter() { - if let Some(us) = iface.common().poll_at_us() { - if us <= now_us { - had_due = true; - if let Some(napi) = iface.napi_struct() { - napi_schedule(napi); - } else { - // 兜底:若未配置 NAPI,则仍调用一次 poll 推进(可能无界)。 - let _ = iface.poll(); - } - continue; - } - - next_us = Some(match next_us { - Some(cur) => core::cmp::min(cur, us), - None => us, - }); - } - } - - // sleep 超时: - // - 若刚处理了 due timer:小睡一会儿,避免在 NAPI 尚未推进/更新时间戳前重复 schedule 形成忙等 - // - 否则按最早 deadline 精确睡眠 - let timeout = if had_due { - Some(Duration::from_micros(200)) - } else { - next_us.map(|us| { - let delta = us.saturating_sub(now_us); - Duration::from_micros(core::cmp::max(1, delta)) - }) - }; - - log::trace!( - "netns scheduler sleep: nsid={} timeout_us={:?}", - self.ns_common.nsid.data(), - timeout.map(|d| d.total_micros()) - ); - - // 等待事件唤醒(IRQ/lo Tx 等)或 timeout(smoltcp timer deadline)。 - // cond 使用 swap(false) 原子消费一次 pending,避免丢唤醒。 - let woke_by_event = match self.poll_wait_queue.wait_event_uninterruptible_timeout( - || self.poll_pending.swap(false, Ordering::AcqRel), - timeout, - ) { - Ok(()) => true, - Err(SystemError::EAGAIN_OR_EWOULDBLOCK) => false, - Err(e) => { - log::warn!("netns scheduler sleep error: {:?}", e); - false - } - }; - - if woke_by_event { - // 事件驱动:尽量只 schedule 一次即可,由 NAPI 线程以 bounded poll 推进。 - for (_, iface) in self.device_list.read().iter() { - if let Some(napi) = iface.napi_struct() { - napi_schedule(napi); - } else { - let _ = iface.poll(); - } - } - } - } - } - fn create_polling_thread(netns: Arc, name: String) { - let closure: Box i32 + Send + Sync> = Box::new(move || { - netns.polling(); - 0 - }); - let pcb = KernelThreadClosure::EmptyClosure((closure, ())); - - KernelThreadMechanism::create_and_run(pcb, name) - .expect("create net_poll thread for net namespace failed"); - log::info!("net_poll thread created for namespace"); + netns.poller.start(name); } } @@ -414,6 +475,12 @@ impl NamespaceOps for NetNamespace { } } +impl Drop for NetNamespace { + fn drop(&mut self) { + self.poller.stop(); + } +} + impl ProcessManager { pub fn current_netns() -> Arc { Self::current_pcb().nsproxy.read().net_ns.clone()