diff --git a/src/tun2proxy.rs b/src/tun2proxy.rs index c5c18d9..9218cb9 100644 --- a/src/tun2proxy.rs +++ b/src/tun2proxy.rs @@ -567,23 +567,11 @@ impl<'a> TunToProxy<'a> { assert!(state.dns_over_tcp_expiry.is_some()); state.dns_over_tcp_expiry = Some(Self::common_udp_life_timeout()); - // Code similar to the code in parent function. TODO: Cleanup. - let mut vecbuf = Vec::::new(); - let read_result = state.mio_stream.read_to_end(&mut vecbuf); - let read = match read_result { - Ok(read_result) => read_result, - Err(error) => { - if error.kind() != std::io::ErrorKind::WouldBlock { - log::error!("{} Read from proxy: {}", info.dst, error); - } - vecbuf.len() - } - }; + let vecbuf = Self::read_data_from_tcp_stream(&mut state.mio_stream, info)?; - let data = vecbuf.as_slice(); let data_event = IncomingDataEvent { direction: IncomingDirection::FromServer, - buffer: &data[0..read], + buffer: &vecbuf, }; if let Err(error) = state.proxy_handler.push_data(data_event) { log::error!("{}", error); @@ -881,14 +869,13 @@ impl<'a> TunToProxy<'a> { state.wait_write = written < buffer_size; Self::update_mio_socket_interest(&mut self.poll, state)?; } - Err(error) if error.kind() != std::io::ErrorKind::WouldBlock => { - return Err(error.into()); - } - _ => { - // WOULDBLOCK case + Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => { state.wait_write = true; Self::update_mio_socket_interest(&mut self.poll, state)?; } + Err(error) => { + return Err(error.into()); + } } } self.check_change_close_state(info)?; @@ -1042,22 +1029,11 @@ impl<'a> TunToProxy<'a> { let state = self.connection_map.get_mut(&conn_info).ok_or(e)?; // TODO: Move this reading process to its own function. - let mut vecbuf = Vec::::new(); - let read_result = state.mio_stream.read_to_end(&mut vecbuf); - let read = match read_result { - Ok(read_result) => read_result, - Err(error) => { - if error.kind() != std::io::ErrorKind::WouldBlock { - log::error!("{} Read from proxy: {}", conn_info.dst, error); - } - vecbuf.len() - } - }; + let vecbuf = Self::read_data_from_tcp_stream(&mut state.mio_stream, &conn_info)?; - let data = vecbuf.as_slice(); let data_event = IncomingDataEvent { direction: IncomingDirection::FromServer, - buffer: &data[0..read], + buffer: &vecbuf, }; if let Err(error) = state.proxy_handler.push_data(data_event) { log::error!("{}", error); @@ -1087,7 +1063,7 @@ impl<'a> TunToProxy<'a> { return Ok(()); } - if read == 0 || event.is_read_closed() { + if vecbuf.len() == 0 || event.is_read_closed() { state.wait_read = false; state.close_state |= SERVER_WRITE_CLOSED; Self::update_mio_socket_interest(&mut self.poll, state)?; @@ -1119,6 +1095,34 @@ impl<'a> TunToProxy<'a> { Ok(()) } + fn read_data_from_tcp_stream(stream: &mut TcpStream, conn_info: &ConnectionInfo) -> Result> { + let mut vecbuf = Vec::::new(); + loop { + let mut tmp: [u8; 4096] = [0_u8; 4096]; + match stream.read(&mut tmp) { + Ok(0) => { + log::info!("{} closed", conn_info); + break; + } + Ok(read_result) => { + vecbuf.extend_from_slice(&tmp[0..read_result]); + } + Err(error) => { + if error.kind() == std::io::ErrorKind::WouldBlock { + // We have read all available data. + break; + } else if error.kind() == std::io::ErrorKind::Interrupted { + // Hardware or software interrupt, continue polling. + continue; + } else { + return Err(error.into()); + } + } + }; + } + Ok(vecbuf) + } + #[cfg(any(target_os = "linux", target_os = "macos"))] fn prepare_exiting_signal_trigger(&mut self) -> Result<()> { let mut exit_trigger = self.exit_trigger.take().ok_or("Already running")?;