@@ -1,29 +1,33 @@ | |||||
# | # | ||||
# Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/ | # Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/ | ||||
# | # | ||||
import logging | |||||
from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | |||||
from mitmproxy import http | |||||
from mitmproxy.net.http import headers as nheaders | |||||
import asyncio | |||||
import json | import json | ||||
import logging | |||||
import re | import re | ||||
import requests | |||||
import redis | |||||
import subprocess | import subprocess | ||||
import time | import time | ||||
import traceback | import traceback | ||||
import uuid | import uuid | ||||
from http import HTTPStatus | from http import HTTPStatus | ||||
from netaddr import IPAddress, IPNetwork | |||||
from logging import INFO, DEBUG, WARNING, ERROR | |||||
import httpx | |||||
import nest_asyncio | |||||
import redis | |||||
from bubble_vpn4 import wireguard_network_ipv4 | from bubble_vpn4 import wireguard_network_ipv4 | ||||
from bubble_vpn6 import wireguard_network_ipv6 | from bubble_vpn6 import wireguard_network_ipv6 | ||||
from netaddr import IPAddress, IPNetwork | |||||
from bubble_config import bubble_port, debug_capture_fqdn, \ | from bubble_config import bubble_port, debug_capture_fqdn, \ | ||||
bubble_host, bubble_host_alias, bubble_sage_host, bubble_sage_ip4, bubble_sage_ip6, cert_validation_host | |||||
bubble_host, bubble_host_alias, bubble_sage_host, bubble_sage_ip4, bubble_sage_ip6 | |||||
from mitmproxy import http | |||||
from mitmproxy.net.http import headers as nheaders | |||||
bubble_log = logging.getLogger(__name__) | bubble_log = logging.getLogger(__name__) | ||||
nest_asyncio.apply() | |||||
HEADER_USER_AGENT = 'User-Agent' | HEADER_USER_AGENT = 'User-Agent' | ||||
HEADER_CONTENT_LENGTH = 'Content-Length' | HEADER_CONTENT_LENGTH = 'Content-Length' | ||||
HEADER_CONTENT_TYPE = 'Content-Type' | HEADER_CONTENT_TYPE = 'Content-Type' | ||||
@@ -43,6 +47,7 @@ CTX_BUBBLE_REQUEST_ID = 'X-Bubble-RequestId' | |||||
CTX_CONTENT_LENGTH = 'X-Bubble-Content-Length' | CTX_CONTENT_LENGTH = 'X-Bubble-Content-Length' | ||||
CTX_CONTENT_LENGTH_SENT = 'X-Bubble-Content-Length-Sent' | CTX_CONTENT_LENGTH_SENT = 'X-Bubble-Content-Length-Sent' | ||||
CTX_BUBBLE_FILTERED = 'X-Bubble-Filtered' | CTX_BUBBLE_FILTERED = 'X-Bubble-Filtered' | ||||
CTX_BUBBLE_FLEX = 'X-Bubble-Flex' | |||||
BUBBLE_URI_PREFIX = '/__bubble/' | BUBBLE_URI_PREFIX = '/__bubble/' | ||||
HEADER_HEALTH_CHECK = 'X-Mitm-Health' | HEADER_HEALTH_CHECK = 'X-Mitm-Health' | ||||
@@ -90,6 +95,107 @@ def bubble_activity_log(client_addr, server_addr, event, data): | |||||
pass | pass | ||||
def async_client(proxies=None, | |||||
timeout=5, | |||||
max_redirects=0): | |||||
return httpx.AsyncClient(timeout=timeout, max_redirects=max_redirects, proxies=proxies) | |||||
async def async_response(client, name, url, | |||||
headers=None, | |||||
method='GET', | |||||
data=None, | |||||
json=None): | |||||
if bubble_log.isEnabledFor(INFO): | |||||
bubble_log.info('bubble_async_request(' + name + '): starting async: ' + method + ' ' + url) | |||||
response = await client.request(method=method, url=url, headers=headers, json=json, data=data) | |||||
if bubble_log.isEnabledFor(INFO): | |||||
bubble_log.info('bubble_async_request(' + name + '): async request returned HTTP status ' + str(response.status_code)) | |||||
if response.status_code != 200: | |||||
if bubble_log.isEnabledFor(ERROR): | |||||
bubble_log.error('bubble_async_request(' + name + '): API call failed: ' + repr(response)) | |||||
return response | |||||
def async_stream(client, name, url, | |||||
headers=None, | |||||
method='GET', | |||||
data=None, | |||||
json=None, | |||||
timeout=5, | |||||
max_redirects=0, | |||||
loop=asyncio.get_running_loop()): | |||||
try: | |||||
return loop.run_until_complete(_async_stream(client, name, url, | |||||
headers=headers, | |||||
method=method, | |||||
data=data, | |||||
json=json, | |||||
timeout=timeout, | |||||
max_redirects=max_redirects)) | |||||
except Exception as e: | |||||
bubble_log.error('async_stream('+name+'): error with url='+url+' -- '+repr(e)) | |||||
async def _async_stream(client, name, url, | |||||
headers=None, | |||||
method='GET', | |||||
data=None, | |||||
json=None, | |||||
timeout=5, | |||||
max_redirects=0): | |||||
request = client.build_request(method=method, url=url, headers=headers, json=json, data=data) | |||||
return await client.send(request, stream=True, allow_redirects=(max_redirects > 0), timeout=timeout) | |||||
async def _bubble_async(name, url, | |||||
headers=None, | |||||
method='GET', | |||||
data=None, | |||||
json=None, | |||||
proxies=None, | |||||
timeout=5, | |||||
max_redirects=0): | |||||
async with async_client(proxies=proxies, timeout=timeout, max_redirects=max_redirects) as client: | |||||
return await async_response(client, name, url, headers=headers, method=method, data=data, json=json) | |||||
def bubble_async(name, url, | |||||
headers=None, | |||||
method='GET', | |||||
data=None, | |||||
json=None, | |||||
proxies=None, | |||||
timeout=5, | |||||
max_redirects=0, | |||||
loop=asyncio.get_running_loop()): | |||||
try: | |||||
return loop.run_until_complete(_bubble_async(name, url, | |||||
headers=headers, | |||||
method=method, | |||||
data=data, | |||||
json=json, | |||||
proxies=proxies, | |||||
timeout=timeout, | |||||
max_redirects=max_redirects)) | |||||
except Exception as e: | |||||
bubble_log.error('bubble_async('+name+'): error: '+repr(e)) | |||||
def bubble_async_request_json(name, url, headers, method='GET', json=None): | |||||
response = bubble_async(name, url, headers, method=method, json=json) | |||||
if response and response.status_code == 200: | |||||
return response.json() | |||||
else: | |||||
if bubble_log.isEnabledFor(DEBUG): | |||||
bubble_log.debug('bubble_async_request_json('+name+'): received invalid HTTP status: '+str(response.status_code)) | |||||
return None | |||||
def bubble_conn_check(client_addr, server_addr, fqdns, security_level): | def bubble_conn_check(client_addr, server_addr, fqdns, security_level): | ||||
if debug_capture_fqdn and fqdns: | if debug_capture_fqdn and fqdns: | ||||
for f in debug_capture_fqdn: | for f in debug_capture_fqdn: | ||||
@@ -98,27 +204,24 @@ def bubble_conn_check(client_addr, server_addr, fqdns, security_level): | |||||
bubble_log.debug('bubble_conn_check: debug_capture_fqdn detected, returning noop: '+f) | bubble_log.debug('bubble_conn_check: debug_capture_fqdn detected, returning noop: '+f) | ||||
return 'noop' | return 'noop' | ||||
name = 'bubble_conn_check' | |||||
url = 'http://127.0.0.1:'+bubble_port+'/api/filter/check' | |||||
headers = { | headers = { | ||||
'X-Forwarded-For': client_addr, | 'X-Forwarded-For': client_addr, | ||||
'Accept': 'application/json', | 'Accept': 'application/json', | ||||
'Content-Type': 'application/json' | 'Content-Type': 'application/json' | ||||
} | } | ||||
data = { | |||||
'serverAddr': str(server_addr), | |||||
'fqdns': fqdns, | |||||
'clientAddr': client_addr | |||||
} | |||||
try: | try: | ||||
data = { | |||||
'serverAddr': str(server_addr), | |||||
'fqdns': fqdns, | |||||
'clientAddr': client_addr | |||||
} | |||||
response = requests.post('http://127.0.0.1:'+bubble_port+'/api/filter/check', headers=headers, json=data) | |||||
if response.ok: | |||||
return response.json() | |||||
if bubble_log.isEnabledFor(ERROR): | |||||
bubble_log.error('bubble_conn_check API call failed: '+repr(response)) | |||||
return None | |||||
return bubble_async_request_json(name, url, headers, method='POST', json=data) | |||||
except Exception as e: | except Exception as e: | ||||
if bubble_log.isEnabledFor(ERROR): | if bubble_log.isEnabledFor(ERROR): | ||||
bubble_log.error('bubble_conn_check API call failed: '+repr(e)) | |||||
bubble_log.error('bubble_conn_check: API call failed: '+repr(e)) | |||||
traceback.print_exc() | traceback.print_exc() | ||||
if security_level is not None and security_level['level'] == 'maximum': | if security_level is not None and security_level['level'] == 'maximum': | ||||
return False | return False | ||||
@@ -126,21 +229,14 @@ def bubble_conn_check(client_addr, server_addr, fqdns, security_level): | |||||
def bubble_get_flex_router(client_addr): | def bubble_get_flex_router(client_addr): | ||||
name = 'bubble_get_flex_router' | |||||
url = 'http://127.0.0.1:' + bubble_port + '/api/filter/flexRouters' | |||||
headers = { | headers = { | ||||
'X-Forwarded-For': client_addr, | 'X-Forwarded-For': client_addr, | ||||
'Accept': 'application/json' | 'Accept': 'application/json' | ||||
} | } | ||||
try: | try: | ||||
response = requests.get('http://127.0.0.1:'+bubble_port+'/api/filter/flexRouters', headers=headers) | |||||
if response.ok: | |||||
return response.json() | |||||
elif response.status_code == 404: | |||||
if bubble_log.isEnabledFor(DEBUG): | |||||
bubble_log.debug('bubble_get_flex_routes: no router found for '+client_addr) | |||||
else: | |||||
if bubble_log.isEnabledFor(ERROR): | |||||
bubble_log.error('bubble_get_flex_routes: API call failed with HTTP status: '+str(response.status_code)) | |||||
return None | |||||
return bubble_async_request_json(name, url, headers) | |||||
except Exception as e: | except Exception as e: | ||||
if bubble_log.isEnabledFor(ERROR): | if bubble_log.isEnabledFor(ERROR): | ||||
@@ -176,6 +272,8 @@ def bubble_matchers(req_id, client_addr, server_addr, flow, host): | |||||
bubble_log.info('bubble_matchers: debug_capture_fqdn detected, returning DEBUG_MATCHER: '+host) | bubble_log.info('bubble_matchers: debug_capture_fqdn detected, returning DEBUG_MATCHER: '+host) | ||||
return DEBUG_MATCHER | return DEBUG_MATCHER | ||||
name = 'bubble_matchers' | |||||
url = 'http://127.0.0.1:'+bubble_port+'/api/filter/matchers/'+req_id | |||||
headers = { | headers = { | ||||
'X-Forwarded-For': client_addr, | 'X-Forwarded-For': client_addr, | ||||
'Accept': 'application/json', | 'Accept': 'application/json', | ||||
@@ -200,28 +298,29 @@ def bubble_matchers(req_id, client_addr, server_addr, flow, host): | |||||
bubble_log.warning('bubble_matchers: error parsing Referer header: '+repr(e)) | bubble_log.warning('bubble_matchers: error parsing Referer header: '+repr(e)) | ||||
referer = 'NONE' | referer = 'NONE' | ||||
data = { | |||||
'requestId': req_id, | |||||
'fqdn': host, | |||||
'uri': flow.request.path, | |||||
'userAgent': user_agent, | |||||
'referer': referer, | |||||
'clientAddr': client_addr, | |||||
'serverAddr': server_addr | |||||
} | |||||
try: | try: | ||||
data = { | |||||
'requestId': req_id, | |||||
'fqdn': host, | |||||
'uri': flow.request.path, | |||||
'userAgent': user_agent, | |||||
'referer': referer, | |||||
'clientAddr': client_addr, | |||||
'serverAddr': server_addr | |||||
} | |||||
response = requests.post('http://127.0.0.1:'+bubble_port+'/api/filter/matchers/'+req_id, headers=headers, json=data) | |||||
if response.ok: | |||||
response = bubble_async(name, url, headers=headers, method='POST', json=data) | |||||
if response.status_code == 200: | |||||
return response.json() | return response.json() | ||||
elif response.status_code == 403: | elif response.status_code == 403: | ||||
if bubble_log.isEnabledFor(DEBUG): | if bubble_log.isEnabledFor(DEBUG): | ||||
bubble_log.debug('bubble_matchers response was FORBIDDEN, returning block: '+str(response.status_code)+' / '+repr(response.text)) | |||||
bubble_log.debug('bubble_matchers: response was FORBIDDEN, returning block: '+str(response.status_code)+' / '+repr(response.text)) | |||||
return BLOCK_MATCHER | return BLOCK_MATCHER | ||||
if bubble_log.isEnabledFor(WARNING): | if bubble_log.isEnabledFor(WARNING): | ||||
bubble_log.warning('bubble_matchers response not OK, returning empty matchers array: '+str(response.status_code)+' / '+repr(response.text)) | |||||
bubble_log.warning('bubble_matchers: response not OK, returning empty matchers array: '+str(response.status_code)+' / '+repr(response.text)) | |||||
except Exception as e: | except Exception as e: | ||||
if bubble_log.isEnabledFor(ERROR): | if bubble_log.isEnabledFor(ERROR): | ||||
bubble_log.error('bubble_matchers API call failed: '+repr(e)) | |||||
bubble_log.error('bubble_matchers: API call failed: '+repr(e)) | |||||
traceback.print_exc() | traceback.print_exc() | ||||
return None | return None | ||||
@@ -313,6 +412,7 @@ def health_check_response(flow): | |||||
def special_bubble_response(flow): | def special_bubble_response(flow): | ||||
name = 'special_bubble_response' | |||||
path = flow.request.path | path = flow.request.path | ||||
if is_bubble_health_check(path): | if is_bubble_health_check(path): | ||||
health_check_response(flow) | health_check_response(flow) | ||||
@@ -324,28 +424,24 @@ def special_bubble_response(flow): | |||||
'Accept': 'application/json', | 'Accept': 'application/json', | ||||
'Content-Type': 'application/json' | 'Content-Type': 'application/json' | ||||
} | } | ||||
response = None | |||||
if flow.request.method == 'GET': | if flow.request.method == 'GET': | ||||
response = requests.get(uri, headers=headers, stream=True) | |||||
response = bubble_async(name, uri, headers=headers) | |||||
elif flow.request.method == 'POST': | elif flow.request.method == 'POST': | ||||
if bubble_log.isEnabledFor(DEBUG): | if bubble_log.isEnabledFor(DEBUG): | ||||
bubble_log.debug('special_bubble_response: special bubble request: POST content is '+str(flow.request.content)) | bubble_log.debug('special_bubble_response: special bubble request: POST content is '+str(flow.request.content)) | ||||
if flow.request.content: | if flow.request.content: | ||||
headers['Content-Length'] = str(len(flow.request.content)) | headers['Content-Length'] = str(len(flow.request.content)) | ||||
response = requests.post(uri, data=flow.request.content, headers=headers, stream=True) | |||||
response = bubble_async(name, uri, json=flow.request.content, headers=headers) | |||||
else: | else: | ||||
if bubble_log.isEnabledFor(WARNING): | if bubble_log.isEnabledFor(WARNING): | ||||
bubble_log.warning('special_bubble_response: special bubble request: method '+flow.request.method+' not supported') | bubble_log.warning('special_bubble_response: special bubble request: method '+flow.request.method+' not supported') | ||||
if flow.response is None: | |||||
http_version = get_http_version(response) | |||||
if http_version is None: | |||||
if bubble_log.isEnabledFor(DEBUG): | |||||
bubble_log.debug('special_bubble_response: invalid HTTP version, bailing out') | |||||
return | |||||
return | |||||
if flow.response is None: | |||||
http_version = response.http_version | |||||
response_headers = collect_response_headers(response) | response_headers = collect_response_headers(response) | ||||
flow.response = http.HTTPResponse(http_version=http_version, | flow.response = http.HTTPResponse(http_version=http_version, | ||||
status_code=response.status_code, | status_code=response.status_code, | ||||
reason=response.reason, | reason=response.reason, | ||||
@@ -365,18 +461,6 @@ def send_bubble_response(response): | |||||
yield chunk | yield chunk | ||||
def get_http_version(response): | |||||
raw_version = response.raw.version | |||||
if raw_version == 10: | |||||
return 'HTTP/1.0' | |||||
elif raw_version == 11: | |||||
return 'HTTP/1.1' | |||||
else: | |||||
if bubble_log.isEnabledFor(DEBUG): | |||||
bubble_log.debug('get_http_version: invalid HTTP version detected, response.raw.version=='+repr(raw_version)) | |||||
return None | |||||
def collect_response_headers(response, omit=None): | def collect_response_headers(response, omit=None): | ||||
response_headers = nheaders.Headers() | response_headers = nheaders.Headers() | ||||
for name in response.headers: | for name in response.headers: | ||||
@@ -1,54 +1,66 @@ | |||||
# | # | ||||
# Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/ | # Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/ | ||||
# | # | ||||
import asyncio | |||||
from httpx._types import RequestData | |||||
from mitmproxy.proxy.protocol.async_stream_body import AsyncStreamBody | |||||
from mitmproxy import http | from mitmproxy import http | ||||
from bubble_api import bubble_get_flex_router, get_http_version, collect_response_headers, \ | |||||
HEADER_CONTENT_ENCODING, HEADER_TRANSFER_ENCODING, HEADER_CONTENT_LENGTH, CTX_BUBBLE_MATCHERS, CTX_BUBBLE_FILTERED | |||||
from bubble_modify import bubble_filter_response | |||||
from mitmproxy.proxy.protocol.request_capture import RequestCapture | |||||
import requests | |||||
from bubble_api import bubble_get_flex_router, collect_response_headers, bubble_async, async_client, async_stream, \ | |||||
HEADER_TRANSFER_ENCODING, HEADER_CONTENT_LENGTH | |||||
import logging | import logging | ||||
from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | ||||
bubble_log = logging.getLogger(__name__) | bubble_log = logging.getLogger(__name__) | ||||
FLEX_TIMEOUT = 20 | FLEX_TIMEOUT = 20 | ||||
def prepend_remainder_to_stream(remainder, raw): | |||||
first = True | |||||
while True: | |||||
chunk = raw.read(8192) | |||||
if first and chunk and len(remainder) > 0: | |||||
yield remainder + chunk | |||||
elif chunk: | |||||
yield chunk | |||||
else: | |||||
break | |||||
if first: | |||||
first = False | |||||
class FlexFlow(RequestCapture): | |||||
flex_host: None | |||||
mitm_flow: None | |||||
router: None | |||||
request_chunks: None | |||||
def __init__(self, flex_host, mitm_flow, router): | |||||
super().__init__() | |||||
self.flex_host = flex_host | |||||
self.mitm_flow = mitm_flow | |||||
self.router = router | |||||
mitm_flow.request.stream = self | |||||
mitm_flow.response = http.HTTPResponse(http_version='HTTP/1.1', | |||||
status_code=523, | |||||
reason='FlexFlow Not Initialized', | |||||
headers={}, | |||||
content=None) | |||||
def set_flex_response(client_addr, flex_host, flow): | |||||
if bubble_log.isEnabledFor(INFO): | |||||
bubble_log.info('set_flex_response: checking for flex router for host: '+flex_host) | |||||
def capture(self, chunks): | |||||
self.request_chunks = chunks | |||||
def new_flex_flow(client_addr, flex_host, flow): | |||||
router = bubble_get_flex_router(client_addr) | router = bubble_get_flex_router(client_addr) | ||||
if router is None or 'auth' not in router: | if router is None or 'auth' not in router: | ||||
if bubble_log.isEnabledFor(INFO): | if bubble_log.isEnabledFor(INFO): | ||||
bubble_log.info('set_flex_response: no flex router for host: '+flex_host) | |||||
return | |||||
bubble_log.info('new_flex_flow: no flex router for host: '+flex_host) | |||||
return None | |||||
if bubble_log.isEnabledFor(INFO): | if bubble_log.isEnabledFor(INFO): | ||||
bubble_log.info('set_flex_response: found router '+repr(router)+' for flex host: '+flex_host) | |||||
try: | |||||
process_flex(flex_host, flow, router) | |||||
except Exception as e: | |||||
if bubble_log.isEnabledFor(ERROR): | |||||
bubble_log.error('set_flex_response: error processing: '+repr(e)) | |||||
bubble_log.info('new_flex_flow: found router '+repr(router)+' for flex host: '+flex_host) | |||||
return FlexFlow(flex_host, flow, router) | |||||
def process_flex(flex_flow): | |||||
def process_flex(flex_host, flow, router): | |||||
flex_host = flex_flow.flex_host | |||||
flow = flex_flow.mitm_flow | |||||
router = flex_flow.router | |||||
# build the request URL | # build the request URL | ||||
method = flow.request.method | method = flow.request.method | ||||
@@ -68,41 +80,39 @@ def process_flex(flex_host, flow, router): | |||||
proxy_url = router['proxyUrl'] | proxy_url = router['proxyUrl'] | ||||
proxies = {"http": proxy_url, "https": proxy_url} | proxies = {"http": proxy_url, "https": proxy_url} | ||||
# send request to flex router | |||||
request_body = flow.request.content | |||||
if bubble_log.isEnabledFor(DEBUG): | if bubble_log.isEnabledFor(DEBUG): | ||||
bubble_log.debug('process_flex: sending flex request for ' + method +' ' + url +' to ' + proxy_url +' with headers=' + repr(request_headers) +' and body=' + repr(request_body)) | bubble_log.debug('process_flex: sending flex request for ' + method +' ' + url +' to ' + proxy_url +' with headers=' + repr(request_headers) +' and body=' + repr(request_body)) | ||||
loop = asyncio.new_event_loop() | |||||
client = async_client(proxies=proxies, timeout=30) | |||||
try: | try: | ||||
response = requests.request(method, url, | |||||
headers=request_headers, | |||||
timeout=(20, 120), | |||||
stream=True, | |||||
data=request_body, # use the original request body, if there is one | |||||
proxies=proxies, | |||||
allow_redirects=False) | |||||
response = async_stream(client, 'process_flex', url, | |||||
method=method, | |||||
headers=request_headers, | |||||
timeout=30, | |||||
data=async_chunk_iter(flex_flow.request_chunks), | |||||
loop=loop) | |||||
if bubble_log.isEnabledFor(DEBUG): | if bubble_log.isEnabledFor(DEBUG): | ||||
bubble_log.debug('process_flex: response returned HTTP status '+str(response.status_code)+' for '+url) | bubble_log.debug('process_flex: response returned HTTP status '+str(response.status_code)+' for '+url) | ||||
except Exception as e: | except Exception as e: | ||||
if bubble_log.isEnabledFor(ERROR): | if bubble_log.isEnabledFor(ERROR): | ||||
bubble_log.error('process_flex: error sending request to '+url+': '+repr(e)) | bubble_log.error('process_flex: error sending request to '+url+': '+repr(e)) | ||||
return | |||||
return None | |||||
# Status line -- http version is buried in response.raw.version | |||||
http_version = get_http_version(response) | |||||
if http_version is None: | |||||
if bubble_log.isEnabledFor(DEBUG): | |||||
bubble_log.debug('process_flex: invalid HTTP version, bailing out') | |||||
return | |||||
if response is None: | |||||
return None | |||||
# Status line | |||||
http_version = response.http_version | |||||
# Headers -- copy from requests dict to Headers multimap | # Headers -- copy from requests dict to Headers multimap | ||||
# Remove Content-Length, Content-Encoding and Transfer-Encoding | |||||
# We will rechunk the output | |||||
response_headers = collect_response_headers(response, [HEADER_CONTENT_LENGTH, HEADER_CONTENT_ENCODING, HEADER_TRANSFER_ENCODING]) | |||||
# Remove Content-Length and Content-Encoding, we will rechunk the output | |||||
response_headers = collect_response_headers(response, [HEADER_CONTENT_LENGTH, HEADER_TRANSFER_ENCODING]) | |||||
# Construct the response | |||||
# Construct the real response | |||||
flow.response = http.HTTPResponse(http_version=http_version, | flow.response = http.HTTPResponse(http_version=http_version, | ||||
status_code=response.status_code, | status_code=response.status_code, | ||||
reason=response.reason, | |||||
reason=response.reason_phrase, | |||||
headers=response_headers, | headers=response_headers, | ||||
content=None) | content=None) | ||||
@@ -113,18 +123,48 @@ def process_flex(flex_host, flow, router): | |||||
if response.status_code // 100 != 2: | if response.status_code // 100 != 2: | ||||
response_headers[HEADER_CONTENT_LENGTH] = '0' | response_headers[HEADER_CONTENT_LENGTH] = '0' | ||||
flow.response.stream = lambda chunks: [] | flow.response.stream = lambda chunks: [] | ||||
elif content_length is None or int(content_length) > 0: | elif content_length is None or int(content_length) > 0: | ||||
response_headers[HEADER_TRANSFER_ENCODING] = 'chunked' | response_headers[HEADER_TRANSFER_ENCODING] = 'chunked' | ||||
flow.response.stream = lambda chunks: response.iter_content(8192) | |||||
flow.response.stream = AsyncStreamBody(owner=client, loop=loop, chunks=response.aiter_raw(), finalize=cleanup_flex(url, loop, client, response)) | |||||
else: | else: | ||||
response_headers[HEADER_CONTENT_LENGTH] = '0' | response_headers[HEADER_CONTENT_LENGTH] = '0' | ||||
flow.response.stream = lambda chunks: [] | flow.response.stream = lambda chunks: [] | ||||
# Apply filters | # Apply filters | ||||
if bubble_log.isEnabledFor(DEBUG): | |||||
bubble_log.debug('process_flex: bubble filtering: '+url) | |||||
# flow.response.stream = lambda chunks: response.iter_content(8192) | |||||
bubble_filter_response(flow, response) | |||||
if bubble_log.isEnabledFor(INFO): | if bubble_log.isEnabledFor(INFO): | ||||
bubble_log.info('process_flex: successfully requested url '+url+' from flex router, proceeding ...') | |||||
bubble_log.info('process_flex: successfully requested url '+url+' from flex router, proceeding...') | |||||
return response | |||||
async def async_chunk_iter(chunks): | |||||
for chunk in chunks: | |||||
yield chunk | |||||
def cleanup_flex(url, loop, client, response): | |||||
def cleanup(): | |||||
errors = False | |||||
try: | |||||
loop.run_until_complete(response.aclose()) | |||||
except Exception as e: | |||||
bubble_log.error('cleanup_flex: error closing response: '+repr(e)) | |||||
errors = True | |||||
try: | |||||
loop.run_until_complete(client.aclose()) | |||||
except Exception as e: | |||||
bubble_log.error('cleanup_flex: error: '+repr(e)) | |||||
errors = True | |||||
if not errors: | |||||
if bubble_log.isEnabledFor(DEBUG): | |||||
bubble_log.debug('cleanup_flex: successfully completed: '+url) | |||||
else: | |||||
if bubble_log.isEnabledFor(WARNING): | |||||
bubble_log.warning('cleanup_flex: successfully completed (but had errors closing): ' + url) | |||||
return cleanup |
@@ -1,19 +1,20 @@ | |||||
# | # | ||||
# Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/ | # Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/ | ||||
# | # | ||||
import asyncio | |||||
import json | import json | ||||
import re | import re | ||||
import requests | |||||
import urllib | import urllib | ||||
import traceback | import traceback | ||||
from mitmproxy.net.http import Headers | from mitmproxy.net.http import Headers | ||||
from bubble_config import bubble_port, debug_capture_fqdn, debug_stream_fqdn, debug_stream_uri | from bubble_config import bubble_port, debug_capture_fqdn, debug_stream_fqdn, debug_stream_uri | ||||
from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, \ | |||||
status_reason, get_flow_ctx, add_flow_ctx, \ | |||||
from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, CTX_BUBBLE_FLEX, \ | |||||
status_reason, get_flow_ctx, add_flow_ctx, bubble_async, \ | |||||
is_bubble_special_path, is_bubble_health_check, health_check_response, special_bubble_response, \ | is_bubble_special_path, is_bubble_health_check, health_check_response, special_bubble_response, \ | ||||
CTX_BUBBLE_REQUEST_ID, CTX_CONTENT_LENGTH, CTX_CONTENT_LENGTH_SENT, CTX_BUBBLE_FILTERED, \ | CTX_BUBBLE_REQUEST_ID, CTX_CONTENT_LENGTH, CTX_CONTENT_LENGTH_SENT, CTX_BUBBLE_FILTERED, \ | ||||
HEADER_CONTENT_TYPE, HEADER_CONTENT_ENCODING, HEADER_LOCATION, HEADER_CONTENT_LENGTH, \ | HEADER_CONTENT_TYPE, HEADER_CONTENT_ENCODING, HEADER_LOCATION, HEADER_CONTENT_LENGTH, \ | ||||
HEADER_USER_AGENT, HEADER_FILTER_PASSTHRU, HEADER_CONTENT_SECURITY_POLICY, REDIS, redis_set | HEADER_USER_AGENT, HEADER_FILTER_PASSTHRU, HEADER_CONTENT_SECURITY_POLICY, REDIS, redis_set | ||||
from bubble_flex import process_flex | |||||
import logging | import logging | ||||
from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | ||||
@@ -28,7 +29,7 @@ REDIS_FILTER_PASSTHRU_PREFIX = '__chunk_filter_pass__' | |||||
REDIS_FILTER_PASSTHRU_DURATION = 600 | REDIS_FILTER_PASSTHRU_DURATION = 600 | ||||
DEBUG_STREAM_COUNTERS = {} | DEBUG_STREAM_COUNTERS = {} | ||||
MIN_FILTER_CHUNK_SIZE = 16384 | |||||
MIN_FILTER_CHUNK_SIZE = 1024 * 32 # Filter data in 32KB chunks | |||||
def add_csp_part(new_csp, part): | def add_csp_part(new_csp, part): | ||||
@@ -53,7 +54,8 @@ def ensure_bubble_script_csp(csp): | |||||
return new_csp | return new_csp | ||||
def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, content_type=None, content_length=None, csp=None): | |||||
def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=None, content_type=None, content_length=None, csp=None): | |||||
name = 'filter_chunk' | |||||
if debug_capture_fqdn: | if debug_capture_fqdn: | ||||
if debug_capture_fqdn in req_id: | if debug_capture_fqdn in req_id: | ||||
if bubble_log.isEnabledFor(DEBUG): | if bubble_log.isEnabledFor(DEBUG): | ||||
@@ -87,17 +89,21 @@ def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, c | |||||
else: | else: | ||||
url = url + '?last=true' | url = url + '?last=true' | ||||
chunk_len = 0 | |||||
if bubble_log.isEnabledFor(DEBUG): | |||||
if chunk is not None: | |||||
chunk_len = len(chunk) | |||||
if csp: | if csp: | ||||
if bubble_log.isEnabledFor(DEBUG): | if bubble_log.isEnabledFor(DEBUG): | ||||
bubble_log.debug('filter_chunk: url='+url+' (csp='+csp+')') | |||||
filter_headers = { | |||||
bubble_log.debug('filter_chunk: url='+url+' (csp='+csp+') size='+str(chunk_len)) | |||||
headers = { | |||||
HEADER_CONTENT_TYPE: CONTENT_TYPE_BINARY, | HEADER_CONTENT_TYPE: CONTENT_TYPE_BINARY, | ||||
HEADER_CONTENT_SECURITY_POLICY: csp | HEADER_CONTENT_SECURITY_POLICY: csp | ||||
} | } | ||||
else: | else: | ||||
if bubble_log.isEnabledFor(DEBUG): | if bubble_log.isEnabledFor(DEBUG): | ||||
bubble_log.debug('filter_chunk: url='+url+' (no csp)') | |||||
filter_headers = STANDARD_FILTER_HEADERS | |||||
bubble_log.debug('filter_chunk: url='+url+' (no csp) size='+str(chunk_len)) | |||||
headers = STANDARD_FILTER_HEADERS | |||||
if debug_stream_fqdn and debug_stream_uri and debug_stream_fqdn in req_id and flow.request.path == debug_stream_uri: | if debug_stream_fqdn and debug_stream_uri and debug_stream_fqdn in req_id and flow.request.path == debug_stream_uri: | ||||
if req_id in DEBUG_STREAM_COUNTERS: | if req_id in DEBUG_STREAM_COUNTERS: | ||||
@@ -112,14 +118,14 @@ def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, c | |||||
f.write(chunk) | f.write(chunk) | ||||
f.close() | f.close() | ||||
f = open('/tmp/bubble_stream_'+req_id+'_chunk'+"{:04d}".format(count)+'.headers.json', mode='w') | f = open('/tmp/bubble_stream_'+req_id+'_chunk'+"{:04d}".format(count)+'.headers.json', mode='w') | ||||
f.write(json.dumps(filter_headers)) | |||||
f.write(json.dumps(headers)) | |||||
f.close() | f.close() | ||||
f = open('/tmp/bubble_stream_'+req_id+'_chunk'+"{:04d}".format(count)+'.url', mode='w') | f = open('/tmp/bubble_stream_'+req_id+'_chunk'+"{:04d}".format(count)+'.url', mode='w') | ||||
f.write(url) | f.write(url) | ||||
f.close() | f.close() | ||||
response = requests.post(url, data=chunk, headers=filter_headers) | |||||
if not response.ok: | |||||
response = bubble_async(name, url, headers=headers, method='POST', data=chunk, loop=loop) | |||||
if not response.status_code == 200: | |||||
err_message = 'filter_chunk: Error fetching ' + url + ', HTTP status ' + str(response.status_code) | err_message = 'filter_chunk: Error fetching ' + url + ', HTTP status ' + str(response.status_code) | ||||
if bubble_log.isEnabledFor(ERROR): | if bubble_log.isEnabledFor(ERROR): | ||||
bubble_log.error(err_message) | bubble_log.error(err_message) | ||||
@@ -135,9 +141,7 @@ def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, c | |||||
def bubble_filter_chunks(flow, chunks, flex_stream, req_id, user_agent, content_encoding, content_type, csp): | def bubble_filter_chunks(flow, chunks, flex_stream, req_id, user_agent, content_encoding, content_type, csp): | ||||
""" | |||||
chunks is a generator that can be used to iterate over all chunks. | |||||
""" | |||||
loop = asyncio.new_event_loop() | |||||
if bubble_log.isEnabledFor(DEBUG): | if bubble_log.isEnabledFor(DEBUG): | ||||
bubble_log.debug('bubble_filter_chunks: starting with content_type='+content_type) | bubble_log.debug('bubble_filter_chunks: starting with content_type='+content_type) | ||||
first = True | first = True | ||||
@@ -165,20 +169,20 @@ def bubble_filter_chunks(flow, chunks, flex_stream, req_id, user_agent, content_ | |||||
else: | else: | ||||
last = False | last = False | ||||
if first: | if first: | ||||
yield filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding, content_type, content_length, csp) | |||||
yield filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding, content_type, content_length, csp) | |||||
first = False | first = False | ||||
else: | else: | ||||
yield filter_chunk(flow, chunk, req_id, user_agent, last) | |||||
yield filter_chunk(loop, flow, chunk, req_id, user_agent, last) | |||||
# send whatever is left in the buffer | # send whatever is left in the buffer | ||||
if len(buffer) > 0: | if len(buffer) > 0: | ||||
# bubble_log.debug('bubble_filter_chunks(end): sending remainder buffer of size '+str(len(buffer))) | # bubble_log.debug('bubble_filter_chunks(end): sending remainder buffer of size '+str(len(buffer))) | ||||
if first: | if first: | ||||
yield filter_chunk(flow, buffer, req_id, user_agent, last, content_encoding, content_type, content_length, csp) | |||||
yield filter_chunk(loop, flow, buffer, req_id, user_agent, last, content_encoding, content_type, content_length, csp) | |||||
else: | else: | ||||
yield filter_chunk(flow, buffer, req_id, user_agent, last) | |||||
yield filter_chunk(loop, flow, buffer, req_id, user_agent, last) | |||||
if not content_length or not last: | if not content_length or not last: | ||||
# bubble_log.debug('bubble_filter_chunks(end): sending last empty chunk') | # bubble_log.debug('bubble_filter_chunks(end): sending last empty chunk') | ||||
yield filter_chunk(flow, None, req_id, user_agent, True) # get the last bits of data | |||||
yield filter_chunk(loop, flow, None, req_id, user_agent, True) # get the last bits of data | |||||
except Exception as e: | except Exception as e: | ||||
if bubble_log.isEnabledFor(ERROR): | if bubble_log.isEnabledFor(ERROR): | ||||
bubble_log.error('bubble_filter_chunks: exception='+repr(e)) | bubble_log.error('bubble_filter_chunks: exception='+repr(e)) | ||||
@@ -209,7 +213,12 @@ def abort_data(content_type): | |||||
def responseheaders(flow): | def responseheaders(flow): | ||||
bubble_filter_response(flow, None) | |||||
flex_flow = get_flow_ctx(flow, CTX_BUBBLE_FLEX) | |||||
if flex_flow: | |||||
flex_stream = process_flex(flex_flow) | |||||
else: | |||||
flex_stream = None | |||||
bubble_filter_response(flow, flex_stream) | |||||
def bubble_filter_response(flow, flex_stream): | def bubble_filter_response(flow, flex_stream): | ||||
@@ -31,11 +31,11 @@ from mitmproxy.net.http import headers as nheaders | |||||
from bubble_api import bubble_matchers, bubble_activity_log, \ | from bubble_api import bubble_matchers, bubble_activity_log, \ | ||||
HEALTH_CHECK_URI, CTX_BUBBLE_MATCHERS, CTX_BUBBLE_SPECIAL, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, \ | HEALTH_CHECK_URI, CTX_BUBBLE_MATCHERS, CTX_BUBBLE_SPECIAL, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, \ | ||||
CTX_BUBBLE_PASSTHRU, CTX_BUBBLE_REQUEST_ID, add_flow_ctx, parse_host_header, \ | |||||
CTX_BUBBLE_PASSTHRU, CTX_BUBBLE_FLEX, CTX_BUBBLE_REQUEST_ID, add_flow_ctx, parse_host_header, \ | |||||
is_bubble_special_path, special_bubble_response, is_bubble_health_check, \ | is_bubble_special_path, special_bubble_response, is_bubble_health_check, \ | ||||
is_bubble_request, is_sage_request, is_not_from_vpn, is_flex_domain | is_bubble_request, is_sage_request, is_not_from_vpn, is_flex_domain | ||||
from bubble_config import bubble_host, bubble_host_alias | from bubble_config import bubble_host, bubble_host_alias | ||||
from bubble_flex import set_flex_response | |||||
from bubble_flex import new_flex_flow | |||||
import logging | import logging | ||||
from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL | ||||
@@ -251,9 +251,11 @@ class Rerouter: | |||||
flow.request.port = port | flow.request.port = port | ||||
return host | return host | ||||
def request(self, flow): | |||||
def requestheaders(self, flow): | |||||
host = self.bubble_handle_request(flow) | host = self.bubble_handle_request(flow) | ||||
path = flow.request.path | path = flow.request.path | ||||
flow.request.capture_stream = True | |||||
if is_bubble_special_path(path): | if is_bubble_special_path(path): | ||||
# if bubble_log.isEnabledFor(DEBUG): | # if bubble_log.isEnabledFor(DEBUG): | ||||
# bubble_log.debug('request: is_bubble_special_path('+path+') returned true, sending special bubble response') | # bubble_log.debug('request: is_bubble_special_path('+path+') returned true, sending special bubble response') | ||||
@@ -262,9 +264,9 @@ class Rerouter: | |||||
elif host is not None: | elif host is not None: | ||||
client_addr = flow.client_conn.address[0] | client_addr = flow.client_conn.address[0] | ||||
if is_flex_domain(client_addr, host): | if is_flex_domain(client_addr, host): | ||||
add_flow_ctx(flow, CTX_BUBBLE_FLEX, new_flex_flow(client_addr, host, flow)) | |||||
if bubble_log.isEnabledFor(DEBUG): | if bubble_log.isEnabledFor(DEBUG): | ||||
bubble_log.debug('request: is_flex_domain('+host+') returned true, sending flex response') | |||||
set_flex_response(client_addr, host, flow) | |||||
bubble_log.debug('request: is_flex_domain('+host+') returned true, setting ctx: '+CTX_BUBBLE_FLEX) | |||||
addons = [Rerouter()] | addons = [Rerouter()] |