From e5041e6d9ec5e19329215beb54a5f1ff8f1b6999 Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Mon, 13 Nov 2023 12:02:19 +0800 Subject: [PATCH] Memory leak fixing (#77) * incoming packet with FIN or RST * read_server_n_write_proxy_handler * testing script * Interest::WRITABLE and continue_read * read_data_from_tcp_stream * logging hide * test * script iperf --- .github/workflows/format-build.yml | 18 ++++ src/tun2proxy.rs | 161 ++++++++++++++++++++++------- tests/iperf/dante.conf | 2 +- tests/iperf/test.sh | 8 +- 4 files changed, 148 insertions(+), 41 deletions(-) diff --git a/.github/workflows/format-build.yml b/.github/workflows/format-build.yml index 6fefc0f..9469bf9 100644 --- a/.github/workflows/format-build.yml +++ b/.github/workflows/format-build.yml @@ -53,3 +53,21 @@ jobs: args: -- -D warnings - name: Build run: cargo build --verbose + + iperf: + name: Iperf + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - uses: actions-rs/cargo@v1 + with: + command: build + args: --release + - run: sudo apt-get install -y iperf3 dante-server + - run: sudo systemctl stop danted + - run: sudo tests/iperf/test.sh diff --git a/src/tun2proxy.rs b/src/tun2proxy.rs index b0613f1..121f298 100644 --- a/src/tun2proxy.rs +++ b/src/tun2proxy.rs @@ -97,6 +97,7 @@ fn get_transport_info( protocol: IpProtocol, transport_offset: usize, packet: &[u8], + is_closed: &mut bool, ) -> Result<((u16, u16), bool, usize, usize)> { match protocol { IpProtocol::Udp => UdpPacket::new_checked(packet) @@ -111,6 +112,7 @@ fn get_transport_info( .map_err(|e| e.into()), IpProtocol::Tcp => TcpPacket::new_checked(packet) .map(|result| { + *is_closed = result.fin() || result.rst(); let header_len = result.header_len() as usize; ( (result.src_port(), result.dst_port()), @@ -124,7 +126,7 @@ fn get_transport_info( } } -fn connection_tuple(frame: &[u8]) -> Result<(ConnectionInfo, bool, usize, usize)> { +fn connection_tuple(frame: &[u8], is_closed: &mut bool) -> Result<(ConnectionInfo, bool, usize, usize)> { if let Ok(packet) = Ipv4Packet::new_checked(frame) { let protocol = packet.next_header(); @@ -136,7 +138,7 @@ fn connection_tuple(frame: &[u8]) -> Result<(ConnectionInfo, bool, usize, usize) let header_len = packet.header_len().into(); let (ports, first_packet, payload_offset, payload_size) = - get_transport_info(protocol, header_len, &frame[header_len..])?; + get_transport_info(protocol, header_len, &frame[header_len..], is_closed)?; let info = ConnectionInfo::new( SocketAddr::new(src_addr, ports.0), SocketAddr::new(dst_addr, ports.1).into(), @@ -157,7 +159,7 @@ fn connection_tuple(frame: &[u8]) -> Result<(ConnectionInfo, bool, usize, usize) let header_len = packet.header_len(); let (ports, first_packet, payload_offset, payload_size) = - get_transport_info(protocol, header_len, &frame[header_len..])?; + get_transport_info(protocol, header_len, &frame[header_len..], is_closed)?; let info = ConnectionInfo::new( SocketAddr::new(src_addr, ports.0), SocketAddr::new(dst_addr, ports.1).into(), @@ -190,6 +192,7 @@ struct ConnectionState { udp_data_cache: LinkedList>, dns_over_tcp_expiry: Option<::std::time::Instant>, is_tcp_closed: bool, + continue_read: bool, } pub(crate) trait ProxyHandler { @@ -265,16 +268,17 @@ impl<'a> TunToProxy<'a> { let poll = Poll::new()?; + let interests = Interest::READABLE | Interest::WRITABLE; + #[cfg(target_family = "unix")] poll.registry() - .register(&mut SourceFd(&tun.as_raw_fd()), TUN_TOKEN, Interest::READABLE)?; + .register(&mut SourceFd(&tun.as_raw_fd()), TUN_TOKEN, interests)?; #[cfg(target_os = "windows")] { - let interest = Interest::READABLE | Interest::WRITABLE; - poll.registry().register(&mut tun, TUN_TOKEN, interest)?; + poll.registry().register(&mut tun, TUN_TOKEN, interests)?; let mut pipe = NamedPipeSource(tun.pipe_client()); - poll.registry().register(&mut pipe, PIPE_TOKEN, interest)?; + poll.registry().register(&mut pipe, PIPE_TOKEN, interests)?; } #[cfg(target_family = "unix")] @@ -581,10 +585,15 @@ impl<'a> TunToProxy<'a> { state.dns_over_tcp_expiry = Some(Self::common_udp_life_timeout()); let mut vecbuf = vec![]; - Self::read_data_from_tcp_stream(&mut state.mio_stream, &mut state.is_tcp_closed, |data| { - vecbuf.extend_from_slice(data); - Ok(()) - })?; + Self::read_data_from_tcp_stream( + &mut state.mio_stream, + IP_PACKAGE_MAX_SIZE, + &mut state.is_tcp_closed, + |data| { + vecbuf.extend_from_slice(data); + Ok(()) + }, + )?; let data_event = IncomingDataEvent { direction: IncomingDirection::FromServer, @@ -708,6 +717,7 @@ impl<'a> TunToProxy<'a> { info: &ConnectionInfo, origin_dst: SocketAddr, frame: &[u8], + is_closed: bool, ) -> Result<()> { if first_packet { let proxy_handler = manager.new_proxy_handler(info, false)?; @@ -723,6 +733,10 @@ impl<'a> TunToProxy<'a> { log::trace!("Subsequent packet {} ({})", info, origin_dst); } + if let Some(state) = self.connection_map.get_mut(info) { + state.is_tcp_closed = is_closed; + } + // Inject the packet to advance the remote proxy server smoltcp socket state self.device.inject_packet(frame); @@ -743,7 +757,8 @@ impl<'a> TunToProxy<'a> { // A raw packet was received on the tunnel interface. fn receive_tun(&mut self, frame: &mut [u8]) -> Result<(), Error> { let mut handler = || -> Result<(), Error> { - let result = connection_tuple(frame); + let mut is_closed = false; + let result = connection_tuple(frame, &mut is_closed); if let Err(error) = result { log::debug!("{}, ignored", error); return Ok(()); @@ -755,7 +770,7 @@ impl<'a> TunToProxy<'a> { let manager = self.get_connection_manager().ok_or("get connection manager")?; if info.protocol == IpProtocol::Tcp { - self.process_incoming_tcp_packets(first_packet, &manager, &info, origin_dst, frame)?; + self.process_incoming_tcp_packets(first_packet, &manager, &info, origin_dst, frame, is_closed)?; } else if info.protocol == IpProtocol::Udp { let port = info.dst.port(); let payload = &frame[payload_offset..payload_offset + payload_size]; @@ -837,6 +852,7 @@ impl<'a> TunToProxy<'a> { udp_data_cache: LinkedList::new(), dns_over_tcp_expiry: None, is_tcp_closed: false, + continue_read: false, }; Ok(state) } @@ -919,7 +935,7 @@ impl<'a> TunToProxy<'a> { Ok(()) } - fn write_to_client(&mut self, token: Token, info: &ConnectionInfo) -> Result<(), Error> { + fn write_to_client(&mut self, info: &ConnectionInfo) -> Result<(), Error> { while let Some(state) = self.connection_map.get_mut(info) { let event = state.proxy_handler.peek_data(OutgoingDirection::ToClient); let buflen = event.buffer.len(); @@ -933,6 +949,7 @@ impl<'a> TunToProxy<'a> { } consumed = socket.send_slice(event.buffer)?; state.proxy_handler.consume_data(OutgoingDirection::ToClient, consumed); + let token = state.token; self.expect_smoltcp_send()?; if consumed < buflen { self.write_sockets.insert(token); @@ -959,6 +976,33 @@ impl<'a> TunToProxy<'a> { rx_token.consume(|frame| self.receive_tun(frame))?; } } + + if event.is_writable() { + let items = self + .connection_map + .iter() + .filter(|(_, state)| state.continue_read) + .map(|(info, _)| info.clone()) + .collect::>(); + for conn_info in items { + let (success, len) = self.read_server_n_write_proxy_handler(&conn_info)?; + if !success { + return Ok(()); + } + let e = "connection state not found"; + let state = self.connection_map.get_mut(&conn_info).ok_or(e)?; + + if len == 0 || event.is_read_closed() { + state.wait_read = false; + state.close_state |= SERVER_WRITE_CLOSED; + Self::update_mio_socket_interest(&mut self.poll, state)?; + self.check_change_close_state(&conn_info)?; + self.expect_smoltcp_send()?; + } + self.write_to_client(&conn_info)?; + } + } + #[cfg(target_os = "windows")] if event.is_writable() { // log::trace!("Tun writable"); @@ -979,7 +1023,7 @@ impl<'a> TunToProxy<'a> { for token in self.write_sockets.clone().into_iter() { if let Some(connection) = self.find_info_by_token(token) { let connection = connection.clone(); - if let Err(error) = self.write_to_client(token, &connection) { + if let Err(error) = self.write_to_client(&connection) { log::error!("Write to client {}", error); self.remove_connection(&connection)?; } @@ -1039,6 +1083,48 @@ impl<'a> TunToProxy<'a> { Ok(()) } + fn read_server_n_write_proxy_handler(&mut self, conn_info: &ConnectionInfo) -> Result<(bool, usize), Error> { + let e = "connection state not found"; + let state = self.connection_map.get_mut(conn_info).ok_or(e)?; + state.continue_read = false; + + let mut vecbuf = vec![]; + use std::io::{Error, ErrorKind}; + let r = Self::read_data_from_tcp_stream( + &mut state.mio_stream, + IP_PACKAGE_MAX_SIZE, + &mut state.is_tcp_closed, + |data| { + vecbuf.extend_from_slice(data); + if vecbuf.len() >= IP_PACKAGE_MAX_SIZE { + return Err(Error::new(ErrorKind::OutOfMemory, "IP_PACKAGE_MAX_SIZE exceeded")); + } + Ok(()) + }, + ); + let len = vecbuf.len(); + if let Err(error) = r { + if error.kind() == ErrorKind::OutOfMemory { + state.continue_read = true; + } else { + log::error!("{}", error); + self.remove_connection(conn_info)?; + return Ok((false, len)); + } + } + + let data_event = IncomingDataEvent { + direction: IncomingDirection::FromServer, + buffer: &vecbuf, + }; + if let Err(error) = state.proxy_handler.push_data(data_event) { + log::error!("{}", error); + self.remove_connection(conn_info)?; + return Ok((false, len)); + } + Ok((true, len)) + } + fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> { if let Some(info) = self.find_info_by_udp_token(event.token()) { return self.receive_udp_packet_and_write_to_client(&info.clone()); @@ -1070,26 +1156,14 @@ impl<'a> TunToProxy<'a> { self.receive_dns_over_tcp_packet_and_write_to_client(&conn_info)?; return Ok(()); } else { - let e = "connection state not found"; - let state = self.connection_map.get_mut(&conn_info).ok_or(e)?; - - // TODO: Move this reading process to its own function. - let mut vecbuf = vec![]; - Self::read_data_from_tcp_stream(&mut state.mio_stream, &mut state.is_tcp_closed, |data| { - vecbuf.extend_from_slice(data); - Ok(()) - })?; - - let data_event = IncomingDataEvent { - direction: IncomingDirection::FromServer, - buffer: &vecbuf, - }; - if let Err(error) = state.proxy_handler.push_data(data_event) { - log::error!("{}", error); - self.remove_connection(&conn_info.clone())?; + let (success, len) = self.read_server_n_write_proxy_handler(&conn_info)?; + if !success { return Ok(()); } + let e = "connection state not found"; + let state = self.connection_map.get_mut(&conn_info).ok_or(e)?; + // The handler request for reset the server connection if state.proxy_handler.reset_connection() { if let Err(err) = self.poll.registry().deregister(&mut state.mio_stream) { @@ -1112,7 +1186,7 @@ impl<'a> TunToProxy<'a> { return Ok(()); } - if vecbuf.is_empty() || event.is_read_closed() { + if len == 0 || event.is_read_closed() { state.wait_read = false; state.close_state |= SERVER_WRITE_CLOSED; Self::update_mio_socket_interest(&mut self.poll, state)?; @@ -1123,7 +1197,7 @@ impl<'a> TunToProxy<'a> { // We have read from the proxy server and pushed the data to the connection handler. // Thus, expect data to be processed (e.g. decapsulated) and forwarded to the client. - self.write_to_client(event.token(), &conn_info)?; + self.write_to_client(&conn_info)?; // The connection handler could have produced data that is to be written to the // server. @@ -1144,11 +1218,17 @@ impl<'a> TunToProxy<'a> { Ok(()) } - fn read_data_from_tcp_stream(stream: &mut TcpStream, is_closed: &mut bool, mut callback: F) -> Result<()> + fn read_data_from_tcp_stream( + stream: &mut dyn std::io::Read, + buffer_size: usize, + is_closed: &mut bool, + mut callback: F, + ) -> std::io::Result<()> where - F: FnMut(&mut [u8]) -> Result<()>, + F: FnMut(&mut [u8]) -> std::io::Result<()>, { - let mut tmp: [u8; 4096] = [0_u8; 4096]; + assert!(buffer_size > 0); + let mut tmp = vec![0_u8; buffer_size]; loop { match stream.read(&mut tmp) { Ok(0) => { @@ -1168,7 +1248,7 @@ impl<'a> TunToProxy<'a> { continue; } else { *is_closed = true; - return Err(error.into()); + return Err(error); } } }; @@ -1216,6 +1296,9 @@ impl<'a> TunToProxy<'a> { } break 'exit_point Err(Error::from(err)); } + + log::trace!("Polling events count {}", events.iter().count()); + for event in events.iter() { match event.token() { EXIT_TOKEN => { @@ -1234,6 +1317,8 @@ impl<'a> TunToProxy<'a> { self.send_to_smoltcp()?; self.clearup_expired_connection()?; self.clearup_expired_dns_over_tcp()?; + + log::trace!("connection count: {}", self.connection_map.len()); }; #[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))] handle.join().unwrap(); diff --git a/tests/iperf/dante.conf b/tests/iperf/dante.conf index b723f5b..1970568 100644 --- a/tests/iperf/dante.conf +++ b/tests/iperf/dante.conf @@ -1,5 +1,5 @@ # logoutput: /var/log/socks.log -internal: 10.0.0.3 +internal: 10.0.0.3 port = 10800 external: 10.0.0.3 clientmethod: none socksmethod: none diff --git a/tests/iperf/test.sh b/tests/iperf/test.sh index 09fd0b4..6332152 100755 --- a/tests/iperf/test.sh +++ b/tests/iperf/test.sh @@ -1,6 +1,7 @@ #!/bin/bash # sudo apt install iperf3 dante-server +# sudo systemctl stop danted SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" echo $SCRIPT_DIR @@ -38,9 +39,12 @@ sleep 1 ip tuntap add name tun0 mode tun ip link set tun0 up ip route add 10.0.0.4 dev tun0 -"$tun2proxy" --proxy socks5://10.0.0.3:1080 & +"$tun2proxy" --proxy socks5://10.0.0.3:10800 & # Run iperf client through tun2proxy iperf3 -c 10.0.0.4 -iperf3 -c 10.0.0.4 -R +iperf3 -c 10.0.0.4 -R -P 10 + +# Clean up +# sudo sh -c "pkill tun2proxy; pkill iperf3; pkill danted; ip link del tun0; ip netns del test"