traffic status logic

This commit is contained in:
ssrlive 2024-06-11 19:03:07 +08:00
commit 3b5f526728
5 changed files with 136 additions and 2 deletions

View file

@ -25,6 +25,7 @@ env_logger = "0.11"
hashlink = "0.9"
httparse = "1.8"
ipstack = { version = "0.0" }
lazy_static = "1"
log = { version = "0.4", features = ["std"] }
percent-encoding = "2"
socks5-impl = { version = "0.5" }

View file

@ -5,6 +5,7 @@ include = [
"tun2proxy_with_name_stop",
"tun2proxy_with_fd_stop",
"tun2proxy_set_log_callback",
"tun2proxy_set_traffic_status_callback",
]
exclude = [
"Java_com_github_shadowsocks_bg_Tun2proxy_run",
@ -14,6 +15,7 @@ exclude = [
[export.rename]
"ArgVerbosity" = "Tun2proxyVerbosity"
"ArgDns" = "Tun2proxyDns"
"TrafficStatus" = "Tun2proxyTrafficStatus"
[enum]
prefix_with_name = true

View file

@ -21,6 +21,12 @@ async fn main() -> Result<(), BoxError> {
return;
}
unsafe extern "C" fn traffic_cb(status: *const tun2proxy::TrafficStatus, _: *mut std::ffi::c_void) {
let status = &*status;
log::debug!("Traffic: ▲ {} : ▼ {}", status.tx, status.rx);
}
unsafe { tun2proxy::tun2proxy_set_traffic_status_callback(1, Some(traffic_cb), std::ptr::null_mut()) };
if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await {
log::error!("main loop error: {}", err);
}

View file

@ -27,6 +27,7 @@ use udp_stream::UdpStream;
pub use {
args::{ArgDns, ArgProxy, ArgVerbosity, Args, ProxyType},
error::{BoxError, Error, Result},
traffic_status::{tun2proxy_set_traffic_status_callback, TrafficStatus},
};
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
@ -53,6 +54,7 @@ mod proxy_handler;
mod session_info;
pub mod socket_transfer;
mod socks;
mod traffic_status;
mod virtual_dns;
const DNS_PORT: u16 = 53;
@ -354,6 +356,29 @@ async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc<Mutex<Vi
Ok(())
}
async fn copy_and_record_traffic<R, W>(reader: &mut R, writer: &mut W, is_tx: bool) -> tokio::io::Result<u64>
where
R: tokio::io::AsyncRead + Unpin + ?Sized,
W: tokio::io::AsyncWrite + Unpin + ?Sized,
{
let mut buf = vec![0; 8192];
let mut total = 0;
loop {
match reader.read(&mut buf).await? {
0 => break, // EOF
n => {
total += n as u64;
let (tx, rx) = if is_tx { (n, 0) } else { (0, n) };
if let Err(e) = crate::traffic_status::traffic_status_update(tx, rx) {
log::debug!("Record traffic status error: {}", e);
}
writer.write_all(&buf[..n]).await?;
}
}
}
Ok(total)
}
async fn handle_tcp_session(
mut tcp_stack: IpStackTcpStream,
proxy_handler: Arc<Mutex<dyn ProxyHandler>>,
@ -379,14 +404,14 @@ async fn handle_tcp_session(
let res = tokio::join!(
async move {
let r = tokio::io::copy(&mut t_rx, &mut s_tx).await;
let r = copy_and_record_traffic(&mut t_rx, &mut s_tx, true).await;
if let Err(err) = s_tx.shutdown().await {
log::trace!("{} s_tx shutdown error {}", session_info, err);
}
r
},
async move {
let r = tokio::io::copy(&mut s_rx, &mut t_tx).await;
let r = copy_and_record_traffic(&mut s_rx, &mut t_tx, false).await;
if let Err(err) = t_tx.shutdown().await {
log::trace!("{} t_tx shutdown error {}", session_info, err);
}
@ -443,6 +468,8 @@ async fn handle_udp_associate_session(
}
let buf1 = &buf1[..len];
crate::traffic_status::traffic_status_update(len, 0)?;
if let ProxyType::Socks4 | ProxyType::Socks5 = proxy_type {
let s5addr = if let Some(domain_name) = &domain_name {
Address::DomainAddress(domain_name.clone(), session_info.dst.port())
@ -467,6 +494,8 @@ async fn handle_udp_associate_session(
}
let buf2 = &buf2[..len];
crate::traffic_status::traffic_status_update(0, len)?;
if let ProxyType::Socks4 | ProxyType::Socks5 = proxy_type {
// Remove SOCKS5 UDP header from the server data
let header = UdpHeader::retrieve_from_stream(&mut &buf2[..])?;
@ -533,6 +562,8 @@ async fn handle_dns_over_tcp_session(
buf.extend_from_slice(buf1);
server.write_all(&buf).await?;
crate::traffic_status::traffic_status_update(buf.len(), 0)?;
}
len = server.read(&mut buf2) => {
let len = len?;
@ -541,6 +572,8 @@ async fn handle_dns_over_tcp_session(
}
let mut buf = buf2[..len].to_vec();
crate::traffic_status::traffic_status_update(0, len)?;
let mut to_send: VecDeque<Vec<u8>> = VecDeque::new();
loop {
if buf.len() < 2 {
@ -590,6 +623,7 @@ async fn handle_proxy_session(server: &mut TcpStream, proxy_handler: Arc<Mutex<d
let mut launched = false;
let mut proxy_handler = proxy_handler.lock().await;
let dir = OutgoingDirection::ToServer;
let (mut tx, mut rx) = (0, 0);
loop {
if proxy_handler.connection_established() {
@ -604,6 +638,7 @@ async fn handle_proxy_session(server: &mut TcpStream, proxy_handler: Arc<Mutex<d
}
server.write_all(data).await?;
proxy_handler.consume_data(dir, len);
tx += len;
launched = true;
}
@ -613,6 +648,7 @@ async fn handle_proxy_session(server: &mut TcpStream, proxy_handler: Arc<Mutex<d
if len == 0 {
return Err("server closed accidentially".into());
}
rx += len;
let event = IncomingDataEvent {
direction: IncomingDirection::FromServer,
buffer: &buf[..len],
@ -624,7 +660,9 @@ async fn handle_proxy_session(server: &mut TcpStream, proxy_handler: Arc<Mutex<d
if len > 0 {
server.write_all(data).await?;
proxy_handler.consume_data(dir, len);
tx += len;
}
}
crate::traffic_status::traffic_status_update(tx, rx)?;
Ok(proxy_handler.get_udp_associate())
}

87
src/traffic_status.rs Normal file
View file

@ -0,0 +1,87 @@
use crate::error::{Error, Result};
use std::os::raw::c_void;
/// # Safety
///
/// set traffic status callback.
#[no_mangle]
pub unsafe extern "C" fn tun2proxy_set_traffic_status_callback(
send_interval_secs: u32,
callback: Option<unsafe extern "C" fn(*const TrafficStatus, *mut c_void)>,
ctx: *mut c_void,
) {
if let Ok(mut cb) = TRAFFIC_STATUS_CALLBACK.lock() {
*cb = Some(TrafficStatusCallback(callback, ctx));
} else {
log::error!("set traffic status callback failed");
}
if send_interval_secs > 0 {
SEND_INTERVAL_SECS.store(send_interval_secs as u64, std::sync::atomic::Ordering::Relaxed);
}
}
#[repr(C)]
#[derive(Debug, Default, Copy, Clone)]
pub struct TrafficStatus {
pub tx: u64,
pub rx: u64,
}
#[derive(Clone)]
struct TrafficStatusCallback(Option<unsafe extern "C" fn(*const TrafficStatus, *mut c_void)>, *mut c_void);
impl TrafficStatusCallback {
unsafe fn call(self, info: &TrafficStatus) {
if let Some(cb) = self.0 {
cb(info, self.1);
}
}
}
unsafe impl Send for TrafficStatusCallback {}
unsafe impl Sync for TrafficStatusCallback {}
static TRAFFIC_STATUS_CALLBACK: std::sync::Mutex<Option<TrafficStatusCallback>> = std::sync::Mutex::new(None);
static SEND_INTERVAL_SECS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
lazy_static::lazy_static! {
static ref TRAFFIC_STATUS: std::sync::Mutex<TrafficStatus> = std::sync::Mutex::new(TrafficStatus::default());
static ref TIME_STAMP: std::sync::Mutex<std::time::Instant> = std::sync::Mutex::new(std::time::Instant::now());
}
pub(crate) fn traffic_status_update(delta_tx: usize, delta_rx: usize) -> Result<()> {
{
let is_none_or_error = TRAFFIC_STATUS_CALLBACK.lock().map(|guard| guard.is_none()).unwrap_or_else(|e| {
log::error!("Failed to acquire lock: {}", e);
true
});
if is_none_or_error {
return Ok(());
}
}
let traffic_status = {
let mut traffic_status = TRAFFIC_STATUS.lock().map_err(|e| Error::from(e.to_string()))?;
traffic_status.tx += delta_tx as u64;
traffic_status.rx += delta_rx as u64;
*traffic_status
};
let old_time = { *TIME_STAMP.lock().map_err(|e| Error::from(e.to_string()))? };
let interval_secs = SEND_INTERVAL_SECS.load(std::sync::atomic::Ordering::Relaxed);
if std::time::Instant::now().duration_since(old_time).as_secs() >= interval_secs {
send_traffic_stat(&traffic_status)?;
{
let mut time_stamp = TIME_STAMP.lock().map_err(|e| Error::from(e.to_string()))?;
*time_stamp = std::time::Instant::now();
}
}
Ok(())
}
fn send_traffic_stat(traffic_status: &TrafficStatus) -> Result<()> {
if let Ok(cb) = TRAFFIC_STATUS_CALLBACK.lock() {
if let Some(cb) = cb.clone() {
unsafe { cb.call(traffic_status) };
}
}
Ok(())
}