Improve handling of half-open connections

This commit is contained in:
B. Blechschmidt 2023-04-03 20:31:31 +02:00
parent 6d9767db42
commit 0be39345a8

View file

@ -8,6 +8,7 @@ use mio::unix::SourceFd;
use mio::{Events, Interest, Poll, Token}; use mio::{Events, Interest, Poll, Token};
use smoltcp::iface::{Config, Interface, SocketHandle, SocketSet}; use smoltcp::iface::{Config, Interface, SocketHandle, SocketSet};
use smoltcp::phy::{Device, Medium, RxToken, TunTapInterface, TxToken}; use smoltcp::phy::{Device, Medium, RxToken, TunTapInterface, TxToken};
use smoltcp::socket::tcp::State;
use smoltcp::socket::{tcp, udp}; use smoltcp::socket::{tcp, udp};
use smoltcp::time::Instant; use smoltcp::time::Instant;
use smoltcp::wire::{IpCidr, IpProtocol, Ipv4Packet, Ipv6Packet, TcpPacket, UdpPacket}; use smoltcp::wire::{IpCidr, IpProtocol, Ipv4Packet, Ipv6Packet, TcpPacket, UdpPacket};
@ -15,7 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::convert::{From, TryFrom}; use std::convert::{From, TryFrom};
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::Shutdown::Both; use std::net::Shutdown::Both;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr};
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use std::rc::Rc; use std::rc::Rc;
use std::str::FromStr; use std::str::FromStr;
@ -204,14 +205,15 @@ fn connection_tuple(frame: &[u8]) -> Option<(Connection, bool, usize, usize)> {
} }
} }
const WRITE_CLOSED: u8 = 1; const SERVER_WRITE_CLOSED: u8 = 1;
const CLIENT_WRITE_CLOSED: u8 = 2;
struct ConnectionState { struct ConnectionState {
smoltcp_handle: SocketHandle, smoltcp_handle: SocketHandle,
mio_stream: TcpStream, mio_stream: TcpStream,
token: Token, token: Token,
handler: Box<dyn TcpProxy>, handler: Box<dyn TcpProxy>,
smoltcp_socket_state: u8, close_state: u8,
} }
pub(crate) trait TcpProxy { pub(crate) trait TcpProxy {
@ -330,9 +332,46 @@ impl<'a> TunToProxy<'a> {
None None
} }
fn check_change_close_state(&mut self, connection: &Connection) -> Result<(), Error> {
let state = self
.connections
.get_mut(connection)
.ok_or("connection does not exist")?;
let mut closed_ends = 0;
if (state.close_state & SERVER_WRITE_CLOSED) == SERVER_WRITE_CLOSED {
//info!("Server write closed");
let event = state.handler.peek_data(OutgoingDirection::ToClient);
if event.buffer.is_empty() {
//info!("Server write closed and consumed");
let socket = self.sockets.get_mut::<tcp::Socket>(state.smoltcp_handle);
socket.close();
closed_ends += 1;
}
}
if (state.close_state & CLIENT_WRITE_CLOSED) == CLIENT_WRITE_CLOSED {
//info!("Client write closed");
let event = state.handler.peek_data(OutgoingDirection::ToServer);
if event.buffer.is_empty() {
//info!("Client write closed and consumed");
_ = state.mio_stream.shutdown(Shutdown::Write);
closed_ends += 1;
}
}
if closed_ends == 2 {
self.remove_connection(connection)?;
}
Ok(())
}
fn tunsocket_read_and_forward(&mut self, connection: &Connection) -> Result<(), Error> { fn tunsocket_read_and_forward(&mut self, connection: &Connection) -> Result<(), Error> {
if let Some(state) = self.connections.get_mut(connection) { // Scope for mutable borrow of self.
let closed = { {
let state = self
.connections
.get_mut(connection)
.ok_or("connection does not exist")?;
let socket = self.sockets.get_mut::<tcp::Socket>(state.smoltcp_handle); let socket = self.sockets.get_mut::<tcp::Socket>(state.smoltcp_handle);
let mut error = Ok(()); let mut error = Ok(());
while socket.can_recv() && error.is_ok() { while socket.can_recv() && error.is_ok() {
@ -342,30 +381,26 @@ impl<'a> TunToProxy<'a> {
buffer: data, buffer: data,
}; };
error = state.handler.push_data(event); error = state.handler.push_data(event);
(data.len(), ()) (data.len(), ())
})?; })?;
} }
match error { if !socket.may_recv()
Ok(_) => socket.state() == tcp::State::CloseWait, && socket.state() != State::Listen
Err(e) => { && socket.state() != State::SynSent
log::error!("{e}"); && socket.state() != State::SynReceived
true {
// We cannot yet close the write end of the mio stream here because we may still
// need to send data.
state.close_state |= CLIENT_WRITE_CLOSED;
} }
}
};
// Expect ACKs etc. from smoltcp sockets. // Expect ACKs etc. from smoltcp sockets.
self.expect_smoltcp_send()?; self.expect_smoltcp_send()?;
}
self.check_change_close_state(connection)?;
if closed {
let e = "connection not exist";
let connection_state = self.connections.get_mut(connection).ok_or(e)?;
connection_state.mio_stream.shutdown(Both)?;
self.remove_connection(connection)?;
}
}
Ok(()) Ok(())
} }
@ -417,7 +452,7 @@ impl<'a> TunToProxy<'a> {
mio_stream: client, mio_stream: client,
token, token,
handler, handler,
smoltcp_socket_state: 0, close_state: 0,
}; };
self.token_to_connection self.token_to_connection
@ -491,6 +526,7 @@ impl<'a> TunToProxy<'a> {
if let Some(state) = self.connections.get_mut(connection) { if let Some(state) = self.connections.get_mut(connection) {
let event = state.handler.peek_data(OutgoingDirection::ToServer); let event = state.handler.peek_data(OutgoingDirection::ToServer);
if event.buffer.is_empty() { if event.buffer.is_empty() {
self.check_change_close_state(connection)?;
return Ok(()); return Ok(());
} }
let result = state.mio_stream.write(event.buffer); let result = state.mio_stream.write(event.buffer);
@ -510,9 +546,7 @@ impl<'a> TunToProxy<'a> {
} }
fn write_to_client(&mut self, token: Token, connection: &Connection) -> Result<(), Error> { fn write_to_client(&mut self, token: Token, connection: &Connection) -> Result<(), Error> {
loop { while let Some(state) = self.connections.get_mut(connection) {
if let Some(state) = self.connections.get_mut(connection) {
let socket_state = state.smoltcp_socket_state;
let socket_handle = state.smoltcp_handle; let socket_handle = state.smoltcp_handle;
let event = state.handler.peek_data(OutgoingDirection::ToClient); let event = state.handler.peek_data(OutgoingDirection::ToClient);
let buflen = event.buffer.len(); let buflen = event.buffer.len();
@ -542,19 +576,22 @@ impl<'a> TunToProxy<'a> {
break; break;
} }
} }
let socket = self.sockets.get_mut::<tcp::Socket>(socket_handle);
self.check_change_close_state(connection)?;
/*let socket = self.sockets.get_mut::<tcp::Socket>(socket_handle);
// Closing and removing the connection here may work in practice but is actually not // Closing and removing the connection here may work in practice but is actually not
// correct. Only the write end was closed but we could still read from it! // correct. Only the write end was closed but we could still read from it!
// TODO: Fix and test half-open connection scenarios as mentioned in the README. // TODO: Fix and test half-open connection scenarios as mentioned in the README.
// TODO: Investigate how half-closed connections from the other end are handled. // TODO: Investigate how half-closed connections from the other end are handled.
if socket_state & WRITE_CLOSED != 0 && consumed == buflen { if socket_state & SERVER_WRITE_CLOSED != 0 && consumed == buflen {
info!("WRCL");
socket.close(); socket.close();
self.expect_smoltcp_send()?; self.expect_smoltcp_send()?;
self.write_sockets.remove(&token); self.write_sockets.remove(&token);
self.remove_connection(connection)?; self.remove_connection(connection)?;
break; break;
} }*/
}
} }
Ok(()) Ok(())
} }
@ -612,18 +649,6 @@ impl<'a> TunToProxy<'a> {
} }
}; };
if read == 0 {
{
let socket = self.sockets.get_mut::<tcp::Socket>(
self.connections.get(&connection).ok_or(e)?.smoltcp_handle,
);
socket.close();
}
self.expect_smoltcp_send()?;
self.remove_connection(&connection.clone())?;
return Ok(());
}
let data = vecbuf.as_slice(); let data = vecbuf.as_slice();
let data_event = IncomingDataEvent { let data_event = IncomingDataEvent {
direction: IncomingDirection::FromServer, direction: IncomingDirection::FromServer,
@ -642,8 +667,11 @@ impl<'a> TunToProxy<'a> {
self.remove_connection(&connection.clone())?; self.remove_connection(&connection.clone())?;
return Ok(()); return Ok(());
} }
if event.is_read_closed() {
state.smoltcp_socket_state |= WRITE_CLOSED; if read == 0 || event.is_read_closed() {
state.close_state |= SERVER_WRITE_CLOSED;
self.check_change_close_state(&connection)?;
self.expect_smoltcp_send()?;
} }
} }