From ff5082c1ea0482f2caaf03e8afd87edd22a3d47a Mon Sep 17 00:00:00 2001 From: LoGin Date: Wed, 21 Jan 2026 13:20:30 +0800 Subject: [PATCH] =?UTF-8?q?fix(net/tcp):=20=E4=BF=AE=E5=A4=8DTCP=E5=A5=97?= =?UTF-8?q?=E6=8E=A5=E5=AD=97=E5=85=B3=E9=97=AD=E5=92=8C=E6=8E=A5=E6=94=B6?= =?UTF-8?q?=E9=80=BB=E8=BE=91=20(#1690)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(net/tcp): 修复TCP套接字关闭和接收逻辑 - 修复TCP套接字关闭时未处理未读数据的问题 - 修复接收逻辑中状态检查错误返回ECONNRESET的问题 - 添加cork缓冲区刷新互斥锁防止重复刷新 Signed-off-by: longjin * feat(net/tcp): 实现TCP keepalive和linger选项支持 - 新增TCP_KEEPIDLE、TCP_KEEPINTVL、TCP_KEEPCNT选项的读写支持 - 实现SO_LINGER选项设置,支持立即中止连接 - 修改连接关闭逻辑,当linger_abort为true时立即中止连接 - 添加相关选项的默认值和验证逻辑 Signed-off-by: longjin --------- Signed-off-by: longjin --- kernel/src/net/socket/inet/stream/events.rs | 11 +-- kernel/src/net/socket/inet/stream/io.rs | 47 ++++++++----- .../src/net/socket/inet/stream/lifecycle.rs | 21 ++++-- kernel/src/net/socket/inet/stream/option.rs | 70 ++++++++++++++++--- .../src/net/socket/inet/stream/stream_core.rs | 45 ++++++++++++ 5 files changed, 158 insertions(+), 36 deletions(-) diff --git a/kernel/src/net/socket/inet/stream/events.rs b/kernel/src/net/socket/inet/stream/events.rs index cbde42a8f..e52bd39e3 100644 --- a/kernel/src/net/socket/inet/stream/events.rs +++ b/kernel/src/net/socket/inet/stream/events.rs @@ -7,11 +7,14 @@ impl TcpSocket { pub(crate) fn update_events(&self) -> bool { // If cork was disabled or SHUT_WR was requested while there are still cork-buffered bytes, // opportunistically flush them so the data does not become unreachable and FIN can be sent. - if (!self - .options - .tcp_cork + if !self + .cork_flush_in_progress .load(core::sync::atomic::Ordering::Relaxed) - || self.is_send_shutdown()) + && (!self + .options + .tcp_cork + .load(core::sync::atomic::Ordering::Relaxed) + || self.is_send_shutdown()) && !self.cork_buf.lock().is_empty() { let _ = self.flush_cork_buffer(); diff --git a/kernel/src/net/socket/inet/stream/io.rs b/kernel/src/net/socket/inet/stream/io.rs index 695750601..06b37f65d 100644 --- a/kernel/src/net/socket/inet/stream/io.rs +++ b/kernel/src/net/socket/inet/stream/io.rs @@ -64,22 +64,14 @@ impl TcpSocket { } if !socket.can_recv() { - // Linux 语义:对端已关闭写端(收到 FIN)且本端已读完数据时,recv 返回 0。 - // 如果状态表明已收到 FIN,即使 buffer 为空也应返回 0 (EOF)。 - let state = socket.state(); - if matches!( - state, - smoltcp::socket::tcp::State::CloseWait - | smoltcp::socket::tcp::State::LastAck - | smoltcp::socket::tcp::State::Closing - | smoltcp::socket::tcp::State::TimeWait - | smoltcp::socket::tcp::State::Closed - ) { - return Ok(0); - } - if !socket.may_recv() { - return Ok(0); + return match socket.recv(|_data| (0usize, ())) { + Ok(()) => Ok(0), + Err(smoltcp::socket::tcp::RecvError::Finished) => Ok(0), + Err(smoltcp::socket::tcp::RecvError::InvalidState) => { + Err(SystemError::ECONNRESET) + } + }; } return Err(SystemError::EAGAIN_OR_EWOULDBLOCK); } @@ -133,7 +125,7 @@ impl TcpSocket { if flags.contains(PMSG::PEEK) { return match socket.peek_slice(current_buf) { Ok(size) => Ok(size), - Err(smoltcp::socket::tcp::RecvError::InvalidState) => Err(SystemError::ENOTCONN), + Err(smoltcp::socket::tcp::RecvError::InvalidState) => Err(SystemError::ECONNRESET), Err(smoltcp::socket::tcp::RecvError::Finished) => Ok(0), }; } @@ -381,6 +373,20 @@ impl TcpSocket { } pub(crate) fn flush_cork_buffer(&self) -> Result<(), SystemError> { + if self + .cork_flush_in_progress + .compare_exchange( + false, + true, + core::sync::atomic::Ordering::Acquire, + core::sync::atomic::Ordering::Relaxed, + ) + .is_err() + { + return Ok(()); + } + let _guard = CorkFlushGuard(self); + loop { let cork_buf = self.cork_buf.lock(); if cork_buf.is_empty() { @@ -504,3 +510,12 @@ impl TcpSocket { Err(SystemError::ENOTCONN) } } + +struct CorkFlushGuard<'a>(&'a TcpSocket); +impl Drop for CorkFlushGuard<'_> { + fn drop(&mut self) { + self.0 + .cork_flush_in_progress + .store(false, core::sync::atomic::Ordering::Release); + } +} diff --git a/kernel/src/net/socket/inet/stream/lifecycle.rs b/kernel/src/net/socket/inet/stream/lifecycle.rs index 5cac8e6d5..60eab4b97 100644 --- a/kernel/src/net/socket/inet/stream/lifecycle.rs +++ b/kernel/src/net/socket/inet/stream/lifecycle.rs @@ -229,10 +229,7 @@ impl TcpSocket { let inner = write_state.take().expect("Tcp inner::Inner is None"); let (replace, result) = match inner { inner::Inner::Connecting(conn) => conn.into_result(), - inner::Inner::Established(es) => { - log::warn!("TODO: check new established"); - (inner::Inner::Established(es), Ok(())) - } // TODO check established + inner::Inner::Established(es) => (inner::Inner::Established(es), Ok(())), // TODO check established inner::Inner::SelfConnected(sc) => (inner::Inner::SelfConnected(sc), Ok(())), _ => { log::warn!("TODO: connecting socket error options"); @@ -459,7 +456,21 @@ impl TcpSocket { let local_port = es.get_name().port; let iface = es.iface().clone(); let me: alloc::sync::Weak = self.self_ref.clone(); - es.close(); + let linger_abort = self + .linger_onoff() + .load(core::sync::atomic::Ordering::Relaxed) + != 0 + && self + .linger_linger() + .load(core::sync::atomic::Ordering::Relaxed) + == 0; + let unread = es.with(|socket| socket.recv_queue()); + if linger_abort || unread > 0 { + es.with_mut(|socket| socket.abort()); + es.iface().poll(); + } else { + es.close(); + } iface.common().defer_tcp_close(handle, local_port, me); writer.replace(inner::Inner::Established(es)); } diff --git a/kernel/src/net/socket/inet/stream/option.rs b/kernel/src/net/socket/inet/stream/option.rs index b086740b4..08fbdbad0 100644 --- a/kernel/src/net/socket/inet/stream/option.rs +++ b/kernel/src/net/socket/inet/stream/option.rs @@ -105,6 +105,10 @@ impl From for i32 { /// TCP socket option setters. impl super::TcpSocket { + const MAX_TCP_KEEPIDLE: i32 = 32767; + const MAX_TCP_KEEPINTVL: i32 = 32767; + const MAX_TCP_KEEPCNT: i32 = 127; + #[inline] fn set_bool_option( atomic: &core::sync::atomic::AtomicBool, @@ -295,13 +299,33 @@ impl super::TcpSocket { } PSO::KEEPALIVE => Self::set_bool_option(self.so_keepalive_enabled(), val, |on| { let interval = if on { - Some(smoltcp::time::Duration::from_secs(7200)) + let idle = self + .tcp_keepidle_secs() + .load(core::sync::atomic::Ordering::Relaxed); + Some(smoltcp::time::Duration::from_secs(idle.max(1) as u64)) } else { None }; self.apply_keepalive(interval); Ok(()) }), + PSO::LINGER => { + if val.len() < 8 { + return Err(SystemError::EINVAL); + } + let l_onoff = i32::from_ne_bytes([val[0], val[1], val[2], val[3]]); + let l_linger = i32::from_ne_bytes([val[4], val[5], val[6], val[7]]); + if l_linger < 0 { + return Err(SystemError::EINVAL); + } + self.linger_onoff().store( + if l_onoff != 0 { 1 } else { 0 }, + core::sync::atomic::Ordering::Relaxed, + ); + self.linger_linger() + .store(l_linger, core::sync::atomic::Ordering::Relaxed); + Ok(()) + } _ => Ok(()), // Accept and ignore other SOL_SOCKET options } } @@ -329,18 +353,39 @@ impl super::TcpSocket { })?; Ok(()) } - Options::KeepIntvl => { - let interval = byte_parser::read_u32(val)?; - self.with_inner_established(|est| { - est.with_mut(|socket| { - socket.set_keep_alive(Some(smoltcp::time::Duration::from_secs( - interval as u64, - ))); - }); - })?; + Options::KeepIdle => { + let v = byte_parser::read_i32(val)?; + if !(1..=Self::MAX_TCP_KEEPIDLE).contains(&v) { + return Err(SystemError::EINVAL); + } + self.tcp_keepidle_secs() + .store(v, core::sync::atomic::Ordering::Relaxed); + if self + .so_keepalive_enabled() + .load(core::sync::atomic::Ordering::Relaxed) + { + self.apply_keepalive(Some(smoltcp::time::Duration::from_secs(v as u64))); + } + Ok(()) + } + Options::KeepIntvl => { + let v = byte_parser::read_i32(val)?; + if !(1..=Self::MAX_TCP_KEEPINTVL).contains(&v) { + return Err(SystemError::EINVAL); + } + self.tcp_keepintvl_secs() + .store(v, core::sync::atomic::Ordering::Relaxed); + Ok(()) + } + Options::KeepCnt => { + let v = byte_parser::read_i32(val)?; + if !(1..=Self::MAX_TCP_KEEPCNT).contains(&v) { + return Err(SystemError::EINVAL); + } + self.tcp_keepcnt() + .store(v, core::sync::atomic::Ordering::Relaxed); Ok(()) } - Options::KeepCnt | Options::KeepIdle => Ok(()), // Stub: silently ignore Options::INQ => Self::set_bool_option(self.tcp_inq_enabled(), val, |_| Ok(())), Options::QuickAck => { Self::set_bool_option(self.tcp_quickack_enabled(), val, |_| Ok(())) @@ -516,6 +561,9 @@ impl super::TcpSocket { Options::MaxSegment => Self::write_atomic_usize_as_u32(value, self.tcp_max_seg()), Options::DeferAccept => Self::write_atomic_i32(value, self.tcp_defer_accept()), Options::Syncnt => Self::write_atomic_i32(value, self.tcp_syncnt()), + Options::KeepIdle => Self::write_atomic_i32(value, self.tcp_keepidle_secs()), + Options::KeepIntvl => Self::write_atomic_i32(value, self.tcp_keepintvl_secs()), + Options::KeepCnt => Self::write_atomic_i32(value, self.tcp_keepcnt()), Options::WindowClamp => Self::write_atomic_usize_as_u32(value, self.tcp_window_clamp()), Options::UserTimeout => Self::write_atomic_i32(value, self.tcp_user_timeout()), Options::Info => self.get_tcp_info(value), diff --git a/kernel/src/net/socket/inet/stream/stream_core.rs b/kernel/src/net/socket/inet/stream/stream_core.rs index 391c2a57b..1dc51a556 100644 --- a/kernel/src/net/socket/inet/stream/stream_core.rs +++ b/kernel/src/net/socket/inet/stream/stream_core.rs @@ -55,6 +55,17 @@ pub struct TcpSocketOptions { pub(crate) tcp_quickack: AtomicBool, /// SO_KEEPALIVE pub(crate) so_keepalive: AtomicBool, + + /// TCP_KEEPIDLE (seconds) + pub(crate) tcp_keepidle_secs: AtomicI32, + /// TCP_KEEPINTVL (seconds) + pub(crate) tcp_keepintvl_secs: AtomicI32, + /// TCP_KEEPCNT + pub(crate) tcp_keepcnt: AtomicI32, + + /// SO_LINGER + pub(crate) linger_onoff: AtomicI32, + pub(crate) linger_linger: AtomicI32, } impl TcpSocketOptions { @@ -79,6 +90,13 @@ impl TcpSocketOptions { tcp_cork: AtomicBool::new(false), tcp_quickack: AtomicBool::new(true), so_keepalive: AtomicBool::new(false), + + tcp_keepidle_secs: AtomicI32::new(2 * 60 * 60), + tcp_keepintvl_secs: AtomicI32::new(75), + tcp_keepcnt: AtomicI32::new(9), + + linger_onoff: AtomicI32::new(0), + linger_linger: AtomicI32::new(0), } } } @@ -102,6 +120,7 @@ pub struct TcpSocket { pub(crate) fasync_items: FAsyncItems, pub(crate) options: TcpSocketOptions, pub(crate) cork_buf: Mutex>, + pub(crate) cork_flush_in_progress: AtomicBool, pub(crate) recv_shutdown: ShutdownRecvTracker, } @@ -128,6 +147,7 @@ impl TcpSocket { fasync_items: FAsyncItems::default(), options: TcpSocketOptions::new(), cork_buf: Mutex::new(Vec::new()), + cork_flush_in_progress: AtomicBool::new(false), recv_shutdown: ShutdownRecvTracker::new(), } } @@ -272,6 +292,31 @@ impl TcpSocket { &self.options.so_keepalive } + #[inline] + pub(crate) fn tcp_keepidle_secs(&self) -> &AtomicI32 { + &self.options.tcp_keepidle_secs + } + + #[inline] + pub(crate) fn tcp_keepintvl_secs(&self) -> &AtomicI32 { + &self.options.tcp_keepintvl_secs + } + + #[inline] + pub(crate) fn tcp_keepcnt(&self) -> &AtomicI32 { + &self.options.tcp_keepcnt + } + + #[inline] + pub(crate) fn linger_onoff(&self) -> &AtomicI32 { + &self.options.linger_onoff + } + + #[inline] + pub(crate) fn linger_linger(&self) -> &AtomicI32 { + &self.options.linger_linger + } + #[inline] pub(crate) fn send_buf_size(&self) -> &AtomicUsize { &self.options.send_buf_size