From d9cc0e9147e3b0429d4009cd5cd02bf16aaa7f01 Mon Sep 17 00:00:00 2001 From: Jonathan Cobb Date: Mon, 7 Sep 2020 00:46:05 -0400 Subject: [PATCH] WIP. remove ip as program arg, add to registration object. stop ssh server if registration is changing. --- src/admin.rs | 102 +++++++++++++++++++++++++++++++++++++++++---------- src/main.rs | 42 +++------------------ src/net.rs | 27 +++++++++++++- src/proxy.rs | 9 ++++- src/ssh.rs | 26 ++++++++++++- 5 files changed, 145 insertions(+), 61 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index d363516..37a09cb 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -20,13 +20,34 @@ use warp; use warp::{Filter}; use crate::pass::is_correct_password; -use crate::ssh::{spawn_ssh, SshContainer}; +use crate::ssh::{spawn_ssh, stop_ssh, SshContainer}; +use crate::net::is_valid_ip; #[derive(Debug, Deserialize, Serialize, Clone)] -struct AdminRegistration { +pub struct AdminRegistration { + password: Option, + session: Option, + bubble: Option, + ip: Option +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct ValidAdminRegistration { password: String, session: String, - bubble: String + bubble: String, + ip: String +} + +impl AdminRegistration { + pub fn new () -> AdminRegistration { + AdminRegistration { + password: None, + session: None, + bubble: None, + ip: None + } + } } #[derive(Debug, Deserialize, Serialize, Clone)] @@ -42,8 +63,8 @@ struct BubbleRegistrationResponse { host_key: String } -pub async fn start_admin (admin_port : u16, - proxy_ip : String, +pub async fn start_admin (admin_reg : Arc>>, + admin_port : u16, proxy_port : u16, password_hash: String, auth_token : Arc, @@ -55,7 +76,7 @@ pub async fn start_admin (admin_port : u16, let register = warp::path!("register") .and(warp::body::content_length_limit(1024 * 16)) .and(warp::body::json()) - .and(warp::any().map(move || proxy_ip.clone())) + .and(warp::any().map(move || admin_reg.clone())) .and(warp::any().map(move || proxy_port)) .and(warp::any().map(move || password_hash.clone())) .and(warp::any().map(move || auth_token.clone())) @@ -74,14 +95,31 @@ pub async fn start_admin (admin_port : u16, const HEADER_BUBBLE_SESSION: &'static str = "X-Bubble-Session"; async fn handle_register(registration : AdminRegistration, - proxy_ip: String, + admin_reg : Arc>>, proxy_port : u16, hashed_password : String, auth_token : Arc, ssh_priv_key : Arc, ssh_pub_key : Arc, ssh_container : Arc>) -> Result { - let pass_result = is_correct_password(registration.password, hashed_password); + // validate registration + let validated = validate_admin_registration(registration.clone()); + if validated.is_err() { + let err = validated.err(); + if err.is_some() { + let err = err.unwrap(); + error!("invalid request object: {:?}", err) + } else { + error!("invalid request object") + } + return Ok(warp::reply::with_status( + "invalid request object", + http::StatusCode::UNAUTHORIZED, + )); + } + let validated = validated.unwrap(); + + let pass_result = is_correct_password(validated.password, hashed_password); if pass_result.is_err() { error!("handle_register: error verifying password: {:?}", pass_result.err()); Ok(warp::reply::with_status( @@ -94,21 +132,30 @@ async fn handle_register(registration : AdminRegistration, http::StatusCode::UNAUTHORIZED, )) } else { - // try to register with bubble + // do we have a previous registration? + let bubble_registration; + { + let mut guard = admin_reg.lock().await; + if (*guard).is_some() { + // shut down previous tunnel + stop_ssh(ssh_container.clone()).await; + } - // create the registration object - let bubble_registration = BubbleRegistration { - key: ssh_pub_key.to_string(), - ip: proxy_ip, - auth_token: auth_token.to_string() - }; + // create the registration object + bubble_registration = BubbleRegistration { + key: ssh_pub_key.to_string(), + ip: validated.ip, + auth_token: auth_token.to_string() + }; + (*guard) = Some(registration); + } // PUT it and see if it worked let client = reqwest::Client::new(); - let url = format!("https://{}/api/me/flexRouters", registration.bubble); + let url = format!("https://{}/api/me/flexRouters", validated.bubble); debug!("handle_register registering ourself with {}, sending: {:?}", url, bubble_registration); match client.put(url.as_str()) - .header(HEADER_BUBBLE_SESSION, registration.session) + .header(HEADER_BUBBLE_SESSION, validated.session) .json(&bubble_registration) .send().await { Ok(response) => { @@ -128,10 +175,10 @@ async fn handle_register(registration : AdminRegistration, let reg_response: BubbleRegistrationResponse = reg_opt.unwrap(); info!("handle_register: parsed response object: {:?}", reg_response); let ssh_result = spawn_ssh( - ssh_container, + ssh_container.clone(), reg_response.port, proxy_port, - registration.bubble, + validated.bubble, reg_response.host_key, ssh_priv_key).await; if ssh_result.is_err() { @@ -175,4 +222,21 @@ async fn handle_register(registration : AdminRegistration, } } } +} + +pub fn validate_admin_registration(reg : AdminRegistration) -> Result{ + // validate ip + if reg.ip.is_none() || reg.password.is_none() || reg.bubble.is_none() || reg.session.is_none() { + return Err(String::from("required field not found")); + } + let ip = reg.ip.unwrap(); + if !is_valid_ip(&ip) { + return Err(String::from("ip was invalid")); + } + return Ok(ValidAdminRegistration { + password: reg.password.unwrap(), + session: reg.session.unwrap(), + bubble: reg.bubble.unwrap(), + ip + }) } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index bcb9021..2e7a9fa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,12 +24,11 @@ use futures_util::future::join; use log::{info, error}; -use pnet::datalink; +use tokio::sync::Mutex; use whoami; -use bubble_flexrouter::admin::start_admin; -use bubble_flexrouter::net::is_private_ip; +use bubble_flexrouter::admin::{AdminRegistration, start_admin}; use bubble_flexrouter::pass::init_password; use bubble_flexrouter::proxy::start_proxy; use bubble_flexrouter::util::read_required_env_var_argument; @@ -59,12 +58,6 @@ async fn main() { .help("Secondary DNS server") .default_value("1.0.0.1") .takes_value(true)) - .arg(Arg::with_name("proxy_ip") - .short("i") - .long("proxy-ip") - .value_name("IP_ADDRESS") - .help("IP address to listen for proxy connections, must be a private IP") - .takes_value(true)) .arg(Arg::with_name("proxy_port") .short("p") .long("proxy-port") @@ -140,33 +133,6 @@ async fn main() { let password_opt = args.value_of("password_env_var"); let password_hash = init_password(password_file.as_str(), password_opt); - let proxy_ip_opt = args.value_of("proxy_ip"); - if proxy_ip_opt.is_none() { - error!("main: proxy-ip argument is required"); - exit(2); - } - - let proxy_ip = proxy_ip_opt.unwrap(); - if !is_private_ip(proxy_ip.to_string()) { - error!("main: proxy IP must be a private IP address: {}", proxy_ip); - exit(2); - } - let mut proxy_bind_addr = None; - for iface in datalink::interfaces() { - if iface.is_loopback() { continue; } - if !iface.is_up() { continue; } - for ip in iface.ips { - if ip.ip().to_string().eq(proxy_ip) { - proxy_bind_addr = Some(ip); - } - break; - } - } - if proxy_bind_addr.is_none() { - error!("main: Could not find IP for binding: {}", proxy_ip); - exit(2); - } - let admin_port = args.value_of("admin_port").unwrap().parse::().unwrap(); let dns1_ip = args.value_of("dns1").unwrap(); let dns2_ip = args.value_of("dns2").unwrap(); @@ -199,9 +165,11 @@ async fn main() { } let auth_token = Arc::new(String::from(auth_token_val)); + let admin_reg: Arc>> = Arc::new(Mutex::new(None)); + let admin = start_admin( + admin_reg.clone(), admin_port, - proxy_ip.to_string(), proxy_port, password_hash, auth_token.clone(), diff --git a/src/net.rs b/src/net.rs index bb2d0fc..99889d8 100644 --- a/src/net.rs +++ b/src/net.rs @@ -8,9 +8,34 @@ use std::process::{exit, Command, Stdio}; use log::{debug, info, error}; +use pnet::datalink; + use whoami::{platform, Platform}; -pub fn is_private_ip(ip : String) -> bool { +pub fn is_valid_ip(ip : &String) -> bool { + if !is_private_ip(ip) { + error!("is_valid_ip: not a private IP address: {}", ip); + false + } else { + let mut addr = None; + for iface in datalink::interfaces() { + if iface.is_loopback() { continue; } + if !iface.is_up() { continue; } + for net_ip in iface.ips { + if net_ip.ip().to_string().eq(ip) { + addr = Some(net_ip); + } + break; + } + } + if addr.is_none() { + error!("is_valid_ip: IP address not found among network interfaces: {}", ip); + } + addr.is_some() + } +} + +pub fn is_private_ip(ip : &String) -> bool { return ip.starts_with("10.") || ip.starts_with("192.168.") || ip.starts_with("172.16.") diff --git a/src/proxy.rs b/src/proxy.rs index 4db8fa6..6eae00d 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -95,10 +95,12 @@ async fn proxy(client: Client>>, let uri = req.uri(); let host = uri.host(); if host.is_none() { - return if uri.path().eq("/ping") && req.method() == Method::POST { + let path = uri.path(); + let method = req.method(); + return if path.eq("/ping") && method == Method::POST { let body_bytes = hyper::body::to_bytes(req.into_body()).await?; let body = String::from_utf8(body_bytes.to_vec()).unwrap(); - let ping : Ping = serde_json::from_str(body.as_str()).unwrap(); + let ping: Ping = serde_json::from_str(body.as_str()).unwrap(); trace!("proxy: ping received: {:?}", ping); if !ping.verify(auth_token.clone()) { error!("proxy: invalid ping hash"); @@ -109,6 +111,9 @@ async fn proxy(client: Client>>, trace!("proxy: valid ping, responding with pong: {}", pong_json); Ok(Response::new(Body::from(pong_json))) } + } else if path.eq("/health") && method == Method::GET { + Ok(Response::new(Body::from("proxy is alive"))) + } else { error!("proxy: no host"); bad_request("No host") diff --git a/src/ssh.rs b/src/ssh.rs index 23edc80..036c762 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -36,7 +36,7 @@ pub fn ssh_command() -> &'static str { #[derive(Debug)] pub struct SshContainer { - pub child: Option + pub child: Option> } impl SshContainer { @@ -75,6 +75,7 @@ pub async fn spawn_ssh (ssh_container : Arc>, } } else { let user_known_hosts = format!("UserKnownHostsFile={}", host_file); + let server_keepalive = format!("ServerAliveInterval=10"); let result = Command::new(ssh_command()) .stdin(Stdio::null()) @@ -84,6 +85,8 @@ pub async fn spawn_ssh (ssh_container : Arc>, .arg(priv_key.as_str()) .arg("-o") .arg(user_known_hosts) + .arg("-o") + .arg(server_keepalive) .arg("-Nn") .arg("-R") .arg(tunnel) @@ -92,7 +95,7 @@ pub async fn spawn_ssh (ssh_container : Arc>, let child; if result.is_ok() { child = result.unwrap(); - (*guard).child = Some(child); + (*guard).child = Some(Mutex::new(child)); Ok(ssh_container.clone()) } else { let err = result.err(); @@ -121,4 +124,23 @@ pub fn host_file() -> &'static str { exit(2); } } +} + +pub async fn stop_ssh (ssh_container : Arc>) { + let mut guard = ssh_container.lock().await; + if (*guard).child.is_some() { + { + let mut child_guard = (*guard).child.as_mut().unwrap().lock().await; + let kill_result = (*child_guard).kill(); + if kill_result.is_err() { + let err = kill_result.err(); + if err.is_some() { + error!("stop_ssh: error killing process: {:?}", err.unwrap()); + } else { + error!("stop_ssh: error killing process"); + } + } + } + (*guard).child = None; + } } \ No newline at end of file