Bläddra i källkod

WIP. implementing tunnel status check

master
Jonathan Cobb 4 år sedan
förälder
incheckning
49cc24f9ce
3 ändrade filer med 132 tillägg och 18 borttagningar
  1. +29
    -6
      src/admin.rs
  2. +101
    -12
      src/ssh.rs
  3. +2
    -0
      src/util.rs

+ 29
- 6
src/admin.rs Visa fil

@@ -1,4 +1,4 @@
#![deny(warnings)]
//#![deny(warnings)]
/**
* Copyright (c) 2020 Bubble, Inc. All rights reserved.
* For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/
@@ -22,6 +22,7 @@ use warp::{Filter};
use crate::pass::is_correct_password;
use crate::ssh::{spawn_ssh, stop_ssh, SshContainer};
use crate::net::is_valid_ip;
use crate::util::HEADER_BUBBLE_SESSION;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct AdminRegistration {
@@ -57,6 +58,26 @@ struct BubbleRegistration {
auth_token: String
}

struct BubbleInternalRegistration {
key: String,
ip: Arc<String>,
bubble: Arc<String>,
session: Arc<String>,
auth_token: String
}

impl BubbleInternalRegistration {
pub fn new(reg : &BubbleRegistration, bubble : String, session : String) -> BubbleInternalRegistration {
BubbleInternalRegistration {
key: reg.key.clone(),
ip: Arc::new(reg.ip.clone()),
bubble: Arc::new(bubble.clone()),
session: Arc::new(session.clone()),
auth_token: reg.auth_token.clone()
}
}
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct BubbleRegistrationResponse {
port: u16,
@@ -92,8 +113,6 @@ pub async fn start_admin (admin_reg : Arc<Mutex<Option<AdminRegistration>>>,
admin_server.await;
}

const HEADER_BUBBLE_SESSION: &'static str = "X-Bubble-Session";

async fn handle_register(registration : AdminRegistration,
admin_reg : Arc<Mutex<Option<AdminRegistration>>>,
proxy_port : u16,
@@ -149,13 +168,15 @@ async fn handle_register(registration : AdminRegistration,
};
(*guard) = Some(registration);
}
let internal_reg = BubbleInternalRegistration::new(&bubble_registration, validated.bubble, validated.session);

// PUT it and see if it worked
let client = reqwest::Client::new();
let url = format!("https://{}/api/me/flexRouters", validated.bubble);
let url = format!("https://{}/api/me/flexRouters", internal_reg.bubble.clone());
debug!("handle_register registering ourself with {}, sending: {:?}", url, bubble_registration);
let session = internal_reg.session.clone();
match client.put(url.as_str())
.header(HEADER_BUBBLE_SESSION, validated.session)
.header(HEADER_BUBBLE_SESSION, session.to_string())
.json(&bubble_registration)
.send().await {
Ok(response) => {
@@ -176,9 +197,11 @@ async fn handle_register(registration : AdminRegistration,
info!("handle_register: parsed response object: {:?}", reg_response);
let ssh_result = spawn_ssh(
ssh_container.clone(),
internal_reg.ip.clone(),
reg_response.port,
proxy_port,
validated.bubble,
internal_reg.bubble.clone(),
internal_reg.session.clone(),
reg_response.host_key,
ssh_priv_key).await;
if ssh_result.is_err() {


+ 101
- 12
src/ssh.rs Visa fil

@@ -14,13 +14,14 @@ use futures::future::{abortable, Abortable, AbortHandle};
use log::{debug, info, error, trace};

use reqwest;
use reqwest::StatusCode as ReqwestStatusCode;

use tokio::time::{interval_at, Instant, Duration};
use tokio::sync::Mutex;

use whoami::{platform, Platform};

use crate::util::write_string_to_file;
use crate::util::{HEADER_BUBBLE_SESSION, write_string_to_file};

const SSH_WINDOWS: &'static str = "C:\\Windows\\System32\\OpenSSH\\ssh.exe";
const SSH_MACOS: &'static str = "/usr/bin/ssh";
@@ -42,6 +43,13 @@ pub fn ssh_command() -> &'static str {
#[derive(Debug)]
pub struct SshContainer {
pub child: Option<Mutex<Child>>,
pub ip: Option<Arc<String>>,
pub port: Option<u16>,
pub proxy_port: Option<u16>,
pub bubble: Option<Arc<String>>,
pub session: Option<Arc<String>>,
pub host_key: Option<String>,
pub priv_key: Option<Arc<String>>,
pub checker: Option<Mutex<AbortHandle>>
}

@@ -49,15 +57,24 @@ impl SshContainer {
pub fn new () -> SshContainer {
SshContainer {
child: None,
ip: None,
port: None,
proxy_port: None,
bubble: None,
session: None,
host_key: None,
priv_key: None,
checker: None
}
}
}

pub async fn spawn_ssh (ssh_container : Arc<Mutex<SshContainer>>,
ip : Arc<String>,
port : u16,
proxy_port : u16,
host : String,
bubble : Arc<String>,
session : Arc<String>,
host_key : String,
priv_key : Arc<String>) -> Result<Arc<Mutex<SshContainer>>, Option<Error>> {

@@ -67,9 +84,9 @@ pub async fn spawn_ssh (ssh_container : Arc<Mutex<SshContainer>>,
Ok(ssh_container.clone())
} else {
let tunnel = format!("{}:127.0.0.1:{}", port, proxy_port);
let target = format!("bubble-flex@{}", host);
let target = format!("bubble-flex@{}", bubble);
let host_file = host_file();
let host_file_result = write_string_to_file(host_file, host_key);
let host_file_result = write_string_to_file(host_file, host_key.clone().to_string());
if host_file_result.is_err() {
let err = host_file_result.err();
if err.is_some() {
@@ -105,14 +122,17 @@ pub async fn spawn_ssh (ssh_container : Arc<Mutex<SshContainer>>,
if result.is_ok() {
child = result.unwrap();
(*guard).child = Some(Mutex::new(child));
let task = tokio::spawn(async {
let mut checker = interval_at(Instant::now().checked_add(Duration::new(10, 0)).unwrap(), Duration::new(10, 0));
loop {
checker.tick().await;
// let ok = reqwest::get("http://example.com/").await;
info!(">>> checker runs!");
}
});
(*guard).ip = Some(ip.clone());
(*guard).port = Some(port);
(*guard).proxy_port = Some(proxy_port);
(*guard).bubble = Some(bubble.clone());
(*guard).session = Some(session.clone());
(*guard).host_key = Some(host_key.clone());
(*guard).priv_key = Some(priv_key.clone());
let check_host = bubble.clone();
let check_ip = ip.clone();
let check_session = session.clone();
let task = tokio::spawn(check_ssh(check_host, check_ip, check_session));
let (fut, abort_handle) = abortable(task);
(*guard).checker = Some(Mutex::new(abort_handle));
Ok(ssh_container.clone())
@@ -128,6 +148,75 @@ pub async fn spawn_ssh (ssh_container : Arc<Mutex<SshContainer>>,
}
}

const CHECK_SSH_START_DELAY : u64 = 10;
const CHECK_SSH_INTERVAL: u64 = 10;
const MAX_CHECK_ERRORS_BEFORE_RESTART : u8 = 3;

async fn check_ssh (bubble : Arc<String>, ip : Arc<String>, session : Arc<String>) {
let mut checker = interval_at(Instant::now().checked_add(Duration::new(CHECK_SSH_START_DELAY, 0)).unwrap(), Duration::new(CHECK_SSH_INTERVAL, 0));
let check_url = format!("https://{}/api/me/flexRouters/{}/status", bubble.clone(), ip.clone());
let client = reqwest::Client::new();
let mut error_count : u8 = 0;
let mut deleted : bool = false;
let session = session.clone();
loop {
checker.tick().await;
trace!("check_ssh: checking status via {}", check_url);
let check_result = client.get(check_url.as_str())
.header(HEADER_BUBBLE_SESSION, session.to_string())
.send().await;
match check_result {
Err(e) => {
error!("check_ssh: error checking status via {}: {:?}", check_url, e);
},
Ok(response) => {
let status_code = response.status();
let body_bytes = &response.bytes().await.unwrap();
let body = String::from_utf8(body_bytes.to_vec()).unwrap();
let server_status = body.replace(|c: char| c == '\"', "");
trace!("check_ssh: tunnel status for {} returned status={:?}, body={}", check_url, &status_code, body);
match status_code {
ReqwestStatusCode::OK => {
match server_status.as_str() {
"none" => {
info!("check_ssh: checked tunnel status via {}: tunnel status not yet available", check_url);
error_count = error_count + 1;
}
"active" => {
debug!("check_ssh: tunnel status via {}: tunnel status is OK", check_url);
error_count = 0;
}
"unreachable" => {
debug!("check_ssh: tunnel status via {}: tunnel is unreachable, restarting tunnel", check_url);
error_count = error_count + 1;
}
"deleted" => {
// todo: shutdown ssh and ourselves
debug!("check_ssh: tunnel status via {}: tunnel was deleted, stopping tunnel", check_url);
deleted = true;
}
_ => {
error!("check_ssh: error checking tunnel status via {}: unknown tunnel status={}", check_url, server_status);
error_count = error_count + 1;
}
}
},
_ => {
error!("check_ssh: error checking tunnel status via {}: status={:?} body={}", check_url, &status_code, body);
error_count = error_count + 1;
}
}
if deleted {
info!("check_ssh: tunnel deleted, stopping ssh and checker");
} else if error_count >= MAX_CHECK_ERRORS_BEFORE_RESTART {
info!("check_ssh: tunnel had too many errors, restarting ssh tunnel");
error_count = 0;
}
}
}
}
}

const HOST_FILE_WINDOWS: &'static str = "C:\\Windows\\Temp\\bubble_flex_host_key";
const HOST_FILE_MACOS: &'static str = "/tmp/bubble_flex_host_key";
const HOST_FILE_LINUX: &'static str = "/tmp/bubble_flex_host_key";


+ 2
- 0
src/util.rs Visa fil

@@ -14,6 +14,8 @@ use std::process::exit;

use log::error;

pub const HEADER_BUBBLE_SESSION: &'static str = "X-Bubble-Session";

pub fn read_required_env_var_argument(arg_name : &str, opt : Option<&str>) -> String {
if opt.is_none() {
error!("read_required_env_var_argument: {} argument is required", arg_name);


Laddar…
Avbryt
Spara