@@ -29,6 +29,11 @@ from mitmproxy.utils import strutils | |||
bubble_log = logging.getLogger(__name__) | |||
log_debug = bubble_log.isEnabledFor(DEBUG) | |||
log_info = bubble_log.isEnabledFor(INFO) | |||
log_warning = bubble_log.isEnabledFor(WARNING) | |||
log_error = bubble_log.isEnabledFor(ERROR) | |||
nest_asyncio.apply() | |||
HEADER_USER_AGENT = 'User-Agent' | |||
@@ -93,7 +98,7 @@ def bubble_activity_log(client_addr, server_addr, event, data): | |||
'event': event, | |||
'data': str(data) | |||
}) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_activity_log: setting '+key+' = '+value) | |||
redis_set(key, value, BUBBLE_ACTIVITY_LOG_EXPIRATION) | |||
pass | |||
@@ -110,16 +115,16 @@ async def async_response(client, name, url, | |||
method='GET', | |||
data=None, | |||
json=None): | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('bubble_async_request(' + name + '): starting async: ' + method + ' ' + url) | |||
response = await client.request(method=method, url=url, headers=headers, json=json, data=data) | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('bubble_async_request(' + name + '): async request returned HTTP status ' + str(response.status_code)) | |||
if response.status_code != 200: | |||
if bubble_log.isEnabledFor(ERROR): | |||
if log_error: | |||
bubble_log.error('bubble_async_request(' + name + '): API call failed ('+url+'): ' + repr(response)) | |||
return response | |||
@@ -202,10 +207,10 @@ def bubble_async_request_json(name, url, headers, method='GET', json=None): | |||
if response and response.status_code == 200: | |||
return response.json() | |||
elif response: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_async_request_json('+name+'): received invalid HTTP status: '+repr(response.status_code)) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_async_request_json('+name+'): error, no response') | |||
return None | |||
@@ -232,10 +237,10 @@ def cleanup_async(url, loop, client, response): | |||
bubble_log.error('cleanup_async: error closing loop: '+repr(e)) | |||
errors = True | |||
if not errors: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('cleanup_async: successfully completed: '+url) | |||
else: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('cleanup_async: successfully completed (but had errors closing): ' + url) | |||
return cleanup | |||
@@ -244,7 +249,7 @@ def bubble_conn_check(client_addr, server_addr, fqdns, security_level): | |||
if debug_capture_fqdn and fqdns: | |||
for f in debug_capture_fqdn: | |||
if f in fqdns: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_conn_check: debug_capture_fqdn detected, returning noop: '+f) | |||
return 'noop' | |||
@@ -264,7 +269,7 @@ def bubble_conn_check(client_addr, server_addr, fqdns, security_level): | |||
return bubble_async_request_json(name, url, headers=headers, method='POST', json=data) | |||
except Exception as e: | |||
if bubble_log.isEnabledFor(ERROR): | |||
if log_error: | |||
bubble_log.error('bubble_conn_check: API call failed: '+repr(e)) | |||
traceback.print_exc() | |||
if security_level is not None and security_level['level'] == 'maximum': | |||
@@ -283,7 +288,7 @@ def bubble_get_flex_router(client_addr, host): | |||
return bubble_async_request_json(name, url, headers) | |||
except Exception as e: | |||
if bubble_log.isEnabledFor(ERROR): | |||
if log_error: | |||
bubble_log.error('bubble_get_flex_routes: API call failed with exception: '+repr(e)) | |||
traceback.print_exc() | |||
return None | |||
@@ -312,7 +317,7 @@ BLOCK_MATCHER = { | |||
def bubble_matchers(req_id, client_addr, server_addr, flow, host): | |||
if debug_capture_fqdn and host and host in debug_capture_fqdn: | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('bubble_matchers: debug_capture_fqdn detected, returning DEBUG_MATCHER: '+host) | |||
return DEBUG_MATCHER | |||
@@ -324,21 +329,21 @@ def bubble_matchers(req_id, client_addr, server_addr, flow, host): | |||
'Content-Type': 'application/json' | |||
} | |||
if HEADER_USER_AGENT not in flow.request.headers: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('bubble_matchers: no User-Agent header, setting to UNKNOWN') | |||
user_agent = 'UNKNOWN' | |||
else: | |||
user_agent = flow.request.headers[HEADER_USER_AGENT] | |||
if HEADER_REFERER not in flow.request.headers: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_matchers: no Referer header, setting to NONE') | |||
referer = 'NONE' | |||
else: | |||
try: | |||
referer = flow.request.headers[HEADER_REFERER].encode().decode() | |||
except Exception as e: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('bubble_matchers: error parsing Referer header: '+repr(e)) | |||
referer = 'NONE' | |||
@@ -357,13 +362,13 @@ def bubble_matchers(req_id, client_addr, server_addr, flow, host): | |||
if response.status_code == 200: | |||
return response.json() | |||
elif response.status_code == 403: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_matchers: response was FORBIDDEN, returning block: '+str(response.status_code)+' / '+repr(response.text)) | |||
return BLOCK_MATCHER | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('bubble_matchers: response not OK, returning empty matchers array: '+str(response.status_code)+' / '+repr(response.text)) | |||
except Exception as e: | |||
if bubble_log.isEnabledFor(ERROR): | |||
if log_error: | |||
bubble_log.error('bubble_matchers: API call failed: '+repr(e)) | |||
traceback.print_exc() | |||
return None | |||
@@ -411,13 +416,13 @@ def is_not_from_vpn(client_addr): | |||
def is_flex_domain(client_addr, server_addr, fqdns): | |||
if fqdns is None or len(fqdns) != 1: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('is_flex_domain: no fqdns or multiple fqdns for server_addr '+server_addr+' ('+repr(fqdns)+'), returning False') | |||
return False | |||
fqdn = fqdns[0] | |||
if fqdn == bubble_host or fqdn == bubble_host_alias or (bubble_sage_host is not None and fqdn == bubble_sage_host): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('is_flex_domain: (early) returning False for: '+fqdn) | |||
return False | |||
check_fqdn = fqdn | |||
@@ -425,7 +430,7 @@ def is_flex_domain(client_addr, server_addr, fqdns): | |||
exclusion_set = 'flexExcludeLists~' + client_addr + '~UNION' | |||
excluded = REDIS.sismember(exclusion_set, fqdn) | |||
if excluded: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('is_flex_domain: returning False for excluded flex domain: ' + fqdn + ' (check=' + check_fqdn + ')') | |||
return False | |||
@@ -433,11 +438,11 @@ def is_flex_domain(client_addr, server_addr, fqdns): | |||
while '.' in check_fqdn: | |||
found = REDIS.sismember(flex_set, check_fqdn) | |||
if found: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('is_flex_domain: returning True for: '+fqdn+' (check='+check_fqdn+')') | |||
return True | |||
check_fqdn = check_fqdn[check_fqdn.index('.')+1:] | |||
# if bubble_log.isEnabledFor(DEBUG): | |||
# if log_debug: | |||
# bubble_log.debug('is_flex_domain: returning False for: '+fqdn) | |||
return False | |||
@@ -530,7 +535,7 @@ def _replace_in_headers(headers: nheaders.Headers, modifiers_dict: dict) -> int: | |||
return repl_count | |||
def response_header_modify(flow) -> int: | |||
def response_header_modify(flow): | |||
if flow.response is None: | |||
return None | |||
@@ -551,8 +556,8 @@ def _header_modify(client_addr: str, ctx: dict, headers: nheaders.Headers) -> in | |||
modifiers_dict[regex] = replacement | |||
repl_count += _replace_in_headers(headers, modifiers_dict) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('_header_modify: replacing headers - replacements count: ' + repl_count) | |||
if log_debug: | |||
bubble_log.debug('_header_modify: replacing headers - replacements count: '+str(repl_count)) | |||
return repl_count | |||
@@ -576,7 +581,7 @@ def _replace_modifier_values(s: str, ctx: dict) -> str: | |||
def health_check_response(flow): | |||
# if bubble_log.isEnabledFor(DEBUG): | |||
# if log_debug: | |||
# bubble_log.debug('health_check_response: special bubble health check request, responding with OK') | |||
response_headers = nheaders.Headers() | |||
response_headers[HEADER_HEALTH_CHECK] = 'OK' | |||
@@ -596,7 +601,7 @@ def health_check_response(flow): | |||
def tarpit_response(flow, host): | |||
# if bubble_log.isEnabledFor(DEBUG): | |||
# if log_debug: | |||
# bubble_log.debug('health_check_response: special bubble health check request, responding with OK') | |||
response_headers = nheaders.Headers() | |||
response_headers[HEADER_LOCATION] = 'http://'+host+':'+str(TARPIT_PORT)+'/admin/index.php' | |||
@@ -625,7 +630,7 @@ def special_bubble_response(flow): | |||
return | |||
uri = make_bubble_special_path(path) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('special_bubble_response: sending special bubble '+flow.request.method+' to '+uri) | |||
headers = { | |||
'Accept': 'application/json', | |||
@@ -638,12 +643,12 @@ def special_bubble_response(flow): | |||
elif flow.request.method == 'POST': | |||
if include_request_headers(flow.request.path): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('special_bubble_request: including client headers: '+repr(flow.request.headers)) | |||
# add client request headers | |||
for name, value in flow.request.headers.items(): | |||
headers['X-Bubble-Client-Header-'+name] = value | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('special_bubble_request: NOW headers='+repr(headers)) | |||
data = None | |||
@@ -656,7 +661,7 @@ def special_bubble_response(flow): | |||
response = async_stream(client, name, uri, headers=headers, method='POST', data=data, loop=loop) | |||
else: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('special_bubble_response: special bubble request: method '+flow.request.method+' not supported') | |||
return | |||
@@ -669,7 +674,7 @@ def special_bubble_response(flow): | |||
headers=response_headers, | |||
content=None) | |||
if response is not None: | |||
# if bubble_log.isEnabledFor(DEBUG): | |||
# if log_debug: | |||
# bubble_log.debug('special_bubble_response: special bubble request: response status = '+str(response.status_code)) | |||
flow.response.headers = collect_response_headers(response) | |||
flow.response.status_code = response.status_code | |||
@@ -39,6 +39,10 @@ from bubble_flex_passthru import BubbleFlexPassthruLayer | |||
bubble_log = logging.getLogger(__name__) | |||
log_debug = bubble_log.isEnabledFor(DEBUG) | |||
log_info = bubble_log.isEnabledFor(INFO) | |||
log_warning = bubble_log.isEnabledFor(WARNING) | |||
REDIS_DNS_PREFIX = 'bubble_dns_' | |||
REDIS_CONN_CHECK_PREFIX = 'bubble_conn_check_' | |||
REDIS_CHECK_DURATION = 60 * 60 # 1 hour timeout | |||
@@ -62,18 +66,18 @@ def get_device_security_level(client_addr, fqdns): | |||
return {'level': SEC_MAX} | |||
level = level.decode() | |||
if level == SEC_STD: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.info('get_device_security_level: checking for max_required_fqdns against fqdns='+repr(fqdns)) | |||
if fqdns: | |||
max_required_fqdns = REDIS.smembers(REDIS_KEY_DEVICE_SITE_MAX_SECURITY_LEVEL_PREFIX+client_addr) | |||
if max_required_fqdns is not None: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.info('get_device_security_level: found max_required_fqdns='+repr(max_required_fqdns)) | |||
for max_required in max_required_fqdns: | |||
max_required = max_required.decode() | |||
for fqdn in fqdns: | |||
if max_required == fqdn or (max_required.startswith('*.') and fqdn.endswith(max_required[1:])): | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('get_device_security_level: returning maximum for fqdn '+fqdn+' based on max_required='+max_required) | |||
return {'level': SEC_MAX, 'pinned': True} | |||
return {'level': level} | |||
@@ -97,13 +101,13 @@ def conn_check_cache_prefix(client_addr, server_addr): | |||
def fqdns_for_addr(client_addr, server_addr): | |||
if server_addr is None or client_addr is None or len(client_addr) == 0 or len(server_addr) == 0: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('fqdns_for_addr: client_addr ('+repr(client_addr)+') or server_addr ('+repr(server_addr)+') was None or empty') | |||
return None | |||
key = REDIS_DNS_PREFIX + server_addr + '~' + client_addr | |||
values = REDIS.smembers(key) | |||
if values is None or len(values) == 0: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('fqdns_for_addr: no FQDN found for server_addr '+str(server_addr)+' and client_addr '+client_addr) | |||
return None | |||
fqdns = [] | |||
@@ -117,7 +121,7 @@ class TlsBlock(TlsLayer): | |||
Monkey-patch __call__ to drop this connection entirely | |||
""" | |||
def __call__(self): | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('TlsBlock: blocking') | |||
return | |||
@@ -138,18 +142,18 @@ class TlsFeedback(TlsLayer): | |||
except TlsProtocolException as e: | |||
if self.do_block: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('_establish_tls_with_client: TLS error for '+str(server_address)+'/fqdns='+str(self.fqdns)+' and do_block==True, raising error for client '+client_address) | |||
raise e | |||
tb = traceback.format_exc() | |||
if 'OpenSSL.SSL.ZeroReturnError' in tb: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('_establish_tls_with_client: TLS error for '+str(server_address)+'/fqdns='+str(self.fqdns)+', raising SSL zero return error for client '+client_address) | |||
raise e | |||
elif 'SysCallError' in tb: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('_establish_tls_with_client: TLS error for '+str(server_address)+'/fqdns='+str(self.fqdns)+', raising SysCallError for client '+client_address) | |||
raise e | |||
@@ -159,25 +163,25 @@ class TlsFeedback(TlsLayer): | |||
if security_level['level'] == SEC_MAX: | |||
if 'pinned' in security_level and security_level['pinned']: | |||
redis_set(cache_key, json.dumps({'fqdns': [fqdn], 'addr': server_address, 'passthru': False, 'block': False, 'reason': 'tls_failure_pinned'}), ex=REDIS_CHECK_DURATION) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('_establish_tls_with_client: TLS error for '+str(server_address)+', enabling block (security_level=maximum/pinned) for client '+client_address+' with cache_key='+cache_key+' and fqdn='+fqdn+': '+repr(e)) | |||
else: | |||
redis_set(cache_key, json.dumps({'fqdns': [fqdn], 'addr': server_address, 'passthru': False, 'block': True, 'reason': 'tls_failure'}), ex=REDIS_CHECK_DURATION) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('_establish_tls_with_client: TLS error for '+str(server_address)+', enabling block (security_level=maximum) for client '+client_address+' with cache_key='+cache_key+' and fqdn='+fqdn+': '+repr(e)) | |||
else: | |||
redis_set(cache_key, json.dumps({'fqdns': [fqdn], 'addr': server_address, 'passthru': True, 'reason': 'tls_failure'}), ex=REDIS_CHECK_DURATION) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('_establish_tls_with_client: TLS error for '+str(server_address)+', enabling passthru for client '+client_address+' with cache_key='+cache_key+' and fqdn='+fqdn+': '+repr(e)) | |||
else: | |||
cache_key = conn_check_cache_prefix(client_address, server_address) | |||
if security_level['level'] == SEC_MAX: | |||
redis_set(cache_key, json.dumps({'fqdns': None, 'addr': server_address, 'passthru': False, 'block': True, 'reason': 'tls_failure'}), ex=REDIS_CHECK_DURATION) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('_establish_tls_with_client: TLS error for '+str(server_address)+', enabling block (security_level=maximum) for client '+client_address+' with cache_key='+cache_key+' and server_address='+server_address+': '+repr(e)) | |||
else: | |||
redis_set(cache_key, json.dumps({'fqdns': None, 'addr': server_address, 'passthru': True, 'reason': 'tls_failure'}), ex=REDIS_CHECK_DURATION) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('_establish_tls_with_client: TLS error for '+str(server_address)+', enabling passthru for client '+client_address+' with cache_key='+cache_key+' and server_address='+server_address+': '+repr(e)) | |||
raise e | |||
@@ -186,26 +190,26 @@ def check_bubble_connection(client_addr, server_addr, fqdns, security_level): | |||
check_response = bubble_conn_check(client_addr, server_addr, fqdns, security_level) | |||
if check_response is None or check_response == 'error': | |||
if security_level['level'] == SEC_MAX: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', security_level=maximum, returning Block') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': False, 'block': True, 'reason': 'bubble_error'} | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', returning True') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': True, 'reason': 'bubble_error'} | |||
elif check_response == 'passthru': | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', returning True') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': True, 'reason': 'bubble_passthru'} | |||
elif check_response == 'block': | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', returning Block') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': False, 'block': True, 'reason': 'bubble_block'} | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('check_bubble_connection: bubble API returned ' + str(check_response) +' for FQDN/addr ' + str(fqdns) +'/' + str(server_addr) + ', returning False') | |||
return {'fqdns': fqdns, 'addr': server_addr, 'passthru': False, 'reason': 'bubble_no_passthru'} | |||
@@ -219,19 +223,19 @@ def check_connection(client_addr, server_addr, fqdns, security_level): | |||
check_json = REDIS.get(cache_key) | |||
if check_json is None or len(check_json) == 0: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug(prefix+'not in redis or empty, calling check_bubble_connection against fqdns='+str(fqdns)) | |||
check_response = check_bubble_connection(client_addr, server_addr, fqdns, security_level) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug(prefix+'check_bubble_connection('+str(fqdns)+') returned '+str(check_response)+", storing in redis...") | |||
redis_set(cache_key, json.dumps(check_response), ex=REDIS_CHECK_DURATION) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug(prefix+'found check_json='+str(check_json)+', touching key in redis') | |||
check_response = json.loads(check_json) | |||
REDIS.touch(cache_key) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug(prefix+'returning '+str(check_response)) | |||
return check_response | |||
@@ -239,12 +243,12 @@ def check_connection(client_addr, server_addr, fqdns, security_level): | |||
def passthru_flex_port(client_addr, fqdn): | |||
router = bubble_get_flex_router(client_addr, fqdn) | |||
if router is None or 'auth' not in router: | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('apply_passthru_flex: no flex router for fqdn(s): '+repr(fqdn)) | |||
elif 'port' in router: | |||
return router['port'] | |||
else: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('apply_passthru_flex: flex router found but has no port ('+repr(router)+') for fqdn(s): '+repr(fqdn)) | |||
return None | |||
@@ -254,12 +258,12 @@ def do_passthru(client_addr, server_addr, fqdns, layer): | |||
if is_flex_domain(client_addr, server_addr, fqdns): | |||
flex_port = passthru_flex_port(client_addr, fqdns[0]) | |||
if flex_port: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('do_passthru: applying flex passthru for server=' + server_addr + ', fqdns=' + str(fqdns)) | |||
layer_replacement = BubbleFlexPassthruLayer(layer.ctx, ('127.0.0.1', flex_port), fqdns[0], 443) | |||
layer.reply.send(layer_replacement) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('do_passthru: detected flex passthru but no flex routers available for server=' + server_addr + ', fqdns=' + str(fqdns)) | |||
if flex_port is None: | |||
layer_replacement = RawTCPLayer(layer.ctx, ignore=True) | |||
@@ -271,89 +275,91 @@ def next_layer(layer): | |||
client_hello = net_tls.ClientHello.from_file(layer.client_conn.rfile) | |||
client_addr = layer.client_conn.address[0] | |||
server_addr = layer.server_conn.address[0] | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('next_layer: STARTING: client='+ client_addr+' server='+server_addr) | |||
if log_debug: | |||
bubble_log.debug('next_layer: STARTING: client='+client_addr+' server='+server_addr) | |||
if client_hello.sni: | |||
fqdn = client_hello.sni.decode() | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('next_layer: using fqdn in SNI: '+ fqdn) | |||
if log_debug: | |||
bubble_log.debug('next_layer: using fqdn in SNI: '+fqdn) | |||
fqdns = [fqdn] | |||
else: | |||
fqdns = fqdns_for_addr(client_addr, server_addr) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('next_layer: NO fqdn in sni, using fqdns from DNS: '+ str(fqdns)) | |||
if log_debug: | |||
bubble_log.debug('next_layer: NO fqdn in sni, using fqdns from DNS: '+str(fqdns)) | |||
layer.fqdns = fqdns | |||
no_fqdns = fqdns is None or len(fqdns) == 0 | |||
security_level = get_device_security_level(client_addr, fqdns) | |||
layer.security_level = security_level | |||
layer.do_block = False | |||
if is_bubble_request(server_addr, fqdns): | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('next_layer: enabling passthru for LOCAL bubble='+server_addr+' (bubble_host ('+bubble_host+') in fqdns or bubble_host_alias ('+bubble_host_alias+') in fqdns) regardless of security_level='+repr(security_level)+' for client='+client_addr+', fqdns='+repr(fqdns)) | |||
check = FORCE_PASSTHRU | |||
elif is_sage_request(server_addr, fqdns): | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('next_layer: enabling passthru for SAGE server='+server_addr+' regardless of security_level='+repr(security_level)+' for client='+client_addr) | |||
check = FORCE_PASSTHRU | |||
elif is_not_from_vpn(client_addr): | |||
# todo: add to fail2ban | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('next_layer: enabling block for non-VPN client='+client_addr+', fqdns='+str(fqdns)) | |||
bubble_activity_log(client_addr, server_addr, 'conn_block_non_vpn', fqdns) | |||
layer.__class__ = TlsBlock | |||
return | |||
elif security_level['level'] == SEC_OFF: | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('next_layer: enabling passthru for server='+server_addr+' because security_level='+repr(security_level)+' for client='+client_addr) | |||
check = FORCE_PASSTHRU | |||
elif fqdns is not None and len(fqdns) == 1 and cert_validation_host == fqdns[0] and security_level['level'] != SEC_BASIC: | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('next_layer: NOT enabling passthru for server='+server_addr+' because fqdn is cert_validation_host ('+cert_validation_host+') for client='+client_addr) | |||
return | |||
elif (security_level['level'] == SEC_STD or security_level['level'] == SEC_BASIC) and no_fqdns: | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('next_layer: enabling passthru for server='+server_addr+' because no FQDN found and security_level='+repr(security_level)+' for client='+client_addr) | |||
check = FORCE_PASSTHRU | |||
elif security_level['level'] == SEC_MAX and no_fqdns: | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('next_layer: disabling passthru (no TlsFeedback) for server='+server_addr+' because no FQDN found and security_level='+repr(security_level)+' for client='+client_addr) | |||
check = FORCE_BLOCK | |||
else: | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('next_layer: calling check_connection for server='+server_addr+', fqdns='+str(fqdns)+', client='+client_addr+' with security_level='+repr(security_level)) | |||
check = check_connection(client_addr, server_addr, fqdns, security_level) | |||
if check is None or ('passthru' in check and check['passthru']): | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('next_layer: enabling passthru for server=' + server_addr+', fqdns='+str(fqdns)) | |||
if log_info: | |||
bubble_log.info('next_layer: CONN-DECISION: PASSTHRU '+str(fqdns)+' for server=' + server_addr) | |||
bubble_activity_log(client_addr, server_addr, 'tls_passthru', fqdns) | |||
do_passthru(client_addr, server_addr, fqdns, layer) | |||
elif 'block' in check and check['block']: | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('next_layer: enabling block for server=' + server_addr+', fqdns='+str(fqdns)) | |||
bubble_activity_log(client_addr, server_addr, 'conn_block', fqdns) | |||
if show_block_stats(client_addr, fqdns) and security_level['level'] != SEC_BASIC: | |||
if log_info: | |||
bubble_log.info('next_layer: CONN-DECISION: ALLOW '+str(fqdns)+' (would block but security_level='+repr(security_level)+' and show_block_stats=True) for server='+server_addr) | |||
layer.do_block = True | |||
layer.__class__ = TlsFeedback | |||
else: | |||
if log_info: | |||
bubble_log.info('next_layer: CONN-DECISION: BLOCK '+str(fqdns)+' for server='+server_addr) | |||
layer.__class__ = TlsBlock | |||
elif security_level['level'] == SEC_BASIC: | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('next_layer: check='+repr(check)+' but security_level='+repr(security_level)+', enabling passthru for server=' + server_addr+', fqdns='+str(fqdns)) | |||
if log_info: | |||
bubble_log.info('next_layer: CONN-DECISION: PASSTHRU '+str(fqdns)+' (check='+repr(check)+' but security_level='+repr(security_level)+') server='+server_addr) | |||
bubble_activity_log(client_addr, server_addr, 'tls_passthru', fqdns) | |||
do_passthru(client_addr, server_addr, fqdns, layer) | |||
else: | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('next_layer: disabling passthru (with TlsFeedback) for client_addr='+client_addr+', server_addr='+server_addr+', fqdns='+str(fqdns)) | |||
if log_info: | |||
bubble_log.info('next_layer: CONN-DECISION: ALLOW '+str(fqdns)+' (with TlsFeedback) for client_addr='+client_addr+', server_addr='+server_addr) | |||
bubble_activity_log(client_addr, server_addr, 'tls_intercept', fqdns) | |||
layer.__class__ = TlsFeedback |
@@ -18,6 +18,11 @@ from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | |||
bubble_log = logging.getLogger(__name__) | |||
log_debug = bubble_log.isEnabledFor(DEBUG) | |||
log_info = bubble_log.isEnabledFor(INFO) | |||
log_warning = bubble_log.isEnabledFor(WARNING) | |||
log_error = bubble_log.isEnabledFor(ERROR) | |||
FLEX_TIMEOUT = 20 | |||
@@ -63,7 +68,7 @@ def process_no_flex(flex_flow): | |||
content=None) | |||
flex_flow.response_stream = lambda chunks: error_html | |||
error_html = flex_flow.router['error_html'] | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('process_no_flex: no router found, returning error_html') | |||
return flex_flow | |||
@@ -71,11 +76,11 @@ def process_no_flex(flex_flow): | |||
def new_flex_flow(client_addr, flex_host, flow): | |||
router = bubble_get_flex_router(client_addr, flex_host) | |||
if router is None or 'auth' not in router: | |||
if bubble_log.isEnabledFor(ERROR): | |||
if log_error: | |||
bubble_log.error('new_flex_flow: no flex router for host: '+flex_host) | |||
return None | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('new_flex_flow: found router '+repr(router)+' for flex host: '+flex_host) | |||
return FlexFlow(flex_host, flow, router) | |||
@@ -83,11 +88,11 @@ def new_flex_flow(client_addr, flex_host, flow): | |||
def process_flex(flex_flow): | |||
if flex_flow.is_error(): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('process_flex: no router found, returning default flow') | |||
return process_no_flex(flex_flow) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('process_flex: using router: '+repr(flex_flow.router)) | |||
flex_host = flex_flow.flex_host | |||
@@ -112,7 +117,7 @@ def process_flex(flex_flow): | |||
proxy_url = router['proxyUrl'] | |||
proxies = {"http": proxy_url, "https": proxy_url} | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('process_flex: sending flex request for '+method+' '+url+' to '+proxy_url) | |||
loop = asyncio.new_event_loop() | |||
@@ -124,10 +129,10 @@ def process_flex(flex_flow): | |||
timeout=30, | |||
data=async_chunk_iter(flex_flow.request_chunks), | |||
loop=loop) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('process_flex: response returned HTTP status '+str(response.status_code)+' for '+url) | |||
except Exception as e: | |||
if bubble_log.isEnabledFor(ERROR): | |||
if log_error: | |||
bubble_log.error('process_flex: error sending request to '+url+': '+repr(e)) | |||
# todo: catch TimeoutException, try another flex router; remember the last router that works for this client_addr | |||
return None | |||
@@ -166,7 +171,7 @@ def process_flex(flex_flow): | |||
flow.response.stream = lambda chunks: [] | |||
# Apply filters | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('process_flex: successfully requested url '+url+' from flex router, proceeding...') | |||
flex_flow.response = response | |||
@@ -12,7 +12,7 @@ from mitmproxy.net.http import Headers | |||
from bubble_config import bubble_port, debug_capture_fqdn, debug_stream_fqdn, debug_stream_uri | |||
from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, CTX_BUBBLE_FLEX, \ | |||
status_reason, get_flow_ctx, add_flow_ctx, bubble_async, async_client, cleanup_async, \ | |||
status_reason, update_host_and_port, get_flow_ctx, add_flow_ctx, bubble_async, async_client, cleanup_async, \ | |||
is_bubble_special_path, is_bubble_health_check, health_check_response, special_bubble_response, \ | |||
CTX_BUBBLE_REQUEST_ID, CTX_CONTENT_LENGTH, CTX_CONTENT_LENGTH_SENT, CTX_BUBBLE_FILTERED, \ | |||
HEADER_CONTENT_TYPE, HEADER_CONTENT_ENCODING, HEADER_LOCATION, HEADER_CONTENT_LENGTH, \ | |||
@@ -24,6 +24,11 @@ from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | |||
bubble_log = logging.getLogger(__name__) | |||
log_debug = bubble_log.isEnabledFor(DEBUG) | |||
log_info = bubble_log.isEnabledFor(INFO) | |||
log_warning = bubble_log.isEnabledFor(WARNING) | |||
log_error = bubble_log.isEnabledFor(ERROR) | |||
BUFFER_SIZE = 4096 | |||
CONTENT_TYPE_BINARY = 'application/octet-stream' | |||
STANDARD_FILTER_HEADERS = {HEADER_CONTENT_TYPE: CONTENT_TYPE_BINARY} | |||
@@ -102,7 +107,7 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N | |||
name = 'filter_chunk' | |||
if debug_capture_fqdn: | |||
if debug_capture_fqdn in req_id: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('filter_chunk: debug_capture_fqdn detected, capturing: '+debug_capture_fqdn) | |||
f = open('/tmp/bubble_capture_'+req_id, mode='ab', buffering=0) | |||
f.write(chunk) | |||
@@ -113,7 +118,7 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N | |||
redis_passthru_key = REDIS_FILTER_PASSTHRU_PREFIX + flow.request.method + '~~~' + user_agent + ':' + flow.request.url | |||
do_pass = REDIS.get(redis_passthru_key) | |||
if do_pass: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('filter_chunk: req_id='+req_id+': passthru found in redis, returning chunk') | |||
REDIS.touch(redis_passthru_key) | |||
return chunk | |||
@@ -134,18 +139,18 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N | |||
url = url + '?last=true' | |||
chunk_len = 0 | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
if chunk is not None: | |||
chunk_len = len(chunk) | |||
if csp: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('filter_chunk: url='+url+' (csp='+csp+') size='+str(chunk_len)) | |||
headers = { | |||
HEADER_CONTENT_TYPE: CONTENT_TYPE_BINARY, | |||
HEADER_CONTENT_SECURITY_POLICY: csp | |||
} | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('filter_chunk: url='+url+' (no csp) size='+str(chunk_len)) | |||
headers = STANDARD_FILTER_HEADERS | |||
@@ -155,7 +160,7 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N | |||
else: | |||
count = 0 | |||
DEBUG_STREAM_COUNTERS[req_id] = count | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('filter_chunk: debug_stream detected, capturing: '+debug_stream_fqdn) | |||
f = open('/tmp/bubble_stream_'+req_id+'_chunk'+"{:04d}".format(count)+'.data', mode='wb', buffering=0) | |||
if chunk is not None: | |||
@@ -171,12 +176,12 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N | |||
response = bubble_async(name, url, headers=headers, method='POST', data=chunk, loop=loop, client=client) | |||
if not response.status_code == 200: | |||
err_message = 'filter_chunk: Error fetching ' + url + ', HTTP status ' + str(response.status_code) + ' content='+repr(response.content) | |||
if bubble_log.isEnabledFor(ERROR): | |||
if log_error: | |||
bubble_log.error(err_message) | |||
return b'' | |||
elif HEADER_FILTER_PASSTHRU in response.headers: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('filter_chunk: server returned X-Bubble-Passthru, not filtering subsequent requests') | |||
redis_set(redis_passthru_key, 'passthru', ex=REDIS_FILTER_PASSTHRU_DURATION) | |||
return chunk | |||
@@ -186,12 +191,12 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N | |||
def bubble_filter_chunks(flow, chunks, req_id, user_agent, content_encoding, content_type, csp): | |||
loop = asyncio.new_event_loop() | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_filter_chunks: starting with content_type='+content_type) | |||
first = True | |||
last = False | |||
content_length = get_flow_ctx(flow, CTX_CONTENT_LENGTH) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_filter_chunks: found content_length='+str(content_length)) | |||
try: | |||
buffer = b'' | |||
@@ -205,7 +210,7 @@ def bubble_filter_chunks(flow, chunks, req_id, user_agent, content_encoding, con | |||
if content_length: | |||
bytes_sent = get_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT) | |||
last = chunk_len + bytes_sent >= content_length | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_filter_chunks: content_length = '+str(content_length)+', bytes_sent = '+str(bytes_sent)) | |||
add_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT, bytes_sent + chunk_len) | |||
else: | |||
@@ -226,7 +231,7 @@ def bubble_filter_chunks(flow, chunks, req_id, user_agent, content_encoding, con | |||
# bubble_log.debug('bubble_filter_chunks(end): sending last empty chunk') | |||
yield filter_chunk(loop, flow, None, req_id, user_agent, True) # get the last bits of data | |||
except Exception as e: | |||
if bubble_log.isEnabledFor(ERROR): | |||
if log_error: | |||
bubble_log.error('bubble_filter_chunks: exception='+repr(e)) | |||
traceback.print_exc() | |||
yield None | |||
@@ -235,7 +240,7 @@ def bubble_filter_chunks(flow, chunks, req_id, user_agent, content_encoding, con | |||
def bubble_modify(flow, req_id, user_agent, content_encoding, content_type, csp): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_modify: modifying req_id='+req_id+' with content_type='+content_type) | |||
return lambda chunks: bubble_filter_chunks(flow, chunks, req_id, | |||
user_agent, content_encoding, content_type, csp) | |||
@@ -319,19 +324,21 @@ def bubble_filter_response(flow, flex_flow): | |||
if get_flow_ctx(flow, CTX_BUBBLE_FILTERED): | |||
return | |||
add_flow_ctx(flow, CTX_BUBBLE_FILTERED, True) | |||
update_host_and_port(flow) | |||
path = flow.request.path | |||
host = flow.request.host | |||
log_url = flow.request.scheme + '://' + host + path | |||
client_addr = flow.client_conn.address[0] | |||
if is_bubble_special_path(path): | |||
if is_bubble_health_check(path): | |||
health_check_response(flow) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_filter_response: sending special bubble response for path: '+path) | |||
special_bubble_response(flow) | |||
elif flex_flow and flex_flow.is_error(): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_filter_response: flex_flow had error, returning error_html: ' + repr(flex_flow.response_stream)) | |||
flow.response.stream = flex_flow.response_stream | |||
@@ -340,8 +347,8 @@ def bubble_filter_response(flow, flex_flow): | |||
if abort_code is not None: | |||
abort_location = get_flow_ctx(flow, CTX_BUBBLE_LOCATION) | |||
if abort_location is not None: | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('bubble_filter_response: redirecting request with HTTP status '+str(abort_code)+' to: '+abort_location+', path was: '+path) | |||
if log_info: | |||
bubble_log.info('bubble_filter_response: MOD-DECISION: REDIRECT '+log_url+' redirecting request with HTTP status '+str(abort_code)+' to: '+abort_location+', path was: '+path) | |||
flow.response.headers = Headers() | |||
flow.response.headers[HEADER_LOCATION] = abort_location | |||
flow.response.status_code = abort_code | |||
@@ -352,35 +359,37 @@ def bubble_filter_response(flow, flex_flow): | |||
content_type = flow.response.headers[HEADER_CONTENT_TYPE] | |||
else: | |||
content_type = None | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('bubble_filter_response: aborting request from '+client_addr+' with HTTP status '+str(abort_code)+', path was: '+path) | |||
if log_info: | |||
bubble_log.info('bubble_filter_response: MOD-DECISION: ABORT '+log_url+' aborting request from '+client_addr+' with HTTP status '+str(abort_code)+', path was: '+path) | |||
flow.response.headers = Headers() | |||
flow.response.status_code = abort_code | |||
flow.response.reason = status_reason(abort_code) | |||
flow.response.stream = lambda chunks: abort_data(content_type) | |||
elif flow.response.status_code // 100 != 2: | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('bubble_filter_response: response had HTTP status '+str(flow.response.status_code)+', returning as-is: '+path) | |||
if log_info: | |||
bubble_log.info('bubble_filter_response: MOD-DECISION: NOT-OK '+log_url+' response had HTTP status '+str(flow.response.status_code)+', returning as-is: '+path) | |||
flow.response.headers[HEADER_CONTENT_LENGTH] = '0' | |||
pass | |||
elif flow.response.headers is None or len(flow.response.headers) == 0: | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('bubble_filter_response: response had HTTP status '+str(flow.response.status_code)+', and NO response headers, returning as-is: '+path) | |||
if log_info: | |||
bubble_log.info('bubble_filter_response: MOD-DECISION: NO-HEADERS '+log_url+' response had HTTP status '+str(flow.response.status_code)+', and NO response headers, returning as-is: '+path) | |||
pass | |||
elif HEADER_CONTENT_LENGTH in flow.response.headers and flow.response.headers[HEADER_CONTENT_LENGTH] == "0": | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info('bubble_filter_response: response had HTTP status '+str(flow.response.status_code)+', and '+HEADER_CONTENT_LENGTH+' was zero, returning as-is: '+path) | |||
if log_info: | |||
bubble_log.info('bubble_filter_response: MOD-DECISION: NO-LENGTH '+log_url+' response had HTTP status '+str(flow.response.status_code)+', and '+HEADER_CONTENT_LENGTH+' was zero, returning as-is: '+path) | |||
pass | |||
else: | |||
req_id = get_flow_ctx(flow, CTX_BUBBLE_REQUEST_ID) | |||
matchers = get_flow_ctx(flow, CTX_BUBBLE_MATCHERS) | |||
prefix = 'bubble_filter_response(req_id='+str(req_id)+'): ' | |||
if log_info: | |||
bubble_log.info('bubble_filter_response: MOD-DECISION: FILTER '+log_url+' with matchers='+repr(matchers)) | |||
if req_id is not None and matchers is not None: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug(prefix+' matchers: '+repr(matchers)) | |||
if HEADER_USER_AGENT in flow.request.headers: | |||
user_agent = flow.request.headers[HEADER_USER_AGENT] | |||
@@ -397,11 +406,11 @@ def bubble_filter_response(flow, flex_flow): | |||
type_regex = '^text/html.*' | |||
if re.match(type_regex, content_type): | |||
any_content_type_matches = True | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug(prefix+'found at least one matcher for content_type ('+content_type+'), filtering: '+path) | |||
break | |||
if not any_content_type_matches: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug(prefix+'no matchers for content_type ('+content_type+'), passing thru: '+path) | |||
return | |||
@@ -417,7 +426,7 @@ def bubble_filter_response(flow, flex_flow): | |||
csp = None | |||
content_length_value = flow.response.headers.pop(HEADER_CONTENT_LENGTH, None) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug(prefix+'content_encoding='+repr(content_encoding) + ', content_type='+repr(content_type)) | |||
if flex_flow is not None: | |||
@@ -435,11 +444,11 @@ def bubble_filter_response(flow, flex_flow): | |||
if hasattr(ctx, 'ctx'): | |||
ctx = ctx.ctx | |||
else: | |||
if bubble_log.isEnabledFor(ERROR): | |||
if log_error: | |||
bubble_log.error(prefix+'error finding server_conn for path '+path+'. last ctx has no further ctx. type='+str(type(ctx))+' vars='+str(vars(ctx))) | |||
return | |||
if not hasattr(ctx, 'server_conn'): | |||
if bubble_log.isEnabledFor(ERROR): | |||
if log_error: | |||
bubble_log.error(prefix+'error finding server_conn for path '+path+'. ctx type='+str(type(ctx))+' vars='+str(vars(ctx))) | |||
return | |||
content_length = int(content_length_value) | |||
@@ -449,14 +458,14 @@ def bubble_filter_response(flow, flex_flow): | |||
add_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT, 0) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug(prefix+'no matchers, passing thru: '+path) | |||
pass | |||
else: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning(prefix+'no '+HEADER_CONTENT_TYPE+' header, passing thru: '+path) | |||
pass | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug(prefix+'no '+CTX_BUBBLE_MATCHERS+' in ctx, passing thru: '+path) | |||
pass |
@@ -42,6 +42,10 @@ from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | |||
bubble_log = logging.getLogger(__name__) | |||
log_debug = bubble_log.isEnabledFor(DEBUG) | |||
log_info = bubble_log.isEnabledFor(INFO) | |||
log_warning = bubble_log.isEnabledFor(WARNING) | |||
class Rerouter: | |||
@staticmethod | |||
@@ -50,8 +54,8 @@ class Rerouter: | |||
return None | |||
if is_bubble_special_path(flow.request.path): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug("get_matchers: not filtering special bubble path: "+flow.request.path) | |||
if log_debug: | |||
bubble_log.debug("not filtering special bubble path: "+flow.request.path) | |||
return None | |||
client_addr = str(flow.client_conn.address[0]) | |||
@@ -62,22 +66,23 @@ class Rerouter: | |||
try: | |||
host = str(host) | |||
except Exception as e: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('get_matchers: host '+repr(host)+' could not be decoded, type='+str(type(host))+' e='+repr(e)) | |||
return None | |||
if host == bubble_host or host == bubble_host_alias: | |||
if bubble_log.isEnabledFor(INFO): | |||
if log_info: | |||
bubble_log.info('get_matchers: request is for bubble itself ('+host+'), not matching') | |||
return None | |||
prefix = 'get_matchers('+host+flow.request.path+'): ' | |||
req_id = str(host) + '.' + str(uuid.uuid4()) + '.' + str(time.time()) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug("get_matchers: requesting match decision for req_id="+req_id) | |||
if log_debug: | |||
bubble_log.debug(prefix+"requesting match decision for req_id="+req_id) | |||
resp = bubble_matchers(req_id, client_addr, server_addr, flow, host) | |||
if not resp: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('get_matchers: no response for client_addr/host: '+client_addr+'/'+str(host)) | |||
return None | |||
@@ -85,21 +90,21 @@ class Rerouter: | |||
if 'matchers' in resp and resp['matchers'] is not None: | |||
for m in resp['matchers']: | |||
if 'urlRegex' in m: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('get_matchers: checking for match of path='+flow.request.path+' against regex: '+m['urlRegex']) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('get_matchers: checking for match of path='+flow.request.path+' -- NO regex, skipping') | |||
continue | |||
if re.match(m['urlRegex'], flow.request.path): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('get_matchers: rule matched, adding rule: '+m['rule']) | |||
matchers.append(m) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('get_matchers: rule (regex='+m['urlRegex']+') did NOT match, skipping rule: '+m['rule']) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('get_matchers: no matchers. response='+repr(resp)) | |||
decision = None | |||
@@ -107,8 +112,8 @@ class Rerouter: | |||
decision = resp['decision'] | |||
matcher_response = {'decision': decision, 'matchers': matchers, 'request_id': req_id} | |||
if bubble_log.isEnabledFor(INFO): | |||
bubble_log.info("get_matchers: returning "+repr(matcher_response)) | |||
if log_info: | |||
bubble_log.info(prefix+"returning "+repr(matcher_response)) | |||
return matcher_response | |||
def bubble_handle_request(self, flow): | |||
@@ -140,21 +145,21 @@ class Rerouter: | |||
return None | |||
elif is_bubble_request(server_addr, fqdns): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_handle_request: redirecting to https for LOCAL bubble=' + server_addr +' (bubble_host (' + bubble_host +') in fqdns or bubble_host_alias (' + bubble_host_alias +') in fqdns) for client=' + client_addr +', fqdns=' + repr(fqdns) +', path=' + path) | |||
add_flow_ctx(flow, CTX_BUBBLE_ABORT, 301) | |||
add_flow_ctx(flow, CTX_BUBBLE_LOCATION, 'https://' + host + path) | |||
return None | |||
elif is_sage_request(server_addr, fqdns): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('bubble_handle_request: redirecting to https for SAGE server='+server_addr+' for client='+client_addr) | |||
add_flow_ctx(flow, CTX_BUBBLE_ABORT, 301) | |||
add_flow_ctx(flow, CTX_BUBBLE_LOCATION, 'https://' + host + path) | |||
return None | |||
elif is_not_from_vpn(client_addr): | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('bubble_handle_request: sending to tarpit: non-VPN client='+client_addr+', url='+log_url+' host='+host) | |||
bubble_activity_log(client_addr, server_addr, 'http_tarpit_non_vpn', fqdns) | |||
tarpit_response(flow, host) | |||
@@ -167,32 +172,36 @@ class Rerouter: | |||
if matcher_response: | |||
has_decision = 'decision' in matcher_response and matcher_response['decision'] is not None | |||
if has_decision and matcher_response['decision'] == 'pass_thru': | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('bubble_handle_request: passthru response returned, passing thru...') | |||
if log_info: | |||
bubble_log.info('bubble_handle_request: REQ-DECISION PASSTHRU '+log_url+' passthru response returned, passing thru...') | |||
add_flow_ctx(flow, CTX_BUBBLE_PASSTHRU, True) | |||
bubble_activity_log(client_addr, server_addr, 'http_passthru', log_url) | |||
return host | |||
elif has_decision and matcher_response['decision'].startswith('abort_'): | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('bubble_handle_request: found abort code: ' + str(matcher_response['decision']) + ', aborting') | |||
if matcher_response['decision'] == 'abort_ok': | |||
decision = str(matcher_response['decision']) | |||
if log_debug: | |||
bubble_log.debug('bubble_handle_request: found abort code: '+decision+', aborting') | |||
if decision == 'abort_ok': | |||
abort_code = 200 | |||
elif matcher_response['decision'] == 'abort_not_found': | |||
elif decision == 'abort_not_found': | |||
abort_code = 404 | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('bubble_handle_request: unknown abort code: ' + str(matcher_response['decision']) + ', aborting with 404 Not Found') | |||
if log_debug: | |||
bubble_log.debug('bubble_handle_request: unknown abort code: '+decision+', aborting with 404 Not Found') | |||
abort_code = 404 | |||
flow.request.headers = nheaders.Headers([]) | |||
flow.request.content = b'' | |||
add_flow_ctx(flow, CTX_BUBBLE_ABORT, abort_code) | |||
bubble_activity_log(client_addr, server_addr, 'http_abort' + str(abort_code), log_url) | |||
if log_info: | |||
bubble_log.info('bubble_handle_request: REQ-DECISION: BLOCK '+log_url+' ('+decision+')') | |||
return None | |||
elif has_decision and matcher_response['decision'] == 'no_match': | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('bubble_handle_request: decision was no_match, passing thru...') | |||
if log_info: | |||
decision = str(matcher_response['decision']) | |||
bubble_log.info('bubble_handle_request: REQ-DECISION: ALLOW '+log_url+' ('+decision+')') | |||
bubble_activity_log(client_addr, server_addr, 'http_no_match', log_url) | |||
return host | |||
@@ -200,29 +209,29 @@ class Rerouter: | |||
and 'request_id' in matcher_response | |||
and len(matcher_response['matchers']) > 0): | |||
req_id = matcher_response['request_id'] | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug("bubble_handle_request: found request_id: " + req_id + ' with matchers: ' + repr(matcher_response['matchers'])) | |||
if log_info: | |||
bubble_log.info('bubble_handle_request: REQ-DECISION: FILTER '+log_url+' found request_id: '+req_id+' with matchers: '+repr(matcher_response['matchers'])) | |||
add_flow_ctx(flow, CTX_BUBBLE_MATCHERS, matcher_response['matchers']) | |||
add_flow_ctx(flow, CTX_BUBBLE_REQUEST_ID, req_id) | |||
bubble_activity_log(client_addr, server_addr, 'http_match', log_url) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('bubble_handle_request: no rules returned, passing thru...') | |||
if log_info: | |||
bubble_log.info('bubble_handle_request: REQ-DECISION: ALLOW '+log_url+' no rules returned') | |||
bubble_activity_log(client_addr, server_addr, 'http_no_rules', log_url) | |||
else: | |||
if bubble_log.isEnabledFor(DEBUG): | |||
bubble_log.debug('bubble_handle_request: no matcher_response returned, passing thru...') | |||
if log_info: | |||
bubble_log.info('bubble_handle_request: REQ-DECISION: ALLOW '+log_url+'no matcher_response') | |||
# bubble_activity_log(client_addr, server_addr, 'http_no_matcher_response', log_url) | |||
elif is_http and is_not_from_vpn(client_addr): | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('bubble_handle_request: sending to tarpit: non-VPN client='+client_addr) | |||
bubble_activity_log(client_addr, server_addr, 'http_tarpit_non_vpn', [server_addr]) | |||
tarpit_response(flow, host) | |||
return None | |||
else: | |||
if bubble_log.isEnabledFor(WARNING): | |||
if log_warning: | |||
bubble_log.warning('bubble_handle_request: no sni/host found, not applying rules to path: ' + path) | |||
bubble_activity_log(client_addr, server_addr, 'http_no_sni_or_host', [server_addr]) | |||
@@ -241,7 +250,7 @@ class Rerouter: | |||
if is_flex_domain(client_addr, server_addr, [host]): | |||
flex_flow = new_flex_flow(client_addr, host, flow) | |||
add_flow_ctx(flow, CTX_BUBBLE_FLEX, flex_flow) | |||
if bubble_log.isEnabledFor(DEBUG): | |||
if log_debug: | |||
bubble_log.debug('request: is_flex_domain('+host+') returned true, setting ctx: '+CTX_BUBBLE_FLEX) | |||