From 731490684175cb6f1a34de1c6a228e5331b6d33f Mon Sep 17 00:00:00 2001 From: ssrlive <30760636+ssrlive@users.noreply.github.com> Date: Mon, 11 Nov 2024 14:48:02 +0800 Subject: [PATCH] mask_socket_addr function --- src/bin/udpgw_server.rs | 53 +++++++++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/src/bin/udpgw_server.rs b/src/bin/udpgw_server.rs index d02d6c4..e78fbb9 100644 --- a/src/bin/udpgw_server.rs +++ b/src/bin/udpgw_server.rs @@ -77,7 +77,7 @@ async fn send_keepalive_response(tx: Sender, conn_id: u16) { /// Send data field of packet from client to destination server and receive response, /// then wrap response data to the packet's data field and send packet back to client. -async fn process_udp(_client: SocketAddr, udp_mtu: u16, udp_timeout: u64, tx: Sender, mut packet: Packet) -> Result<()> { +async fn process_udp(udp_mtu: u16, udp_timeout: u64, tx: Sender, mut packet: Packet) -> Result<()> { let Some(dst_addr) = &packet.address else { return Err(std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, "udp request address is None").into()); }; @@ -107,22 +107,53 @@ async fn process_udp(_client: SocketAddr, udp_mtu: u16, udp_timeout: u64, tx: Se Ok(()) } +fn mask_ip(ip: &str) -> String { + if ip.len() <= 2 { + return ip.to_string(); + } + let mut masked_ip = String::new(); + for (i, c) in ip.chars().enumerate() { + if i == 0 || i == ip.len() - 1 || c == '.' || c == ':' { + masked_ip.push(c); + } else { + masked_ip.push('*'); + } + } + masked_ip +} + +fn mask_socket_addr(socket_addr: std::net::SocketAddr) -> String { + match socket_addr { + std::net::SocketAddr::V4(addr) => { + let masked_ip = mask_ip(&addr.ip().to_string()); + format!("{}:{}", masked_ip, addr.port()) + } + std::net::SocketAddr::V6(addr) => { + let masked_ip = mask_ip(&addr.ip().to_string()); + format!("[{}]:{}", masked_ip, addr.port()) + } + } +} + async fn process_client_udp_req(args: &UdpGwArgs, tx: Sender, mut client: Client, mut reader: ReadHalf<'_>) -> std::io::Result<()> { let udp_timeout = args.udp_timeout; let udp_mtu = args.udp_mtu; + let masked_addr = mask_socket_addr(client.addr); + loop { + let masked_addr = masked_addr.clone(); // 1. read udpgw packet from client let res = tokio::time::timeout(tokio::time::Duration::from_secs(2), Packet::retrieve_from_async_stream(&mut reader)).await; let packet = match res { Ok(Ok(packet)) => packet, Ok(Err(e)) => { - log::debug!("client {} retrieve_from_async_stream \"{}\"", client.addr, e); + log::debug!("client {} retrieve_from_async_stream \"{}\"", masked_addr, e); break; } Err(e) => { if client.last_activity.elapsed() >= CLIENT_DISCONNECT_TIMEOUT { - log::debug!("client {} last_activity elapsed \"{e}\"", client.addr); + log::debug!("client {} last_activity elapsed \"{e}\"", masked_addr); break; } continue; @@ -133,19 +164,19 @@ async fn process_client_udp_req(args: &UdpGwArgs, tx: Sender, mut client let flags = packet.header.flags; let conn_id = packet.header.conn_id; if flags & UdpFlag::KEEPALIVE == UdpFlag::KEEPALIVE { - log::trace!("client {} send keepalive", client.addr); + log::trace!("client {} send keepalive", masked_addr); // 2. if keepalive packet, do nothing, send keepalive response to client send_keepalive_response(tx.clone(), conn_id).await; continue; } - log::trace!("client {} received udp data {}", client.addr, packet); + log::trace!("client {} received udp data {}", masked_addr, packet); // 3. process client udpgw packet in a new task let tx = tx.clone(); tokio::spawn(async move { - if let Err(e) = process_udp(client.addr, udp_mtu, udp_timeout, tx.clone(), packet).await { + if let Err(e) = process_udp(udp_mtu, udp_timeout, tx.clone(), packet).await { send_error_response(tx, conn_id).await; - log::debug!("client {} process udp function \"{e}\"", client.addr); + log::debug!("client {} process udp function \"{e}\"", masked_addr); } }); } @@ -153,10 +184,11 @@ async fn process_client_udp_req(args: &UdpGwArgs, tx: Sender, mut client } async fn write_to_client(addr: SocketAddr, mut writer: WriteHalf<'_>, mut rx: Receiver) -> std::io::Result<()> { + let masked_addr = mask_socket_addr(addr); loop { use std::io::{Error, ErrorKind::BrokenPipe}; let packet = rx.recv().await.ok_or(Error::new(BrokenPipe, "recv error"))?; - log::trace!("send response to client {} with {}", addr, packet); + log::trace!("send response to client {} with {}", masked_addr, packet); let data: Vec = packet.into(); let _r = writer.write(&data).await?; } @@ -196,7 +228,8 @@ pub async fn run(args: UdpGwArgs, shutdown_token: tokio_util::sync::Cancellation _ = shutdown_token.cancelled() => break, }; let client = Client::new(addr); - log::info!("client {} connected", addr); + let masked_addr = mask_socket_addr(addr); + log::info!("client {} connected", masked_addr); let params = args.clone(); tokio::spawn(async move { let (tx, rx) = tokio::sync::mpsc::channel::(100); @@ -205,7 +238,7 @@ pub async fn run(args: UdpGwArgs, shutdown_token: tokio_util::sync::Cancellation v = process_client_udp_req(¶ms, tx, client, tcp_read_stream) => v, v = write_to_client(addr, tcp_write_stream, rx) => v, }; - log::info!("client {} disconnected with {:?}", addr, res); + log::info!("client {} disconnected with {:?}", masked_addr, res); }); } Ok::<(), Error>(())