support udp gateway mode

This commit is contained in:
suchao 2024-10-19 22:39:13 +08:00
parent 0a833d69a6
commit c27af98585
3 changed files with 19 additions and 14 deletions

View file

@ -13,7 +13,7 @@ use tokio::sync::mpsc::Sender;
pub use tun2proxy::udpgw::*; pub use tun2proxy::udpgw::*;
use tun2proxy::ArgVerbosity; use tun2proxy::ArgVerbosity;
use tun2proxy::Result; use tun2proxy::Result;
pub(crate) const CLIENT_DISCONNECT_TIMEOUT: tokio::time::Duration = std::time::Duration::from_secs(30); pub(crate) const CLIENT_DISCONNECT_TIMEOUT: tokio::time::Duration = std::time::Duration::from_secs(60);
#[derive(Debug)] #[derive(Debug)]
struct UdpRequest { struct UdpRequest {
@ -42,6 +42,7 @@ pub struct UdpGwArgs {
pub verbosity: ArgVerbosity, pub verbosity: ArgVerbosity,
/// Daemonize for unix family or run as Windows service /// Daemonize for unix family or run as Windows service
#[cfg(unix)]
#[arg(long)] #[arg(long)]
pub daemonize: bool, pub daemonize: bool,
@ -71,8 +72,12 @@ async fn send_error(tx: Sender<Vec<u8>>, con: &mut UdpRequest) {
} }
} }
async fn send_keepalive_response(tx: Sender<Vec<u8>>, keepalive_packet: &[u8]) { async fn send_keepalive_response(tx: Sender<Vec<u8>>, conid: u16) {
if let Err(e) = tx.send(keepalive_packet.to_vec()).await { let mut keepalive_packet = vec![];
keepalive_packet.extend_from_slice(&(std::mem::size_of::<UdpgwHeader>() as u16).to_le_bytes());
keepalive_packet.extend_from_slice(&[UDPGW_FLAG_KEEPALIVE]);
keepalive_packet.extend_from_slice(&conid.to_le_bytes());
if let Err(e) = tx.send(keepalive_packet).await {
log::error!("send keepalive response error {:?}", e); log::error!("send keepalive response error {:?}", e);
} }
} }
@ -177,6 +182,7 @@ pub fn parse_udp(udp_mtu: u16, data_len: usize, data: &[u8]) -> Result<(&[u8], u
async fn process_udp(addr: SocketAddr, udp_timeout: u64, tx: Sender<Vec<u8>>, con: &mut UdpRequest) -> Result<()> { async fn process_udp(addr: SocketAddr, udp_timeout: u64, tx: Sender<Vec<u8>>, con: &mut UdpRequest) -> Result<()> {
let std_sock = std::net::UdpSocket::bind("0.0.0.0:0")?; let std_sock = std::net::UdpSocket::bind("0.0.0.0:0")?;
std_sock.set_nonblocking(true)?; std_sock.set_nonblocking(true)?;
#[cfg(unix)]
nix::sys::socket::setsockopt(&std_sock, nix::sys::socket::sockopt::ReuseAddr, &true)?; nix::sys::socket::setsockopt(&std_sock, nix::sys::socket::sockopt::ReuseAddr, &true)?;
let socket = UdpSocket::from_std(std_sock)?; let socket = UdpSocket::from_std(std_sock)?;
socket.send_to(&con.data, &con.server_addr).await?; socket.send_to(&con.data, &con.server_addr).await?;
@ -222,6 +228,7 @@ async fn process_client_udp_req<'a>(args: &UdpGwArgs, tx: Sender<Vec<u8>>, mut c
let mut len_buf = [0; mem::size_of::<PackLenHeader>()]; let mut len_buf = [0; mem::size_of::<PackLenHeader>()];
let udp_mtu = args.udp_mtu; let udp_mtu = args.udp_mtu;
let udp_timeout = args.udp_timeout; let udp_timeout = args.udp_timeout;
'out: loop { 'out: loop {
let result; let result;
match tokio::time::timeout(tokio::time::Duration::from_secs(2), tcp_read_stream.read(&mut len_buf)).await { match tokio::time::timeout(tokio::time::Duration::from_secs(2), tcp_read_stream.read(&mut len_buf)).await {
@ -267,7 +274,7 @@ async fn process_client_udp_req<'a>(args: &UdpGwArgs, tx: Sender<Vec<u8>>, mut c
if let Ok((udpdata, flags, conid, reqaddr)) = ret { if let Ok((udpdata, flags, conid, reqaddr)) = ret {
if flags & UDPGW_FLAG_KEEPALIVE != 0 { if flags & UDPGW_FLAG_KEEPALIVE != 0 {
log::debug!("client {} send keepalive", client.addr); log::debug!("client {} send keepalive", client.addr);
send_keepalive_response(tx.clone(), udpdata).await; send_keepalive_response(tx.clone(), conid).await;
continue; continue;
} }
log::debug!( log::debug!(
@ -331,12 +338,6 @@ async fn main() -> Result<()> {
.map_err(|e| format!("Failed to daemonize process, error:{:?}", e))?; .map_err(|e| format!("Failed to daemonize process, error:{:?}", e))?;
} }
#[cfg(target_os = "windows")]
if args.daemonize {
tun2proxy::win_svc::start_service()?;
return Ok(());
}
loop { loop {
let (mut tcp_stream, addr) = tcp_listener.accept().await?; let (mut tcp_stream, addr) = tcp_listener.accept().await?;
let client = Client { let client = Client {

View file

@ -479,7 +479,7 @@ async fn handle_udp_gateway_session(
socket_queue: Option<Arc<SocketQueue>>, socket_queue: Option<Arc<SocketQueue>>,
ipv6_enabled: bool, ipv6_enabled: bool,
) -> crate::Result<()> { ) -> crate::Result<()> {
let (session_info, server_addr) = { let (_session_info, server_addr) = {
let handler = proxy_handler.lock().await; let handler = proxy_handler.lock().await;
(handler.get_session_info(), handler.get_server_addr()) (handler.get_session_info(), handler.get_server_addr())
}; };
@ -555,6 +555,7 @@ async fn handle_udp_gateway_session(
); );
break; break;
} }
log::debug!("{} <- {} send udpgw len {}", udpinfo, &tcp_local_addr,read_len);
server_stream.update_activity(); server_stream.update_activity();
} }
ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut stream_reader) => { ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut stream_reader) => {
@ -574,6 +575,7 @@ async fn handle_udp_gateway_session(
} }
UdpGwResponse::Data(data) => { UdpGwResponse::Data(data) => {
let len = data.len(); let len = data.len();
log::debug!("{} <- {} receive udpgw len {}", udpinfo, &tcp_local_addr,len);
if let Err(e) = UdpGwClient::send_udp_packet(data, &mut udp_stack).await { if let Err(e) = UdpGwClient::send_udp_packet(data, &mut udp_stack).await {
log::error!("Ending {} <- {} with send_udp_packet {}", udpinfo, &tcp_local_addr, e); log::error!("Ending {} <- {} with send_udp_packet {}", udpinfo, &tcp_local_addr, e);
break; break;

View file

@ -281,16 +281,18 @@ impl UdpGwClient {
}; };
log::debug!("{:?}:{} send keepalive", stream_writer.inner.local_addr(), stream.id()); log::debug!("{:?}:{} send keepalive", stream_writer.inner.local_addr(), stream.id());
if let Err(e) = stream_writer.inner.write_all(&self.keepalive_packet).await { if let Err(e) = stream_writer.inner.write_all(&self.keepalive_packet).await {
log::warn!("{:?}:{} Heartbeat failed: {}", stream_writer.inner.local_addr(), stream.id(), e); log::warn!("{:?}:{} send keepalive failed: {}", stream_writer.inner.local_addr(), stream.id(), e);
} else { } else {
match UdpGwClient::recv_udpgw_packet(self.udp_mtu, 10, &mut stream_reader).await { match UdpGwClient::recv_udpgw_packet(self.udp_mtu, 10, &mut stream_reader).await {
Ok(UdpGwResponse::KeepAlive) => { Ok(UdpGwResponse::KeepAlive) => {
stream.last_activity = std::time::Instant::now(); stream.update_activity();
self.release_server_connection_with_stream(stream, stream_reader, stream_writer) self.release_server_connection_with_stream(stream, stream_reader, stream_writer)
.await; .await;
} }
//shoud not receive other type //shoud not receive other type
_ => {} _ => {
log::warn!("{:?}:{} keepalive no response", stream_writer.inner.local_addr(), stream.id());
}
} }
} }
} }