diff --git a/src/lib.rs b/src/lib.rs index f214f08..36c5298 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -490,10 +490,9 @@ async fn handle_udp_gateway_session( ipv6_enabled: bool, ) -> crate::Result<()> { let proxy_server_addr = { proxy_handler.lock().await.get_server_addr() }; - let udpinfo = SessionInfo::new(udp_stack.local_addr(), udp_dst, IpProtocol::Udp); let udp_mtu = udpgw_client.get_udp_mtu(); let udp_timeout = udpgw_client.get_udp_timeout(); - let mut server_stream = match udpgw_client.get_server_connection().await { + let mut stream = match udpgw_client.get_server_connection().await { Some(server) => server, None => { if udpgw_client.is_full() { @@ -507,91 +506,89 @@ async fn handle_udp_gateway_session( } }; - let tcp_local_addr = server_stream.local_addr().clone(); + let tcp_local_addr = stream.local_addr().clone(); match domain_name { - Some(ref d) => log::info!("Beginning {} <- {}, domain:{}", udpinfo, &tcp_local_addr, d), - None => log::info!("Beginning {} <- {}", udpinfo, &tcp_local_addr), + Some(ref d) => log::info!("[UdpGw] Beginning {} -> {}, domain:{}", &tcp_local_addr, udp_dst, d), + None => log::info!("[UdpGw] Beginning {} -> {}", &tcp_local_addr, udp_dst), } - let Some(mut stream_reader) = server_stream.get_reader() else { + let Some(mut reader) = stream.get_reader() else { return Err("get reader failed".into()); }; - let Some(mut stream_writer) = server_stream.get_writer() else { + let Some(mut writer) = stream.get_writer() else { return Err("get writer failed".into()); }; loop { tokio::select! { - len = UdpGwClient::recv_udp_packet(&mut udp_stack, &mut stream_writer) => { + len = UdpGwClient::recv_udp_packet(&mut udp_stack, &mut writer) => { let read_len; match len { Ok(n) => { if n == 0 { - log::info!("Ending {} <- {}",udpinfo, &tcp_local_addr); + log::info!("[UdpGw] Ending {} <> {}", &tcp_local_addr, udp_dst); break; } read_len = n; crate::traffic_status::traffic_status_update(n, 0)?; } Err(e) => { - log::info!("Ending {} <- {} with recv_udp_packet {}", udpinfo, &tcp_local_addr, e); + log::info!("[UdpGw] Ending {} <> {} with recv_udp_packet {}", &tcp_local_addr, udp_dst, e); break; } } - let new_id = server_stream.new_id(); - if let Err(e) = UdpGwClient::send_udpgw_packet(ipv6_enabled, read_len, udp_dst, domain_name.as_ref(), new_id, &mut stream_writer).await { - log::info!("Ending {} <- {} with send_udpgw_packet {}", udpinfo, &tcp_local_addr, e); + let new_id = stream.new_id(); + if let Err(e) = UdpGwClient::send_udpgw_packet(ipv6_enabled, read_len, udp_dst, domain_name.as_ref(), new_id, &mut writer).await { + log::info!("[UdpGw] Ending {} <> {} with send_udpgw_packet {}", &tcp_local_addr, udp_dst, e); break; } - log::debug!("{} <- {} send udpgw len {}", udpinfo, &tcp_local_addr, read_len); - server_stream.update_activity(); + log::debug!("[UdpGw] {} -> {} send len {}", &tcp_local_addr, udp_dst, read_len); + stream.update_activity(); } - ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut stream_reader) => { + ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut reader) => { match ret { Ok(packet) => match packet { //should not received keepalive UdpGwResponse::KeepAlive => { - log::error!("Ending {} <- {} with recv keepalive", udpinfo, &tcp_local_addr); - server_stream.close(); + log::error!("[UdpGw] Ending {} <> {} with recv keepalive", &tcp_local_addr, udp_dst); + stream.close(); break; } //server udp may be timeout,can continue to receive udp data? UdpGwResponse::Error => { - log::info!("Ending {} <- {} with recv udp error", udpinfo, &tcp_local_addr); - server_stream.update_activity(); + log::info!("[UdpGw] Ending {} <> {} with recv udp error", &tcp_local_addr, udp_dst); + stream.update_activity(); continue; } UdpGwResponse::TcpClose => { - log::error!("Ending {} <- {} with tcp closed", udpinfo, &tcp_local_addr); - server_stream.close(); + log::error!("[UdpGw] Ending {} <> {} with tcp closed", &tcp_local_addr, udp_dst); + stream.close(); break; } UdpGwResponse::Data(data) => { let len = data.len(); - log::debug!("{} <- {} receive udpgw len {}", udpinfo, &tcp_local_addr,len); + log::debug!("[UdpGw] {} <- {} receive len {}", &tcp_local_addr, udp_dst, len); if let Err(e) = UdpGwClient::send_udp_packet(data, &mut udp_stack).await { - log::error!("Ending {} <- {} with send_udp_packet {}", udpinfo, &tcp_local_addr, e); + log::error!("[UdpGw] Ending {} <> {} with send_udp_packet {}", &tcp_local_addr, udp_dst, e); break; } crate::traffic_status::traffic_status_update(0, len)?; } }, Err(e) => { - log::warn!("Ending {} <- {} with recv_udpgw_packet {}", udpinfo, &tcp_local_addr, e); + log::warn!("[UdpGw] Ending {} <> {} with recv_udpgw_packet {}", &tcp_local_addr, udp_dst, e); break; } } - server_stream.update_activity(); + stream.update_activity(); } } } - if !server_stream.is_closed() { - udpgw_client - .release_server_connection_with_stream(server_stream, stream_reader, stream_writer) - .await; + if !stream.is_closed() { + udpgw_client.release_server_connection_with_stream(stream, reader, writer).await; } Ok(())