From c2382ee29b4c35669cf8d8debd90a1c364c7f943 Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Sun, 3 Nov 2024 15:15:03 +0800 Subject: [PATCH] minor changes --- src/bin/udpgw_server.rs | 25 +++++++++++-------------- src/lib.rs | 6 ++++-- src/udpgw.rs | 16 ++++++++++++---- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/bin/udpgw_server.rs b/src/bin/udpgw_server.rs index 550f51f..8f475e5 100644 --- a/src/bin/udpgw_server.rs +++ b/src/bin/udpgw_server.rs @@ -1,4 +1,4 @@ -use socks5_impl::protocol::{AddressType, AsyncStreamOperation}; +use socks5_impl::protocol::AsyncStreamOperation; use std::net::SocketAddr; use tokio::{ io::AsyncWriteExt, @@ -44,7 +44,7 @@ pub struct UdpGwArgs { /// Daemonize for unix family or run as Windows service #[cfg(unix)] - #[arg(long)] + #[arg(short, long)] pub daemonize: bool, /// Verbosity level @@ -76,25 +76,22 @@ async fn send_keepalive_response(tx: Sender, conn_id: u16) { /// Send data field of packet from client to destination server and receive response, /// then wrap response data to the packet's data field and send packet back to client. -async fn process_udp(client: SocketAddr, udp_mtu: u16, udp_timeout: u64, tx: Sender, mut packet: Packet) -> Result<()> { +async fn process_udp(_client: SocketAddr, udp_mtu: u16, udp_timeout: u64, tx: Sender, mut packet: Packet) -> Result<()> { let Some(dst_addr) = &packet.address else { - log::error!("client {} udp request address is None", client); - return Ok(()); + return Err(std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, "udp request address is None").into()); }; - let std_sock = if dst_addr.get_type() == AddressType::IPv6 { - std::net::UdpSocket::bind("[::]:0")? - } else { - std::net::UdpSocket::bind("0.0.0.0:0")? + use std::net::ToSocketAddrs; + let Some(dst_addr) = dst_addr.to_socket_addrs()?.next() else { + return Err(std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, "to_socket_addrs").into()); + }; + let std_sock = match dst_addr { + std::net::SocketAddr::V6(_) => std::net::UdpSocket::bind("[::]:0")?, + std::net::SocketAddr::V4(_) => std::net::UdpSocket::bind("0.0.0.0:0")?, }; std_sock.set_nonblocking(true)?; #[cfg(unix)] nix::sys::socket::setsockopt(&std_sock, nix::sys::socket::sockopt::ReuseAddr, &true)?; let socket = UdpSocket::from_std(std_sock)?; - use std::net::ToSocketAddrs; - let Some(dst_addr) = dst_addr.to_socket_addrs()?.next() else { - log::error!("client {} udp request address to_socket_addrs", client); - return Ok(()); - }; // 1. send udp data to destination server socket.send_to(&packet.data, &dst_addr).await?; // 2. receive response from destination server diff --git a/src/lib.rs b/src/lib.rs index 932dda8..1972b9c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -555,13 +555,16 @@ async fn handle_udp_gateway_session( stream.update_activity(); } ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut reader) => { + if let Ok((len, _)) = ret { + crate::traffic_status::traffic_status_update(0, len)?; + } match ret { Err(e) => { log::warn!("[UdpGw] Ending stream {} {} <> {} with recv_udpgw_packet {}", sn, &tcp_local_addr, udp_dst, e); stream.close(); break; } - Ok(packet) => match packet { + Ok((_, packet)) => match packet { //should not received keepalive UdpGwResponse::KeepAlive => { log::error!("[UdpGw] Ending stream {} {} <> {} with recv keepalive", sn, &tcp_local_addr, udp_dst); @@ -588,7 +591,6 @@ async fn handle_udp_gateway_session( log::error!("[UdpGw] Ending stream {} {} <> {} with send_udp_packet {}", sn, &tcp_local_addr, udp_dst, e); break; } - crate::traffic_status::traffic_status_update(0, len)?; } } } diff --git a/src/udpgw.rs b/src/udpgw.rs index be791f8..ae2cb87 100644 --- a/src/udpgw.rs +++ b/src/udpgw.rs @@ -467,6 +467,8 @@ impl UdpGwClient { } } + let (mut tx, mut rx) = (0, 0); + for mut stream in streams { if stream.last_activity.elapsed() < self.keepalive_time { self.store_server_connection(stream).await; @@ -483,20 +485,26 @@ impl UdpGwClient { let local_addr = stream_writer.local_addr()?; let sn = stream.serial_number(); let keepalive_packet: Vec = Packet::build_keepalive_packet(sn).into(); + tx += keepalive_packet.len(); if let Err(e) = stream_writer.write_all(&keepalive_packet).await { log::warn!("stream {} {:?} send keepalive failed: {}", sn, local_addr, e); continue; } match UdpGwClient::recv_udpgw_packet(self.udp_mtu, self.udp_timeout, &mut stream_reader).await { - Ok(UdpGwResponse::KeepAlive) => { + Ok((len, UdpGwResponse::KeepAlive)) => { stream.update_activity(); self.store_server_connection_full(stream, stream_reader, stream_writer).await; log::trace!("stream {sn} {:?} send keepalive and recieve it successfully", local_addr); + rx += len; + } + Ok((len, v)) => { + log::debug!("stream {sn} {:?} keepalive unexpected response: {v}", local_addr); + rx += len; } - Ok(v) => log::debug!("stream {sn} {:?} keepalive unexpected response: {v}", local_addr), Err(e) => log::debug!("stream {sn} {:?} keepalive no response, error \"{e}\"", local_addr), } } + crate::traffic_status::traffic_status_update(tx, rx)?; } } @@ -526,14 +534,14 @@ impl UdpGwClient { /// /// # Returns /// - `Result`: Returns a result type containing the parsed UDP gateway response, or an error if one occurs. - pub(crate) async fn recv_udpgw_packet(udp_mtu: u16, udp_timeout: u64, stream: &mut OwnedReadHalf) -> Result { + pub(crate) async fn recv_udpgw_packet(udp_mtu: u16, udp_timeout: u64, stream: &mut OwnedReadHalf) -> Result<(usize, UdpGwResponse)> { let packet = tokio::time::timeout( tokio::time::Duration::from_secs(udp_timeout + 2), Packet::retrieve_from_async_stream(stream), ) .await .map_err(std::io::Error::from)??; - UdpGwClient::parse_udp_response(udp_mtu, packet) + Ok((packet.len(), UdpGwClient::parse_udp_response(udp_mtu, packet)?)) } /// Sends a UDP gateway packet.