Browse Source

initial commit

master
Jonathan Cobb 3 years ago
commit
026d021cdc
6 changed files with 1708 additions and 0 deletions
  1. +2
    -0
      .gitignore
  2. +1309
    -0
      Cargo.lock
  3. +24
    -0
      Cargo.toml
  4. +6
    -0
      first_time_ubuntu.sh
  5. +182
    -0
      src/lib.rs
  6. +185
    -0
      src/main.rs

+ 2
- 0
.gitignore View File

@@ -0,0 +1,2 @@
/target
.idea

+ 1309
- 0
Cargo.lock
File diff suppressed because it is too large
View File


+ 24
- 0
Cargo.toml View File

@@ -0,0 +1,24 @@
# Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://bubblev.com/bubble-license/
[package]
name = "bubble-flexrouter"
version = "0.1.0"
authors = ["Jonathan Cobb <jonathan@getbubblenow.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = "2.33.0"
futures-core = { version = "0.3", default-features = false }
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false }
http = "0.2.1"
hyper = "0.13.7"
lru = "0.6.0"
os_info = { version = "2.0.8", default-features = false }
pnet = "0.26.0"
tokio = { version = "0.2.22", features = ["full"] }
trust-dns-resolver = "0.19.5"

[profile.release]
panic = 'abort'

+ 6
- 0
first_time_ubuntu.sh View File

@@ -0,0 +1,6 @@
#!/bin/bash

sudo apt install -y gcc curl libssl-dev pkg-config

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh


+ 182
- 0
src/lib.rs View File

@@ -0,0 +1,182 @@
use std::net::SocketAddr;
use std::process::{Command, Stdio};
use std::sync::Arc;

use hyper::{Body, Client, Method, Request, Response, Server};

use lru::LruCache;

use os_info::{Info, Type};

use tokio::sync::Mutex;

use trust_dns_resolver::{Resolver, AsyncResolver};
use trust_dns_resolver::TokioAsyncResolver;
use trust_dns_resolver::config::*;
use trust_dns_resolver::lookup_ip::LookupIp;
use trust_dns_resolver::name_server::GenericConnection;

pub async fn create_resolver (dns1_sock : SocketAddr, dns2_sock : SocketAddr) -> TokioAsyncResolver {
let mut resolver_config : ResolverConfig = ResolverConfig::new();

resolver_config.add_name_server(NameServerConfig {
socket_addr: dns1_sock,
protocol: Protocol::Udp,
tls_dns_name: None
});
resolver_config.add_name_server(NameServerConfig {
socket_addr: dns1_sock,
protocol: Protocol::Tcp,
tls_dns_name: None
});
resolver_config.add_name_server(NameServerConfig {
socket_addr: dns2_sock,
protocol: Protocol::Udp,
tls_dns_name: None
});
resolver_config.add_name_server(NameServerConfig {
socket_addr: dns2_sock,
protocol: Protocol::Tcp,
tls_dns_name: None
});
TokioAsyncResolver::tokio(resolver_config, ResolverOpts::default()).await.unwrap()
}

fn chop_newline (output : String) -> String {
let mut data : String = output.clone();
let newline = data.find("\n");
return if newline.is_some() {
data.truncate(newline.unwrap());
data
} else {
data
}
}

pub fn ip_gateway() -> String {
let info : Info = os_info::get();
let ostype : Type = info.os_type();
return if ostype == Type::Windows {
let output = Command::new("C:\\Windows\\System32\\cmd.exe")
.stdin(Stdio::null())
.arg("/c")
.arg("route").arg("print").arg("0.0.0.0")
.arg("|").arg("findstr").arg("/L").arg("/C:0.0.0.0")
.output().unwrap().stdout;
let data = String::from_utf8(output).unwrap();
let mut parts = data.split_ascii_whitespace();
parts.next();
parts.next();
chop_newline(String::from(parts.next().unwrap()))

} else if ostype == Type::Macos {
let output = Command::new("/bin/sh")
.stdin(Stdio::null())
.arg("-c")
.arg("netstat -rn | grep -m 1 default | cut -d' ' -f2")
.output().unwrap().stdout;
chop_newline(String::from_utf8(output).unwrap())

} else {
let output = Command::new("/bin/sh")
.stdin(Stdio::null())
.arg("-c")
.arg("ip route show | grep -m 1 default | cut -d' ' -f3")
.output().unwrap().stdout;
chop_newline(String::from_utf8(output).unwrap())
}
}

