From da24bffa7028b4a476180bd114b0f03f2fa74bd5 Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Sun, 1 Oct 2023 13:21:26 +0800 Subject: [PATCH] pipe_client_cache --- src/wintuninterface.rs | 64 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 6 deletions(-) diff --git a/src/wintuninterface.rs b/src/wintuninterface.rs index fa67cb8..8a572a4 100644 --- a/src/wintuninterface.rs +++ b/src/wintuninterface.rs @@ -59,6 +59,7 @@ pub struct WinTunInterface { pipe_server: Rc>, pipe_server_cache: Rc>>, pipe_client: Arc>, + pipe_client_cache: Arc>>, wintun_reader_thread: Option>, old_gateway: Option, } @@ -98,26 +99,42 @@ impl WinTunInterface { let (pipe_server, pipe_client) = pipe()?; let pipe_client = Arc::new(Mutex::new(pipe_client)); + let pipe_client_cache = Arc::new(Mutex::new(Vec::new())); let mtu = adapter.get_mtu().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; let reader_session = wintun_session.clone(); let pipe_client_clone = pipe_client.clone(); + let pipe_client_cache_clone = pipe_client_cache.clone(); let reader_thread = std::thread::spawn(move || { let block = || -> Result<(), Box> { loop { // read data from tunnel interface let packet = reader_session.receive_blocking()?; - let bytes = packet.bytes(); + let bytes = packet.bytes().to_vec(); + + // Take the old data from pipe_client_cache and append the new data + let old_data = pipe_client_cache_clone.lock()?.drain(..).collect::>(); + let bytes = old_data.into_iter().chain(bytes.into_iter()).collect::>(); + if bytes.is_empty() { + continue; + } + let len = bytes.len(); // write data to named pipe_server - let result = { pipe_client_clone.lock()?.write(bytes) }; + let result = { pipe_client_clone.lock()?.write(&bytes) }; match result { - Ok(_) => {} - Err(err) if err.kind() == io::ErrorKind::WouldBlock => { - log::trace!("Wintun pipe_client write data len {} WouldBlock", bytes.len()) + Ok(n) => { + if n < len { + log::trace!("Wintun pipe_client write data {} less than buffer {}", n, len); + pipe_client_cache_clone.lock()?.extend_from_slice(&bytes[n..]); + } } - Err(err) => log::error!("Wintun pipe_client write data len {} error \"{}\"", bytes.len(), err), + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { + log::trace!("Wintun pipe_client write data len {} WouldBlock", len); + pipe_client_cache_clone.lock()?.extend_from_slice(&bytes); + } + Err(err) => log::error!("Wintun pipe_client write data len {} error \"{}\"", len, err), } } }; @@ -133,6 +150,7 @@ impl WinTunInterface { pipe_server: Rc::new(RefCell::new(pipe_server)), pipe_server_cache: Rc::new(RefCell::new(Vec::new())), pipe_client, + pipe_client_cache, wintun_reader_thread: Some(reader_thread), old_gateway: None, }) @@ -180,6 +198,40 @@ impl WinTunInterface { } fn pipe_client_event_writable(&self) -> Result<(), io::Error> { + let cache = self + .pipe_client_cache + .lock() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))? + .drain(..) + .collect::>(); + if cache.is_empty() { + return Ok(()); + } + let result = self + .pipe_client + .lock() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))? + .write(&cache[..]); + match result { + Ok(len) => { + let len0 = cache.len(); + if len < len0 { + log::trace!("Wintun pipe_client write data {} less than buffer {}", len, len0); + self.pipe_client_cache + .lock() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))? + .extend_from_slice(&cache[len..]); + } + } + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { + log::trace!("Wintun pipe_client write data len {} WouldBlock", cache.len()); + self.pipe_client_cache + .lock() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))? + .extend_from_slice(&cache); + } + Err(err) => log::error!("Wintun pipe_client write data len {} error \"{}\"", cache.len(), err), + } Ok(()) }