diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py index cecf45f8..6757ab43 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py @@ -1,29 +1,33 @@ # # Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/ # -import logging -from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL - -from mitmproxy import http -from mitmproxy.net.http import headers as nheaders - +import asyncio import json +import logging import re -import requests -import redis import subprocess import time import traceback import uuid from http import HTTPStatus -from netaddr import IPAddress, IPNetwork +from logging import INFO, DEBUG, WARNING, ERROR + +import httpx +import nest_asyncio +import redis from bubble_vpn4 import wireguard_network_ipv4 from bubble_vpn6 import wireguard_network_ipv6 +from netaddr import IPAddress, IPNetwork + from bubble_config import bubble_port, debug_capture_fqdn, \ - bubble_host, bubble_host_alias, bubble_sage_host, bubble_sage_ip4, bubble_sage_ip6, cert_validation_host + bubble_host, bubble_host_alias, bubble_sage_host, bubble_sage_ip4, bubble_sage_ip6 +from mitmproxy import http +from mitmproxy.net.http import headers as nheaders bubble_log = logging.getLogger(__name__) +nest_asyncio.apply() + HEADER_USER_AGENT = 'User-Agent' HEADER_CONTENT_LENGTH = 'Content-Length' HEADER_CONTENT_TYPE = 'Content-Type' @@ -43,6 +47,7 @@ CTX_BUBBLE_REQUEST_ID = 'X-Bubble-RequestId' CTX_CONTENT_LENGTH = 'X-Bubble-Content-Length' CTX_CONTENT_LENGTH_SENT = 'X-Bubble-Content-Length-Sent' CTX_BUBBLE_FILTERED = 'X-Bubble-Filtered' +CTX_BUBBLE_FLEX = 'X-Bubble-Flex' BUBBLE_URI_PREFIX = '/__bubble/' HEADER_HEALTH_CHECK = 'X-Mitm-Health' @@ -90,6 +95,107 @@ def bubble_activity_log(client_addr, server_addr, event, data): pass +def async_client(proxies=None, + timeout=5, + max_redirects=0): + return httpx.AsyncClient(timeout=timeout, max_redirects=max_redirects, proxies=proxies) + + +async def async_response(client, name, url, + headers=None, + method='GET', + data=None, + json=None): + if bubble_log.isEnabledFor(INFO): + bubble_log.info('bubble_async_request(' + name + '): starting async: ' + method + ' ' + url) + + response = await client.request(method=method, url=url, headers=headers, json=json, data=data) + + if bubble_log.isEnabledFor(INFO): + bubble_log.info('bubble_async_request(' + name + '): async request returned HTTP status ' + str(response.status_code)) + + if response.status_code != 200: + if bubble_log.isEnabledFor(ERROR): + bubble_log.error('bubble_async_request(' + name + '): API call failed: ' + repr(response)) + + return response + + +def async_stream(client, name, url, + headers=None, + method='GET', + data=None, + json=None, + timeout=5, + max_redirects=0, + loop=asyncio.get_running_loop()): + try: + return loop.run_until_complete(_async_stream(client, name, url, + headers=headers, + method=method, + data=data, + json=json, + timeout=timeout, + max_redirects=max_redirects)) + except Exception as e: + bubble_log.error('async_stream('+name+'): error with url='+url+' -- '+repr(e)) + + +async def _async_stream(client, name, url, + headers=None, + method='GET', + data=None, + json=None, + timeout=5, + max_redirects=0): + request = client.build_request(method=method, url=url, headers=headers, json=json, data=data) + return await client.send(request, stream=True, allow_redirects=(max_redirects > 0), timeout=timeout) + + +async def _bubble_async(name, url, + headers=None, + method='GET', + data=None, + json=None, + proxies=None, + timeout=5, + max_redirects=0): + async with async_client(proxies=proxies, timeout=timeout, max_redirects=max_redirects) as client: + return await async_response(client, name, url, headers=headers, method=method, data=data, json=json) + + +def bubble_async(name, url, + headers=None, + method='GET', + data=None, + json=None, + proxies=None, + timeout=5, + max_redirects=0, + loop=asyncio.get_running_loop()): + try: + return loop.run_until_complete(_bubble_async(name, url, + headers=headers, + method=method, + data=data, + json=json, + proxies=proxies, + timeout=timeout, + max_redirects=max_redirects)) + except Exception as e: + bubble_log.error('bubble_async('+name+'): error: '+repr(e)) + + +def bubble_async_request_json(name, url, headers, method='GET', json=None): + response = bubble_async(name, url, headers, method=method, json=json) + if response and response.status_code == 200: + return response.json() + else: + if bubble_log.isEnabledFor(DEBUG): + bubble_log.debug('bubble_async_request_json('+name+'): received invalid HTTP status: '+str(response.status_code)) + return None + + def bubble_conn_check(client_addr, server_addr, fqdns, security_level): if debug_capture_fqdn and fqdns: for f in debug_capture_fqdn: @@ -98,27 +204,24 @@ def bubble_conn_check(client_addr, server_addr, fqdns, security_level): bubble_log.debug('bubble_conn_check: debug_capture_fqdn detected, returning noop: '+f) return 'noop' + name = 'bubble_conn_check' + url = 'http://127.0.0.1:'+bubble_port+'/api/filter/check' headers = { 'X-Forwarded-For': client_addr, 'Accept': 'application/json', 'Content-Type': 'application/json' } + data = { + 'serverAddr': str(server_addr), + 'fqdns': fqdns, + 'clientAddr': client_addr + } try: - data = { - 'serverAddr': str(server_addr), - 'fqdns': fqdns, - 'clientAddr': client_addr - } - response = requests.post('http://127.0.0.1:'+bubble_port+'/api/filter/check', headers=headers, json=data) - if response.ok: - return response.json() - if bubble_log.isEnabledFor(ERROR): - bubble_log.error('bubble_conn_check API call failed: '+repr(response)) - return None + return bubble_async_request_json(name, url, headers, method='POST', json=data) except Exception as e: if bubble_log.isEnabledFor(ERROR): - bubble_log.error('bubble_conn_check API call failed: '+repr(e)) + bubble_log.error('bubble_conn_check: API call failed: '+repr(e)) traceback.print_exc() if security_level is not None and security_level['level'] == 'maximum': return False @@ -126,21 +229,14 @@ def bubble_conn_check(client_addr, server_addr, fqdns, security_level): def bubble_get_flex_router(client_addr): + name = 'bubble_get_flex_router' + url = 'http://127.0.0.1:' + bubble_port + '/api/filter/flexRouters' headers = { 'X-Forwarded-For': client_addr, 'Accept': 'application/json' } try: - response = requests.get('http://127.0.0.1:'+bubble_port+'/api/filter/flexRouters', headers=headers) - if response.ok: - return response.json() - elif response.status_code == 404: - if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('bubble_get_flex_routes: no router found for '+client_addr) - else: - if bubble_log.isEnabledFor(ERROR): - bubble_log.error('bubble_get_flex_routes: API call failed with HTTP status: '+str(response.status_code)) - return None + return bubble_async_request_json(name, url, headers) except Exception as e: if bubble_log.isEnabledFor(ERROR): @@ -176,6 +272,8 @@ def bubble_matchers(req_id, client_addr, server_addr, flow, host): bubble_log.info('bubble_matchers: debug_capture_fqdn detected, returning DEBUG_MATCHER: '+host) return DEBUG_MATCHER + name = 'bubble_matchers' + url = 'http://127.0.0.1:'+bubble_port+'/api/filter/matchers/'+req_id headers = { 'X-Forwarded-For': client_addr, 'Accept': 'application/json', @@ -200,28 +298,29 @@ def bubble_matchers(req_id, client_addr, server_addr, flow, host): bubble_log.warning('bubble_matchers: error parsing Referer header: '+repr(e)) referer = 'NONE' + data = { + 'requestId': req_id, + 'fqdn': host, + 'uri': flow.request.path, + 'userAgent': user_agent, + 'referer': referer, + 'clientAddr': client_addr, + 'serverAddr': server_addr + } + try: - data = { - 'requestId': req_id, - 'fqdn': host, - 'uri': flow.request.path, - 'userAgent': user_agent, - 'referer': referer, - 'clientAddr': client_addr, - 'serverAddr': server_addr - } - response = requests.post('http://127.0.0.1:'+bubble_port+'/api/filter/matchers/'+req_id, headers=headers, json=data) - if response.ok: + response = bubble_async(name, url, headers=headers, method='POST', json=data) + if response.status_code == 200: return response.json() elif response.status_code == 403: if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('bubble_matchers response was FORBIDDEN, returning block: '+str(response.status_code)+' / '+repr(response.text)) + bubble_log.debug('bubble_matchers: response was FORBIDDEN, returning block: '+str(response.status_code)+' / '+repr(response.text)) return BLOCK_MATCHER if bubble_log.isEnabledFor(WARNING): - bubble_log.warning('bubble_matchers response not OK, returning empty matchers array: '+str(response.status_code)+' / '+repr(response.text)) + bubble_log.warning('bubble_matchers: response not OK, returning empty matchers array: '+str(response.status_code)+' / '+repr(response.text)) except Exception as e: if bubble_log.isEnabledFor(ERROR): - bubble_log.error('bubble_matchers API call failed: '+repr(e)) + bubble_log.error('bubble_matchers: API call failed: '+repr(e)) traceback.print_exc() return None @@ -313,6 +412,7 @@ def health_check_response(flow): def special_bubble_response(flow): + name = 'special_bubble_response' path = flow.request.path if is_bubble_health_check(path): health_check_response(flow) @@ -324,28 +424,24 @@ def special_bubble_response(flow): 'Accept': 'application/json', 'Content-Type': 'application/json' } - response = None if flow.request.method == 'GET': - response = requests.get(uri, headers=headers, stream=True) + response = bubble_async(name, uri, headers=headers) + elif flow.request.method == 'POST': if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('special_bubble_response: special bubble request: POST content is '+str(flow.request.content)) if flow.request.content: headers['Content-Length'] = str(len(flow.request.content)) - response = requests.post(uri, data=flow.request.content, headers=headers, stream=True) + response = bubble_async(name, uri, json=flow.request.content, headers=headers) else: if bubble_log.isEnabledFor(WARNING): bubble_log.warning('special_bubble_response: special bubble request: method '+flow.request.method+' not supported') - if flow.response is None: - http_version = get_http_version(response) - if http_version is None: - if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('special_bubble_response: invalid HTTP version, bailing out') - return + return + if flow.response is None: + http_version = response.http_version response_headers = collect_response_headers(response) - flow.response = http.HTTPResponse(http_version=http_version, status_code=response.status_code, reason=response.reason, @@ -365,18 +461,6 @@ def send_bubble_response(response): yield chunk -def get_http_version(response): - raw_version = response.raw.version - if raw_version == 10: - return 'HTTP/1.0' - elif raw_version == 11: - return 'HTTP/1.1' - else: - if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('get_http_version: invalid HTTP version detected, response.raw.version=='+repr(raw_version)) - return None - - def collect_response_headers(response, omit=None): response_headers = nheaders.Headers() for name in response.headers: diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py index 95d72968..cf12304b 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py @@ -1,54 +1,66 @@ # # Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/ # +import asyncio + +from httpx._types import RequestData + +from mitmproxy.proxy.protocol.async_stream_body import AsyncStreamBody + from mitmproxy import http -from bubble_api import bubble_get_flex_router, get_http_version, collect_response_headers, \ - HEADER_CONTENT_ENCODING, HEADER_TRANSFER_ENCODING, HEADER_CONTENT_LENGTH, CTX_BUBBLE_MATCHERS, CTX_BUBBLE_FILTERED -from bubble_modify import bubble_filter_response +from mitmproxy.proxy.protocol.request_capture import RequestCapture -import requests +from bubble_api import bubble_get_flex_router, collect_response_headers, bubble_async, async_client, async_stream, \ + HEADER_TRANSFER_ENCODING, HEADER_CONTENT_LENGTH import logging from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL + bubble_log = logging.getLogger(__name__) FLEX_TIMEOUT = 20 -def prepend_remainder_to_stream(remainder, raw): - first = True - while True: - chunk = raw.read(8192) - if first and chunk and len(remainder) > 0: - yield remainder + chunk - elif chunk: - yield chunk - else: - break - if first: - first = False +class FlexFlow(RequestCapture): + flex_host: None + mitm_flow: None + router: None + request_chunks: None + def __init__(self, flex_host, mitm_flow, router): + super().__init__() + self.flex_host = flex_host + self.mitm_flow = mitm_flow + self.router = router + mitm_flow.request.stream = self + mitm_flow.response = http.HTTPResponse(http_version='HTTP/1.1', + status_code=523, + reason='FlexFlow Not Initialized', + headers={}, + content=None) -def set_flex_response(client_addr, flex_host, flow): - if bubble_log.isEnabledFor(INFO): - bubble_log.info('set_flex_response: checking for flex router for host: '+flex_host) + def capture(self, chunks): + self.request_chunks = chunks + + +def new_flex_flow(client_addr, flex_host, flow): router = bubble_get_flex_router(client_addr) if router is None or 'auth' not in router: if bubble_log.isEnabledFor(INFO): - bubble_log.info('set_flex_response: no flex router for host: '+flex_host) - return + bubble_log.info('new_flex_flow: no flex router for host: '+flex_host) + return None if bubble_log.isEnabledFor(INFO): - bubble_log.info('set_flex_response: found router '+repr(router)+' for flex host: '+flex_host) - try: - process_flex(flex_host, flow, router) - except Exception as e: - if bubble_log.isEnabledFor(ERROR): - bubble_log.error('set_flex_response: error processing: '+repr(e)) + bubble_log.info('new_flex_flow: found router '+repr(router)+' for flex host: '+flex_host) + return FlexFlow(flex_host, flow, router) + +def process_flex(flex_flow): -def process_flex(flex_host, flow, router): + flex_host = flex_flow.flex_host + flow = flex_flow.mitm_flow + router = flex_flow.router # build the request URL method = flow.request.method @@ -68,41 +80,39 @@ def process_flex(flex_host, flow, router): proxy_url = router['proxyUrl'] proxies = {"http": proxy_url, "https": proxy_url} - # send request to flex router - request_body = flow.request.content if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('process_flex: sending flex request for ' + method +' ' + url +' to ' + proxy_url +' with headers=' + repr(request_headers) +' and body=' + repr(request_body)) + + loop = asyncio.new_event_loop() + client = async_client(proxies=proxies, timeout=30) try: - response = requests.request(method, url, - headers=request_headers, - timeout=(20, 120), - stream=True, - data=request_body, # use the original request body, if there is one - proxies=proxies, - allow_redirects=False) + response = async_stream(client, 'process_flex', url, + method=method, + headers=request_headers, + timeout=30, + data=async_chunk_iter(flex_flow.request_chunks), + loop=loop) if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('process_flex: response returned HTTP status '+str(response.status_code)+' for '+url) except Exception as e: if bubble_log.isEnabledFor(ERROR): bubble_log.error('process_flex: error sending request to '+url+': '+repr(e)) - return + return None - # Status line -- http version is buried in response.raw.version - http_version = get_http_version(response) - if http_version is None: - if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('process_flex: invalid HTTP version, bailing out') - return + if response is None: + return None + + # Status line + http_version = response.http_version # Headers -- copy from requests dict to Headers multimap - # Remove Content-Length, Content-Encoding and Transfer-Encoding - # We will rechunk the output - response_headers = collect_response_headers(response, [HEADER_CONTENT_LENGTH, HEADER_CONTENT_ENCODING, HEADER_TRANSFER_ENCODING]) + # Remove Content-Length and Content-Encoding, we will rechunk the output + response_headers = collect_response_headers(response, [HEADER_CONTENT_LENGTH, HEADER_TRANSFER_ENCODING]) - # Construct the response + # Construct the real response flow.response = http.HTTPResponse(http_version=http_version, status_code=response.status_code, - reason=response.reason, + reason=response.reason_phrase, headers=response_headers, content=None) @@ -113,18 +123,48 @@ def process_flex(flex_host, flow, router): if response.status_code // 100 != 2: response_headers[HEADER_CONTENT_LENGTH] = '0' flow.response.stream = lambda chunks: [] + elif content_length is None or int(content_length) > 0: response_headers[HEADER_TRANSFER_ENCODING] = 'chunked' - flow.response.stream = lambda chunks: response.iter_content(8192) + flow.response.stream = AsyncStreamBody(owner=client, loop=loop, chunks=response.aiter_raw(), finalize=cleanup_flex(url, loop, client, response)) + else: response_headers[HEADER_CONTENT_LENGTH] = '0' flow.response.stream = lambda chunks: [] # Apply filters - if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('process_flex: bubble filtering: '+url) - # flow.response.stream = lambda chunks: response.iter_content(8192) - bubble_filter_response(flow, response) - if bubble_log.isEnabledFor(INFO): - bubble_log.info('process_flex: successfully requested url '+url+' from flex router, proceeding ...') + bubble_log.info('process_flex: successfully requested url '+url+' from flex router, proceeding...') + return response + + +async def async_chunk_iter(chunks): + for chunk in chunks: + yield chunk + + +def cleanup_flex(url, loop, client, response): + def cleanup(): + + errors = False + + try: + loop.run_until_complete(response.aclose()) + except Exception as e: + bubble_log.error('cleanup_flex: error closing response: '+repr(e)) + errors = True + + try: + loop.run_until_complete(client.aclose()) + except Exception as e: + bubble_log.error('cleanup_flex: error: '+repr(e)) + errors = True + + if not errors: + if bubble_log.isEnabledFor(DEBUG): + bubble_log.debug('cleanup_flex: successfully completed: '+url) + else: + if bubble_log.isEnabledFor(WARNING): + bubble_log.warning('cleanup_flex: successfully completed (but had errors closing): ' + url) + + return cleanup diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py index 3c47e013..f77afe52 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py @@ -1,19 +1,20 @@ # # Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/ # +import asyncio import json import re -import requests import urllib import traceback from mitmproxy.net.http import Headers from bubble_config import bubble_port, debug_capture_fqdn, debug_stream_fqdn, debug_stream_uri -from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, \ - status_reason, get_flow_ctx, add_flow_ctx, \ +from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, CTX_BUBBLE_FLEX, \ + status_reason, get_flow_ctx, add_flow_ctx, bubble_async, \ is_bubble_special_path, is_bubble_health_check, health_check_response, special_bubble_response, \ CTX_BUBBLE_REQUEST_ID, CTX_CONTENT_LENGTH, CTX_CONTENT_LENGTH_SENT, CTX_BUBBLE_FILTERED, \ HEADER_CONTENT_TYPE, HEADER_CONTENT_ENCODING, HEADER_LOCATION, HEADER_CONTENT_LENGTH, \ HEADER_USER_AGENT, HEADER_FILTER_PASSTHRU, HEADER_CONTENT_SECURITY_POLICY, REDIS, redis_set +from bubble_flex import process_flex import logging from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL @@ -28,7 +29,7 @@ REDIS_FILTER_PASSTHRU_PREFIX = '__chunk_filter_pass__' REDIS_FILTER_PASSTHRU_DURATION = 600 DEBUG_STREAM_COUNTERS = {} -MIN_FILTER_CHUNK_SIZE = 16384 +MIN_FILTER_CHUNK_SIZE = 1024 * 32 # Filter data in 32KB chunks def add_csp_part(new_csp, part): @@ -53,7 +54,8 @@ def ensure_bubble_script_csp(csp): return new_csp -def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, content_type=None, content_length=None, csp=None): +def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=None, content_type=None, content_length=None, csp=None): + name = 'filter_chunk' if debug_capture_fqdn: if debug_capture_fqdn in req_id: if bubble_log.isEnabledFor(DEBUG): @@ -87,17 +89,21 @@ def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, c else: url = url + '?last=true' + chunk_len = 0 + if bubble_log.isEnabledFor(DEBUG): + if chunk is not None: + chunk_len = len(chunk) if csp: if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('filter_chunk: url='+url+' (csp='+csp+')') - filter_headers = { + bubble_log.debug('filter_chunk: url='+url+' (csp='+csp+') size='+str(chunk_len)) + headers = { HEADER_CONTENT_TYPE: CONTENT_TYPE_BINARY, HEADER_CONTENT_SECURITY_POLICY: csp } else: if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('filter_chunk: url='+url+' (no csp)') - filter_headers = STANDARD_FILTER_HEADERS + bubble_log.debug('filter_chunk: url='+url+' (no csp) size='+str(chunk_len)) + headers = STANDARD_FILTER_HEADERS if debug_stream_fqdn and debug_stream_uri and debug_stream_fqdn in req_id and flow.request.path == debug_stream_uri: if req_id in DEBUG_STREAM_COUNTERS: @@ -112,14 +118,14 @@ def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, c f.write(chunk) f.close() f = open('/tmp/bubble_stream_'+req_id+'_chunk'+"{:04d}".format(count)+'.headers.json', mode='w') - f.write(json.dumps(filter_headers)) + f.write(json.dumps(headers)) f.close() f = open('/tmp/bubble_stream_'+req_id+'_chunk'+"{:04d}".format(count)+'.url', mode='w') f.write(url) f.close() - response = requests.post(url, data=chunk, headers=filter_headers) - if not response.ok: + response = bubble_async(name, url, headers=headers, method='POST', data=chunk, loop=loop) + if not response.status_code == 200: err_message = 'filter_chunk: Error fetching ' + url + ', HTTP status ' + str(response.status_code) if bubble_log.isEnabledFor(ERROR): bubble_log.error(err_message) @@ -135,9 +141,7 @@ def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, c def bubble_filter_chunks(flow, chunks, flex_stream, req_id, user_agent, content_encoding, content_type, csp): - """ - chunks is a generator that can be used to iterate over all chunks. - """ + loop = asyncio.new_event_loop() if bubble_log.isEnabledFor(DEBUG): bubble_log.debug('bubble_filter_chunks: starting with content_type='+content_type) first = True @@ -165,20 +169,20 @@ def bubble_filter_chunks(flow, chunks, flex_stream, req_id, user_agent, content_ else: last = False if first: - yield filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding, content_type, content_length, csp) + yield filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding, content_type, content_length, csp) first = False else: - yield filter_chunk(flow, chunk, req_id, user_agent, last) + yield filter_chunk(loop, flow, chunk, req_id, user_agent, last) # send whatever is left in the buffer if len(buffer) > 0: # bubble_log.debug('bubble_filter_chunks(end): sending remainder buffer of size '+str(len(buffer))) if first: - yield filter_chunk(flow, buffer, req_id, user_agent, last, content_encoding, content_type, content_length, csp) + yield filter_chunk(loop, flow, buffer, req_id, user_agent, last, content_encoding, content_type, content_length, csp) else: - yield filter_chunk(flow, buffer, req_id, user_agent, last) + yield filter_chunk(loop, flow, buffer, req_id, user_agent, last) if not content_length or not last: # bubble_log.debug('bubble_filter_chunks(end): sending last empty chunk') - yield filter_chunk(flow, None, req_id, user_agent, True) # get the last bits of data + yield filter_chunk(loop, flow, None, req_id, user_agent, True) # get the last bits of data except Exception as e: if bubble_log.isEnabledFor(ERROR): bubble_log.error('bubble_filter_chunks: exception='+repr(e)) @@ -209,7 +213,12 @@ def abort_data(content_type): def responseheaders(flow): - bubble_filter_response(flow, None) + flex_flow = get_flow_ctx(flow, CTX_BUBBLE_FLEX) + if flex_flow: + flex_stream = process_flex(flex_flow) + else: + flex_stream = None + bubble_filter_response(flow, flex_stream) def bubble_filter_response(flow, flex_stream): diff --git a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py index cc675982..7054a0f3 100644 --- a/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py +++ b/bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py @@ -31,11 +31,11 @@ from mitmproxy.net.http import headers as nheaders from bubble_api import bubble_matchers, bubble_activity_log, \ HEALTH_CHECK_URI, CTX_BUBBLE_MATCHERS, CTX_BUBBLE_SPECIAL, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, \ - CTX_BUBBLE_PASSTHRU, CTX_BUBBLE_REQUEST_ID, add_flow_ctx, parse_host_header, \ + CTX_BUBBLE_PASSTHRU, CTX_BUBBLE_FLEX, CTX_BUBBLE_REQUEST_ID, add_flow_ctx, parse_host_header, \ is_bubble_special_path, special_bubble_response, is_bubble_health_check, \ is_bubble_request, is_sage_request, is_not_from_vpn, is_flex_domain from bubble_config import bubble_host, bubble_host_alias -from bubble_flex import set_flex_response +from bubble_flex import new_flex_flow import logging from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL @@ -251,9 +251,11 @@ class Rerouter: flow.request.port = port return host - def request(self, flow): + def requestheaders(self, flow): host = self.bubble_handle_request(flow) path = flow.request.path + flow.request.capture_stream = True + if is_bubble_special_path(path): # if bubble_log.isEnabledFor(DEBUG): # bubble_log.debug('request: is_bubble_special_path('+path+') returned true, sending special bubble response') @@ -262,9 +264,9 @@ class Rerouter: elif host is not None: client_addr = flow.client_conn.address[0] if is_flex_domain(client_addr, host): + add_flow_ctx(flow, CTX_BUBBLE_FLEX, new_flex_flow(client_addr, host, flow)) if bubble_log.isEnabledFor(DEBUG): - bubble_log.debug('request: is_flex_domain('+host+') returned true, sending flex response') - set_flex_response(client_addr, host, flow) + bubble_log.debug('request: is_flex_domain('+host+') returned true, setting ctx: '+CTX_BUBBLE_FLEX) addons = [Rerouter()]