From 6c18715800ed577c35a9c2fce79fffc61124f2e2 Mon Sep 17 00:00:00 2001 From: Jonathan Cobb Date: Mon, 7 Sep 2020 14:24:28 -0400 Subject: [PATCH] WIP. support stop/respawn ssh tunnel --- src/admin.rs | 4 +- src/ssh.rs | 182 ++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 153 insertions(+), 33 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index 5950081..045dbd8 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -20,7 +20,7 @@ use warp; use warp::{Filter}; use crate::pass::is_correct_password; -use crate::ssh::{spawn_ssh, stop_ssh, SshContainer}; +use crate::ssh::{spawn_ssh, stop_ssh_and_checker, SshContainer}; use crate::net::is_valid_ip; use crate::util::HEADER_BUBBLE_SESSION; @@ -157,7 +157,7 @@ async fn handle_register(registration : AdminRegistration, let mut guard = admin_reg.lock().await; if (*guard).is_some() { // shut down previous tunnel - stop_ssh(ssh_container.clone()).await; + stop_ssh_and_checker(ssh_container.clone()).await; } // create the registration object diff --git a/src/ssh.rs b/src/ssh.rs index 21f96de..02d99de 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -50,7 +50,7 @@ pub struct SshContainer { pub session: Option>, pub host_key: Option, pub priv_key: Option>, - pub checker: Option> + pub checker: Option>> } impl SshContainer { @@ -100,24 +100,9 @@ pub async fn spawn_ssh (ssh_container : Arc>, Err(None) } } else { - let user_known_hosts = format!("UserKnownHostsFile={}", host_file); - let server_keepalive = format!("ServerAliveInterval=10"); - - let result = Command::new(ssh_command()) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .arg("-i") - .arg(priv_key.as_str()) - .arg("-o") - .arg(user_known_hosts) - .arg("-o") - .arg(server_keepalive) - .arg("-Nn") - .arg("-R") - .arg(tunnel) - .arg(target) - .spawn(); + let mut command = Command::new(ssh_command()); + build_ssh_command(&mut command,tunnel, target, host_file.to_string(), priv_key.clone()); + let result = command.spawn(); let child; if result.is_ok() { child = result.unwrap(); @@ -132,9 +117,69 @@ pub async fn spawn_ssh (ssh_container : Arc>, let check_host = bubble.clone(); let check_ip = ip.clone(); let check_session = session.clone(); - let task = tokio::spawn(check_ssh(check_host, check_ip, check_session)); + let task = tokio::spawn(check_ssh(ssh_container.clone(), check_host, check_ip, check_session)); let (fut, abort_handle) = abortable(task); - (*guard).checker = Some(Mutex::new(abort_handle)); + (*guard).checker = Some(Arc::new(Mutex::new(abort_handle))); + Ok(ssh_container.clone()) + } else { + let err = result.err(); + if err.is_none() { + Err(None) + } else { + Err(Some(err.unwrap())) + } + } + } + } +} + +pub async fn respawn_ssh (ssh_container : Arc>, + ip : Arc, + port : u16, + proxy_port : u16, + bubble : Arc, + session : Arc, + host_key : String, + priv_key : Arc, + checker : Arc>) -> Result>, Option> { + + let mut guard = ssh_container.lock().await; + if (*guard).child.is_some() { + // todo: verify that child is still running + Ok(ssh_container.clone()) + } else { + let tunnel = format!("{}:127.0.0.1:{}", port, proxy_port); + let target = format!("bubble-flex@{}", bubble); + let host_file = host_file(); + let host_file_result = write_string_to_file(host_file, host_key.clone().to_string()); + if host_file_result.is_err() { + let err = host_file_result.err(); + if err.is_some() { + let err = err.unwrap(); + if err.is_some() { + Err(Some(err.unwrap())) + } else { + Err(None) + } + } else { + Err(None) + } + } else { + let mut command = Command::new(ssh_command()); + build_ssh_command(&mut command,tunnel, target, host_file.to_string(), priv_key.clone()); + let result = command.spawn(); + let child; + if result.is_ok() { + child = result.unwrap(); + (*guard).child = Some(Mutex::new(child)); + (*guard).ip = Some(ip.clone()); + (*guard).port = Some(port); + (*guard).proxy_port = Some(proxy_port); + (*guard).bubble = Some(bubble.clone()); + (*guard).session = Some(session.clone()); + (*guard).host_key = Some(host_key.clone()); + (*guard).priv_key = Some(priv_key.clone()); + (*guard).checker = Some(checker.clone()); Ok(ssh_container.clone()) } else { let err = result.err(); @@ -148,12 +193,34 @@ pub async fn spawn_ssh (ssh_container : Arc>, } } +fn build_ssh_command<'a>(command : &mut Command, tunnel: String, target: String, host_file : String, priv_key : Arc) { + let user_known_hosts = format!("UserKnownHostsFile={}", host_file); + let server_keepalive = format!("ServerAliveInterval=10"); + command + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .arg("-i") + .arg(priv_key.as_str()) + .arg("-o") + .arg(user_known_hosts) + .arg("-o") + .arg(server_keepalive) + .arg("-Nn") + .arg("-R") + .arg(tunnel) + .arg(target); +} + const CHECK_SSH_START_DELAY : u64 = 10; const CHECK_SSH_INTERVAL: u64 = 10; const MAX_CHECK_ERRORS_BEFORE_RESTART : u8 = 3; const CHECK_SSH_HTTP_TIMEOUT: u64 = 10; -async fn check_ssh (bubble : Arc, ip : Arc, session : Arc) { +async fn check_ssh (ssh_container : Arc>, + bubble : Arc, + ip : Arc, + session : Arc) { let mut checker = interval_at(Instant::now().checked_add(Duration::new(CHECK_SSH_START_DELAY, 0)).unwrap(), Duration::new(CHECK_SSH_INTERVAL, 0)); let check_url = format!("https://{}/api/me/flexRouters/{}/status", bubble.clone(), ip.clone()); let client = reqwest::Client::builder() @@ -206,13 +273,56 @@ async fn check_ssh (bubble : Arc, ip : Arc, session : Arc { error!("check_ssh: error checking tunnel status via {}: status={:?} body={}", check_url, &status_code, body); - error_count = error_count + 1; } } if deleted { info!("check_ssh: tunnel deleted, stopping ssh and checker"); + stop_ssh_and_checker(ssh_container.clone()); + } else if error_count >= MAX_CHECK_ERRORS_BEFORE_RESTART { info!("check_ssh: tunnel had too many errors, restarting ssh tunnel"); + let ip; + let port; + let proxy_port; + let session; + let bubble; + let host_key; + let priv_key; + let checker; + trace!("check_ssh: locking ssh_container to copy values"); + let mut guard = ssh_container.lock().await; + { + ip = (*guard).ip.clone(); + port = (*guard).port.unwrap().clone(); + proxy_port = (*guard).proxy_port.unwrap().clone(); + session = (*guard).session.clone(); + bubble = (*guard).bubble.clone(); + host_key = (*guard).host_key.clone(); + priv_key = (*guard).priv_key.clone(); + checker = (*guard).checker.clone(); + } + trace!("check_ssh: calling stop_ssh"); + stop_ssh_retain_checker(ssh_container.clone()).await; + trace!("check_ssh: stop_ssh returned, starting new ssh tunnel"); + let ssh_result = respawn_ssh(ssh_container.clone(), + ip.unwrap(), + port, + proxy_port, + bubble.unwrap(), + session.unwrap(), + host_key.unwrap(), + priv_key.unwrap(), + checker.unwrap()).await; + if ssh_result.is_err() { + let err = ssh_result.err(); + if err.is_none() { + error!("check_ssh: error spawning ssh"); + } else { + error!("check_ssh: error spawning ssh: {:?}", err.unwrap()); + } + } else { + info!("check_ssh: successfully respawned ssh tunnel"); + } error_count = 0; } } @@ -237,16 +347,26 @@ pub fn host_file() -> &'static str { } } -pub async fn stop_ssh (ssh_container : Arc>) { +pub async fn stop_ssh_retain_checker (ssh_container : Arc>) { + return stop_ssh(ssh_container, false).await; +} + +pub async fn stop_ssh_and_checker (ssh_container : Arc>) { + return stop_ssh(ssh_container, true).await; +} + +pub async fn stop_ssh (ssh_container : Arc>, stop_checker : bool) { let mut guard = ssh_container.lock().await; - if (*guard).checker.is_some() { - { - trace!("stop_ssh: aborting checker"); - let mut checker_guard = (*guard).checker.as_mut().unwrap().lock().await; - checker_guard.abort(); - trace!("stop_ssh: aborted checker"); + if stop_checker { + if (*guard).checker.is_some() { + { + trace!("stop_ssh: aborting checker"); + let mut checker_guard = (*guard).checker.as_mut().unwrap().lock().await; + checker_guard.abort(); + trace!("stop_ssh: aborted checker"); + } + (*guard).checker = None; } - (*guard).checker = None; } if (*guard).child.is_some() { {