diff --git a/src/tun2proxy.rs b/src/tun2proxy.rs index 10d17a5..8888b2c 100644 --- a/src/tun2proxy.rs +++ b/src/tun2proxy.rs @@ -456,6 +456,51 @@ impl<'a> TunToProxy<'a> { Ok(()) } + fn deal_with_incoming_udp_packets( + &mut self, + manager: &Rc, + info: &ConnectionInfo, + origin_dst: SocketAddr, + payload: &[u8], + ) -> Result<()> { + if !self.connection_map.contains_key(info) { + log::info!("UDP associate session {} ({})", info, origin_dst); + let tcp_proxy_handler = manager.new_tcp_proxy(info, true)?; + let server_addr = manager.get_server_addr(); + let mut state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, true)?; + state.udp_origin_dst = Some(origin_dst); + self.connection_map.insert(info.clone(), state); + + self.expect_smoltcp_send()?; + self.tunsocket_read_and_forward(info)?; + self.write_to_server(info)?; + } else { + log::trace!("Subsequent udp packet {} ({})", info, origin_dst); + } + + let err = "udp associate state not find"; + let state = self.connection_map.get_mut(info).ok_or(err)?; + assert!(state.expiry.is_some()); + state.expiry = Some(Self::udp_associate_timeout()); + + // Add SOCKS5 UDP header to the incoming data + let mut s5_udp_data = Vec::::new(); + UdpHeader::new(0, info.dst.clone()).write_to_stream(&mut s5_udp_data)?; + s5_udp_data.extend_from_slice(payload); + + if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() { + // UDP associate session has been established, we can send packets directly... + if let Some(socket) = state.udp_socket.as_ref() { + socket.send_to(&s5_udp_data, udp_associate)?; + } + } else { + // UDP associate tunnel not ready yet, we must cache the packets... + log::trace!("Cache udp packet {} ({})", info, origin_dst); + state.udp_data_cache.push_back(s5_udp_data); + } + Ok(()) + } + // A raw packet was received on the tunnel interface. fn receive_tun(&mut self, frame: &mut [u8]) -> Result<(), Error> { let mut handler = || -> Result<(), Error> { @@ -527,41 +572,7 @@ impl<'a> TunToProxy<'a> { self.send_udp_packet_to_client(origin_dst, connection_info.src, response.as_slice())?; } else { // Another UDP packet - if !self.connection_map.contains_key(&connection_info) { - log::info!("UDP associate session {} ({})", connection_info, origin_dst); - let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, true)?; - #[rustfmt::skip] - let mut state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, true)?; - state.udp_origin_dst = Some(origin_dst); - self.connection_map.insert(connection_info.clone(), state); - - self.expect_smoltcp_send()?; - self.tunsocket_read_and_forward(&connection_info)?; - self.write_to_server(&connection_info)?; - } else { - log::trace!("Subsequent udp packet {} ({})", connection_info, origin_dst); - } - - let err = "udp associate state not find"; - let state = self.connection_map.get_mut(&connection_info).ok_or(err)?; - assert!(state.expiry.is_some()); - state.expiry = Some(Self::udp_associate_timeout()); - - // Add SOCKS5 UDP header to the incoming data - let mut s5_udp_data = Vec::::new(); - UdpHeader::new(0, connection_info.dst.clone()).write_to_stream(&mut s5_udp_data)?; - s5_udp_data.extend_from_slice(payload); - - if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() { - // UDP associate session has been established, we can send packets directly... - if let Some(socket) = state.udp_socket.as_ref() { - socket.send_to(&s5_udp_data, udp_associate)?; - } - } else { - // UDP associate tunnel not ready yet, we must cache the packets... - log::trace!("Cache udp packet {} ({})", connection_info, origin_dst); - state.udp_data_cache.push_back(s5_udp_data); - } + self.deal_with_incoming_udp_packets(&manager, &connection_info, origin_dst, payload)?; } } else { log::warn!("Unsupported protocol: {} ({})", connection_info, origin_dst);