diff --git a/Cargo.toml b/Cargo.toml index 865c553..e0b7980 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,6 @@ base64 = "0.22" url = "2.5" sha2 = "0.10" whoami = "1.5" -kcp-sys="0.1" [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] mac_address = "1.1" diff --git a/protos/rendezvous.proto b/protos/rendezvous.proto index 7bd02c6..5aa22ac 100644 --- a/protos/rendezvous.proto +++ b/protos/rendezvous.proto @@ -23,12 +23,20 @@ message PunchHoleRequest { ConnType conn_type = 4; string token = 5; string version = 6; + int32 udp_port = 7; + bool force_relay = 8; + int32 upnp_port = 9; + bytes socket_addr_v6 = 10; } message PunchHole { bytes socket_addr = 1; string relay_server = 2; NatType nat_type = 3; + int32 udp_port = 4; + bool force_relay = 5; + int32 upnp_port = 6; + bytes socket_addr_v6 = 7; } message TestNatRequest { @@ -53,7 +61,8 @@ message PunchHoleSent { string relay_server = 3; NatType nat_type = 4; string version = 5; - bool is_udp = 6; + int32 upnp_port = 6; + bytes socket_addr_v6 = 7; } message RegisterPk { @@ -96,6 +105,8 @@ message PunchHoleResponse { string other_failure = 7; int32 feedback = 8; bool is_udp = 9; + int32 upnp_port = 10; + bytes socket_addr_v6 = 11; } message ConfigUpdate { @@ -125,6 +136,8 @@ message RelayResponse { string refuse_reason = 6; string version = 7; int32 feedback = 9; + bytes socket_addr_v6 = 10; + int32 upnp_port = 11; } message SoftwareUpdate { string url = 1; } @@ -136,6 +149,7 @@ message SoftwareUpdate { string url = 1; } message FetchLocalAddr { bytes socket_addr = 1; string relay_server = 2; + bytes socket_addr_v6 = 3; } message LocalAddr { @@ -144,6 +158,7 @@ message LocalAddr { string relay_server = 3; string id = 4; string version = 5; + bytes socket_addr_v6 = 6; } message PeerDiscovery { diff --git a/src/kcp_stream.rs b/src/kcp_stream.rs deleted file mode 100644 index 6f1d27e..0000000 --- a/src/kcp_stream.rs +++ /dev/null @@ -1,128 +0,0 @@ -use crate::tcp::{DynTcpStream, FramedStream}; -use kcp_sys::{ - endpoint::*, - packet_def::{Bytes, BytesMut, KcpPacket}, - stream, -}; -use std::{net::SocketAddr, sync::Arc}; -use tokio::{net::UdpSocket, sync::mpsc}; - -pub struct KcpStream { - pub endpoint: Arc, - pub stream: FramedStream, -} - -impl KcpStream { - fn create_framed(stream: stream::KcpStream, local_addr: Option) -> FramedStream { - FramedStream( - tokio_util::codec::Framed::new( - DynTcpStream(Box::new(stream)), - crate::bytes_codec::BytesCodec::new(), - ), - local_addr.unwrap_or(crate::config::Config::get_any_listen_addr(true)), - None, - 0, - ) - } - - pub async fn accept( - udp_socket: Arc, - from_addr: SocketAddr, - ) -> crate::ResultType { - let mut endpoint = KcpEndpoint::new(); - endpoint.run().await; - - let (input, output) = (endpoint.input_sender(), endpoint.output_receiver().unwrap()); - udp_socket.connect(&[from_addr][..]).await?; - Self::kcp_io(udp_socket.clone(), input, output).await; - - let conn_id = endpoint.accept().await?; - if let Some(stream) = stream::KcpStream::new(&endpoint, conn_id) { - Ok(Self { - endpoint: Arc::new(endpoint), - stream: Self::create_framed(stream, udp_socket.local_addr().ok()), - }) - } else { - Err(anyhow::anyhow!("Failed to create KcpStream")) - } - } - - pub async fn connect( - udp_socket: Arc, - to_addr: SocketAddr, - timeout: std::time::Duration, - ) -> crate::ResultType { - let mut endpoint = KcpEndpoint::new(); - endpoint.run().await; - - let (input, output) = (endpoint.input_sender(), endpoint.output_receiver().unwrap()); - udp_socket.connect(&[to_addr][..]).await?; - Self::kcp_io(udp_socket.clone(), input, output).await; - - let conn_id = endpoint.connect(timeout, 0, 0, Bytes::new()).await.unwrap(); - if let Some(stream) = stream::KcpStream::new(&endpoint, conn_id) { - Ok(Self { - endpoint: Arc::new(endpoint), - stream: Self::create_framed(stream, udp_socket.local_addr().ok()), - }) - } else { - Err(anyhow::anyhow!("Failed to create KcpStream")) - } - } - - async fn kcp_io( - udp_socket: Arc, - input: mpsc::Sender, - mut output: mpsc::Receiver, - ) { - let udp = udp_socket.clone(); - tokio::spawn(async move { - loop { - tokio::select! { - Some(data) = output.recv() => { - if let Err(e) = udp.send(&data.inner()).await { - // Break on fatal errors, but ignore WouldBlock or Interrupted - if e.kind() != std::io::ErrorKind::WouldBlock && e.kind() != std::io::ErrorKind::Interrupted { - log::error!("kcp send error: {:?}", e); - break; - } - } - } - else => { - log::debug!("kcp endpoint output closed"); - break; - } - } - } - }); - - let udp = udp_socket.clone(); - tokio::spawn(async move { - let mut buf = vec![0; 10240]; - loop { - tokio::select! { - result = udp.recv_from(&mut buf) => { - match result { - Ok((size, _)) => { - input - .send(BytesMut::from(&buf[..size]).into()) - .await.ok(); - } - Err(e) => { - // Break on fatal errors, but ignore WouldBlock or Interrupted - if e.kind() != std::io::ErrorKind::WouldBlock && e.kind() != std::io::ErrorKind::Interrupted { - log::error!("kcp recv_from error: {:?}", e); - break; - } - } - } - } - else => { - log::debug!("kcp endpoint input closed"); - break; - } - } - } - }); - } -} diff --git a/src/lib.rs b/src/lib.rs index 5f8027c..3f24fdf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,7 +61,6 @@ pub mod websocket; pub mod stream; pub use stream::Stream; pub use whoami; -pub mod kcp_stream; pub type SessionID = uuid::Uuid; diff --git a/src/socket_client.rs b/src/socket_client.rs index ee37eb0..f0e5b05 100644 --- a/src/socket_client.rs +++ b/src/socket_client.rs @@ -6,8 +6,8 @@ use crate::{ ResultType, Stream, }; use anyhow::Context; -use std::net::SocketAddr; -use tokio::net::ToSocketAddrs; +use std::{net::SocketAddr, sync::Arc}; +use tokio::net::{ToSocketAddrs, UdpSocket}; use tokio_socks::{IntoTargetAddr, TargetAddr}; #[inline] @@ -207,6 +207,14 @@ async fn test_target(target: &str) -> ResultType { .context(format!("Failed to look up host for {target}")) } +#[inline] +pub async fn new_direct_udp_for(target: &str) -> ResultType<(Arc, SocketAddr)> { + let peer_addr = test_target(target).await?; + let local_addr = Config::get_any_listen_addr(peer_addr.is_ipv4()); + let socket = UdpSocket::bind(local_addr).await?; + Ok((Arc::new(socket), peer_addr)) +} + #[inline] pub async fn new_udp_for( target: &str, diff --git a/src/tcp.rs b/src/tcp.rs index 8852c8d..2296edb 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -22,16 +22,16 @@ use tokio_socks::IntoTargetAddr; use tokio_util::codec::Framed; pub trait TcpStreamTrait: AsyncRead + AsyncWrite + Unpin {} -pub struct DynTcpStream(pub(crate) Box); +pub struct DynTcpStream(pub Box); #[derive(Clone)] pub struct Encrypt(pub Key, pub u64, pub u64); pub struct FramedStream( - pub(crate) Framed, - pub(crate) SocketAddr, - pub(crate) Option, - pub(crate) u64, + pub Framed, + pub SocketAddr, + pub Option, + pub u64, ); impl Deref for FramedStream {