merge master

This commit is contained in:
ssrlive 2023-08-22 15:24:05 +08:00
commit 8b566b66d7
2 changed files with 152 additions and 122 deletions

View file

@ -9,6 +9,9 @@ pub enum Error {
#[error("std::io::Error {0}")] #[error("std::io::Error {0}")]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
#[error("TryFromIntError {0:?}")]
TryFromInt(#[from] std::num::TryFromIntError),
#[error("std::net::AddrParseError {0}")] #[error("std::net::AddrParseError {0}")]
AddrParse(#[from] std::net::AddrParseError), AddrParse(#[from] std::net::AddrParseError),

View file

@ -456,15 +456,7 @@ impl<'a> TunToProxy<'a> {
Ok(()) Ok(())
} }
// A raw packet was received on the tunnel interface. fn preprocess_origin_connection_info(&mut self, info: ConnectionInfo) -> Result<ConnectionInfo> {
fn receive_tun(&mut self, frame: &mut [u8]) -> Result<(), Error> {
let mut handler = || -> Result<(), Error> {
let result = connection_tuple(frame);
if let Err(error) = result {
log::info!("{}, ignored", error);
return Ok(());
}
let (info, _first_packet, payload_offset, payload_size) = result?;
let origin_dst = SocketAddr::try_from(&info.dst)?; let origin_dst = SocketAddr::try_from(&info.dst)?;
let connection_info = match &mut self.options.virtual_dns { let connection_info = match &mut self.options.virtual_dns {
None => { None => {
@ -485,15 +477,101 @@ impl<'a> TunToProxy<'a> {
} }
} }
}; };
Ok(connection_info)
}
fn process_incoming_udp_packets(
&mut self,
manager: &Rc<dyn ConnectionManager>,
info: &ConnectionInfo,
origin_dst: SocketAddr,
payload: &[u8],
) -> Result<()> {
if self.options.dns_over_tcp && origin_dst.port() == DNS_PORT {
dns::parse_data_to_dns_message(payload, false)?;
if !self.connection_map.contains_key(info) {
log::info!("DNS over TCP {} ({})", info, origin_dst);
let tcp_proxy_handler = manager.new_tcp_proxy(info, false)?;
let server_addr = manager.get_server_addr();
let state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, false)?;
self.connection_map.insert(info.clone(), state);
} else {
log::trace!("DNS over TCP subsequent packet {} ({})", info, origin_dst);
}
// Insert the DNS message length in front of the payload
let len = u16::try_from(payload.len())?;
let mut buf = Vec::with_capacity(2 + usize::from(len));
buf.extend_from_slice(&len.to_be_bytes());
buf.extend_from_slice(payload);
// FIXME: Build an IP packet with TCP and inject it into the device.
self.device.inject_packet(&buf);
self.expect_smoltcp_send()?;
self.tunsocket_read_and_forward(info)?;
self.write_to_server(info)?;
return Ok(());
}
if !self.connection_map.contains_key(info) {
log::info!("UDP associate session {} ({})", info, origin_dst);
let tcp_proxy_handler = manager.new_tcp_proxy(info, true)?;
let server_addr = manager.get_server_addr();
let mut state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, true)?;
state.udp_origin_dst = Some(origin_dst);
self.connection_map.insert(info.clone(), state);
self.expect_smoltcp_send()?;
self.tunsocket_read_and_forward(info)?;
self.write_to_server(info)?;
} else {
log::trace!("Subsequent udp packet {} ({})", info, origin_dst);
}
let err = "udp associate state not find";
let state = self.connection_map.get_mut(info).ok_or(err)?;
assert!(state.expiry.is_some());
state.expiry = Some(Self::udp_associate_timeout());
// Add SOCKS5 UDP header to the incoming data
let mut s5_udp_data = Vec::<u8>::new();
UdpHeader::new(0, info.dst.clone()).write_to_stream(&mut s5_udp_data)?;
s5_udp_data.extend_from_slice(payload);
if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() {
// UDP associate session has been established, we can send packets directly...
if let Some(socket) = state.udp_socket.as_ref() {
socket.send_to(&s5_udp_data, udp_associate)?;
}
} else {
// UDP associate tunnel not ready yet, we must cache the packets...
log::trace!("Cache udp packet {} ({})", info, origin_dst);
state.udp_data_cache.push_back(s5_udp_data);
}
Ok(())
}
// A raw packet was received on the tunnel interface.
fn receive_tun(&mut self, frame: &mut [u8]) -> Result<(), Error> {
let mut handler = || -> Result<(), Error> {
let result = connection_tuple(frame);
if let Err(error) = result {
log::info!("{}, ignored", error);
return Ok(());
}
let (info, _first_packet, payload_offset, payload_size) = result?;
let origin_dst = SocketAddr::try_from(&info.dst)?;
let connection_info = self.preprocess_origin_connection_info(info)?;
let manager = self.get_connection_manager().ok_or("get connection manager")?; let manager = self.get_connection_manager().ok_or("get connection manager")?;
let server_addr = manager.get_server_addr();
if connection_info.protocol == IpProtocol::Tcp { if connection_info.protocol == IpProtocol::Tcp {
if _first_packet { if _first_packet {
let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, false)?; let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, false)?;
#[rustfmt::skip] let server = manager.get_server_addr();
let state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, false)?; let state = self.create_new_tcp_connection_state(server, origin_dst, tcp_proxy_handler, false)?;
self.connection_map.insert(connection_info.clone(), state); self.connection_map.insert(connection_info.clone(), state);
log::info!("Connect done {} ({})", connection_info, origin_dst); log::info!("Connect done {} ({})", connection_info, origin_dst);
@ -527,65 +605,7 @@ impl<'a> TunToProxy<'a> {
self.send_udp_packet_to_client(origin_dst, connection_info.src, response.as_slice())?; self.send_udp_packet_to_client(origin_dst, connection_info.src, response.as_slice())?;
} else { } else {
// Another UDP packet // Another UDP packet
if self.options.dns_over_tcp && port == DNS_PORT { self.process_incoming_udp_packets(&manager, &connection_info, origin_dst, payload)?;
if !self.connection_map.contains_key(&connection_info) {
log::info!("DNS over TCP {} ({})", connection_info, origin_dst);
let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, false)?;
#[rustfmt::skip]
let state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, false)?;
self.connection_map.insert(connection_info.clone(), state);
} else {
log::trace!("Subsequent dns over tcp packet {} ({})", connection_info, origin_dst);
}
let len = payload.len() as u16;
let mut buf = Vec::with_capacity(2 + len as usize);
buf.extend_from_slice(&len.to_be_bytes());
buf.extend_from_slice(payload);
// TODO: Build an IP packet and inject it into the device.
self.device.inject_packet(&buf);
self.expect_smoltcp_send()?;
self.tunsocket_read_and_forward(&connection_info)?;
self.write_to_server(&connection_info)?;
return Ok(());
}
if !self.connection_map.contains_key(&connection_info) {
log::info!("UDP associate session {} ({})", connection_info, origin_dst);
let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, true)?;
#[rustfmt::skip]
let mut state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, true)?;
state.udp_origin_dst = Some(origin_dst);
self.connection_map.insert(connection_info.clone(), state);
self.expect_smoltcp_send()?;
self.tunsocket_read_and_forward(&connection_info)?;
self.write_to_server(&connection_info)?;
} else {
log::trace!("Subsequent udp packet {} ({})", connection_info, origin_dst);
}
let err = "udp associate state not find";
let state = self.connection_map.get_mut(&connection_info).ok_or(err)?;
assert!(state.expiry.is_some());
state.expiry = Some(Self::udp_associate_timeout());
// Add SOCKS5 UDP header to the incoming data
let mut s5_udp_data = Vec::<u8>::new();
UdpHeader::new(0, connection_info.dst.clone()).write_to_stream(&mut s5_udp_data)?;
s5_udp_data.extend_from_slice(payload);
if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() {
// UDP associate session has been established, we can send packets directly...
if let Some(socket) = state.udp_socket.as_ref() {
socket.send_to(&s5_udp_data, udp_associate)?;
}
} else {
// UDP associate tunnel not ready yet, we must cache the packets...
log::trace!("Cache udp packet {} ({})", connection_info, origin_dst);
state.udp_data_cache.push_back(s5_udp_data);
}
} }
} else { } else {
log::warn!("Unsupported protocol: {} ({})", connection_info, origin_dst); log::warn!("Unsupported protocol: {} ({})", connection_info, origin_dst);
@ -781,11 +801,9 @@ impl<'a> TunToProxy<'a> {
Ok(()) Ok(())
} }
fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> { fn receive_udp_packet_and_write_to_client(&mut self, info: &ConnectionInfo) -> Result<()> {
if let Some(info) = self.find_info_by_udp_token(event.token()) {
let info = info.clone();
let err = "udp connection state not found"; let err = "udp connection state not found";
let state = self.connection_map.get_mut(&info).ok_or(err)?; let state = self.connection_map.get_mut(info).ok_or(err)?;
state.expiry = Some(Self::udp_associate_timeout()); state.expiry = Some(Self::udp_associate_timeout());
let mut to_send: LinkedList<Vec<u8>> = LinkedList::new(); let mut to_send: LinkedList<Vec<u8>> = LinkedList::new();
if let Some(udp_socket) = state.udp_socket.as_ref() { if let Some(udp_socket) = state.udp_socket.as_ref() {
@ -813,8 +831,27 @@ impl<'a> TunToProxy<'a> {
while let Some(packet) = to_send.pop_front() { while let Some(packet) = to_send.pop_front() {
self.send_udp_packet_to_client(src, info.src, &packet)?; self.send_udp_packet_to_client(src, info.src, &packet)?;
} }
Ok(())
}
return Ok(()); fn comsume_cached_udp_packets(&mut self, info: &ConnectionInfo) -> Result<()> {
// Try to send the first UDP packets to remote SOCKS5 server for UDP associate session
if let Some(state) = self.connection_map.get_mut(info) {
if let Some(udp_socket) = state.udp_socket.as_ref() {
if let Some(addr) = state.tcp_proxy_handler.get_udp_associate() {
// Consume udp_data_cache data
while let Some(buf) = state.udp_data_cache.pop_front() {
udp_socket.send_to(&buf, addr)?;
}
}
}
}
Ok(())
}
fn mio_socket_event(&mut self, event: &Event) -> Result<(), Error> {
if let Some(info) = self.find_info_by_udp_token(event.token()) {
return self.receive_udp_packet_and_write_to_client(&info.clone());
} }
let conn_info = match self.find_info_by_token(event.token()) { let conn_info = match self.find_info_by_token(event.token()) {
@ -896,17 +933,7 @@ impl<'a> TunToProxy<'a> {
// server. // server.
self.write_to_server(&conn_info)?; self.write_to_server(&conn_info)?;
// Try to send the first UDP packet to remote SOCKS5 server for UDP associate session self.comsume_cached_udp_packets(&conn_info)?;
if let Some(state) = self.connection_map.get_mut(&conn_info) {
if let Some(udp_socket) = state.udp_socket.as_ref() {
if let Some(addr) = state.tcp_proxy_handler.get_udp_associate() {
// Consume udp_data_cache data
while let Some(buf) = state.udp_data_cache.pop_front() {
udp_socket.send_to(&buf, addr)?;
}
}
}
}
} }
if event.is_writable() { if event.is_writable() {