diff --git a/src/tun2proxy.rs b/src/tun2proxy.rs index c405486..10d17a5 100644 --- a/src/tun2proxy.rs +++ b/src/tun2proxy.rs @@ -757,40 +757,57 @@ impl<'a> TunToProxy<'a> { Ok(()) } - fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> { - if let Some(info) = self.find_info_by_udp_token(event.token()) { - let info = info.clone(); - let err = "udp connection state not found"; - let state = self.connection_map.get_mut(&info).ok_or(err)?; - state.expiry = Some(Self::udp_associate_timeout()); - let mut to_send: LinkedList> = LinkedList::new(); + fn receive_udp_packet_and_write_to_client(&mut self, info: &ConnectionInfo) -> Result<()> { + let err = "udp connection state not found"; + let state = self.connection_map.get_mut(info).ok_or(err)?; + state.expiry = Some(Self::udp_associate_timeout()); + let mut to_send: LinkedList> = LinkedList::new(); + if let Some(udp_socket) = state.udp_socket.as_ref() { + let mut buf = [0; 1 << 16]; + // Receive UDP packet from remote SOCKS5 server + while let Ok((packet_size, _svr_addr)) = udp_socket.recv_from(&mut buf) { + let buf = buf[..packet_size].to_vec(); + let header = UdpHeader::retrieve_from_stream(&mut &buf[..])?; + + let buf = if info.dst.port() == DNS_PORT { + let mut message = dns::parse_data_to_dns_message(&buf[header.len()..], false)?; + dns::remove_ipv6_entries(&mut message); // TODO: Configurable + message.to_vec()? + } else { + buf[header.len()..].to_vec() + }; + + // Escape the borrow checker madness + to_send.push_back(buf); + } + } + + // Write to client + let src = state.udp_origin_dst.ok_or("udp address")?; + while let Some(packet) = to_send.pop_front() { + self.send_udp_packet_to_client(src, info.src, &packet)?; + } + Ok(()) + } + + fn comsume_cached_udp_packets(&mut self, info: &ConnectionInfo) -> Result<()> { + // Try to send the first UDP packets to remote SOCKS5 server for UDP associate session + if let Some(state) = self.connection_map.get_mut(info) { if let Some(udp_socket) = state.udp_socket.as_ref() { - let mut buf = [0; 1 << 16]; - // Receive UDP packet from remote SOCKS5 server - while let Ok((packet_size, _svr_addr)) = udp_socket.recv_from(&mut buf) { - let buf = buf[..packet_size].to_vec(); - let header = UdpHeader::retrieve_from_stream(&mut &buf[..])?; - - let buf = if info.dst.port() == DNS_PORT { - let mut message = dns::parse_data_to_dns_message(&buf[header.len()..], false)?; - dns::remove_ipv6_entries(&mut message); // TODO: Configurable - message.to_vec()? - } else { - buf[header.len()..].to_vec() - }; - - // Escape the borrow checker madness - to_send.push_back(buf); + if let Some(addr) = state.tcp_proxy_handler.get_udp_associate() { + // Consume udp_data_cache data + while let Some(buf) = state.udp_data_cache.pop_front() { + udp_socket.send_to(&buf, addr)?; + } } } + } + Ok(()) + } - // Write to client - let src = state.udp_origin_dst.ok_or("udp address")?; - while let Some(packet) = to_send.pop_front() { - self.send_udp_packet_to_client(src, info.src, &packet)?; - } - - return Ok(()); + fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> { + if let Some(info) = self.find_info_by_udp_token(event.token()) { + return self.receive_udp_packet_and_write_to_client(&info.clone()); } let conn_info = match self.find_info_by_token(event.token()) { @@ -872,17 +889,7 @@ impl<'a> TunToProxy<'a> { // server. self.write_to_server(&conn_info)?; - // Try to send the first UDP packet to remote SOCKS5 server for UDP associate session - if let Some(state) = self.connection_map.get_mut(&conn_info) { - if let Some(udp_socket) = state.udp_socket.as_ref() { - if let Some(addr) = state.tcp_proxy_handler.get_udp_associate() { - // Consume udp_data_cache data - while let Some(buf) = state.udp_data_cache.pop_front() { - udp_socket.send_to(&buf, addr)?; - } - } - } - } + self.comsume_cached_udp_packets(&conn_info)?; } if event.is_writable() {