diff --git a/.dockerignore b/.dockerignore new file mode 120000 index 0000000..3e4e48b --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +.gitignore \ No newline at end of file diff --git a/.github/workflows/auto-merge.yaml b/.github/workflows/auto-merge.yaml new file mode 100644 index 0000000..1280f57 --- /dev/null +++ b/.github/workflows/auto-merge.yaml @@ -0,0 +1,20 @@ +name: Dependabot Auto Merge + +on: + pull_request_target: + types: [labeled] + +jobs: + auto: + if: github.actor == 'dependabot[bot]' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + token: ${{ secrets.GITHUB_TOKEN }} + - name: Auto approve pull request, then squash and merge + uses: ahmadnassri/action-dependabot-auto-merge@v2 + with: + # target: minor + # here `PAT_REPO_ADMIN` is a user's passkey provided by github. + github-token: ${{ secrets.PAT_REPO_ADMIN }} diff --git a/.github/workflows/close-stale-issues.yml b/.github/workflows/close-stale-issues.yml new file mode 100644 index 0000000..d06ff8a --- /dev/null +++ b/.github/workflows/close-stale-issues.yml @@ -0,0 +1,26 @@ +name: Close stale issues and PRs + +on: + schedule: + - cron: "0 0 * * *" # run a cron job every day at midnight + +jobs: + stale: + runs-on: ubuntu-latest + steps: + - name: Close stale issues and PRs + uses: actions/stale@v9 + with: + stale-issue-message: 'This issue is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days.' + stale-pr-message: 'This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.' + close-issue-message: 'This issue was closed because it has been stalled for 5 days with no activity.' + close-pr-message: 'This PR was closed because it has been stalled for 10 days with no activity.' + days-before-issue-stale: 30 + days-before-pr-stale: 45 + days-before-issue-close: 5 + days-before-pr-close: 10 + stale-issue-label: 'no-issue-activity' + exempt-issue-labels: 'keep-open,awaiting-approval,work-in-progress' + stale-pr-label: 'no-pr-activity' + exempt-pr-labels: 'awaiting-approval,work-in-progress' + # only-labels: 'awaiting-feedback,awaiting-answers' diff --git a/.github/workflows/publish-docker.yml b/.github/workflows/publish-docker.yml index 10db660..541e53e 100644 --- a/.github/workflows/publish-docker.yml +++ b/.github/workflows/publish-docker.yml @@ -1,7 +1,5 @@ -# -name: Create and publish a Docker image +name: Publish Docker Images -# Configures this workflow to run every time a change is pushed to the branch called `release`. on: push: tags: [ 'v*.*.*' ] @@ -9,12 +7,20 @@ on: # Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds. env: REGISTRY: ghcr.io - IMAGE_NAME: ${{ github.repository }} + # This also contains the owner, i.e. tun2proxy/tun2proxy. + IMAGE_PATH: ${{ github.repository }} + IMAGE_NAME: ${{ github.event.repository.name }} + DEFAULT_OS: scratch # There is a single job in this workflow. It's configured to run on the latest available version of Ubuntu. jobs: build-and-push-image: + name: Build and push Docker image runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + os: [ 'scratch', 'ubuntu', 'alpine' ] # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job. permissions: contents: read @@ -31,30 +37,36 @@ jobs: - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 - + # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. - name: Log in to the Container registry - uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1 + uses: docker/login-action@v3 with: registry: ${{ env.REGISTRY }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} # This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels. - - name: Extract metadata (tags, labels) for Docker + - name: Extract metadata (tags, labels) for Docker Image id: meta - uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 + uses: docker/metadata-action@v5 with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} - + # We publish the images with an OS-suffix. + # The image based on a default OS is also published without a suffix. + images: | + ${{ env.REGISTRY }}/${{ env.IMAGE_PATH }}-${{ matrix.os }} + ${{ env.DEFAULT_OS == matrix.os && format('{0}/{1}', env.REGISTRY, env.IMAGE_PATH) || '' }} + # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages. # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository. # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step. - name: Build and push Docker image - uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4 + uses: docker/build-push-action@v6 with: platforms: linux/amd64,linux/arm64 context: . + file: Dockerfile + target: ${{ env.IMAGE_NAME }}-${{ matrix.os }} push: true tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} diff --git a/.github/workflows/publish-exe.yml b/.github/workflows/publish-exe.yml index 778203d..697f352 100644 --- a/.github/workflows/publish-exe.yml +++ b/.github/workflows/publish-exe.yml @@ -1,4 +1,5 @@ on: + workflow_dispatch: push: tags: - "v*.*.*" @@ -15,6 +16,7 @@ jobs: attestations: write strategy: + fail-fast: false matrix: target: - x86_64-unknown-linux-gnu @@ -33,7 +35,7 @@ jobs: include: - target: x86_64-unknown-linux-gnu - host_os: ubuntu-latest + host_os: ubuntu-22.04 - target: x86_64-unknown-linux-musl host_os: ubuntu-latest - target: i686-unknown-linux-musl @@ -72,14 +74,15 @@ jobs: rustup target add ${{ matrix.target }} fi cargo install cbindgen - if [[ "${{ matrix.host_os }}" == "ubuntu-latest" ]]; then + if [[ "${{ contains(matrix.host_os, 'ubuntu') }}" == "true" && "${{ matrix.host_os }}" != "ubuntu-22.04" ]]; then sudo .github/workflows/install-cross.sh fi - name: Build + if: ${{ !cancelled() }} shell: bash run: | - if [[ "${{ matrix.host_os }}" == "ubuntu-latest" ]]; then + if [[ "${{ contains(matrix.host_os, 'ubuntu') }}" == "true" && "${{ matrix.host_os }}" != "ubuntu-22.04" ]]; then cross build --all-features --release --target ${{ matrix.target }} else if [[ "${{ matrix.target }}" == "x86_64-win7-windows-msvc" || "${{ matrix.target }}" == "i686-win7-windows-msvc" ]]; then @@ -90,21 +93,21 @@ jobs: cargo build --all-features --release --target ${{ matrix.target }} fi fi - cbindgen --config cbindgen.toml -l C --cpp-compat -o target/tun2proxy-ffi.h + cbindgen --config cbindgen.toml -o target/tun2proxy.h if [[ "${{ matrix.host_os }}" == "windows-latest" ]]; then powershell -Command "(Get-Item README.md).LastWriteTime = Get-Date" powershell -Command "(Get-Item target/${{ matrix.target }}/release/wintun.dll).LastWriteTime = Get-Date" - powershell Compress-Archive -Path target/${{ matrix.target }}/release/tun2proxy-bin.exe, README.md, target/tun2proxy-ffi.h, target/${{ matrix.target }}/release/tun2proxy.dll, target/${{ matrix.target }}/release/wintun.dll -DestinationPath mypubdir4/tun2proxy-${{ matrix.target }}.zip + powershell Compress-Archive -Path target/${{ matrix.target }}/release/tun2proxy-bin.exe, target/${{ matrix.target }}/release/udpgw-server.exe, README.md, target/tun2proxy.h, target/${{ matrix.target }}/release/tun2proxy.dll, target/${{ matrix.target }}/release/wintun.dll -DestinationPath mypubdir4/tun2proxy-${{ matrix.target }}.zip elif [[ "${{ matrix.host_os }}" == "macos-latest" ]]; then - zip -j mypubdir4/tun2proxy-${{ matrix.target }}.zip target/${{ matrix.target }}/release/tun2proxy-bin README.md target/tun2proxy-ffi.h target/${{ matrix.target }}/release/libtun2proxy.dylib + zip -j mypubdir4/tun2proxy-${{ matrix.target }}.zip target/${{ matrix.target }}/release/tun2proxy-bin target/${{ matrix.target }}/release/udpgw-server README.md target/tun2proxy.h target/${{ matrix.target }}/release/libtun2proxy.dylib if [[ "${{ matrix.target }}" == "x86_64-apple-darwin" ]]; then ./build-aarch64-apple-ios.sh zip -r mypubdir4/tun2proxy-aarch64-apple-ios-xcframework.zip ./tun2proxy.xcframework/ ./build-apple.sh zip -r mypubdir4/tun2proxy-apple-xcframework.zip ./tun2proxy.xcframework/ fi - elif [[ "${{ matrix.host_os }}" == "ubuntu-latest" ]]; then - zip -j mypubdir4/tun2proxy-${{ matrix.target }}.zip target/${{ matrix.target }}/release/tun2proxy-bin README.md target/tun2proxy-ffi.h target/${{ matrix.target }}/release/libtun2proxy.so + elif [[ "${{ contains(matrix.host_os, 'ubuntu') }}" == "true" ]]; then + zip -j mypubdir4/tun2proxy-${{ matrix.target }}.zip target/${{ matrix.target }}/release/tun2proxy-bin target/${{ matrix.target }}/release/udpgw-server README.md target/tun2proxy.h target/${{ matrix.target }}/release/libtun2proxy.so if [[ "${{ matrix.target }}" == "x86_64-unknown-linux-gnu" ]]; then ./build-android.sh cp ./tun2proxy-android-libs.zip ./mypubdir4/ @@ -112,20 +115,26 @@ jobs: fi - name: Upload artifacts + if: ${{ !cancelled() }} uses: actions/upload-artifact@v4 with: name: bin-${{ matrix.target }} path: mypubdir4/* - name: Generate artifact attestation + if: ${{ !cancelled() }} uses: actions/attest-build-provenance@v1 with: subject-path: mypubdir4/* - name: Publish + if: ${{ !cancelled() }} uses: softprops/action-gh-release@v1 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: files: mypubdir4/* + - name: Abort on error + if: ${{ failure() }} + run: echo "Some of jobs failed" && false diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 13992eb..2e57100 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -1,12 +1,15 @@ name: Push or PR on: + workflow_dispatch: push: branches: - '**' pull_request: branches: - '**' + schedule: + - cron: '0 0 * * 0' # Every Sunday at midnight UTC env: CARGO_TERM_COLOR: always @@ -47,6 +50,53 @@ jobs: if: ${{ failure() }} run: echo "Some of jobs failed" && false + build_n_test_android: + strategy: + fail-fast: false + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Install cargo ndk and rust compiler for android target + if: ${{ !cancelled() }} + run: | + cargo install --locked cargo-ndk + rustup target add x86_64-linux-android + - name: clippy + if: ${{ !cancelled() }} + run: cargo ndk -t x86_64 clippy --all-features -- -D warnings + - name: Build + if: ${{ !cancelled() }} + run: | + cargo ndk -t x86_64 rustc --verbose --all-features --lib --crate-type=cdylib + - name: Abort on error + if: ${{ failure() }} + run: echo "Android build job failed" && false + + build_n_test_ios: + strategy: + fail-fast: false + runs-on: macos-latest + + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - name: Install cargo lipo and rust compiler for ios target + if: ${{ !cancelled() }} + run: | + cargo install --locked cargo-lipo + rustup target add x86_64-apple-ios aarch64-apple-ios + - name: clippy + if: ${{ !cancelled() }} + run: cargo clippy --target x86_64-apple-ios --all-features -- -D warnings + - name: Build + if: ${{ !cancelled() }} + run: | + cargo lipo --verbose --all-features + - name: Abort on error + if: ${{ failure() }} + run: echo "iOS build job failed" && false + semver: name: Check semver strategy: diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c274674..32d02f5 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -12,18 +12,16 @@ jobs: proxy_tests: name: Proxy Tests runs-on: ubuntu-latest - if: github.event_name != 'pull_request' || contains(github.event.pull_request.labels.*.name, 'safe to test') + if: (github.event_name != 'pull_request' || contains(github.event.pull_request.labels.*.name, 'safe to test')) && github.actor != 'dependabot[bot]' && github.actor != 'github-actions[bot]' steps: - - uses: actions/checkout@v2 - - uses: actions-rs/toolchain@v1 - with: - profile: minimal - toolchain: stable - override: true + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable - name: Populate .env env: DOTENV: ${{ secrets.DOTENV }} - run: echo "$DOTENV" > .env + run: | + echo "$DOTENV" > tests/.env + ln -s tests/.env - name: Set up Python uses: actions/setup-python@v2 @@ -40,7 +38,7 @@ jobs: - name: Build project run: cargo build --release - + - name: Run tests run: | source venv/bin/activate diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..4a7b842 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,11 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v2.3.0 + hooks: + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace + - repo: https://github.com/rhysd/actionlint + rev: v1.7.7 + hooks: + - id: actionlint diff --git a/Cargo.toml b/Cargo.toml index d237122..04c93b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,69 +1,82 @@ [package] name = "tun2proxy" -version = "0.5.4" -edition = "2021" +version = "0.7.11" +edition = "2024" license = "MIT" repository = "https://github.com/tun2proxy/tun2proxy" homepage = "https://github.com/tun2proxy/tun2proxy" authors = ["B. Blechschmidt", "ssrlive"] description = "Tunnel interface to proxy" readme = "README.md" -rust-version = "1.80" +rust-version = "1.85" [lib] crate-type = ["staticlib", "cdylib", "lib"] -[dependencies] -async-trait = "0.1" -base64 = { version = "0.22" } -chrono = "0.4" -clap = { version = "4", features = ["derive", "wrap_help", "color"] } -ctrlc2 = { version = "3", features = ["tokio", "termination"] } -digest_auth = "0.3" -dotenvy = "0.15" -env_logger = "0.11" -hashlink = "0.9" -hickory-proto = "0.24" -httparse = "1" -ipstack = { version = "0.1" } -log = { version = "0.4", features = ["std"] } -mimalloc = { version = "0.1", default-features = false, optional = true } -percent-encoding = "2" -socks5-impl = { version = "0.5" } -thiserror = "1" -tokio = { version = "1", features = ["full"] } -tokio-util = "0.7" -tproxy-config = { version = "6", default-features = false } -tun2 = { version = "3", features = ["async"] } -udp-stream = { version = "0.0.12", default-features = false } -unicase = "2" -url = "2" - -[target.'cfg(target_os="linux")'.dependencies] -serde = { version = "1", features = ["derive"] } -bincode = "1" -nix = { version = "0.29", default-features = false, features = [ - "fs", - "socket", - "uio", -] } - -[target.'cfg(target_os="android")'.dependencies] -android_logger = "0.14" -jni = { version = "0.21", default-features = false } - -[target.'cfg(unix)'.dependencies] -daemonize = "0.5" - -[target.'cfg(target_os = "windows")'.dependencies] -windows-service = "0.7" - -[build-dependencies] -serde_json = "1" - [[bin]] name = "tun2proxy-bin" path = "src/bin/main.rs" -[profile.release] -strip = "symbols" +[[bin]] +name = "udpgw-server" +path = "src/bin/udpgw_server.rs" +required-features = ["udpgw"] + +[features] +default = ["udpgw"] +udpgw = [] + +[dependencies] +async-trait = "0.1" +base64easy = "0.1" +chrono = "0.4" +clap = { version = "4", features = ["derive", "wrap_help", "color"] } +ctrlc2 = { version = "3.6.5", features = ["async", "termination"] } +digest_auth = "0.3" +dotenvy = "0.15" +env_logger = "0.11" +hashlink = "0.10" +hickory-proto = "0.25" +httparse = "1" +ipstack = { version = "0.4" } +log = { version = "0.4", features = ["std"] } +mimalloc = { version = "0.1", default-features = false, optional = true } +percent-encoding = "2" +shlex = "1.3.0" +socks5-impl = { version = "0.7", default-features = false, features = [ + "tokio", +] } +thiserror = "2" +tokio = { version = "1", features = ["full"] } +tokio-util = "0.7" +tproxy-config = { version = "7", default-features = false } +tun = { version = "0.8", features = ["async"] } +udp-stream = { version = "0.0.12", default-features = false } +unicase = "2" +url = "2" + +[target.'cfg(target_os="android")'.dependencies] +android_logger = "0.15" +jni = { version = "0.21", default-features = false } + +[target.'cfg(target_os="linux")'.dependencies] +bincode = "2" +serde = { version = "1", features = ["derive"] } + +[target.'cfg(target_os="windows")'.dependencies] +windows-service = "0.8" + +[target.'cfg(unix)'.dependencies] +daemonize = "0.5" +nix = { version = "0.30", default-features = false, features = [ + "fs", + "socket", + "uio", +] } + +[build-dependencies] +chrono = "0.4" +serde_json = "1" + +# [profile.release] +# strip = "symbols" diff --git a/Dockerfile b/Dockerfile index e6ad592..f7aafdc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,20 +1,61 @@ #################################################################################################### -## Builder +# This is a multi-stage Dockerfile. +# Build with `docker buildx build -t --target .` +# For example, to build the Alpine-based image while naming it tun2proxy, run: +# `docker buildx build -t tun2proxy --target tun2proxy-alpine .` #################################################################################################### -FROM rust:latest AS builder - -WORKDIR /worker -COPY ./ . -RUN cargo build --release - #################################################################################################### -## Final image +## glibc builder #################################################################################################### -FROM ubuntu:latest +FROM rust:latest AS glibc-builder -RUN apt update && apt install -y iproute2 && apt clean all + WORKDIR /worker + COPY ./ . + RUN cargo build --release -COPY --from=builder /worker/target/release/tun2proxy-bin /usr/bin/tun2proxy-bin +#################################################################################################### +## musl builder +#################################################################################################### +FROM rust:latest AS musl-builder -ENTRYPOINT ["/usr/bin/tun2proxy-bin", "--setup"] + WORKDIR /worker + COPY ./ . + RUN ARCH=$(rustc -vV | sed -nE 's/host:\s*([^-]+).*/\1/p') \ + && rustup target add "$ARCH-unknown-linux-musl" \ + && cargo build --release --target "$ARCH-unknown-linux-musl" + + RUN mkdir /.etc \ + && touch /.etc/resolv.conf \ + && mkdir /.tmp \ + && chmod 777 /.tmp \ + && chmod +t /.tmp + +#################################################################################################### +## Alpine image +#################################################################################################### +FROM alpine:latest AS tun2proxy-alpine + + COPY --from=musl-builder /worker/target/*/release/tun2proxy-bin /usr/bin/tun2proxy-bin + + ENTRYPOINT ["/usr/bin/tun2proxy-bin", "--setup"] + +#################################################################################################### +## Ubuntu image +#################################################################################################### +FROM ubuntu:latest AS tun2proxy-ubuntu + + COPY --from=glibc-builder /worker/target/release/tun2proxy-bin /usr/bin/tun2proxy-bin + + ENTRYPOINT ["/usr/bin/tun2proxy-bin", "--setup"] + +#################################################################################################### +## OS-less image (default) +#################################################################################################### +FROM scratch AS tun2proxy-scratch + + COPY --from=musl-builder ./tmp /tmp + COPY --from=musl-builder ./etc /etc + COPY --from=musl-builder /worker/target/*/release/tun2proxy-bin /usr/bin/tun2proxy-bin + + ENTRYPOINT ["/usr/bin/tun2proxy-bin", "--setup"] diff --git a/README.md b/README.md index a236c7f..e813cb0 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,10 @@ +[![tun2proxy](https://socialify.git.ci/tun2proxy/tun2proxy/image?description=1&language=1&name=1&stargazers=1&theme=Light)](https://github.com/tun2proxy/tun2proxy) + # tun2proxy A tunnel interface for HTTP and SOCKS proxies on Linux, Android, macOS, iOS and Windows. [![Crates.io](https://img.shields.io/crates/v/tun2proxy.svg)](https://crates.io/crates/tun2proxy) -![tun2proxy](https://docs.rs/tun2proxy/badge.svg) +[![tun2proxy](https://docs.rs/tun2proxy/badge.svg)](https://docs.rs/tun2proxy) [![Documentation](https://img.shields.io/badge/docs-release-brightgreen.svg?style=flat)](https://docs.rs/tun2proxy) [![Download](https://img.shields.io/crates/d/tun2proxy.svg)](https://crates.io/crates/tun2proxy) [![License](https://img.shields.io/crates/l/tun2proxy.svg?style=flat)](https://github.com/tun2proxy/tun2proxy/blob/master/LICENSE) @@ -18,6 +20,7 @@ A tunnel interface for HTTP and SOCKS proxies on Linux, Android, macOS, iOS and - GFW evasion mechanism for certain use cases (see [issue #35](https://github.com/tun2proxy/tun2proxy/issues/35)) - SOCKS5 UDP support - Native support for proxying DNS over TCP +- UdpGW (UDP gateway) support for UDP over TCP, see the [wiki](https://github.com/tun2proxy/tun2proxy/wiki/UDP-gateway-feature) for more information ## Build Clone the repository and `cd` into the project folder. Then run the following: @@ -26,7 +29,7 @@ cargo build --release ``` ### Building Framework for Apple Devices -To build an XCFramework for macOS and iOS, run the following: +To build an XCFramework for macOS and iOS, run the following: ``` ./build-apple.sh ``` @@ -129,40 +132,61 @@ Tunnel interface to proxy. Usage: tun2proxy-bin [OPTIONS] --proxy [ADMIN_COMMAND]... Arguments: - [ADMIN_COMMAND]... Specify a command to run with root-like capabilities in the new namespace when using `--unshare`. - This could be useful to start additional daemons, e.g. `openvpn` instance + [ADMIN_COMMAND]... Specify a command to run with root-like capabilities in the new namespace when using `--unshare`. This could be + useful to start additional daemons, e.g. `openvpn` instance Options: - -p, --proxy Proxy URL in the form proto://[username[:password]@]host:port, where proto is one of - socks4, socks5, http. For example: socks5://myname:password@127.0.0.1:1080 - -t, --tun Name of the tun interface, such as tun0, utun4, etc. If this option is not provided, the - OS will generate a random one - --tun-fd File descriptor of the tun interface - --unshare Create a tun interface in a newly created unprivileged namespace while maintaining proxy - connectivity via the global network namespace - -6, --ipv6-enabled IPv6 enabled - -s, --setup Routing and system setup, which decides whether to setup the routing and system - configuration. This option is only available on Linux and requires root-like privileges. - See `capabilities(7)` - -d, --dns DNS handling strategy [default: direct] [possible values: virtual, over-tcp, direct] - --dns-addr DNS resolver address [default: 8.8.8.8] - -b, --bypass IPs used in routing setup which should bypass the tunnel, in the form of IP or IP/CIDR. - Multiple IPs can be specified, e.g. --bypass 3.4.5.0/24 --bypass 5.6.7.8 - --tcp-timeout TCP timeout in seconds [default: 600] - --udp-timeout UDP timeout in seconds [default: 10] - -v, --verbosity Verbosity level [default: info] [possible values: off, error, warn, info, debug, trace] - -h, --help Print help - -V, --version Print version + -p, --proxy Proxy URL in the form proto://[username[:password]@]host:port, where proto is one of + socks4, socks5, http. Username and password are encoded in percent encoding. For example: + socks5://myname:pass%40word@127.0.0.1:1080 + -t, --tun Name of the tun interface, such as tun0, utun4, etc. If this option is not provided, the + OS will generate a random one + --tun-fd File descriptor of the tun interface + --close-fd-on-drop Set whether to close the received raw file descriptor on drop or not. This setting is + dependent on [tun_fd] [possible values: true, false] + --unshare Create a tun interface in a newly created unprivileged namespace while maintaining proxy + connectivity via the global network namespace + --unshare-pidfile Create a pidfile of `unshare` process when using `--unshare` + -6, --ipv6-enabled IPv6 enabled + -s, --setup Routing and system setup, which decides whether to setup the routing and system + configuration. This option requires root-like privileges on every platform. + It is very important on Linux, see `capabilities(7)` + -d, --dns DNS handling strategy [default: direct] [possible values: virtual, over-tcp, direct] + --dns-addr DNS resolver address [default: 8.8.8.8] + --virtual-dns-pool IP address pool to be used by virtual DNS in CIDR notation [default: 198.18.0.0/15] + -b, --bypass IPs used in routing setup which should bypass the tunnel, in the form of IP or IP/CIDR. + Multiple IPs can be specified, e.g. --bypass 3.4.5.0/24 --bypass 5.6.7.8 + --tcp-timeout TCP timeout in seconds [default: 600] + --udp-timeout UDP timeout in seconds [default: 10] + -v, --verbosity Verbosity level [default: info] [possible values: off, error, warn, info, debug, trace] + --daemonize Daemonize for unix family or run as Windows service + --exit-on-fatal-error Exit immediately when fatal error occurs, useful for running as a service + --max-sessions Maximum number of sessions to be handled concurrently [default: 200] + --udpgw-server UDP gateway server address, forwards UDP packets via specified TCP server + --udpgw-connections Max connections for the UDP gateway, default value is 5 + --udpgw-keepalive Keepalive interval in seconds for the UDP gateway, default value is 30 + -h, --help Print help + -V, --version Print version ``` Currently, tun2proxy supports HTTP, SOCKS4/SOCKS4a and SOCKS5. A proxy is supplied to the `--proxy` argument in the URL format. For example, an HTTP proxy at `1.2.3.4:3128` with a username of `john.doe` and a password of `secret` is supplied as `--proxy http://john.doe:secret@1.2.3.4:3128`. This works analogously to curl's `--proxy` argument. -## Docker Support +## Container Support +### Docker Tun2proxy can serve as a proxy for other Docker containers. To make use of that feature, first build the image: ```bash -docker build -t tun2proxy . +docker buildx build -t tun2proxy . +``` + +This will build an image containing a statically linked `tun2proxy` binary (based on `musl`) without OS. + +Alternatively, you can build images based on Ubuntu or Alpine as follows: + +```bash +docker buildx build -t tun2proxy --target tun2proxy-ubuntu . +docker buildx build -t tun2proxy --target tun2proxy-alpine . ``` Next, start a container from the tun2proxy image: @@ -173,7 +197,7 @@ docker run -d \ --sysctl net.ipv6.conf.default.disable_ipv6=0 \ --cap-add NET_ADMIN \ --name tun2proxy \ - tun2proxy-bin --proxy proto://[username[:password]@]host:port + tun2proxy --proxy proto://[username[:password]@]host:port ``` You can then provide the running container's network to another worker container by sharing the network namespace (like kubernetes sidecar): @@ -183,6 +207,36 @@ docker run -it \ --network "container:tun2proxy" \ ubuntu:latest ``` +### Docker Compose + +Create a `docker-compose.yaml` file with the following content: + +```yaml +services: + tun2proxy: + volumes: + - /dev/net/tun:/dev/net/tun + sysctls: + - net.ipv6.conf.default.disable_ipv6=0 + cap_add: + - NET_ADMIN + container_name: tun2proxy + image: ghcr.io/tun2proxy/tun2proxy-ubuntu:latest + command: --proxy proto://[username[:password]@]host:port + alpine: + stdin_open: true + tty: true + network_mode: container:tun2proxy + image: alpine:latest + command: apk add curl && curl ifconfig.icu && sleep 10 +``` + +Then run the compose file + +```bash +docker compose up -d tun2proxy +docker compose up alpine +``` ## Configuration Tips ### DNS @@ -204,3 +258,10 @@ asked to open connections to IPv6 destinations. In such a case, you can disable either through `sysctl -w net.ipv6.conf.all.disable_ipv6=1` and `sysctl -w net.ipv6.conf.default.disable_ipv6=1` or through `ip -6 route del default`, which causes the `libc` resolver (and other software) to not issue DNS AAAA requests for IPv6 addresses. + +## Contributors ✨ +Thanks goes to these wonderful people: + + + + diff --git a/build-aarch64-apple-ios-debug.sh b/build-aarch64-apple-ios-debug.sh index 7642758..6c97921 100755 --- a/build-aarch64-apple-ios-debug.sh +++ b/build-aarch64-apple-ios-debug.sh @@ -10,7 +10,7 @@ cargo build --target aarch64-apple-ios --features mimalloc echo "Generating includes..." mkdir -p target/include/ rm -rf target/include/* -cbindgen --config cbindgen.toml -l C --cpp-compat -o target/include/tun2proxy.h +cbindgen --config cbindgen.toml -o target/include/tun2proxy.h cat > target/include/tun2proxy.modulemap < target/include/tun2proxy.modulemap < target/include/tun2proxy.modulemap < Result<(), Box> { + if let Ok(git_hash) = get_git_hash() { + // Set the environment variables + println!("cargo:rustc-env=GIT_HASH={}", git_hash.trim()); + } + + // Get the build time + let build_time = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(); + println!("cargo:rustc-env=BUILD_TIME={build_time}"); + #[cfg(target_os = "windows")] if let Ok(cargo_target_dir) = get_cargo_target_dir() { let mut f = std::fs::File::create(cargo_target_dir.join("build.log"))?; @@ -19,7 +28,7 @@ fn main() -> Result<(), Box> { // Copy to the target directory if let Err(e) = std::fs::copy(src_path, &dst_path) { - f.write_all(format!("Failed to copy 'wintun.dll': {}\r\n", e).as_bytes())?; + f.write_all(format!("Failed to copy 'wintun.dll': {e}\r\n").as_bytes())?; } else { f.write_all(format!("Copied 'wintun.dll' to '{}'\r\n", dst_path.display()).as_bytes())?; @@ -85,3 +94,10 @@ fn get_crate_dir(crate_name: &str) -> Result std::io::Result { + use std::process::Command; + let git_hash = Command::new("git").args(["rev-parse", "--short", "HEAD"]).output()?.stdout; + let git_hash = String::from_utf8(git_hash).map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + Ok(git_hash) +} diff --git a/cbindgen.toml b/cbindgen.toml index bed1b79..50ea65f 100644 --- a/cbindgen.toml +++ b/cbindgen.toml @@ -1,15 +1,19 @@ +language = "C" +cpp_compat = true + [export] include = [ + "tun2proxy_run_with_cli", "tun2proxy_with_fd_run", "tun2proxy_with_name_run", - "tun2proxy_with_name_stop", - "tun2proxy_with_fd_stop", + "tun2proxy_stop", "tun2proxy_set_log_callback", "tun2proxy_set_traffic_status_callback", ] exclude = [ "Java_com_github_shadowsocks_bg_Tun2proxy_run", "Java_com_github_shadowsocks_bg_Tun2proxy_stop", + "UdpFlag", ] [export.rename] diff --git a/src/android.rs b/src/android.rs index cca174c..030e921 100644 --- a/src/android.rs +++ b/src/android.rs @@ -1,14 +1,14 @@ #![cfg(target_os = "android")] use crate::{ + Args, args::ArgProxy, error::{Error, Result}, - Args, }; use jni::{ + JNIEnv, objects::{JClass, JString}, sys::{jboolean, jchar, jint}, - JNIEnv, }; /// # Safety @@ -21,7 +21,7 @@ use jni::{ /// - tun_mtu: the tun mtu /// - dns_strategy: the dns strategy, see ArgDns enum /// - verbosity: the verbosity level, see ArgVerbosity enum -#[no_mangle] +#[unsafe(no_mangle)] pub unsafe extern "C" fn Java_com_github_shadowsocks_bg_Tun2proxy_run( mut env: JNIEnv, _clazz: JClass, @@ -52,15 +52,15 @@ pub unsafe extern "C" fn Java_com_github_shadowsocks_bg_Tun2proxy_run( .close_fd_on_drop(close_fd_on_drop) .dns(dns) .verbosity(verbosity); - crate::mobile_api::mobile_run(args, tun_mtu, false) + crate::general_api::general_run_for_api(args, tun_mtu, false) } /// # Safety /// /// Shutdown tun2proxy -#[no_mangle] +#[unsafe(no_mangle)] pub unsafe extern "C" fn Java_com_github_shadowsocks_bg_Tun2proxy_stop(_env: JNIEnv, _: JClass) -> jint { - crate::mobile_api::mobile_stop() + crate::general_api::tun2proxy_stop_internal() } fn get_java_string(env: &mut JNIEnv, string: &JString) -> Result { diff --git a/src/apple.rs b/src/apple.rs deleted file mode 100644 index 155b101..0000000 --- a/src/apple.rs +++ /dev/null @@ -1,54 +0,0 @@ -#![cfg(any(target_os = "android", target_os = "ios", target_os = "macos"))] - -use crate::{ - args::{ArgDns, ArgProxy}, - ArgVerbosity, Args, -}; -use std::os::raw::{c_char, c_int, c_ushort}; - -/// # Safety -/// -/// Run the tun2proxy component with some arguments. -/// Parameters: -/// - proxy_url: the proxy url, e.g. "socks5://127.0.0.1:1080" -/// - tun_fd: the tun file descriptor, it will be owned by tun2proxy -/// - close_fd_on_drop: whether close the tun_fd on drop -/// - packet_information: whether exists packet information in tun_fd -/// - tun_mtu: the tun mtu -/// - dns_strategy: the dns strategy, see ArgDns enum -/// - verbosity: the verbosity level, see ArgVerbosity enum -#[no_mangle] -pub unsafe extern "C" fn tun2proxy_with_fd_run( - proxy_url: *const c_char, - tun_fd: c_int, - close_fd_on_drop: bool, - packet_information: bool, - tun_mtu: c_ushort, - dns_strategy: ArgDns, - verbosity: ArgVerbosity, -) -> c_int { - log::set_max_level(verbosity.into()); - if let Err(err) = log::set_boxed_logger(Box::::default()) { - log::warn!("failed to set logger: {:?}", err); - } - - let proxy_url = std::ffi::CStr::from_ptr(proxy_url).to_str().unwrap(); - let proxy = ArgProxy::try_from(proxy_url).unwrap(); - - let mut args = Args::default(); - args.proxy(proxy) - .tun_fd(Some(tun_fd)) - .close_fd_on_drop(close_fd_on_drop) - .dns(dns_strategy) - .verbosity(verbosity); - - crate::mobile_api::mobile_run(args, tun_mtu, packet_information) -} - -/// # Safety -/// -/// Shutdown the tun2proxy component. -#[no_mangle] -pub unsafe extern "C" fn tun2proxy_with_fd_stop() -> c_int { - crate::mobile_api::mobile_stop() -} diff --git a/src/args.rs b/src/args.rs index 352cb57..333e758 100644 --- a/src/args.rs +++ b/src/args.rs @@ -8,8 +8,19 @@ use std::ffi::OsString; use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; use std::str::FromStr; +#[macro_export] +macro_rules! version_info { + () => { + concat!(env!("CARGO_PKG_VERSION"), " (", env!("GIT_HASH"), " ", env!("BUILD_TIME"), ")") + }; +} + +fn about_info() -> &'static str { + concat!("Tunnel interface to proxy.\nVersion: ", version_info!()) +} + #[derive(Debug, Clone, clap::Parser)] -#[command(author, version, about = "Tunnel interface to proxy.", long_about = None)] +#[command(author, version = version_info!(), about = about_info(), long_about = None)] pub struct Args { /// Proxy URL in the form proto://[username[:password]@]host:port, /// where proto is one of socks4, socks5, http. @@ -30,10 +41,9 @@ pub struct Args { pub tun_fd: Option, /// Set whether to close the received raw file descriptor on drop or not. - /// This setting is passed to the tun2 crate. - /// See [tun2::Configuration::close_fd_on_drop]. + /// This setting is dependent on [tun_fd]. #[cfg(unix)] - #[arg(long, value_name = "true or false", conflicts_with = "tun")] + #[arg(long, value_name = "true or false", conflicts_with = "tun", requires = "tun_fd")] pub close_fd_on_drop: Option, /// Create a tun interface in a newly created unprivileged namespace @@ -66,8 +76,9 @@ pub struct Args { pub ipv6_enabled: bool, /// Routing and system setup, which decides whether to setup the routing and system configuration. - /// This option is only available on Linux and requires root-like privileges. See `capabilities(7)`. - #[arg(short, long, default_value = if cfg!(target_os = "linux") { "false" } else { "true" })] + /// This option requires root-like privileges on every platform. + /// It is very important on Linux, see `capabilities(7)`. + #[arg(short, long)] pub setup: bool, /// DNS handling strategy @@ -111,6 +122,21 @@ pub struct Args { /// Maximum number of sessions to be handled concurrently #[arg(long, value_name = "number", default_value = "200")] pub max_sessions: usize, + + /// UDP gateway server address, forwards UDP packets via specified TCP server + #[cfg(feature = "udpgw")] + #[arg(long, value_name = "IP:PORT")] + pub udpgw_server: Option, + + /// Max connections for the UDP gateway, default value is 5 + #[cfg(feature = "udpgw")] + #[arg(long, value_name = "number", requires = "udpgw_server")] + pub udpgw_connections: Option, + + /// Keepalive interval in seconds for the UDP gateway, default value is 30 + #[cfg(feature = "udpgw")] + #[arg(long, value_name = "seconds", requires = "udpgw_server")] + pub udpgw_keepalive: Option, } fn validate_tun(p: &str) -> Result { @@ -154,6 +180,12 @@ impl Default for Args { daemonize: false, exit_on_fatal_error: false, max_sessions: 200, + #[cfg(feature = "udpgw")] + udpgw_server: None, + #[cfg(feature = "udpgw")] + udpgw_connections: None, + #[cfg(feature = "udpgw")] + udpgw_keepalive: None, } } } @@ -161,8 +193,7 @@ impl Default for Args { impl Args { #[allow(clippy::let_and_return)] pub fn parse_args() -> Self { - use clap::Parser; - let args = Self::parse(); + let args = ::parse(); #[cfg(target_os = "linux")] if !args.setup && args.tun.is_none() { eprintln!("Missing required argument, '--tun' must present when '--setup' is not used."); @@ -181,6 +212,18 @@ impl Args { self } + #[cfg(feature = "udpgw")] + pub fn udpgw_server(&mut self, udpgw: SocketAddr) -> &mut Self { + self.udpgw_server = Some(udpgw); + self + } + + #[cfg(feature = "udpgw")] + pub fn udpgw_connections(&mut self, udpgw_connections: usize) -> &mut Self { + self.udpgw_connections = Some(udpgw_connections); + self + } + #[cfg(unix)] pub fn tun_fd(&mut self, tun_fd: Option) -> &mut Self { self.tun_fd = tun_fd; @@ -336,7 +379,7 @@ impl Default for ArgProxy { impl std::fmt::Display for ArgProxy { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let auth = match &self.credentials { - Some(creds) => format!("{}", creds), + Some(creds) => format!("{creds}"), None => "".to_owned(), }; if auth.is_empty() { @@ -363,17 +406,11 @@ impl TryFrom<&str> for ArgProxy { let e = format!("`{s}` does not contain a host"); let host = url.host_str().ok_or(Error::from(e))?; - let mut url_host = String::from(host); let e = format!("`{s}` does not contain a port"); - let port = url.port().ok_or(Error::from(&e))?; - url_host.push(':'); - url_host.push_str(port.to_string().as_str()); + let port = url.port_or_known_default().ok_or(Error::from(&e))?; - let e = format!("`{host}` could not be resolved"); - let mut addr_iter = url_host.to_socket_addrs().map_err(|_| Error::from(&e))?; - - let e = format!("`{host}` does not resolve to a usable IP address"); - let addr = addr_iter.next().ok_or(Error::from(&e))?; + let e2 = format!("`{host}` does not resolve to a usable IP address"); + let addr = (host, port).to_socket_addrs()?.next().ok_or(Error::from(&e2))?; let credentials = if url.username() == "" && url.password().is_none() { None diff --git a/src/bin/main.rs b/src/bin/main.rs index e39b7b4..1f5142f 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -1,4 +1,4 @@ -use tun2proxy::{Args, BoxError}; +use tun2proxy::{ArgVerbosity, Args, BoxError}; fn main() -> Result<(), BoxError> { dotenvy::dotenv().ok(); @@ -27,48 +27,68 @@ fn main() -> Result<(), BoxError> { rt.block_on(main_async(args)) } -async fn main_async(args: Args) -> Result<(), BoxError> { - let default = format!("{:?},hickory_proto=warn", args.verbosity); +fn setup_logging(args: &Args) { + let avoid_trace = match args.verbosity { + ArgVerbosity::Trace => ArgVerbosity::Debug, + _ => args.verbosity, + }; + let default = format!( + "{:?},hickory_proto=warn,ipstack={:?},netlink_proto={:?},netlink_sys={:?}", + args.verbosity, avoid_trace, avoid_trace, avoid_trace + ); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(default)).init(); +} + +async fn main_async(args: Args) -> Result<(), BoxError> { + setup_logging(&args); let shutdown_token = tokio_util::sync::CancellationToken::new(); let main_loop_handle = tokio::spawn({ + let args = args.clone(); let shutdown_token = shutdown_token.clone(); async move { #[cfg(target_os = "linux")] if args.unshare && args.socket_transfer_fd.is_none() { if let Err(err) = namespace_proxy_main(args, shutdown_token).await { - log::error!("namespace proxy error: {}", err); + log::error!("namespace proxy error: {err}"); } - return; + return Ok(0); } unsafe extern "C" fn traffic_cb(status: *const tun2proxy::TrafficStatus, _: *mut std::ffi::c_void) { - let status = &*status; + let status = unsafe { &*status }; log::debug!("Traffic: ▲ {} : ▼ {}", status.tx, status.rx); } unsafe { tun2proxy::tun2proxy_set_traffic_status_callback(1, Some(traffic_cb), std::ptr::null_mut()) }; - if let Err(err) = tun2proxy::desktop_run_async(args, shutdown_token).await { - log::error!("main loop error: {}", err); + let ret = tun2proxy::general_run_async(args, tun::DEFAULT_MTU, cfg!(target_os = "macos"), shutdown_token).await; + if let Err(err) = &ret { + log::error!("main loop error: {err}"); } + ret } }); let ctrlc_fired = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); let ctrlc_fired_clone = ctrlc_fired.clone(); - let ctrlc_handel = ctrlc2::set_async_handler(async move { + let ctrlc_handel = ctrlc2::AsyncCtrlC::new(move || { log::info!("Ctrl-C received, exiting..."); ctrlc_fired_clone.store(true, std::sync::atomic::Ordering::SeqCst); shutdown_token.cancel(); - }) - .await; + true + })?; - main_loop_handle.await?; + let tasks = main_loop_handle.await??; if ctrlc_fired.load(std::sync::atomic::Ordering::SeqCst) { log::info!("Ctrl-C fired, waiting the handler to finish..."); - ctrlc_handel.await.map_err(|err| err.to_string())?; + ctrlc_handel.await?; + } + + if args.exit_on_fatal_error && tasks >= args.max_sessions { + // Because `main_async` function perhaps stuck in `await` state, so we need to exit the process forcefully + log::info!("Internal fatal error, max sessions reached ({tasks}/{})", args.max_sessions); + std::process::exit(-1); } Ok(()) @@ -79,7 +99,7 @@ async fn namespace_proxy_main( _args: Args, _shutdown_token: tokio_util::sync::CancellationToken, ) -> Result { - use nix::fcntl::{open, OFlag}; + use nix::fcntl::{OFlag, open}; use nix::sys::stat::Mode; use std::os::fd::AsRawFd; @@ -89,7 +109,7 @@ async fn namespace_proxy_main( let child = tokio::process::Command::new("unshare") .args("--user --map-current-user --net --mount --keep-caps --kill-child --fork".split(' ')) - .arg(format!("/proc/self/fd/{}", fd)) + .arg(format!("/proc/self/fd/{}", fd.as_raw_fd())) .arg("--socket-transfer-fd") .arg(remote_fd.as_raw_fd().to_string()) .args(std::env::args().skip(1)) @@ -113,13 +133,10 @@ async fn namespace_proxy_main( log::info!("Use `tun2proxy-bin --unshare --setup [...] -- openvpn --config [...]`"); log::info!(""); log::info!("To run a new process in the created namespace (e.g. a flatpak app)"); - log::info!( - "Use `nsenter --preserve-credentials --user --net --mount --target {} /bin/sh`", - unshare_pid - ); + log::info!("Use `nsenter --preserve-credentials --user --net --mount --target {unshare_pid} /bin/sh`"); log::info!(""); if let Some(pidfile) = _args.unshare_pidfile.as_ref() { - log::info!("Writing unshare pid to {}", pidfile); + log::info!("Writing unshare pid to {pidfile}"); std::fs::write(pidfile, unshare_pid.to_string()).ok(); } tokio::spawn(async move { tun2proxy::socket_transfer::process_socket_requests(&socket).await }); diff --git a/src/bin/udpgw_server.rs b/src/bin/udpgw_server.rs new file mode 100644 index 0000000..b6c2dc6 --- /dev/null +++ b/src/bin/udpgw_server.rs @@ -0,0 +1,271 @@ +use socks5_impl::protocol::AsyncStreamOperation; +use std::net::SocketAddr; +use tokio::{ + io::AsyncWriteExt, + net::{ + UdpSocket, + tcp::{ReadHalf, WriteHalf}, + }, + sync::mpsc::{Receiver, Sender}, +}; +use tun2proxy::{ + ArgVerbosity, BoxError, Error, Result, + udpgw::{Packet, UdpFlag}, +}; + +pub(crate) const CLIENT_DISCONNECT_TIMEOUT: tokio::time::Duration = std::time::Duration::from_secs(60); + +#[derive(Debug, Clone)] +pub struct Client { + addr: SocketAddr, + last_activity: std::time::Instant, +} + +impl Client { + pub fn new(addr: SocketAddr) -> Self { + let last_activity = std::time::Instant::now(); + Self { addr, last_activity } + } +} + +fn about_info() -> &'static str { + concat!("UDP Gateway Server for tun2proxy\nVersion: ", tun2proxy::version_info!()) +} + +#[derive(Debug, Clone, clap::Parser)] +#[command(author, version = tun2proxy::version_info!(), about = about_info(), long_about = None)] +pub struct UdpGwArgs { + /// UDP gateway listen address + #[arg(short, long, value_name = "IP:PORT", default_value = "127.0.0.1:7300")] + pub listen_addr: SocketAddr, + + /// UDP mtu + #[arg(short = 'm', long, value_name = "udp mtu", default_value = "10240")] + pub udp_mtu: u16, + + /// UDP timeout in seconds + #[arg(short = 't', long, value_name = "seconds", default_value = "3")] + pub udp_timeout: u64, + + /// Daemonize for unix family or run as Windows service + #[cfg(unix)] + #[arg(short, long)] + pub daemonize: bool, + + /// Verbosity level + #[arg(short, long, value_name = "level", value_enum, default_value = "info")] + pub verbosity: ArgVerbosity, +} + +impl UdpGwArgs { + pub fn parse_args() -> Self { + ::parse() + } +} + +async fn send_error_response(tx: Sender, conn_id: u16) { + let error_packet = Packet::build_error_packet(conn_id); + if let Err(e) = tx.send(error_packet).await { + log::error!("send error response error {e:?}"); + } +} + +async fn send_keepalive_response(tx: Sender, conn_id: u16) { + let keepalive_packet = Packet::build_keepalive_packet(conn_id); + if let Err(e) = tx.send(keepalive_packet).await { + log::error!("send keepalive response error {e:?}"); + } +} + +/// Send data field of packet from client to destination server and receive response, +/// then wrap response data to the packet's data field and send packet back to client. +async fn process_udp(udp_mtu: u16, udp_timeout: u64, tx: Sender, mut packet: Packet) -> Result<()> { + let Some(dst_addr) = &packet.address else { + return Err(std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, "udp request address is None").into()); + }; + use std::net::ToSocketAddrs; + let Some(dst_addr) = dst_addr.to_socket_addrs()?.next() else { + return Err(std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, "to_socket_addrs").into()); + }; + let std_sock = match dst_addr { + std::net::SocketAddr::V6(_) => std::net::UdpSocket::bind("[::]:0")?, + std::net::SocketAddr::V4(_) => std::net::UdpSocket::bind("0.0.0.0:0")?, + }; + std_sock.set_nonblocking(true)?; + #[cfg(unix)] + nix::sys::socket::setsockopt(&std_sock, nix::sys::socket::sockopt::ReuseAddr, &true)?; + let socket = UdpSocket::from_std(std_sock)?; + // 1. send udp data to destination server + socket.send_to(&packet.data, &dst_addr).await?; + // 2. receive response from destination server + let mut buf = vec![0u8; udp_mtu as usize]; + let (len, _addr) = tokio::time::timeout(tokio::time::Duration::from_secs(udp_timeout), socket.recv_from(&mut buf)) + .await + .map_err(std::io::Error::from)??; + packet.data = buf[..len].to_vec(); + // 3. send response back to client + use std::io::{Error, ErrorKind::BrokenPipe}; + tx.send(packet).await.map_err(|e| Error::new(BrokenPipe, e))?; + Ok(()) +} + +fn mask_ip(ip: &str) -> String { + if ip.len() <= 2 { + return ip.to_string(); + } + let mut masked_ip = String::new(); + for (i, c) in ip.chars().enumerate() { + if i == 0 || i == ip.len() - 1 || c == '.' || c == ':' { + masked_ip.push(c); + } else { + masked_ip.push('*'); + } + } + masked_ip +} + +fn mask_socket_addr(socket_addr: std::net::SocketAddr) -> String { + match socket_addr { + std::net::SocketAddr::V4(addr) => { + let masked_ip = mask_ip(&addr.ip().to_string()); + format!("{}:{}", masked_ip, addr.port()) + } + std::net::SocketAddr::V6(addr) => { + let masked_ip = mask_ip(&addr.ip().to_string()); + format!("[{}]:{}", masked_ip, addr.port()) + } + } +} + +async fn process_client_udp_req(args: &UdpGwArgs, tx: Sender, mut client: Client, mut reader: ReadHalf<'_>) -> std::io::Result<()> { + let udp_timeout = args.udp_timeout; + let udp_mtu = args.udp_mtu; + + let masked_addr = mask_socket_addr(client.addr); + + loop { + let masked_addr = masked_addr.clone(); + // 1. read udpgw packet from client + let res = tokio::time::timeout(tokio::time::Duration::from_secs(2), Packet::retrieve_from_async_stream(&mut reader)).await; + let packet = match res { + Ok(Ok(packet)) => packet, + Ok(Err(e)) => { + log::debug!("client {masked_addr} retrieve_from_async_stream \"{e}\""); + break; + } + Err(e) => { + if client.last_activity.elapsed() >= CLIENT_DISCONNECT_TIMEOUT { + log::debug!("client {masked_addr} last_activity elapsed \"{e}\""); + break; + } + continue; + } + }; + client.last_activity = std::time::Instant::now(); + + let flags = packet.header.flags; + let conn_id = packet.header.conn_id; + if flags & UdpFlag::KEEPALIVE == UdpFlag::KEEPALIVE { + log::trace!("client {masked_addr} send keepalive"); + // 2. if keepalive packet, do nothing, send keepalive response to client + send_keepalive_response(tx.clone(), conn_id).await; + continue; + } + log::trace!("client {masked_addr} received udp data {packet}"); + + // 3. process client udpgw packet in a new task + let tx = tx.clone(); + tokio::spawn(async move { + if let Err(e) = process_udp(udp_mtu, udp_timeout, tx.clone(), packet).await { + send_error_response(tx, conn_id).await; + log::debug!("client {masked_addr} process udp function \"{e}\""); + } + }); + } + Ok(()) +} + +async fn write_to_client(addr: SocketAddr, mut writer: WriteHalf<'_>, mut rx: Receiver) -> std::io::Result<()> { + let masked_addr = mask_socket_addr(addr); + loop { + use std::io::{Error, ErrorKind::BrokenPipe}; + let packet = rx.recv().await.ok_or(Error::new(BrokenPipe, "recv error"))?; + log::trace!("send response to client {masked_addr} with {packet}"); + let data: Vec = packet.into(); + let _r = writer.write(&data).await?; + } +} + +async fn main_async(args: UdpGwArgs) -> Result<(), BoxError> { + log::info!("{} {} starting...", module_path!(), tun2proxy::version_info!()); + log::info!("UDP Gateway Server running at {}", args.listen_addr); + + let shutdown_token = tokio_util::sync::CancellationToken::new(); + let main_loop_handle = tokio::spawn(run(args, shutdown_token.clone())); + + let ctrlc_fired = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let ctrlc_fired_clone = ctrlc_fired.clone(); + let ctrlc_handel = ctrlc2::AsyncCtrlC::new(move || { + log::info!("Ctrl-C received, exiting..."); + ctrlc_fired_clone.store(true, std::sync::atomic::Ordering::SeqCst); + shutdown_token.cancel(); + true + })?; + + let _ = main_loop_handle.await?; + + if ctrlc_fired.load(std::sync::atomic::Ordering::SeqCst) { + log::info!("Ctrl-C fired, waiting the handler to finish..."); + ctrlc_handel.await?; + } + + Ok(()) +} + +pub async fn run(args: UdpGwArgs, shutdown_token: tokio_util::sync::CancellationToken) -> crate::Result<()> { + let tcp_listener = tokio::net::TcpListener::bind(args.listen_addr).await?; + loop { + let (mut tcp_stream, addr) = tokio::select! { + v = tcp_listener.accept() => v?, + _ = shutdown_token.cancelled() => break, + }; + let client = Client::new(addr); + let masked_addr = mask_socket_addr(addr); + log::info!("client {masked_addr} connected"); + let params = args.clone(); + tokio::spawn(async move { + let (tx, rx) = tokio::sync::mpsc::channel::(100); + let (tcp_read_stream, tcp_write_stream) = tcp_stream.split(); + let res = tokio::select! { + v = process_client_udp_req(¶ms, tx, client, tcp_read_stream) => v, + v = write_to_client(addr, tcp_write_stream, rx) => v, + }; + log::info!("client {masked_addr} disconnected with {res:?}"); + }); + } + Ok::<(), Error>(()) +} + +fn main() -> Result<(), BoxError> { + dotenvy::dotenv().ok(); + let args = UdpGwArgs::parse_args(); + + let default = format!("{:?}", args.verbosity); + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(default)).init(); + + #[cfg(unix)] + if args.daemonize { + let stdout = std::fs::File::create("/tmp/udpgw.out")?; + let stderr = std::fs::File::create("/tmp/udpgw.err")?; + let daemonize = daemonize::Daemonize::new() + .working_directory("/tmp") + .umask(0o777) + .stdout(stdout) + .stderr(stderr) + .privileged_action(|| "Executed before drop privileges"); + let _ = daemonize.start().map_err(|e| format!("Failed to daemonize process, error:{e:?}"))?; + } + + let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?; + rt.block_on(main_async(args)) +} diff --git a/src/desktop_api.rs b/src/desktop_api.rs deleted file mode 100644 index 0617ad3..0000000 --- a/src/desktop_api.rs +++ /dev/null @@ -1,187 +0,0 @@ -#![cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))] - -use crate::{ - args::{ArgDns, ArgProxy}, - ArgVerbosity, Args, -}; -use std::os::raw::{c_char, c_int}; -use tproxy_config::{TproxyArgs, TUN_GATEWAY, TUN_IPV4, TUN_NETMASK}; -use tun2::{AbstractDevice, DEFAULT_MTU as MTU}; - -static TUN_QUIT: std::sync::Mutex> = std::sync::Mutex::new(None); - -/// # Safety -/// -/// Run the tun2proxy component with some arguments. -/// Parameters: -/// - proxy_url: the proxy url, e.g. "socks5://127.0.0.1:1080" -/// - tun: the tun device name, e.g. "utun5" -/// - bypass: the bypass IP/CIDR, e.g. "123.45.67.0/24" -/// - dns_strategy: the dns strategy, see ArgDns enum -/// - root_privilege: whether to run with root privilege -/// - verbosity: the verbosity level, see ArgVerbosity enum -#[no_mangle] -pub unsafe extern "C" fn tun2proxy_with_name_run( - proxy_url: *const c_char, - tun: *const c_char, - bypass: *const c_char, - dns_strategy: ArgDns, - _root_privilege: bool, - verbosity: ArgVerbosity, -) -> c_int { - let shutdown_token = tokio_util::sync::CancellationToken::new(); - { - if let Ok(mut lock) = TUN_QUIT.lock() { - if lock.is_some() { - return -1; - } - *lock = Some(shutdown_token.clone()); - } else { - return -2; - } - } - - log::set_max_level(verbosity.into()); - if let Err(err) = log::set_boxed_logger(Box::::default()) { - log::warn!("set logger error: {}", err); - } - - let proxy_url = std::ffi::CStr::from_ptr(proxy_url).to_str().unwrap(); - let proxy = ArgProxy::try_from(proxy_url).unwrap(); - let tun = std::ffi::CStr::from_ptr(tun).to_str().unwrap().to_string(); - - let mut args = Args::default(); - args.proxy(proxy).tun(tun).dns(dns_strategy).verbosity(verbosity); - - #[cfg(target_os = "linux")] - args.setup(_root_privilege); - - if let Ok(bypass) = std::ffi::CStr::from_ptr(bypass).to_str() { - args.bypass(bypass.parse().unwrap()); - } - - let main_loop = async move { - if let Err(err) = desktop_run_async(args, shutdown_token).await { - log::error!("main loop error: {}", err); - return Err(err); - } - Ok(()) - }; - - let exit_code = match tokio::runtime::Builder::new_multi_thread().enable_all().build() { - Err(_e) => -3, - Ok(rt) => match rt.block_on(main_loop) { - Ok(_) => 0, - Err(_e) => -4, - }, - }; - - exit_code -} - -/// Run the tun2proxy component with some arguments. -pub async fn desktop_run_async(args: Args, shutdown_token: tokio_util::sync::CancellationToken) -> std::io::Result<()> { - let bypass_ips = args.bypass.clone(); - - let mut tun_config = tun2::Configuration::default(); - tun_config.address(TUN_IPV4).netmask(TUN_NETMASK).mtu(MTU).up(); - tun_config.destination(TUN_GATEWAY); - #[cfg(unix)] - if let Some(fd) = args.tun_fd { - tun_config.raw_fd(fd); - if let Some(v) = args.close_fd_on_drop { - tun_config.close_fd_on_drop(v); - }; - } else if let Some(ref tun) = args.tun { - tun_config.tun_name(tun); - } - #[cfg(windows)] - if let Some(ref tun) = args.tun { - tun_config.tun_name(tun); - } - - #[cfg(target_os = "linux")] - tun_config.platform_config(|cfg| { - #[allow(deprecated)] - cfg.packet_information(true); - cfg.ensure_root_privileges(args.setup); - }); - - #[cfg(target_os = "windows")] - tun_config.platform_config(|cfg| { - cfg.device_guid(12324323423423434234_u128); - }); - - #[allow(unused_variables)] - let mut tproxy_args = TproxyArgs::new() - .tun_dns(args.dns_addr) - .proxy_addr(args.proxy.addr) - .bypass_ips(&bypass_ips) - .ipv6_default_route(args.ipv6_enabled); - - #[allow(unused_mut, unused_assignments, unused_variables)] - let mut setup = true; - - let device = tun2::create_as_async(&tun_config)?; - - if let Ok(tun_name) = device.tun_name() { - tproxy_args = tproxy_args.tun_name(&tun_name); - } - - // TproxyState implements the Drop trait to restore network configuration, - // so we need to assign it to a variable, even if it is not used. - let mut _restore: Option = None; - - #[cfg(target_os = "linux")] - { - setup = args.setup; - } - - #[cfg(any(target_os = "linux", target_os = "windows", target_os = "macos"))] - if setup { - _restore = Some(tproxy_config::tproxy_setup(&tproxy_args)?); - } - - #[cfg(target_os = "linux")] - { - let mut admin_command_args = args.admin_command.iter(); - if let Some(command) = admin_command_args.next() { - let child = tokio::process::Command::new(command) - .args(admin_command_args) - .kill_on_drop(true) - .spawn(); - - match child { - Err(err) => { - log::warn!("Failed to start admin process: {err}"); - } - Ok(mut child) => { - tokio::spawn(async move { - if let Err(err) = child.wait().await { - log::warn!("Admin process terminated: {err}"); - } - }); - } - }; - } - } - - let join_handle = tokio::spawn(crate::run(device, MTU, args, shutdown_token)); - join_handle.await.map_err(std::io::Error::from)??; - - Ok::<(), std::io::Error>(()) -} - -/// # Safety -/// -/// Shutdown the tun2proxy component. -#[no_mangle] -pub unsafe extern "C" fn tun2proxy_with_name_stop() -> c_int { - if let Ok(mut lock) = TUN_QUIT.lock() { - if let Some(shutdown_token) = lock.take() { - shutdown_token.cancel(); - return 0; - } - } - -1 -} diff --git a/src/dns.rs b/src/dns.rs index e18d218..ed36f5f 100644 --- a/src/dns.rs +++ b/src/dns.rs @@ -1,21 +1,16 @@ use hickory_proto::{ op::{Message, MessageType, ResponseCode}, - rr::{record_type::RecordType, Name, RData, Record}, + rr::{ + Name, RData, Record, + rdata::{A, AAAA}, + }, }; use std::{net::IpAddr, str::FromStr}; pub fn build_dns_response(mut request: Message, domain: &str, ip: IpAddr, ttl: u32) -> Result { let record = match ip { - IpAddr::V4(ip) => { - let mut record = Record::with(Name::from_str(domain)?, RecordType::A, ttl); - record.set_data(Some(RData::A(ip.into()))); - record - } - IpAddr::V6(ip) => { - let mut record = Record::with(Name::from_str(domain)?, RecordType::AAAA, ttl); - record.set_data(Some(RData::AAAA(ip.into()))); - record - } + IpAddr::V4(ip) => Record::from_rdata(Name::from_str(domain)?, ttl, RData::A(A(ip))), + IpAddr::V6(ip) => Record::from_rdata(Name::from_str(domain)?, ttl, RData::AAAA(AAAA(ip))), }; // We must indicate that this message is a response. Otherwise, implementations may not @@ -27,9 +22,7 @@ pub fn build_dns_response(mut request: Message, domain: &str, ip: IpAddr, ttl: u } pub fn remove_ipv6_entries(message: &mut Message) { - message - .answers_mut() - .retain(|answer| !matches!(answer.data(), Some(RData::AAAA(_)))); + message.answers_mut().retain(|answer| !matches!(answer.data(), RData::AAAA(_))); } pub fn extract_ipaddr_from_dns_message(message: &Message) -> Result { @@ -38,7 +31,7 @@ pub fn extract_ipaddr_from_dns_message(message: &Message) -> Result { return Ok(IpAddr::V4((*addr).into())); } diff --git a/src/dump_logger.rs b/src/dump_logger.rs index b04ff09..aba05dd 100644 --- a/src/dump_logger.rs +++ b/src/dump_logger.rs @@ -9,7 +9,7 @@ pub(crate) static DUMP_CALLBACK: Mutex> = Mutex::new(None); /// # Safety /// /// set dump log info callback. -#[no_mangle] +#[unsafe(no_mangle)] pub unsafe extern "C" fn tun2proxy_set_log_callback( callback: Option, ctx: *mut c_void, @@ -23,7 +23,7 @@ pub struct DumpCallback(Option), #[error("DnsProtoError {0:?}")] - DnsProto(#[from] hickory_proto::error::ProtoError), + DnsProto(#[from] hickory_proto::ProtoError), #[error("httparse::Error {0:?}")] Httparse(#[from] httparse::Error), @@ -43,10 +43,12 @@ pub enum Error { #[error("std::num::ParseIntError {0:?}")] IntParseError(#[from] std::num::ParseIntError), +} - #[cfg(target_os = "linux")] - #[error("bincode::Error {0:?}")] - BincodeError(#[from] bincode::Error), +impl From for Error { + fn from(err: ipstack::IpStackError) -> Self { + Self::IpStack(Box::new(err)) + } } impl From<&str> for Error { @@ -71,7 +73,7 @@ impl From for std::io::Error { fn from(err: Error) -> Self { match err { Error::Io(err) => err, - _ => std::io::Error::new(std::io::ErrorKind::Other, err), + _ => std::io::Error::other(err), } } } diff --git a/src/general_api.rs b/src/general_api.rs new file mode 100644 index 0000000..e713409 --- /dev/null +++ b/src/general_api.rs @@ -0,0 +1,269 @@ +use crate::{ + ArgVerbosity, Args, + args::{ArgDns, ArgProxy}, +}; +use std::os::raw::{c_char, c_int, c_ushort}; + +static TUN_QUIT: std::sync::Mutex> = std::sync::Mutex::new(None); + +/// # Safety +/// +/// Run the tun2proxy component with some arguments. +/// Parameters: +/// - proxy_url: the proxy url, e.g. "socks5://127.0.0.1:1080" +/// - tun: the tun device name, e.g. "utun5" +/// - bypass: the bypass IP/CIDR, e.g. "123.45.67.0/24" +/// - dns_strategy: the dns strategy, see ArgDns enum +/// - root_privilege: whether to run with root privilege +/// - verbosity: the verbosity level, see ArgVerbosity enum +#[unsafe(no_mangle)] +pub unsafe extern "C" fn tun2proxy_with_name_run( + proxy_url: *const c_char, + tun: *const c_char, + bypass: *const c_char, + dns_strategy: ArgDns, + _root_privilege: bool, + verbosity: ArgVerbosity, +) -> c_int { + let proxy_url = unsafe { std::ffi::CStr::from_ptr(proxy_url) }.to_str().unwrap(); + let proxy = ArgProxy::try_from(proxy_url).unwrap(); + let tun = unsafe { std::ffi::CStr::from_ptr(tun) }.to_str().unwrap().to_string(); + + let mut args = Args::default(); + if let Ok(bypass) = unsafe { std::ffi::CStr::from_ptr(bypass) }.to_str() { + args.bypass(bypass.parse().unwrap()); + } + args.proxy(proxy).tun(tun).dns(dns_strategy).verbosity(verbosity); + + #[cfg(target_os = "linux")] + args.setup(_root_privilege); + + general_run_for_api(args, tun::DEFAULT_MTU, false) +} + +/// # Safety +/// +/// Run the tun2proxy component with some arguments. +/// Parameters: +/// - proxy_url: the proxy url, e.g. "socks5://127.0.0.1:1080" +/// - tun_fd: the tun file descriptor, it will be owned by tun2proxy +/// - close_fd_on_drop: whether close the tun_fd on drop +/// - packet_information: indicates whether exists packet information in packet from TUN device +/// - tun_mtu: the tun mtu +/// - dns_strategy: the dns strategy, see ArgDns enum +/// - verbosity: the verbosity level, see ArgVerbosity enum +#[cfg(unix)] +#[unsafe(no_mangle)] +pub unsafe extern "C" fn tun2proxy_with_fd_run( + proxy_url: *const c_char, + tun_fd: c_int, + close_fd_on_drop: bool, + packet_information: bool, + tun_mtu: c_ushort, + dns_strategy: ArgDns, + verbosity: ArgVerbosity, +) -> c_int { + let proxy_url = unsafe { std::ffi::CStr::from_ptr(proxy_url) }.to_str().unwrap(); + let proxy = ArgProxy::try_from(proxy_url).unwrap(); + + let mut args = Args::default(); + args.proxy(proxy) + .tun_fd(Some(tun_fd)) + .close_fd_on_drop(close_fd_on_drop) + .dns(dns_strategy) + .verbosity(verbosity); + + general_run_for_api(args, tun_mtu, packet_information) +} + +/// # Safety +/// Run the tun2proxy component with command line arguments +/// Parameters: +/// - cli_args: The command line arguments, +/// e.g. `tun2proxy-bin --setup --proxy socks5://127.0.0.1:1080 --bypass 98.76.54.0/24 --dns over-tcp --verbosity trace` +/// - tun_mtu: The MTU of the TUN device, e.g. 1500 +/// - packet_information: Whether exists packet information in packet from TUN device +#[unsafe(no_mangle)] +pub unsafe extern "C" fn tun2proxy_run_with_cli_args(cli_args: *const c_char, tun_mtu: c_ushort, packet_information: bool) -> c_int { + let Ok(cli_args) = unsafe { std::ffi::CStr::from_ptr(cli_args) }.to_str() else { + log::error!("Failed to convert CLI arguments to string"); + return -5; + }; + let Some(args) = shlex::split(cli_args) else { + log::error!("Failed to split CLI arguments"); + return -6; + }; + let args = ::parse_from(args); + general_run_for_api(args, tun_mtu, packet_information) +} + +pub fn general_run_for_api(args: Args, tun_mtu: u16, packet_information: bool) -> c_int { + log::set_max_level(args.verbosity.into()); + if let Err(err) = log::set_boxed_logger(Box::::default()) { + log::debug!("set logger error: {err}"); + } + + let shutdown_token = tokio_util::sync::CancellationToken::new(); + if let Ok(mut lock) = TUN_QUIT.lock() { + if lock.is_some() { + log::error!("tun2proxy already started"); + return -1; + } + *lock = Some(shutdown_token.clone()); + } else { + log::error!("failed to lock tun2proxy quit token"); + return -2; + } + + let Ok(rt) = tokio::runtime::Builder::new_multi_thread().enable_all().build() else { + log::error!("failed to create tokio runtime with"); + return -3; + }; + match rt.block_on(async move { + let ret = general_run_async(args.clone(), tun_mtu, packet_information, shutdown_token).await; + match &ret { + Ok(sessions) => { + if args.exit_on_fatal_error && *sessions >= args.max_sessions { + log::error!("Forced exit due to max sessions reached ({sessions}/{})", args.max_sessions); + std::process::exit(-1); + } + log::debug!("tun2proxy exited normally, current sessions: {sessions}"); + } + Err(err) => log::error!("main loop error: {err}"), + } + ret + }) { + Ok(_) => 0, + Err(e) => { + log::error!("failed to run tun2proxy with error: {e:?}"); + -4 + } + } +} + +/// Run the tun2proxy component with some arguments. +pub async fn general_run_async( + args: Args, + tun_mtu: u16, + _packet_information: bool, + shutdown_token: tokio_util::sync::CancellationToken, +) -> std::io::Result { + let mut tun_config = tun::Configuration::default(); + + #[cfg(any(target_os = "linux", target_os = "windows", target_os = "macos"))] + { + use tproxy_config::{TUN_GATEWAY, TUN_IPV4, TUN_NETMASK}; + tun_config.address(TUN_IPV4).netmask(TUN_NETMASK).mtu(tun_mtu).up(); + tun_config.destination(TUN_GATEWAY); + } + + #[cfg(unix)] + if let Some(fd) = args.tun_fd { + tun_config.raw_fd(fd); + if let Some(v) = args.close_fd_on_drop { + tun_config.close_fd_on_drop(v); + }; + } else if let Some(ref tun) = args.tun { + tun_config.tun_name(tun); + } + #[cfg(windows)] + if let Some(ref tun) = args.tun { + tun_config.tun_name(tun); + } + + #[cfg(target_os = "linux")] + tun_config.platform_config(|cfg| { + #[allow(deprecated)] + cfg.packet_information(true); + cfg.ensure_root_privileges(args.setup); + }); + + #[cfg(target_os = "windows")] + tun_config.platform_config(|cfg| { + cfg.device_guid(12324323423423434234_u128); + }); + + #[cfg(any(target_os = "ios", target_os = "macos"))] + tun_config.platform_config(|cfg| { + cfg.packet_information(_packet_information); + }); + + #[cfg(any(target_os = "linux", target_os = "windows", target_os = "macos"))] + #[allow(unused_variables)] + let mut tproxy_args = tproxy_config::TproxyArgs::new() + .tun_dns(args.dns_addr) + .proxy_addr(args.proxy.addr) + .bypass_ips(&args.bypass) + .ipv6_default_route(args.ipv6_enabled); + + let device = tun::create_as_async(&tun_config)?; + + #[cfg(any(target_os = "linux", target_os = "windows", target_os = "macos"))] + if let Ok(tun_name) = tun::AbstractDevice::tun_name(&*device) { + // Above line is equivalent to: `use tun::AbstractDevice; if let Ok(tun_name) = device.tun_name() {` + tproxy_args = tproxy_args.tun_name(&tun_name); + } + + // TproxyState implements the Drop trait to restore network configuration, + // so we need to assign it to a variable, even if it is not used. + #[cfg(any(target_os = "linux", target_os = "windows", target_os = "macos"))] + let mut restore: Option = None; + + #[cfg(any(target_os = "linux", target_os = "windows", target_os = "macos"))] + if args.setup { + restore = Some(tproxy_config::tproxy_setup(&tproxy_args).await?); + } + + #[cfg(target_os = "linux")] + { + let mut admin_command_args = args.admin_command.iter(); + if let Some(command) = admin_command_args.next() { + let child = tokio::process::Command::new(command) + .args(admin_command_args) + .kill_on_drop(true) + .spawn(); + + match child { + Err(err) => { + log::warn!("Failed to start admin process: {err}"); + } + Ok(mut child) => { + tokio::spawn(async move { + if let Err(err) = child.wait().await { + log::warn!("Admin process terminated: {err}"); + } + }); + } + }; + } + } + + let join_handle = tokio::spawn(crate::run(device, tun_mtu, args, shutdown_token.clone())); + + match join_handle.await? { + Ok(sessions) => { + #[cfg(any(target_os = "linux", target_os = "windows", target_os = "macos"))] + tproxy_config::tproxy_remove(restore).await?; + Ok(sessions) + } + Err(err) => Err(std::io::Error::from(err)), + } +} + +/// # Safety +/// +/// Shutdown the tun2proxy component. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn tun2proxy_stop() -> c_int { + tun2proxy_stop_internal() +} + +pub(crate) fn tun2proxy_stop_internal() -> c_int { + if let Ok(mut lock) = TUN_QUIT.lock() { + if let Some(shutdown_token) = lock.take() { + shutdown_token.cancel(); + return 0; + } + } + -1 +} diff --git a/src/http.rs b/src/http.rs index 2b5f491..2b08ed1 100644 --- a/src/http.rs +++ b/src/http.rs @@ -4,11 +4,10 @@ use crate::{ proxy_handler::{ProxyHandler, ProxyHandlerManager}, session_info::{IpProtocol, SessionInfo}, }; -use base64::Engine; use httparse::Response; use socks5_impl::protocol::UserKey; use std::{ - collections::{hash_map::RandomState, HashMap, VecDeque}, + collections::{HashMap, VecDeque, hash_map::RandomState}, iter::FromIterator, net::SocketAddr, str, @@ -141,10 +140,9 @@ impl HttpConnection { .extend(format!("{}: {}\r\n", PROXY_AUTHORIZATION, response.to_header_string()).as_bytes()); } AuthenticationScheme::Basic => { - let cred = format!("{}:{}", credentials.username, credentials.password); - let auth_b64 = base64::engine::general_purpose::STANDARD.encode(cred); + let auth_b64 = base64easy::encode(credentials.to_string(), base64easy::EngineKind::Standard); self.server_outbuf - .extend(format!("{}: Basic {}\r\n", PROXY_AUTHORIZATION, auth_b64).as_bytes()); + .extend(format!("{PROXY_AUTHORIZATION}: Basic {auth_b64}\r\n").as_bytes()); } AuthenticationScheme::None => {} } @@ -252,7 +250,7 @@ impl HttpConnection { } // The HTTP/1.1 expected to be keep alive waiting for the next frame so, we must - // compute the lenght of the response in order to detect the next frame (response) + // compute the length of the response in order to detect the next frame (response) // [RFC-9112](https://datatracker.ietf.org/doc/html/rfc9112#body.content-length) // Transfer-Encoding isn't supported yet diff --git a/src/lib.rs b/src/lib.rs index 554fe14..737a82d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "udpgw")] +use crate::udpgw::UdpGwClient; use crate::{ directions::{IncomingDataEvent, IncomingDirection, OutgoingDirection}, http::HttpManager, @@ -5,10 +7,12 @@ use crate::{ session_info::{IpProtocol, SessionInfo}, virtual_dns::VirtualDns, }; -use ipstack::stream::{IpStackStream, IpStackTcpStream, IpStackUdpStream}; +use ipstack::{IpStackStream, IpStackTcpStream, IpStackUdpStream}; use proxy_handler::{ProxyHandler, ProxyHandlerManager}; use socks::SocksProxyManager; pub use socks5_impl::protocol::UserKey; +#[cfg(feature = "udpgw")] +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}; use std::{ collections::VecDeque, io::ErrorKind, @@ -18,59 +22,54 @@ use std::{ use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, net::{TcpSocket, TcpStream, UdpSocket}, - sync::{mpsc::Receiver, Mutex}, + sync::{Mutex, mpsc::Receiver}, }; pub use tokio_util::sync::CancellationToken; use tproxy_config::is_private_ip; use udp_stream::UdpStream; +#[cfg(feature = "udpgw")] +use udpgw::{UDPGW_KEEPALIVE_TIME, UDPGW_MAX_CONNECTIONS, UdpGwClientStream, UdpGwResponse}; pub use { args::{ArgDns, ArgProxy, ArgVerbosity, Args, ProxyType}, error::{BoxError, Error, Result}, - traffic_status::{tun2proxy_set_traffic_status_callback, TrafficStatus}, + traffic_status::{TrafficStatus, tun2proxy_set_traffic_status_callback}, }; #[cfg(feature = "mimalloc")] #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -#[cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))] -pub use desktop_api::desktop_run_async; - -#[cfg(any(target_os = "ios", target_os = "android"))] -pub use mobile_api::{desktop_run_async, mobile_run, mobile_stop}; - -#[cfg(target_os = "macos")] -pub use mobile_api::{mobile_run, mobile_stop}; +pub use general_api::general_run_async; mod android; -mod apple; mod args; -mod desktop_api; mod directions; mod dns; mod dump_logger; mod error; +mod general_api; mod http; -mod mobile_api; mod no_proxy; mod proxy_handler; mod session_info; pub mod socket_transfer; mod socks; mod traffic_status; +#[cfg(feature = "udpgw")] +pub mod udpgw; mod virtual_dns; #[doc(hidden)] pub mod win_svc; const DNS_PORT: u16 = 53; -static TASK_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); -use std::sync::atomic::Ordering::Relaxed; - #[allow(unused)] #[derive(Hash, Copy, Clone, Eq, PartialEq, Debug)] -#[cfg_attr(target_os = "linux", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr( + target_os = "linux", + derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize) +)] pub enum SocketProtocol { Tcp, Udp, @@ -78,7 +77,10 @@ pub enum SocketProtocol { #[allow(unused)] #[derive(Hash, Copy, Clone, Eq, PartialEq, Debug)] -#[cfg_attr(target_os = "linux", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr( + target_os = "linux", + derive(bincode::Encode, bincode::Decode, serde::Serialize, serde::Deserialize) +)] pub enum SocketDomain { IpV4, IpV6, @@ -149,11 +151,13 @@ async fn create_udp_stream(socket_queue: &Option>, peer: Socket /// * `mtu` - The MTU of the network device /// * `args` - The arguments to use /// * `shutdown_token` - The token to exit the server -pub async fn run(device: D, mtu: u16, args: Args, shutdown_token: CancellationToken) -> crate::Result<()> +/// # Returns +/// * The number of sessions while exiting +pub async fn run(device: D, mtu: u16, args: Args, shutdown_token: CancellationToken) -> crate::Result where D: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - log::info!("{} {} starting...", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")); + log::info!("{} {} starting...", env!("CARGO_PKG_NAME"), version_info!()); log::info!("Proxy {} server: {}", args.proxy.proxy_type, args.proxy.addr); let server_addr = args.proxy.addr; @@ -217,11 +221,11 @@ where let socket_queue = None; use socks5_impl::protocol::Version::{V4, V5}; - let mgr = match args.proxy.proxy_type { - ProxyType::Socks5 => Arc::new(SocksProxyManager::new(server_addr, V5, key)) as Arc, - ProxyType::Socks4 => Arc::new(SocksProxyManager::new(server_addr, V4, key)) as Arc, - ProxyType::Http => Arc::new(HttpManager::new(server_addr, key)) as Arc, - ProxyType::None => Arc::new(NoProxyManager::new()) as Arc, + let mgr: Arc = match args.proxy.proxy_type { + ProxyType::Socks5 => Arc::new(SocksProxyManager::new(server_addr, V5, key)), + ProxyType::Socks4 => Arc::new(SocksProxyManager::new(server_addr, V4, key)), + ProxyType::Http => Arc::new(HttpManager::new(server_addr, key)), + ProxyType::None => Arc::new(NoProxyManager::new()), }; let mut ipstack_config = ipstack::IpStackConfig::default(); @@ -231,7 +235,29 @@ where let mut ip_stack = ipstack::IpStack::new(ipstack_config, device); + #[cfg(feature = "udpgw")] + let udpgw_client = args.udpgw_server.map(|addr| { + log::info!("UDP Gateway enabled, server: {addr}"); + use std::time::Duration; + let client = Arc::new(UdpGwClient::new( + mtu, + args.udpgw_connections.unwrap_or(UDPGW_MAX_CONNECTIONS), + args.udpgw_keepalive.map(Duration::from_secs).unwrap_or(UDPGW_KEEPALIVE_TIME), + args.udp_timeout, + addr, + )); + let client_keepalive = client.clone(); + tokio::spawn(async move { + let _ = client_keepalive.heartbeat_task().await; + }); + client + }); + + let task_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + use std::sync::atomic::Ordering::Relaxed; + loop { + let task_count = task_count.clone(); let virtual_dns = virtual_dns.clone(); let ip_stack_stream = tokio::select! { _ = shutdown_token.cancelled() => { @@ -242,10 +268,10 @@ where ip_stack_stream? } }; - let max_sessions = args.max_sessions as u64; + let max_sessions = args.max_sessions; match ip_stack_stream { IpStackStream::Tcp(tcp) => { - if TASK_COUNT.load(Relaxed) > max_sessions { + if task_count.load(Relaxed) >= max_sessions { if args.exit_on_fatal_error { log::info!("Too many sessions that over {max_sessions}, exiting..."); break; @@ -253,7 +279,7 @@ where log::warn!("Too many sessions that over {max_sessions}, dropping new session"); continue; } - log::trace!("Session count {}", TASK_COUNT.fetch_add(1, Relaxed) + 1); + log::trace!("Session count {}", task_count.fetch_add(1, Relaxed).saturating_add(1)); let info = SessionInfo::new(tcp.local_addr(), tcp.peer_addr(), IpProtocol::Tcp); let domain_name = if let Some(virtual_dns) = &virtual_dns { let mut virtual_dns = virtual_dns.lock().await; @@ -266,13 +292,13 @@ where let socket_queue = socket_queue.clone(); tokio::spawn(async move { if let Err(err) = handle_tcp_session(tcp, proxy_handler, socket_queue).await { - log::error!("{} error \"{}\"", info, err); + log::error!("{info} error \"{err}\""); } - log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1); + log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1)); }); } IpStackStream::Udp(udp) => { - if TASK_COUNT.load(Relaxed) > max_sessions { + if task_count.load(Relaxed) >= max_sessions { if args.exit_on_fatal_error { log::info!("Too many sessions that over {max_sessions}, exiting..."); break; @@ -280,11 +306,11 @@ where log::warn!("Too many sessions that over {max_sessions}, dropping new session"); continue; } - log::trace!("Session count {}", TASK_COUNT.fetch_add(1, Relaxed) + 1); + log::trace!("Session count {}", task_count.fetch_add(1, Relaxed).saturating_add(1)); let mut info = SessionInfo::new(udp.local_addr(), udp.peer_addr(), IpProtocol::Udp); if info.dst.port() == DNS_PORT { if is_private_ip(info.dst.ip()) { - info.dst.set_ip(dns_addr); + info.dst.set_ip(dns_addr); // !!! Here we change the destination address to remote DNS server!!! } if args.dns == ArgDns::OverTcp { info.protocol = IpProtocol::Tcp; @@ -292,9 +318,9 @@ where let socket_queue = socket_queue.clone(); tokio::spawn(async move { if let Err(err) = handle_dns_over_tcp_session(udp, proxy_handler, socket_queue, ipv6_enabled).await { - log::error!("{} error \"{}\"", info, err); + log::error!("{info} error \"{err}\""); } - log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1); + log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1)); }); continue; } @@ -302,10 +328,10 @@ where tokio::spawn(async move { if let Some(virtual_dns) = virtual_dns { if let Err(err) = handle_virtual_dns_session(udp, virtual_dns).await { - log::error!("{} error \"{}\"", info, err); + log::error!("{info} error \"{err}\""); } } - log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1); + log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1)); }); continue; } @@ -318,25 +344,47 @@ where } else { None }; + #[cfg(feature = "udpgw")] + if let Some(udpgw) = udpgw_client.clone() { + let tcp_src = match udp.peer_addr() { + SocketAddr::V4(_) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)), + SocketAddr::V6(_) => SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0)), + }; + let tcpinfo = SessionInfo::new(tcp_src, udpgw.get_udpgw_server_addr(), IpProtocol::Tcp); + let proxy_handler = mgr.new_proxy_handler(tcpinfo, None, false).await?; + let queue = socket_queue.clone(); + tokio::spawn(async move { + let dst = info.dst; // real UDP destination address + let dst_addr = match domain_name { + Some(ref d) => socks5_impl::protocol::Address::from((d.clone(), dst.port())), + None => dst.into(), + }; + if let Err(e) = handle_udp_gateway_session(udp, udpgw, &dst_addr, proxy_handler, queue, ipv6_enabled).await { + log::info!("Ending {info} with \"{e}\""); + } + log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1)); + }); + continue; + } match mgr.new_proxy_handler(info, domain_name, true).await { Ok(proxy_handler) => { let socket_queue = socket_queue.clone(); tokio::spawn(async move { let ty = args.proxy.proxy_type; if let Err(err) = handle_udp_associate_session(udp, ty, proxy_handler, socket_queue, ipv6_enabled).await { - log::info!("Ending {} with \"{}\"", info, err); + log::info!("Ending {info} with \"{err}\""); } - log::trace!("Session count {}", TASK_COUNT.fetch_sub(1, Relaxed) - 1); + log::trace!("Session count {}", task_count.fetch_sub(1, Relaxed).saturating_sub(1)); }); } Err(e) => { - log::error!("Failed to create UDP connection: {}", e); + log::error!("Failed to create UDP connection: {e}"); } } } IpStackStream::UnknownTransport(u) => { let len = u.payload().len(); - log::info!("#0 unhandled transport - Ip Protocol 0x{:02X}, length {}", u.ip_protocol(), len); + log::info!("#0 unhandled transport - Ip Protocol {:?}, length {}", u.ip_protocol(), len); continue; } IpStackStream::UnknownNetwork(pkt) => { @@ -345,7 +393,7 @@ where } } } - Ok(()) + Ok(task_count.load(Relaxed)) } async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc>) -> crate::Result<()> { @@ -354,7 +402,7 @@ async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc { // indicate UDP read fails not an error. - log::debug!("Virtual DNS session error: {}", e); + log::debug!("Virtual DNS session error: {e}"); break; } Ok(len) => len, @@ -364,7 +412,7 @@ async fn handle_virtual_dns_session(mut udp: IpStackUdpStream, dns: Arc {}", qname, ip); + log::debug!("Virtual DNS query: {qname} -> {ip}"); } Ok(()) } @@ -383,7 +431,7 @@ where total += n as u64; let (tx, rx) = if is_tx { (n, 0) } else { (0, n) }; if let Err(e) = crate::traffic_status::traffic_status_update(tx, rx) { - log::debug!("Record traffic status error: {}", e); + log::debug!("Record traffic status error: {e}"); } writer.write_all(&buf[..n]).await?; } @@ -405,7 +453,7 @@ async fn handle_tcp_session( let mut server = create_tcp_stream(&socket_queue, server_addr).await?; - log::info!("Beginning {}", session_info); + log::info!("Beginning {session_info}"); if let Err(e) = handle_proxy_session(&mut server, proxy_handler).await { tcp_stack.shutdown().await?; @@ -419,19 +467,141 @@ async fn handle_tcp_session( async move { let r = copy_and_record_traffic(&mut t_rx, &mut s_tx, true).await; if let Err(err) = s_tx.shutdown().await { - log::trace!("{} s_tx shutdown error {}", session_info, err); + log::trace!("{session_info} s_tx shutdown error {err}"); } r }, async move { let r = copy_and_record_traffic(&mut s_rx, &mut t_tx, false).await; if let Err(err) = t_tx.shutdown().await { - log::trace!("{} t_tx shutdown error {}", session_info, err); + log::trace!("{session_info} t_tx shutdown error {err}"); } r }, ); - log::info!("Ending {} with {:?}", session_info, res); + log::info!("Ending {session_info} with {res:?}"); + + Ok(()) +} + +#[cfg(feature = "udpgw")] +async fn handle_udp_gateway_session( + mut udp_stack: IpStackUdpStream, + udpgw_client: Arc, + udp_dst: &socks5_impl::protocol::Address, + proxy_handler: Arc>, + socket_queue: Option>, + ipv6_enabled: bool, +) -> crate::Result<()> { + let proxy_server_addr = { proxy_handler.lock().await.get_server_addr() }; + let udp_mtu = udpgw_client.get_udp_mtu(); + let udp_timeout = udpgw_client.get_udp_timeout(); + + let mut stream = loop { + match udpgw_client.pop_server_connection_from_queue().await { + Some(stream) => { + if stream.is_closed() { + continue; + } else { + break stream; + } + } + None => { + let mut tcp_server_stream = create_tcp_stream(&socket_queue, proxy_server_addr).await?; + if let Err(e) = handle_proxy_session(&mut tcp_server_stream, proxy_handler).await { + return Err(format!("udpgw connection error: {e}").into()); + } + break UdpGwClientStream::new(tcp_server_stream); + } + } + }; + + let tcp_local_addr = stream.local_addr(); + let sn = stream.serial_number(); + + log::info!("[UdpGw] Beginning stream {} {} -> {}", sn, &tcp_local_addr, udp_dst); + + let Some(mut reader) = stream.get_reader() else { + return Err("get reader failed".into()); + }; + + let Some(mut writer) = stream.get_writer() else { + return Err("get writer failed".into()); + }; + + let mut tmp_buf = vec![0; udp_mtu.into()]; + + loop { + tokio::select! { + len = udp_stack.read(&mut tmp_buf) => { + let read_len = match len { + Ok(0) => { + log::info!("[UdpGw] Ending stream {} {} <> {}", sn, &tcp_local_addr, udp_dst); + break; + } + Ok(n) => n, + Err(e) => { + log::info!("[UdpGw] Ending stream {} {} <> {} with udp stack \"{}\"", sn, &tcp_local_addr, udp_dst, e); + break; + } + }; + crate::traffic_status::traffic_status_update(read_len, 0)?; + let sn = stream.serial_number(); + if let Err(e) = UdpGwClient::send_udpgw_packet(ipv6_enabled, &tmp_buf[0..read_len], udp_dst, sn, &mut writer).await { + log::info!("[UdpGw] Ending stream {} {} <> {} with send_udpgw_packet {}", sn, &tcp_local_addr, udp_dst, e); + break; + } + log::debug!("[UdpGw] stream {} {} -> {} send len {}", sn, &tcp_local_addr, udp_dst, read_len); + stream.update_activity(); + } + ret = UdpGwClient::recv_udpgw_packet(udp_mtu, udp_timeout, &mut reader) => { + if let Ok((len, _)) = ret { + crate::traffic_status::traffic_status_update(0, len)?; + } + match ret { + Err(e) => { + log::warn!("[UdpGw] Ending stream {} {} <> {} with recv_udpgw_packet {}", sn, &tcp_local_addr, udp_dst, e); + stream.close(); + break; + } + Ok((_, packet)) => match packet { + //should not received keepalive + UdpGwResponse::KeepAlive => { + log::error!("[UdpGw] Ending stream {} {} <> {} with recv keepalive", sn, &tcp_local_addr, udp_dst); + stream.close(); + break; + } + //server udp may be timeout,can continue to receive udp data? + UdpGwResponse::Error => { + log::info!("[UdpGw] Ending stream {} {} <> {} with recv udp error", sn, &tcp_local_addr, udp_dst); + stream.update_activity(); + continue; + } + UdpGwResponse::TcpClose => { + log::error!("[UdpGw] Ending stream {} {} <> {} with tcp closed", sn, &tcp_local_addr, udp_dst); + stream.close(); + break; + } + UdpGwResponse::Data(data) => { + use socks5_impl::protocol::StreamOperation; + let len = data.len(); + let f = data.header.flags; + log::debug!("[UdpGw] stream {sn} {} <- {} receive {f} len {len}", &tcp_local_addr, udp_dst); + if let Err(e) = udp_stack.write_all(&data.data).await { + log::error!("[UdpGw] Ending stream {} {} <> {} with send_udp_packet {}", sn, &tcp_local_addr, udp_dst, e); + break; + } + } + } + } + stream.update_activity(); + } + } + } + + if !stream.is_closed() { + udpgw_client.store_server_connection_full(stream, reader, writer).await; + } Ok(()) } @@ -455,7 +625,7 @@ async fn handle_udp_associate_session( ) }; - log::info!("Beginning {}", session_info); + log::info!("Beginning {session_info}"); // `_server` is meaningful here, it must be alive all the time // to ensure that UDP transmission will not be interrupted accidentally. @@ -532,7 +702,7 @@ async fn handle_udp_associate_session( } } - log::info!("Ending {}", session_info); + log::info!("Ending {session_info}"); Ok(()) } @@ -551,7 +721,7 @@ async fn handle_dns_over_tcp_session( let mut server = create_tcp_stream(&socket_queue, server_addr).await?; - log::info!("Beginning {}", session_info); + log::info!("Beginning {session_info}"); let _ = handle_proxy_session(&mut server, proxy_handler).await?; @@ -604,7 +774,7 @@ async fn handle_dns_over_tcp_session( let name = dns::extract_domain_from_dns_message(&message)?; let ip = dns::extract_ipaddr_from_dns_message(&message); - log::trace!("DNS over TCP query result: {} -> {:?}", name, ip); + log::trace!("DNS over TCP query result: {name} -> {ip:?}"); if !ipv6_enabled { dns::remove_ipv6_entries(&mut message); @@ -624,7 +794,7 @@ async fn handle_dns_over_tcp_session( } } - log::info!("Ending {}", session_info); + log::info!("Ending {session_info}"); Ok(()) } diff --git a/src/mobile_api.rs b/src/mobile_api.rs deleted file mode 100644 index 69e684c..0000000 --- a/src/mobile_api.rs +++ /dev/null @@ -1,83 +0,0 @@ -#![cfg(any(target_os = "ios", target_os = "android", target_os = "macos"))] - -use crate::Args; -use std::os::raw::c_int; - -static TUN_QUIT: std::sync::Mutex> = std::sync::Mutex::new(None); - -/// Dummy function to make the build pass. -#[doc(hidden)] -#[cfg(not(target_os = "macos"))] -pub async fn desktop_run_async(_: Args, _: tokio_util::sync::CancellationToken) -> std::io::Result<()> { - Ok(()) -} - -pub fn mobile_run(args: Args, tun_mtu: u16, _packet_information: bool) -> c_int { - let shutdown_token = tokio_util::sync::CancellationToken::new(); - { - if let Ok(mut lock) = TUN_QUIT.lock() { - if lock.is_some() { - log::error!("tun2proxy already started"); - return -1; - } - *lock = Some(shutdown_token.clone()); - } else { - log::error!("failed to lock tun2proxy quit token"); - return -2; - } - } - - let block = async move { - let mut config = tun2::Configuration::default(); - - #[cfg(unix)] - if let Some(fd) = args.tun_fd { - config.raw_fd(fd); - if let Some(v) = args.close_fd_on_drop { - config.close_fd_on_drop(v); - }; - } else if let Some(ref tun) = args.tun { - config.tun_name(tun); - } - #[cfg(windows)] - if let Some(ref tun) = args.tun { - config.tun_name(tun); - } - - #[cfg(any(target_os = "ios", target_os = "macos"))] - config.platform_config(|config| { - config.packet_information(_packet_information); - }); - - let device = tun2::create_as_async(&config).map_err(std::io::Error::from)?; - let join_handle = tokio::spawn(crate::run(device, tun_mtu, args, shutdown_token)); - - join_handle.await.map_err(std::io::Error::from)? - }; - - let exit_code = match tokio::runtime::Builder::new_multi_thread().enable_all().build() { - Err(e) => { - log::error!("failed to create tokio runtime with error: {:?}", e); - -1 - } - Ok(rt) => match rt.block_on(block) { - Ok(_) => 0, - Err(e) => { - log::error!("failed to run tun2proxy with error: {:?}", e); - -2 - } - }, - }; - - exit_code -} - -pub fn mobile_stop() -> c_int { - if let Ok(mut lock) = TUN_QUIT.lock() { - if let Some(shutdown_token) = lock.take() { - shutdown_token.cancel(); - return 0; - } - } - -1 -} diff --git a/src/session_info.rs b/src/session_info.rs index fc4e938..a0784a9 100644 --- a/src/session_info.rs +++ b/src/session_info.rs @@ -16,7 +16,7 @@ impl std::fmt::Display for IpProtocol { IpProtocol::Tcp => write!(f, "TCP"), IpProtocol::Udp => write!(f, "UDP"), IpProtocol::Icmp => write!(f, "ICMP"), - IpProtocol::Other(v) => write!(f, "Other(0x{:02X})", v), + IpProtocol::Other(v) => write!(f, "Other(0x{v:02X})"), } } } diff --git a/src/socket_transfer.rs b/src/socket_transfer.rs index f10b3f2..4c81da7 100644 --- a/src/socket_transfer.rs +++ b/src/socket_transfer.rs @@ -1,10 +1,10 @@ #![cfg(target_os = "linux")] -use crate::{error, SocketDomain, SocketProtocol}; +use crate::{SocketDomain, SocketProtocol, error}; use nix::{ errno::Errno, fcntl::{self, FdFlag}, - sys::socket::{cmsg_space, getsockopt, recvmsg, sendmsg, sockopt, ControlMessage, ControlMessageOwned, MsgFlags, SockType}, + sys::socket::{ControlMessage, ControlMessageOwned, MsgFlags, SockType, cmsg_space, getsockopt, recvmsg, sendmsg, sockopt}, }; use serde::{Deserialize, Serialize}; use std::{ @@ -16,31 +16,31 @@ use tokio::net::{TcpSocket, UdpSocket, UnixDatagram}; const REQUEST_BUFFER_SIZE: usize = 64; -#[derive(Hash, Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +#[derive(bincode::Encode, bincode::Decode, Hash, Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] struct Request { protocol: SocketProtocol, domain: SocketDomain, number: u32, } -#[derive(Hash, Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +#[derive(bincode::Encode, bincode::Decode, PartialEq, Debug, Hash, Copy, Clone, Eq, Serialize, Deserialize)] enum Response { Ok, } /// Reconstruct socket from raw `fd` pub fn reconstruct_socket(fd: RawFd) -> Result { - // Check if `fd` is valid - let fd_flags = fcntl::fcntl(fd, fcntl::F_GETFD)?; - // `fd` is confirmed to be valid so it should be closed let socket = unsafe { OwnedFd::from_raw_fd(fd) }; + // Check if `fd` is valid + let fd_flags = fcntl::fcntl(socket.as_fd(), fcntl::F_GETFD)?; + // Insert CLOEXEC flag to the `fd` to prevent further propagation across `execve(2)` calls let mut fd_flags = FdFlag::from_bits(fd_flags).ok_or(ErrorKind::Unsupported)?; if !fd_flags.contains(FdFlag::FD_CLOEXEC) { fd_flags.insert(FdFlag::FD_CLOEXEC); - fcntl::fcntl(fd, fcntl::F_SETFD(fd_flags))?; + fcntl::fcntl(socket.as_fd(), fcntl::F_SETFD(fd_flags))?; } Ok(socket) @@ -70,12 +70,12 @@ pub async fn create_transfer_socket_pair() -> std::io::Result<(UnixDatagram, Own let remote_fd: OwnedFd = remote.into_std().unwrap().into(); // Get `remote_fd` flags - let fd_flags = fcntl::fcntl(remote_fd.as_raw_fd(), fcntl::F_GETFD)?; + let fd_flags = fcntl::fcntl(remote_fd.as_fd(), fcntl::F_GETFD)?; // Remove CLOEXEC flag from the `remote_fd` to allow propagating across `execve(2)` let mut fd_flags = FdFlag::from_bits(fd_flags).ok_or(ErrorKind::Unsupported)?; fd_flags.remove(FdFlag::FD_CLOEXEC); - fcntl::fcntl(remote_fd.as_raw_fd(), fcntl::F_SETFD(fd_flags))?; + fcntl::fcntl(remote_fd.as_fd(), fcntl::F_SETFD(fd_flags))?; Ok((local, remote_fd)) } @@ -135,14 +135,21 @@ where // Borrow socket as mut to prevent multiple simultaneous requests let socket = socket.deref_mut(); - // Send request - let request = bincode::serialize(&Request { - protocol: T::domain(), - domain, - number, - })?; + let mut request = [0u8; 1000]; - socket.send(&request[..]).await?; + // Send request + let size = bincode::encode_into_slice( + Request { + protocol: T::domain(), + domain, + number, + }, + &mut request, + bincode::config::standard(), + ) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; + + socket.send(&request[..size]).await?; // Receive response loop { @@ -150,8 +157,7 @@ where let mut buf = [0_u8; REQUEST_BUFFER_SIZE]; let mut iov = [IoSliceMut::new(&mut buf[..])]; - let mut cmsg = Vec::with_capacity(cmsg_space::() * number as usize); - + let mut cmsg = vec![0; cmsg_space::() * number as usize]; let msg = recvmsg::<()>(socket.as_fd().as_raw_fd(), &mut iov, Some(&mut cmsg), MsgFlags::empty()); let msg = match msg { @@ -161,7 +167,9 @@ where // Parse response let response = &msg.iovs().next().unwrap()[..msg.bytes]; - let response: Response = bincode::deserialize(response)?; + let response: Response = bincode::decode_from_slice(response, bincode::config::standard()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))? + .0; if !matches!(response, Response::Ok) { return Err("Request for new sockets failed".into()); } @@ -194,10 +202,14 @@ pub async fn process_socket_requests(socket: &UnixDatagram) -> error::Result<()> let len = socket.recv(&mut buf[..]).await?; - let request: Request = bincode::deserialize(&buf[..len])?; + let request: Request = bincode::decode_from_slice(&buf[..len], bincode::config::standard()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))? + .0; let response = Response::Ok; - let buf = bincode::serialize(&response)?; + let mut buf = [0u8; 1000]; + let size = bincode::encode_into_slice(response, &mut buf, bincode::config::standard()) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?; let mut owned_fd_buf: Vec = Vec::with_capacity(request.number as usize); for _ in 0..request.number { @@ -223,7 +235,7 @@ pub async fn process_socket_requests(socket: &UnixDatagram) -> error::Result<()> let raw_fd_buf: Vec = owned_fd_buf.iter().map(|fd| fd.as_raw_fd()).collect(); let cmsg = ControlMessage::ScmRights(&raw_fd_buf[..]); - let iov = [IoSlice::new(&buf[..])]; + let iov = [IoSlice::new(&buf[..size])]; sendmsg::<()>(socket.as_raw_fd(), &iov, &[cmsg], MsgFlags::empty(), None)?; } diff --git a/src/socks.rs b/src/socks.rs index 3800c6a..e2265ba 100644 --- a/src/socks.rs +++ b/src/socks.rs @@ -4,7 +4,7 @@ use crate::{ proxy_handler::{ProxyHandler, ProxyHandlerManager}, session_info::SessionInfo, }; -use socks5_impl::protocol::{self, handshake, password_method, Address, AuthMethod, StreamOperation, UserKey, Version}; +use socks5_impl::protocol::{self, Address, AuthMethod, StreamOperation, UserKey, Version, handshake, password_method}; use std::{collections::VecDeque, net::SocketAddr, sync::Arc}; use tokio::sync::Mutex; @@ -78,7 +78,7 @@ impl SocksProxyImpl { } } SocketAddr::V6(addr) => { - return Err(format!("SOCKS4 does not support IPv6: {}", addr).into()); + return Err(format!("SOCKS4 does not support IPv6: {addr}").into()); } } self.server_outbuf.extend(ip_vec); @@ -136,7 +136,7 @@ impl SocksProxyImpl { let response = handshake::Response::retrieve_from_stream(&mut self.server_inbuf.clone()); if let Err(e) = response { if e.kind() == std::io::ErrorKind::UnexpectedEof { - log::trace!("receive_server_hello_socks5 needs more data \"{}\"...", e); + log::trace!("receive_server_hello_socks5 needs more data \"{e}\"..."); return Ok(()); } else { return Err(e); @@ -181,7 +181,7 @@ impl SocksProxyImpl { let response = Response::retrieve_from_stream(&mut self.server_inbuf.clone()); if let Err(e) = response { if e.kind() == std::io::ErrorKind::UnexpectedEof { - log::trace!("receive_auth_data needs more data \"{}\"...", e); + log::trace!("receive_auth_data needs more data \"{e}\"..."); return Ok(()); } else { return Err(e); @@ -213,7 +213,7 @@ impl SocksProxyImpl { let response = protocol::Response::retrieve_from_stream(&mut self.server_inbuf.clone()); if let Err(e) = response { if e.kind() == std::io::ErrorKind::UnexpectedEof { - log::trace!("receive_connection_status needs more data \"{}\"...", e); + log::trace!("receive_connection_status needs more data \"{e}\"..."); return Ok(()); } else { return Err(e); diff --git a/src/traffic_status.rs b/src/traffic_status.rs index 3117a22..9f32b7c 100644 --- a/src/traffic_status.rs +++ b/src/traffic_status.rs @@ -5,7 +5,7 @@ use std::sync::{LazyLock, Mutex}; /// # Safety /// /// set traffic status callback. -#[no_mangle] +#[unsafe(no_mangle)] pub unsafe extern "C" fn tun2proxy_set_traffic_status_callback( send_interval_secs: u32, callback: Option, @@ -34,7 +34,7 @@ struct TrafficStatusCallback(Option> = LazyLock::new(|| Mutex: pub(crate) fn traffic_status_update(delta_tx: usize, delta_rx: usize) -> Result<()> { { let is_none_or_error = TRAFFIC_STATUS_CALLBACK.lock().map(|guard| guard.is_none()).unwrap_or_else(|e| { - log::error!("Failed to acquire lock: {}", e); + log::error!("Failed to acquire lock: {e}"); true }); if is_none_or_error { diff --git a/src/udpgw.rs b/src/udpgw.rs new file mode 100644 index 0000000..24edaad --- /dev/null +++ b/src/udpgw.rs @@ -0,0 +1,578 @@ +use crate::error::Result; +use socks5_impl::protocol::{Address, AsyncStreamOperation, BufMut, StreamOperation}; +use std::{collections::VecDeque, hash::Hash, net::SocketAddr, sync::atomic::Ordering::Relaxed}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{ + TcpStream, + tcp::{OwnedReadHalf, OwnedWriteHalf}, + }, + sync::Mutex, + time::{Duration, sleep}, +}; + +pub(crate) const UDPGW_LENGTH_FIELD_SIZE: usize = std::mem::size_of::(); +pub(crate) const UDPGW_MAX_CONNECTIONS: usize = 5; +pub(crate) const UDPGW_KEEPALIVE_TIME: tokio::time::Duration = std::time::Duration::from_secs(30); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct UdpFlag(pub u8); + +impl UdpFlag { + pub const ZERO: UdpFlag = UdpFlag(0x00); + pub const KEEPALIVE: UdpFlag = UdpFlag(0x01); + pub const ERR: UdpFlag = UdpFlag(0x20); + pub const DATA: UdpFlag = UdpFlag(0x02); +} + +impl std::fmt::Display for UdpFlag { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let flag = match self.0 { + 0x00 => "ZERO", + 0x01 => "KEEPALIVE", + 0x20 => "ERR", + 0x02 => "DATA", + n => return write!(f, "Unknown UdpFlag(0x{n:02X})"), + }; + write!(f, "{flag}") + } +} + +impl std::ops::BitAnd for UdpFlag { + type Output = Self; + fn bitand(self, rhs: Self) -> Self::Output { + UdpFlag(self.0 & rhs.0) + } +} + +impl std::ops::BitOr for UdpFlag { + type Output = Self; + fn bitor(self, rhs: Self) -> Self::Output { + UdpFlag(self.0 | rhs.0) + } +} + +/// UDP Gateway Packet Format +/// +/// The format is referenced from SOCKS5 packet format, with additional flags and connection ID fields. +/// +/// `LEN`: This field is indicated the length of the packet, not including the length field itself. +/// +/// `FLAGS`: This field is used to indicate the packet type. The flags are defined as follows: +/// - `0x01`: Keepalive packet without address and data +/// - `0x20`: Error packet without address and data +/// - `0x02`: Data packet with address and data +/// +/// `CONN_ID`: This field is used to indicate the unique connection ID for the packet. +/// +/// `ATYP` & `DST.ADDR` & `DST.PORT`: This fields are used to indicate the remote address and port. +/// It can be either an IPv4 address, an IPv6 address, or a domain name, depending on the `ATYP` field. +/// The address format directly uses the address format of the [SOCKS5](https://datatracker.ietf.org/doc/html/rfc1928#section-4) protocol. +/// - `ATYP`: Address Type, 1 byte, indicating the type of address ( 0x01-IPv4, 0x04-IPv6, or 0x03-domain name ) +/// - `DST.ADDR`: Destination Address. If `ATYP` is 0x01 or 0x04, it is 4 or 16 bytes of IP address; +/// If `ATYP` is 0x03, it is a domain name, `DST.ADDR` is a variable length field, +/// it begins with a 1-byte length field and then the domain name without null-termination, +/// since the length field is 1 byte, the maximum length of the domain name is 255 bytes. +/// - `DST.PORT`: Destination Port, 2 bytes, the port number of the destination address. +/// +/// `DATA`: The data field, a variable length field, the length is determined by the `LEN` field. +/// +/// All the digits fields are in big-endian byte order. +/// +/// ```plain +/// +-----+ +-------+---------+ +------+----------+----------+ +----------+ +/// | LEN | | FLAGS | CONN_ID | | ATYP | DST.ADDR | DST.PORT | | DATA | +/// +-----+ +-------+---------+ +------+----------+----------+ +----------+ +/// | 2 | | 1 | 2 | | 1 | Variable | 2 | | Variable | +/// +-----+ +-------+---------+ +------+----------+----------+ +----------+ +/// ``` +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Packet { + pub header: UdpgwHeader, + pub address: Option
, + pub data: Vec, +} + +impl std::fmt::Display for Packet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let addr = self.address.as_ref().map_or("None".to_string(), |addr| addr.to_string()); + let len = self.data.len(); + write!(f, "Packet {{ {}, address: {}, payload length: {} }}", self.header, addr, len) + } +} + +impl From for Vec { + fn from(packet: Packet) -> Vec { + (&packet).into() + } +} + +impl From<&Packet> for Vec { + fn from(packet: &Packet) -> Vec { + let mut bytes: Vec = vec![]; + packet.write_to_buf(&mut bytes); + bytes + } +} + +impl TryFrom<&[u8]> for Packet { + type Error = std::io::Error; + + fn try_from(value: &[u8]) -> std::result::Result { + if value.len() < UDPGW_LENGTH_FIELD_SIZE { + return Err(std::io::ErrorKind::InvalidData.into()); + } + let mut iter = std::io::Cursor::new(value); + use tokio_util::bytes::Buf; + let length = iter.get_u16(); + if value.len() < length as usize + UDPGW_LENGTH_FIELD_SIZE { + return Err(std::io::ErrorKind::InvalidData.into()); + } + let header = UdpgwHeader::retrieve_from_stream(&mut iter)?; + let address = if header.flags & UdpFlag::DATA != UdpFlag::ZERO { + Some(Address::retrieve_from_stream(&mut iter)?) + } else { + None + }; + Ok(Packet::new(header, address, iter.chunk())) + } +} + +impl Packet { + pub fn new(header: UdpgwHeader, address: Option
, data: &[u8]) -> Self { + let data = data.to_vec(); + Packet { header, address, data } + } + + pub fn build_keepalive_packet(conn_id: u16) -> Self { + Packet::new(UdpgwHeader::new(UdpFlag::KEEPALIVE, conn_id), None, &[]) + } + + pub fn build_error_packet(conn_id: u16) -> Self { + Packet::new(UdpgwHeader::new(UdpFlag::ERR, conn_id), None, &[]) + } + + pub fn build_packet_from_address(conn_id: u16, remote_addr: &Address, data: &[u8]) -> std::io::Result { + use socks5_impl::protocol::Address::{DomainAddress, SocketAddress}; + let packet = match remote_addr { + SocketAddress(addr) => Packet::build_ip_packet(conn_id, *addr, data), + DomainAddress(domain, port) => Packet::build_domain_packet(conn_id, *port, domain, data)?, + }; + Ok(packet) + } + + pub fn build_ip_packet(conn_id: u16, remote_addr: SocketAddr, data: &[u8]) -> Self { + let addr: Address = remote_addr.into(); + Packet::new(UdpgwHeader::new(UdpFlag::DATA, conn_id), Some(addr), data) + } + + pub fn build_domain_packet(conn_id: u16, port: u16, domain: &str, data: &[u8]) -> std::io::Result { + if domain.len() > 255 { + return Err(std::io::ErrorKind::InvalidInput.into()); + } + let addr = Address::from((domain, port)); + Ok(Packet::new(UdpgwHeader::new(UdpFlag::DATA, conn_id), Some(addr), data)) + } +} + +impl StreamOperation for Packet { + fn retrieve_from_stream(stream: &mut R) -> std::io::Result + where + R: std::io::Read, + Self: Sized, + { + let mut buf = [0; UDPGW_LENGTH_FIELD_SIZE]; + stream.read_exact(&mut buf)?; + let length = u16::from_be_bytes(buf) as usize; + let header = UdpgwHeader::retrieve_from_stream(stream)?; + let address = if header.flags & UdpFlag::DATA == UdpFlag::DATA { + Some(Address::retrieve_from_stream(stream)?) + } else { + None + }; + let read_len = header.len() + address.as_ref().map_or(0, |addr| addr.len()); + if length < read_len { + return Err(std::io::ErrorKind::InvalidData.into()); + } + let mut data = vec![0; length - read_len]; + stream.read_exact(&mut data)?; + Ok(Packet::new(header, address, &data)) + } + + fn write_to_buf(&self, buf: &mut B) { + let len = self.len() - UDPGW_LENGTH_FIELD_SIZE; + buf.put_u16(len as u16); + self.header.write_to_buf(buf); + if let Some(addr) = &self.address { + addr.write_to_buf(buf); + } + buf.put_slice(&self.data); + } + + fn len(&self) -> usize { + UDPGW_LENGTH_FIELD_SIZE + self.header.len() + self.address.as_ref().map_or(0, |addr| addr.len()) + self.data.len() + } +} + +#[async_trait::async_trait] +impl AsyncStreamOperation for Packet { + async fn retrieve_from_async_stream(r: &mut R) -> std::io::Result + where + R: tokio::io::AsyncRead + Unpin + Send + ?Sized, + Self: Sized, + { + let mut buf = [0; UDPGW_LENGTH_FIELD_SIZE]; + r.read_exact(&mut buf).await?; + let length = u16::from_be_bytes(buf) as usize; + let header = UdpgwHeader::retrieve_from_async_stream(r).await?; + let address = if header.flags & UdpFlag::DATA == UdpFlag::DATA { + Some(Address::retrieve_from_async_stream(r).await?) + } else { + None + }; + let read_len = header.len() + address.as_ref().map_or(0, |addr| addr.len()); + if length < read_len { + return Err(std::io::ErrorKind::InvalidData.into()); + } + let mut data = vec![0; length - read_len]; + r.read_exact(&mut data).await?; + Ok(Packet::new(header, address, &data)) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct UdpgwHeader { + pub flags: UdpFlag, + pub conn_id: u16, +} + +impl std::fmt::Display for UdpgwHeader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} conn_id: {}", self.flags, self.conn_id) + } +} + +impl StreamOperation for UdpgwHeader { + fn retrieve_from_stream(stream: &mut R) -> std::io::Result + where + R: std::io::Read, + Self: Sized, + { + let mut buf = [0; UdpgwHeader::static_len()]; + stream.read_exact(&mut buf)?; + UdpgwHeader::try_from(&buf[..]) + } + + fn write_to_buf(&self, buf: &mut B) { + let bytes: Vec = self.into(); + buf.put_slice(&bytes); + } + + fn len(&self) -> usize { + Self::static_len() + } +} + +#[async_trait::async_trait] +impl AsyncStreamOperation for UdpgwHeader { + async fn retrieve_from_async_stream(r: &mut R) -> std::io::Result + where + R: tokio::io::AsyncRead + Unpin + Send + ?Sized, + Self: Sized, + { + let mut buf = [0; UdpgwHeader::static_len()]; + r.read_exact(&mut buf).await?; + UdpgwHeader::try_from(&buf[..]) + } +} + +impl UdpgwHeader { + pub fn new(flags: UdpFlag, conn_id: u16) -> Self { + UdpgwHeader { flags, conn_id } + } + + pub const fn static_len() -> usize { + std::mem::size_of::() + std::mem::size_of::() + } +} + +impl TryFrom<&[u8]> for UdpgwHeader { + type Error = std::io::Error; + + fn try_from(value: &[u8]) -> std::result::Result { + if value.len() < UdpgwHeader::static_len() { + return Err(std::io::ErrorKind::InvalidData.into()); + } + let conn_id = u16::from_be_bytes([value[1], value[2]]); + Ok(UdpgwHeader::new(UdpFlag(value[0]), conn_id)) + } +} + +impl From<&UdpgwHeader> for Vec { + fn from(header: &UdpgwHeader) -> Vec { + let mut bytes = vec![0; header.len()]; + bytes[0] = header.flags.0; + bytes[1..3].copy_from_slice(&header.conn_id.to_be_bytes()); + bytes + } +} + +#[allow(dead_code)] +#[derive(Debug)] +pub(crate) enum UdpGwResponse { + KeepAlive, + Error, + TcpClose, + Data(Packet), +} + +impl std::fmt::Display for UdpGwResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + UdpGwResponse::KeepAlive => write!(f, "KeepAlive"), + UdpGwResponse::Error => write!(f, "Error"), + UdpGwResponse::TcpClose => write!(f, "TcpClose"), + UdpGwResponse::Data(packet) => write!(f, "Data({packet})"), + } + } +} + +static SERIAL_NUMBER: std::sync::atomic::AtomicU16 = std::sync::atomic::AtomicU16::new(1); + +#[derive(Debug)] +pub(crate) struct UdpGwClientStream { + local_addr: SocketAddr, + writer: Option, + reader: Option, + closed: bool, + last_activity: std::time::Instant, + serial_number: u16, +} + +impl UdpGwClientStream { + pub fn close(&mut self) { + self.closed = true; + } + + pub fn get_reader(&mut self) -> Option { + self.reader.take() + } + + pub fn set_reader(&mut self, reader: Option) { + self.reader = reader; + } + + pub fn set_writer(&mut self, writer: Option) { + self.writer = writer; + } + + pub fn get_writer(&mut self) -> Option { + self.writer.take() + } + + pub fn local_addr(&self) -> SocketAddr { + self.local_addr + } + + pub fn update_activity(&mut self) { + self.last_activity = std::time::Instant::now(); + } + + pub fn is_closed(&self) -> bool { + self.closed + } + + pub fn serial_number(&self) -> u16 { + self.serial_number + } + + pub fn new(tcp_server_stream: TcpStream) -> Self { + let default = "0.0.0.0:0".parse::().unwrap(); + let local_addr = tcp_server_stream.local_addr().unwrap_or(default); + let (reader, writer) = tcp_server_stream.into_split(); + let serial_number = SERIAL_NUMBER.fetch_add(1, Relaxed); + UdpGwClientStream { + local_addr, + reader: Some(reader), + writer: Some(writer), + last_activity: std::time::Instant::now(), + closed: false, + serial_number, + } + } +} + +#[derive(Debug)] +pub(crate) struct UdpGwClient { + udp_mtu: u16, + max_connections: usize, + udp_timeout: u64, + keepalive_time: Duration, + udpgw_server: SocketAddr, + server_connections: Mutex>, +} + +impl UdpGwClient { + pub fn new(udp_mtu: u16, max_connections: usize, keepalive_time: Duration, udp_timeout: u64, udpgw_server: SocketAddr) -> Self { + let server_connections = Mutex::new(VecDeque::with_capacity(max_connections)); + UdpGwClient { + udp_mtu, + max_connections, + udp_timeout, + udpgw_server, + keepalive_time, + server_connections, + } + } + + pub(crate) fn get_udp_mtu(&self) -> u16 { + self.udp_mtu + } + + pub(crate) fn get_udp_timeout(&self) -> u64 { + self.udp_timeout + } + + pub(crate) async fn pop_server_connection_from_queue(&self) -> Option { + self.server_connections.lock().await.pop_front() + } + + pub(crate) async fn store_server_connection(&self, stream: UdpGwClientStream) { + if self.server_connections.lock().await.len() < self.max_connections { + self.server_connections.lock().await.push_back(stream); + } + } + + pub(crate) async fn store_server_connection_full(&self, mut stream: UdpGwClientStream, reader: OwnedReadHalf, writer: OwnedWriteHalf) { + if self.server_connections.lock().await.len() < self.max_connections { + stream.set_reader(Some(reader)); + stream.set_writer(Some(writer)); + self.server_connections.lock().await.push_back(stream); + } + } + + pub(crate) fn get_udpgw_server_addr(&self) -> SocketAddr { + self.udpgw_server + } + + /// Heartbeat task asynchronous function to periodically check and maintain the active state of the server connection. + pub(crate) async fn heartbeat_task(&self) -> std::io::Result<()> { + loop { + sleep(self.keepalive_time).await; + let mut streams = Vec::new(); + + while let Some(stream) = self.pop_server_connection_from_queue().await { + if !stream.is_closed() { + streams.push(stream); + } + } + + let (mut tx, mut rx) = (0, 0); + + for mut stream in streams { + if stream.last_activity.elapsed() < self.keepalive_time { + self.store_server_connection(stream).await; + continue; + } + + let Some(mut stream_reader) = stream.get_reader() else { + continue; + }; + + let Some(mut stream_writer) = stream.get_writer() else { + continue; + }; + let local_addr = stream_writer.local_addr()?; + let sn = stream.serial_number(); + let keepalive_packet: Vec = Packet::build_keepalive_packet(sn).into(); + tx += keepalive_packet.len(); + if let Err(e) = stream_writer.write_all(&keepalive_packet).await { + log::warn!("stream {sn} {local_addr:?} send keepalive failed: {e}"); + continue; + } + match UdpGwClient::recv_udpgw_packet(self.udp_mtu, self.udp_timeout, &mut stream_reader).await { + Ok((len, UdpGwResponse::KeepAlive)) => { + stream.update_activity(); + self.store_server_connection_full(stream, stream_reader, stream_writer).await; + log::trace!("stream {sn} {local_addr:?} send keepalive and recieve it successfully"); + rx += len; + } + Ok((len, v)) => { + log::debug!("stream {sn} {local_addr:?} keepalive unexpected response: {v}"); + rx += len; + } + Err(e) => log::debug!("stream {sn} {local_addr:?} keepalive no response, error \"{e}\""), + } + } + crate::traffic_status::traffic_status_update(tx, rx)?; + } + } + + /// Parses the UDP response data. + pub(crate) fn parse_udp_response(udp_mtu: u16, packet: Packet) -> Result { + let flags = packet.header.flags; + if flags & UdpFlag::ERR == UdpFlag::ERR { + return Ok(UdpGwResponse::Error); + } + if flags & UdpFlag::KEEPALIVE == UdpFlag::KEEPALIVE { + return Ok(UdpGwResponse::KeepAlive); + } + if packet.data.len() > udp_mtu as usize { + return Err("too much data".into()); + } + Ok(UdpGwResponse::Data(packet)) + } + + /// Receives a UDP gateway packet. + /// + /// This function is responsible for receiving packets from the UDP gateway + /// + /// # Arguments + /// - `udp_mtu`: The maximum transmission unit size for UDP packets. + /// - `udp_timeout`: The timeout in seconds for receiving UDP packets. + /// - `stream`: A mutable reference to the UDP gateway client stream reader. + /// + /// # Returns + /// - `Result`: Returns a result type containing the parsed UDP gateway response, or an error if one occurs. + pub(crate) async fn recv_udpgw_packet(udp_mtu: u16, udp_timeout: u64, stream: &mut OwnedReadHalf) -> Result<(usize, UdpGwResponse)> { + let packet = tokio::time::timeout( + tokio::time::Duration::from_secs(udp_timeout + 2), + Packet::retrieve_from_async_stream(stream), + ) + .await + .map_err(std::io::Error::from)??; + Ok((packet.len(), UdpGwClient::parse_udp_response(udp_mtu, packet)?)) + } + + /// Sends a UDP gateway packet. + /// + /// This function constructs and sends a UDP gateway packet based on the IPv6 enabled status, data length, + /// remote address, domain (if any), connection ID, and the UDP gateway client writer stream. + /// + /// # Arguments + /// + /// * `ipv6_enabled` - Whether IPv6 is enabled + /// * `data` - The data packet + /// * `remote_addr` - Remote address + /// * `conn_id` - Connection ID + /// * `stream` - UDP gateway client writer stream + /// + /// # Returns + /// + /// Returns `Ok(())` if the packet is sent successfully, otherwise returns an error. + pub(crate) async fn send_udpgw_packet( + ipv6_enabled: bool, + data: &[u8], + remote_addr: &socks5_impl::protocol::Address, + conn_id: u16, + stream: &mut OwnedWriteHalf, + ) -> Result<()> { + if !ipv6_enabled && remote_addr.get_type() == socks5_impl::protocol::AddressType::IPv6 { + return Err("ipv6 not support".into()); + } + let out_data: Vec = Packet::build_packet_from_address(conn_id, remote_addr, data)?.into(); + stream.write_all(&out_data).await?; + + Ok(()) + } +} diff --git a/src/virtual_dns.rs b/src/virtual_dns.rs index 8dae2a7..7794d37 100644 --- a/src/virtual_dns.rs +++ b/src/virtual_dns.rs @@ -1,5 +1,5 @@ use crate::error::Result; -use hashlink::{linked_hash_map::RawEntryMut, LruCache}; +use hashlink::{LruCache, linked_hash_map::RawEntryMut}; use std::{ collections::HashMap, convert::TryInto, diff --git a/src/win_svc.rs b/src/win_svc.rs index afca2f1..2fed698 100644 --- a/src/win_svc.rs +++ b/src/win_svc.rs @@ -16,7 +16,7 @@ fn my_service_main(arguments: Vec) { // `service_dispatcher::start` from `main`. if let Err(_e) = run_service(arguments) { - log::error!("Error: {:?}", _e); + log::error!("Error: {_e:?}"); } } @@ -73,13 +73,21 @@ fn run_service(_arguments: Vec) -> Result<(), crate::BoxErro let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?; rt.block_on(async { unsafe extern "C" fn traffic_cb(status: *const crate::TrafficStatus, _: *mut std::ffi::c_void) { - let status = &*status; + let status = unsafe { &*status }; log::debug!("Traffic: ▲ {} : ▼ {}", status.tx, status.rx); } unsafe { crate::tun2proxy_set_traffic_status_callback(1, Some(traffic_cb), std::ptr::null_mut()) }; - if let Err(err) = crate::desktop_run_async(args, shutdown_token).await { - log::error!("main loop error: {}", err); + let ret = crate::general_run_async(args.clone(), tun::DEFAULT_MTU, false, shutdown_token).await; + match &ret { + Ok(sessions) => { + if args.exit_on_fatal_error && *sessions >= args.max_sessions { + log::error!("Forced exit due to max sessions reached ({sessions}/{})", args.max_sessions); + std::process::exit(-1); + } + log::debug!("tun2proxy exited normally, current sessions: {sessions}"); + } + Err(err) => log::error!("main loop error: {err}"), } Ok::<(), crate::Error>(()) })?; diff --git a/tests/requirements.txt b/tests/requirements.txt index d44fe44..e0fed8e 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,2 +1,3 @@ requests -python-dotenv \ No newline at end of file +python-dotenv +psutil \ No newline at end of file diff --git a/tests/tests.py b/tests/tests.py index 3ff9dff..7175f64 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -4,6 +4,7 @@ import os import subprocess import time import unittest +import psutil import dotenv import requests @@ -33,6 +34,14 @@ def get_tool_path(): default = default[0] if len(default) > 0 else 'tun2proxy-bin' return os.environ.get('TOOL_PATH', default) +def sudo_kill_process_and_children(proc): + try: + for child in psutil.Process(proc.pid).children(recursive=True): + if child.name() == 'tun2proxy-bin': + subprocess.run(['sudo', 'kill', str(child.pid)]) + subprocess.run(['sudo', 'kill', str(proc.pid)]) + except psutil.NoSuchProcess: + pass class Tun2ProxyTest(unittest.TestCase): @staticmethod @@ -49,6 +58,7 @@ class Tun2ProxyTest(unittest.TestCase): except Exception as e: raise e finally: + sudo_kill_process_and_children(p) p.terminate() p.wait()