optimize and fix

This commit is contained in:
suchao 2024-10-20 17:54:21 +08:00
parent b21d7a6c2c
commit 8d3533e327
3 changed files with 25 additions and 5 deletions

View file

@ -178,7 +178,11 @@ pub fn parse_udp(udp_mtu: u16, data_len: usize, data: &[u8]) -> Result<(&[u8], u
}
async fn process_udp(addr: SocketAddr, udp_timeout: u64, tx: Sender<Vec<u8>>, con: &mut UdpRequest) -> Result<()> {
let std_sock = std::net::UdpSocket::bind("0.0.0.0:0")?;
let std_sock = if con.flags & UDPGW_FLAG_IPV6 != 0 {
std::net::UdpSocket::bind("[::]:0")?
} else {
std::net::UdpSocket::bind("0.0.0.0:0")?
};
std_sock.set_nonblocking(true)?;
#[cfg(target_os = "linux")]
nix::sys::socket::setsockopt(&std_sock, nix::sys::socket::sockopt::ReuseAddr, &true)?;

View file

@ -579,6 +579,11 @@ async fn handle_udp_gateway_session(
server_stream.update_activity();
continue;
}
UdpGwResponse::TcpClose => {
log::error!("Ending {} <- {} with tcp closed", udpinfo, &tcp_local_addr);
server_stream.close();
break;
}
UdpGwResponse::Data(data) => {
let len = data.len();
log::debug!("{} <- {} receive udpgw len {}", udpinfo, &tcp_local_addr,len);

View file

@ -4,6 +4,7 @@ use std::collections::VecDeque;
use std::hash::Hash;
use std::mem;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::sync::atomic::Ordering::Relaxed;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::TcpStream;
@ -17,6 +18,8 @@ pub const UDPGW_FLAG_IPV6: u8 = 0x08;
pub const UDPGW_FLAG_DOMAIN: u8 = 0x10;
pub const UDPGW_FLAG_ERR: u8 = 0x20;
static TCP_COUNTER: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[repr(C)]
#[repr(packed(1))]
@ -103,6 +106,7 @@ impl<'a> UdpGwData<'a> {
pub(crate) enum UdpGwResponse<'a> {
KeepAlive,
Error,
TcpClose,
Data(UdpGwData<'a>),
}
@ -129,6 +133,12 @@ pub(crate) struct UdpGwClientStream {
last_activity: std::time::Instant,
}
impl Drop for UdpGwClientStream {
fn drop(&mut self) {
TCP_COUNTER.fetch_sub(1, Relaxed);
}
}
impl UdpGwClientStream {
pub fn close(&mut self) {
self.closed = true;
@ -184,6 +194,7 @@ impl UdpGwClientStream {
inner: rx,
recv_buf: vec![0; udp_mtu.into()],
};
TCP_COUNTER.fetch_add(1, Relaxed);
UdpGwClientStream {
local_addr,
reader: Some(reader),
@ -232,7 +243,7 @@ impl UdpGwClient {
}
pub(crate) async fn is_full(&self) -> bool {
self.server_connections.lock().await.len() >= self.max_connections as usize
TCP_COUNTER.load(Relaxed) >= self.max_connections as u32
}
pub(crate) async fn get_server_connection(&self) -> Option<UdpGwClientStream> {
@ -411,7 +422,7 @@ impl UdpGwClient {
}
};
match result {
Ok(0) => Err(("tcp connection closed").into()),
Ok(0) => Ok(UdpGwResponse::TcpClose),
Ok(n) => {
if n < std::mem::size_of::<PackLenHeader>() {
return Err("received PackLenHeader error".into());
@ -425,12 +436,12 @@ impl UdpGwClient {
while left_len > 0 {
if let Ok(len) = stream.inner.read(&mut stream.recv_buf[recv_len..left_len]).await {
if len == 0 {
return Err("tcp connection closed".into());
return Ok(UdpGwResponse::TcpClose);
}
recv_len += len;
left_len -= len;
} else {
return Err("tcp connection closed".into());
return Ok(UdpGwResponse::TcpClose);
}
}
return UdpGwClient::parse_udp_response(udp_mtu, packet_len as usize, stream);