Compare commits

..

No commits in common. "master" and "v0.7.7" have entirely different histories.

5 changed files with 34 additions and 61 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "tun2proxy" name = "tun2proxy"
version = "0.7.8" version = "0.7.7"
edition = "2024" edition = "2024"
license = "MIT" license = "MIT"
repository = "https://github.com/tun2proxy/tun2proxy" repository = "https://github.com/tun2proxy/tun2proxy"
@ -38,7 +38,7 @@ env_logger = "0.11"
hashlink = "0.10" hashlink = "0.10"
hickory-proto = "0.25" hickory-proto = "0.25"
httparse = "1" httparse = "1"
ipstack = { version = "0.3", git = "https://github.com/ssrlive/ipstack.git", rev = "53c648e" } ipstack = { version = "0.2" }
log = { version = "0.4", features = ["std"] } log = { version = "0.4", features = ["std"] }
mimalloc = { version = "0.1", default-features = false, optional = true } mimalloc = { version = "0.1", default-features = false, optional = true }
percent-encoding = "2" percent-encoding = "2"

View file

@ -37,7 +37,6 @@ async fn main_async(args: Args) -> Result<(), BoxError> {
let shutdown_token = tokio_util::sync::CancellationToken::new(); let shutdown_token = tokio_util::sync::CancellationToken::new();
let main_loop_handle = tokio::spawn({ let main_loop_handle = tokio::spawn({
let args = args.clone();
let shutdown_token = shutdown_token.clone(); let shutdown_token = shutdown_token.clone();
async move { async move {
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
@ -45,7 +44,7 @@ async fn main_async(args: Args) -> Result<(), BoxError> {
if let Err(err) = namespace_proxy_main(args, shutdown_token).await { if let Err(err) = namespace_proxy_main(args, shutdown_token).await {
log::error!("namespace proxy error: {}", err); log::error!("namespace proxy error: {}", err);
} }
return Ok(0); return;
} }
unsafe extern "C" fn traffic_cb(status: *const tun2proxy::TrafficStatus, _: *mut std::ffi::c_void) { unsafe extern "C" fn traffic_cb(status: *const tun2proxy::TrafficStatus, _: *mut std::ffi::c_void) {
@ -54,11 +53,9 @@ async fn main_async(args: Args) -> Result<(), BoxError> {
} }
unsafe { tun2proxy::tun2proxy_set_traffic_status_callback(1, Some(traffic_cb), std::ptr::null_mut()) }; unsafe { tun2proxy::tun2proxy_set_traffic_status_callback(1, Some(traffic_cb), std::ptr::null_mut()) };
let ret = tun2proxy::general_run_async(args, tun::DEFAULT_MTU, cfg!(target_os = "macos"), shutdown_token).await; if let Err(err) = tun2proxy::general_run_async(args, tun::DEFAULT_MTU, cfg!(target_os = "macos"), shutdown_token).await {
if let Err(err) = &ret { log::error!("main loop error: {}", err);
log::error!("main loop error: {err}");
} }
ret
} }
}); });
@ -71,19 +68,13 @@ async fn main_async(args: Args) -> Result<(), BoxError> {
}) })
.await; .await;
let tasks = main_loop_handle.await??; main_loop_handle.await?;
if ctrlc_fired.load(std::sync::atomic::Ordering::SeqCst) { if ctrlc_fired.load(std::sync::atomic::Ordering::SeqCst) {
log::info!("Ctrl-C fired, waiting the handler to finish..."); log::info!("Ctrl-C fired, waiting the handler to finish...");
ctrlc_handel.await.map_err(|err| err.to_string())?; ctrlc_handel.await.map_err(|err| err.to_string())?;
} }
if args.exit_on_fatal_error && tasks >= args.max_sessions {
// Because `main_async` function perhaps stuck in `await` state, so we need to exit the process forcefully
log::info!("Internal fatal error, max sessions reached ({tasks}/{})", args.max_sessions);
std::process::exit(-1);
}
Ok(()) Ok(())
} }

View file

