@@ -41,7 +41,7 @@ public class FlexRoutersResource extends AccountOwnedResource<FlexRouter, FlexRo | |||
if (!router.hasAuthToken()) throw invalidEx("err.token.required"); | |||
router.setToken(router.getAuth_token()); | |||
if (!pingFlexRouter(router)) { | |||
if (!pingFlexRouter(router, getDao())) { | |||
log.info("setReferences: router not active: "+json(router, COMPACT_MAPPER)); | |||
throw invalidEx("err.token.required"); | |||
} | |||
@@ -22,16 +22,17 @@ public class TlsPassthruRuleDriver extends AbstractAppRuleDriver { | |||
@Override public ConnectionCheckResponse checkConnection(AppRuleHarness harness, Account account, Device device, String addr, String fqdn) { | |||
final TlsPassthruConfig passthruConfig = getRuleConfig(); | |||
boolean passthru = false; | |||
if (passthruConfig.isPassthru(fqdn) || passthruConfig.isPassthru(addr)) { | |||
if (log.isDebugEnabled()) log.debug("checkConnection: returning passthru for fqdn/addr="+fqdn+"/"+addr); | |||
return ConnectionCheckResponse.passthru; | |||
if (log.isDebugEnabled()) log.debug("checkConnection: detected passthru for fqdn/addr="+fqdn+"/"+addr); | |||
passthru = true; | |||
} | |||
if (passthruConfig.isFlex(fqdn)) { | |||
if (log.isDebugEnabled()) log.debug("checkConnection: returning flex for fqdn/addr="+fqdn+"/"+addr); | |||
return ConnectionCheckResponse.flex; | |||
if (log.isDebugEnabled()) log.debug("checkConnection: detected flex for fqdn/addr="+fqdn+"/"+addr); | |||
return passthru ? ConnectionCheckResponse.passthru_flex : ConnectionCheckResponse.noop_flex; | |||
} | |||
if (log.isDebugEnabled()) log.debug("checkConnection: returning noop for fqdn/addr="+fqdn+"/"+addr); | |||
return ConnectionCheckResponse.noop; | |||
return passthru ? ConnectionCheckResponse.passthru : ConnectionCheckResponse.noop; | |||
} | |||
@Override public JsonNode upgradeRuleConfig(JsonNode sageRuleConfig, JsonNode localRuleConfig) { | |||
@@ -3,8 +3,13 @@ package bubble.service.device; | |||
import bubble.dao.device.FlexRouterDAO; | |||
import bubble.model.device.FlexRouter; | |||
import bubble.model.device.FlexRouterPing; | |||
import lombok.Cleanup; | |||
import lombok.Getter; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.apache.http.client.HttpClient; | |||
import org.apache.http.client.config.RequestConfig; | |||
import org.apache.http.impl.client.CloseableHttpClient; | |||
import org.apache.http.impl.client.HttpClientBuilder; | |||
import org.cobbzilla.util.daemon.SimpleDaemon; | |||
import org.cobbzilla.util.http.HttpRequestBean; | |||
import org.cobbzilla.util.http.HttpResponseBean; | |||
@@ -27,15 +32,29 @@ public class FlexRouterService extends SimpleDaemon { | |||
public static final int MAX_PING_TRIES = 5; | |||
private static final long PING_SLEEP_FACTOR = SECONDS.toMillis(2); | |||
public static final int DEFAULT_PING_TIMEOUT = (int) SECONDS.toMillis(FlexRouterPing.MAX_PING_AGE/2); | |||
public static final RequestConfig DEFAULT_PING_REQUEST_CONFIG = RequestConfig.custom() | |||
.setConnectTimeout(DEFAULT_PING_TIMEOUT) | |||
.setSocketTimeout(DEFAULT_PING_TIMEOUT) | |||
.setConnectionRequestTimeout(DEFAULT_PING_TIMEOUT).build(); | |||
private static CloseableHttpClient getHttpClient() { | |||
return HttpClientBuilder.create() | |||
.setDefaultRequestConfig(DEFAULT_PING_REQUEST_CONFIG) | |||
.build(); | |||
} | |||
@Getter private final long sleepTime = MINUTES.toMillis(2); | |||
@Autowired private FlexRouterDAO flexRouterDAO; | |||
@Override protected void process() { | |||
try { | |||
@Cleanup final CloseableHttpClient httpClient = getHttpClient(); | |||
final List<FlexRouter> routers = flexRouterDAO.findEnabledAndRegistered(); | |||
for (FlexRouter router : routers) { | |||
boolean active = pingFlexRouter(router); | |||
boolean active = pingFlexRouter(router, flexRouterDAO, httpClient); | |||
if (active != router.active()) { | |||
flexRouterDAO.update(router.setActive(active)); | |||
} | |||
@@ -45,7 +64,17 @@ public class FlexRouterService extends SimpleDaemon { | |||
} | |||
} | |||
public static boolean pingFlexRouter(FlexRouter router) { | |||
public static boolean pingFlexRouter(FlexRouter router, FlexRouterDAO flexRouterDAO) { | |||
try { | |||
@Cleanup final CloseableHttpClient httpClient = getHttpClient(); | |||
return pingFlexRouter(router, flexRouterDAO, httpClient); | |||
} catch (Exception e) { | |||
log.error("pingFlexRouter("+router+"): "+shortError(e)); | |||
return false; | |||
} | |||
} | |||
public static boolean pingFlexRouter(FlexRouter router, FlexRouterDAO flexRouterDAO, HttpClient httpClient) { | |||
final String prefix = "pingRouter(" + router + "): "; | |||
final HttpRequestBean request = new HttpRequestBean(POST, router.pingUrl()); | |||
for (int i=0; i<MAX_PING_TRIES; i++) { | |||
@@ -53,11 +82,16 @@ public class FlexRouterService extends SimpleDaemon { | |||
if (i == 0) { | |||
if (log.isInfoEnabled()) log.info(prefix+"pinging router..."); | |||
} else { | |||
router = flexRouterDAO.findByUuid(router.getUuid()); | |||
if (router == null) { | |||
log.error(prefix+"router no longer exists"); | |||
return false; | |||
} | |||
if (log.isWarnEnabled()) log.warn(prefix+"attempting to ping again (try="+(i+1)+"/"+MAX_PING_TRIES+")"); | |||
} | |||
try { | |||
request.setEntity(json(router.pingObject())); | |||
final HttpResponseBean response = HttpUtil.getResponse(request); | |||
final HttpResponseBean response = HttpUtil.getResponse(request, httpClient); | |||
if (!response.isOk()) { | |||
log.error(prefix+"response not OK: "+response); | |||
} else { | |||
@@ -10,7 +10,7 @@ import static bubble.ApiConstants.enumFromString; | |||
public enum ConnectionCheckResponse { | |||
noop, passthru, flex, block, error; | |||
noop, noop_flex, passthru, passthru_flex, block, error; | |||
@JsonCreator public static ConnectionCheckResponse fromString (String v) { return enumFromString(ConnectionCheckResponse.class, v); } | |||
@@ -163,22 +163,30 @@ def check_bubble_connection(client_addr, server_addr, fqdns, security_level): | |||
if check_response is None or check_response == 'error': | |||
if security_level['level'] == SEC_MAX: | |||
bubble_log('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', security_level=maximum, returning Block') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': False, 'block': True, 'reason': 'bubble_error'} | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': False, 'flex': False, 'block': True, 'reason': 'bubble_error'} | |||
else: | |||
bubble_log('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', returning True') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': True, 'reason': 'bubble_error'} | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': True, 'flex': False, 'reason': 'bubble_error'} | |||
elif check_response == 'passthru': | |||
bubble_log('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', returning True') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': True, 'reason': 'bubble_passthru'} | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': True, 'flex': False, 'reason': 'bubble_passthru'} | |||
elif check_response == 'block': | |||
bubble_log('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', returning Block') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': False, 'block': True, 'reason': 'bubble_block'} | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': False, 'flex': False, 'block': True, 'reason': 'bubble_block'} | |||
elif check_response == 'passthru_flex': | |||
bubble_log('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', returning True') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': True, 'flex': True, 'reason': 'bubble_passthru_flex'} | |||
elif check_response == 'noop_flex': | |||
bubble_log('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', returning True') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': False, 'flex': True, 'reason': 'bubble_no_passthru_flex'} | |||
else: | |||
bubble_log('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', returning False') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': False, 'reason': 'bubble_no_passthru'} | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': False, 'flex': False, 'reason': 'bubble_no_passthru'} | |||
def check_connection(client_addr, server_addr, fqdns, security_level): | |||
@@ -221,6 +229,7 @@ def next_layer(next_layer): | |||
security_level = get_device_security_level(client_addr, fqdns) | |||
next_layer.security_level = security_level | |||
next_layer.do_block = False | |||
called_check_api = False | |||
if is_bubble_request(server_addr, fqdns): | |||
bubble_log('next_layer: enabling passthru for LOCAL bubble='+server_addr+' (bubble_host ('+bubble_host+') in fqdns or bubble_host_alias ('+bubble_host_alias+') in fqdns) regardless of security_level='+repr(security_level)+' for client='+client_addr+', fqdns='+repr(fqdns)) | |||
check = FORCE_PASSTHRU | |||
@@ -255,8 +264,15 @@ def next_layer(next_layer): | |||
else: | |||
bubble_log('next_layer: calling check_connection for server='+server_addr+', fqdns='+str(fqdns)+', client='+client_addr+' with security_level='+repr(security_level)) | |||
check = check_connection(client_addr, server_addr, fqdns, security_level) | |||
called_check_api = True | |||
if check is None or ('passthru' in check and check['passthru']): | |||
# make sure this is not a flex route | |||
#if not called_check_api: | |||
# check = check_connection(client_addr, server_addr, fqdns, security_level) | |||
#if check is not None and 'flex' in check and check['flex']: | |||
# # todo -- how do we route through the flex router for TLS passthru requests? | |||
# bubble_log('next_layer: FLEX ROUTING NOT YET SUPPORTED for passthru for server=' + server_addr+', fqdns='+str(fqdns)) | |||
bubble_log('next_layer: enabling passthru for server=' + server_addr+', fqdns='+str(fqdns)) | |||
bubble_activity_log(client_addr, server_addr, 'tls_passthru', fqdns) | |||
next_layer_replacement = RawTCPLayer(next_layer.ctx, ignore=True) | |||