pipe_client_cache

This commit is contained in:
ssrlive 2023-10-01 13:21:26 +08:00
parent d6e15e61d6
commit da24bffa70

View file

@ -59,6 +59,7 @@ pub struct WinTunInterface {
pipe_server: Rc<RefCell<NamedPipe>>, pipe_server: Rc<RefCell<NamedPipe>>,
pipe_server_cache: Rc<RefCell<Vec<u8>>>, pipe_server_cache: Rc<RefCell<Vec<u8>>>,
pipe_client: Arc<Mutex<NamedPipe>>, pipe_client: Arc<Mutex<NamedPipe>>,
pipe_client_cache: Arc<Mutex<Vec<u8>>>,
wintun_reader_thread: Option<JoinHandle<()>>, wintun_reader_thread: Option<JoinHandle<()>>,
old_gateway: Option<IpAddr>, old_gateway: Option<IpAddr>,
} }
@ -98,26 +99,42 @@ impl WinTunInterface {
let (pipe_server, pipe_client) = pipe()?; let (pipe_server, pipe_client) = pipe()?;
let pipe_client = Arc::new(Mutex::new(pipe_client)); let pipe_client = Arc::new(Mutex::new(pipe_client));
let pipe_client_cache = Arc::new(Mutex::new(Vec::new()));
let mtu = adapter.get_mtu().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; let mtu = adapter.get_mtu().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let reader_session = wintun_session.clone(); let reader_session = wintun_session.clone();
let pipe_client_clone = pipe_client.clone(); let pipe_client_clone = pipe_client.clone();
let pipe_client_cache_clone = pipe_client_cache.clone();
let reader_thread = std::thread::spawn(move || { let reader_thread = std::thread::spawn(move || {
let block = || -> Result<(), Box<dyn std::error::Error>> { let block = || -> Result<(), Box<dyn std::error::Error>> {
loop { loop {
// read data from tunnel interface // read data from tunnel interface
let packet = reader_session.receive_blocking()?; let packet = reader_session.receive_blocking()?;
let bytes = packet.bytes(); let bytes = packet.bytes().to_vec();
// Take the old data from pipe_client_cache and append the new data
let old_data = pipe_client_cache_clone.lock()?.drain(..).collect::<Vec<u8>>();
let bytes = old_data.into_iter().chain(bytes.into_iter()).collect::<Vec<u8>>();
if bytes.is_empty() {
continue;
}
let len = bytes.len();
// write data to named pipe_server // write data to named pipe_server
let result = { pipe_client_clone.lock()?.write(bytes) }; let result = { pipe_client_clone.lock()?.write(&bytes) };
match result { match result {
Ok(_) => {} Ok(n) => {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => { if n < len {
log::trace!("Wintun pipe_client write data len {} WouldBlock", bytes.len()) log::trace!("Wintun pipe_client write data {} less than buffer {}", n, len);
pipe_client_cache_clone.lock()?.extend_from_slice(&bytes[n..]);
} }
Err(err) => log::error!("Wintun pipe_client write data len {} error \"{}\"", bytes.len(), err), }
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
log::trace!("Wintun pipe_client write data len {} WouldBlock", len);
pipe_client_cache_clone.lock()?.extend_from_slice(&bytes);
}
Err(err) => log::error!("Wintun pipe_client write data len {} error \"{}\"", len, err),
} }
} }
}; };
@ -133,6 +150,7 @@ impl WinTunInterface {
pipe_server: Rc::new(RefCell::new(pipe_server)), pipe_server: Rc::new(RefCell::new(pipe_server)),
pipe_server_cache: Rc::new(RefCell::new(Vec::new())), pipe_server_cache: Rc::new(RefCell::new(Vec::new())),
pipe_client, pipe_client,
pipe_client_cache,
wintun_reader_thread: Some(reader_thread), wintun_reader_thread: Some(reader_thread),
old_gateway: None, old_gateway: None,
}) })
@ -180,6 +198,40 @@ impl WinTunInterface {
} }
fn pipe_client_event_writable(&self) -> Result<(), io::Error> { fn pipe_client_event_writable(&self) -> Result<(), io::Error> {
let cache = self
.pipe_client_cache
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.drain(..)
.collect::<Vec<u8>>();
if cache.is_empty() {
return Ok(());
}
let result = self
.pipe_client
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.write(&cache[..]);
match result {
Ok(len) => {
let len0 = cache.len();
if len < len0 {
log::trace!("Wintun pipe_client write data {} less than buffer {}", len, len0);
self.pipe_client_cache
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.extend_from_slice(&cache[len..]);
}
}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
log::trace!("Wintun pipe_client write data len {} WouldBlock", cache.len());
self.pipe_client_cache
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?
.extend_from_slice(&cache);
}
Err(err) => log::error!("Wintun pipe_client write data len {} error \"{}\"", cache.len(), err),
}
Ok(()) Ok(())
} }