From 57728ff1053c415d45599b1386625255f5be914c Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Sun, 1 Oct 2023 11:57:50 +0800 Subject: [PATCH] pipe_server_cache --- src/tun2proxy.rs | 19 ++++++++++------- src/wintuninterface.rs | 46 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 53 insertions(+), 12 deletions(-) diff --git a/src/tun2proxy.rs b/src/tun2proxy.rs index 6183123..d9b99ee 100644 --- a/src/tun2proxy.rs +++ b/src/tun2proxy.rs @@ -267,9 +267,10 @@ impl<'a> TunToProxy<'a> { #[cfg(target_os = "windows")] { - poll.registry().register(&mut tun, TUN_TOKEN, Interest::READABLE)?; + let interest = Interest::READABLE | Interest::WRITABLE; + poll.registry().register(&mut tun, TUN_TOKEN, interest)?; let mut pipe = NamedPipeSource(tun.pipe_client()); - poll.registry().register(&mut pipe, PIPE_TOKEN, Interest::READABLE)?; + poll.registry().register(&mut pipe, PIPE_TOKEN, interest)?; } #[cfg(target_family = "unix")] @@ -937,14 +938,18 @@ impl<'a> TunToProxy<'a> { rx_token.consume(|frame| self.receive_tun(frame))?; } } + if event.is_writable() { + log::trace!("tun send"); + let tx_token = self.tun.transmit(Instant::now()).ok_or("tx token not available")?; + // Just consume the cached packets, do nothing else. + tx_token.consume(0, |_buf| {}); + } Ok(()) } - fn pipe_event(&mut self, event: &Event) -> Result<(), Error> { - if event.is_readable() { - #[cfg(target_os = "windows")] - self.tun.pipe_client_event()?; - } + fn pipe_event(&mut self, _event: &Event) -> Result<(), Error> { + #[cfg(target_os = "windows")] + self.tun.pipe_client_event(_event)?; Ok(()) } diff --git a/src/wintuninterface.rs b/src/wintuninterface.rs index 512f167..fa67cb8 100644 --- a/src/wintuninterface.rs +++ b/src/wintuninterface.rs @@ -57,6 +57,7 @@ pub struct WinTunInterface { mtu: usize, medium: Medium, pipe_server: Rc>, + pipe_server_cache: Rc>>, pipe_client: Arc>, wintun_reader_thread: Option>, old_gateway: Option, @@ -80,12 +81,12 @@ impl event::Source for WinTunInterface { } impl WinTunInterface { - pub fn new(name: &str, medium: Medium) -> io::Result { + pub fn new(tun_name: &str, medium: Medium) -> io::Result { let wintun = unsafe { wintun::load() }.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let tun_name = name; + let guid = 324435345345345345_u128; let adapter = match wintun::Adapter::open(&wintun, tun_name) { Ok(a) => a, - Err(_) => wintun::Adapter::create(&wintun, tun_name, tun_name, None) + Err(_) => wintun::Adapter::create(&wintun, tun_name, tun_name, Some(guid)) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?, }; @@ -130,6 +131,7 @@ impl WinTunInterface { mtu, medium, pipe_server: Rc::new(RefCell::new(pipe_server)), + pipe_server_cache: Rc::new(RefCell::new(Vec::new())), pipe_client, wintun_reader_thread: Some(reader_thread), old_gateway: None, @@ -140,7 +142,17 @@ impl WinTunInterface { self.pipe_client.clone() } - pub fn pipe_client_event(&self) -> Result<(), io::Error> { + pub fn pipe_client_event(&self, event: &event::Event) -> Result<(), io::Error> { + if event.is_readable() { + self.pipe_client_event_readable() + } else if event.is_writable() { + self.pipe_client_event_writable() + } else { + Ok(()) + } + } + + fn pipe_client_event_readable(&self) -> Result<(), io::Error> { let mut reader = self .pipe_client .lock() @@ -167,6 +179,10 @@ impl WinTunInterface { Ok(()) } + fn pipe_client_event_writable(&self) -> Result<(), io::Error> { + Ok(()) + } + pub fn setup_config(&mut self, bypass_ip: Option, dns_addr: Option) -> Result<(), io::Error> { let adapter = self.wintun_session.get_adapter(); @@ -268,6 +284,7 @@ impl Device for WinTunInterface { let rx = RxToken { buffer }; let tx = TxToken { pipe_server: self.pipe_server.clone(), + pipe_server_cache: self.pipe_server_cache.clone(), }; Some((rx, tx)) } @@ -279,6 +296,7 @@ impl Device for WinTunInterface { fn transmit(&mut self, _timestamp: Instant) -> Option> { Some(TxToken { pipe_server: self.pipe_server.clone(), + pipe_server_cache: self.pipe_server_cache.clone(), }) } } @@ -300,6 +318,7 @@ impl phy::RxToken for RxToken { #[doc(hidden)] pub struct TxToken { pipe_server: Rc>, + pipe_server_cache: Rc>>, } impl phy::TxToken for TxToken { @@ -310,9 +329,26 @@ impl phy::TxToken for TxToken { let mut buffer = vec![0; len]; let result = f(&mut buffer); + let buffer = self + .pipe_server_cache + .borrow_mut() + .drain(..) + .chain(buffer.into_iter()) + .collect::>(); + if buffer.is_empty() { + return result; + } + match self.pipe_server.borrow_mut().write(&buffer[..]) { - Ok(_) => {} + Ok(len) => { + let len0 = buffer.len(); + if len < len0 { + log::trace!("Wintun TxToken consumed data len {} less than buffer len {}", len, len0); + self.pipe_server_cache.borrow_mut().extend_from_slice(&buffer[len..]); + } + } Err(err) if err.kind() == io::ErrorKind::WouldBlock => { + self.pipe_server_cache.borrow_mut().extend_from_slice(&buffer[..]); log::trace!("Wintun TxToken: WouldBlock data len: {}", len) } Err(err) => log::error!("Wintun TxToken data len {} error \"{}\"", len, err),