Преглед изворни кода

WIP. support stop/respawn ssh tunnel

master
Jonathan Cobb пре 4 година
родитељ
комит
6c18715800
2 измењених фајлова са 153 додато и 33 уклоњено
  1. +2
    -2
      src/admin.rs
  2. +151
    -31
      src/ssh.rs

+ 2
- 2
src/admin.rs Прегледај датотеку

@@ -20,7 +20,7 @@ use warp;
use warp::{Filter}; use warp::{Filter};


use crate::pass::is_correct_password; use crate::pass::is_correct_password;
use crate::ssh::{spawn_ssh, stop_ssh, SshContainer};
use crate::ssh::{spawn_ssh, stop_ssh_and_checker, SshContainer};
use crate::net::is_valid_ip; use crate::net::is_valid_ip;
use crate::util::HEADER_BUBBLE_SESSION; use crate::util::HEADER_BUBBLE_SESSION;


@@ -157,7 +157,7 @@ async fn handle_register(registration : AdminRegistration,
let mut guard = admin_reg.lock().await; let mut guard = admin_reg.lock().await;
if (*guard).is_some() { if (*guard).is_some() {
// shut down previous tunnel // shut down previous tunnel
stop_ssh(ssh_container.clone()).await;
stop_ssh_and_checker(ssh_container.clone()).await;
} }


// create the registration object // create the registration object


+ 151
- 31
src/ssh.rs Прегледај датотеку

@@ -50,7 +50,7 @@ pub struct SshContainer {
pub session: Option<Arc<String>>, pub session: Option<Arc<String>>,
pub host_key: Option<String>, pub host_key: Option<String>,
pub priv_key: Option<Arc<String>>, pub priv_key: Option<Arc<String>>,
pub checker: Option<Mutex<AbortHandle>>
pub checker: Option<Arc<Mutex<AbortHandle>>>
} }


impl SshContainer { impl SshContainer {
@@ -100,24 +100,9 @@ pub async fn spawn_ssh (ssh_container : Arc<Mutex<SshContainer>>,
Err(None) Err(None)
} }
} else { } else {
let user_known_hosts = format!("UserKnownHostsFile={}", host_file);
let server_keepalive = format!("ServerAliveInterval=10");

let result = Command::new(ssh_command())
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.arg("-i")
.arg(priv_key.as_str())
.arg("-o")
.arg(user_known_hosts)
.arg("-o")
.arg(server_keepalive)
.arg("-Nn")
.arg("-R")
.arg(tunnel)
.arg(target)
.spawn();
let mut command = Command::new(ssh_command());
build_ssh_command(&mut command,tunnel, target, host_file.to_string(), priv_key.clone());
let result = command.spawn();
let child; let child;
if result.is_ok() { if result.is_ok() {
child = result.unwrap(); child = result.unwrap();
@@ -132,9 +117,69 @@ pub async fn spawn_ssh (ssh_container : Arc<Mutex<SshContainer>>,
let check_host = bubble.clone(); let check_host = bubble.clone();
let check_ip = ip.clone(); let check_ip = ip.clone();
let check_session = session.clone(); let check_session = session.clone();
let task = tokio::spawn(check_ssh(check_host, check_ip, check_session));
let task = tokio::spawn(check_ssh(ssh_container.clone(), check_host, check_ip, check_session));
let (fut, abort_handle) = abortable(task); let (fut, abort_handle) = abortable(task);
(*guard).checker = Some(Mutex::new(abort_handle));
(*guard).checker = Some(Arc::new(Mutex::new(abort_handle)));
Ok(ssh_container.clone())
} else {
let err = result.err();
if err.is_none() {
Err(None)
} else {
Err(Some(err.unwrap()))
}
}
}
}
}

pub async fn respawn_ssh (ssh_container : Arc<Mutex<SshContainer>>,
ip : Arc<String>,
port : u16,
proxy_port : u16,
bubble : Arc<String>,
session : Arc<String>,
host_key : String,
priv_key : Arc<String>,
checker : Arc<Mutex<AbortHandle>>) -> Result<Arc<Mutex<SshContainer>>, Option<Error>> {

let mut guard = ssh_container.lock().await;
if (*guard).child.is_some() {
// todo: verify that child is still running
Ok(ssh_container.clone())
} else {
let tunnel = format!("{}:127.0.0.1:{}", port, proxy_port);
let target = format!("bubble-flex@{}", bubble);
let host_file = host_file();
let host_file_result = write_string_to_file(host_file, host_key.clone().to_string());
if host_file_result.is_err() {
let err = host_file_result.err();
if err.is_some() {
let err = err.unwrap();
if err.is_some() {
Err(Some(err.unwrap()))
} else {
Err(None)
}
} else {
Err(None)
}
} else {
let mut command = Command::new(ssh_command());
build_ssh_command(&mut command,tunnel, target, host_file.to_string(), priv_key.clone());
let result = command.spawn();
let child;
if result.is_ok() {
child = result.unwrap();
(*guard).child = Some(Mutex::new(child));
(*guard).ip = Some(ip.clone());
(*guard).port = Some(port);
(*guard).proxy_port = Some(proxy_port);
(*guard).bubble = Some(bubble.clone());
(*guard).session = Some(session.clone());
(*guard).host_key = Some(host_key.clone());
(*guard).priv_key = Some(priv_key.clone());
(*guard).checker = Some(checker.clone());
Ok(ssh_container.clone()) Ok(ssh_container.clone())
} else { } else {
let err = result.err(); let err = result.err();
@@ -148,12 +193,34 @@ pub async fn spawn_ssh (ssh_container : Arc<Mutex<SshContainer>>,
} }
} }


fn build_ssh_command<'a>(command : &mut Command, tunnel: String, target: String, host_file : String, priv_key : Arc<String>) {
let user_known_hosts = format!("UserKnownHostsFile={}", host_file);
let server_keepalive = format!("ServerAliveInterval=10");
command
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.arg("-i")
.arg(priv_key.as_str())
.arg("-o")
.arg(user_known_hosts)
.arg("-o")
.arg(server_keepalive)
.arg("-Nn")
.arg("-R")
.arg(tunnel)
.arg(target);
}

