From 74e5220d088ca676c6fa985d42634ed59f312454 Mon Sep 17 00:00:00 2001 From: "Remy D. Farley" Date: Wed, 3 Apr 2024 20:40:51 +0000 Subject: [PATCH 01/12] ci: don't abort checks immediately if error is encountered --- .github/workflows/rust.yml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index d66b58a..e7609d7 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -9,6 +9,7 @@ env: jobs: build_n_test: strategy: + fail-fast: false matrix: os: [ubuntu-latest, macos-latest, windows-latest] @@ -16,11 +17,23 @@ jobs: steps: - uses: actions/checkout@v3 + - name: rustfmt + if: ${{ !cancelled() }} run: cargo fmt --all -- --check + - name: check + if: ${{ !cancelled() }} run: cargo check --verbose + - name: clippy + if: ${{ !cancelled() }} run: cargo clippy --all-targets --all-features -- -D warnings + - name: Build + if: ${{ !cancelled() }} run: cargo build --verbose --tests --all-features + + - name: Abort on error + if: ${{ failure() }} + run: echo "Some of jobs failed" && false \ No newline at end of file From 361cf95f4eba3bf2e1f676cb92372dd61c046e10 Mon Sep 17 00:00:00 2001 From: "Remy D. Farley" Date: Wed, 3 Apr 2024 14:13:18 +0000 Subject: [PATCH 02/12] add udp timeout option --- src/args.rs | 5 +++++ src/lib.rs | 3 +-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/args.rs b/src/args.rs index 1264fc0..309f7d0 100644 --- a/src/args.rs +++ b/src/args.rs @@ -45,6 +45,10 @@ pub struct Args { #[arg(long, value_name = "seconds", default_value = "600")] pub tcp_timeout: u64, + /// UDP timeout in seconds + #[arg(long, value_name = "seconds", default_value = "10")] + pub udp_timeout: u64, + /// Verbosity level #[arg(short, long, value_name = "level", value_enum, default_value = "info")] pub verbosity: ArgVerbosity, @@ -62,6 +66,7 @@ impl Default for Args { dns_addr: "8.8.8.8".parse().unwrap(), bypass: vec![], tcp_timeout: 600, + udp_timeout: 10, verbosity: ArgVerbosity::Info, } } diff --git a/src/lib.rs b/src/lib.rs index 5936440..6a8f699 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 +50,6 @@ mod virtual_dns; const DNS_PORT: u16 = 53; const MAX_SESSIONS: u64 = 200; -const UDP_TIMEOUT_SEC: u64 = 10; // 10 seconds static TASK_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); use std::sync::atomic::Ordering::Relaxed; @@ -87,7 +86,7 @@ where let mut ipstack_config = ipstack::IpStackConfig::default(); ipstack_config.mtu(mtu); ipstack_config.tcp_timeout(std::time::Duration::from_secs(args.tcp_timeout)); - ipstack_config.udp_timeout(std::time::Duration::from_secs(UDP_TIMEOUT_SEC)); + ipstack_config.udp_timeout(std::time::Duration::from_secs(args.udp_timeout)); let mut ip_stack = ipstack::IpStack::new(ipstack_config, device); From 5e99c9f87405b99bfd558445dc6768d5edb6afd5 Mon Sep 17 00:00:00 2001 From: "Remy D. Farley" Date: Wed, 3 Apr 2024 14:20:05 +0000 Subject: [PATCH 03/12] add no-proxy mode --- src/args.rs | 10 +++++ src/lib.rs | 3 ++ src/no_proxy.rs | 109 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+) create mode 100644 src/no_proxy.rs diff --git a/src/args.rs b/src/args.rs index 309f7d0..e7df38c 100644 --- a/src/args.rs +++ b/src/args.rs @@ -249,6 +249,14 @@ impl std::fmt::Display for ArgProxy { impl ArgProxy { pub fn from_url(s: &str) -> Result { + if s == "none" { + return Ok(ArgProxy { + proxy_type: ProxyType::None, + addr: "0.0.0.0:0".parse().unwrap(), + credentials: None, + }); + } + let e = format!("`{s}` is not a valid proxy URL"); let url = url::Url::parse(s).map_err(|_| Error::from(&e))?; let e = format!("`{s}` does not contain a host"); @@ -299,6 +307,7 @@ pub enum ProxyType { Socks4, #[default] Socks5, + None, } impl std::fmt::Display for ProxyType { @@ -307,6 +316,7 @@ impl std::fmt::Display for ProxyType { ProxyType::Socks4 => write!(f, "socks4"), ProxyType::Socks5 => write!(f, "socks5"), ProxyType::Http => write!(f, "http"), + ProxyType::None => write!(f, "none"), } } } diff --git a/src/lib.rs b/src/lib.rs index 6a8f699..e2701fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ use crate::{ directions::{IncomingDataEvent, IncomingDirection, OutgoingDirection}, http::HttpManager, + no_proxy::NoProxyManager, session_info::{IpProtocol, SessionInfo}, virtual_dns::VirtualDns, }; @@ -42,6 +43,7 @@ mod dump_logger; mod error; mod http; mod mobile_api; +mod no_proxy; mod proxy_handler; mod session_info; mod socks; @@ -81,6 +83,7 @@ where ProxyType::Socks5 => Arc::new(SocksProxyManager::new(server_addr, V5, key)) as Arc, ProxyType::Socks4 => Arc::new(SocksProxyManager::new(server_addr, V4, key)) as Arc, ProxyType::Http => Arc::new(HttpManager::new(server_addr, key)) as Arc, + ProxyType::None => Arc::new(NoProxyManager::new(server_addr)) as Arc, }; let mut ipstack_config = ipstack::IpStackConfig::default(); diff --git a/src/no_proxy.rs b/src/no_proxy.rs new file mode 100644 index 0000000..99c4dbf --- /dev/null +++ b/src/no_proxy.rs @@ -0,0 +1,109 @@ +use crate::{ + directions::{IncomingDataEvent, IncomingDirection, OutgoingDataEvent, OutgoingDirection}, + proxy_handler::{ProxyHandler, ProxyHandlerManager}, + session_info::SessionInfo, +}; +use std::{collections::VecDeque, net::SocketAddr, sync::Arc}; +use tokio::sync::Mutex; + +struct NoProxyHandler { + info: SessionInfo, + domain_name: Option, + client_outbuf: VecDeque, + server_outbuf: VecDeque, + udp_associate: bool, +} + +#[async_trait::async_trait] +impl ProxyHandler for NoProxyHandler { + fn get_session_info(&self) -> SessionInfo { + self.info + } + + fn get_domain_name(&self) -> Option { + self.domain_name.clone() + } + + async fn push_data(&mut self, event: IncomingDataEvent<'_>) -> std::io::Result<()> { + let IncomingDataEvent { direction, buffer } = event; + match direction { + IncomingDirection::FromServer => { + self.client_outbuf.extend(buffer.iter()); + } + IncomingDirection::FromClient => { + self.server_outbuf.extend(buffer.iter()); + } + } + Ok(()) + } + + fn consume_data(&mut self, dir: OutgoingDirection, size: usize) { + let buffer = match dir { + OutgoingDirection::ToServer => &mut self.server_outbuf, + OutgoingDirection::ToClient => &mut self.client_outbuf, + }; + buffer.drain(0..size); + } + + fn peek_data(&mut self, dir: OutgoingDirection) -> OutgoingDataEvent { + let buffer = match dir { + OutgoingDirection::ToServer => &mut self.server_outbuf, + OutgoingDirection::ToClient => &mut self.client_outbuf, + }; + OutgoingDataEvent { + direction: dir, + buffer: buffer.make_contiguous(), + } + } + + fn connection_established(&self) -> bool { + true + } + + fn data_len(&self, dir: OutgoingDirection) -> usize { + match dir { + OutgoingDirection::ToServer => self.server_outbuf.len(), + OutgoingDirection::ToClient => self.client_outbuf.len(), + } + } + + fn reset_connection(&self) -> bool { + false + } + + fn get_udp_associate(&self) -> Option { + self.udp_associate.then_some(self.info.dst) + } +} + +pub(crate) struct NoProxyManager { + server: SocketAddr, +} + +#[async_trait::async_trait] +impl ProxyHandlerManager for NoProxyManager { + async fn new_proxy_handler( + &self, + info: SessionInfo, + domain_name: Option, + udp_associate: bool, + ) -> std::io::Result>> { + Ok(Arc::new(Mutex::new(NoProxyHandler { + info, + domain_name, + client_outbuf: VecDeque::default(), + server_outbuf: VecDeque::default(), + udp_associate, + }))) + } + + fn get_server_addr(&self) -> SocketAddr { + self.server + } +} + +impl NoProxyManager { + pub(crate) fn new(server: SocketAddr) -> Self { + Self { server } + } +} From d351b5031cef734d163e914079459c6112d4f400 Mon Sep 17 00:00:00 2001 From: "Remy D. Farley" Date: Wed, 3 Apr 2024 14:26:46 +0000 Subject: [PATCH 04/12] add support for unprivileged namespaces --- Cargo.toml | 5 + src/args.rs | 26 ++++- src/bin/main.rs | 58 +++++++++- src/desktop_api.rs | 58 ++++++++++ src/error.rs | 8 ++ src/http.rs | 9 +- src/lib.rs | 256 ++++++++++++++++++++++++++++++++++------- src/no_proxy.rs | 4 + src/proxy_handler.rs | 1 + src/socket_transfer.rs | 230 ++++++++++++++++++++++++++++++++++++ src/socks.rs | 8 ++ 11 files changed, 615 insertions(+), 48 deletions(-) create mode 100644 src/socket_transfer.rs diff --git a/Cargo.toml b/Cargo.toml index ca23b27..65e1f74 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,11 @@ udp-stream = { version = "0.0", default-features = false } unicase = "2.7" url = "2.5" +[target.'cfg(target_os="linux")'.dependencies] +serde = { version = "1", features = [ "derive" ] } +bincode = "1" +nix = { version = "0", default-features = false, features = ["fs", "socket", "uio"] } + [target.'cfg(target_os="android")'.dependencies] android_logger = "0.13" jni = { version = "0.21", default-features = false } diff --git a/src/args.rs b/src/args.rs index e7df38c..bd9d413 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,6 +1,9 @@ use crate::{Error, Result}; use socks5_impl::protocol::UserKey; -use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; +use std::{ + ffi::OsString, + net::{IpAddr, SocketAddr, ToSocketAddrs}, +}; #[derive(Debug, Clone, clap::Parser)] #[command(author, version, about = "Tunnel interface to proxy.", long_about = None)] @@ -20,13 +23,29 @@ pub struct Args { #[arg(long, value_name = "fd", conflicts_with = "tun")] pub tun_fd: Option, + /// Create a tun interface in a newly created unprivileged namespace + /// while maintaining proxy connectivity via the global network namespace. + #[arg(long)] + pub unshare: bool, + + /// File descriptor for UNIX datagram socket meant to transfer + /// network sockets from global namespace to the new one. + /// See `unshare(1)`, `namespaces(7)`, `sendmsg(2)`, `unix(7)`. + #[arg(long)] + pub socket_transfer_fd: Option, + + /// Specify a command to run with root-like capabilities in the new namespace. + /// This could be useful to start additional daemons, e.g. `openvpn` instance. + #[arg(requires = "unshare")] + pub admin_command: Vec, + /// IPv6 enabled #[arg(short = '6', long)] pub ipv6_enabled: bool, #[arg(short, long)] /// Routing and system setup, which decides whether to setup the routing and system configuration. - /// This option is only available on Linux and requires root privileges. + /// This option is only available on Linux and requires root-like privileges. See `capabilities(7)`. pub setup: bool, /// DNS handling strategy @@ -60,6 +79,9 @@ impl Default for Args { proxy: ArgProxy::default(), tun: None, tun_fd: None, + unshare: false, + socket_transfer_fd: None, + admin_command: Vec::new(), ipv6_enabled: false, setup: false, dns: ArgDns::default(), diff --git a/src/bin/main.rs b/src/bin/main.rs index 7be56ab..2fdad5e 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -13,8 +13,17 @@ async fn main() -> Result<(), BoxError> { let join_handle = tokio::spawn({ let shutdown_token = shutdown_token.clone(); async move { - if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await { - log::error!("main loop error: {}", err); + if args.unshare && args.socket_transfer_fd.is_none() { + #[cfg(target_os = "linux")] + if let Err(err) = namespace_proxy_main(args, shutdown_token).await { + log::error!("namespace proxy error: {}", err); + } + #[cfg(not(target_os = "linux"))] + log::error!("Your platform doesn't support unprivileged namespaces"); + } else { + if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await { + log::error!("main loop error: {}", err); + } } } }); @@ -31,3 +40,48 @@ async fn main() -> Result<(), BoxError> { Ok(()) } + +#[cfg(target_os = "linux")] +async fn namespace_proxy_main( + _args: Args, + _shutdown_token: tokio_util::sync::CancellationToken, +) -> Result { + use std::os::fd::AsRawFd; + + let (socket, remote_fd) = tun2proxy::socket_transfer::create_transfer_socket_pair().await?; + + let child = tokio::process::Command::new("unshare") + .args("--user --map-current-user --net --mount --keep-caps --kill-child --fork".split(' ')) + .arg(std::env::current_exe()?) + .arg("--socket-transfer-fd") + .arg(remote_fd.as_raw_fd().to_string()) + .args(std::env::args().skip(1)) + .kill_on_drop(true) + .spawn(); + + let mut child = match child { + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + log::error!("`unshare(1)` executable wasn't located in PATH."); + log::error!("Consider installing linux utils package: `apt install util-linux`"); + log::error!("Or similar for your distribution."); + return Err(err.into()); + } + child => child?, + }; + + log::info!("The tun proxy is running in unprivileged mode. See `namespaces(7)`."); + log::info!(""); + log::info!("If you need to run a process that relies on root-like capabilities (e.g. `openvpn`)"); + log::info!("Use `tun2proxy --unshare --setup [...] -- openvpn --config [...]`"); + log::info!(""); + log::info!("To run a new process in the created namespace (e.g. a flatpak app)"); + log::info!( + "Use `nsenter --preserve-credentials --user --net --mount --target {} /bin/sh`", + child.id().unwrap_or(0) + ); + log::info!(""); + + tokio::spawn(async move { tun2proxy::socket_transfer::process_socket_requests(&socket).await }); + + Ok(child.wait().await?) +} diff --git a/src/desktop_api.rs b/src/desktop_api.rs index 3d5723e..71f67dc 100644 --- a/src/desktop_api.rs +++ b/src/desktop_api.rs @@ -131,11 +131,69 @@ pub async fn desktop_run_async(args: Args, shutdown_token: tokio_util::sync::Can restore = Some(tproxy_config::tproxy_setup(&tproxy_args)?); } + #[cfg(target_os = "linux")] + { + let run_ip_util = |args: String| { + tokio::process::Command::new("ip") + .args(args.split(' ')) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn() + .ok(); + }; + + if setup && !args.ipv6_enabled { + // Remove ipv6 connectivity if not explicitly required + // TODO: remove this when upstream will get updated + run_ip_util(format!("-6 route delete ::/1 dev {}", tproxy_args.tun_name)); + run_ip_util(format!("-6 route delete 80::/1 dev {}", tproxy_args.tun_name)); + } + + if setup && args.unshare { + // New namespace doesn't have any other routing device by default + // So our `tun` device should act as such to make space for other proxies. + run_ip_util(format!("route delete 0.0.0.0/1 dev {}", tproxy_args.tun_name)); + run_ip_util(format!("route delete 128.0.0.0/1 dev {}", tproxy_args.tun_name)); + + run_ip_util(format!("route add 0.0.0.0/0 dev {}", tproxy_args.tun_name)); + + if args.ipv6_enabled { + run_ip_util(format!("-6 route delete ::/1 dev {}", tproxy_args.tun_name)); + run_ip_util(format!("-6 route delete 80::/1 dev {}", tproxy_args.tun_name)); + + run_ip_util(format!("-6 route add ::/0 dev {}", tproxy_args.tun_name)); + } + } + } + + let mut admin_command_args = args.admin_command.iter(); + if let Some(command) = admin_command_args.next() { + let child = tokio::process::Command::new(command) + .args(admin_command_args) + .kill_on_drop(true) + .spawn(); + + match child { + Err(err) => { + log::warn!("Failed to start admin process: {err}"); + } + Ok(mut child) => { + tokio::spawn(async move { + if let Err(err) = child.wait().await { + log::warn!("Admin process terminated: {err}"); + } + }); + } + }; + } + let join_handle = tokio::spawn(crate::run(device, MTU, args, shutdown_token)); join_handle.await.map_err(std::io::Error::from)??; #[cfg(any(target_os = "linux", target_os = "windows", target_os = "macos"))] if setup { + // TODO: This probably should be handled by a destructor + // since otherwise removal is not guaranteed if anything above returns early. tproxy_config::tproxy_remove(restore)?; } diff --git a/src/error.rs b/src/error.rs index 96b9732..2afd19b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,6 +6,10 @@ pub enum Error { #[error(transparent)] Io(#[from] std::io::Error), + #[cfg(target_os = "linux")] + #[error("nix::errno::Errno {0:?}")] + NixErrno(#[from] nix::errno::Errno), + #[error("TryFromIntError {0:?}")] TryFromInt(#[from] std::num::TryFromIntError), @@ -39,6 +43,10 @@ pub enum Error { #[error("std::num::ParseIntError {0:?}")] IntParseError(#[from] std::num::ParseIntError), + + #[cfg(target_os = "linux")] + #[error("bincode::Error {0:?}")] + BincodeError(#[from] bincode::Error), } impl From<&str> for Error { diff --git a/src/http.rs b/src/http.rs index 73cf9b6..7aa2569 100644 --- a/src/http.rs +++ b/src/http.rs @@ -38,6 +38,7 @@ enum HttpState { pub(crate) type DigestState = digest_auth::WwwAuthenticateHeader; pub struct HttpConnection { + server_addr: SocketAddr, state: HttpState, client_inbuf: VecDeque, server_inbuf: VecDeque, @@ -61,12 +62,14 @@ static CONTENT_LENGTH: &str = "Content-Length"; impl HttpConnection { async fn new( + server_addr: SocketAddr, info: SessionInfo, domain_name: Option, credentials: Option, digest_state: Arc>>, ) -> Result { let mut res = Self { + server_addr, state: HttpState::ExpectResponseHeaders, client_inbuf: VecDeque::default(), server_inbuf: VecDeque::default(), @@ -330,6 +333,10 @@ impl HttpConnection { #[async_trait::async_trait] impl ProxyHandler for HttpConnection { + fn get_server_addr(&self) -> SocketAddr { + self.server_addr + } + fn get_session_info(&self) -> SessionInfo { self.info } @@ -413,7 +420,7 @@ impl ProxyHandlerManager for HttpManager { return Err(Error::from("Protocol not supported by HTTP proxy").into()); } Ok(Arc::new(Mutex::new( - HttpConnection::new(info, domain_name, self.credentials.clone(), self.digest_state.clone()).await?, + HttpConnection::new(self.server, info, domain_name, self.credentials.clone(), self.digest_state.clone()).await?, ))) } diff --git a/src/lib.rs b/src/lib.rs index e2701fe..183f37d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,11 +9,16 @@ use ipstack::stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream}; use proxy_handler::{ProxyHandler, ProxyHandlerManager}; use socks::SocksProxyManager; pub use socks5_impl::protocol::UserKey; -use std::{collections::VecDeque, net::SocketAddr, sync::Arc}; +use std::{ + collections::VecDeque, + io::ErrorKind, + net::{IpAddr, SocketAddr}, + sync::Arc, +}; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, - net::TcpStream, - sync::Mutex, + net::{TcpSocket, TcpStream, UdpSocket}, + sync::{mpsc::Receiver, Mutex}, }; pub use tokio_util::sync::CancellationToken; use tproxy_config::is_private_ip; @@ -46,6 +51,7 @@ mod mobile_api; mod no_proxy; mod proxy_handler; mod session_info; +pub mod socket_transfer; mod socks; mod virtual_dns; @@ -56,6 +62,81 @@ const MAX_SESSIONS: u64 = 200; static TASK_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); use std::sync::atomic::Ordering::Relaxed; +#[allow(unused)] +#[derive(Hash, Copy, Clone, Eq, PartialEq, Debug)] +#[cfg_attr(target_os = "linux", derive(serde::Serialize, serde::Deserialize))] +pub enum SocketProtocol { + Tcp, + Udp, +} + +#[allow(unused)] +#[derive(Hash, Copy, Clone, Eq, PartialEq, Debug)] +#[cfg_attr(target_os = "linux", derive(serde::Serialize, serde::Deserialize))] +pub enum SocketDomain { + IpV4, + IpV6, +} + +impl From for SocketDomain { + fn from(value: IpAddr) -> Self { + match value { + IpAddr::V4(_) => Self::IpV4, + IpAddr::V6(_) => Self::IpV6, + } + } +} + +struct SocketQueue { + tcp_v4: Mutex>, + tcp_v6: Mutex>, + udp_v4: Mutex>, + udp_v6: Mutex>, +} + +impl SocketQueue { + async fn recv_tcp(&self, domain: SocketDomain) -> Result { + match domain { + SocketDomain::IpV4 => &self.tcp_v4, + SocketDomain::IpV6 => &self.tcp_v6, + } + .lock() + .await + .recv() + .await + .ok_or(ErrorKind::Other.into()) + } + async fn recv_udp(&self, domain: SocketDomain) -> Result { + match domain { + SocketDomain::IpV4 => &self.udp_v4, + SocketDomain::IpV6 => &self.udp_v6, + } + .lock() + .await + .recv() + .await + .ok_or(ErrorKind::Other.into()) + } +} + +async fn create_tcp_stream(socket_queue: &Option>, peer: SocketAddr) -> std::io::Result { + match &socket_queue { + None => TcpStream::connect(peer).await, + Some(queue) => queue.recv_tcp(peer.ip().into()).await?.connect(peer).await, + } +} + +async fn create_udp_stream(socket_queue: &Option>, peer: SocketAddr) -> std::io::Result { + match &socket_queue { + None => UdpStream::connect(peer).await, + Some(queue) => { + let socket = queue.recv_udp(peer.ip().into()).await?; + socket.connect(peer).await?; + UdpStream::from_tokio(socket).await + } + } +} + /// Run the proxy server /// # Arguments /// * `device` - The network device to use @@ -78,6 +159,56 @@ where None }; + #[cfg(target_os = "linux")] + let socket_queue = match args.socket_transfer_fd { + None => None, + Some(fd) => { + use crate::socket_transfer::{reconstruct_socket, reconstruct_transfer_socket, request_sockets}; + use tokio::sync::mpsc::channel; + + let fd = reconstruct_socket(fd)?; + let socket = reconstruct_transfer_socket(fd)?; + let socket = Arc::new(Mutex::new(socket)); + + macro_rules! create_socket_queue { + ($domain:ident) => {{ + const SOCKETS_PER_REQUEST: usize = 64; + + let socket = socket.clone(); + let (tx, rx) = channel(SOCKETS_PER_REQUEST); + tokio::spawn(async move { + loop { + let sockets = + match request_sockets(socket.lock().await, SocketDomain::$domain, SOCKETS_PER_REQUEST as u32).await { + Ok(sockets) => sockets, + Err(err) => { + log::warn!("Socket allocation request failed: {err}"); + continue; + } + }; + for s in sockets { + if let Err(_) = tx.send(s).await { + return; + } + } + } + }); + Mutex::new(rx) + }}; + } + + Some(Arc::new(SocketQueue { + tcp_v4: create_socket_queue!(IpV4), + tcp_v6: create_socket_queue!(IpV6), + udp_v4: create_socket_queue!(IpV4), + udp_v6: create_socket_queue!(IpV6), + })) + } + }; + + #[cfg(not(target_os = "linux"))] + let socket_queue = None; + use socks5_impl::protocol::Version::{V4, V5}; let mgr = match args.proxy.proxy_type { ProxyType::Socks5 => Arc::new(SocksProxyManager::new(server_addr, V5, key)) as Arc, @@ -120,8 +251,9 @@ where None }; let proxy_handler = mgr.new_proxy_handler(info, domain_name, false).await?; + let socket_queue = socket_queue.clone(); tokio::spawn(async move { - if let Err(err) = handle_tcp_session(tcp, server_addr, proxy_handler).await { + if let Err(err) = handle_tcp_session(tcp, proxy_handler, socket_queue).await { log::error!("{} error \"{}\"", info, err); } log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1); @@ -140,8 +272,9 @@ where } if args.dns == ArgDns::OverTcp { let proxy_handler = mgr.new_proxy_handler(info, None, false).await?; + let socket_queue = socket_queue.clone(); tokio::spawn(async move { - if let Err(err) = handle_dns_over_tcp_session(udp, server_addr, proxy_handler, ipv6_enabled).await { + if let Err(err) = handle_dns_over_tcp_session(udp, proxy_handler, socket_queue, ipv6_enabled).await { log::error!("{} error \"{}\"", info, err); } log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1); @@ -170,8 +303,11 @@ where }; match mgr.new_proxy_handler(info, domain_name, true).await { Ok(proxy_handler) => { + let socket_queue = socket_queue.clone(); tokio::spawn(async move { - if let Err(err) = handle_udp_associate_session(udp, server_addr, proxy_handler, ipv6_enabled).await { + if let Err(err) = + handle_udp_associate_session(udp, args.proxy.proxy_type, proxy_handler, socket_queue, ipv6_enabled).await + { log::info!("Ending {} with \"{}\"", info, err); } log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1); @@ -207,12 +343,17 @@ async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc>, + socket_queue: Option>, ) -> crate::Result<()> { - let mut server = TcpStream::connect(server_addr).await?; + let (session_info, server_addr) = { + let handler = proxy_handler.lock().await; + + (handler.get_session_info(), handler.get_server_addr()) + }; + + let mut server = create_tcp_stream(&socket_queue, server_addr).await?; - let session_info = proxy_handler.lock().await.get_session_info(); log::info!("Beginning {}", session_info); if let Err(e) = handle_proxy_session(&mut server, proxy_handler).await { @@ -246,20 +387,36 @@ async fn handle_tcp_session( async fn handle_udp_associate_session( mut udp_stack: IpStackUdpStream, - server_addr: SocketAddr, + proxy_type: ProxyType, proxy_handler: Arc>, + socket_queue: Option>, ipv6_enabled: bool, ) -> crate::Result<()> { use socks5_impl::protocol::{Address, StreamOperation, UdpHeader}; - let mut server = TcpStream::connect(server_addr).await?; - let session_info = proxy_handler.lock().await.get_session_info(); - let domain_name = proxy_handler.lock().await.get_domain_name(); + + let (session_info, server_addr, domain_name, udp_addr) = { + let handler = proxy_handler.lock().await; + ( + handler.get_session_info(), + handler.get_server_addr(), + handler.get_domain_name(), + handler.get_udp_associate(), + ) + }; + log::info!("Beginning {}", session_info); - let udp_addr = handle_proxy_session(&mut server, proxy_handler).await?; - let udp_addr = udp_addr.ok_or("udp associate failed")?; + let udp_addr = match udp_addr { + Some(udp_addr) => udp_addr, + None => { + let mut server = create_tcp_stream(&socket_queue, server_addr).await?; - let mut udp_server = UdpStream::connect(udp_addr).await?; + let udp_addr = handle_proxy_session(&mut server, proxy_handler).await?; + udp_addr.ok_or("udp associate failed")? + } + }; + + let mut udp_server = create_udp_stream(&socket_queue, udp_addr).await?; let mut buf1 = [0_u8; 4096]; let mut buf2 = [0_u8; 4096]; @@ -272,18 +429,22 @@ async fn handle_udp_associate_session( } let buf1 = &buf1[..len]; - let s5addr = if let Some(domain_name) = &domain_name { - Address::DomainAddress(domain_name.clone(), session_info.dst.port()) + if let ProxyType::Socks4 | ProxyType::Socks5 = proxy_type { + let s5addr = if let Some(domain_name) = &domain_name { + Address::DomainAddress(domain_name.clone(), session_info.dst.port()) + } else { + session_info.dst.into() + }; + + // Add SOCKS5 UDP header to the incoming data + let mut s5_udp_data = Vec::::new(); + UdpHeader::new(0, s5addr).write_to_stream(&mut s5_udp_data)?; + s5_udp_data.extend_from_slice(buf1); + + udp_server.write_all(&s5_udp_data).await?; } else { - session_info.dst.into() - }; - - // Add SOCKS5 UDP header to the incoming data - let mut s5_udp_data = Vec::::new(); - UdpHeader::new(0, s5addr).write_to_stream(&mut s5_udp_data)?; - s5_udp_data.extend_from_slice(buf1); - - udp_server.write_all(&s5_udp_data).await?; + udp_server.write_all(buf1).await?; + } } len = udp_server.read(&mut buf2) => { let len = len?; @@ -292,21 +453,25 @@ async fn handle_udp_associate_session( } let buf2 = &buf2[..len]; - // Remove SOCKS5 UDP header from the server data - let header = UdpHeader::retrieve_from_stream(&mut &buf2[..])?; - let data = &buf2[header.len()..]; + if let ProxyType::Socks4 | ProxyType::Socks5 = proxy_type { + // Remove SOCKS5 UDP header from the server data + let header = UdpHeader::retrieve_from_stream(&mut &buf2[..])?; + let data = &buf2[header.len()..]; - let buf = if session_info.dst.port() == DNS_PORT { - let mut message = dns::parse_data_to_dns_message(data, false)?; - if !ipv6_enabled { - dns::remove_ipv6_entries(&mut message); - } - message.to_vec()? + let buf = if session_info.dst.port() == DNS_PORT { + let mut message = dns::parse_data_to_dns_message(data, false)?; + if !ipv6_enabled { + dns::remove_ipv6_entries(&mut message); + } + message.to_vec()? + } else { + data.to_vec() + }; + + udp_stack.write_all(&buf).await?; } else { - data.to_vec() - }; - - udp_stack.write_all(&buf).await?; + udp_stack.write_all(buf2).await?; + } } } } @@ -318,13 +483,18 @@ async fn handle_udp_associate_session( async fn handle_dns_over_tcp_session( mut udp_stack: IpStackUdpStream, - server_addr: SocketAddr, proxy_handler: Arc>, + socket_queue: Option>, ipv6_enabled: bool, ) -> crate::Result<()> { - let mut server = TcpStream::connect(server_addr).await?; + let (session_info, server_addr) = { + let handler = proxy_handler.lock().await; + + (handler.get_session_info(), handler.get_server_addr()) + }; + + let mut server = create_tcp_stream(&socket_queue, server_addr).await?; - let session_info = proxy_handler.lock().await.get_session_info(); log::info!("Beginning {}", session_info); let _ = handle_proxy_session(&mut server, proxy_handler).await?; diff --git a/src/no_proxy.rs b/src/no_proxy.rs index 99c4dbf..83edadf 100644 --- a/src/no_proxy.rs +++ b/src/no_proxy.rs @@ -16,6 +16,10 @@ struct NoProxyHandler { #[async_trait::async_trait] impl ProxyHandler for NoProxyHandler { + fn get_server_addr(&self) -> SocketAddr { + self.info.dst + } + fn get_session_info(&self) -> SessionInfo { self.info } diff --git a/src/proxy_handler.rs b/src/proxy_handler.rs index 5621347..94406a6 100644 --- a/src/proxy_handler.rs +++ b/src/proxy_handler.rs @@ -7,6 +7,7 @@ use tokio::sync::Mutex; #[async_trait::async_trait] pub(crate) trait ProxyHandler: Send + Sync { + fn get_server_addr(&self) -> SocketAddr; fn get_session_info(&self) -> SessionInfo; fn get_domain_name(&self) -> Option; async fn push_data(&mut self, event: IncomingDataEvent<'_>) -> std::io::Result<()>; diff --git a/src/socket_transfer.rs b/src/socket_transfer.rs new file mode 100644 index 0000000..194c6f7 --- /dev/null +++ b/src/socket_transfer.rs @@ -0,0 +1,230 @@ +#![cfg(target_os = "linux")] + +use crate::{error, SocketDomain, SocketProtocol}; +use nix::{ + errno::Errno, + fcntl::{self, FdFlag}, + sys::socket::{cmsg_space, getsockopt, recvmsg, sendmsg, sockopt, ControlMessage, ControlMessageOwned, MsgFlags, SockType}, +}; +use serde::{Deserialize, Serialize}; +use std::{ + io::{ErrorKind, IoSlice, IoSliceMut, Result}, + ops::DerefMut, + os::fd::{AsFd, AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, +}; +use tokio::net::{TcpSocket, UdpSocket, UnixDatagram}; + +const REQUEST_BUFFER_SIZE: usize = 64; + +#[derive(Hash, Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +struct Request { + protocol: SocketProtocol, + domain: SocketDomain, + number: u32, +} + +#[derive(Hash, Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +enum Response { + Ok, +} + +/// Reconstruct socket from raw `fd` +pub fn reconstruct_socket(fd: RawFd) -> Result { + // Check if `fd` is valid + let fd_flags = fcntl::fcntl(fd, fcntl::F_GETFD)?; + + // `fd` is confirmed to be valid so it should be closed + let socket = unsafe { OwnedFd::from_raw_fd(fd) }; + + // Insert CLOEXEC flag to the `fd` to prevent further propagation across `execve(2)` calls + let mut fd_flags = FdFlag::from_bits(fd_flags).ok_or(ErrorKind::Unsupported)?; + if !fd_flags.contains(FdFlag::FD_CLOEXEC) { + fd_flags.insert(FdFlag::FD_CLOEXEC); + fcntl::fcntl(fd, fcntl::F_SETFD(fd_flags))?; + } + + Ok(socket) +} + +/// Reconstruct transfer socket from `fd` +/// +/// Panics if called outside of tokio runtime +pub fn reconstruct_transfer_socket(fd: OwnedFd) -> Result { + // Check if socket of type DATAGRAM + let sock_type = getsockopt(&fd, sockopt::SockType)?; + if !matches!(sock_type, SockType::Datagram) { + return Err(ErrorKind::InvalidInput.into()); + } + + let std_socket: std::os::unix::net::UnixDatagram = fd.into(); + std_socket.set_nonblocking(true)?; + + // Fails if tokio context is absent + Ok(UnixDatagram::from_std(std_socket).unwrap()) +} + +/// Create pair of interconnected sockets one of which is set to stay open across `execve(2)` calls. +pub async fn create_transfer_socket_pair() -> std::io::Result<(UnixDatagram, OwnedFd)> { + let (local, remote) = tokio::net::UnixDatagram::pair()?; + + let remote_fd: OwnedFd = remote.into_std().unwrap().into(); + + // Get `remote_fd` flags + let fd_flags = fcntl::fcntl(remote_fd.as_raw_fd(), fcntl::F_GETFD)?; + + // Remove CLOEXEC flag from the `remote_fd` to allow propagating across `execve(2)` + let mut fd_flags = FdFlag::from_bits(fd_flags).ok_or(ErrorKind::Unsupported)?; + fd_flags.remove(FdFlag::FD_CLOEXEC); + fcntl::fcntl(remote_fd.as_raw_fd(), fcntl::F_SETFD(fd_flags))?; + + Ok((local, remote_fd)) +} + +pub trait TransferableSocket: Sized { + fn from_fd(fd: OwnedFd) -> Result; + fn domain() -> SocketProtocol; +} + +impl TransferableSocket for TcpSocket { + fn from_fd(fd: OwnedFd) -> Result { + // Check if socket is of type STREAM + let sock_type = getsockopt(&fd, sockopt::SockType)?; + if !matches!(sock_type, SockType::Stream) { + return Err(ErrorKind::InvalidInput.into()); + } + + let std_stream: std::net::TcpStream = fd.into(); + std_stream.set_nonblocking(true)?; + + Ok(TcpSocket::from_std_stream(std_stream)) + } + + fn domain() -> SocketProtocol { + SocketProtocol::Tcp + } +} + +impl TransferableSocket for UdpSocket { + /// Panics if called outside of tokio runtime + fn from_fd(fd: OwnedFd) -> Result { + // Check if socket is of type DATAGRAM + let sock_type = getsockopt(&fd, sockopt::SockType)?; + if !matches!(sock_type, SockType::Datagram) { + return Err(ErrorKind::InvalidInput.into()); + } + + let std_socket: std::net::UdpSocket = fd.into(); + std_socket.set_nonblocking(true)?; + + Ok(UdpSocket::try_from(std_socket).unwrap()) + } + + fn domain() -> SocketProtocol { + SocketProtocol::Udp + } +} + +/// Send [`Request`] to `socket` and return received [`TransferableSocket`]s +/// +/// Panics if called outside of tokio runtime +pub async fn request_sockets(mut socket: S, domain: SocketDomain, number: u32) -> error::Result> +where + S: DerefMut, + T: TransferableSocket, +{ + // Borrow socket as mut to prevent multiple simultaneous requests + let socket = socket.deref_mut(); + + // Send request + let request = bincode::serialize(&Request { + protocol: T::domain(), + domain, + number, + })?; + + socket.send(&request[..]).await?; + + // Receive response + loop { + socket.readable().await?; + + let mut buf = [0_u8; REQUEST_BUFFER_SIZE]; + let mut iov = [IoSliceMut::new(&mut buf[..])]; + let mut cmsg = Vec::with_capacity(cmsg_space::() * number as usize); + + let msg = recvmsg::<()>(socket.as_fd().as_raw_fd(), &mut iov, Some(&mut cmsg), MsgFlags::empty()); + + let msg = match msg { + Err(Errno::EAGAIN) => continue, + msg => msg?, + }; + + // Parse response + let response = &msg.iovs().next().unwrap()[..msg.bytes]; + let response: Response = bincode::deserialize(response)?; + if !matches!(response, Response::Ok) { + return Err("Request for new sockets failed".into()); + } + + // Process received file descriptors + let mut sockets = Vec::::with_capacity(number as usize); + for cmsg in msg.cmsgs() { + if let ControlMessageOwned::ScmRights(fds) = cmsg { + for fd in fds { + if fd < 0 { + return Err("Received socket is invalid".into()); + } + + let owned_fd = reconstruct_socket(fd)?; + sockets.push(T::from_fd(owned_fd)?); + } + } + } + + return Ok(sockets); + } +} + +/// Process [`Request`]s received from `socket` +/// +/// Panics if called outside of tokio runtime +pub async fn process_socket_requests(socket: &UnixDatagram) -> error::Result<()> { + loop { + let mut buf = [0_u8; REQUEST_BUFFER_SIZE]; + + let len = socket.recv(&mut buf[..]).await?; + + let request: Request = bincode::deserialize(&buf[..len])?; + + let response = Response::Ok; + let buf = bincode::serialize(&response)?; + + let mut owned_fd_buf: Vec = Vec::with_capacity(request.number as usize); + for _ in 0..request.number { + let fd = match request.protocol { + SocketProtocol::Tcp => match request.domain { + SocketDomain::IpV4 => tokio::net::TcpSocket::new_v4(), + SocketDomain::IpV6 => tokio::net::TcpSocket::new_v6(), + } + .map(|s| unsafe { OwnedFd::from_raw_fd(s.into_raw_fd()) }), + SocketProtocol::Udp => match request.domain { + SocketDomain::IpV4 => tokio::net::UdpSocket::bind("0.0.0.0:0").await, + SocketDomain::IpV6 => tokio::net::UdpSocket::bind("[::]:0").await, + } + .map(|s| s.into_std().unwrap().into()), + }; + match fd { + Err(err) => log::warn!("Failed to allocate socket: {err}"), + Ok(fd) => owned_fd_buf.push(fd), + }; + } + + socket.writable().await?; + + let raw_fd_buf: Vec = owned_fd_buf.iter().map(|fd| fd.as_raw_fd()).collect(); + let cmsg = ControlMessage::ScmRights(&raw_fd_buf[..]); + let iov = [IoSlice::new(&buf[..])]; + + sendmsg::<()>(socket.as_raw_fd(), &iov, &[cmsg], MsgFlags::empty(), None)?; + } +} diff --git a/src/socks.rs b/src/socks.rs index 90a6bcd..a7c0b0b 100644 --- a/src/socks.rs +++ b/src/socks.rs @@ -20,6 +20,7 @@ enum SocksState { } struct SocksProxyImpl { + server_addr: SocketAddr, info: SessionInfo, domain_name: Option, state: SocksState, @@ -35,6 +36,7 @@ struct SocksProxyImpl { impl SocksProxyImpl { fn new( + server_addr: SocketAddr, info: SessionInfo, domain_name: Option, credentials: Option, @@ -42,6 +44,7 @@ impl SocksProxyImpl { command: protocol::Command, ) -> Result { let mut result = Self { + server_addr, info, domain_name, state: SocksState::ClientHello, @@ -260,6 +263,10 @@ impl SocksProxyImpl { #[async_trait::async_trait] impl ProxyHandler for SocksProxyImpl { + fn get_server_addr(&self) -> SocketAddr { + self.server_addr + } + fn get_session_info(&self) -> SessionInfo { self.info } @@ -339,6 +346,7 @@ impl ProxyHandlerManager for SocksProxyManager { let command = if udp_associate { UdpAssociate } else { Connect }; let credentials = self.credentials.clone(); Ok(Arc::new(Mutex::new(SocksProxyImpl::new( + self.server, info, domain_name, credentials, From a08b3338c3e037e0ffa4f8f3741b9a4ae7ce72a1 Mon Sep 17 00:00:00 2001 From: "B. Blechschmidt" Date: Wed, 3 Apr 2024 22:39:05 +0200 Subject: [PATCH 05/12] Apply clippy suggestion --- src/bin/main.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/bin/main.rs b/src/bin/main.rs index 2fdad5e..1723895 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -20,10 +20,8 @@ async fn main() -> Result<(), BoxError> { } #[cfg(not(target_os = "linux"))] log::error!("Your platform doesn't support unprivileged namespaces"); - } else { - if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await { - log::error!("main loop error: {}", err); - } + } else if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await { + log::error!("main loop error: {}", err); } } }); From 181497e709b8ec49e90b82d316ef82ec55268fc1 Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Sat, 6 Apr 2024 16:22:26 +0800 Subject: [PATCH 06/12] remove useless get_server_addr --- src/http.rs | 4 ---- src/lib.rs | 2 +- src/no_proxy.rs | 12 +++--------- src/proxy_handler.rs | 1 - src/socks.rs | 4 ---- 5 files changed, 4 insertions(+), 19 deletions(-) diff --git a/src/http.rs b/src/http.rs index 7aa2569..79c403a 100644 --- a/src/http.rs +++ b/src/http.rs @@ -423,10 +423,6 @@ impl ProxyHandlerManager for HttpManager { HttpConnection::new(self.server, info, domain_name, self.credentials.clone(), self.digest_state.clone()).await?, ))) } - - fn get_server_addr(&self) -> SocketAddr { - self.server - } } impl HttpManager { diff --git a/src/lib.rs b/src/lib.rs index 183f37d..b3bf11c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -214,7 +214,7 @@ where ProxyType::Socks5 => Arc::new(SocksProxyManager::new(server_addr, V5, key)) as Arc, ProxyType::Socks4 => Arc::new(SocksProxyManager::new(server_addr, V4, key)) as Arc, ProxyType::Http => Arc::new(HttpManager::new(server_addr, key)) as Arc, - ProxyType::None => Arc::new(NoProxyManager::new(server_addr)) as Arc, + ProxyType::None => Arc::new(NoProxyManager::new()) as Arc, }; let mut ipstack_config = ipstack::IpStackConfig::default(); diff --git a/src/no_proxy.rs b/src/no_proxy.rs index 83edadf..d3c00e0 100644 --- a/src/no_proxy.rs +++ b/src/no_proxy.rs @@ -80,9 +80,7 @@ impl ProxyHandler for NoProxyHandler { } } -pub(crate) struct NoProxyManager { - server: SocketAddr, -} +pub(crate) struct NoProxyManager; #[async_trait::async_trait] impl ProxyHandlerManager for NoProxyManager { @@ -100,14 +98,10 @@ impl ProxyHandlerManager for NoProxyManager { udp_associate, }))) } - - fn get_server_addr(&self) -> SocketAddr { - self.server - } } impl NoProxyManager { - pub(crate) fn new(server: SocketAddr) -> Self { - Self { server } + pub(crate) fn new() -> Self { + Self } } diff --git a/src/proxy_handler.rs b/src/proxy_handler.rs index 94406a6..fb9d9f4 100644 --- a/src/proxy_handler.rs +++ b/src/proxy_handler.rs @@ -27,5 +27,4 @@ pub(crate) trait ProxyHandlerManager: Send + Sync { domain_name: Option, udp_associate: bool, ) -> std::io::Result>>; - fn get_server_addr(&self) -> SocketAddr; } diff --git a/src/socks.rs b/src/socks.rs index a7c0b0b..3800c6a 100644 --- a/src/socks.rs +++ b/src/socks.rs @@ -354,10 +354,6 @@ impl ProxyHandlerManager for SocksProxyManager { command, )?))) } - - fn get_server_addr(&self) -> SocketAddr { - self.server - } } impl SocksProxyManager { From 56be614334e49ac75ecf15a551969f6430ab6f22 Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Sat, 6 Apr 2024 23:21:50 +0800 Subject: [PATCH 07/12] Args class --- src/args.rs | 29 ++++++++++++++++++++++++----- src/desktop_api.rs | 22 +++++++++++----------- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/src/args.rs b/src/args.rs index bd9d413..b0766f6 100644 --- a/src/args.rs +++ b/src/args.rs @@ -16,7 +16,7 @@ pub struct Args { /// Name of the tun interface, such as tun0, utun4, etc. /// If this option is not provided, the OS will generate a random one. - #[arg(short, long, value_name = "name", conflicts_with = "tun_fd")] + #[arg(short, long, value_name = "name", conflicts_with = "tun_fd", value_parser = validate_tun)] pub tun: Option, /// File descriptor of the tun interface @@ -31,7 +31,7 @@ pub struct Args { /// File descriptor for UNIX datagram socket meant to transfer /// network sockets from global namespace to the new one. /// See `unshare(1)`, `namespaces(7)`, `sendmsg(2)`, `unix(7)`. - #[arg(long)] + #[arg(long, value_name = "fd")] pub socket_transfer_fd: Option, /// Specify a command to run with root-like capabilities in the new namespace. @@ -43,9 +43,9 @@ pub struct Args { #[arg(short = '6', long)] pub ipv6_enabled: bool, - #[arg(short, long)] /// Routing and system setup, which decides whether to setup the routing and system configuration. /// This option is only available on Linux and requires root-like privileges. See `capabilities(7)`. + #[arg(short, long, default_value = if cfg!(target_os = "linux") { "false" } else { "true" })] pub setup: bool, /// DNS handling strategy @@ -73,8 +73,20 @@ pub struct Args { pub verbosity: ArgVerbosity, } +fn validate_tun(p: &str) -> Result { + #[cfg(target_os = "macos")] + if p.len() <= 4 || &p[..4] != "utun" { + return Err(Error::from("Invalid tun interface name, please use utunX")); + } + Ok(p.to_string()) +} + impl Default for Args { fn default() -> Self { + #[cfg(target_os = "linux")] + let setup = false; + #[cfg(not(target_os = "linux"))] + let setup = true; Args { proxy: ArgProxy::default(), tun: None, @@ -83,7 +95,7 @@ impl Default for Args { socket_transfer_fd: None, admin_command: Vec::new(), ipv6_enabled: false, - setup: false, + setup, dns: ArgDns::default(), dns_addr: "8.8.8.8".parse().unwrap(), bypass: vec![], @@ -95,9 +107,16 @@ impl Default for Args { } impl Args { + #[allow(clippy::let_and_return)] pub fn parse_args() -> Self { use clap::Parser; - Self::parse() + let args = Self::parse(); + #[cfg(target_os = "linux")] + if !args.setup && args.tun.is_none() { + eprintln!("Missing required argument, '--tun' must present when '--setup' is not used."); + std::process::exit(-1); + } + args } pub fn proxy(&mut self, proxy: ArgProxy) -> &mut Self { diff --git a/src/desktop_api.rs b/src/desktop_api.rs index 71f67dc..72da377 100644 --- a/src/desktop_api.rs +++ b/src/desktop_api.rs @@ -83,25 +83,25 @@ pub unsafe extern "C" fn tun2proxy_with_name_run( pub async fn desktop_run_async(args: Args, shutdown_token: tokio_util::sync::CancellationToken) -> std::io::Result<()> { let bypass_ips = args.bypass.clone(); - let mut config = tun2::Configuration::default(); - config.address(TUN_IPV4).netmask(TUN_NETMASK).mtu(MTU).up(); - config.destination(TUN_GATEWAY); + let mut tun_config = tun2::Configuration::default(); + tun_config.address(TUN_IPV4).netmask(TUN_NETMASK).mtu(MTU).up(); + tun_config.destination(TUN_GATEWAY); if let Some(tun_fd) = args.tun_fd { - config.raw_fd(tun_fd); + tun_config.raw_fd(tun_fd); } else if let Some(ref tun) = args.tun { - config.tun_name(tun); + tun_config.tun_name(tun); } #[cfg(target_os = "linux")] - config.platform_config(|config| { + tun_config.platform_config(|cfg| { #[allow(deprecated)] - config.packet_information(true); - config.ensure_root_privileges(args.setup); + cfg.packet_information(true); + cfg.ensure_root_privileges(args.setup); }); #[cfg(target_os = "windows")] - config.platform_config(|config| { - config.device_guid(Some(12324323423423434234_u128)); + tun_config.platform_config(|cfg| { + cfg.device_guid(Some(12324323423423434234_u128)); }); #[allow(unused_variables)] @@ -113,7 +113,7 @@ pub async fn desktop_run_async(args: Args, shutdown_token: tokio_util::sync::Can #[allow(unused_mut, unused_assignments, unused_variables)] let mut setup = true; - let device = tun2::create_as_async(&config)?; + let device = tun2::create_as_async(&tun_config)?; if let Ok(tun_name) = device.as_ref().tun_name() { tproxy_args = tproxy_args.tun_name(&tun_name); From f9f5401ba4bb23e06d0b8675cc8d69252e1db9b4 Mon Sep 17 00:00:00 2001 From: "Remy D. Farley" Date: Sun, 7 Apr 2024 10:28:11 +0000 Subject: [PATCH 08/12] fix socks5 udp connectivity --- src/lib.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b3bf11c..c378a3d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -305,9 +305,8 @@ where Ok(proxy_handler) => { let socket_queue = socket_queue.clone(); tokio::spawn(async move { - if let Err(err) = - handle_udp_associate_session(udp, args.proxy.proxy_type, proxy_handler, socket_queue, ipv6_enabled).await - { + let ty = args.proxy.proxy_type; + if let Err(err) = handle_udp_associate_session(udp, ty, proxy_handler, socket_queue, ipv6_enabled).await { log::info!("Ending {} with \"{}\"", info, err); } log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1); @@ -406,13 +405,12 @@ async fn handle_udp_associate_session( log::info!("Beginning {}", session_info); - let udp_addr = match udp_addr { - Some(udp_addr) => udp_addr, + let (_server, udp_addr) = match udp_addr { + Some(udp_addr) => (None, udp_addr), None => { let mut server = create_tcp_stream(&socket_queue, server_addr).await?; - let udp_addr = handle_proxy_session(&mut server, proxy_handler).await?; - udp_addr.ok_or("udp associate failed")? + (Some(server), udp_addr.ok_or("udp associate failed")?) } }; From af6a8a3cb01024c12bfe9fa9bbc49372f0d331f4 Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Sun, 7 Apr 2024 19:02:57 +0800 Subject: [PATCH 09/12] minor changes --- src/lib.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index c378a3d..71f8298 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -405,6 +405,8 @@ async fn handle_udp_associate_session( log::info!("Beginning {}", session_info); + // `_server` is meaningful here, it must be alive all the time + // to ensure that UDP transmission will not be interrupted accidentally. let (_server, udp_addr) = match udp_addr { Some(udp_addr) => (None, udp_addr), None => { @@ -567,6 +569,9 @@ async fn handle_dns_over_tcp_session( Ok(()) } +/// This function is used to handle the business logic of tun2proxy and SOCKS5 server. +/// When handling UDP proxy, the return value UDP associate IP address is the result of this business logic. +/// However, when handling TCP business logic, the return value Ok(None) is meaningless, just indicating that the operation was successful. async fn handle_proxy_session(server: &mut TcpStream, proxy_handler: Arc>) -> crate::Result> { let mut launched = false; let mut proxy_handler = proxy_handler.lock().await; From e8469f0aee568e8f3d67c296d88cdf674d70a567 Mon Sep 17 00:00:00 2001 From: "B. Blechschmidt" Date: Sun, 7 Apr 2024 21:12:20 +0200 Subject: [PATCH 10/12] Restrict namespace arguments to Linux --- src/args.rs | 20 ++++++++++++++------ src/bin/main.rs | 9 +++++---- src/desktop_api.rs | 39 ++++++++++++++++++++------------------- 3 files changed, 39 insertions(+), 29 deletions(-) diff --git a/src/args.rs b/src/args.rs index b0766f6..e5d1ca5 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,9 +1,10 @@ use crate::{Error, Result}; use socks5_impl::protocol::UserKey; -use std::{ - ffi::OsString, - net::{IpAddr, SocketAddr, ToSocketAddrs}, -}; + +#[cfg(target_os = "linux")] +use std::ffi::OsString; + +use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; #[derive(Debug, Clone, clap::Parser)] #[command(author, version, about = "Tunnel interface to proxy.", long_about = None)] @@ -25,17 +26,21 @@ pub struct Args { /// Create a tun interface in a newly created unprivileged namespace /// while maintaining proxy connectivity via the global network namespace. + #[cfg(target_os = "linux")] #[arg(long)] pub unshare: bool, /// File descriptor for UNIX datagram socket meant to transfer /// network sockets from global namespace to the new one. /// See `unshare(1)`, `namespaces(7)`, `sendmsg(2)`, `unix(7)`. - #[arg(long, value_name = "fd")] + #[cfg(target_os = "linux")] + #[arg(long, value_name = "fd", hide(true))] pub socket_transfer_fd: Option, - /// Specify a command to run with root-like capabilities in the new namespace. + /// Specify a command to run with root-like capabilities in the new namespace + /// when using `--unshare`. /// This could be useful to start additional daemons, e.g. `openvpn` instance. + #[cfg(target_os = "linux")] #[arg(requires = "unshare")] pub admin_command: Vec, @@ -91,8 +96,11 @@ impl Default for Args { proxy: ArgProxy::default(), tun: None, tun_fd: None, + #[cfg(target_os = "linux")] unshare: false, + #[cfg(target_os = "linux")] socket_transfer_fd: None, + #[cfg(target_os = "linux")] admin_command: Vec::new(), ipv6_enabled: false, setup, diff --git a/src/bin/main.rs b/src/bin/main.rs index 1723895..4984050 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -13,14 +13,15 @@ async fn main() -> Result<(), BoxError> { let join_handle = tokio::spawn({ let shutdown_token = shutdown_token.clone(); async move { + #[cfg(target_os = "linux")] if args.unshare && args.socket_transfer_fd.is_none() { - #[cfg(target_os = "linux")] if let Err(err) = namespace_proxy_main(args, shutdown_token).await { log::error!("namespace proxy error: {}", err); } - #[cfg(not(target_os = "linux"))] - log::error!("Your platform doesn't support unprivileged namespaces"); - } else if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await { + return; + } + + if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await { log::error!("main loop error: {}", err); } } diff --git a/src/desktop_api.rs b/src/desktop_api.rs index 72da377..e1fc29e 100644 --- a/src/desktop_api.rs +++ b/src/desktop_api.rs @@ -149,6 +149,7 @@ pub async fn desktop_run_async(args: Args, shutdown_token: tokio_util::sync::Can run_ip_util(format!("-6 route delete 80::/1 dev {}", tproxy_args.tun_name)); } + #[cfg(target_os = "linux")] if setup && args.unshare { // New namespace doesn't have any other routing device by default // So our `tun` device should act as such to make space for other proxies. @@ -164,27 +165,27 @@ pub async fn desktop_run_async(args: Args, shutdown_token: tokio_util::sync::Can run_ip_util(format!("-6 route add ::/0 dev {}", tproxy_args.tun_name)); } } - } - let mut admin_command_args = args.admin_command.iter(); - if let Some(command) = admin_command_args.next() { - let child = tokio::process::Command::new(command) - .args(admin_command_args) - .kill_on_drop(true) - .spawn(); + let mut admin_command_args = args.admin_command.iter(); + if let Some(command) = admin_command_args.next() { + let child = tokio::process::Command::new(command) + .args(admin_command_args) + .kill_on_drop(true) + .spawn(); - match child { - Err(err) => { - log::warn!("Failed to start admin process: {err}"); - } - Ok(mut child) => { - tokio::spawn(async move { - if let Err(err) = child.wait().await { - log::warn!("Admin process terminated: {err}"); - } - }); - } - }; + match child { + Err(err) => { + log::warn!("Failed to start admin process: {err}"); + } + Ok(mut child) => { + tokio::spawn(async move { + if let Err(err) = child.wait().await { + log::warn!("Admin process terminated: {err}"); + } + }); + } + }; + } } let join_handle = tokio::spawn(crate::run(device, MTU, args, shutdown_token)); From 4f5a128972b025c6034414385e001472e558b20e Mon Sep 17 00:00:00 2001 From: "B. Blechschmidt" Date: Sun, 7 Apr 2024 21:21:59 +0200 Subject: [PATCH 11/12] Update README --- README.md | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 952ed3a..ea290a4 100644 --- a/README.md +++ b/README.md @@ -114,22 +114,32 @@ sudo ip link del tun0 ``` Tunnel interface to proxy. -Usage: tun2proxy [OPTIONS] --proxy +Usage: tun2proxy [OPTIONS] --proxy [ADMIN_COMMAND]... + +Arguments: + [ADMIN_COMMAND]... Specify a command to run with root-like capabilities in the new namespace when using `--unshare`. + This could be useful to start additional daemons, e.g. `openvpn` instance Options: - -p, --proxy Proxy URL in the form proto://[username[:password]@]host:port, where proto is one of socks4, - socks5, http. For example: socks5://myname:password@127.0.0.1:1080 - -t, --tun Name of the tun interface [default: tun0] - --tun-fd File descriptor of the tun interface - -6, --ipv6-enabled IPv6 enabled - -s, --setup Routing and system setup, which decides whether to setup the routing and system configuration, - this option requires root privileges - -d, --dns DNS handling strategy [default: direct] [possible values: virtual, over-tcp, direct] - --dns-addr DNS resolver address [default: 8.8.8.8] - -b, --bypass IPs used in routing setup which should bypass the tunnel - -v, --verbosity Verbosity level [default: info] [possible values: off, error, warn, info, debug, trace] - -h, --help Print help - -V, --version Print version + -p, --proxy Proxy URL in the form proto://[username[:password]@]host:port, where proto is one of + socks4, socks5, http. For example: socks5://myname:password@127.0.0.1:1080 + -t, --tun Name of the tun interface, such as tun0, utun4, etc. If this option is not provided, the + OS will generate a random one + --tun-fd File descriptor of the tun interface + --unshare Create a tun interface in a newly created unprivileged namespace while maintaining proxy + connectivity via the global network namespace + -6, --ipv6-enabled IPv6 enabled + -s, --setup Routing and system setup, which decides whether to setup the routing and system + configuration. This option is only available on Linux and requires root-like privileges. + See `capabilities(7)` + -d, --dns DNS handling strategy [default: direct] [possible values: virtual, over-tcp, direct] + --dns-addr DNS resolver address [default: 8.8.8.8] + -b, --bypass IPs used in routing setup which should bypass the tunnel + --tcp-timeout TCP timeout in seconds [default: 600] + --udp-timeout UDP timeout in seconds [default: 10] + -v, --verbosity Verbosity level [default: info] [possible values: off, error, warn, info, debug, trace] + -h, --help Print help + -V, --version Print version ``` Currently, tun2proxy supports HTTP, SOCKS4/SOCKS4a and SOCKS5. A proxy is supplied to the `--proxy` argument in the URL format. For example, an HTTP proxy at `1.2.3.4:3128` with a username of `john.doe` and a password of `secret` is From 40368dd232ee2a8c0640db76e5bc39bdce8474ec Mon Sep 17 00:00:00 2001 From: "B. Blechschmidt" Date: Sun, 7 Apr 2024 21:44:50 +0200 Subject: [PATCH 12/12] Increase security and portability through the use of /proc/self/exe --- src/bin/main.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/bin/main.rs b/src/bin/main.rs index 4984050..9360b54 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -45,13 +45,17 @@ async fn namespace_proxy_main( _args: Args, _shutdown_token: tokio_util::sync::CancellationToken, ) -> Result { + use nix::fcntl::{open, OFlag}; + use nix::sys::stat::Mode; use std::os::fd::AsRawFd; let (socket, remote_fd) = tun2proxy::socket_transfer::create_transfer_socket_pair().await?; + let fd = open("/proc/self/exe", OFlag::O_PATH, Mode::empty())?; + let child = tokio::process::Command::new("unshare") .args("--user --map-current-user --net --mount --keep-caps --kill-child --fork".split(' ')) - .arg(std::env::current_exe()?) + .arg(format!("/proc/self/fd/{}", fd)) .arg("--socket-transfer-fd") .arg(remote_fd.as_raw_fd().to_string()) .args(std::env::args().skip(1))