Compare commits

..

2 commits

Author SHA1 Message Date
Koi to Coco
a8ebe0b9be
fix: use vec! to allocate buffer #213 (#214)
Some checks are pending
Push or PR / build_n_test (macos-latest) (push) Waiting to run
Push or PR / build_n_test (ubuntu-latest) (push) Waiting to run
Push or PR / build_n_test (windows-latest) (push) Waiting to run
Push or PR / build_n_test_android (push) Waiting to run
Push or PR / build_n_test_ios (push) Waiting to run
Push or PR / Check semver (push) Waiting to run
Integration Tests / Proxy Tests (push) Waiting to run
2025-06-29 16:18:25 +08:00
ssrlive
31b0972801 make rustc happy 2025-06-29 16:11:14 +08:00
14 changed files with 57 additions and 61 deletions

View file

@ -8,6 +8,8 @@ on:
pull_request:
branches:
- '**'
schedule:
- cron: '0 0 * * 0' # Every Sunday at midnight UTC
env:
CARGO_TERM_COLOR: always

View file

@ -6,7 +6,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Get the build time
let build_time = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string();
println!("cargo:rustc-env=BUILD_TIME={}", build_time);
println!("cargo:rustc-env=BUILD_TIME={build_time}");
#[cfg(target_os = "windows")]
if let Ok(cargo_target_dir) = get_cargo_target_dir() {
@ -28,7 +28,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Copy to the target directory
if let Err(e) = std::fs::copy(src_path, &dst_path) {
f.write_all(format!("Failed to copy 'wintun.dll': {}\r\n", e).as_bytes())?;
f.write_all(format!("Failed to copy 'wintun.dll': {e}\r\n").as_bytes())?;
} else {
f.write_all(format!("Copied 'wintun.dll' to '{}'\r\n", dst_path.display()).as_bytes())?;

View file

@ -379,7 +379,7 @@ impl Default for ArgProxy {
impl std::fmt::Display for ArgProxy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let auth = match &self.credentials {
Some(creds) => format!("{}", creds),
Some(creds) => format!("{creds}"),
None => "".to_owned(),
};
if auth.is_empty() {

View file

@ -50,7 +50,7 @@ async fn main_async(args: Args) -> Result<(), BoxError> {
#[cfg(target_os = "linux")]
if args.unshare && args.socket_transfer_fd.is_none() {
if let Err(err) = namespace_proxy_main(args, shutdown_token).await {
log::error!("namespace proxy error: {}", err);
log::error!("namespace proxy error: {err}");
}
return Ok(0);
}
@ -133,13 +133,10 @@ async fn namespace_proxy_main(
log::info!("Use `tun2proxy-bin --unshare --setup [...] -- openvpn --config [...]`");
log::info!("");
log::info!("To run a new process in the created namespace (e.g. a flatpak app)");
log::info!(
"Use `nsenter --preserve-credentials --user --net --mount --target {} /bin/sh`",
unshare_pid
);
log::info!("Use `nsenter --preserve-credentials --user --net --mount --target {unshare_pid} /bin/sh`");
log::info!("");
if let Some(pidfile) = _args.unshare_pidfile.as_ref() {
log::info!("Writing unshare pid to {}", pidfile);
log::info!("Writing unshare pid to {pidfile}");
std::fs::write(pidfile, unshare_pid.to_string()).ok();
}
tokio::spawn(async move { tun2proxy::socket_transfer::process_socket_requests(&socket).await });

View file

@ -66,14 +66,14 @@ impl UdpGwArgs {
async fn send_error_response(tx: Sender<Packet>, conn_id: u16) {
let error_packet = Packet::build_error_packet(conn_id);
if let Err(e) = tx.send(error_packet).await {
log::error!("send error response error {:?}", e);
log::error!("send error response error {e:?}");
}
}
async fn send_keepalive_response(tx: Sender<Packet>, conn_id: u16) {
let keepalive_packet = Packet::build_keepalive_packet(conn_id);
if let Err(e) = tx.send(keepalive_packet).await {
log::error!("send keepalive response error {:?}", e);
log::error!("send keepalive response error {e:?}");
}
}
@ -150,12 +150,12 @@ async fn process_client_udp_req(args: &UdpGwArgs, tx: Sender<Packet>, mut client
let packet = match res {
Ok(Ok(packet)) => packet,
Ok(Err(e)) => {
log::debug!("client {} retrieve_from_async_stream \"{}\"", masked_addr, e);
log::debug!("client {masked_addr} retrieve_from_async_stream \"{e}\"");
break;
}
Err(e) => {
if client.last_activity.elapsed() >= CLIENT_DISCONNECT_TIMEOUT {
log::debug!("client {} last_activity elapsed \"{e}\"", masked_addr);
log::debug!("client {masked_addr} last_activity elapsed \"{e}\"");
break;
}
continue;
@ -166,19 +166,19 @@ async fn process_client_udp_req(args: &UdpGwArgs, tx: Sender<Packet>, mut client
let flags = packet.header.flags;
let conn_id = packet.header.conn_id;
if flags & UdpFlag::KEEPALIVE == UdpFlag::KEEPALIVE {
log::trace!("client {} send keepalive", masked_addr);
log::trace!("client {masked_addr} send keepalive");
// 2. if keepalive packet, do nothing, send keepalive response to client
send_keepalive_response(tx.clone(), conn_id).await;
continue;
}
log::trace!("client {} received udp data {}", masked_addr, packet);
log::trace!("client {masked_addr} received udp data {packet}");
// 3. process client udpgw packet in a new task
let tx = tx.clone();
tokio::spawn(async move {
if let Err(e) = process_udp(udp_mtu, udp_timeout, tx.clone(), packet).await {
send_error_response(tx, conn_id).await;
log::debug!("client {} process udp function \"{e}\"", masked_addr);
log::debug!("client {masked_addr} process udp function \"{e}\"");
}
});
}
@ -190,7 +190,7 @@ async fn write_to_client(addr: SocketAddr, mut writer: WriteHalf<'_>, mut rx: Re
loop {
use std::io::{Error, ErrorKind::BrokenPipe};
let packet = rx.recv().await.ok_or(Error::new(BrokenPipe, "recv error"))?;
log::trace!("send response to client {} with {}", masked_addr, packet);
log::trace!("send response to client {masked_addr} with {packet}");
let data: Vec<u8> = packet.into();
let _r = writer.write(&data).await?;
}
@ -231,7 +231,7 @@ pub async fn run(args: UdpGwArgs, shutdown_token: tokio_util::sync::Cancellation
};
let client = Client::new(addr);
let masked_addr = mask_socket_addr(addr);
log::info!("client {} connected", masked_addr);
log::info!("client {masked_addr} connected");
let params = args.clone();
tokio::spawn(async move {
let (tx, rx) = tokio::sync::mpsc::channel::<Packet>(100);
@ -240,7 +240,7 @@ pub async fn run(args: UdpGwArgs, shutdown_token: tokio_util::sync::Cancellation
v = process_client_udp_req(&params, tx, client, tcp_read_stream) => v,
v = write_to_client(addr, tcp_write_stream, rx) => v,
};
log::info!("client {} disconnected with {:?}", masked_addr, res);
log::info!("client {masked_addr} disconnected with {res:?}");
});
}
Ok::<(), Error>(())
@ -263,9 +263,7 @@ fn main() -> Result<(), BoxError> {
.stdout(stdout)
.stderr(stderr)
.privileged_action(|| "Executed before drop privileges");
let _ = daemonize
.start()
.map_err(|e| format!("Failed to daemonize process, error:{:?}", e))?;
let _ = daemonize.start().map_err(|e| format!("Failed to daemonize process, error:{e:?}"))?;
}
let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;

View file

@ -100,7 +100,7 @@ pub unsafe extern "C" fn tun2proxy_run_with_cli_args(cli_args: *const c_char, tu
pub fn general_run_for_api(args: Args, tun_mtu: u16, packet_information: bool) -> c_int {
log::set_max_level(args.verbosity.into());
if let Err(err) = log::set_boxed_logger(Box::<crate::dump_logger::DumpLogger>::default()) {
log::debug!("set logger error: {}", err);
log::debug!("set logger error: {err}");
}
let shutdown_token = tokio_util::sync::CancellationToken::new();
@ -135,7 +135,7 @@ pub fn general_run_for_api(args: Args, tun_mtu: u16, packet_information: bool) -
}) {
Ok(_) => 0,
Err(e) => {
log::error!("failed to run tun2proxy with error: {:?}", e);
log::error!("failed to run tun2proxy with error: {e:?}");
-4
}
}

View file

@ -142,7 +142,7 @@ impl HttpConnection {
AuthenticationScheme::Basic => {
let auth_b64 = base64easy::encode(credentials.to_string(), base64easy::EngineKind::Standard);
self.server_outbuf
.extend(format!("{}: Basic {}\r\n", PROXY_AUTHORIZATION, auth_b64).as_bytes());
.extend(format!("{PROXY_AUTHORIZATION}: Basic {auth_b64}\r\n").as_bytes());
}
AuthenticationScheme::None => {}
}

View file

@ -237,7 +237,7 @@ where
#[cfg(feature = "udpgw")]
let udpgw_client = args.udpgw_server.map(|addr| {
log::info!("UDP Gateway enabled, server: {}", addr);
log::info!("UDP Gateway enabled, server: {addr}");
use std::time::Duration;
let client = Arc::new(UdpGwClient::new(
mtu,
@ -292,7 +292,7 @@ where
let socket_queue = socket_queue.clone();
tokio::spawn(async move {
if let Err(err) = handle_tcp_session(tcp, proxy_handler, socket_queue).await {
log::error!("{} error \"{}\"", info, err);
log::error!("{info} error \"{err}\"");
}
log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1));
});
@ -318,7 +318,7 @@ where
let socket_queue = socket_queue.clone();
tokio::spawn(async move {
if let Err(err) = handle_dns_over_tcp_session(udp, proxy_handler, socket_queue, ipv6_enabled).await {
log::error!("{} error \"{}\"", info, err);
log::error!("{info} error \"{err}\"");
}
log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1));
});
@ -328,7 +328,7 @@ where
tokio::spawn(async move {
if let Some(virtual_dns) = virtual_dns {
if let Err(err) = handle_virtual_dns_session(udp, virtual_dns).await {
log::error!("{} error \"{}\"", info, err);
log::error!("{info} error \"{err}\"");
}
}
log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1));
@ -360,7 +360,7 @@ where
None => dst.into(),
};
if let Err(e) = handle_udp_gateway_session(udp, udpgw, &dst_addr, proxy_handler, queue, ipv6_enabled).await {
log::info!("Ending {} with \"{}\"", info, e);
log::info!("Ending {info} with \"{e}\"");
}
log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1));
});
@ -372,13 +372,13 @@ where
tokio::spawn(async move {
let ty = args.proxy.proxy_type;
if let Err(err) = handle_udp_associate_session(udp, ty, proxy_handler, socket_queue, ipv6_enabled).await {
log::info!("Ending {} with \"{}\"", info, err);
log::info!("Ending {info} with \"{err}\"");
}
log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1));
});
}
Err(e) => {
log::error!("Failed to create UDP connection: {}", e);
log::error!("Failed to create UDP connection: {e}");
}
}
}
@ -402,7 +402,7 @@ async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc<Mutex<Vi
let len = match udp.read(&mut buf).await {
Err(e) => {
// indicate UDP read fails not an error.
log::debug!("Virtual DNS session error: {}", e);
log::debug!("Virtual DNS session error: {e}");
break;
}
Ok(len) => len,
@ -412,7 +412,7 @@ async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc<Mutex<Vi
}
let (msg, qname, ip) = dns.lock().await.generate_query(&buf[..len])?;
udp.write_all(&msg).await?;
log::debug!("Virtual DNS query: {} -> {}", qname, ip);
log::debug!("Virtual DNS query: {qname} -> {ip}");
}
Ok(())
}
@ -431,7 +431,7 @@ where
total += n as u64;
let (tx, rx) = if is_tx { (n, 0) } else { (0, n) };
if let Err(e) = crate::traffic_status::traffic_status_update(tx, rx) {
log::debug!("Record traffic status error: {}", e);
log::debug!("Record traffic status error: {e}");
}
writer.write_all(&buf[..n]).await?;
}
@ -453,7 +453,7 @@ async fn handle_tcp_session(
let mut server = create_tcp_stream(&socket_queue, server_addr).await?;
log::info!("Beginning {}", session_info);
log::info!("Beginning {session_info}");
if let Err(e) = handle_proxy_session(&mut server, proxy_handler).await {
tcp_stack.shutdown().await?;
@ -467,19 +467,19 @@ async fn handle_tcp_session(
async move {
let r = copy_and_record_traffic(&mut t_rx, &mut s_tx, true).await;
if let Err(err) = s_tx.shutdown().await {
log::trace!("{} s_tx shutdown error {}", session_info, err);
log::trace!("{session_info} s_tx shutdown error {err}");
}
r
},
async move {
let r = copy_and_record_traffic(&mut s_rx, &mut t_tx, false).await;
if let Err(err) = t_tx.shutdown().await {
log::trace!("{} t_tx shutdown error {}", session_info, err);
log::trace!("{session_info} t_tx shutdown error {err}");
}
r
},
);
log::info!("Ending {} with {:?}", session_info, res);
log::info!("Ending {session_info} with {res:?}");
Ok(())
}
@ -509,7 +509,7 @@ async fn handle_udp_gateway_session(
None => {
let mut tcp_server_stream = create_tcp_stream(&socket_queue, proxy_server_addr).await?;
if let Err(e) = handle_proxy_session(&mut tcp_server_stream, proxy_handler).await {
return Err(format!("udpgw connection error: {}", e).into());
return Err(format!("udpgw connection error: {e}").into());
}
break UdpGwClientStream::new(tcp_server_stream);
}
@ -625,7 +625,7 @@ async fn handle_udp_associate_session(
)
};
log::info!("Beginning {}", session_info);
log::info!("Beginning {session_info}");
// `_server` is meaningful here, it must be alive all the time
// to ensure that UDP transmission will not be interrupted accidentally.
@ -702,7 +702,7 @@ async fn handle_udp_associate_session(
}
}
log::info!("Ending {}", session_info);
log::info!("Ending {session_info}");
Ok(())
}
@ -721,7 +721,7 @@ async fn handle_dns_over_tcp_session(
let mut server = create_tcp_stream(&socket_queue, server_addr).await?;
log::info!("Beginning {}", session_info);
log::info!("Beginning {session_info}");
let _ = handle_proxy_session(&mut server, proxy_handler).await?;
@ -774,7 +774,7 @@ async fn handle_dns_over_tcp_session(
let name = dns::extract_domain_from_dns_message(&message)?;
let ip = dns::extract_ipaddr_from_dns_message(&message);
log::trace!("DNS over TCP query result: {} -> {:?}", name, ip);
log::trace!("DNS over TCP query result: {name} -> {ip:?}");
if !ipv6_enabled {
dns::remove_ipv6_entries(&mut message);
@ -794,7 +794,7 @@ async fn handle_dns_over_tcp_session(
}
}
log::info!("Ending {}", session_info);
log::info!("Ending {session_info}");
Ok(())
}

View file

@ -16,7 +16,7 @@ impl std::fmt::Display for IpProtocol {
IpProtocol::Tcp => write!(f, "TCP"),
IpProtocol::Udp => write!(f, "UDP"),
IpProtocol::Icmp => write!(f, "ICMP"),
IpProtocol::Other(v) => write!(f, "Other(0x{:02X})", v),
IpProtocol::Other(v) => write!(f, "Other(0x{v:02X})"),
}
}
}