@ -120,18 +120,11 @@ pub fn general_run_for_api(args: Args, tun_mtu: u16, packet_information: bool) -
return -3; return -3;
}; };
match rt.block_on(async move { match rt.block_on(async move {
let ret = general_run_async(args.clone(), tun_mtu, packet_information, shutdown_token).await; if let Err(err) = general_run_async(args, tun_mtu, packet_information, shutdown_token).await {
match &ret { log::error!("main loop error: {}", err);
Ok(sessions) => { return Err(err);
if args.exit_on_fatal_error && *sessions >= args.max_sessions {
log::error!("Forced exit due to max sessions reached ({sessions}/{})", args.max_sessions);
std::process::exit(-1);
}
log::debug!("tun2proxy exited normally, current sessions: {sessions}");
}
Err(err) => log::error!("main loop error: {err}"),
} }
ret Ok(())
}) { }) {
Ok(_) => 0, Ok(_) => 0,
Err(e) => { Err(e) => {
@ -147,7 +140,7 @@ pub async fn general_run_async(
tun_mtu: u16, tun_mtu: u16,
_packet_information: bool, _packet_information: bool,
shutdown_token: tokio_util::sync::CancellationToken, shutdown_token: tokio_util::sync::CancellationToken,
) -> std::io::Result<usize> { ) -> std::io::Result<()> {
let mut tun_config = tun::Configuration::default(); let mut tun_config = tun::Configuration::default();
#[cfg(any(target_os = "linux", target_os = "windows", target_os = "macos"))] #[cfg(any(target_os = "linux", target_os = "windows", target_os = "macos"))]

View file

@ -64,6 +64,9 @@ pub mod win_svc;
const DNS_PORT: u16 = 53; const DNS_PORT: u16 = 53;
static TASK_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
use std::sync::atomic::Ordering::Relaxed;
#[allow(unused)] #[allow(unused)]
#[derive(Hash, Copy, Clone, Eq, PartialEq, Debug)] #[derive(Hash, Copy, Clone, Eq, PartialEq, Debug)]
#[cfg_attr( #[cfg_attr(
@ -151,9 +154,7 @@ async fn create_udp_stream(socket_queue: &Option<Arc<SocketQueue>>, peer: Socket
/// * `mtu` - The MTU of the network device /// * `mtu` - The MTU of the network device
/// * `args` - The arguments to use /// * `args` - The arguments to use
/// * `shutdown_token` - The token to exit the server /// * `shutdown_token` - The token to exit the server
/// # Returns pub async fn run<D>(device: D, mtu: u16, args: Args, shutdown_token: CancellationToken) -> crate::Result<()>
/// * The number of sessions while exiting
pub async fn run<D>(device: D, mtu: u16, args: Args, shutdown_token: CancellationToken) -> crate::Result<usize>
where where
D: AsyncRead + AsyncWrite + Unpin + Send + 'static, D: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
@ -221,11 +222,11 @@ where
let socket_queue = None; let socket_queue = None;
use socks5_impl::protocol::Version::{V4, V5}; use socks5_impl::protocol::Version::{V4, V5};
let mgr: Arc<dyn ProxyHandlerManager> = match args.proxy.proxy_type { let mgr = match args.proxy.proxy_type {
ProxyType::Socks5 => Arc::new(SocksProxyManager::new(server_addr, V5, key)), ProxyType::Socks5 => Arc::new(SocksProxyManager::new(server_addr, V5, key)) as Arc<dyn ProxyHandlerManager>,
ProxyType::Socks4 => Arc::new(SocksProxyManager::new(server_addr, V4, key)), ProxyType::Socks4 => Arc::new(SocksProxyManager::new(server_addr, V4, key)) as Arc<dyn ProxyHandlerManager>,
ProxyType::Http => Arc::new(HttpManager::new(server_addr, key)), ProxyType::Http => Arc::new(HttpManager::new(server_addr, key)) as Arc<dyn ProxyHandlerManager>,
ProxyType::None => Arc::new(NoProxyManager::new()), ProxyType::None => Arc::new(NoProxyManager::new()) as Arc<dyn ProxyHandlerManager>,
}; };
let mut ipstack_config = ipstack::IpStackConfig::default(); let mut ipstack_config = ipstack::IpStackConfig::default();
@ -253,11 +254,7 @@ where
client client
}); });
let task_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
use std::sync::atomic::Ordering::Relaxed;
loop { loop {
let task_count = task_count.clone();
let virtual_dns = virtual_dns.clone(); let virtual_dns = virtual_dns.clone();
let ip_stack_stream = tokio::select! { let ip_stack_stream = tokio::select! {
_ = shutdown_token.cancelled() => { _ = shutdown_token.cancelled() => {
@ -268,10 +265,10 @@ where
ip_stack_stream? ip_stack_stream?
} }
}; };
let max_sessions = args.max_sessions; let max_sessions = args.max_sessions as u64;
match ip_stack_stream { match ip_stack_stream {
IpStackStream::Tcp(tcp) => { IpStackStream::Tcp(tcp) => {
if task_count.load(Relaxed) >= max_sessions { if TASK_COUNT.load(Relaxed) > max_sessions {
if args.exit_on_fatal_error { if args.exit_on_fatal_error {
log::info!("Too many sessions that over {max_sessions}, exiting..."); log::info!("Too many sessions that over {max_sessions}, exiting...");
break; break;
@ -279,7 +276,7 @@ where
log::warn!("Too many sessions that over {max_sessions}, dropping new session"); log::warn!("Too many sessions that over {max_sessions}, dropping new session");
continue; continue;
} }
log::trace!("Session count {}", task_count.fetch_add(1, Relaxed).saturating_add(1)); log::trace!("Session count {}", TASK_COUNT.fetch_add(1, Relaxed) + 1);
let info = SessionInfo::new(tcp.local_addr(), tcp.peer_addr(), IpProtocol::Tcp); let info = SessionInfo::new(tcp.local_addr(), tcp.peer_addr(), IpProtocol::Tcp);
let domain_name = if let Some(virtual_dns) = &virtual_dns { let domain_name = if let Some(virtual_dns) = &virtual_dns {
let mut virtual_dns = virtual_dns.lock().await; let mut virtual_dns = virtual_dns.lock().await;
@ -294,11 +291,11 @@ where
if let Err(err) = handle_tcp_session(tcp, proxy_handler, socket_queue).await { if let Err(err) = handle_tcp_session(tcp, proxy_handler, socket_queue).await {
log::error!("{} error \"{}\"", info, err); log::error!("{} error \"{}\"", info, err);
} }
log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1)); log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1);
}); });
} }
IpStackStream::Udp(udp) => { IpStackStream::Udp(udp) => {
if task_count.load(Relaxed) >= max_sessions { if TASK_COUNT.load(Relaxed) > max_sessions {
if args.exit_on_fatal_error { if args.exit_on_fatal_error {
log::info!("Too many sessions that over {max_sessions}, exiting..."); log::info!("Too many sessions that over {max_sessions}, exiting...");
break; break;
@ -306,11 +303,11 @@ where
log::warn!("Too many sessions that over {max_sessions}, dropping new session"); log::warn!("Too many sessions that over {max_sessions}, dropping new session");
continue; continue;
} }
log::trace!("Session count {}", task_count.fetch_add(1, Relaxed).saturating_add(1)); log::trace!("Session count {}", TASK_COUNT.fetch_add(1, Relaxed) + 1);
let mut info = SessionInfo::new(udp.local_addr(), udp.peer_addr(), IpProtocol::Udp); let mut info = SessionInfo::new(udp.local_addr(), udp.peer_addr(), IpProtocol::Udp);
if info.dst.port() == DNS_PORT { if info.dst.port() == DNS_PORT {
if is_private_ip(info.dst.ip()) { if is_private_ip(info.dst.ip()) {
info.dst.set_ip(dns_addr); // !!! Here we change the destination address to remote DNS server!!! info.dst.set_ip(dns_addr);
} }
if args.dns == ArgDns::OverTcp { if args.dns == ArgDns::OverTcp {
info.protocol = IpProtocol::Tcp; info.protocol = IpProtocol::Tcp;
@ -320,7 +317,7 @@ where
if let Err(err) = handle_dns_over_tcp_session(udp, proxy_handler, socket_queue, ipv6_enabled).await { if let Err(err) = handle_dns_over_tcp_session(udp, proxy_handler, socket_queue, ipv6_enabled).await {
log::error!("{} error \"{}\"", info, err); log::error!("{} error \"{}\"", info, err);
} }
log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1)); log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1);
}); });
continue; continue;
} }
@ -331,7 +328,7 @@ where
log::error!("{} error \"{}\"", info, err); log::error!("{} error \"{}\"", info, err);
} }
} }
log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1)); log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1);
}); });
continue; continue;
} }
@ -362,7 +359,7 @@ where
if let Err(e) = handle_udp_gateway_session(udp, udpgw, &dst_addr, proxy_handler, queue, ipv6_enabled).await { if let Err(e) = handle_udp_gateway_session(udp, udpgw, &dst_addr, proxy_handler, queue, ipv6_enabled).await {
log::info!("Ending {} with \"{}\"", info, e); log::info!("Ending {} with \"{}\"", info, e);
} }
log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1)); log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1);
}); });
continue; continue;
} }
@ -374,7 +371,7 @@ where
if let Err(err) = handle_udp_associate_session(udp, ty, proxy_handler, socket_queue, ipv6_enabled).await { if let Err(err) = handle_udp_associate_session(udp, ty, proxy_handler, socket_queue, ipv6_enabled).await {
log::info!("Ending {} with \"{}\"", info, err); log::info!("Ending {} with \"{}\"", info, err);
} }
log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1)); log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1);
}); });
} }
Err(e) => { Err(e) => {
@ -393,7 +390,7 @@ where
} }
} }
} }
Ok(task_count.load(Relaxed)) Ok(())
} }
async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc<Mutex<VirtualDns>>) -> crate::Result<()> { async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc<Mutex<VirtualDns>>) -> crate::Result<()> {

View file

@ -78,16 +78,8 @@ fn run_service(_arguments: Vec<std::ffi::OsString>) -> Result<(), crate::BoxErro
} }
unsafe { crate::tun2proxy_set_traffic_status_callback(1, Some(traffic_cb), std::ptr::null_mut()) }; unsafe { crate::tun2proxy_set_traffic_status_callback(1, Some(traffic_cb), std::ptr::null_mut()) };
let ret = crate::general_run_async(args.clone(), tun::DEFAULT_MTU, false, shutdown_token).await; if let Err(err) = crate::general_run_async(args, tun::DEFAULT_MTU, false, shutdown_token).await {
match &ret { log::error!("main loop error: {}", err);
Ok(sessions) => {
if args.exit_on_fatal_error && *sessions >= args.max_sessions {
log::error!("Forced exit due to max sessions reached ({sessions}/{})", args.max_sessions);
std::process::exit(-1);
}
log::debug!("tun2proxy exited normally, current sessions: {sessions}");
}
Err(err) => log::error!("main loop error: {err}"),
} }
Ok::<(), crate::Error>(()) Ok::<(), crate::Error>(())
})?; })?;