@@ -28,6 +28,7 @@ import bubble.service.boot.SelfNodeService; | |||
import bubble.service.device.DeviceService; | |||
import bubble.service.device.FlexRouterInfo; | |||
import bubble.service.device.StandardFlexRouterService; | |||
import bubble.service.message.MessageService; | |||
import bubble.service.stream.ConnectionCheckResponse; | |||
import bubble.service.stream.StandardRuleEngineService; | |||
import com.fasterxml.jackson.databind.JsonNode; | |||
@@ -55,6 +56,7 @@ import java.util.stream.Collectors; | |||
import static bubble.ApiConstants.*; | |||
import static bubble.resources.stream.FilterMatchersResponse.NO_MATCHERS; | |||
import static bubble.rule.AppRuleDriver.isFlexRouteFqdn; | |||
import static bubble.service.device.FlexRouterInfo.missingFlexRouter; | |||
import static bubble.service.stream.HttpStreamDebug.getLogFqdn; | |||
import static bubble.service.stream.StandardRuleEngineService.MATCHERS_CACHE_TIMEOUT; | |||
import static com.google.common.net.HttpHeaders.CONTENT_SECURITY_POLICY; | |||
@@ -90,6 +92,7 @@ public class FilterHttpResource { | |||
@Autowired private SelfNodeService selfNodeService; | |||
@Autowired private BlockStatsService blockStats; | |||
@Autowired private StandardFlexRouterService flexRouterService; | |||
@Autowired private MessageService messageService; | |||
private static final long ACTIVE_REQUEST_TIMEOUT = HOURS.toSeconds(12); | |||
@@ -644,10 +647,11 @@ public class FilterHttpResource { | |||
return ok(summary); | |||
} | |||
@GET @Path(EP_FLEX_ROUTERS) | |||
@GET @Path(EP_FLEX_ROUTERS+"/{fqdn}") | |||
@Produces(APPLICATION_JSON) | |||
public Response getFlexRouter(@Context Request req, | |||
@Context ContainerRequest ctx) { | |||
@Context ContainerRequest ctx, | |||
@PathParam("fqdn") String fqdn) { | |||
final String publicIp = getRemoteAddr(req); | |||
final Device device = deviceService.findDeviceByIp(publicIp); | |||
if (device == null) { | |||
@@ -666,7 +670,10 @@ public class FilterHttpResource { | |||
Collection<FlexRouterInfo> routers = flexRouterService.selectClosestRouter(device.getAccount(), vpnIp, publicIp); | |||
if (log.isDebugEnabled()) log.debug("getFlexRouter: found router(s) for vpnIp="+vpnIp+": "+json(routers, COMPACT_MAPPER)); | |||
if (routers.isEmpty()) return notFound(); | |||
if (routers.isEmpty()) { | |||
final Account account = accountDAO.findByUuid(device.getAccount()); | |||
return ok(missingFlexRouter(account, device, fqdn, messageService, configuration.getHandlebars())); | |||
} | |||
return ok(routers.iterator().next().initAuth()); | |||
} | |||
@@ -5,24 +5,36 @@ | |||
package bubble.service.device; | |||
import bubble.cloud.geoLocation.GeoLocation; | |||
import bubble.model.account.Account; | |||
import bubble.model.device.Device; | |||
import bubble.model.device.DeviceStatus; | |||
import bubble.model.device.FlexRouter; | |||
import bubble.service.message.MessageService; | |||
import com.fasterxml.jackson.annotation.JsonIgnore; | |||
import com.github.jknack.handlebars.Handlebars; | |||
import lombok.Getter; | |||
import lombok.Setter; | |||
import lombok.ToString; | |||
import lombok.experimental.Accessors; | |||
import org.cobbzilla.util.handlebars.HandlebarsUtil; | |||
import java.util.HashMap; | |||
import java.util.Map; | |||
import static bubble.model.device.DeviceStatus.NO_DEVICE_STATUS; | |||
import static org.cobbzilla.util.json.JsonUtil.COMPACT_MAPPER; | |||
import static org.cobbzilla.util.json.JsonUtil.json; | |||
@ToString | |||
@Accessors(chain=true) @ToString | |||
public class FlexRouterInfo { | |||
@JsonIgnore @Getter private final FlexRouter router; | |||
@JsonIgnore @Getter private final DeviceStatus deviceStatus; | |||
@Getter @Setter private String auth; | |||
// set by missingFlexRouter method when there is no flex router but there should be one | |||
@Getter @Setter private String error_html; | |||
public FlexRouterInfo (FlexRouter router, DeviceStatus deviceStatus) { | |||
this.router = router; | |||
this.deviceStatus = deviceStatus; | |||
@@ -30,8 +42,8 @@ public class FlexRouterInfo { | |||
@JsonIgnore public String getVpnIp () { return router.getIp(); } | |||
public int getPort () { return router.getPort(); } | |||
public String getProxyUrl () { return router.proxyBaseUri(); } | |||
public int getPort () { return router == null ? -1 : router.getPort(); } | |||
public String getProxyUrl () { return router == null ? null : router.proxyBaseUri(); } | |||
public boolean hasGeoLocation () { return hasDeviceStatus() && deviceStatus.getLocation() != null && deviceStatus.getLocation().hasLatLon(); } | |||
public boolean hasNoGeoLocation () { return !hasGeoLocation(); } | |||
@@ -54,4 +66,30 @@ public class FlexRouterInfo { | |||
return obj instanceof FlexRouterInfo && ((FlexRouterInfo) obj).getPort() == getPort(); | |||
} | |||
public static final String CTX_ACCOUNT = "account"; | |||
public static final String CTX_DEVICE = "device"; | |||
public static final String CTX_MESSAGES = "messages"; | |||
public static final String CTX_FLEX_FQDN = "flex_fqdn"; | |||
public static final String CTX_DEVICE_TYPE_LABEL = "device_type_label"; | |||
public static FlexRouterInfo missingFlexRouter(Account account, | |||
Device device, | |||
String fqdn, | |||
MessageService messageService, | |||
Handlebars handlebars) { | |||
final String locale = account.getLocale(); | |||
final String template = messageService.loadPageTemplate(locale, "no_flex_router"); | |||
final Map<String, Object> ctx = new HashMap<>(); | |||
ctx.put(CTX_ACCOUNT, account); | |||
ctx.put(CTX_DEVICE, device); | |||
ctx.put(CTX_FLEX_FQDN, fqdn); | |||
final Map<String, String> messages = messageService.formatStandardMessages(locale); | |||
ctx.put(CTX_MESSAGES, messages); | |||
ctx.put(CTX_DEVICE_TYPE_LABEL, messages.get("device_type_"+device.getDeviceType().name())); | |||
final String html = HandlebarsUtil.apply(handlebars, template, ctx); | |||
return new FlexRouterInfo(null, null).setError_html(html); | |||
} | |||
} |
@@ -50,7 +50,7 @@ import static org.cobbzilla.util.system.Sleep.sleep; | |||
@Service @Slf4j | |||
public class StandardFlexRouterService extends SimpleDaemon implements FlexRouterService { | |||
public static final int MAX_PING_TRIES = 10; | |||
public static final int MAX_PING_TRIES = 5; | |||
private static final long PING_SLEEP_FACTOR = SECONDS.toMillis(2); | |||
@@ -61,22 +61,22 @@ public class StandardFlexRouterService extends SimpleDaemon implements FlexRoute | |||
.setSocketTimeout(DEFAULT_PING_TIMEOUT) | |||
.setConnectionRequestTimeout(DEFAULT_PING_TIMEOUT).build(); | |||
// thread pool size | |||
public static final int DEFAULT_MAX_TUNNELS = 5; | |||
// wait for ssh key to be written | |||
private static final long FIRST_TIME_WAIT = SECONDS.toMillis(10); | |||
private static final long INTERRUPT_WAIT = FIRST_TIME_WAIT/2; | |||
public static final long PING_ALL_TIMEOUT | |||
= (SECONDS.toMillis(1) * DEFAULT_PING_TIMEOUT * MAX_PING_TRIES) + FIRST_TIME_WAIT; | |||
// thread pool size | |||
public static final int DEFAULT_MAX_TUNNELS = 5; | |||
private static CloseableHttpClient getHttpClient() { | |||
return HttpClientBuilder.create() | |||
.setDefaultRequestConfig(DEFAULT_PING_REQUEST_CONFIG) | |||
.build(); | |||
} | |||
private static final long PING_ALL_TIMEOUT | |||
= (SECONDS.toMillis(1) * DEFAULT_PING_TIMEOUT * MAX_PING_TRIES) + FIRST_TIME_WAIT; | |||
public static final long DEFAULT_SLEEP_TIME = MINUTES.toMillis(2); | |||
@Autowired private FlexRouterDAO flexRouterDAO; | |||
@@ -227,7 +227,7 @@ public class StandardFlexRouterService extends SimpleDaemon implements FlexRoute | |||
} | |||
} catch (Exception e) { | |||
log.error(prefix+"error: "+shortError(e)); | |||
log.warn(prefix+"error: "+shortError(e)); | |||
} | |||
setStatus(router, FlexRouterStatus.unreachable); | |||
} | |||
@@ -19,13 +19,15 @@ import java.util.concurrent.ConcurrentHashMap; | |||
import static bubble.ApiConstants.MESSAGE_RESOURCE_BASE; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.*; | |||
import static org.cobbzilla.util.io.StreamUtil.loadResourceAsStream; | |||
import static org.cobbzilla.util.io.StreamUtil.*; | |||
import static org.cobbzilla.util.string.StringUtil.UTF8cs; | |||
@Service @Slf4j | |||
public class MessageService { | |||
public static final String MESSAGE_RESOURCE_PATH = "/server/"; | |||
public static final String PAGE_TEMPLATES_PATH = "pages/"; | |||
public static final String PAGE_TEMPLATES_SUFFIX = ".html.hbs"; | |||
public static final String RESOURCE_MESSAGES_PROPS = "ResourceMessages.properties"; | |||
public static final String[] PRE_AUTH_MESSAGE_GROUPS = {"pre_auth", "countries", "timezones"}; | |||
@@ -81,4 +83,13 @@ public class MessageService { | |||
}); | |||
} | |||
private final Map<String, String> pageTemplateCache = new ConcurrentHashMap<>(10); | |||
public String loadPageTemplate(String locale, String templatePath) { | |||
final String key = locale + ":" + templatePath; | |||
return pageTemplateCache.computeIfAbsent(key, k -> { | |||
final String path = MESSAGE_RESOURCE_BASE + locale + MESSAGE_RESOURCE_PATH + PAGE_TEMPLATES_PATH + templatePath + PAGE_TEMPLATES_SUFFIX; | |||
return loadResourceAsStringOrDie(path); | |||
}); | |||
} | |||
} |
@@ -40,6 +40,7 @@ | |||
<!-- <logger name="org.cobbzilla.wizard.dao.SqlViewSearchHelper" level="DEBUG" />--> | |||
<logger name="org.cobbzilla.wizard.server.listener.BrowserLauncherListener" level="INFO" /> | |||
<logger name="bubble.service.notify.NotificationService" level="WARN" /> | |||
<logger name="bubble.service.device.StandardFlexRouterService" level="ERROR" /> | |||
<logger name="bubble.rule.AbstractAppRuleDriver" level="WARN" /> | |||
<logger name="bubble.rule.bblock.BubbleBlockRuleDriver" level="WARN" /> | |||
<!-- <logger name="bubble.rule.passthru.TlsPassthruRuleDriver" level="DEBUG" />--> | |||
@@ -59,8 +60,8 @@ | |||
<!-- <logger name="bubble.service.stream.StandardRuleEngineService" level="DEBUG" />--> | |||
<logger name="bubble.service.stream.ActiveStreamState" level="WARN" /> | |||
<logger name="bubble.resources.stream" level="WARN" /> | |||
<logger name="bubble.resources.stream.FilterHttpResource" level="DEBUG" /> | |||
<!-- <logger name="bubble.resources.stream.FilterHttpResource" level="WARN" />--> | |||
<!-- <logger name="bubble.resources.stream.FilterHttpResource" level="DEBUG" />--> | |||
<logger name="bubble.resources.stream.FilterHttpResource" level="WARN" /> | |||
<logger name="bubble.service.stream" level="INFO" /> | |||
<!-- <logger name="bubble.service.dbfilter" level="DEBUG" />--> | |||
<!-- <logger name="bubble.service.account.StandardAccountMessageService" level="DEBUG" />--> | |||
@@ -1 +1 @@ | |||
Subproject commit 4f8345542b29228db0dba3e845f0662ab9cf6693 | |||
Subproject commit 04db22382ddffa08b3f05c024603da75cdfb8b55 |
@@ -228,9 +228,9 @@ def bubble_conn_check(client_addr, server_addr, fqdns, security_level): | |||
return None | |||
def bubble_get_flex_router(client_addr): | |||
def bubble_get_flex_router(client_addr, host): | |||
name = 'bubble_get_flex_router' | |||
url = 'http://127.0.0.1:' + bubble_port + '/api/filter/flexRouters' | |||
url = 'http://127.0.0.1:' + bubble_port + '/api/filter/flexRouters/' + host | |||
headers = { | |||
'X-Forwarded-For': client_addr, | |||
'Accept': 'application/json' | |||
@@ -3,15 +3,14 @@ | |||
# | |||
import asyncio | |||
from httpx._types import RequestData | |||
from mitmproxy.proxy.protocol.async_stream_body import AsyncStreamBody | |||
from mitmproxy import http | |||
from mitmproxy.net.http import headers as nheaders | |||
from mitmproxy.proxy.protocol.request_capture import RequestCapture | |||
from bubble_api import bubble_get_flex_router, collect_response_headers, bubble_async, async_client, async_stream, \ | |||
HEADER_TRANSFER_ENCODING, HEADER_CONTENT_LENGTH | |||
from bubble_api import bubble_get_flex_router, collect_response_headers, async_client, async_stream, \ | |||
HEADER_TRANSFER_ENCODING, HEADER_CONTENT_LENGTH, HEADER_CONTENT_TYPE | |||
import logging | |||
from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | |||
@@ -27,6 +26,7 @@ class FlexFlow(RequestCapture): | |||
mitm_flow: None | |||
router: None | |||
request_chunks: None | |||
response_stream: None | |||
def __init__(self, flex_host, mitm_flow, router): | |||
super().__init__() | |||
@@ -40,15 +40,39 @@ class FlexFlow(RequestCapture): | |||
headers={}, | |||
content=None) | |||
def is_error(self): | |||
return 'error_html' in self.router and self.router['error_html'] and len(self.router['error_html']) > 0 | |||
def capture(self, chunks): | |||
self.request_chunks = chunks | |||
def process_no_flex(flex_flow): | |||
flow = flex_flow.mitm_flow | |||
response_headers = nheaders.Headers() | |||
response_headers[HEADER_CONTENT_TYPE] = 'text/html' | |||
response_headers[HEADER_CONTENT_LENGTH] = str(len(flex_flow.router['error_html'])) | |||
flow.response = http.HTTPResponse(http_version='HTTP/1.1', | |||
status_code=200, | |||
reason='OK', | |||
headers=response_headers, | |||
content=None) | |||
error_html = flex_flow.router['error_html'] | |||
flex_flow.response_stream = lambda chunks: error_html | |||
flow.response.stream = lambda chunks: error_html | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('process_no_flex: no router found, returning error_html') | |||
return flex_flow | |||
def new_flex_flow(client_addr, flex_host, flow): | |||
router = bubble_get_flex_router(client_addr) | |||
router = bubble_get_flex_router(client_addr, flex_host) | |||
if router is None or 'auth' not in router: | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('new_flex_flow: no flex router for host: '+flex_host) | |||
if bubble_log.isEnabledFor(ERROR): | |||
bubble_log.error('new_flex_flow: no flex router for host: '+flex_host) | |||
return None | |||
if bubble_log.isEnabledFor(INFO): | |||
@@ -58,6 +82,14 @@ def new_flex_flow(client_addr, flex_host, flow): | |||
def process_flex(flex_flow): | |||
if flex_flow.is_error(): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('process_flex: no router found, returning default flow') | |||
return process_no_flex(flex_flow) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('process_flex: using router: '+repr(flex_flow.router)) | |||
flex_host = flex_flow.flex_host | |||
flow = flex_flow.mitm_flow | |||
router = flex_flow.router | |||
@@ -81,7 +113,7 @@ def process_flex(flex_flow): | |||
proxies = {"http": proxy_url, "https": proxy_url} | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('process_flex: sending flex request for ' + method +' ' + url +' to ' + proxy_url +' with headers=' + repr(request_headers) +' and body=' + repr(request_body)) | |||
bubble_log.debug('process_flex: sending flex request for '+method+' '+url+' to '+proxy_url) | |||
loop = asyncio.new_event_loop() | |||
client = async_client(proxies=proxies, timeout=30) | |||
@@ -135,7 +167,9 @@ def process_flex(flex_flow): | |||
# Apply filters | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('process_flex: successfully requested url '+url+' from flex router, proceeding...') | |||
return response | |||
flex_flow.response_stream = response | |||
return flex_flow | |||
async def async_chunk_iter(chunks): | |||
@@ -140,7 +140,7 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N | |||
return response.content | |||
def bubble_filter_chunks(flow, chunks, flex_stream, req_id, user_agent, content_encoding, content_type, csp): | |||
def bubble_filter_chunks(flow, chunks, flex_flow, req_id, user_agent, content_encoding, content_type, csp): | |||
loop = asyncio.new_event_loop() | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('bubble_filter_chunks: starting with content_type='+content_type) | |||
@@ -149,8 +149,9 @@ def bubble_filter_chunks(flow, chunks, flex_stream, req_id, user_agent, content_ | |||
content_length = get_flow_ctx(flow, CTX_CONTENT_LENGTH) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('bubble_filter_chunks: found content_length='+str(content_length)) | |||
if flex_stream is not None: | |||
chunks = flex_stream.iter_content(8192) | |||
if flex_flow is not None: | |||
# flex flows with errors are handled before we get here | |||
chunks = flex_flow.response_stream.iter_content(8192) | |||
try: | |||
buffer = b'' | |||
for chunk in chunks: | |||
@@ -190,10 +191,10 @@ def bubble_filter_chunks(flow, chunks, flex_stream, req_id, user_agent, content_ | |||
yield None | |||
def bubble_modify(flow, flex_stream, req_id, user_agent, content_encoding, content_type, csp): | |||
def bubble_modify(flow, flex_flow, req_id, user_agent, content_encoding, content_type, csp): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('bubble_modify: modifying req_id='+req_id+' with content_type='+content_type) | |||
return lambda chunks: bubble_filter_chunks(flow, chunks, flex_stream, req_id, | |||
return lambda chunks: bubble_filter_chunks(flow, chunks, flex_flow, req_id, | |||
user_agent, content_encoding, content_type, csp) | |||
@@ -215,13 +216,15 @@ def abort_data(content_type): | |||
def responseheaders(flow): | |||
flex_flow = get_flow_ctx(flow, CTX_BUBBLE_FLEX) | |||
if flex_flow: | |||
flex_stream = process_flex(flex_flow) | |||
flex_flow = process_flex(flex_flow) | |||
else: | |||
flex_stream = None | |||
bubble_filter_response(flow, flex_stream) | |||
flex_flow = None | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('responseheaders: flex_flow = '+repr(flex_flow)) | |||
bubble_filter_response(flow, flex_flow) | |||
def bubble_filter_response(flow, flex_stream): | |||
def bubble_filter_response(flow, flex_flow): | |||
# only filter once -- flex routing may have pre-filtered | |||
if get_flow_ctx(flow, CTX_BUBBLE_FILTERED): | |||
return | |||
@@ -235,6 +238,11 @@ def bubble_filter_response(flow, flex_stream): | |||
else: | |||
special_bubble_response(flow) | |||
elif flex_flow and flex_flow.is_error(): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('bubble_filter_response: flex_flow had error, returning error_html: ' + repr(flex_flow.response_stream)) | |||
flow.response.stream = flex_flow.response_stream | |||
else: | |||
abort_code = get_flow_ctx(flow, CTX_BUBBLE_ABORT) | |||
if abort_code is not None: | |||
@@ -320,7 +328,7 @@ def bubble_filter_response(flow, flex_stream): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug(prefix+'content_encoding='+repr(content_encoding) + ', content_type='+repr(content_type)) | |||
flow.response.stream = bubble_modify(flow, flex_stream, req_id, | |||
flow.response.stream = bubble_modify(flow, flex_flow, req_id, | |||
user_agent, content_encoding, content_type, csp) | |||
if content_length_value: | |||
flow.response.headers['transfer-encoding'] = 'chunked' | |||
@@ -264,7 +264,8 @@ class Rerouter: | |||
elif host is not None: | |||
client_addr = flow.client_conn.address[0] | |||
if is_flex_domain(client_addr, host): | |||
add_flow_ctx(flow, CTX_BUBBLE_FLEX, new_flex_flow(client_addr, host, flow)) | |||
flex_flow = new_flex_flow(client_addr, host, flow) | |||
add_flow_ctx(flow, CTX_BUBBLE_FLEX, flex_flow) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('request: is_flex_domain('+host+') returned true, setting ctx: '+CTX_BUBBLE_FLEX) | |||