Memory leak fixing (#77)

* incoming packet with FIN or RST

* read_server_n_write_proxy_handler

* testing script

* Interest::WRITABLE and continue_read

* read_data_from_tcp_stream

* logging hide

* test

* script iperf
This commit is contained in:
ssrlive 2023-11-13 12:02:19 +08:00 committed by GitHub
parent 4016e401b2
commit e5041e6d9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 148 additions and 41 deletions

View file

@ -53,3 +53,21 @@ jobs:
args: -- -D warnings args: -- -D warnings
- name: Build - name: Build
run: cargo build --verbose run: cargo build --verbose
iperf:
name: Iperf
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/cargo@v1
with:
command: build
args: --release
- run: sudo apt-get install -y iperf3 dante-server
- run: sudo systemctl stop danted
- run: sudo tests/iperf/test.sh

View file

@ -97,6 +97,7 @@ fn get_transport_info(
protocol: IpProtocol, protocol: IpProtocol,
transport_offset: usize, transport_offset: usize,
packet: &[u8], packet: &[u8],
is_closed: &mut bool,
) -> Result<((u16, u16), bool, usize, usize)> { ) -> Result<((u16, u16), bool, usize, usize)> {
match protocol { match protocol {
IpProtocol::Udp => UdpPacket::new_checked(packet) IpProtocol::Udp => UdpPacket::new_checked(packet)
@ -111,6 +112,7 @@ fn get_transport_info(
.map_err(|e| e.into()), .map_err(|e| e.into()),
IpProtocol::Tcp => TcpPacket::new_checked(packet) IpProtocol::Tcp => TcpPacket::new_checked(packet)
.map(|result| { .map(|result| {
*is_closed = result.fin() || result.rst();
let header_len = result.header_len() as usize; let header_len = result.header_len() as usize;
( (
(result.src_port(), result.dst_port()), (result.src_port(), result.dst_port()),
@ -124,7 +126,7 @@ fn get_transport_info(
} }
} }
fn connection_tuple(frame: &[u8]) -> Result<(ConnectionInfo, bool, usize, usize)> { fn connection_tuple(frame: &[u8], is_closed: &mut bool) -> Result<(ConnectionInfo, bool, usize, usize)> {
if let Ok(packet) = Ipv4Packet::new_checked(frame) { if let Ok(packet) = Ipv4Packet::new_checked(frame) {
let protocol = packet.next_header(); let protocol = packet.next_header();
@ -136,7 +138,7 @@ fn connection_tuple(frame: &[u8]) -> Result<(ConnectionInfo, bool, usize, usize)
let header_len = packet.header_len().into(); let header_len = packet.header_len().into();
let (ports, first_packet, payload_offset, payload_size) = let (ports, first_packet, payload_offset, payload_size) =
get_transport_info(protocol, header_len, &frame[header_len..])?; get_transport_info(protocol, header_len, &frame[header_len..], is_closed)?;
let info = ConnectionInfo::new( let info = ConnectionInfo::new(
SocketAddr::new(src_addr, ports.0), SocketAddr::new(src_addr, ports.0),
SocketAddr::new(dst_addr, ports.1).into(), SocketAddr::new(dst_addr, ports.1).into(),
@ -157,7 +159,7 @@ fn connection_tuple(frame: &[u8]) -> Result<(ConnectionInfo, bool, usize, usize)
let header_len = packet.header_len(); let header_len = packet.header_len();
let (ports, first_packet, payload_offset, payload_size) = let (ports, first_packet, payload_offset, payload_size) =
get_transport_info(protocol, header_len, &frame[header_len..])?; get_transport_info(protocol, header_len, &frame[header_len..], is_closed)?;
let info = ConnectionInfo::new( let info = ConnectionInfo::new(
SocketAddr::new(src_addr, ports.0), SocketAddr::new(src_addr, ports.0),
SocketAddr::new(dst_addr, ports.1).into(), SocketAddr::new(dst_addr, ports.1).into(),
@ -190,6 +192,7 @@ struct ConnectionState {
udp_data_cache: LinkedList<Vec<u8>>, udp_data_cache: LinkedList<Vec<u8>>,
dns_over_tcp_expiry: Option<::std::time::Instant>, dns_over_tcp_expiry: Option<::std::time::Instant>,
is_tcp_closed: bool, is_tcp_closed: bool,
continue_read: bool,
} }
pub(crate) trait ProxyHandler { pub(crate) trait ProxyHandler {
@ -265,16 +268,17 @@ impl<'a> TunToProxy<'a> {
let poll = Poll::new()?; let poll = Poll::new()?;
let interests = Interest::READABLE | Interest::WRITABLE;
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
poll.registry() poll.registry()
.register(&mut SourceFd(&tun.as_raw_fd()), TUN_TOKEN, Interest::READABLE)?; .register(&mut SourceFd(&tun.as_raw_fd()), TUN_TOKEN, interests)?;
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
{ {
let interest = Interest::READABLE | Interest::WRITABLE; poll.registry().register(&mut tun, TUN_TOKEN, interests)?;
poll.registry().register(&mut tun, TUN_TOKEN, interest)?;
let mut pipe = NamedPipeSource(tun.pipe_client()); let mut pipe = NamedPipeSource(tun.pipe_client());
poll.registry().register(&mut pipe, PIPE_TOKEN, interest)?; poll.registry().register(&mut pipe, PIPE_TOKEN, interests)?;
} }
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
@ -581,10 +585,15 @@ impl<'a> TunToProxy<'a> {
state.dns_over_tcp_expiry = Some(Self::common_udp_life_timeout()); state.dns_over_tcp_expiry = Some(Self::common_udp_life_timeout());
let mut vecbuf = vec![]; let mut vecbuf = vec![];
Self::read_data_from_tcp_stream(&mut state.mio_stream, &mut state.is_tcp_closed, |data| { Self::read_data_from_tcp_stream(
&mut state.mio_stream,
IP_PACKAGE_MAX_SIZE,
&mut state.is_tcp_closed,
|data| {
vecbuf.extend_from_slice(data); vecbuf.extend_from_slice(data);
Ok(()) Ok(())
})?; },
)?;
let data_event = IncomingDataEvent { let data_event = IncomingDataEvent {
direction: IncomingDirection::FromServer, direction: IncomingDirection::FromServer,
@ -708,6 +717,7 @@ impl<'a> TunToProxy<'a> {
info: &ConnectionInfo, info: &ConnectionInfo,
origin_dst: SocketAddr, origin_dst: SocketAddr,
frame: &[u8], frame: &[u8],
is_closed: bool,
) -> Result<()> { ) -> Result<()> {
if first_packet { if first_packet {
let proxy_handler = manager.new_proxy_handler(info, false)?; let proxy_handler = manager.new_proxy_handler(info, false)?;
@ -723,6 +733,10 @@ impl<'a> TunToProxy<'a> {
log::trace!("Subsequent packet {} ({})", info, origin_dst); log::trace!("Subsequent packet {} ({})", info, origin_dst);
} }
if let Some(state) = self.connection_map.get_mut(info) {
state.is_tcp_closed = is_closed;
}
// Inject the packet to advance the remote proxy server smoltcp socket state // Inject the packet to advance the remote proxy server smoltcp socket state
self.device.inject_packet(frame); self.device.inject_packet(frame);
@ -743,7 +757,8 @@ impl<'a> TunToProxy<'a> {
// A raw packet was received on the tunnel interface. // A raw packet was received on the tunnel interface.
fn receive_tun(&mut self, frame: &mut [u8]) -> Result<(), Error> { fn receive_tun(&mut self, frame: &mut [u8]) -> Result<(), Error> {
let mut handler = || -> Result<(), Error> { let mut handler = || -> Result<(), Error> {
let result = connection_tuple(frame); let mut is_closed = false;
let result = connection_tuple(frame, &mut is_closed);
if let Err(error) = result { if let Err(error) = result {
log::debug!("{}, ignored", error); log::debug!("{}, ignored", error);
return Ok(()); return Ok(());
@ -755,7 +770,7 @@ impl<'a> TunToProxy<'a> {
let manager = self.get_connection_manager().ok_or("get connection manager")?; let manager = self.get_connection_manager().ok_or("get connection manager")?;
if info.protocol == IpProtocol::Tcp { if info.protocol == IpProtocol::Tcp {
self.process_incoming_tcp_packets(first_packet, &manager, &info, origin_dst, frame)?; self.process_incoming_tcp_packets(first_packet, &manager, &info, origin_dst, frame, is_closed)?;
} else if info.protocol == IpProtocol::Udp { } else if info.protocol == IpProtocol::Udp {
let port = info.dst.port(); let port = info.dst.port();
let payload = &frame[payload_offset..payload_offset + payload_size]; let payload = &frame[payload_offset..payload_offset + payload_size];
@ -837,6 +852,7 @@ impl<'a> TunToProxy<'a> {
udp_data_cache: LinkedList::new(), udp_data_cache: LinkedList::new(),
dns_over_tcp_expiry: None, dns_over_tcp_expiry: None,
is_tcp_closed: false, is_tcp_closed: false,
continue_read: false,
}; };
Ok(state) Ok(state)
} }
@ -919,7 +935,7 @@ impl<'a> TunToProxy<'a> {
Ok(()) Ok(())
} }
fn write_to_client(&mut self, token: Token, info: &ConnectionInfo) -> Result<(), Error> { fn write_to_client(&mut self, info: &ConnectionInfo) -> Result<(), Error> {
while let Some(state) = self.connection_map.get_mut(info) { while let Some(state) = self.connection_map.get_mut(info) {
let event = state.proxy_handler.peek_data(OutgoingDirection::ToClient); let event = state.proxy_handler.peek_data(OutgoingDirection::ToClient);
let buflen = event.buffer.len(); let buflen = event.buffer.len();
@ -933,6 +949,7 @@ impl<'a> TunToProxy<'a> {
} }
consumed = socket.send_slice(event.buffer)?; consumed = socket.send_slice(event.buffer)?;
state.proxy_handler.consume_data(OutgoingDirection::ToClient, consumed); state.proxy_handler.consume_data(OutgoingDirection::ToClient, consumed);
let token = state.token;
self.expect_smoltcp_send()?; self.expect_smoltcp_send()?;
if consumed < buflen { if consumed < buflen {
self.write_sockets.insert(token); self.write_sockets.insert(token);
@ -959,6 +976,33 @@ impl<'a> TunToProxy<'a> {
rx_token.consume(|frame| self.receive_tun(frame))?; rx_token.consume(|frame| self.receive_tun(frame))?;
} }
} }
if event.is_writable() {
let items = self
.connection_map
.iter()
.filter(|(_, state)| state.continue_read)
.map(|(info, _)| info.clone())
.collect::<Vec<_>>();
for conn_info in items {
let (success, len) = self.read_server_n_write_proxy_handler(&conn_info)?;
if !success {
return Ok(());
}
let e = "connection state not found";
let state = self.connection_map.get_mut(&conn_info).ok_or(e)?;
if len == 0 || event.is_read_closed() {
state.wait_read = false;
state.close_state |= SERVER_WRITE_CLOSED;
Self::update_mio_socket_interest(&mut self.poll, state)?;
self.check_change_close_state(&conn_info)?;
self.expect_smoltcp_send()?;
}
self.write_to_client(&conn_info)?;
}
}
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
if event.is_writable() { if event.is_writable() {
// log::trace!("Tun writable"); // log::trace!("Tun writable");
@ -979,7 +1023,7 @@ impl<'a> TunToProxy<'a> {
for token in self.write_sockets.clone().into_iter() { for token in self.write_sockets.clone().into_iter() {
if let Some(connection) = self.find_info_by_token(token) { if let Some(connection) = self.find_info_by_token(token) {
let connection = connection.clone(); let connection = connection.clone();
if let Err(error) = self.write_to_client(token, &connection) { if let Err(error) = self.write_to_client(&connection) {
log::error!("Write to client {}", error); log::error!("Write to client {}", error);
self.remove_connection(&connection)?; self.remove_connection(&connection)?;
} }
@ -1039,6 +1083,48 @@ impl<'a> TunToProxy<'a> {
Ok(()) Ok(())
} }
fn read_server_n_write_proxy_handler(&mut self, conn_info: &ConnectionInfo) -> Result<(bool, usize), Error> {
let e = "connection state not found";
let state = self.connection_map.get_mut(conn_info).ok_or(e)?;
state.continue_read = false;
let mut vecbuf = vec![];
use std::io::{Error, ErrorKind};
let r = Self::read_data_from_tcp_stream(
&mut state.mio_stream,
IP_PACKAGE_MAX_SIZE,
&mut state.is_tcp_closed,
|data| {
vecbuf.extend_from_slice(data);
if vecbuf.len() >= IP_PACKAGE_MAX_SIZE {
return Err(Error::new(ErrorKind::OutOfMemory, "IP_PACKAGE_MAX_SIZE exceeded"));
}
Ok(())
},
);
let len = vecbuf.len();
if let Err(error) = r {
if error.kind() == ErrorKind::OutOfMemory {
state.continue_read = true;
} else {
log::error!("{}", error);
self.remove_connection(conn_info)?;
return Ok((false, len));
}
}
let data_event = IncomingDataEvent {
direction: IncomingDirection::FromServer,
buffer: &vecbuf,
};
if let Err(error) = state.proxy_handler.push_data(data_event) {
log::error!("{}", error);
self.remove_connection(conn_info)?;
return Ok((false, len));
}
Ok((true, len))
}
fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> { fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> {
if let Some(info) = self.find_info_by_udp_token(event.token()) { if let Some(info) = self.find_info_by_udp_token(event.token()) {
return self.receive_udp_packet_and_write_to_client(&info.clone()); return self.receive_udp_packet_and_write_to_client(&info.clone());
@ -1070,26 +1156,14 @@ impl<'a> TunToProxy<'a> {
self.receive_dns_over_tcp_packet_and_write_to_client(&conn_info)?; self.receive_dns_over_tcp_packet_and_write_to_client(&conn_info)?;
return Ok(()); return Ok(());
} else { } else {
let e = "connection state not found"; let (success, len) = self.read_server_n_write_proxy_handler(&conn_info)?;
let state = self.connection_map.get_mut(&conn_info).ok_or(e)?; if !success {
// TODO: Move this reading process to its own function.
let mut vecbuf = vec![];
Self::read_data_from_tcp_stream(&mut state.mio_stream, &mut state.is_tcp_closed, |data| {
vecbuf.extend_from_slice(data);
Ok(())
})?;
let data_event = IncomingDataEvent {
direction: IncomingDirection::FromServer,
buffer: &vecbuf,
};
if let Err(error) = state.proxy_handler.push_data(data_event) {
log::error!("{}", error);
self.remove_connection(&conn_info.clone())?;
return Ok(()); return Ok(());
} }
let e = "connection state not found";
let state = self.connection_map.get_mut(&conn_info).ok_or(e)?;
// The handler request for reset the server connection // The handler request for reset the server connection
if state.proxy_handler.reset_connection() { if state.proxy_handler.reset_connection() {
if let Err(err) = self.poll.registry().deregister(&mut state.mio_stream) { if let Err(err) = self.poll.registry().deregister(&mut state.mio_stream) {
@ -1112,7 +1186,7 @@ impl<'a> TunToProxy<'a> {
return Ok(()); return Ok(());
} }
if vecbuf.is_empty() || event.is_read_closed() { if len == 0 || event.is_read_closed() {
state.wait_read = false; state.wait_read = false;
state.close_state |= SERVER_WRITE_CLOSED; state.close_state |= SERVER_WRITE_CLOSED;
Self::update_mio_socket_interest(&mut self.poll, state)?; Self::update_mio_socket_interest(&mut self.poll, state)?;
@ -1123,7 +1197,7 @@ impl<'a> TunToProxy<'a> {
// We have read from the proxy server and pushed the data to the connection handler. // We have read from the proxy server and pushed the data to the connection handler.
// Thus, expect data to be processed (e.g. decapsulated) and forwarded to the client. // Thus, expect data to be processed (e.g. decapsulated) and forwarded to the client.
self.write_to_client(event.token(), &conn_info)?; self.write_to_client(&conn_info)?;
// The connection handler could have produced data that is to be written to the // The connection handler could have produced data that is to be written to the
// server. // server.
@ -1144,11 +1218,17 @@ impl<'a> TunToProxy<'a> {
Ok(()) Ok(())
} }
fn read_data_from_tcp_stream<F>(stream: &mut TcpStream, is_closed: &mut bool, mut callback: F) -> Result<()> fn read_data_from_tcp_stream<F>(
stream: &mut dyn std::io::Read,
buffer_size: usize,
is_closed: &mut bool,
mut callback: F,
) -> std::io::Result<()>
where where
F: FnMut(&mut [u8]) -> Result<()>, F: FnMut(&mut [u8]) -> std::io::Result<()>,
{ {
let mut tmp: [u8; 4096] = [0_u8; 4096]; assert!(buffer_size > 0);
let mut tmp = vec![0_u8; buffer_size];
loop { loop {
match stream.read(&mut tmp) { match stream.read(&mut tmp) {
Ok(0) => { Ok(0) => {
@ -1168,7 +1248,7 @@ impl<'a> TunToProxy<'a> {
continue; continue;
} else { } else {
*is_closed = true; *is_closed = true;
return Err(error.into()); return Err(error);
} }
} }
}; };
@ -1216,6 +1296,9 @@ impl<'a> TunToProxy<'a> {
} }
break 'exit_point Err(Error::from(err)); break 'exit_point Err(Error::from(err));
} }
log::trace!("Polling events count {}", events.iter().count());
for event in events.iter() { for event in events.iter() {
match event.token() { match event.token() {
EXIT_TOKEN => { EXIT_TOKEN => {
@ -1234,6 +1317,8 @@ impl<'a> TunToProxy<'a> {
self.send_to_smoltcp()?; self.send_to_smoltcp()?;
self.clearup_expired_connection()?; self.clearup_expired_connection()?;
self.clearup_expired_dns_over_tcp()?; self.clearup_expired_dns_over_tcp()?;
log::trace!("connection count: {}", self.connection_map.len());
}; };
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))] #[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
handle.join().unwrap(); handle.join().unwrap();

View file

@ -1,5 +1,5 @@
# logoutput: /var/log/socks.log # logoutput: /var/log/socks.log
internal: 10.0.0.3 internal: 10.0.0.3 port = 10800
external: 10.0.0.3 external: 10.0.0.3
clientmethod: none clientmethod: none
socksmethod: none socksmethod: none

View file

@ -1,6 +1,7 @@
#!/bin/bash #!/bin/bash
# sudo apt install iperf3 dante-server # sudo apt install iperf3 dante-server
# sudo systemctl stop danted
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo $SCRIPT_DIR echo $SCRIPT_DIR
@ -38,9 +39,12 @@ sleep 1
ip tuntap add name tun0 mode tun ip tuntap add name tun0 mode tun
ip link set tun0 up ip link set tun0 up
ip route add 10.0.0.4 dev tun0 ip route add 10.0.0.4 dev tun0
"$tun2proxy" --proxy socks5://10.0.0.3:1080 & "$tun2proxy" --proxy socks5://10.0.0.3:10800 &
# Run iperf client through tun2proxy # Run iperf client through tun2proxy
iperf3 -c 10.0.0.4 iperf3 -c 10.0.0.4
iperf3 -c 10.0.0.4 -R iperf3 -c 10.0.0.4 -R -P 10
# Clean up
# sudo sh -c "pkill tun2proxy; pkill iperf3; pkill danted; ip link del tun0; ip netns del test"