clearup_expired_udp_associate

This commit is contained in:
ssrlive 2023-08-18 09:40:16 +08:00
parent 1bea9ba9ea
commit 334514cfc1

View file

@ -166,6 +166,8 @@ fn connection_tuple(frame: &[u8]) -> Result<(ConnectionInfo, bool, usize, usize)
const SERVER_WRITE_CLOSED: u8 = 1; const SERVER_WRITE_CLOSED: u8 = 1;
const CLIENT_WRITE_CLOSED: u8 = 2; const CLIENT_WRITE_CLOSED: u8 = 2;
const UDP_ASSO_TIMEOUT: u64 = 5; // seconds
struct TcpConnectState { struct TcpConnectState {
smoltcp_handle: Option<SocketHandle>, smoltcp_handle: Option<SocketHandle>,
mio_stream: TcpStream, mio_stream: TcpStream,
@ -174,6 +176,7 @@ struct TcpConnectState {
close_state: u8, close_state: u8,
wait_read: bool, wait_read: bool,
wait_write: bool, wait_write: bool,
expiry: Option<::std::time::Instant>,
} }
pub(crate) trait TcpProxy { pub(crate) trait TcpProxy {
@ -458,7 +461,7 @@ impl<'a> TunToProxy<'a> {
if connection_info.protocol == IpProtocol::Tcp { if connection_info.protocol == IpProtocol::Tcp {
if _first_packet { if _first_packet {
let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, false)?; let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, false)?;
let state = self.create_new_tcp_connection_state(server_addr, dst, tcp_proxy_handler)?; let state = self.create_new_tcp_connection_state(server_addr, dst, tcp_proxy_handler, false)?;
self.connection_map.insert(connection_info.clone(), state); self.connection_map.insert(connection_info.clone(), state);
log::info!("Connect done {} ({})", connection_info, dst); log::info!("Connect done {} ({})", connection_info, dst);
@ -495,7 +498,7 @@ impl<'a> TunToProxy<'a> {
if !self.connection_map.contains_key(&connection_info) { if !self.connection_map.contains_key(&connection_info) {
log::trace!("New UDP connection {} ({})", connection_info, dst); log::trace!("New UDP connection {} ({})", connection_info, dst);
let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, true)?; let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, true)?;
let state = self.create_new_tcp_connection_state(server_addr, dst, tcp_proxy_handler)?; let state = self.create_new_tcp_connection_state(server_addr, dst, tcp_proxy_handler, true)?;
self.connection_map.insert(connection_info.clone(), state); self.connection_map.insert(connection_info.clone(), state);
} }
@ -507,7 +510,13 @@ impl<'a> TunToProxy<'a> {
UdpHeader::new(0, connection_info.dst.clone()).write_to_stream(&mut s5_udp_data)?; UdpHeader::new(0, connection_info.dst.clone()).write_to_stream(&mut s5_udp_data)?;
s5_udp_data.extend_from_slice(payload); s5_udp_data.extend_from_slice(payload);
let state = self.connection_map.get(&connection_info).ok_or("udp associate state")?; let state = self
.connection_map
.get_mut(&connection_info)
.ok_or("udp associate state")?;
assert!(state.expiry.is_some());
state.expiry = Some(Self::udp_associate_timeout());
if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() { if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() {
log::debug!("UDP associate address: {}", udp_associate); log::debug!("UDP associate address: {}", udp_associate);
// Send packets via UDP associate... // Send packets via UDP associate...
@ -532,6 +541,7 @@ impl<'a> TunToProxy<'a> {
server_addr: SocketAddr, server_addr: SocketAddr,
dst: SocketAddr, dst: SocketAddr,
tcp_proxy_handler: Box<dyn TcpProxy>, tcp_proxy_handler: Box<dyn TcpProxy>,
udp_associate: bool,
) -> Result<TcpConnectState> { ) -> Result<TcpConnectState> {
let mut socket = tcp::Socket::new( let mut socket = tcp::Socket::new(
tcp::SocketBuffer::new(vec![0; 1024 * 128]), tcp::SocketBuffer::new(vec![0; 1024 * 128]),
@ -546,6 +556,11 @@ impl<'a> TunToProxy<'a> {
let i = Interest::READABLE; let i = Interest::READABLE;
self.poll.registry().register(&mut client, token, i)?; self.poll.registry().register(&mut client, token, i)?;
let expiry = if udp_associate {
Some(Self::udp_associate_timeout())
} else {
None
};
let state = TcpConnectState { let state = TcpConnectState {
smoltcp_handle: Some(handle), smoltcp_handle: Some(handle),
mio_stream: client, mio_stream: client,
@ -554,10 +569,35 @@ impl<'a> TunToProxy<'a> {
close_state: 0, close_state: 0,
wait_read: true, wait_read: true,
wait_write: false, wait_write: false,
expiry,
}; };
Ok(state) Ok(state)
} }
fn udp_associate_timeout() -> ::std::time::Instant {
::std::time::Instant::now() + ::std::time::Duration::from_secs(UDP_ASSO_TIMEOUT)
}
fn udp_associate_timeout_expired(&self, info: &ConnectionInfo) -> bool {
if let Some(state) = self.connection_map.get(info) {
if let Some(expiry) = state.expiry {
return expiry < ::std::time::Instant::now();
}
}
false
}
fn clearup_expired_udp_associate(&mut self) -> Result<()> {
let keys = self.connection_map.keys().map(|key| key.clone()).collect::<Vec<_>>();
for key in keys {
if self.udp_associate_timeout_expired(&key) {
log::debug!("UDP associate timeout: {}", key);
self.remove_connection(&key)?;
}
}
Ok(())
}
fn send_udp_packet(&mut self, src: SocketAddr, dst: SocketAddr, data: &[u8]) -> Result<()> { fn send_udp_packet(&mut self, src: SocketAddr, dst: SocketAddr, data: &[u8]) -> Result<()> {
let rx_buffer = udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY], vec![0; 4096]); let rx_buffer = udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY], vec![0; 4096]);
let tx_buffer = udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY], vec![0; 4096]); let tx_buffer = udp::PacketBuffer::new(vec![udp::PacketMetadata::EMPTY], vec![0; 4096]);
@ -778,6 +818,7 @@ impl<'a> TunToProxy<'a> {
} }
} }
self.send_to_smoltcp()?; self.send_to_smoltcp()?;
self.clearup_expired_udp_associate()?;
} }
} }