minor changes

This commit is contained in:
ssrlive 2024-11-03 15:15:03 +08:00
parent 21355e37da
commit c2382ee29b
3 changed files with 27 additions and 20 deletions

View file

@ -1,4 +1,4 @@
use socks5_impl::protocol::{AddressType, AsyncStreamOperation}; use socks5_impl::protocol::AsyncStreamOperation;
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::{ use tokio::{
io::AsyncWriteExt, io::AsyncWriteExt,
@ -44,7 +44,7 @@ pub struct UdpGwArgs {
/// Daemonize for unix family or run as Windows service /// Daemonize for unix family or run as Windows service
#[cfg(unix)] #[cfg(unix)]
#[arg(long)] #[arg(short, long)]
pub daemonize: bool, pub daemonize: bool,
/// Verbosity level /// Verbosity level
@ -76,25 +76,22 @@ async fn send_keepalive_response(tx: Sender<Packet>, conn_id: u16) {
/// Send data field of packet from client to destination server and receive response, /// Send data field of packet from client to destination server and receive response,
/// then wrap response data to the packet's data field and send packet back to client. /// then wrap response data to the packet's data field and send packet back to client.
async fn process_udp(client: SocketAddr, udp_mtu: u16, udp_timeout: u64, tx: Sender<Packet>, mut packet: Packet) -> Result<()> { async fn process_udp(_client: SocketAddr, udp_mtu: u16, udp_timeout: u64, tx: Sender<Packet>, mut packet: Packet) -> Result<()> {
let Some(dst_addr) = &packet.address else { let Some(dst_addr) = &packet.address else {
log::error!("client {} udp request address is None", client); return Err(std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, "udp request address is None").into());
return Ok(());
}; };
let std_sock = if dst_addr.get_type() == AddressType::IPv6 { use std::net::ToSocketAddrs;
std::net::UdpSocket::bind("[::]:0")? let Some(dst_addr) = dst_addr.to_socket_addrs()?.next() else {
} else { return Err(std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, "to_socket_addrs").into());
std::net::UdpSocket::bind("0.0.0.0:0")? };
let std_sock = match dst_addr {
std::net::SocketAddr::V6(_) => std::net::UdpSocket::bind("[::]:0")?,
std::net::SocketAddr::V4(_) => std::net::UdpSocket::bind("0.0.0.0:0")?,
}; };
std_sock.set_nonblocking(true)?; std_sock.set_nonblocking(true)?;
#[cfg(unix)] #[cfg(unix)]
nix::sys::socket::setsockopt(&std_sock, nix::sys::socket::sockopt::ReuseAddr, &true)?; nix::sys::socket::setsockopt(&std_sock, nix::sys::socket::sockopt::ReuseAddr, &true)?;
let socket = UdpSocket::from_std(std_sock)?; let socket = UdpSocket::from_std(std_sock)?;
use std::net::ToSocketAddrs;
let Some(dst_addr) = dst_addr.to_socket_addrs()?.next() else {
log::error!("client {} udp request address to_socket_addrs", client);
return Ok(());
};
// 1. send udp data to destination server // 1. send udp data to destination server
socket.send_to(&packet.data, &dst_addr).await?; socket.send_to(&packet.data, &dst_addr).await?;
// 2. receive response from destination server // 2. receive response from destination server

View file

@ -555,13 +555,16 @@ async fn handle_udp_gateway_session(
stream.update_activity(); stream.update_activity();
} }
ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut reader) => { ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut reader) => {
if let Ok((len, _)) = ret {
crate::traffic_status::traffic_status_update(0, len)?;
}
match ret { match ret {
Err(e) => { Err(e) => {
log::warn!("[UdpGw] Ending stream {} {} <> {} with recv_udpgw_packet {}", sn, &tcp_local_addr, udp_dst, e); log::warn!("[UdpGw] Ending stream {} {} <> {} with recv_udpgw_packet {}", sn, &tcp_local_addr, udp_dst, e);
stream.close(); stream.close();
break; break;
} }
Ok(packet) => match packet { Ok((_, packet)) => match packet {
//should not received keepalive //should not received keepalive
UdpGwResponse::KeepAlive => { UdpGwResponse::KeepAlive => {
log::error!("[UdpGw] Ending stream {} {} <> {} with recv keepalive", sn, &tcp_local_addr, udp_dst); log::error!("[UdpGw] Ending stream {} {} <> {} with recv keepalive", sn, &tcp_local_addr, udp_dst);
@ -588,7 +591,6 @@ async fn handle_udp_gateway_session(
log::error!("[UdpGw] Ending stream {} {} <> {} with send_udp_packet {}", sn, &tcp_local_addr, udp_dst, e); log::error!("[UdpGw] Ending stream {} {} <> {} with send_udp_packet {}", sn, &tcp_local_addr, udp_dst, e);
break; break;
} }
crate::traffic_status::traffic_status_update(0, len)?;
} }
} }
} }

View file

@ -467,6 +467,8 @@ impl UdpGwClient {
} }
} }
let (mut tx, mut rx) = (0, 0);
for mut stream in streams { for mut stream in streams {
if stream.last_activity.elapsed() < self.keepalive_time { if stream.last_activity.elapsed() < self.keepalive_time {
self.store_server_connection(stream).await; self.store_server_connection(stream).await;
@ -483,20 +485,26 @@ impl UdpGwClient {
let local_addr = stream_writer.local_addr()?; let local_addr = stream_writer.local_addr()?;
let sn = stream.serial_number(); let sn = stream.serial_number();
let keepalive_packet: Vec<u8> = Packet::build_keepalive_packet(sn).into(); let keepalive_packet: Vec<u8> = Packet::build_keepalive_packet(sn).into();
tx += keepalive_packet.len();
if let Err(e) = stream_writer.write_all(&keepalive_packet).await { if let Err(e) = stream_writer.write_all(&keepalive_packet).await {
log::warn!("stream {} {:?} send keepalive failed: {}", sn, local_addr, e); log::warn!("stream {} {:?} send keepalive failed: {}", sn, local_addr, e);
continue; continue;
} }
match UdpGwClient::recv_udpgw_packet(self.udp_mtu, self.udp_timeout, &mut stream_reader).await { match UdpGwClient::recv_udpgw_packet(self.udp_mtu, self.udp_timeout, &mut stream_reader).await {
Ok(UdpGwResponse::KeepAlive) => { Ok((len, UdpGwResponse::KeepAlive)) => {
stream.update_activity(); stream.update_activity();
self.store_server_connection_full(stream, stream_reader, stream_writer).await; self.store_server_connection_full(stream, stream_reader, stream_writer).await;
log::trace!("stream {sn} {:?} send keepalive and recieve it successfully", local_addr); log::trace!("stream {sn} {:?} send keepalive and recieve it successfully", local_addr);
rx += len;
}
Ok((len, v)) => {
log::debug!("stream {sn} {:?} keepalive unexpected response: {v}", local_addr);
rx += len;
} }
Ok(v) => log::debug!("stream {sn} {:?} keepalive unexpected response: {v}", local_addr),
Err(e) => log::debug!("stream {sn} {:?} keepalive no response, error \"{e}\"", local_addr), Err(e) => log::debug!("stream {sn} {:?} keepalive no response, error \"{e}\"", local_addr),
} }
} }
crate::traffic_status::traffic_status_update(tx, rx)?;
} }
} }
@ -526,14 +534,14 @@ impl UdpGwClient {
/// ///
/// # Returns /// # Returns
/// - `Result<UdpGwResponse>`: Returns a result type containing the parsed UDP gateway response, or an error if one occurs. /// - `Result<UdpGwResponse>`: Returns a result type containing the parsed UDP gateway response, or an error if one occurs.
pub(crate) async fn recv_udpgw_packet(udp_mtu: u16, udp_timeout: u64, stream: &mut OwnedReadHalf) -> Result<UdpGwResponse> { pub(crate) async fn recv_udpgw_packet(udp_mtu: u16, udp_timeout: u64, stream: &mut OwnedReadHalf) -> Result<(usize, UdpGwResponse)> {
let packet = tokio::time::timeout( let packet = tokio::time::timeout(
tokio::time::Duration::from_secs(udp_timeout + 2), tokio::time::Duration::from_secs(udp_timeout + 2),
Packet::retrieve_from_async_stream(stream), Packet::retrieve_from_async_stream(stream),
) )
.await .await
.map_err(std::io::Error::from)??; .map_err(std::io::Error::from)??;
UdpGwClient::parse_udp_response(udp_mtu, packet) Ok((packet.len(), UdpGwClient::parse_udp_response(udp_mtu, packet)?))
} }
/// Sends a UDP gateway packet. /// Sends a UDP gateway packet.