fix(net/tcp): 修复TCP套接字关闭和接收逻辑 (#1690)

* fix(net/tcp): 修复TCP套接字关闭和接收逻辑

- 修复TCP套接字关闭时未处理未读数据的问题
- 修复接收逻辑中状态检查错误返回ECONNRESET的问题
- 添加cork缓冲区刷新互斥锁防止重复刷新

Signed-off-by: longjin <longjin@DragonOS.org>

* feat(net/tcp): 实现TCP keepalive和linger选项支持

- 新增TCP_KEEPIDLE、TCP_KEEPINTVL、TCP_KEEPCNT选项的读写支持
- 实现SO_LINGER选项设置,支持立即中止连接
- 修改连接关闭逻辑,当linger_abort为true时立即中止连接
- 添加相关选项的默认值和验证逻辑

Signed-off-by: longjin <longjin@DragonOS.org>

---------

Signed-off-by: longjin <longjin@DragonOS.org>
This commit is contained in:
LoGin 2026-01-21 13:20:30 +08:00 committed by GitHub
parent 1ee52548ed
commit ff5082c1ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 158 additions and 36 deletions

View File

@ -7,11 +7,14 @@ impl TcpSocket {
pub(crate) fn update_events(&self) -> bool {
// If cork was disabled or SHUT_WR was requested while there are still cork-buffered bytes,
// opportunistically flush them so the data does not become unreachable and FIN can be sent.
if (!self
.options
.tcp_cork
if !self
.cork_flush_in_progress
.load(core::sync::atomic::Ordering::Relaxed)
|| self.is_send_shutdown())
&& (!self
.options
.tcp_cork
.load(core::sync::atomic::Ordering::Relaxed)
|| self.is_send_shutdown())
&& !self.cork_buf.lock().is_empty()
{
let _ = self.flush_cork_buffer();

View File

@ -64,22 +64,14 @@ impl TcpSocket {
}
if !socket.can_recv() {
// Linux 语义:对端已关闭写端(收到 FIN)且本端已读完数据时recv 返回 0。
// 如果状态表明已收到 FIN即使 buffer 为空也应返回 0 (EOF)。
let state = socket.state();
if matches!(
state,
smoltcp::socket::tcp::State::CloseWait
| smoltcp::socket::tcp::State::LastAck
| smoltcp::socket::tcp::State::Closing
| smoltcp::socket::tcp::State::TimeWait
| smoltcp::socket::tcp::State::Closed
) {
return Ok(0);
}
if !socket.may_recv() {
return Ok(0);
return match socket.recv(|_data| (0usize, ())) {
Ok(()) => Ok(0),
Err(smoltcp::socket::tcp::RecvError::Finished) => Ok(0),
Err(smoltcp::socket::tcp::RecvError::InvalidState) => {
Err(SystemError::ECONNRESET)
}
};
}
return Err(SystemError::EAGAIN_OR_EWOULDBLOCK);
}
@ -133,7 +125,7 @@ impl TcpSocket {
if flags.contains(PMSG::PEEK) {
return match socket.peek_slice(current_buf) {
Ok(size) => Ok(size),
Err(smoltcp::socket::tcp::RecvError::InvalidState) => Err(SystemError::ENOTCONN),
Err(smoltcp::socket::tcp::RecvError::InvalidState) => Err(SystemError::ECONNRESET),
Err(smoltcp::socket::tcp::RecvError::Finished) => Ok(0),
};
}
@ -381,6 +373,20 @@ impl TcpSocket {
}
pub(crate) fn flush_cork_buffer(&self) -> Result<(), SystemError> {
if self
.cork_flush_in_progress
.compare_exchange(
false,
true,
core::sync::atomic::Ordering::Acquire,
core::sync::atomic::Ordering::Relaxed,
)
.is_err()
{
return Ok(());
}
let _guard = CorkFlushGuard(self);
loop {
let cork_buf = self.cork_buf.lock();
if cork_buf.is_empty() {
@ -504,3 +510,12 @@ impl TcpSocket {
Err(SystemError::ENOTCONN)
}
}
struct CorkFlushGuard<'a>(&'a TcpSocket);
impl Drop for CorkFlushGuard<'_> {
fn drop(&mut self) {
self.0
.cork_flush_in_progress
.store(false, core::sync::atomic::Ordering::Release);
}
}

View File

@ -229,10 +229,7 @@ impl TcpSocket {
let inner = write_state.take().expect("Tcp inner::Inner is None");
let (replace, result) = match inner {
inner::Inner::Connecting(conn) => conn.into_result(),
inner::Inner::Established(es) => {
log::warn!("TODO: check new established");
(inner::Inner::Established(es), Ok(()))
} // TODO check established
inner::Inner::Established(es) => (inner::Inner::Established(es), Ok(())), // TODO check established
inner::Inner::SelfConnected(sc) => (inner::Inner::SelfConnected(sc), Ok(())),
_ => {
log::warn!("TODO: connecting socket error options");
@ -459,7 +456,21 @@ impl TcpSocket {
let local_port = es.get_name().port;
let iface = es.iface().clone();
let me: alloc::sync::Weak<dyn InetSocket> = self.self_ref.clone();
es.close();
let linger_abort = self
.linger_onoff()
.load(core::sync::atomic::Ordering::Relaxed)
!= 0
&& self
.linger_linger()
.load(core::sync::atomic::Ordering::Relaxed)
== 0;
let unread = es.with(|socket| socket.recv_queue());
if linger_abort || unread > 0 {
es.with_mut(|socket| socket.abort());
es.iface().poll();
} else {
es.close();
}
iface.common().defer_tcp_close(handle, local_port, me);
writer.replace(inner::Inner::Established(es));
}

View File

@ -105,6 +105,10 @@ impl From<Options> for i32 {
/// TCP socket option setters.
impl super::TcpSocket {
const MAX_TCP_KEEPIDLE: i32 = 32767;
const MAX_TCP_KEEPINTVL: i32 = 32767;
const MAX_TCP_KEEPCNT: i32 = 127;
#[inline]
fn set_bool_option(
atomic: &core::sync::atomic::AtomicBool,
@ -295,13 +299,33 @@ impl super::TcpSocket {
}
PSO::KEEPALIVE => Self::set_bool_option(self.so_keepalive_enabled(), val, |on| {
let interval = if on {
Some(smoltcp::time::Duration::from_secs(7200))
let idle = self
.tcp_keepidle_secs()
.load(core::sync::atomic::Ordering::Relaxed);
Some(smoltcp::time::Duration::from_secs(idle.max(1) as u64))
} else {
None
};
self.apply_keepalive(interval);
Ok(())
}),
PSO::LINGER => {
if val.len() < 8 {
return Err(SystemError::EINVAL);
}
let l_onoff = i32::from_ne_bytes([val[0], val[1], val[2], val[3]]);
let l_linger = i32::from_ne_bytes([val[4], val[5], val[6], val[7]]);
if l_linger < 0 {
return Err(SystemError::EINVAL);
}
self.linger_onoff().store(
if l_onoff != 0 { 1 } else { 0 },
core::sync::atomic::Ordering::Relaxed,
);
self.linger_linger()
.store(l_linger, core::sync::atomic::Ordering::Relaxed);
Ok(())
}
_ => Ok(()), // Accept and ignore other SOL_SOCKET options
}
}
@ -329,18 +353,39 @@ impl super::TcpSocket {
})?;
Ok(())
}
Options::KeepIntvl => {
let interval = byte_parser::read_u32(val)?;
self.with_inner_established(|est| {
est.with_mut(|socket| {
socket.set_keep_alive(Some(smoltcp::time::Duration::from_secs(
interval as u64,
)));
});
})?;
Options::KeepIdle => {
let v = byte_parser::read_i32(val)?;
if !(1..=Self::MAX_TCP_KEEPIDLE).contains(&v) {
return Err(SystemError::EINVAL);
}
self.tcp_keepidle_secs()
.store(v, core::sync::atomic::Ordering::Relaxed);
if self
.so_keepalive_enabled()
.load(core::sync::atomic::Ordering::Relaxed)
{
self.apply_keepalive(Some(smoltcp::time::Duration::from_secs(v as u64)));
}
Ok(())
}
Options::KeepIntvl => {
let v = byte_parser::read_i32(val)?;
if !(1..=Self::MAX_TCP_KEEPINTVL).contains(&v) {
return Err(SystemError::EINVAL);
}
self.tcp_keepintvl_secs()
.store(v, core::sync::atomic::Ordering::Relaxed);
Ok(())
}
Options::KeepCnt => {
let v = byte_parser::read_i32(val)?;
if !(1..=Self::MAX_TCP_KEEPCNT).contains(&v) {
return Err(SystemError::EINVAL);
}
self.tcp_keepcnt()
.store(v, core::sync::atomic::Ordering::Relaxed);
Ok(())
}
Options::KeepCnt | Options::KeepIdle => Ok(()), // Stub: silently ignore
Options::INQ => Self::set_bool_option(self.tcp_inq_enabled(), val, |_| Ok(())),
Options::QuickAck => {
Self::set_bool_option(self.tcp_quickack_enabled(), val, |_| Ok(()))
@ -516,6 +561,9 @@ impl super::TcpSocket {
Options::MaxSegment => Self::write_atomic_usize_as_u32(value, self.tcp_max_seg()),
Options::DeferAccept => Self::write_atomic_i32(value, self.tcp_defer_accept()),
Options::Syncnt => Self::write_atomic_i32(value, self.tcp_syncnt()),
Options::KeepIdle => Self::write_atomic_i32(value, self.tcp_keepidle_secs()),
Options::KeepIntvl => Self::write_atomic_i32(value, self.tcp_keepintvl_secs()),
Options::KeepCnt => Self::write_atomic_i32(value, self.tcp_keepcnt()),
Options::WindowClamp => Self::write_atomic_usize_as_u32(value, self.tcp_window_clamp()),
Options::UserTimeout => Self::write_atomic_i32(value, self.tcp_user_timeout()),
Options::Info => self.get_tcp_info(value),

View File

@ -55,6 +55,17 @@ pub struct TcpSocketOptions {
pub(crate) tcp_quickack: AtomicBool,
/// SO_KEEPALIVE
pub(crate) so_keepalive: AtomicBool,
/// TCP_KEEPIDLE (seconds)
pub(crate) tcp_keepidle_secs: AtomicI32,
/// TCP_KEEPINTVL (seconds)
pub(crate) tcp_keepintvl_secs: AtomicI32,
/// TCP_KEEPCNT
pub(crate) tcp_keepcnt: AtomicI32,
/// SO_LINGER
pub(crate) linger_onoff: AtomicI32,
pub(crate) linger_linger: AtomicI32,
}
impl TcpSocketOptions {
@ -79,6 +90,13 @@ impl TcpSocketOptions {
tcp_cork: AtomicBool::new(false),
tcp_quickack: AtomicBool::new(true),
so_keepalive: AtomicBool::new(false),
tcp_keepidle_secs: AtomicI32::new(2 * 60 * 60),
tcp_keepintvl_secs: AtomicI32::new(75),
tcp_keepcnt: AtomicI32::new(9),
linger_onoff: AtomicI32::new(0),
linger_linger: AtomicI32::new(0),
}
}
}
@ -102,6 +120,7 @@ pub struct TcpSocket {
pub(crate) fasync_items: FAsyncItems,
pub(crate) options: TcpSocketOptions,
pub(crate) cork_buf: Mutex<Vec<u8>>,
pub(crate) cork_flush_in_progress: AtomicBool,
pub(crate) recv_shutdown: ShutdownRecvTracker,
}
@ -128,6 +147,7 @@ impl TcpSocket {
fasync_items: FAsyncItems::default(),
options: TcpSocketOptions::new(),
cork_buf: Mutex::new(Vec::new()),
cork_flush_in_progress: AtomicBool::new(false),
recv_shutdown: ShutdownRecvTracker::new(),
}
}
@ -272,6 +292,31 @@ impl TcpSocket {
&self.options.so_keepalive
}
#[inline]
pub(crate) fn tcp_keepidle_secs(&self) -> &AtomicI32 {
&self.options.tcp_keepidle_secs
}
#[inline]
pub(crate) fn tcp_keepintvl_secs(&self) -> &AtomicI32 {
&self.options.tcp_keepintvl_secs
}
#[inline]
pub(crate) fn tcp_keepcnt(&self) -> &AtomicI32 {
&self.options.tcp_keepcnt
}
#[inline]
pub(crate) fn linger_onoff(&self) -> &AtomicI32 {
&self.options.linger_onoff
}
#[inline]
pub(crate) fn linger_linger(&self) -> &AtomicI32 {
&self.options.linger_linger
}
#[inline]
pub(crate) fn send_buf_size(&self) -> &AtomicUsize {
&self.options.send_buf_size