read_server_n_write_proxy_handler

This commit is contained in:
ssrlive 2023-11-09 21:25:17 +08:00
parent e0de669598
commit 4ffa610b49

View file

@ -927,7 +927,7 @@ impl<'a> TunToProxy<'a> {
Ok(())
}
fn write_to_client(&mut self, token: Token, info: &ConnectionInfo) -> Result<(), Error> {
fn write_to_client(&mut self, info: &ConnectionInfo) -> Result<(), Error> {
while let Some(state) = self.connection_map.get_mut(info) {
let event = state.proxy_handler.peek_data(OutgoingDirection::ToClient);
let buflen = event.buffer.len();
@ -941,6 +941,7 @@ impl<'a> TunToProxy<'a> {
}
consumed = socket.send_slice(event.buffer)?;
state.proxy_handler.consume_data(OutgoingDirection::ToClient, consumed);
let token = state.token;
self.expect_smoltcp_send()?;
if consumed < buflen {
self.write_sockets.insert(token);
@ -987,7 +988,7 @@ impl<'a> TunToProxy<'a> {
for token in self.write_sockets.clone().into_iter() {
if let Some(connection) = self.find_info_by_token(token) {
let connection = connection.clone();
if let Err(error) = self.write_to_client(token, &connection) {
if let Err(error) = self.write_to_client(&connection) {
log::error!("Write to client {}", error);
self.remove_connection(&connection)?;
}
@ -1047,6 +1048,36 @@ impl<'a> TunToProxy<'a> {
Ok(())
}
fn read_server_n_write_proxy_handler(&mut self, conn_info: &ConnectionInfo) -> Result<(bool, usize), Error> {
let e = "connection state not found";
let state = self.connection_map.get_mut(conn_info).ok_or(e)?;
let mut vecbuf = vec![];
let r = Self::read_data_from_tcp_stream(&mut state.mio_stream, &mut state.is_tcp_closed, |data| {
vecbuf.extend_from_slice(data);
Ok(())
});
let len = vecbuf.len();
if let Err(error) = r {
{
log::error!("{}", error);
self.remove_connection(conn_info)?;
return Ok((false, len));
}
}
let data_event = IncomingDataEvent {
direction: IncomingDirection::FromServer,
buffer: &vecbuf,
};
if let Err(error) = state.proxy_handler.push_data(data_event) {
log::error!("{}", error);
self.remove_connection(conn_info)?;
return Ok((false, len));
}
Ok((true, len))
}
fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> {
if let Some(info) = self.find_info_by_udp_token(event.token()) {
return self.receive_udp_packet_and_write_to_client(&info.clone());
@ -1078,26 +1109,14 @@ impl<'a> TunToProxy<'a> {
self.receive_dns_over_tcp_packet_and_write_to_client(&conn_info)?;
return Ok(());
} else {
let e = "connection state not found";
let state = self.connection_map.get_mut(&conn_info).ok_or(e)?;
// TODO: Move this reading process to its own function.
let mut vecbuf = vec![];
Self::read_data_from_tcp_stream(&mut state.mio_stream, &mut state.is_tcp_closed, |data| {
vecbuf.extend_from_slice(data);
Ok(())
})?;
let data_event = IncomingDataEvent {
direction: IncomingDirection::FromServer,
buffer: &vecbuf,
};
if let Err(error) = state.proxy_handler.push_data(data_event) {
log::error!("{}", error);
self.remove_connection(&conn_info.clone())?;
let (success, len) = self.read_server_n_write_proxy_handler(&conn_info)?;
if !success {
return Ok(());
}
let e = "connection state not found";
let state = self.connection_map.get_mut(&conn_info).ok_or(e)?;
// The handler request for reset the server connection
if state.proxy_handler.reset_connection() {
if let Err(err) = self.poll.registry().deregister(&mut state.mio_stream) {
@ -1120,7 +1139,7 @@ impl<'a> TunToProxy<'a> {
return Ok(());
}
if vecbuf.is_empty() || event.is_read_closed() {
if len == 0 || event.is_read_closed() {
state.wait_read = false;
state.close_state |= SERVER_WRITE_CLOSED;
Self::update_mio_socket_interest(&mut self.poll, state)?;
@ -1131,7 +1150,7 @@ impl<'a> TunToProxy<'a> {
// We have read from the proxy server and pushed the data to the connection handler.
// Thus, expect data to be processed (e.g. decapsulated) and forwarded to the client.
self.write_to_client(event.token(), &conn_info)?;
self.write_to_client(&conn_info)?;
// The connection handler could have produced data that is to be written to the
// server.
@ -1156,7 +1175,7 @@ impl<'a> TunToProxy<'a> {
where
F: FnMut(&mut [u8]) -> std::io::Result<()>,
{
let mut tmp: [u8; 4096] = [0_u8; 4096];
let mut tmp = [0_u8; IP_PACKAGE_MAX_SIZE];
loop {
match stream.read(&mut tmp) {
Ok(0) => {
@ -1224,6 +1243,9 @@ impl<'a> TunToProxy<'a> {
}
break 'exit_point Err(Error::from(err));
}
log::info!("Polling events count {}", events.iter().count());
for event in events.iter() {
match event.token() {
EXIT_TOKEN => {
@ -1242,6 +1264,8 @@ impl<'a> TunToProxy<'a> {
self.send_to_smoltcp()?;
self.clearup_expired_connection()?;
self.clearup_expired_dns_over_tcp()?;
log::info!("connection count: {}", self.connection_map.len());
};
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
handle.join().unwrap();