rename TcpProxy to ProxyHandler

This commit is contained in:
ssrlive 2023-09-01 11:28:06 +08:00
parent 11995d525b
commit e5a645638a
3 changed files with 35 additions and 41 deletions

View file

@ -2,7 +2,7 @@ use crate::{
error::Error, error::Error,
tun2proxy::{ tun2proxy::{
ConnectionInfo, ConnectionManager, Direction, IncomingDataEvent, IncomingDirection, OutgoingDataEvent, ConnectionInfo, ConnectionManager, Direction, IncomingDataEvent, IncomingDirection, OutgoingDataEvent,
OutgoingDirection, TcpProxy, OutgoingDirection, ProxyHandler,
}, },
}; };
use base64::Engine; use base64::Engine;
@ -317,7 +317,7 @@ impl HttpConnection {
} }
} }
impl TcpProxy for HttpConnection { impl ProxyHandler for HttpConnection {
fn get_connection_info(&self) -> &ConnectionInfo { fn get_connection_info(&self) -> &ConnectionInfo {
&self.info &self.info
} }
@ -395,7 +395,7 @@ pub(crate) struct HttpManager {
} }
impl ConnectionManager for HttpManager { impl ConnectionManager for HttpManager {
fn new_tcp_proxy(&self, info: &ConnectionInfo, _: bool) -> Result<Box<dyn TcpProxy>, Error> { fn new_proxy_handler(&self, info: &ConnectionInfo, _: bool) -> Result<Box<dyn ProxyHandler>, Error> {
if info.protocol != IpProtocol::Tcp { if info.protocol != IpProtocol::Tcp {
return Err("Invalid protocol".into()); return Err("Invalid protocol".into());
} }

View file

@ -2,7 +2,7 @@ use crate::{
error::{Error, Result}, error::{Error, Result},
tun2proxy::{ tun2proxy::{
ConnectionInfo, ConnectionManager, Direction, IncomingDataEvent, IncomingDirection, OutgoingDataEvent, ConnectionInfo, ConnectionManager, Direction, IncomingDataEvent, IncomingDirection, OutgoingDataEvent,
OutgoingDirection, TcpProxy, OutgoingDirection, ProxyHandler,
}, },
}; };
use socks5_impl::protocol::{self, handshake, password_method, Address, AuthMethod, StreamOperation, UserKey, Version}; use socks5_impl::protocol::{self, handshake, password_method, Address, AuthMethod, StreamOperation, UserKey, Version};
@ -268,7 +268,7 @@ impl SocksProxyImpl {
} }
} }
impl TcpProxy for SocksProxyImpl { impl ProxyHandler for SocksProxyImpl {
fn get_connection_info(&self) -> &ConnectionInfo { fn get_connection_info(&self) -> &ConnectionInfo {
&self.info &self.info
} }
@ -346,7 +346,7 @@ pub(crate) struct SocksProxyManager {
} }
impl ConnectionManager for SocksProxyManager { impl ConnectionManager for SocksProxyManager {
fn new_tcp_proxy(&self, info: &ConnectionInfo, udp_associate: bool) -> Result<Box<dyn TcpProxy>> { fn new_proxy_handler(&self, info: &ConnectionInfo, udp_associate: bool) -> Result<Box<dyn ProxyHandler>> {
use socks5_impl::protocol::Command::{Connect, UdpAssociate}; use socks5_impl::protocol::Command::{Connect, UdpAssociate};
let command = if udp_associate { UdpAssociate } else { Connect }; let command = if udp_associate { UdpAssociate } else { Connect };
let credentials = self.credentials.clone(); let credentials = self.credentials.clone();

View file

@ -186,7 +186,7 @@ struct ConnectionState {
smoltcp_handle: Option<SocketHandle>, smoltcp_handle: Option<SocketHandle>,
mio_stream: TcpStream, mio_stream: TcpStream,
token: Token, token: Token,
tcp_proxy_handler: Box<dyn TcpProxy>, proxy_handler: Box<dyn ProxyHandler>,
close_state: u8, close_state: u8,
wait_read: bool, wait_read: bool,
wait_write: bool, wait_write: bool,
@ -198,7 +198,7 @@ struct ConnectionState {
dns_over_tcp_expiry: Option<::std::time::Instant>, dns_over_tcp_expiry: Option<::std::time::Instant>,
} }
pub(crate) trait TcpProxy { pub(crate) trait ProxyHandler {
fn get_connection_info(&self) -> &ConnectionInfo; fn get_connection_info(&self) -> &ConnectionInfo;
fn push_data(&mut self, event: IncomingDataEvent<'_>) -> Result<(), Error>; fn push_data(&mut self, event: IncomingDataEvent<'_>) -> Result<(), Error>;
fn consume_data(&mut self, dir: OutgoingDirection, size: usize); fn consume_data(&mut self, dir: OutgoingDirection, size: usize);
@ -210,7 +210,7 @@ pub(crate) trait TcpProxy {
} }
pub(crate) trait ConnectionManager { pub(crate) trait ConnectionManager {
fn new_tcp_proxy(&self, info: &ConnectionInfo, udp_associate: bool) -> Result<Box<dyn TcpProxy>>; fn new_proxy_handler(&self, info: &ConnectionInfo, udp_associate: bool) -> Result<Box<dyn ProxyHandler>>;
fn close_connection(&self, info: &ConnectionInfo); fn close_connection(&self, info: &ConnectionInfo);
fn get_server_addr(&self) -> SocketAddr; fn get_server_addr(&self) -> SocketAddr;
fn get_credentials(&self) -> &Option<UserKey>; fn get_credentials(&self) -> &Option<UserKey>;
@ -399,10 +399,10 @@ impl<'a> TunToProxy<'a> {
let mut closed_ends = 0; let mut closed_ends = 0;
if (state.close_state & SERVER_WRITE_CLOSED) == SERVER_WRITE_CLOSED if (state.close_state & SERVER_WRITE_CLOSED) == SERVER_WRITE_CLOSED
&& !state && !state
.tcp_proxy_handler .proxy_handler
.have_data(Direction::Incoming(IncomingDirection::FromServer)) .have_data(Direction::Incoming(IncomingDirection::FromServer))
&& !state && !state
.tcp_proxy_handler .proxy_handler
.have_data(Direction::Outgoing(OutgoingDirection::ToClient)) .have_data(Direction::Outgoing(OutgoingDirection::ToClient))
{ {
if let Some(handle) = state.smoltcp_handle { if let Some(handle) = state.smoltcp_handle {
@ -415,10 +415,10 @@ impl<'a> TunToProxy<'a> {
if (state.close_state & CLIENT_WRITE_CLOSED) == CLIENT_WRITE_CLOSED if (state.close_state & CLIENT_WRITE_CLOSED) == CLIENT_WRITE_CLOSED
&& !state && !state
.tcp_proxy_handler .proxy_handler
.have_data(Direction::Incoming(IncomingDirection::FromClient)) .have_data(Direction::Incoming(IncomingDirection::FromClient))
&& !state && !state
.tcp_proxy_handler .proxy_handler
.have_data(Direction::Outgoing(OutgoingDirection::ToServer)) .have_data(Direction::Outgoing(OutgoingDirection::ToServer))
{ {
// Close remote server // Close remote server
@ -452,7 +452,7 @@ impl<'a> TunToProxy<'a> {
direction: IncomingDirection::FromClient, direction: IncomingDirection::FromClient,
buffer: data, buffer: data,
}; };
error = state.tcp_proxy_handler.push_data(event); error = state.proxy_handler.push_data(event);
(data.len(), ()) (data.len(), ())
})?; })?;
} }
@ -534,9 +534,9 @@ impl<'a> TunToProxy<'a> {
if !self.connection_map.contains_key(info) { if !self.connection_map.contains_key(info) {
log::info!("DNS over TCP {} ({})", info, origin_dst); log::info!("DNS over TCP {} ({})", info, origin_dst);
let tcp_proxy_handler = manager.new_tcp_proxy(info, false)?; let proxy_handler = manager.new_proxy_handler(info, false)?;
let server_addr = manager.get_server_addr(); let server_addr = manager.get_server_addr();
let state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, false)?; let state = self.create_new_tcp_connection_state(server_addr, origin_dst, proxy_handler, false)?;
self.connection_map.insert(info.clone(), state); self.connection_map.insert(info.clone(), state);
// TODO: Move this 3 lines to the function end? // TODO: Move this 3 lines to the function end?
@ -561,7 +561,7 @@ impl<'a> TunToProxy<'a> {
direction: IncomingDirection::FromClient, direction: IncomingDirection::FromClient,
buffer: &buf, buffer: &buf,
}; };
state.tcp_proxy_handler.push_data(data_event)?; state.proxy_handler.push_data(data_event)?;
Ok(()) Ok(())
} }
@ -589,13 +589,13 @@ impl<'a> TunToProxy<'a> {
direction: IncomingDirection::FromServer, direction: IncomingDirection::FromServer,
buffer: &data[0..read], buffer: &data[0..read],
}; };
if let Err(error) = state.tcp_proxy_handler.push_data(data_event) { if let Err(error) = state.proxy_handler.push_data(data_event) {
log::error!("{}", error); log::error!("{}", error);
self.remove_connection(&info.clone())?; self.remove_connection(&info.clone())?;
return Ok(()); return Ok(());
} }
let dns_event = state.tcp_proxy_handler.peek_data(OutgoingDirection::ToClient); let dns_event = state.proxy_handler.peek_data(OutgoingDirection::ToClient);
let mut buf = dns_event.buffer.to_vec(); let mut buf = dns_event.buffer.to_vec();
let mut to_send: LinkedList<Vec<u8>> = LinkedList::new(); let mut to_send: LinkedList<Vec<u8>> = LinkedList::new();
@ -615,9 +615,7 @@ impl<'a> TunToProxy<'a> {
let ip = dns::extract_ipaddr_from_dns_message(&message); let ip = dns::extract_ipaddr_from_dns_message(&message);
log::trace!("DNS over TCP query result: {} -> {:?}", name, ip); log::trace!("DNS over TCP query result: {} -> {:?}", name, ip);
state state.proxy_handler.consume_data(OutgoingDirection::ToClient, len + 2);
.tcp_proxy_handler
.consume_data(OutgoingDirection::ToClient, len + 2);
if !self.options.ipv6_enabled { if !self.options.ipv6_enabled {
dns::remove_ipv6_entries(&mut message); dns::remove_ipv6_entries(&mut message);
@ -667,9 +665,9 @@ impl<'a> TunToProxy<'a> {
) -> Result<()> { ) -> Result<()> {
if !self.connection_map.contains_key(info) { if !self.connection_map.contains_key(info) {
log::info!("UDP associate session {} ({})", info, origin_dst); log::info!("UDP associate session {} ({})", info, origin_dst);
let tcp_proxy_handler = manager.new_tcp_proxy(info, true)?; let proxy_handler = manager.new_proxy_handler(info, true)?;
let server_addr = manager.get_server_addr(); let server_addr = manager.get_server_addr();
let state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, true)?; let state = self.create_new_tcp_connection_state(server_addr, origin_dst, proxy_handler, true)?;
self.connection_map.insert(info.clone(), state); self.connection_map.insert(info.clone(), state);
self.expect_smoltcp_send()?; self.expect_smoltcp_send()?;
@ -689,7 +687,7 @@ impl<'a> TunToProxy<'a> {
UdpHeader::new(0, info.dst.clone()).write_to_stream(&mut s5_udp_data)?; UdpHeader::new(0, info.dst.clone()).write_to_stream(&mut s5_udp_data)?;
s5_udp_data.extend_from_slice(payload); s5_udp_data.extend_from_slice(payload);
if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() { if let Some(udp_associate) = state.proxy_handler.get_udp_associate() {
// UDP associate session has been established, we can send packets directly... // UDP associate session has been established, we can send packets directly...
if let Some(socket) = state.udp_socket.as_ref() { if let Some(socket) = state.udp_socket.as_ref() {
socket.send_to(&s5_udp_data, udp_associate)?; socket.send_to(&s5_udp_data, udp_associate)?;
@ -718,9 +716,9 @@ impl<'a> TunToProxy<'a> {
if info.protocol == IpProtocol::Tcp { if info.protocol == IpProtocol::Tcp {
if _first_packet { if _first_packet {
let tcp_proxy_handler = manager.new_tcp_proxy(&info, false)?; let proxy_handler = manager.new_proxy_handler(&info, false)?;
let server = manager.get_server_addr(); let server = manager.get_server_addr();
let state = self.create_new_tcp_connection_state(server, origin_dst, tcp_proxy_handler, false)?; let state = self.create_new_tcp_connection_state(server, origin_dst, proxy_handler, false)?;
self.connection_map.insert(info.clone(), state); self.connection_map.insert(info.clone(), state);
log::info!("Connect done {} ({})", info, origin_dst); log::info!("Connect done {} ({})", info, origin_dst);
@ -773,7 +771,7 @@ impl<'a> TunToProxy<'a> {
&mut self, &mut self,
server_addr: SocketAddr, server_addr: SocketAddr,
dst: SocketAddr, dst: SocketAddr,
tcp_proxy_handler: Box<dyn TcpProxy>, proxy_handler: Box<dyn ProxyHandler>,
udp_associate: bool, udp_associate: bool,
) -> Result<ConnectionState> { ) -> Result<ConnectionState> {
let mut socket = tcp::Socket::new( let mut socket = tcp::Socket::new(
@ -808,7 +806,7 @@ impl<'a> TunToProxy<'a> {
smoltcp_handle: Some(handle), smoltcp_handle: Some(handle),
mio_stream: client, mio_stream: client,
token, token,
tcp_proxy_handler, proxy_handler,
close_state: 0, close_state: 0,
wait_read: true, wait_read: true,
wait_write: false, wait_write: false,
@ -860,7 +858,7 @@ impl<'a> TunToProxy<'a> {
fn write_to_server(&mut self, info: &ConnectionInfo) -> Result<(), Error> { fn write_to_server(&mut self, info: &ConnectionInfo) -> Result<(), Error> {
if let Some(state) = self.connection_map.get_mut(info) { if let Some(state) = self.connection_map.get_mut(info) {
let event = state.tcp_proxy_handler.peek_data(OutgoingDirection::ToServer); let event = state.proxy_handler.peek_data(OutgoingDirection::ToServer);
let buffer_size = event.buffer.len(); let buffer_size = event.buffer.len();
if buffer_size == 0 { if buffer_size == 0 {
state.wait_write = false; state.wait_write = false;
@ -871,9 +869,7 @@ impl<'a> TunToProxy<'a> {
let result = state.mio_stream.write(event.buffer); let result = state.mio_stream.write(event.buffer);
match result { match result {
Ok(written) => { Ok(written) => {
state state.proxy_handler.consume_data(OutgoingDirection::ToServer, written);
.tcp_proxy_handler
.consume_data(OutgoingDirection::ToServer, written);
state.wait_write = written < buffer_size; state.wait_write = written < buffer_size;
Self::update_mio_socket_interest(&mut self.poll, state)?; Self::update_mio_socket_interest(&mut self.poll, state)?;
} }
@ -897,7 +893,7 @@ impl<'a> TunToProxy<'a> {
Some(handle) => handle, Some(handle) => handle,
None => break, None => break,
}; };
let event = state.tcp_proxy_handler.peek_data(OutgoingDirection::ToClient); let event = state.proxy_handler.peek_data(OutgoingDirection::ToClient);
let buflen = event.buffer.len(); let buflen = event.buffer.len();
let consumed; let consumed;
{ {
@ -908,9 +904,7 @@ impl<'a> TunToProxy<'a> {
virtual_dns.touch_ip(&IpAddr::from(socket.local_endpoint().unwrap().addr)); virtual_dns.touch_ip(&IpAddr::from(socket.local_endpoint().unwrap().addr));
} }
consumed = socket.send_slice(event.buffer)?; consumed = socket.send_slice(event.buffer)?;
state state.proxy_handler.consume_data(OutgoingDirection::ToClient, consumed);
.tcp_proxy_handler
.consume_data(OutgoingDirection::ToClient, consumed);
self.expect_smoltcp_send()?; self.expect_smoltcp_send()?;
if consumed < buflen { if consumed < buflen {
self.write_sockets.insert(token); self.write_sockets.insert(token);
@ -994,7 +988,7 @@ impl<'a> TunToProxy<'a> {
// Try to send the first UDP packets to remote SOCKS5 server for UDP associate session // Try to send the first UDP packets to remote SOCKS5 server for UDP associate session
if let Some(state) = self.connection_map.get_mut(info) { if let Some(state) = self.connection_map.get_mut(info) {
if let Some(udp_socket) = state.udp_socket.as_ref() { if let Some(udp_socket) = state.udp_socket.as_ref() {
if let Some(addr) = state.tcp_proxy_handler.get_udp_associate() { if let Some(addr) = state.proxy_handler.get_udp_associate() {
// Consume udp_data_cache data // Consume udp_data_cache data
while let Some(buf) = state.udp_data_cache.pop_front() { while let Some(buf) = state.udp_data_cache.pop_front() {
udp_socket.send_to(&buf, addr)?; udp_socket.send_to(&buf, addr)?;
@ -1030,7 +1024,7 @@ impl<'a> TunToProxy<'a> {
.connection_map .connection_map
.get(&conn_info) .get(&conn_info)
.ok_or("")? .ok_or("")?
.tcp_proxy_handler .proxy_handler
.connection_established(); .connection_established();
if self.options.dns_over_tcp && conn_info.dst.port() == DNS_PORT && established { if self.options.dns_over_tcp && conn_info.dst.port() == DNS_PORT && established {
self.receive_dns_over_tcp_packet_and_write_to_client(&conn_info)?; self.receive_dns_over_tcp_packet_and_write_to_client(&conn_info)?;
@ -1057,14 +1051,14 @@ impl<'a> TunToProxy<'a> {
direction: IncomingDirection::FromServer, direction: IncomingDirection::FromServer,
buffer: &data[0..read], buffer: &data[0..read],
}; };
if let Err(error) = state.tcp_proxy_handler.push_data(data_event) { if let Err(error) = state.proxy_handler.push_data(data_event) {
log::error!("{}", error); log::error!("{}", error);
self.remove_connection(&conn_info.clone())?; self.remove_connection(&conn_info.clone())?;
return Ok(()); return Ok(());
} }
// The handler request for reset the server connection // The handler request for reset the server connection
if state.tcp_proxy_handler.reset_connection() { if state.proxy_handler.reset_connection() {
_ = self.poll.registry().deregister(&mut state.mio_stream); _ = self.poll.registry().deregister(&mut state.mio_stream);
// Closes the connection with the proxy // Closes the connection with the proxy
state.mio_stream.shutdown(Shutdown::Both)?; state.mio_stream.shutdown(Shutdown::Both)?;