diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py index 45b8ad65..73aa28b1 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py @@ -17,14 +17,15 @@ import nest_asyncio import redis from bubble_vpn4 import wireguard_network_ipv4 from bubble_vpn6 import wireguard_network_ipv6 -from bubble_debug import get_stack from netaddr import IPAddress, IPNetwork +from bubble_debug import get_stack from bubble_config import bubble_port, debug_capture_fqdn, \ bubble_host, bubble_host_alias, bubble_sage_host, bubble_sage_ip4, bubble_sage_ip6 from mitmproxy import http from mitmproxy.net.http import headers as nheaders from mitmproxy.proxy.protocol.async_stream_body import AsyncStreamBody +from mitmproxy.proxy.protocol.request_capture import RequestCapture bubble_log = logging.getLogger(__name__) @@ -165,7 +166,7 @@ async def _bubble_async(name, url, timeout=5, max_redirects=0): if client is not None: - return await _async_stream(client, name, url, headers=headers, method=method, data=data, json=json, timeout=timeout, max_redirects=max_redirects) + return await async_response(client, name, url, headers=headers, method=method, data=data, json=json) else: async with async_client(proxies=proxies, timeout=timeout, max_redirects=max_redirects) as client: return await async_response(client, name, url, headers=headers, method=method, data=data, json=json) @@ -192,7 +193,7 @@ def bubble_async(name, url, timeout=timeout, max_redirects=max_redirects)) except Exception as e: - bubble_log.error('bubble_async('+name+'): error: '+repr(e)+' from '+get_stack(e)) + bubble_log.error('bubble_async('+name+'): error: '+repr(e)) def bubble_async_request_json(name, url, headers, method='GET', json=None): @@ -211,16 +212,24 @@ def bubble_async_request_json(name, url, headers, method='GET', json=None): def cleanup_async(url, loop, client, response): def cleanup(): errors = False - try: - loop.run_until_complete(response.aclose()) - except Exception as e: - bubble_log.error('cleanup_async: error closing response: '+repr(e)) - errors = True - try: - loop.run_until_complete(client.aclose()) - except Exception as e: - bubble_log.error('cleanup_async: error: '+repr(e)) - errors = True + if response is not None: + try: + loop.run_until_complete(response.aclose()) + except Exception as e: + bubble_log.error('cleanup_async: error closing response: '+repr(e)) + errors = True + if client is not None: + try: + loop.run_until_complete(client.aclose()) + except Exception as e: + bubble_log.error('cleanup_async: error closing client: '+repr(e)) + errors = True + if loop is not None: + try: + loop.close() + except Exception as e: + bubble_log.error('cleanup_async: error closing loop: '+repr(e)) + errors = True if not errors: if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('cleanup_async: successfully completed: '+url) @@ -375,7 +384,7 @@ def get_flow_ctx(flow, name): def is_bubble_request(ip, fqdns): # return ip in LOCAL_IPS - return ip in LOCAL_IPS and (bubble_host in fqdns or bubble_host_alias in fqdns) + return ip in LOCAL_IPS and fqdns and (bubble_host in fqdns or bubble_host_alias in fqdns) def is_bubble_special_path(path): @@ -466,9 +475,10 @@ def special_bubble_response(flow): if is_bubble_health_check(path): health_check_response(flow) return + uri = make_bubble_special_path(path) if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('special_bubble_response: sending special bubble request to '+uri+' from '+get_stack()) + bubble_log.debug('special_bubble_response: sending special bubble request to '+uri) headers = { 'Accept': 'application/json', 'Content-Type': 'application/json' @@ -476,16 +486,16 @@ def special_bubble_response(flow): if flow.request.method == 'GET': loop = asyncio.new_event_loop() client = async_client(timeout=30) - response = bubble_async(name, uri, client=client, loop=loop, headers=headers) + response = async_stream(client, name, uri, headers=headers, loop=loop) elif flow.request.method == 'POST': loop = asyncio.new_event_loop() client = async_client(timeout=30) - if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('special_bubble_response: special bubble request: POST content is '+str(flow.request.content)) - if flow.request.content: - headers['Content-Length'] = str(len(flow.request.content)) - response = bubble_async(name, uri, client=client, loop=loop, json=flow.request.content, headers=headers) + data = None + if flow.request.content and flow.request.content: + headers[HEADER_CONTENT_LENGTH] = str(len(flow.request.content)) + data = flow.request.content + response = async_stream(client, name, uri, headers=headers, method='POST', data=data, loop=loop) else: if bubble_log.isEnabledFor(WARNING): diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py index 8a15ba12..863cee25 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py @@ -96,6 +96,10 @@ def conn_check_cache_prefix(client_addr, server_addr): def fqdns_for_addr(client_addr, server_addr): + if server_addr is None or client_addr is None or len(client_addr) == 0 or len(server_addr) == 0: + if bubble_log.isEnabledFor(WARNING): + bubble_log.warning('fqdns_for_addr: client_addr ('+repr(client_addr)+') or server_addr ('+repr(server_addr)+') was None or empty') + return None key = REDIS_DNS_PREFIX + server_addr + '~' + client_addr values = REDIS.smembers(key) if values is None or len(values) == 0: diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_debug.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_debug.py index 374ef82d..facad780 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_debug.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_debug.py @@ -4,6 +4,7 @@ import logging from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL +import inspect import os import threading import traceback @@ -23,10 +24,17 @@ BUBBLE_LOG_LEVEL_ENV_VAR = 'BUBBLE_LOG_LEVEL' DEFAULT_BUBBLE_LOG_LEVEL = 'INFO' BUBBLE_LOG_LEVEL = None +STACK_LINE = "[%s:%d] %s\n" + def get_stack(e=None): if e is None: - e = ValueError() + stack = '' + for frame in inspect.stack()[1:]: + file, line, func = frame[1:4] + stack = stack + (STACK_LINE % (file, line, func)) + return stack + return "".join(traceback.TracebackException.from_exception(e).format()) diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py index 4dfcbe26..853b78d7 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py @@ -26,6 +26,7 @@ class FlexFlow(RequestCapture): mitm_flow: None router: None request_chunks: None + response: None response_stream: None def __init__(self, flex_host, mitm_flow, router): @@ -60,9 +61,8 @@ def process_no_flex(flex_flow): reason='OK', headers=response_headers, content=None) - error_html = flex_flow.router['error_html'] flex_flow.response_stream = lambda chunks: error_html - flow.response.stream = lambda chunks: error_html + error_html = flex_flow.router['error_html'] if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('process_no_flex: no router found, returning error_html') return flex_flow @@ -169,7 +169,7 @@ def process_flex(flex_flow): if bubble_log.isEnabledFor(INFO): bubble_log.info('process_flex: successfully requested url '+url+' from flex router, proceeding...') - flex_flow.response_stream = response + flex_flow.response = response return flex_flow diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py index d7a43c95..fe54c91b 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py @@ -6,10 +6,14 @@ import json import re import urllib import traceback + from mitmproxy.net.http import Headers +from mitmproxy.proxy.protocol.async_stream_body import AsyncStreamBody + from bubble_config import bubble_port, debug_capture_fqdn, debug_stream_fqdn, debug_stream_uri -from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, CTX_BUBBLE_FLEX, \ - status_reason, get_flow_ctx, add_flow_ctx, bubble_async, \ +from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, \ + CTX_BUBBLE_FLEX, CTX_BUBBLE_SPECIAL, \ + status_reason, get_flow_ctx, add_flow_ctx, bubble_async, async_client, cleanup_async, \ is_bubble_special_path, is_bubble_health_check, health_check_response, special_bubble_response, \ CTX_BUBBLE_REQUEST_ID, CTX_CONTENT_LENGTH, CTX_CONTENT_LENGTH_SENT, CTX_BUBBLE_FILTERED, \ HEADER_CONTENT_TYPE, HEADER_CONTENT_ENCODING, HEADER_LOCATION, HEADER_CONTENT_LENGTH, \ @@ -54,7 +58,7 @@ def ensure_bubble_script_csp(csp): return new_csp -def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=None, content_type=None, content_length=None, csp=None): +def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=None, content_type=None, content_length=None, csp=None, client=None): name = 'filter_chunk' if debug_capture_fqdn: if debug_capture_fqdn in req_id: @@ -124,9 +128,9 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N f.write(url) f.close() - response = bubble_async(name, url, headers=headers, method='POST', data=chunk, loop=loop) + response = bubble_async(name, url, headers=headers, method='POST', data=chunk, loop=loop, client=client) if not response.status_code == 200: - err_message = 'filter_chunk: Error fetching ' + url + ', HTTP status ' + str(response.status_code) + err_message = 'filter_chunk: Error fetching ' + url + ', HTTP status ' + str(response.status_code) + ' content='+repr(response.content) if bubble_log.isEnabledFor(ERROR): bubble_log.error(err_message) return b'' @@ -140,7 +144,7 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N return response.content -def bubble_filter_chunks(flow, chunks, flex_flow, req_id, user_agent, content_encoding, content_type, csp): +def bubble_filter_chunks(flow, chunks, req_id, user_agent, content_encoding, content_type, csp): loop = asyncio.new_event_loop() if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('bubble_filter_chunks: starting with content_type='+content_type) @@ -149,14 +153,11 @@ def bubble_filter_chunks(flow, chunks, flex_flow, req_id, user_agent, content_en content_length = get_flow_ctx(flow, CTX_CONTENT_LENGTH) if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('bubble_filter_chunks: found content_length='+str(content_length)) - if flex_flow is not None: - # flex flows with errors are handled before we get here - chunks = flex_flow.response_stream.iter_content(8192) try: buffer = b'' for chunk in chunks: buffer = buffer + chunk - if len(buffer) < MIN_FILTER_CHUNK_SIZE: + if not last and len(buffer) < MIN_FILTER_CHUNK_SIZE: continue chunk_len = len(buffer) chunk = buffer @@ -189,15 +190,65 @@ def bubble_filter_chunks(flow, chunks, flex_flow, req_id, user_agent, content_en bubble_log.error('bubble_filter_chunks: exception='+repr(e)) traceback.print_exc() yield None + finally: + loop.close() -def bubble_modify(flow, flex_flow, req_id, user_agent, content_encoding, content_type, csp): +def bubble_modify(flow, req_id, user_agent, content_encoding, content_type, csp): if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('bubble_modify: modifying req_id='+req_id+' with content_type='+content_type) - return lambda chunks: bubble_filter_chunks(flow, chunks, flex_flow, req_id, + return lambda chunks: bubble_filter_chunks(flow, chunks, req_id, user_agent, content_encoding, content_type, csp) +class AsyncStreamContext: + first = True + buffer = b'' + + +def async_filter_chunk(stream_body_obj, flow, req_id, user_agent, content_encoding, content_type, csp): + client = async_client() + loop = asyncio.new_event_loop() + stream_body_obj.ctx = AsyncStreamContext() + orig_finalize = stream_body_obj.finalize + + def _finalize(): + bubble_log.info('_finalize: cleaning up for '+req_id+' sent '+str(stream_body_obj.total)+' bytes') + if orig_finalize is not None: + orig_finalize() + cleanup_async('_async_filter_chunk('+req_id+')', loop, client, None) + + stream_body_obj.finalize = _finalize + stream_body_obj.total = 0 + + def _async_filter_chunk(chunk, last): + if chunk is None: + bubble_log.info('_async_filter_chunk: filtering None chunk (!!) last='+str(last)) + else: + bubble_log.info('_async_filter_chunk: filtering chunk of size = '+str(len(chunk))+' last=' + str(last)) + stream_body_obj.ctx.buffer = stream_body_obj.ctx.buffer + chunk + if not last and len(stream_body_obj.ctx.buffer) < MIN_FILTER_CHUNK_SIZE: + return None + chunk = stream_body_obj.ctx.buffer + stream_body_obj.ctx.buffer = b'' + if stream_body_obj.ctx.first: + stream_body_obj.ctx.first = False + new_chunk = filter_chunk(loop, flow, chunk, req_id, user_agent, last, + content_encoding=content_encoding, content_type=content_type, content_length=None, + csp=csp, client=client) + else: + new_chunk = filter_chunk(loop, flow, chunk, req_id, user_agent, last, client=client) + if new_chunk is None or len(chunk) == 0: + bubble_log.info('_async_filter_chunk: filtered chunk, got back None or zero chunk (means "send more data")') + return None + else: + bubble_log.info('_async_filter_chunk: filtered chunk, got back chunk of size '+str(len(new_chunk))) + stream_body_obj.total = stream_body_obj.total + len(new_chunk) + return new_chunk + + return _async_filter_chunk + + EMPTY_XML = [b''] EMPTY_JSON = [b'null'] EMPTY_DEFAULT = [] @@ -236,6 +287,7 @@ def bubble_filter_response(flow, flex_flow): if is_bubble_health_check(path): health_check_response(flow) else: + bubble_log.info('bubble_filter_response: sending special bubble response for path: '+path) special_bubble_response(flow) elif flex_flow and flex_flow.is_error(): @@ -328,28 +380,33 @@ def bubble_filter_response(flow, flex_flow): if bubble_log.isEnabledFor(DEBUG): bubble_log.debug(prefix+'content_encoding='+repr(content_encoding) + ', content_type='+repr(content_type)) - flow.response.stream = bubble_modify(flow, flex_flow, req_id, - user_agent, content_encoding, content_type, csp) - if content_length_value: - flow.response.headers['transfer-encoding'] = 'chunked' - # find server_conn to set fake_chunks on - if flow.live and flow.live.ctx: - ctx = flow.live.ctx - while not hasattr(ctx, 'server_conn'): - if hasattr(ctx, 'ctx'): - ctx = ctx.ctx - else: + if flex_flow is not None: + # flex flows with errors are handled before we get here + bubble_log.info(prefix+' filtering async stream, starting with flow.response.stream = '+repr(flow.response.stream)) + flow.response.stream.filter_chunk = async_filter_chunk(flow.response.stream, flow, req_id, user_agent, content_encoding, content_type, csp) + else: + flow.response.stream = bubble_modify(flow, req_id, user_agent, content_encoding, content_type, csp) + if content_length_value: + flow.response.headers['transfer-encoding'] = 'chunked' + # find server_conn to set fake_chunks on + if flow.live and flow.live.ctx: + ctx = flow.live.ctx + while not hasattr(ctx, 'server_conn'): + if hasattr(ctx, 'ctx'): + ctx = ctx.ctx + else: + if bubble_log.isEnabledFor(ERROR): + bubble_log.error(prefix+'error finding server_conn for path '+path+'. last ctx has no further ctx. type='+str(type(ctx))+' vars='+str(vars(ctx))) + return + if not hasattr(ctx, 'server_conn'): if bubble_log.isEnabledFor(ERROR): - bubble_log.error(prefix+'error finding server_conn for path '+path+'. last ctx has no further ctx. type='+str(type(ctx))+' vars='+str(vars(ctx))) + bubble_log.error(prefix+'error finding server_conn for path '+path+'. ctx type='+str(type(ctx))+' vars='+str(vars(ctx))) return - if not hasattr(ctx, 'server_conn'): - if bubble_log.isEnabledFor(ERROR): - bubble_log.error(prefix+'error finding server_conn for path '+path+'. ctx type='+str(type(ctx))+' vars='+str(vars(ctx))) - return - content_length = int(content_length_value) - ctx.server_conn.rfile.fake_chunks = content_length - add_flow_ctx(flow, CTX_CONTENT_LENGTH, content_length) - add_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT, 0) + content_length = int(content_length_value) + if ctx.server_conn.rfile: + ctx.server_conn.rfile.fake_chunks = content_length + add_flow_ctx(flow, CTX_CONTENT_LENGTH, content_length) + add_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT, 0) else: if bubble_log.isEnabledFor(DEBUG): diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py index a02d88bf..9eb26ae2 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py @@ -32,7 +32,7 @@ from mitmproxy.net.http import headers as nheaders from bubble_api import bubble_matchers, bubble_activity_log, \ HEALTH_CHECK_URI, CTX_BUBBLE_MATCHERS, CTX_BUBBLE_SPECIAL, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, \ CTX_BUBBLE_PASSTHRU, CTX_BUBBLE_FLEX, CTX_BUBBLE_REQUEST_ID, add_flow_ctx, parse_host_header, \ - is_bubble_special_path, special_bubble_response, is_bubble_health_check, \ + is_bubble_special_path, is_bubble_health_check, \ is_bubble_request, is_sage_request, is_not_from_vpn, is_flex_domain from bubble_config import bubble_host, bubble_host_alias from bubble_flex import new_flex_flow @@ -254,16 +254,13 @@ class Rerouter: def requestheaders(self, flow): host = self.bubble_handle_request(flow) path = flow.request.path - flow.request.capture_stream = True if is_bubble_special_path(path): - # if bubble_log.isEnabledFor(DEBUG): - # bubble_log.debug('request: is_bubble_special_path('+path+') returned true, sending special bubble response') - special_bubble_response(flow) + flow.request.force_no_stream = True elif host is not None: client_addr = flow.client_conn.address[0] - server_addr= flow.server_conn.address[0] + server_addr = flow.server_conn.address[0] if is_flex_domain(client_addr, server_addr, [host]): flex_flow = new_flex_flow(client_addr, host, flow) add_flow_ctx(flow, CTX_BUBBLE_FLEX, flex_flow)