pub async fn resolve_with_cache(host : &str,
resolver : &TokioAsyncResolver,
mut resolver_cache: Arc<Mutex<LruCache<String, String>>>) -> String {

let host_string = String::from(host);
let mut guard = resolver_cache.lock().await;
let found = (*guard).get(&host_string);

if found.is_none() {
println!("resolve_with_cache: host={} not in cache, resolving...", host_string);
let resolved_ip = format!("{}", resolver.lookup_ip(host).await.unwrap().iter().next().unwrap());
(*guard).put(host_string, resolved_ip.to_string());
resolved_ip
} else {
let found = found.unwrap();
println!("resolve_with_cache: host={} found in cache, returning: {}", host_string, found);
String::from(found)
}
}

pub fn needs_static_route(ip_string : &String) -> bool {
println!("needs_static_route: checking ip={:?}", ip_string);
let info : Info = os_info::get();
let ostype : Type = info.os_type();
let output = if ostype == Type::Windows {
Command::new("C:\\Windows\\System32\\cmd.exe")
.stdin(Stdio::null())
.arg("/c")
.arg("route").arg("print").arg(ip_string)
.arg("|")
.arg("findstr").arg("/L").arg("/C:\"Network Destination\"")
.output().unwrap().stdout

} else if ostype == Type::Macos {
Command::new("/bin/sh")
.stdin(Stdio::null())
.arg("-c")
.arg(format!("netstat -rn | egrep -m 1 \"^{}\"", ip_string))
.output().unwrap().stdout

} else {
Command::new("/bin/sh")
.stdin(Stdio::null())
.arg("-c")
.arg(format!("ip route show | egrep -m 1 \"^{}\" | cut -d' ' -f3", ip_string))
.output().unwrap().stdout
};
let data = String::from_utf8(output).unwrap();
let mut parts = data.split_ascii_whitespace();
let first_part = parts.next();
first_part.is_none() || first_part.unwrap().len() == 0
}

pub fn create_static_route(gateway : &String, ip_string : &String) -> bool {
println!("create_static_route: creating: gateway={}, ip={}", gateway, ip_string);
let info : Info = os_info::get();
let ostype : Type = info.os_type();
let output = if ostype == Type::Windows {
Command::new("C:\\Windows\\System32\\cmd.exe")
.stdin(Stdio::null())
.arg("/c")
.arg("route").arg("add").arg(ip_string).arg(gateway)
.output().unwrap().stderr

} else if ostype == Type::Macos {
Command::new("/bin/sh")
.stdin(Stdio::null())
.arg("-c")
.arg(format!("sudo route -n add {} {}", ip_string, gateway))
.output().unwrap().stderr

} else {
Command::new("/bin/sh")
.stdin(Stdio::null())
.arg("-c")
.arg(format!("sudo ip route add {} via {}", ip_string, gateway))
.output().unwrap().stderr
};
let data = String::from_utf8(output).unwrap();
let mut parts = data.split_ascii_whitespace();
let first_part = parts.next();
let ok = first_part.is_none() || first_part.unwrap().len() == 0;
if !ok {
println!("create_static_route: error creating route to {}: {}", ip_string, data);
}
ok
}

pub fn bad_request (message : &str) -> Result<Response<Body>, hyper::Error> {
let mut resp = Response::new(Body::from(String::from(message)));
*resp.status_mut() = http::StatusCode::BAD_REQUEST;
return Ok(resp);
}

+ 185
- 0
src/main.rs View File

@@ -0,0 +1,185 @@
// #![deny(warnings)]

extern crate lru;

use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;

use clap::{Arg, ArgMatches, App};

use futures_util::future::try_join;

use hyper::service::{make_service_fn, service_fn};
use hyper::upgrade::Upgraded;
use hyper::{Body, Client, Method, Request, Response, Server};

use lru::LruCache;

use tokio::net::TcpStream;
use tokio::sync::Mutex;

use trust_dns_resolver::TokioAsyncResolver;

use bubble_flexrouter::*;

type HttpClient = Client<hyper::client::HttpConnector>;

