wintun_reader_thread

This commit is contained in:
ssrlive 2023-09-24 17:56:49 +08:00
parent 9cf3fd3f94
commit a686d630fb
2 changed files with 40 additions and 37 deletions

View file

@ -60,4 +60,4 @@ windows = { version = "0.51", features = [
"Win32_Networking_WinSock",
"Win32_Foundation",
] }
wintun = "0.3"
wintun = { git = "https://github.com/ssrlive/wintun.git", branch = "main" }

View file

@ -11,6 +11,7 @@ use std::{
os::windows::prelude::{FromRawHandle, IntoRawHandle, OpenOptionsExt},
rc::Rc,
sync::{Arc, Mutex},
thread::JoinHandle,
vec::Vec,
};
use windows::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
@ -37,11 +38,12 @@ pub(crate) fn pipe() -> io::Result<(NamedPipe, NamedPipe)> {
/// A virtual TUN (IP) interface.
pub struct WinTunInterface {
inner: Arc<wintun::Session>,
wintun_session: Arc<wintun::Session>,
mtu: usize,
medium: Medium,
pipe_server: Rc<RefCell<NamedPipe>>,
pipe_client: Arc<Mutex<NamedPipe>>,
wintun_reader_thread: Option<JoinHandle<()>>,
}
impl event::Source for WinTunInterface {
@ -81,21 +83,44 @@ impl WinTunInterface {
let session = adapter
.start_session(wintun::MAX_RING_CAPACITY)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let inner = Arc::new(session);
let wintun_session = Arc::new(session);
let (pipe_server, pipe_client) = pipe()?;
let pipe_client = Arc::new(Mutex::new(pipe_client));
// let inner = WinTunInterfaceDesc::new(name, medium)?;
// let mtu = inner.interface_mtu()?;
let mtu = 1500;
let mtu = adapter.get_mtu().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let reader_session = wintun_session.clone();
let pipe_client_clone = pipe_client.clone();
let reader_thread = std::thread::spawn(move || {
let block = || -> Result<(), Box<dyn std::error::Error>> {
loop {
let packet = reader_session.receive_blocking()?;
let bytes = packet.bytes();
let result = { pipe_client_clone.lock().unwrap().write(bytes) };
match result {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
log::trace!("reader_thread phy: tx failed due to WouldBlock")
}
Err(err) => log::error!("{}", err),
}
}
};
if let Err(err) = block() {
log::trace!("Reader {}", err);
}
});
Ok(WinTunInterface {
inner,
wintun_session,
mtu,
medium,
pipe_server: Rc::new(RefCell::new(pipe_server)),
pipe_client,
wintun_reader_thread: Some(reader_thread),
})
}
@ -112,10 +137,10 @@ impl WinTunInterface {
.read(&mut buffer)
{
Ok(len) => {
let write_pack = self.inner.allocate_send_packet(len as u16);
let write_pack = self.wintun_session.allocate_send_packet(len as u16);
if let Ok(mut write_pack) = write_pack {
write_pack.bytes_mut().copy_from_slice(&buffer[..len]);
self.inner.send_packet(write_pack);
self.wintun_session.send_packet(write_pack);
} else if let Err(err) = write_pack {
log::error!("phy: failed to allocate send packet: {}", err);
}
@ -129,9 +154,14 @@ impl WinTunInterface {
impl Drop for WinTunInterface {
fn drop(&mut self) {
if let Err(e) = self.inner.shutdown() {
if let Err(e) = self.wintun_session.shutdown() {
log::error!("phy: failed to shutdown interface: {}", e);
}
if let Some(thread) = self.wintun_reader_thread.take() {
if let Err(e) = thread.join() {
log::error!("phy: failed to join reader thread: {:?}", e);
}
}
}
}
@ -147,22 +177,6 @@ impl Device for WinTunInterface {
}
fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
/*
let inner = self.inner.clone();
match inner.receive_blocking() {
Ok(read_pack) => Some((
RxToken {
buffer: read_pack.bytes().to_vec(),
},
TxToken { inner },
)),
Err(err) => {
log::error!("phy: failed to receive packet: {}", err);
None
}
}
*/
let mut buffer = vec![0; self.mtu];
match self.pipe_server.borrow_mut().read(&mut buffer[..]) {
Ok(size) => {
@ -212,17 +226,6 @@ impl phy::TxToken for TxToken {
let mut buffer = vec![0; len];
let result = f(&mut buffer);
/*
let inner = self.inner.clone();
let write_pack = inner.allocate_send_packet(len as u16);
if let Ok(mut write_pack) = write_pack {
write_pack.bytes_mut().copy_from_slice(&buffer[..]);
inner.send_packet(write_pack);
} else if let Err(err) = write_pack {
log::error!("phy: failed to allocate send packet: {}", err);
}
*/
match self.pipe_server.borrow_mut().write(&buffer[..]) {
Ok(_) => {}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {