diff --git a/src/error.rs b/src/error.rs index bae52ba..d45f0ee 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,6 +9,9 @@ pub enum Error { #[error("std::io::Error {0}")] Io(#[from] std::io::Error), + #[error("TryFromIntError {0:?}")] + TryFromInt(#[from] std::num::TryFromIntError), + #[error("std::net::AddrParseError {0}")] AddrParse(#[from] std::net::AddrParseError), diff --git a/src/tun2proxy.rs b/src/tun2proxy.rs index 67574c7..3cc194b 100644 --- a/src/tun2proxy.rs +++ b/src/tun2proxy.rs @@ -456,6 +456,103 @@ impl<'a> TunToProxy<'a> { Ok(()) } + fn preprocess_origin_connection_info(&mut self, info: ConnectionInfo) -> Result { + let origin_dst = SocketAddr::try_from(&info.dst)?; + let connection_info = match &mut self.options.virtual_dns { + None => { + let mut info = info; + let port = origin_dst.port(); + if port == DNS_PORT && info.protocol == IpProtocol::Udp && dns::addr_is_private(&origin_dst) { + let dns_addr: SocketAddr = "8.8.8.8:53".parse()?; // TODO: Configurable + info.dst = Address::from(dns_addr); + } + info + } + Some(virtual_dns) => { + let dst_ip = origin_dst.ip(); + virtual_dns.touch_ip(&dst_ip); + match virtual_dns.resolve_ip(&dst_ip) { + None => info, + Some(name) => info.to_named(name.clone()), + } + } + }; + Ok(connection_info) + } + + fn process_incoming_udp_packets( + &mut self, + manager: &Rc, + info: &ConnectionInfo, + origin_dst: SocketAddr, + payload: &[u8], + ) -> Result<()> { + if self.options.dns_over_tcp && origin_dst.port() == DNS_PORT { + dns::parse_data_to_dns_message(payload, false)?; + + if !self.connection_map.contains_key(info) { + log::info!("DNS over TCP {} ({})", info, origin_dst); + let tcp_proxy_handler = manager.new_tcp_proxy(info, false)?; + let server_addr = manager.get_server_addr(); + let state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, false)?; + self.connection_map.insert(info.clone(), state); + } else { + log::trace!("DNS over TCP subsequent packet {} ({})", info, origin_dst); + } + + // Insert the DNS message length in front of the payload + let len = u16::try_from(payload.len())?; + let mut buf = Vec::with_capacity(2 + usize::from(len)); + buf.extend_from_slice(&len.to_be_bytes()); + buf.extend_from_slice(payload); + + // FIXME: Build an IP packet with TCP and inject it into the device. + self.device.inject_packet(&buf); + + self.expect_smoltcp_send()?; + self.tunsocket_read_and_forward(info)?; + self.write_to_server(info)?; + return Ok(()); + } + + if !self.connection_map.contains_key(info) { + log::info!("UDP associate session {} ({})", info, origin_dst); + let tcp_proxy_handler = manager.new_tcp_proxy(info, true)?; + let server_addr = manager.get_server_addr(); + let mut state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, true)?; + state.udp_origin_dst = Some(origin_dst); + self.connection_map.insert(info.clone(), state); + + self.expect_smoltcp_send()?; + self.tunsocket_read_and_forward(info)?; + self.write_to_server(info)?; + } else { + log::trace!("Subsequent udp packet {} ({})", info, origin_dst); + } + + let err = "udp associate state not find"; + let state = self.connection_map.get_mut(info).ok_or(err)?; + assert!(state.expiry.is_some()); + state.expiry = Some(Self::udp_associate_timeout()); + + // Add SOCKS5 UDP header to the incoming data + let mut s5_udp_data = Vec::::new(); + UdpHeader::new(0, info.dst.clone()).write_to_stream(&mut s5_udp_data)?; + s5_udp_data.extend_from_slice(payload); + + if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() { + // UDP associate session has been established, we can send packets directly... + if let Some(socket) = state.udp_socket.as_ref() { + socket.send_to(&s5_udp_data, udp_associate)?; + } + } else { + // UDP associate tunnel not ready yet, we must cache the packets... + log::trace!("Cache udp packet {} ({})", info, origin_dst); + state.udp_data_cache.push_back(s5_udp_data); + } + Ok(()) + } + // A raw packet was received on the tunnel interface. fn receive_tun(&mut self, frame: &mut [u8]) -> Result<(), Error> { let mut handler = || -> Result<(), Error> { @@ -466,34 +563,15 @@ impl<'a> TunToProxy<'a> { } let (info, _first_packet, payload_offset, payload_size) = result?; let origin_dst = SocketAddr::try_from(&info.dst)?; - let connection_info = match &mut self.options.virtual_dns { - None => { - let mut info = info; - let port = origin_dst.port(); - if port == DNS_PORT && info.protocol == IpProtocol::Udp && dns::addr_is_private(&origin_dst) { - let dns_addr: SocketAddr = "8.8.8.8:53".parse()?; // TODO: Configurable - info.dst = Address::from(dns_addr); - } - info - } - Some(virtual_dns) => { - let dst_ip = origin_dst.ip(); - virtual_dns.touch_ip(&dst_ip); - match virtual_dns.resolve_ip(&dst_ip) { - None => info, - Some(name) => info.to_named(name.clone()), - } - } - }; + let connection_info = self.preprocess_origin_connection_info(info)?; let manager = self.get_connection_manager().ok_or("get connection manager")?; - let server_addr = manager.get_server_addr(); if connection_info.protocol == IpProtocol::Tcp { if _first_packet { let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, false)?; - #[rustfmt::skip] - let state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, false)?; + let server = manager.get_server_addr(); + let state = self.create_new_tcp_connection_state(server, origin_dst, tcp_proxy_handler, false)?; self.connection_map.insert(connection_info.clone(), state); log::info!("Connect done {} ({})", connection_info, origin_dst); @@ -527,65 +605,7 @@ impl<'a> TunToProxy<'a> { self.send_udp_packet_to_client(origin_dst, connection_info.src, response.as_slice())?; } else { // Another UDP packet - if self.options.dns_over_tcp && port == DNS_PORT { - if !self.connection_map.contains_key(&connection_info) { - log::info!("DNS over TCP {} ({})", connection_info, origin_dst); - let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, false)?; - #[rustfmt::skip] - let state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, false)?; - self.connection_map.insert(connection_info.clone(), state); - } else { - log::trace!("Subsequent dns over tcp packet {} ({})", connection_info, origin_dst); - } - - let len = payload.len() as u16; - let mut buf = Vec::with_capacity(2 + len as usize); - buf.extend_from_slice(&len.to_be_bytes()); - buf.extend_from_slice(payload); - - // TODO: Build an IP packet and inject it into the device. - self.device.inject_packet(&buf); - - self.expect_smoltcp_send()?; - self.tunsocket_read_and_forward(&connection_info)?; - self.write_to_server(&connection_info)?; - return Ok(()); - } - if !self.connection_map.contains_key(&connection_info) { - log::info!("UDP associate session {} ({})", connection_info, origin_dst); - let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, true)?; - #[rustfmt::skip] - let mut state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, true)?; - state.udp_origin_dst = Some(origin_dst); - self.connection_map.insert(connection_info.clone(), state); - - self.expect_smoltcp_send()?; - self.tunsocket_read_and_forward(&connection_info)?; - self.write_to_server(&connection_info)?; - } else { - log::trace!("Subsequent udp packet {} ({})", connection_info, origin_dst); - } - - let err = "udp associate state not find"; - let state = self.connection_map.get_mut(&connection_info).ok_or(err)?; - assert!(state.expiry.is_some()); - state.expiry = Some(Self::udp_associate_timeout()); - - // Add SOCKS5 UDP header to the incoming data - let mut s5_udp_data = Vec::::new(); - UdpHeader::new(0, connection_info.dst.clone()).write_to_stream(&mut s5_udp_data)?; - s5_udp_data.extend_from_slice(payload); - - if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() { - // UDP associate session has been established, we can send packets directly... - if let Some(socket) = state.udp_socket.as_ref() { - socket.send_to(&s5_udp_data, udp_associate)?; - } - } else { - // UDP associate tunnel not ready yet, we must cache the packets... - log::trace!("Cache udp packet {} ({})", connection_info, origin_dst); - state.udp_data_cache.push_back(s5_udp_data); - } + self.process_incoming_udp_packets(&manager, &connection_info, origin_dst, payload)?; } } else { log::warn!("Unsupported protocol: {} ({})", connection_info, origin_dst); @@ -781,40 +801,57 @@ impl<'a> TunToProxy<'a> { Ok(()) } - fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> { - if let Some(info) = self.find_info_by_udp_token(event.token()) { - let info = info.clone(); - let err = "udp connection state not found"; - let state = self.connection_map.get_mut(&info).ok_or(err)?; - state.expiry = Some(Self::udp_associate_timeout()); - let mut to_send: LinkedList> = LinkedList::new(); + fn receive_udp_packet_and_write_to_client(&mut self, info: &ConnectionInfo) -> Result<()> { + let err = "udp connection state not found"; + let state = self.connection_map.get_mut(info).ok_or(err)?; + state.expiry = Some(Self::udp_associate_timeout()); + let mut to_send: LinkedList> = LinkedList::new(); + if let Some(udp_socket) = state.udp_socket.as_ref() { + let mut buf = [0; 1 << 16]; + // Receive UDP packet from remote SOCKS5 server + while let Ok((packet_size, _svr_addr)) = udp_socket.recv_from(&mut buf) { + let buf = buf[..packet_size].to_vec(); + let header = UdpHeader::retrieve_from_stream(&mut &buf[..])?; + + let buf = if info.dst.port() == DNS_PORT { + let mut message = dns::parse_data_to_dns_message(&buf[header.len()..], false)?; + dns::remove_ipv6_entries(&mut message); // TODO: Configurable + message.to_vec()? + } else { + buf[header.len()..].to_vec() + }; + + // Escape the borrow checker madness + to_send.push_back(buf); + } + } + + // Write to client + let src = state.udp_origin_dst.ok_or("udp address")?; + while let Some(packet) = to_send.pop_front() { + self.send_udp_packet_to_client(src, info.src, &packet)?; + } + Ok(()) + } + + fn comsume_cached_udp_packets(&mut self, info: &ConnectionInfo) -> Result<()> { + // Try to send the first UDP packets to remote SOCKS5 server for UDP associate session + if let Some(state) = self.connection_map.get_mut(info) { if let Some(udp_socket) = state.udp_socket.as_ref() { - let mut buf = [0; 1 << 16]; - // Receive UDP packet from remote SOCKS5 server - while let Ok((packet_size, _svr_addr)) = udp_socket.recv_from(&mut buf) { - let buf = buf[..packet_size].to_vec(); - let header = UdpHeader::retrieve_from_stream(&mut &buf[..])?; - - let buf = if info.dst.port() == DNS_PORT { - let mut message = dns::parse_data_to_dns_message(&buf[header.len()..], false)?; - dns::remove_ipv6_entries(&mut message); // TODO: Configurable - message.to_vec()? - } else { - buf[header.len()..].to_vec() - }; - - // Escape the borrow checker madness - to_send.push_back(buf); + if let Some(addr) = state.tcp_proxy_handler.get_udp_associate() { + // Consume udp_data_cache data + while let Some(buf) = state.udp_data_cache.pop_front() { + udp_socket.send_to(&buf, addr)?; + } } } + } + Ok(()) + } - // Write to client - let src = state.udp_origin_dst.ok_or("udp address")?; - while let Some(packet) = to_send.pop_front() { - self.send_udp_packet_to_client(src, info.src, &packet)?; - } - - return Ok(()); + fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> { + if let Some(info) = self.find_info_by_udp_token(event.token()) { + return self.receive_udp_packet_and_write_to_client(&info.clone()); } let conn_info = match self.find_info_by_token(event.token()) { @@ -896,17 +933,7 @@ impl<'a> TunToProxy<'a> { // server. self.write_to_server(&conn_info)?; - // Try to send the first UDP packet to remote SOCKS5 server for UDP associate session - if let Some(state) = self.connection_map.get_mut(&conn_info) { - if let Some(udp_socket) = state.udp_socket.as_ref() { - if let Some(addr) = state.tcp_proxy_handler.get_udp_associate() { - // Consume udp_data_cache data - while let Some(buf) = state.udp_data_cache.pop_front() { - udp_socket.send_to(&buf, addr)?; - } - } - } - } + self.comsume_cached_udp_packets(&conn_info)?; } if event.is_writable() {