// To try this example:
// 1. cargo run --example http_proxy
// 2. config http_proxy in command line
// $ export http_proxy=http://127.0.0.1:8100
// $ export https_proxy=http://127.0.0.1:8100
// 3. send requests
// $ curl -i https://www.some_domain.com/
#[tokio::main]
async fn main() {
let client = HttpClient::new();
let gateway = Arc::new(ip_gateway());

let args : ArgMatches = App::new("bubble-flexrouter")
.version("0.1.0")
.author("Jonathan Cobb <jonathan@getbubblenow.com>")
.about("Proxy services for Bubble nodes")
.arg(Arg::with_name("dns1")
.short("d")
.long("dns1")
.value_name("IP_ADDRESS")
.help("Primary DNS server")
.default_value("1.1.1.1")
.takes_value(true))
.arg(Arg::with_name("dns2")
.short("e")
.long("dns2")
.value_name("IP_ADDRESS")
.help("Secondary DNS server")
.default_value("1.0.0.1")
.takes_value(true))
.get_matches();

let dns1_sock : SocketAddr = format!("{}:53", args.value_of("dns1").unwrap()).parse().unwrap();
let dns2_sock : SocketAddr = format!("{}:53", args.value_of("dns2").unwrap()).parse().unwrap();
let resolver = Arc::new(create_resolver(dns1_sock, dns2_sock).await);
let addr = SocketAddr::from(([127, 0, 0, 1], 8100));

let resolver_cache = Arc::new(Mutex::new(LruCache::new(1000)));

let make_service = make_service_fn(move |_| {
let client = client.clone();
let gateway = gateway.clone();
let resolver = resolver.clone();
let resolver_cache = resolver_cache.clone();
async move {
Ok::<_, Infallible>(service_fn(
move |req| proxy(
client.clone(),
gateway.clone(),
resolver.clone(),
resolver_cache.clone(),
req)
))
}
});

let server = Server::bind(&addr).serve(make_service);

println!("Listening on http://{}", addr);

if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
}

async fn proxy(client: HttpClient,
gateway: Arc<String>,
resolver: Arc<TokioAsyncResolver>,
resolver_cache: Arc<Mutex<LruCache<String, String>>>,
req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
let host = req.uri().host();
if host.is_none() {
return bad_request("No host!");
}
let host = host.unwrap();
let ip_string = resolve_with_cache(host, &resolver, resolver_cache).await;
println!("req(host {} resolved to: {}): {:?}", host, ip_string, req);

if needs_static_route(&ip_string) {
if !create_static_route(&gateway, &ip_string) {
return bad_request(format!("Error: error creating static route to {:?}", ip_string).as_str());
}
}

if Method::CONNECT == req.method() {
// Received an HTTP request like:
// ```
// CONNECT www.domain.com:443 HTTP/1.1
// Host: www.domain.com:443
// Proxy-Connection: Keep-Alive
// ```
//
// When HTTP method is CONNECT we should return an empty body
// then we can eventually upgrade the connection and talk a new protocol.
//
// Note: only after client received an empty body with STATUS_OK can the
// connection be upgraded, so we can't return a response inside
// `on_upgrade` future.
if let Some(addr) = host_addr(req.uri()) {
tokio::task::spawn(async move {
match req.into_body().on_upgrade().await {
Ok(upgraded) => {
if let Err(e) = tunnel(upgraded, addr).await {
eprintln!("server io error: {}", e);
};
}
Err(e) => eprintln!("upgrade error: {}", e),
}
});

Ok(Response::new(Body::empty()))
} else {
eprintln!("CONNECT host is not socket addr: {:?}", req.uri());
let mut resp = Response::new(Body::from("CONNECT must be to a socket address"));
*resp.status_mut() = http::StatusCode::BAD_REQUEST;

Ok(resp)
}
} else {
client.request(req).await
}
}

fn host_addr(uri: &http::Uri) -> Option<SocketAddr> {
uri.authority().and_then(|auth| auth.as_str().parse().ok())
}

// Create a TCP connection to host:port, build a tunnel between the connection and
// the upgraded connection
async fn tunnel(upgraded: Upgraded, addr: SocketAddr) -> std::io::Result<()> {
// Connect to remote server
let mut server = TcpStream::connect(addr).await?;

// Proxying data
let amounts = {
let (mut server_rd, mut server_wr) = server.split();
let (mut client_rd, mut client_wr) = tokio::io::split(upgraded);

let client_to_server = tokio::io::copy(&mut client_rd, &mut server_wr);
let server_to_client = tokio::io::copy(&mut server_rd, &mut client_wr);

try_join(client_to_server, server_to_client).await
};

// Print message when done
match amounts {
Ok((from_client, from_server)) => {
println!(
"client wrote {} bytes and received {} bytes",
from_client, from_server
);
}
Err(e) => {
println!("tunnel error: {}", e);
}
};
Ok(())
}

Loading…
Cancel
Save