mirror of
https://github.com/tun2proxy/tun2proxy.git
synced 2025-04-23 07:19:08 +00:00
deal_with_incoming_udp_packets
This commit is contained in:
parent
d42d3a8287
commit
0f3903f455
1 changed files with 46 additions and 35 deletions
|
@ -456,6 +456,51 @@ impl<'a> TunToProxy<'a> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn deal_with_incoming_udp_packets(
|
||||||
|
&mut self,
|
||||||
|
manager: &Rc<dyn ConnectionManager>,
|
||||||
|
info: &ConnectionInfo,
|
||||||
|
origin_dst: SocketAddr,
|
||||||
|
payload: &[u8],
|
||||||
|
) -> Result<()> {
|
||||||
|
if !self.connection_map.contains_key(info) {
|
||||||
|
log::info!("UDP associate session {} ({})", info, origin_dst);
|
||||||
|
let tcp_proxy_handler = manager.new_tcp_proxy(info, true)?;
|
||||||
|
let server_addr = manager.get_server_addr();
|
||||||
|
let mut state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, true)?;
|
||||||
|
state.udp_origin_dst = Some(origin_dst);
|
||||||
|
self.connection_map.insert(info.clone(), state);
|
||||||
|
|
||||||
|
self.expect_smoltcp_send()?;
|
||||||
|
self.tunsocket_read_and_forward(info)?;
|
||||||
|
self.write_to_server(info)?;
|
||||||
|
} else {
|
||||||
|
log::trace!("Subsequent udp packet {} ({})", info, origin_dst);
|
||||||
|
}
|
||||||
|
|
||||||
|
let err = "udp associate state not find";
|
||||||
|
let state = self.connection_map.get_mut(info).ok_or(err)?;
|
||||||
|
assert!(state.expiry.is_some());
|
||||||
|
state.expiry = Some(Self::udp_associate_timeout());
|
||||||
|
|
||||||
|
// Add SOCKS5 UDP header to the incoming data
|
||||||
|
let mut s5_udp_data = Vec::<u8>::new();
|
||||||
|
UdpHeader::new(0, info.dst.clone()).write_to_stream(&mut s5_udp_data)?;
|
||||||
|
s5_udp_data.extend_from_slice(payload);
|
||||||
|
|
||||||
|
if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() {
|
||||||
|
// UDP associate session has been established, we can send packets directly...
|
||||||
|
if let Some(socket) = state.udp_socket.as_ref() {
|
||||||
|
socket.send_to(&s5_udp_data, udp_associate)?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// UDP associate tunnel not ready yet, we must cache the packets...
|
||||||
|
log::trace!("Cache udp packet {} ({})", info, origin_dst);
|
||||||
|
state.udp_data_cache.push_back(s5_udp_data);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// A raw packet was received on the tunnel interface.
|
// A raw packet was received on the tunnel interface.
|
||||||
fn receive_tun(&mut self, frame: &mut [u8]) -> Result<(), Error> {
|
fn receive_tun(&mut self, frame: &mut [u8]) -> Result<(), Error> {
|
||||||
let mut handler = || -> Result<(), Error> {
|
let mut handler = || -> Result<(), Error> {
|
||||||
|
@ -527,41 +572,7 @@ impl<'a> TunToProxy<'a> {
|
||||||
self.send_udp_packet_to_client(origin_dst, connection_info.src, response.as_slice())?;
|
self.send_udp_packet_to_client(origin_dst, connection_info.src, response.as_slice())?;
|
||||||
} else {
|
} else {
|
||||||
// Another UDP packet
|
// Another UDP packet
|
||||||
if !self.connection_map.contains_key(&connection_info) {
|
self.deal_with_incoming_udp_packets(&manager, &connection_info, origin_dst, payload)?;
|
||||||
log::info!("UDP associate session {} ({})", connection_info, origin_dst);
|
|
||||||
let tcp_proxy_handler = manager.new_tcp_proxy(&connection_info, true)?;
|
|
||||||
#[rustfmt::skip]
|
|
||||||
let mut state = self.create_new_tcp_connection_state(server_addr, origin_dst, tcp_proxy_handler, true)?;
|
|
||||||
state.udp_origin_dst = Some(origin_dst);
|
|
||||||
self.connection_map.insert(connection_info.clone(), state);
|
|
||||||
|
|
||||||
self.expect_smoltcp_send()?;
|
|
||||||
self.tunsocket_read_and_forward(&connection_info)?;
|
|
||||||
self.write_to_server(&connection_info)?;
|
|
||||||
} else {
|
|
||||||
log::trace!("Subsequent udp packet {} ({})", connection_info, origin_dst);
|
|
||||||
}
|
|
||||||
|
|
||||||
let err = "udp associate state not find";
|
|
||||||
let state = self.connection_map.get_mut(&connection_info).ok_or(err)?;
|
|
||||||
assert!(state.expiry.is_some());
|
|
||||||
state.expiry = Some(Self::udp_associate_timeout());
|
|
||||||
|
|
||||||
// Add SOCKS5 UDP header to the incoming data
|
|
||||||
let mut s5_udp_data = Vec::<u8>::new();
|
|
||||||
UdpHeader::new(0, connection_info.dst.clone()).write_to_stream(&mut s5_udp_data)?;
|
|
||||||
s5_udp_data.extend_from_slice(payload);
|
|
||||||
|
|
||||||
if let Some(udp_associate) = state.tcp_proxy_handler.get_udp_associate() {
|
|
||||||
// UDP associate session has been established, we can send packets directly...
|
|
||||||
if let Some(socket) = state.udp_socket.as_ref() {
|
|
||||||
socket.send_to(&s5_udp_data, udp_associate)?;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// UDP associate tunnel not ready yet, we must cache the packets...
|
|
||||||
log::trace!("Cache udp packet {} ({})", connection_info, origin_dst);
|
|
||||||
state.udp_data_cache.push_back(s5_udp_data);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log::warn!("Unsupported protocol: {} ({})", connection_info, origin_dst);
|
log::warn!("Unsupported protocol: {} ({})", connection_info, origin_dst);
|
||||||
|
|
Loading…
Add table
Reference in a new issue