From 1789259f6fdb305801260ea87e47512ab6a37e47 Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Wed, 5 Jun 2024 19:51:46 +0800 Subject: [PATCH] Implementation of traffic status callback --- Cargo.toml | 1 + cbindgen.toml | 2 + src/bin/main.rs | 6 +++ src/lib.rs | 42 ++++++++++++++++++++- src/traffic_status.rs | 87 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 136 insertions(+), 2 deletions(-) create mode 100644 src/traffic_status.rs diff --git a/Cargo.toml b/Cargo.toml index 2f18fe1..7e3ad69 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ env_logger = "0.11" hashlink = "0.9" httparse = "1.8" ipstack = { version = "0.0" } +lazy_static = "1" log = { version = "0.4", features = ["std"] } percent-encoding = "2" socks5-impl = { version = "0.5" } diff --git a/cbindgen.toml b/cbindgen.toml index 015b262..bed1b79 100644 --- a/cbindgen.toml +++ b/cbindgen.toml @@ -5,6 +5,7 @@ include = [ "tun2proxy_with_name_stop", "tun2proxy_with_fd_stop", "tun2proxy_set_log_callback", + "tun2proxy_set_traffic_status_callback", ] exclude = [ "Java_com_github_shadowsocks_bg_Tun2proxy_run", @@ -14,6 +15,7 @@ exclude = [ [export.rename] "ArgVerbosity" = "Tun2proxyVerbosity" "ArgDns" = "Tun2proxyDns" +"TrafficStatus" = "Tun2proxyTrafficStatus" [enum] prefix_with_name = true diff --git a/src/bin/main.rs b/src/bin/main.rs index 9360b54..8ae508f 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -21,6 +21,12 @@ async fn main() -> Result<(), BoxError> { return; } + unsafe extern "C" fn traffic_cb(status: *const tun2proxy::TrafficStatus, _: *mut std::ffi::c_void) { + let status = &*status; + log::debug!("Traffic: ▲ {} : ▼ {}", status.tx, status.rx); + } + unsafe { tun2proxy::tun2proxy_set_traffic_status_callback(1, Some(traffic_cb), std::ptr::null_mut()) }; + if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await { log::error!("main loop error: {}", err); } diff --git a/src/lib.rs b/src/lib.rs index accc745..406ad0b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ use udp_stream::UdpStream; pub use { args::{ArgDns, ArgProxy, ArgVerbosity, Args, ProxyType}, error::{BoxError, Error, Result}, + traffic_status::{tun2proxy_set_traffic_status_callback, TrafficStatus}, }; #[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))] @@ -53,6 +54,7 @@ mod proxy_handler; mod session_info; pub mod socket_transfer; mod socks; +mod traffic_status; mod virtual_dns; const DNS_PORT: u16 = 53; @@ -354,6 +356,29 @@ async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc(reader: &mut R, writer: &mut W, is_tx: bool) -> tokio::io::Result +where + R: tokio::io::AsyncRead + Unpin + ?Sized, + W: tokio::io::AsyncWrite + Unpin + ?Sized, +{ + let mut buf = vec![0; 8192]; + let mut total = 0; + loop { + match reader.read(&mut buf).await? { + 0 => break, // EOF + n => { + total += n as u64; + let (tx, rx) = if is_tx { (n, 0) } else { (0, n) }; + if let Err(e) = crate::traffic_status::traffic_status_update(tx, rx) { + log::debug!("Record traffic status error: {}", e); + } + writer.write_all(&buf[..n]).await?; + } + } + } + Ok(total) +} + async fn handle_tcp_session( mut tcp_stack: IpStackTcpStream, proxy_handler: Arc>, @@ -379,14 +404,14 @@ async fn handle_tcp_session( let res = tokio::join!( async move { - let r = tokio::io::copy(&mut t_rx, &mut s_tx).await; + let r = copy_and_record_traffic(&mut t_rx, &mut s_tx, true).await; if let Err(err) = s_tx.shutdown().await { log::trace!("{} s_tx shutdown error {}", session_info, err); } r }, async move { - let r = tokio::io::copy(&mut s_rx, &mut t_tx).await; + let r = copy_and_record_traffic(&mut s_rx, &mut t_tx, false).await; if let Err(err) = t_tx.shutdown().await { log::trace!("{} t_tx shutdown error {}", session_info, err); } @@ -443,6 +468,8 @@ async fn handle_udp_associate_session( } let buf1 = &buf1[..len]; + crate::traffic_status::traffic_status_update(len, 0)?; + if let ProxyType::Socks4 | ProxyType::Socks5 = proxy_type { let s5addr = if let Some(domain_name) = &domain_name { Address::DomainAddress(domain_name.clone(), session_info.dst.port()) @@ -467,6 +494,8 @@ async fn handle_udp_associate_session( } let buf2 = &buf2[..len]; + crate::traffic_status::traffic_status_update(0, len)?; + if let ProxyType::Socks4 | ProxyType::Socks5 = proxy_type { // Remove SOCKS5 UDP header from the server data let header = UdpHeader::retrieve_from_stream(&mut &buf2[..])?; @@ -533,6 +562,8 @@ async fn handle_dns_over_tcp_session( buf.extend_from_slice(buf1); server.write_all(&buf).await?; + + crate::traffic_status::traffic_status_update(buf.len(), 0)?; } len = server.read(&mut buf2) => { let len = len?; @@ -541,6 +572,8 @@ async fn handle_dns_over_tcp_session( } let mut buf = buf2[..len].to_vec(); + crate::traffic_status::traffic_status_update(0, len)?; + let mut to_send: VecDeque> = VecDeque::new(); loop { if buf.len() < 2 { @@ -590,6 +623,7 @@ async fn handle_proxy_session(server: &mut TcpStream, proxy_handler: Arc 0 { server.write_all(data).await?; proxy_handler.consume_data(dir, len); + tx += len; } } + crate::traffic_status::traffic_status_update(tx, rx)?; Ok(proxy_handler.get_udp_associate()) } diff --git a/src/traffic_status.rs b/src/traffic_status.rs new file mode 100644 index 0000000..ef38d91 --- /dev/null +++ b/src/traffic_status.rs @@ -0,0 +1,87 @@ +use crate::error::{Error, Result}; +use std::os::raw::c_void; + +/// # Safety +/// +/// set traffic status callback. +#[no_mangle] +pub unsafe extern "C" fn tun2proxy_set_traffic_status_callback( + send_interval_secs: u32, + callback: Option, + ctx: *mut c_void, +) { + if let Ok(mut cb) = TRAFFIC_STATUS_CALLBACK.lock() { + *cb = Some(TrafficStatusCallback(callback, ctx)); + } else { + log::error!("set traffic status callback failed"); + } + if send_interval_secs > 0 { + SEND_INTERVAL_SECS.store(send_interval_secs as u64, std::sync::atomic::Ordering::Relaxed); + } +} + +#[repr(C)] +#[derive(Debug, Default, Copy, Clone)] +pub struct TrafficStatus { + pub tx: u64, + pub rx: u64, +} + +#[derive(Clone)] +struct TrafficStatusCallback(Option, *mut c_void); + +impl TrafficStatusCallback { + unsafe fn call(self, info: &TrafficStatus) { + if let Some(cb) = self.0 { + cb(info, self.1); + } + } +} + +unsafe impl Send for TrafficStatusCallback {} +unsafe impl Sync for TrafficStatusCallback {} + +static TRAFFIC_STATUS_CALLBACK: std::sync::Mutex> = std::sync::Mutex::new(None); +static SEND_INTERVAL_SECS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1); + +lazy_static::lazy_static! { + static ref TRAFFIC_STATUS: std::sync::Mutex = std::sync::Mutex::new(TrafficStatus::default()); + static ref TIME_STAMP: std::sync::Mutex = std::sync::Mutex::new(std::time::Instant::now()); +} + +pub(crate) fn traffic_status_update(delta_tx: usize, delta_rx: usize) -> Result<()> { + { + let is_none_or_error = TRAFFIC_STATUS_CALLBACK.lock().map(|guard| guard.is_none()).unwrap_or_else(|e| { + log::error!("Failed to acquire lock: {}", e); + true + }); + if is_none_or_error { + return Ok(()); + } + } + let traffic_status = { + let mut traffic_status = TRAFFIC_STATUS.lock().map_err(|e| Error::from(e.to_string()))?; + traffic_status.tx += delta_tx as u64; + traffic_status.rx += delta_rx as u64; + *traffic_status + }; + let old_time = { *TIME_STAMP.lock().map_err(|e| Error::from(e.to_string()))? }; + let interval_secs = SEND_INTERVAL_SECS.load(std::sync::atomic::Ordering::Relaxed); + if std::time::Instant::now().duration_since(old_time).as_secs() >= interval_secs { + send_traffic_stat(&traffic_status)?; + { + let mut time_stamp = TIME_STAMP.lock().map_err(|e| Error::from(e.to_string()))?; + *time_stamp = std::time::Instant::now(); + } + } + Ok(()) +} + +fn send_traffic_stat(traffic_status: &TrafficStatus) -> Result<()> { + if let Ok(cb) = TRAFFIC_STATUS_CALLBACK.lock() { + if let Some(cb) = cb.clone() { + unsafe { cb.call(traffic_status) }; + } + } + Ok(()) +}