diff --git a/src/admin.rs b/src/admin.rs index 6569e13..5950081 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -1,4 +1,4 @@ -#![deny(warnings)] +//#![deny(warnings)] /** * Copyright (c) 2020 Bubble, Inc. All rights reserved. * For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/ @@ -22,6 +22,7 @@ use warp::{Filter}; use crate::pass::is_correct_password; use crate::ssh::{spawn_ssh, stop_ssh, SshContainer}; use crate::net::is_valid_ip; +use crate::util::HEADER_BUBBLE_SESSION; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct AdminRegistration { @@ -57,6 +58,26 @@ struct BubbleRegistration { auth_token: String } +struct BubbleInternalRegistration { + key: String, + ip: Arc, + bubble: Arc, + session: Arc, + auth_token: String +} + +impl BubbleInternalRegistration { + pub fn new(reg : &BubbleRegistration, bubble : String, session : String) -> BubbleInternalRegistration { + BubbleInternalRegistration { + key: reg.key.clone(), + ip: Arc::new(reg.ip.clone()), + bubble: Arc::new(bubble.clone()), + session: Arc::new(session.clone()), + auth_token: reg.auth_token.clone() + } + } +} + #[derive(Debug, Deserialize, Serialize, Clone)] struct BubbleRegistrationResponse { port: u16, @@ -92,8 +113,6 @@ pub async fn start_admin (admin_reg : Arc>>, admin_server.await; } -const HEADER_BUBBLE_SESSION: &'static str = "X-Bubble-Session"; - async fn handle_register(registration : AdminRegistration, admin_reg : Arc>>, proxy_port : u16, @@ -149,13 +168,15 @@ async fn handle_register(registration : AdminRegistration, }; (*guard) = Some(registration); } + let internal_reg = BubbleInternalRegistration::new(&bubble_registration, validated.bubble, validated.session); // PUT it and see if it worked let client = reqwest::Client::new(); - let url = format!("https://{}/api/me/flexRouters", validated.bubble); + let url = format!("https://{}/api/me/flexRouters", internal_reg.bubble.clone()); debug!("handle_register registering ourself with {}, sending: {:?}", url, bubble_registration); + let session = internal_reg.session.clone(); match client.put(url.as_str()) - .header(HEADER_BUBBLE_SESSION, validated.session) + .header(HEADER_BUBBLE_SESSION, session.to_string()) .json(&bubble_registration) .send().await { Ok(response) => { @@ -176,9 +197,11 @@ async fn handle_register(registration : AdminRegistration, info!("handle_register: parsed response object: {:?}", reg_response); let ssh_result = spawn_ssh( ssh_container.clone(), + internal_reg.ip.clone(), reg_response.port, proxy_port, - validated.bubble, + internal_reg.bubble.clone(), + internal_reg.session.clone(), reg_response.host_key, ssh_priv_key).await; if ssh_result.is_err() { diff --git a/src/ssh.rs b/src/ssh.rs index bc2f460..e025429 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -14,13 +14,14 @@ use futures::future::{abortable, Abortable, AbortHandle}; use log::{debug, info, error, trace}; use reqwest; +use reqwest::StatusCode as ReqwestStatusCode; use tokio::time::{interval_at, Instant, Duration}; use tokio::sync::Mutex; use whoami::{platform, Platform}; -use crate::util::write_string_to_file; +use crate::util::{HEADER_BUBBLE_SESSION, write_string_to_file}; const SSH_WINDOWS: &'static str = "C:\\Windows\\System32\\OpenSSH\\ssh.exe"; const SSH_MACOS: &'static str = "/usr/bin/ssh"; @@ -42,6 +43,13 @@ pub fn ssh_command() -> &'static str { #[derive(Debug)] pub struct SshContainer { pub child: Option>, + pub ip: Option>, + pub port: Option, + pub proxy_port: Option, + pub bubble: Option>, + pub session: Option>, + pub host_key: Option, + pub priv_key: Option>, pub checker: Option> } @@ -49,15 +57,24 @@ impl SshContainer { pub fn new () -> SshContainer { SshContainer { child: None, + ip: None, + port: None, + proxy_port: None, + bubble: None, + session: None, + host_key: None, + priv_key: None, checker: None } } } pub async fn spawn_ssh (ssh_container : Arc>, + ip : Arc, port : u16, proxy_port : u16, - host : String, + bubble : Arc, + session : Arc, host_key : String, priv_key : Arc) -> Result>, Option> { @@ -67,9 +84,9 @@ pub async fn spawn_ssh (ssh_container : Arc>, Ok(ssh_container.clone()) } else { let tunnel = format!("{}:127.0.0.1:{}", port, proxy_port); - let target = format!("bubble-flex@{}", host); + let target = format!("bubble-flex@{}", bubble); let host_file = host_file(); - let host_file_result = write_string_to_file(host_file, host_key); + let host_file_result = write_string_to_file(host_file, host_key.clone().to_string()); if host_file_result.is_err() { let err = host_file_result.err(); if err.is_some() { @@ -105,14 +122,17 @@ pub async fn spawn_ssh (ssh_container : Arc>, if result.is_ok() { child = result.unwrap(); (*guard).child = Some(Mutex::new(child)); - let task = tokio::spawn(async { - let mut checker = interval_at(Instant::now().checked_add(Duration::new(10, 0)).unwrap(), Duration::new(10, 0)); - loop { - checker.tick().await; - // let ok = reqwest::get("http://example.com/").await; - info!(">>> checker runs!"); - } - }); + (*guard).ip = Some(ip.clone()); + (*guard).port = Some(port); + (*guard).proxy_port = Some(proxy_port); + (*guard).bubble = Some(bubble.clone()); + (*guard).session = Some(session.clone()); + (*guard).host_key = Some(host_key.clone()); + (*guard).priv_key = Some(priv_key.clone()); + let check_host = bubble.clone(); + let check_ip = ip.clone(); + let check_session = session.clone(); + let task = tokio::spawn(check_ssh(check_host, check_ip, check_session)); let (fut, abort_handle) = abortable(task); (*guard).checker = Some(Mutex::new(abort_handle)); Ok(ssh_container.clone()) @@ -128,6 +148,75 @@ pub async fn spawn_ssh (ssh_container : Arc>, } } +const CHECK_SSH_START_DELAY : u64 = 10; +const CHECK_SSH_INTERVAL: u64 = 10; +const MAX_CHECK_ERRORS_BEFORE_RESTART : u8 = 3; + +async fn check_ssh (bubble : Arc, ip : Arc, session : Arc) { + let mut checker = interval_at(Instant::now().checked_add(Duration::new(CHECK_SSH_START_DELAY, 0)).unwrap(), Duration::new(CHECK_SSH_INTERVAL, 0)); + let check_url = format!("https://{}/api/me/flexRouters/{}/status", bubble.clone(), ip.clone()); + let client = reqwest::Client::new(); + let mut error_count : u8 = 0; + let mut deleted : bool = false; + let session = session.clone(); + loop { + checker.tick().await; + trace!("check_ssh: checking status via {}", check_url); + let check_result = client.get(check_url.as_str()) + .header(HEADER_BUBBLE_SESSION, session.to_string()) + .send().await; + match check_result { + Err(e) => { + error!("check_ssh: error checking status via {}: {:?}", check_url, e); + }, + Ok(response) => { + let status_code = response.status(); + let body_bytes = &response.bytes().await.unwrap(); + let body = String::from_utf8(body_bytes.to_vec()).unwrap(); + let server_status = body.replace(|c: char| c == '\"', ""); + trace!("check_ssh: tunnel status for {} returned status={:?}, body={}", check_url, &status_code, body); + match status_code { + ReqwestStatusCode::OK => { + match server_status.as_str() { + "none" => { + info!("check_ssh: checked tunnel status via {}: tunnel status not yet available", check_url); + error_count = error_count + 1; + } + "active" => { + debug!("check_ssh: tunnel status via {}: tunnel status is OK", check_url); + error_count = 0; + } + "unreachable" => { + debug!("check_ssh: tunnel status via {}: tunnel is unreachable, restarting tunnel", check_url); + error_count = error_count + 1; + } + "deleted" => { + // todo: shutdown ssh and ourselves + debug!("check_ssh: tunnel status via {}: tunnel was deleted, stopping tunnel", check_url); + deleted = true; + } + _ => { + error!("check_ssh: error checking tunnel status via {}: unknown tunnel status={}", check_url, server_status); + error_count = error_count + 1; + } + } + }, + _ => { + error!("check_ssh: error checking tunnel status via {}: status={:?} body={}", check_url, &status_code, body); + error_count = error_count + 1; + } + } + if deleted { + info!("check_ssh: tunnel deleted, stopping ssh and checker"); + } else if error_count >= MAX_CHECK_ERRORS_BEFORE_RESTART { + info!("check_ssh: tunnel had too many errors, restarting ssh tunnel"); + error_count = 0; + } + } + } + } +} + const HOST_FILE_WINDOWS: &'static str = "C:\\Windows\\Temp\\bubble_flex_host_key"; const HOST_FILE_MACOS: &'static str = "/tmp/bubble_flex_host_key"; const HOST_FILE_LINUX: &'static str = "/tmp/bubble_flex_host_key"; diff --git a/src/util.rs b/src/util.rs index 20097ca..800feb1 100644 --- a/src/util.rs +++ b/src/util.rs @@ -14,6 +14,8 @@ use std::process::exit; use log::error; +pub const HEADER_BUBBLE_SESSION: &'static str = "X-Bubble-Session"; + pub fn read_required_env_var_argument(arg_name : &str, opt : Option<&str>) -> String { if opt.is_none() { error!("read_required_env_var_argument: {} argument is required", arg_name);