View file

@ -157,8 +157,7 @@ where
let mut buf = [0_u8; REQUEST_BUFFER_SIZE];
let mut iov = [IoSliceMut::new(&mut buf[..])];
let mut cmsg = Vec::with_capacity(cmsg_space::<RawFd>() * number as usize);
let mut cmsg = vec![0; cmsg_space::<RawFd>() * number as usize];
let msg = recvmsg::<()>(socket.as_fd().as_raw_fd(), &mut iov, Some(&mut cmsg), MsgFlags::empty());
let msg = match msg {

View file

@ -78,7 +78,7 @@ impl SocksProxyImpl {
}
}
SocketAddr::V6(addr) => {
return Err(format!("SOCKS4 does not support IPv6: {}", addr).into());
return Err(format!("SOCKS4 does not support IPv6: {addr}").into());
}
}
self.server_outbuf.extend(ip_vec);
@ -136,7 +136,7 @@ impl SocksProxyImpl {
let response = handshake::Response::retrieve_from_stream(&mut self.server_inbuf.clone());
if let Err(e) = response {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
log::trace!("receive_server_hello_socks5 needs more data \"{}\"...", e);
log::trace!("receive_server_hello_socks5 needs more data \"{e}\"...");
return Ok(());
} else {
return Err(e);
@ -181,7 +181,7 @@ impl SocksProxyImpl {
let response = Response::retrieve_from_stream(&mut self.server_inbuf.clone());
if let Err(e) = response {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
log::trace!("receive_auth_data needs more data \"{}\"...", e);
log::trace!("receive_auth_data needs more data \"{e}\"...");
return Ok(());
} else {
return Err(e);
@ -213,7 +213,7 @@ impl SocksProxyImpl {
let response = protocol::Response::retrieve_from_stream(&mut self.server_inbuf.clone());
if let Err(e) = response {
if e.kind() == std::io::ErrorKind::UnexpectedEof {
log::trace!("receive_connection_status needs more data \"{}\"...", e);
log::trace!("receive_connection_status needs more data \"{e}\"...");
return Ok(());
} else {
return Err(e);

View file

@ -51,7 +51,7 @@ static TIME_STAMP: LazyLock<Mutex<std::time::Instant>> = LazyLock::new(|| Mutex:
pub(crate) fn traffic_status_update(delta_tx: usize, delta_rx: usize) -> Result<()> {
{
let is_none_or_error = TRAFFIC_STATUS_CALLBACK.lock().map(|guard| guard.is_none()).unwrap_or_else(|e| {
log::error!("Failed to acquire lock: {}", e);
log::error!("Failed to acquire lock: {e}");
true
});
if is_none_or_error {

View file

@ -32,9 +32,9 @@ impl std::fmt::Display for UdpFlag {
0x01 => "KEEPALIVE",
0x20 => "ERR",
0x02 => "DATA",
n => return write!(f, "Unknown UdpFlag(0x{:02X})", n),
n => return write!(f, "Unknown UdpFlag(0x{n:02X})"),
};
write!(f, "{}", flag)
write!(f, "{flag}")
}
}
@ -332,7 +332,7 @@ impl std::fmt::Display for UdpGwResponse {
UdpGwResponse::KeepAlive => write!(f, "KeepAlive"),
UdpGwResponse::Error => write!(f, "Error"),
UdpGwResponse::TcpClose => write!(f, "TcpClose"),
UdpGwResponse::Data(packet) => write!(f, "Data({})", packet),
UdpGwResponse::Data(packet) => write!(f, "Data({packet})"),
}
}
}
@ -487,21 +487,21 @@ impl UdpGwClient {
let keepalive_packet: Vec<u8> = Packet::build_keepalive_packet(sn).into();
tx += keepalive_packet.len();
if let Err(e) = stream_writer.write_all(&keepalive_packet).await {
log::warn!("stream {} {:?} send keepalive failed: {}", sn, local_addr, e);
log::warn!("stream {sn} {local_addr:?} send keepalive failed: {e}");
continue;
}
match UdpGwClient::recv_udpgw_packet(self.udp_mtu, self.udp_timeout, &mut stream_reader).await {
Ok((len, UdpGwResponse::KeepAlive)) => {
stream.update_activity();
self.store_server_connection_full(stream, stream_reader, stream_writer).await;
log::trace!("stream {sn} {:?} send keepalive and recieve it successfully", local_addr);
log::trace!("stream {sn} {local_addr:?} send keepalive and recieve it successfully");
rx += len;
}
Ok((len, v)) => {
log::debug!("stream {sn} {:?} keepalive unexpected response: {v}", local_addr);
log::debug!("stream {sn} {local_addr:?} keepalive unexpected response: {v}");
rx += len;
}
Err(e) => log::debug!("stream {sn} {:?} keepalive no response, error \"{e}\"", local_addr),
Err(e) => log::debug!("stream {sn} {local_addr:?} keepalive no response, error \"{e}\""),
}
}
crate::traffic_status::traffic_status_update(tx, rx)?;

View file

@ -16,7 +16,7 @@ fn my_service_main(arguments: Vec<std::ffi::OsString>) {
// `service_dispatcher::start` from `main`.
if let Err(_e) = run_service(arguments) {
log::error!("Error: {:?}", _e);
log::error!("Error: {_e:?}");
}
}