From c27af985855fb0b015d5c8fef577f59c00fac728 Mon Sep 17 00:00:00 2001 From: suchao Date: Sat, 19 Oct 2024 22:39:13 +0800 Subject: [PATCH] support udp gateway mode --- src/bin/udpgw_server.rs | 21 +++++++++++---------- src/lib.rs | 4 +++- src/udpgw.rs | 8 +++++--- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/bin/udpgw_server.rs b/src/bin/udpgw_server.rs index 7652209..5d82229 100644 --- a/src/bin/udpgw_server.rs +++ b/src/bin/udpgw_server.rs @@ -13,7 +13,7 @@ use tokio::sync::mpsc::Sender; pub use tun2proxy::udpgw::*; use tun2proxy::ArgVerbosity; use tun2proxy::Result; -pub(crate) const CLIENT_DISCONNECT_TIMEOUT: tokio::time::Duration = std::time::Duration::from_secs(30); +pub(crate) const CLIENT_DISCONNECT_TIMEOUT: tokio::time::Duration = std::time::Duration::from_secs(60); #[derive(Debug)] struct UdpRequest { @@ -42,6 +42,7 @@ pub struct UdpGwArgs { pub verbosity: ArgVerbosity, /// Daemonize for unix family or run as Windows service + #[cfg(unix)] #[arg(long)] pub daemonize: bool, @@ -71,8 +72,12 @@ async fn send_error(tx: Sender>, con: &mut UdpRequest) { } } -async fn send_keepalive_response(tx: Sender>, keepalive_packet: &[u8]) { - if let Err(e) = tx.send(keepalive_packet.to_vec()).await { +async fn send_keepalive_response(tx: Sender>, conid: u16) { + let mut keepalive_packet = vec![]; + keepalive_packet.extend_from_slice(&(std::mem::size_of::() as u16).to_le_bytes()); + keepalive_packet.extend_from_slice(&[UDPGW_FLAG_KEEPALIVE]); + keepalive_packet.extend_from_slice(&conid.to_le_bytes()); + if let Err(e) = tx.send(keepalive_packet).await { log::error!("send keepalive response error {:?}", e); } } @@ -177,6 +182,7 @@ pub fn parse_udp(udp_mtu: u16, data_len: usize, data: &[u8]) -> Result<(&[u8], u async fn process_udp(addr: SocketAddr, udp_timeout: u64, tx: Sender>, con: &mut UdpRequest) -> Result<()> { let std_sock = std::net::UdpSocket::bind("0.0.0.0:0")?; std_sock.set_nonblocking(true)?; + #[cfg(unix)] nix::sys::socket::setsockopt(&std_sock, nix::sys::socket::sockopt::ReuseAddr, &true)?; let socket = UdpSocket::from_std(std_sock)?; socket.send_to(&con.data, &con.server_addr).await?; @@ -222,6 +228,7 @@ async fn process_client_udp_req<'a>(args: &UdpGwArgs, tx: Sender>, mut c let mut len_buf = [0; mem::size_of::()]; let udp_mtu = args.udp_mtu; let udp_timeout = args.udp_timeout; + 'out: loop { let result; match tokio::time::timeout(tokio::time::Duration::from_secs(2), tcp_read_stream.read(&mut len_buf)).await { @@ -267,7 +274,7 @@ async fn process_client_udp_req<'a>(args: &UdpGwArgs, tx: Sender>, mut c if let Ok((udpdata, flags, conid, reqaddr)) = ret { if flags & UDPGW_FLAG_KEEPALIVE != 0 { log::debug!("client {} send keepalive", client.addr); - send_keepalive_response(tx.clone(), udpdata).await; + send_keepalive_response(tx.clone(), conid).await; continue; } log::debug!( @@ -331,12 +338,6 @@ async fn main() -> Result<()> { .map_err(|e| format!("Failed to daemonize process, error:{:?}", e))?; } - #[cfg(target_os = "windows")] - if args.daemonize { - tun2proxy::win_svc::start_service()?; - return Ok(()); - } - loop { let (mut tcp_stream, addr) = tcp_listener.accept().await?; let client = Client { diff --git a/src/lib.rs b/src/lib.rs index e5c9998..fa506d7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -479,7 +479,7 @@ async fn handle_udp_gateway_session( socket_queue: Option>, ipv6_enabled: bool, ) -> crate::Result<()> { - let (session_info, server_addr) = { + let (_session_info, server_addr) = { let handler = proxy_handler.lock().await; (handler.get_session_info(), handler.get_server_addr()) }; @@ -555,6 +555,7 @@ async fn handle_udp_gateway_session( ); break; } + log::debug!("{} <- {} send udpgw len {}", udpinfo, &tcp_local_addr,read_len); server_stream.update_activity(); } ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut stream_reader) => { @@ -574,6 +575,7 @@ async fn handle_udp_gateway_session( } UdpGwResponse::Data(data) => { let len = data.len(); + log::debug!("{} <- {} receive udpgw len {}", udpinfo, &tcp_local_addr,len); if let Err(e) = UdpGwClient::send_udp_packet(data, &mut udp_stack).await { log::error!("Ending {} <- {} with send_udp_packet {}", udpinfo, &tcp_local_addr, e); break; diff --git a/src/udpgw.rs b/src/udpgw.rs index d6989dd..3649043 100644 --- a/src/udpgw.rs +++ b/src/udpgw.rs @@ -281,16 +281,18 @@ impl UdpGwClient { }; log::debug!("{:?}:{} send keepalive", stream_writer.inner.local_addr(), stream.id()); if let Err(e) = stream_writer.inner.write_all(&self.keepalive_packet).await { - log::warn!("{:?}:{} Heartbeat failed: {}", stream_writer.inner.local_addr(), stream.id(), e); + log::warn!("{:?}:{} send keepalive failed: {}", stream_writer.inner.local_addr(), stream.id(), e); } else { match UdpGwClient::recv_udpgw_packet(self.udp_mtu, 10, &mut stream_reader).await { Ok(UdpGwResponse::KeepAlive) => { - stream.last_activity = std::time::Instant::now(); + stream.update_activity(); self.release_server_connection_with_stream(stream, stream_reader, stream_writer) .await; } //shoud not receive other type - _ => {} + _ => { + log::warn!("{:?}:{} keepalive no response", stream_writer.inner.local_addr(), stream.id()); + } } } }