const CHECK_SSH_START_DELAY : u64 = 10; const CHECK_SSH_START_DELAY : u64 = 10;
const CHECK_SSH_INTERVAL: u64 = 10; const CHECK_SSH_INTERVAL: u64 = 10;
const MAX_CHECK_ERRORS_BEFORE_RESTART : u8 = 3; const MAX_CHECK_ERRORS_BEFORE_RESTART : u8 = 3;
const CHECK_SSH_HTTP_TIMEOUT: u64 = 10; const CHECK_SSH_HTTP_TIMEOUT: u64 = 10;


async fn check_ssh (bubble : Arc<String>, ip : Arc<String>, session : Arc<String>) {
async fn check_ssh (ssh_container : Arc<Mutex<SshContainer>>,
bubble : Arc<String>,
ip : Arc<String>,
session : Arc<String>) {
let mut checker = interval_at(Instant::now().checked_add(Duration::new(CHECK_SSH_START_DELAY, 0)).unwrap(), Duration::new(CHECK_SSH_INTERVAL, 0)); let mut checker = interval_at(Instant::now().checked_add(Duration::new(CHECK_SSH_START_DELAY, 0)).unwrap(), Duration::new(CHECK_SSH_INTERVAL, 0));
let check_url = format!("https://{}/api/me/flexRouters/{}/status", bubble.clone(), ip.clone()); let check_url = format!("https://{}/api/me/flexRouters/{}/status", bubble.clone(), ip.clone());
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
@@ -206,13 +273,56 @@ async fn check_ssh (bubble : Arc<String>, ip : Arc<String>, session : Arc<String
}, },
_ => { _ => {
error!("check_ssh: error checking tunnel status via {}: status={:?} body={}", check_url, &status_code, body); error!("check_ssh: error checking tunnel status via {}: status={:?} body={}", check_url, &status_code, body);
error_count = error_count + 1;
} }
} }
if deleted { if deleted {
info!("check_ssh: tunnel deleted, stopping ssh and checker"); info!("check_ssh: tunnel deleted, stopping ssh and checker");
stop_ssh_and_checker(ssh_container.clone());

} else if error_count >= MAX_CHECK_ERRORS_BEFORE_RESTART { } else if error_count >= MAX_CHECK_ERRORS_BEFORE_RESTART {
info!("check_ssh: tunnel had too many errors, restarting ssh tunnel"); info!("check_ssh: tunnel had too many errors, restarting ssh tunnel");
let ip;
let port;
let proxy_port;
let session;
let bubble;
let host_key;
let priv_key;
let checker;
trace!("check_ssh: locking ssh_container to copy values");
let mut guard = ssh_container.lock().await;
{
ip = (*guard).ip.clone();
port = (*guard).port.unwrap().clone();
proxy_port = (*guard).proxy_port.unwrap().clone();
session = (*guard).session.clone();
bubble = (*guard).bubble.clone();
host_key = (*guard).host_key.clone();
priv_key = (*guard).priv_key.clone();
checker = (*guard).checker.clone();
}
trace!("check_ssh: calling stop_ssh");
stop_ssh_retain_checker(ssh_container.clone()).await;
trace!("check_ssh: stop_ssh returned, starting new ssh tunnel");
let ssh_result = respawn_ssh(ssh_container.clone(),
ip.unwrap(),
port,
proxy_port,
bubble.unwrap(),
session.unwrap(),
host_key.unwrap(),
priv_key.unwrap(),
checker.unwrap()).await;
if ssh_result.is_err() {
let err = ssh_result.err();
if err.is_none() {
error!("check_ssh: error spawning ssh");
} else {
error!("check_ssh: error spawning ssh: {:?}", err.unwrap());
}
} else {
info!("check_ssh: successfully respawned ssh tunnel");
}
error_count = 0; error_count = 0;
} }
} }
@@ -237,16 +347,26 @@ pub fn host_file() -> &'static str {
} }
} }


pub async fn stop_ssh (ssh_container : Arc<Mutex<SshContainer>>) {
pub async fn stop_ssh_retain_checker (ssh_container : Arc<Mutex<SshContainer>>) {
return stop_ssh(ssh_container, false).await;
}

pub async fn stop_ssh_and_checker (ssh_container : Arc<Mutex<SshContainer>>) {
return stop_ssh(ssh_container, true).await;
}

pub async fn stop_ssh (ssh_container : Arc<Mutex<SshContainer>>, stop_checker : bool) {
let mut guard = ssh_container.lock().await; let mut guard = ssh_container.lock().await;
if (*guard).checker.is_some() {
{
trace!("stop_ssh: aborting checker");
let mut checker_guard = (*guard).checker.as_mut().unwrap().lock().await;
checker_guard.abort();
trace!("stop_ssh: aborted checker");
if stop_checker {
if (*guard).checker.is_some() {
{
trace!("stop_ssh: aborting checker");
let mut checker_guard = (*guard).checker.as_mut().unwrap().lock().await;
checker_guard.abort();
trace!("stop_ssh: aborted checker");
}
(*guard).checker = None;
} }
(*guard).checker = None;
} }
if (*guard).child.is_some() { if (*guard).child.is_some() {
{ {


Loading…
Откажи
Сачувај