From 8a4f9627eb289c7349b9e9ee682617b007ec8a5c Mon Sep 17 00:00:00 2001 From: Jonathan Cobb Date: Sun, 13 Sep 2020 08:48:25 -0400 Subject: [PATCH] fix bubble special requests, stream responses --- .../roles/mitmproxy/files/bubble_api.py | 67 +++++++++++++++---- .../mitmproxy/files/bubble_conn_check.py | 49 ++++++-------- .../roles/mitmproxy/files/bubble_flex.py | 31 +-------- .../roles/mitmproxy/files/bubble_request.py | 3 +- 4 files changed, 81 insertions(+), 69 deletions(-) diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py index dd841518..45b8ad65 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py @@ -17,12 +17,14 @@ import nest_asyncio import redis from bubble_vpn4 import wireguard_network_ipv4 from bubble_vpn6 import wireguard_network_ipv6 +from bubble_debug import get_stack from netaddr import IPAddress, IPNetwork from bubble_config import bubble_port, debug_capture_fqdn, \ bubble_host, bubble_host_alias, bubble_sage_host, bubble_sage_ip4, bubble_sage_ip6 from mitmproxy import http from mitmproxy.net.http import headers as nheaders +from mitmproxy.proxy.protocol.async_stream_body import AsyncStreamBody bubble_log = logging.getLogger(__name__) @@ -154,6 +156,7 @@ async def _async_stream(client, name, url, async def _bubble_async(name, url, + client=None, headers=None, method='GET', data=None, @@ -161,11 +164,15 @@ async def _bubble_async(name, url, proxies=None, timeout=5, max_redirects=0): - async with async_client(proxies=proxies, timeout=timeout, max_redirects=max_redirects) as client: - return await async_response(client, name, url, headers=headers, method=method, data=data, json=json) + if client is not None: + return await _async_stream(client, name, url, headers=headers, method=method, data=data, json=json, timeout=timeout, max_redirects=max_redirects) + else: + async with async_client(proxies=proxies, timeout=timeout, max_redirects=max_redirects) as client: + return await async_response(client, name, url, headers=headers, method=method, data=data, json=json) def bubble_async(name, url, + client=None, headers=None, method='GET', data=None, @@ -176,6 +183,7 @@ def bubble_async(name, url, loop=asyncio.get_running_loop()): try: return loop.run_until_complete(_bubble_async(name, url, + client=client, headers=headers, method=method, data=data, @@ -184,19 +192,44 @@ def bubble_async(name, url, timeout=timeout, max_redirects=max_redirects)) except Exception as e: - bubble_log.error('bubble_async('+name+'): error: '+repr(e)) + bubble_log.error('bubble_async('+name+'): error: '+repr(e)+' from '+get_stack(e)) def bubble_async_request_json(name, url, headers, method='GET', json=None): - response = bubble_async(name, url, headers, method=method, json=json) + response = bubble_async(name, url, headers=headers, method=method, json=json) if response and response.status_code == 200: return response.json() + elif response: + if bubble_log.isEnabledFor(DEBUG): + bubble_log.debug('bubble_async_request_json('+name+'): received invalid HTTP status: '+repr(response.status_code)) else: if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('bubble_async_request_json('+name+'): received invalid HTTP status: '+str(response.status_code)) + bubble_log.debug('bubble_async_request_json('+name+'): error, no response') return None +def cleanup_async(url, loop, client, response): + def cleanup(): + errors = False + try: + loop.run_until_complete(response.aclose()) + except Exception as e: + bubble_log.error('cleanup_async: error closing response: '+repr(e)) + errors = True + try: + loop.run_until_complete(client.aclose()) + except Exception as e: + bubble_log.error('cleanup_async: error: '+repr(e)) + errors = True + if not errors: + if bubble_log.isEnabledFor(DEBUG): + bubble_log.debug('cleanup_async: successfully completed: '+url) + else: + if bubble_log.isEnabledFor(WARNING): + bubble_log.warning('cleanup_async: successfully completed (but had errors closing): ' + url) + return cleanup + + def bubble_conn_check(client_addr, server_addr, fqdns, security_level): if debug_capture_fqdn and fqdns: for f in debug_capture_fqdn: @@ -218,7 +251,7 @@ def bubble_conn_check(client_addr, server_addr, fqdns, security_level): 'clientAddr': client_addr } try: - return bubble_async_request_json(name, url, headers, method='POST', json=data) + return bubble_async_request_json(name, url, headers=headers, method='POST', json=data) except Exception as e: if bubble_log.isEnabledFor(ERROR): @@ -366,7 +399,13 @@ def is_not_from_vpn(client_addr): return ip not in VPN_IP4_CIDR and ip not in VPN_IP6_CIDR -def is_flex_domain(client_addr, fqdn): +def is_flex_domain(client_addr, server_addr, fqdns): + if fqdns is None or len(fqdns) != 1: + if bubble_log.isEnabledFor(DEBUG): + bubble_log.debug('is_flex_domain: no fqdns or multiple fqdns for server_addr '+server_addr+' ('+repr(fqdns)+'), returning False') + return False + fqdn = fqdns[0] + if fqdn == bubble_host or fqdn == bubble_host_alias or fqdn == bubble_sage_host: if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('is_flex_domain: (early) returning False for: '+fqdn) @@ -429,20 +468,24 @@ def special_bubble_response(flow): return uri = make_bubble_special_path(path) if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('special_bubble_response: sending special bubble request to '+uri) + bubble_log.debug('special_bubble_response: sending special bubble request to '+uri+' from '+get_stack()) headers = { 'Accept': 'application/json', 'Content-Type': 'application/json' } if flow.request.method == 'GET': - response = bubble_async(name, uri, headers=headers) + loop = asyncio.new_event_loop() + client = async_client(timeout=30) + response = bubble_async(name, uri, client=client, loop=loop, headers=headers) elif flow.request.method == 'POST': + loop = asyncio.new_event_loop() + client = async_client(timeout=30) if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('special_bubble_response: special bubble request: POST content is '+str(flow.request.content)) if flow.request.content: headers['Content-Length'] = str(len(flow.request.content)) - response = bubble_async(name, uri, json=flow.request.content, headers=headers) + response = bubble_async(name, uri, client=client, loop=loop, json=flow.request.content, headers=headers) else: if bubble_log.isEnabledFor(WARNING): @@ -454,7 +497,7 @@ def special_bubble_response(flow): response_headers = collect_response_headers(response) flow.response = http.HTTPResponse(http_version=http_version, status_code=response.status_code, - reason=response.reason, + reason=response.reason_phrase, headers=response_headers, content=None) if response is not None: @@ -463,7 +506,7 @@ def special_bubble_response(flow): flow.response.headers = collect_response_headers(response) flow.response.status_code = response.status_code flow.response.reason = status_reason(response.status_code) - flow.response.stream = lambda chunks: send_bubble_response(response) + flow.response.stream = AsyncStreamBody(owner=client, loop=loop, chunks=response.aiter_raw(), finalize=cleanup_async(uri, loop, client, response)) def send_bubble_response(response): diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py index bfb9a410..8a15ba12 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py @@ -95,17 +95,16 @@ def conn_check_cache_prefix(client_addr, server_addr): return REDIS_CONN_CHECK_PREFIX + client_addr + '_' + server_addr -def fqdns_for_addr(server_addr): - prefix = REDIS_DNS_PREFIX + server_addr - keys = REDIS.keys(prefix + '_*') - if keys is None or len(keys) == 0: +def fqdns_for_addr(client_addr, server_addr): + key = REDIS_DNS_PREFIX + server_addr + '~' + client_addr + values = REDIS.smembers(key) + if values is None or len(values) == 0: if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('fqdns_for_addr: no FQDN found for addr '+str(server_addr)+', checking raw addr') - return '' + bubble_log.debug('fqdns_for_addr: no FQDN found for server_addr '+str(server_addr)+' and client_addr '+client_addr) + return None fqdns = [] - for k in keys: - fqdn = k.decode()[len(prefix)+1:] - fqdns.append(fqdn) + for fqdn in values: + fqdns.append(fqdn.decode()) return fqdns @@ -120,6 +119,8 @@ class TlsBlock(TlsLayer): class TlsFeedback(TlsLayer): + fqdns = None + security_level = None """ Monkey-patch _establish_tls_with_client to get feedback if TLS could be established successfully on the client connection (which may fail due to cert pinning). @@ -231,37 +232,31 @@ def check_connection(client_addr, server_addr, fqdns, security_level): return check_response -def check_passthru_flex(client_addr, server_addr, fqdns): - if fqdns: - for fqdn in fqdns: - if is_flex_domain(client_addr, fqdn): - return True - else: - return is_flex_domain(client_addr, server_addr) - - -def passthru_flex_port(client_addr, fqdns): - router = bubble_get_flex_router(client_addr) +def passthru_flex_port(client_addr, fqdn): + router = bubble_get_flex_router(client_addr, fqdn) if router is None or 'auth' not in router: if bubble_log.isEnabledFor(INFO): - bubble_log.info('apply_passthru_flex: no flex router for fqdn(s): '+repr(fqdns)) + bubble_log.info('apply_passthru_flex: no flex router for fqdn(s): '+repr(fqdn)) elif 'port' in router: return router['port'] else: if bubble_log.isEnabledFor(WARNING): - bubble_log.warning('apply_passthru_flex: flex router found but has no port ('+repr(router)+') for fqdn(s): '+repr(fqdns)) + bubble_log.warning('apply_passthru_flex: flex router found but has no port ('+repr(router)+') for fqdn(s): '+repr(fqdn)) return None def do_passthru(client_addr, server_addr, fqdns, layer): flex_port = None - if check_passthru_flex(client_addr, server_addr, fqdns): - if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('do_passthru: applying flex passthru for server=' + server_addr + ', fqdns=' + str(fqdns)) - flex_port = passthru_flex_port(client_addr, fqdns) + if is_flex_domain(client_addr, server_addr, fqdns): + flex_port = passthru_flex_port(client_addr, fqdns[0]) if flex_port: + if bubble_log.isEnabledFor(DEBUG): + bubble_log.debug('do_passthru: applying flex passthru for server=' + server_addr + ', fqdns=' + str(fqdns)) layer_replacement = BubbleFlexPassthruLayer(layer.ctx, ('127.0.0.1', flex_port), fqdns[0], 443) layer.reply.send(layer_replacement) + else: + if bubble_log.isEnabledFor(DEBUG): + bubble_log.debug('do_passthru: detected flex passthru but no flex routers available for server=' + server_addr + ', fqdns=' + str(fqdns)) if flex_port is None: layer_replacement = RawTCPLayer(layer.ctx, ignore=True) layer.reply.send(layer_replacement) @@ -280,7 +275,7 @@ def next_layer(layer): bubble_log.debug('next_layer: using fqdn in SNI: '+ fqdn) fqdns = [fqdn] else: - fqdns = fqdns_for_addr(server_addr) + fqdns = fqdns_for_addr(client_addr, server_addr) if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('next_layer: NO fqdn in sni, using fqdns from DNS: '+ str(fqdns)) layer.fqdns = fqdns diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py index 8173fa5f..4dfcbe26 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py @@ -9,7 +9,7 @@ from mitmproxy import http from mitmproxy.net.http import headers as nheaders from mitmproxy.proxy.protocol.request_capture import RequestCapture -from bubble_api import bubble_get_flex_router, collect_response_headers, async_client, async_stream, \ +from bubble_api import bubble_get_flex_router, collect_response_headers, async_client, async_stream, cleanup_async, \ HEADER_TRANSFER_ENCODING, HEADER_CONTENT_LENGTH, HEADER_CONTENT_TYPE import logging @@ -159,7 +159,7 @@ def process_flex(flex_flow): elif content_length is None or int(content_length) > 0: response_headers[HEADER_TRANSFER_ENCODING] = 'chunked' - flow.response.stream = AsyncStreamBody(owner=client, loop=loop, chunks=response.aiter_raw(), finalize=cleanup_flex(url, loop, client, response)) + flow.response.stream = AsyncStreamBody(owner=client, loop=loop, chunks=response.aiter_raw(), finalize=cleanup_async(url, loop, client, response)) else: response_headers[HEADER_CONTENT_LENGTH] = '0' @@ -176,30 +176,3 @@ def process_flex(flex_flow): async def async_chunk_iter(chunks): for chunk in chunks: yield chunk - - -def cleanup_flex(url, loop, client, response): - def cleanup(): - - errors = False - - try: - loop.run_until_complete(response.aclose()) - except Exception as e: - bubble_log.error('cleanup_flex: error closing response: '+repr(e)) - errors = True - - try: - loop.run_until_complete(client.aclose()) - except Exception as e: - bubble_log.error('cleanup_flex: error: '+repr(e)) - errors = True - - if not errors: - if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('cleanup_flex: successfully completed: '+url) - else: - if bubble_log.isEnabledFor(WARNING): - bubble_log.warning('cleanup_flex: successfully completed (but had errors closing): ' + url) - - return cleanup diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py index 4d5b8702..a02d88bf 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py @@ -263,7 +263,8 @@ class Rerouter: elif host is not None: client_addr = flow.client_conn.address[0] - if is_flex_domain(client_addr, host): + server_addr= flow.server_conn.address[0] + if is_flex_domain(client_addr, server_addr, [host]): flex_flow = new_flex_flow(client_addr, host, flow) add_flow_ctx(flow, CTX_BUBBLE_FLEX, flex_flow) if bubble_log.isEnabledFor(DEBUG):