mirror of
https://github.com/tun2proxy/tun2proxy.git
synced 2025-04-21 22:39:08 +00:00
Implementation of traffic status callback
This commit is contained in:
parent
4554d3bc55
commit
1789259f6f
5 changed files with 136 additions and 2 deletions
|
@ -25,6 +25,7 @@ env_logger = "0.11"
|
||||||
hashlink = "0.9"
|
hashlink = "0.9"
|
||||||
httparse = "1.8"
|
httparse = "1.8"
|
||||||
ipstack = { version = "0.0" }
|
ipstack = { version = "0.0" }
|
||||||
|
lazy_static = "1"
|
||||||
log = { version = "0.4", features = ["std"] }
|
log = { version = "0.4", features = ["std"] }
|
||||||
percent-encoding = "2"
|
percent-encoding = "2"
|
||||||
socks5-impl = { version = "0.5" }
|
socks5-impl = { version = "0.5" }
|
||||||
|
|
|
@ -5,6 +5,7 @@ include = [
|
||||||
"tun2proxy_with_name_stop",
|
"tun2proxy_with_name_stop",
|
||||||
"tun2proxy_with_fd_stop",
|
"tun2proxy_with_fd_stop",
|
||||||
"tun2proxy_set_log_callback",
|
"tun2proxy_set_log_callback",
|
||||||
|
"tun2proxy_set_traffic_status_callback",
|
||||||
]
|
]
|
||||||
exclude = [
|
exclude = [
|
||||||
"Java_com_github_shadowsocks_bg_Tun2proxy_run",
|
"Java_com_github_shadowsocks_bg_Tun2proxy_run",
|
||||||
|
@ -14,6 +15,7 @@ exclude = [
|
||||||
[export.rename]
|
[export.rename]
|
||||||
"ArgVerbosity" = "Tun2proxyVerbosity"
|
"ArgVerbosity" = "Tun2proxyVerbosity"
|
||||||
"ArgDns" = "Tun2proxyDns"
|
"ArgDns" = "Tun2proxyDns"
|
||||||
|
"TrafficStatus" = "Tun2proxyTrafficStatus"
|
||||||
|
|
||||||
[enum]
|
[enum]
|
||||||
prefix_with_name = true
|
prefix_with_name = true
|
||||||
|
|
|
@ -21,6 +21,12 @@ async fn main() -> Result<(), BoxError> {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
unsafe extern "C" fn traffic_cb(status: *const tun2proxy::TrafficStatus, _: *mut std::ffi::c_void) {
|
||||||
|
let status = &*status;
|
||||||
|
log::debug!("Traffic: ▲ {} : ▼ {}", status.tx, status.rx);
|
||||||
|
}
|
||||||
|
unsafe { tun2proxy::tun2proxy_set_traffic_status_callback(1, Some(traffic_cb), std::ptr::null_mut()) };
|
||||||
|
|
||||||
if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await {
|
if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await {
|
||||||
log::error!("main loop error: {}", err);
|
log::error!("main loop error: {}", err);
|
||||||
}
|
}
|
||||||
|
|
42
src/lib.rs
42
src/lib.rs
|
@ -27,6 +27,7 @@ use udp_stream::UdpStream;
|
||||||
pub use {
|
pub use {
|
||||||
args::{ArgDns, ArgProxy, ArgVerbosity, Args, ProxyType},
|
args::{ArgDns, ArgProxy, ArgVerbosity, Args, ProxyType},
|
||||||
error::{BoxError, Error, Result},
|
error::{BoxError, Error, Result},
|
||||||
|
traffic_status::{tun2proxy_set_traffic_status_callback, TrafficStatus},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
|
#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))]
|
||||||
|
@ -53,6 +54,7 @@ mod proxy_handler;
|
||||||
mod session_info;
|
mod session_info;
|
||||||
pub mod socket_transfer;
|
pub mod socket_transfer;
|
||||||
mod socks;
|
mod socks;
|
||||||
|
mod traffic_status;
|
||||||
mod virtual_dns;
|
mod virtual_dns;
|
||||||
|
|
||||||
const DNS_PORT: u16 = 53;
|
const DNS_PORT: u16 = 53;
|
||||||
|
@ -354,6 +356,29 @@ async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc<Mutex<Vi
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn copy_and_record_traffic<R, W>(reader: &mut R, writer: &mut W, is_tx: bool) -> tokio::io::Result<u64>
|
||||||
|
where
|
||||||
|
R: tokio::io::AsyncRead + Unpin + ?Sized,
|
||||||
|
W: tokio::io::AsyncWrite + Unpin + ?Sized,
|
||||||
|
{
|
||||||
|
let mut buf = vec![0; 8192];
|
||||||
|
let mut total = 0;
|
||||||
|
loop {
|
||||||
|
match reader.read(&mut buf).await? {
|
||||||
|
0 => break, // EOF
|
||||||
|
n => {
|
||||||
|
total += n as u64;
|
||||||
|
let (tx, rx) = if is_tx { (n, 0) } else { (0, n) };
|
||||||
|
if let Err(e) = crate::traffic_status::traffic_status_update(tx, rx) {
|
||||||
|
log::debug!("Record traffic status error: {}", e);
|
||||||
|
}
|
||||||
|
writer.write_all(&buf[..n]).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(total)
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_tcp_session(
|
async fn handle_tcp_session(
|
||||||
mut tcp_stack: IpStackTcpStream,
|
mut tcp_stack: IpStackTcpStream,
|
||||||
proxy_handler: Arc<Mutex<dyn ProxyHandler>>,
|
proxy_handler: Arc<Mutex<dyn ProxyHandler>>,
|
||||||
|
@ -379,14 +404,14 @@ async fn handle_tcp_session(
|
||||||
|
|
||||||
let res = tokio::join!(
|
let res = tokio::join!(
|
||||||
async move {
|
async move {
|
||||||
let r = tokio::io::copy(&mut t_rx, &mut s_tx).await;
|
let r = copy_and_record_traffic(&mut t_rx, &mut s_tx, true).await;
|
||||||
if let Err(err) = s_tx.shutdown().await {
|
if let Err(err) = s_tx.shutdown().await {
|
||||||
log::trace!("{} s_tx shutdown error {}", session_info, err);
|
log::trace!("{} s_tx shutdown error {}", session_info, err);
|
||||||
}
|
}
|
||||||
r
|
r
|
||||||
},
|
},
|
||||||
async move {
|
async move {
|
||||||
let r = tokio::io::copy(&mut s_rx, &mut t_tx).await;
|
let r = copy_and_record_traffic(&mut s_rx, &mut t_tx, false).await;
|
||||||
if let Err(err) = t_tx.shutdown().await {
|
if let Err(err) = t_tx.shutdown().await {
|
||||||
log::trace!("{} t_tx shutdown error {}", session_info, err);
|
log::trace!("{} t_tx shutdown error {}", session_info, err);
|
||||||
}
|
}
|
||||||
|
@ -443,6 +468,8 @@ async fn handle_udp_associate_session(
|
||||||
}
|
}
|
||||||
let buf1 = &buf1[..len];
|
let buf1 = &buf1[..len];
|
||||||
|
|
||||||
|
crate::traffic_status::traffic_status_update(len, 0)?;
|
||||||
|
|
||||||
if let ProxyType::Socks4 | ProxyType::Socks5 = proxy_type {
|
if let ProxyType::Socks4 | ProxyType::Socks5 = proxy_type {
|
||||||
let s5addr = if let Some(domain_name) = &domain_name {
|
let s5addr = if let Some(domain_name) = &domain_name {
|
||||||
Address::DomainAddress(domain_name.clone(), session_info.dst.port())
|
Address::DomainAddress(domain_name.clone(), session_info.dst.port())
|
||||||
|
@ -467,6 +494,8 @@ async fn handle_udp_associate_session(
|
||||||
}
|
}
|
||||||
let buf2 = &buf2[..len];
|
let buf2 = &buf2[..len];
|
||||||
|
|
||||||
|
crate::traffic_status::traffic_status_update(0, len)?;
|
||||||
|
|
||||||
if let ProxyType::Socks4 | ProxyType::Socks5 = proxy_type {
|
if let ProxyType::Socks4 | ProxyType::Socks5 = proxy_type {
|
||||||
// Remove SOCKS5 UDP header from the server data
|
// Remove SOCKS5 UDP header from the server data
|
||||||
let header = UdpHeader::retrieve_from_stream(&mut &buf2[..])?;
|
let header = UdpHeader::retrieve_from_stream(&mut &buf2[..])?;
|
||||||
|
@ -533,6 +562,8 @@ async fn handle_dns_over_tcp_session(
|
||||||
buf.extend_from_slice(buf1);
|
buf.extend_from_slice(buf1);
|
||||||
|
|
||||||
server.write_all(&buf).await?;
|
server.write_all(&buf).await?;
|
||||||
|
|
||||||
|
crate::traffic_status::traffic_status_update(buf.len(), 0)?;
|
||||||
}
|
}
|
||||||
len = server.read(&mut buf2) => {
|
len = server.read(&mut buf2) => {
|
||||||
let len = len?;
|
let len = len?;
|
||||||
|
@ -541,6 +572,8 @@ async fn handle_dns_over_tcp_session(
|
||||||
}
|
}
|
||||||
let mut buf = buf2[..len].to_vec();
|
let mut buf = buf2[..len].to_vec();
|
||||||
|
|
||||||
|
crate::traffic_status::traffic_status_update(0, len)?;
|
||||||
|
|
||||||
let mut to_send: VecDeque<Vec<u8>> = VecDeque::new();
|
let mut to_send: VecDeque<Vec<u8>> = VecDeque::new();
|
||||||
loop {
|
loop {
|
||||||
if buf.len() < 2 {
|
if buf.len() < 2 {
|
||||||
|
@ -590,6 +623,7 @@ async fn handle_proxy_session(server: &mut TcpStream, proxy_handler: Arc<Mutex<d
|
||||||
let mut launched = false;
|
let mut launched = false;
|
||||||
let mut proxy_handler = proxy_handler.lock().await;
|
let mut proxy_handler = proxy_handler.lock().await;
|
||||||
let dir = OutgoingDirection::ToServer;
|
let dir = OutgoingDirection::ToServer;
|
||||||
|
let (mut tx, mut rx) = (0, 0);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if proxy_handler.connection_established() {
|
if proxy_handler.connection_established() {
|
||||||
|
@ -604,6 +638,7 @@ async fn handle_proxy_session(server: &mut TcpStream, proxy_handler: Arc<Mutex<d
|
||||||
}
|
}
|
||||||
server.write_all(data).await?;
|
server.write_all(data).await?;
|
||||||
proxy_handler.consume_data(dir, len);
|
proxy_handler.consume_data(dir, len);
|
||||||
|
tx += len;
|
||||||
|
|
||||||
launched = true;
|
launched = true;
|
||||||
}
|
}
|
||||||
|
@ -613,6 +648,7 @@ async fn handle_proxy_session(server: &mut TcpStream, proxy_handler: Arc<Mutex<d
|
||||||
if len == 0 {
|
if len == 0 {
|
||||||
return Err("server closed accidentially".into());
|
return Err("server closed accidentially".into());
|
||||||
}
|
}
|
||||||
|
rx += len;
|
||||||
let event = IncomingDataEvent {
|
let event = IncomingDataEvent {
|
||||||
direction: IncomingDirection::FromServer,
|
direction: IncomingDirection::FromServer,
|
||||||
buffer: &buf[..len],
|
buffer: &buf[..len],
|
||||||
|
@ -624,7 +660,9 @@ async fn handle_proxy_session(server: &mut TcpStream, proxy_handler: Arc<Mutex<d
|
||||||
if len > 0 {
|
if len > 0 {
|
||||||
server.write_all(data).await?;
|
server.write_all(data).await?;
|
||||||
proxy_handler.consume_data(dir, len);
|
proxy_handler.consume_data(dir, len);
|
||||||
|
tx += len;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
crate::traffic_status::traffic_status_update(tx, rx)?;
|
||||||
Ok(proxy_handler.get_udp_associate())
|
Ok(proxy_handler.get_udp_associate())
|
||||||
}
|
}
|
||||||
|
|
87
src/traffic_status.rs
Normal file
87
src/traffic_status.rs
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
use crate::error::{Error, Result};
|
||||||
|
use std::os::raw::c_void;
|
||||||
|
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// set traffic status callback.
|
||||||
|
#[no_mangle]
|
||||||
|
pub unsafe extern "C" fn tun2proxy_set_traffic_status_callback(
|
||||||
|
send_interval_secs: u32,
|
||||||
|
callback: Option<unsafe extern "C" fn(*const TrafficStatus, *mut c_void)>,
|
||||||
|
ctx: *mut c_void,
|
||||||
|
) {
|
||||||
|
if let Ok(mut cb) = TRAFFIC_STATUS_CALLBACK.lock() {
|
||||||
|
*cb = Some(TrafficStatusCallback(callback, ctx));
|
||||||
|
} else {
|
||||||
|
log::error!("set traffic status callback failed");
|
||||||
|
}
|
||||||
|
if send_interval_secs > 0 {
|
||||||
|
SEND_INTERVAL_SECS.store(send_interval_secs as u64, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(C)]
|
||||||
|
#[derive(Debug, Default, Copy, Clone)]
|
||||||
|
pub struct TrafficStatus {
|
||||||
|
pub tx: u64,
|
||||||
|
pub rx: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct TrafficStatusCallback(Option<unsafe extern "C" fn(*const TrafficStatus, *mut c_void)>, *mut c_void);
|
||||||
|
|
||||||
|
impl TrafficStatusCallback {
|
||||||
|
unsafe fn call(self, info: &TrafficStatus) {
|
||||||
|
if let Some(cb) = self.0 {
|
||||||
|
cb(info, self.1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for TrafficStatusCallback {}
|
||||||
|
unsafe impl Sync for TrafficStatusCallback {}
|
||||||
|
|
||||||
|
static TRAFFIC_STATUS_CALLBACK: std::sync::Mutex<Option<TrafficStatusCallback>> = std::sync::Mutex::new(None);
|
||||||
|
static SEND_INTERVAL_SECS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
|
||||||
|
static ref TRAFFIC_STATUS: std::sync::Mutex<TrafficStatus> = std::sync::Mutex::new(TrafficStatus::default());
|
||||||
|
static ref TIME_STAMP: std::sync::Mutex<std::time::Instant> = std::sync::Mutex::new(std::time::Instant::now());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn traffic_status_update(delta_tx: usize, delta_rx: usize) -> Result<()> {
|
||||||
|
{
|
||||||
|
let is_none_or_error = TRAFFIC_STATUS_CALLBACK.lock().map(|guard| guard.is_none()).unwrap_or_else(|e| {
|
||||||
|
log::error!("Failed to acquire lock: {}", e);
|
||||||
|
true
|
||||||
|
});
|
||||||
|
if is_none_or_error {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let traffic_status = {
|
||||||
|
let mut traffic_status = TRAFFIC_STATUS.lock().map_err(|e| Error::from(e.to_string()))?;
|
||||||
|
traffic_status.tx += delta_tx as u64;
|
||||||
|
traffic_status.rx += delta_rx as u64;
|
||||||
|
*traffic_status
|
||||||
|
};
|
||||||
|
let old_time = { *TIME_STAMP.lock().map_err(|e| Error::from(e.to_string()))? };
|
||||||
|
let interval_secs = SEND_INTERVAL_SECS.load(std::sync::atomic::Ordering::Relaxed);
|
||||||
|
if std::time::Instant::now().duration_since(old_time).as_secs() >= interval_secs {
|
||||||
|
send_traffic_stat(&traffic_status)?;
|
||||||
|
{
|
||||||
|
let mut time_stamp = TIME_STAMP.lock().map_err(|e| Error::from(e.to_string()))?;
|
||||||
|
*time_stamp = std::time::Instant::now();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_traffic_stat(traffic_status: &TrafficStatus) -> Result<()> {
|
||||||
|
if let Ok(cb) = TRAFFIC_STATUS_CALLBACK.lock() {
|
||||||
|
if let Some(cb) = cb.clone() {
|
||||||
|
unsafe { cb.call(traffic_status) };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue