mirror of
https://github.com/tun2proxy/tun2proxy.git
synced 2025-04-25 16:26:04 +00:00
read_data_from_tcp_stream
This commit is contained in:
parent
b5d8f0ee48
commit
d4568c4676
1 changed files with 37 additions and 33 deletions
|
@ -567,23 +567,11 @@ impl<'a> TunToProxy<'a> {
|
|||
assert!(state.dns_over_tcp_expiry.is_some());
|
||||
state.dns_over_tcp_expiry = Some(Self::common_udp_life_timeout());
|
||||
|
||||
// Code similar to the code in parent function. TODO: Cleanup.
|
||||
let mut vecbuf = Vec::<u8>::new();
|
||||
let read_result = state.mio_stream.read_to_end(&mut vecbuf);
|
||||
let read = match read_result {
|
||||
Ok(read_result) => read_result,
|
||||
Err(error) => {
|
||||
if error.kind() != std::io::ErrorKind::WouldBlock {
|
||||
log::error!("{} Read from proxy: {}", info.dst, error);
|
||||
}
|
||||
vecbuf.len()
|
||||
}
|
||||
};
|
||||
let vecbuf = Self::read_data_from_tcp_stream(&mut state.mio_stream, info)?;
|
||||
|
||||
let data = vecbuf.as_slice();
|
||||
let data_event = IncomingDataEvent {
|
||||
direction: IncomingDirection::FromServer,
|
||||
buffer: &data[0..read],
|
||||
buffer: &vecbuf,
|
||||
};
|
||||
if let Err(error) = state.proxy_handler.push_data(data_event) {
|
||||
log::error!("{}", error);
|
||||
|
@ -881,14 +869,13 @@ impl<'a> TunToProxy<'a> {
|
|||
state.wait_write = written < buffer_size;
|
||||
Self::update_mio_socket_interest(&mut self.poll, state)?;
|
||||
}
|
||||
Err(error) if error.kind() != std::io::ErrorKind::WouldBlock => {
|
||||
return Err(error.into());
|
||||
}
|
||||
_ => {
|
||||
// WOULDBLOCK case
|
||||
Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
state.wait_write = true;
|
||||
Self::update_mio_socket_interest(&mut self.poll, state)?;
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(error.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
self.check_change_close_state(info)?;
|
||||
|
@ -1042,22 +1029,11 @@ impl<'a> TunToProxy<'a> {
|
|||
let state = self.connection_map.get_mut(&conn_info).ok_or(e)?;
|
||||
|
||||
// TODO: Move this reading process to its own function.
|
||||
let mut vecbuf = Vec::<u8>::new();
|
||||
let read_result = state.mio_stream.read_to_end(&mut vecbuf);
|
||||
let read = match read_result {
|
||||
Ok(read_result) => read_result,
|
||||
Err(error) => {
|
||||
if error.kind() != std::io::ErrorKind::WouldBlock {
|
||||
log::error!("{} Read from proxy: {}", conn_info.dst, error);
|
||||
}
|
||||
vecbuf.len()
|
||||
}
|
||||
};
|
||||
let vecbuf = Self::read_data_from_tcp_stream(&mut state.mio_stream, &conn_info)?;
|
||||
|
||||
let data = vecbuf.as_slice();
|
||||
let data_event = IncomingDataEvent {
|
||||
direction: IncomingDirection::FromServer,
|
||||
buffer: &data[0..read],
|
||||
buffer: &vecbuf,
|
||||
};
|
||||
if let Err(error) = state.proxy_handler.push_data(data_event) {
|
||||
log::error!("{}", error);
|
||||
|
@ -1087,7 +1063,7 @@ impl<'a> TunToProxy<'a> {
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
if read == 0 || event.is_read_closed() {
|
||||
if vecbuf.len() == 0 || event.is_read_closed() {
|
||||
state.wait_read = false;
|
||||
state.close_state |= SERVER_WRITE_CLOSED;
|
||||
Self::update_mio_socket_interest(&mut self.poll, state)?;
|
||||
|
@ -1119,6 +1095,34 @@ impl<'a> TunToProxy<'a> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn read_data_from_tcp_stream(stream: &mut TcpStream, conn_info: &ConnectionInfo) -> Result<Vec<u8>> {
|
||||
let mut vecbuf = Vec::<u8>::new();
|
||||
loop {
|
||||
let mut tmp: [u8; 4096] = [0_u8; 4096];
|
||||
match stream.read(&mut tmp) {
|
||||
Ok(0) => {
|
||||
log::info!("{} closed", conn_info);
|
||||
break;
|
||||
}
|
||||
Ok(read_result) => {
|
||||
vecbuf.extend_from_slice(&tmp[0..read_result]);
|
||||
}
|
||||
Err(error) => {
|
||||
if error.kind() == std::io::ErrorKind::WouldBlock {
|
||||
// We have read all available data.
|
||||
break;
|
||||
} else if error.kind() == std::io::ErrorKind::Interrupted {
|
||||
// Hardware or software interrupt, continue polling.
|
||||
continue;
|
||||
} else {
|
||||
return Err(error.into());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(vecbuf)
|
||||
}
|
||||
|
||||
#[cfg(any(target_os = "linux", target_os = "macos"))]
|
||||
fn prepare_exiting_signal_trigger(&mut self) -> Result<()> {
|
||||
let mut exit_trigger = self.exit_trigger.take().ok_or("Already running")?;
|
||||
|
|
Loading…
Add table
Reference in a new issue