read code

This commit is contained in:
ssrlive 2024-10-23 19:49:20 +08:00
parent 1ec7caf844
commit 01b5951ec1

View file

@ -490,10 +490,9 @@ async fn handle_udp_gateway_session(
ipv6_enabled: bool,
) -> crate::Result<()> {
let proxy_server_addr = { proxy_handler.lock().await.get_server_addr() };
let udpinfo = SessionInfo::new(udp_stack.local_addr(), udp_dst, IpProtocol::Udp);
let udp_mtu = udpgw_client.get_udp_mtu();
let udp_timeout = udpgw_client.get_udp_timeout();
let mut server_stream = match udpgw_client.get_server_connection().await {
let mut stream = match udpgw_client.get_server_connection().await {
Some(server) => server,
None => {
if udpgw_client.is_full() {
@ -507,91 +506,89 @@ async fn handle_udp_gateway_session(
}
};
let tcp_local_addr = server_stream.local_addr().clone();
let tcp_local_addr = stream.local_addr().clone();
match domain_name {
Some(ref d) => log::info!("Beginning {} <- {}, domain:{}", udpinfo, &tcp_local_addr, d),
None => log::info!("Beginning {} <- {}", udpinfo, &tcp_local_addr),
Some(ref d) => log::info!("[UdpGw] Beginning {} -> {}, domain:{}", &tcp_local_addr, udp_dst, d),
None => log::info!("[UdpGw] Beginning {} -> {}", &tcp_local_addr, udp_dst),
}
let Some(mut stream_reader) = server_stream.get_reader() else {
let Some(mut reader) = stream.get_reader() else {
return Err("get reader failed".into());
};
let Some(mut stream_writer) = server_stream.get_writer() else {
let Some(mut writer) = stream.get_writer() else {
return Err("get writer failed".into());
};
loop {
tokio::select! {
len = UdpGwClient::recv_udp_packet(&mut udp_stack, &mut stream_writer) => {
len = UdpGwClient::recv_udp_packet(&mut udp_stack, &mut writer) => {
let read_len;
match len {
Ok(n) => {
if n == 0 {
log::info!("Ending {} <- {}",udpinfo, &tcp_local_addr);
log::info!("[UdpGw] Ending {} <> {}", &tcp_local_addr, udp_dst);
break;
}
read_len = n;
crate::traffic_status::traffic_status_update(n, 0)?;
}
Err(e) => {
log::info!("Ending {} <- {} with recv_udp_packet {}", udpinfo, &tcp_local_addr, e);
log::info!("[UdpGw] Ending {} <> {} with recv_udp_packet {}", &tcp_local_addr, udp_dst, e);
break;
}
}
let new_id = server_stream.new_id();
if let Err(e) = UdpGwClient::send_udpgw_packet(ipv6_enabled, read_len, udp_dst, domain_name.as_ref(), new_id, &mut stream_writer).await {
log::info!("Ending {} <- {} with send_udpgw_packet {}", udpinfo, &tcp_local_addr, e);
let new_id = stream.new_id();
if let Err(e) = UdpGwClient::send_udpgw_packet(ipv6_enabled, read_len, udp_dst, domain_name.as_ref(), new_id, &mut writer).await {
log::info!("[UdpGw] Ending {} <> {} with send_udpgw_packet {}", &tcp_local_addr, udp_dst, e);
break;
}
log::debug!("{} <- {} send udpgw len {}", udpinfo, &tcp_local_addr, read_len);
server_stream.update_activity();
log::debug!("[UdpGw] {} -> {} send len {}", &tcp_local_addr, udp_dst, read_len);
stream.update_activity();
}
ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut stream_reader) => {
ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut reader) => {
match ret {
Ok(packet) => match packet {
//should not received keepalive
UdpGwResponse::KeepAlive => {
log::error!("Ending {} <- {} with recv keepalive", udpinfo, &tcp_local_addr);
server_stream.close();
log::error!("[UdpGw] Ending {} <> {} with recv keepalive", &tcp_local_addr, udp_dst);
stream.close();
break;
}
//server udp may be timeout,can continue to receive udp data?
UdpGwResponse::Error => {
log::info!("Ending {} <- {} with recv udp error", udpinfo, &tcp_local_addr);
server_stream.update_activity();
log::info!("[UdpGw] Ending {} <> {} with recv udp error", &tcp_local_addr, udp_dst);
stream.update_activity();
continue;
}
UdpGwResponse::TcpClose => {
log::error!("Ending {} <- {} with tcp closed", udpinfo, &tcp_local_addr);
server_stream.close();
log::error!("[UdpGw] Ending {} <> {} with tcp closed", &tcp_local_addr, udp_dst);
stream.close();
break;
}
UdpGwResponse::Data(data) => {
let len = data.len();
log::debug!("{} <- {} receive udpgw len {}", udpinfo, &tcp_local_addr,len);
log::debug!("[UdpGw] {} <- {} receive len {}", &tcp_local_addr, udp_dst, len);
if let Err(e) = UdpGwClient::send_udp_packet(data, &mut udp_stack).await {
log::error!("Ending {} <- {} with send_udp_packet {}", udpinfo, &tcp_local_addr, e);
log::error!("[UdpGw] Ending {} <> {} with send_udp_packet {}", &tcp_local_addr, udp_dst, e);
break;
}
crate::traffic_status::traffic_status_update(0, len)?;
}
},
Err(e) => {
log::warn!("Ending {} <- {} with recv_udpgw_packet {}", udpinfo, &tcp_local_addr, e);
log::warn!("[UdpGw] Ending {} <> {} with recv_udpgw_packet {}", &tcp_local_addr, udp_dst, e);
break;
}
}
server_stream.update_activity();
stream.update_activity();
}
}
}
if !server_stream.is_closed() {
udpgw_client
.release_server_connection_with_stream(server_stream, stream_reader, stream_writer)
.await;
if !stream.is_closed() {
udpgw_client.release_server_connection_with_stream(stream, reader, writer).await;
}
Ok(())