Browse Source

fix bubble special requests, stream responses

tags/v1.1.4
Jonathan Cobb 4 years ago
parent
commit
8a4f9627eb
4 changed files with 81 additions and 69 deletions
  1. +55
    -12
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py
  2. +22
    -27
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py
  3. +2
    -29
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py
  4. +2
    -1
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py

+ 55
- 12
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py View File

@@ -17,12 +17,14 @@ import nest_asyncio
import redis import redis
from bubble_vpn4 import wireguard_network_ipv4 from bubble_vpn4 import wireguard_network_ipv4
from bubble_vpn6 import wireguard_network_ipv6 from bubble_vpn6 import wireguard_network_ipv6
from bubble_debug import get_stack
from netaddr import IPAddress, IPNetwork from netaddr import IPAddress, IPNetwork


from bubble_config import bubble_port, debug_capture_fqdn, \ from bubble_config import bubble_port, debug_capture_fqdn, \
bubble_host, bubble_host_alias, bubble_sage_host, bubble_sage_ip4, bubble_sage_ip6 bubble_host, bubble_host_alias, bubble_sage_host, bubble_sage_ip4, bubble_sage_ip6
from mitmproxy import http from mitmproxy import http
from mitmproxy.net.http import headers as nheaders from mitmproxy.net.http import headers as nheaders
from mitmproxy.proxy.protocol.async_stream_body import AsyncStreamBody


bubble_log = logging.getLogger(__name__) bubble_log = logging.getLogger(__name__)


@@ -154,6 +156,7 @@ async def _async_stream(client, name, url,




async def _bubble_async(name, url, async def _bubble_async(name, url,
client=None,
headers=None, headers=None,
method='GET', method='GET',
data=None, data=None,
@@ -161,11 +164,15 @@ async def _bubble_async(name, url,
proxies=None, proxies=None,
timeout=5, timeout=5,
max_redirects=0): max_redirects=0):
async with async_client(proxies=proxies, timeout=timeout, max_redirects=max_redirects) as client:
return await async_response(client, name, url, headers=headers, method=method, data=data, json=json)
if client is not None:
return await _async_stream(client, name, url, headers=headers, method=method, data=data, json=json, timeout=timeout, max_redirects=max_redirects)
else:
async with async_client(proxies=proxies, timeout=timeout, max_redirects=max_redirects) as client:
return await async_response(client, name, url, headers=headers, method=method, data=data, json=json)




def bubble_async(name, url, def bubble_async(name, url,
client=None,
headers=None, headers=None,
method='GET', method='GET',
data=None, data=None,
@@ -176,6 +183,7 @@ def bubble_async(name, url,
loop=asyncio.get_running_loop()): loop=asyncio.get_running_loop()):
try: try:
return loop.run_until_complete(_bubble_async(name, url, return loop.run_until_complete(_bubble_async(name, url,
client=client,
headers=headers, headers=headers,
method=method, method=method,
data=data, data=data,
@@ -184,19 +192,44 @@ def bubble_async(name, url,
timeout=timeout, timeout=timeout,
max_redirects=max_redirects)) max_redirects=max_redirects))
except Exception as e: except Exception as e:
bubble_log.error('bubble_async('+name+'): error: '+repr(e))
bubble_log.error('bubble_async('+name+'): error: '+repr(e)+' from '+get_stack(e))




def bubble_async_request_json(name, url, headers, method='GET', json=None): def bubble_async_request_json(name, url, headers, method='GET', json=None):
response = bubble_async(name, url, headers, method=method, json=json)
response = bubble_async(name, url, headers=headers, method=method, json=json)
if response and response.status_code == 200: if response and response.status_code == 200:
return response.json() return response.json()
elif response:
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('bubble_async_request_json('+name+'): received invalid HTTP status: '+repr(response.status_code))
else: else:
if bubble_log.isEnabledFor(DEBUG): if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('bubble_async_request_json('+name+'): received invalid HTTP status: '+str(response.status_code))
bubble_log.debug('bubble_async_request_json('+name+'): error, no response')
return None return None




def cleanup_async(url, loop, client, response):
def cleanup():
errors = False
try:
loop.run_until_complete(response.aclose())
except Exception as e:
bubble_log.error('cleanup_async: error closing response: '+repr(e))
errors = True
try:
loop.run_until_complete(client.aclose())
except Exception as e:
bubble_log.error('cleanup_async: error: '+repr(e))
errors = True
if not errors:
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('cleanup_async: successfully completed: '+url)
else:
if bubble_log.isEnabledFor(WARNING):
bubble_log.warning('cleanup_async: successfully completed (but had errors closing): ' + url)
return cleanup


def bubble_conn_check(client_addr, server_addr, fqdns, security_level): def bubble_conn_check(client_addr, server_addr, fqdns, security_level):
if debug_capture_fqdn and fqdns: if debug_capture_fqdn and fqdns:
for f in debug_capture_fqdn: for f in debug_capture_fqdn:
@@ -218,7 +251,7 @@ def bubble_conn_check(client_addr, server_addr, fqdns, security_level):
'clientAddr': client_addr 'clientAddr': client_addr
} }
try: try:
return bubble_async_request_json(name, url, headers, method='POST', json=data)
return bubble_async_request_json(name, url, headers=headers, method='POST', json=data)


except Exception as e: except Exception as e:
if bubble_log.isEnabledFor(ERROR): if bubble_log.isEnabledFor(ERROR):
@@ -366,7 +399,13 @@ def is_not_from_vpn(client_addr):
return ip not in VPN_IP4_CIDR and ip not in VPN_IP6_CIDR return ip not in VPN_IP4_CIDR and ip not in VPN_IP6_CIDR




def is_flex_domain(client_addr, fqdn):
def is_flex_domain(client_addr, server_addr, fqdns):
if fqdns is None or len(fqdns) != 1:
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('is_flex_domain: no fqdns or multiple fqdns for server_addr '+server_addr+' ('+repr(fqdns)+'), returning False')
return False
fqdn = fqdns[0]

if fqdn == bubble_host or fqdn == bubble_host_alias or fqdn == bubble_sage_host: if fqdn == bubble_host or fqdn == bubble_host_alias or fqdn == bubble_sage_host:
if bubble_log.isEnabledFor(DEBUG): if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('is_flex_domain: (early) returning False for: '+fqdn) bubble_log.debug('is_flex_domain: (early) returning False for: '+fqdn)
@@ -429,20 +468,24 @@ def special_bubble_response(flow):
return return
uri = make_bubble_special_path(path) uri = make_bubble_special_path(path)
if bubble_log.isEnabledFor(DEBUG): if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('special_bubble_response: sending special bubble request to '+uri)
bubble_log.debug('special_bubble_response: sending special bubble request to '+uri+' from '+get_stack())
headers = { headers = {
'Accept': 'application/json', 'Accept': 'application/json',
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
if flow.request.method == 'GET': if flow.request.method == 'GET':
response = bubble_async(name, uri, headers=headers)
loop = asyncio.new_event_loop()
client = async_client(timeout=30)
response = bubble_async(name, uri, client=client, loop=loop, headers=headers)


elif flow.request.method == 'POST': elif flow.request.method == 'POST':
loop = asyncio.new_event_loop()
client = async_client(timeout=30)
if bubble_log.isEnabledFor(DEBUG): if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('special_bubble_response: special bubble request: POST content is '+str(flow.request.content)) bubble_log.debug('special_bubble_response: special bubble request: POST content is '+str(flow.request.content))
if flow.request.content: if flow.request.content:
headers['Content-Length'] = str(len(flow.request.content)) headers['Content-Length'] = str(len(flow.request.content))
response = bubble_async(name, uri, json=flow.request.content, headers=headers)
response = bubble_async(name, uri, client=client, loop=loop, json=flow.request.content, headers=headers)


else: else:
if bubble_log.isEnabledFor(WARNING): if bubble_log.isEnabledFor(WARNING):
@@ -454,7 +497,7 @@ def special_bubble_response(flow):
response_headers = collect_response_headers(response) response_headers = collect_response_headers(response)
flow.response = http.HTTPResponse(http_version=http_version, flow.response = http.HTTPResponse(http_version=http_version,
status_code=response.status_code, status_code=response.status_code,
reason=response.reason,
reason=response.reason_phrase,
headers=response_headers, headers=response_headers,
content=None) content=None)
if response is not None: if response is not None:
@@ -463,7 +506,7 @@ def special_bubble_response(flow):
flow.response.headers = collect_response_headers(response) flow.response.headers = collect_response_headers(response)
flow.response.status_code = response.status_code flow.response.status_code = response.status_code
flow.response.reason = status_reason(response.status_code) flow.response.reason = status_reason(response.status_code)
flow.response.stream = lambda chunks: send_bubble_response(response)
flow.response.stream = AsyncStreamBody(owner=client, loop=loop, chunks=response.aiter_raw(), finalize=cleanup_async(uri, loop, client, response))




def send_bubble_response(response): def send_bubble_response(response):


+ 22
- 27
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py View File

@@ -95,17 +95,16 @@ def conn_check_cache_prefix(client_addr, server_addr):
return REDIS_CONN_CHECK_PREFIX + client_addr + '_' + server_addr return REDIS_CONN_CHECK_PREFIX + client_addr + '_' + server_addr




def fqdns_for_addr(server_addr):
prefix = REDIS_DNS_PREFIX + server_addr
keys = REDIS.keys(prefix + '_*')
if keys is None or len(keys) == 0:
def fqdns_for_addr(client_addr, server_addr):
key = REDIS_DNS_PREFIX + server_addr + '~' + client_addr
values = REDIS.smembers(key)
if values is None or len(values) == 0:
if bubble_log.isEnabledFor(DEBUG): if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('fqdns_for_addr: no FQDN found for addr '+str(server_addr)+', checking raw addr')
return ''
bubble_log.debug('fqdns_for_addr: no FQDN found for server_addr '+str(server_addr)+' and client_addr '+client_addr)
return None
fqdns = [] fqdns = []
for k in keys:
fqdn = k.decode()[len(prefix)+1:]
fqdns.append(fqdn)
for fqdn in values:
fqdns.append(fqdn.decode())
return fqdns return fqdns




@@ -120,6 +119,8 @@ class TlsBlock(TlsLayer):




class TlsFeedback(TlsLayer): class TlsFeedback(TlsLayer):
fqdns = None
security_level = None
""" """
Monkey-patch _establish_tls_with_client to get feedback if TLS could be established Monkey-patch _establish_tls_with_client to get feedback if TLS could be established
successfully on the client connection (which may fail due to cert pinning). successfully on the client connection (which may fail due to cert pinning).
@@ -231,37 +232,31 @@ def check_connection(client_addr, server_addr, fqdns, security_level):
return check_response return check_response




def check_passthru_flex(client_addr, server_addr, fqdns):
if fqdns:
for fqdn in fqdns:
if is_flex_domain(client_addr, fqdn):
return True
else:
return is_flex_domain(client_addr, server_addr)


def passthru_flex_port(client_addr, fqdns):
router = bubble_get_flex_router(client_addr)
def passthru_flex_port(client_addr, fqdn):
router = bubble_get_flex_router(client_addr, fqdn)
if router is None or 'auth' not in router: if router is None or 'auth' not in router:
if bubble_log.isEnabledFor(INFO): if bubble_log.isEnabledFor(INFO):
bubble_log.info('apply_passthru_flex: no flex router for fqdn(s): '+repr(fqdns))
bubble_log.info('apply_passthru_flex: no flex router for fqdn(s): '+repr(fqdn))
elif 'port' in router: elif 'port' in router:
return router['port'] return router['port']
else: else:
if bubble_log.isEnabledFor(WARNING): if bubble_log.isEnabledFor(WARNING):
bubble_log.warning('apply_passthru_flex: flex router found but has no port ('+repr(router)+') for fqdn(s): '+repr(fqdns))
bubble_log.warning('apply_passthru_flex: flex router found but has no port ('+repr(router)+') for fqdn(s): '+repr(fqdn))
return None return None




def do_passthru(client_addr, server_addr, fqdns, layer): def do_passthru(client_addr, server_addr, fqdns, layer):
flex_port = None flex_port = None
if check_passthru_flex(client_addr, server_addr, fqdns):
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('do_passthru: applying flex passthru for server=' + server_addr + ', fqdns=' + str(fqdns))
flex_port = passthru_flex_port(client_addr, fqdns)
if is_flex_domain(client_addr, server_addr, fqdns):
flex_port = passthru_flex_port(client_addr, fqdns[0])
if flex_port: if flex_port:
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('do_passthru: applying flex passthru for server=' + server_addr + ', fqdns=' + str(fqdns))
layer_replacement = BubbleFlexPassthruLayer(layer.ctx, ('127.0.0.1', flex_port), fqdns[0], 443) layer_replacement = BubbleFlexPassthruLayer(layer.ctx, ('127.0.0.1', flex_port), fqdns[0], 443)
layer.reply.send(layer_replacement) layer.reply.send(layer_replacement)
else:
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('do_passthru: detected flex passthru but no flex routers available for server=' + server_addr + ', fqdns=' + str(fqdns))
if flex_port is None: if flex_port is None:
layer_replacement = RawTCPLayer(layer.ctx, ignore=True) layer_replacement = RawTCPLayer(layer.ctx, ignore=True)
layer.reply.send(layer_replacement) layer.reply.send(layer_replacement)
@@ -280,7 +275,7 @@ def next_layer(layer):
bubble_log.debug('next_layer: using fqdn in SNI: '+ fqdn) bubble_log.debug('next_layer: using fqdn in SNI: '+ fqdn)
fqdns = [fqdn] fqdns = [fqdn]
else: else:
fqdns = fqdns_for_addr(server_addr)
fqdns = fqdns_for_addr(client_addr, server_addr)
if bubble_log.isEnabledFor(DEBUG): if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('next_layer: NO fqdn in sni, using fqdns from DNS: '+ str(fqdns)) bubble_log.debug('next_layer: NO fqdn in sni, using fqdns from DNS: '+ str(fqdns))
layer.fqdns = fqdns layer.fqdns = fqdns


+ 2
- 29
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py View File

@@ -9,7 +9,7 @@ from mitmproxy import http
from mitmproxy.net.http import headers as nheaders from mitmproxy.net.http import headers as nheaders
from mitmproxy.proxy.protocol.request_capture import RequestCapture from mitmproxy.proxy.protocol.request_capture import RequestCapture


from bubble_api import bubble_get_flex_router, collect_response_headers, async_client, async_stream, \
from bubble_api import bubble_get_flex_router, collect_response_headers, async_client, async_stream, cleanup_async, \
HEADER_TRANSFER_ENCODING, HEADER_CONTENT_LENGTH, HEADER_CONTENT_TYPE HEADER_TRANSFER_ENCODING, HEADER_CONTENT_LENGTH, HEADER_CONTENT_TYPE


import logging import logging
@@ -159,7 +159,7 @@ def process_flex(flex_flow):


elif content_length is None or int(content_length) > 0: elif content_length is None or int(content_length) > 0:
response_headers[HEADER_TRANSFER_ENCODING] = 'chunked' response_headers[HEADER_TRANSFER_ENCODING] = 'chunked'
flow.response.stream = AsyncStreamBody(owner=client, loop=loop, chunks=response.aiter_raw(), finalize=cleanup_flex(url, loop, client, response))
flow.response.stream = AsyncStreamBody(owner=client, loop=loop, chunks=response.aiter_raw(), finalize=cleanup_async(url, loop, client, response))


else: else:
response_headers[HEADER_CONTENT_LENGTH] = '0' response_headers[HEADER_CONTENT_LENGTH] = '0'
@@ -176,30 +176,3 @@ def process_flex(flex_flow):
async def async_chunk_iter(chunks): async def async_chunk_iter(chunks):
for chunk in chunks: for chunk in chunks:
yield chunk yield chunk


def cleanup_flex(url, loop, client, response):
def cleanup():

errors = False

try:
loop.run_until_complete(response.aclose())
except Exception as e:
bubble_log.error('cleanup_flex: error closing response: '+repr(e))
errors = True

try:
loop.run_until_complete(client.aclose())
except Exception as e:
bubble_log.error('cleanup_flex: error: '+repr(e))
errors = True

if not errors:
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('cleanup_flex: successfully completed: '+url)
else:
if bubble_log.isEnabledFor(WARNING):
bubble_log.warning('cleanup_flex: successfully completed (but had errors closing): ' + url)

return cleanup

+ 2
- 1
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py View File

@@ -263,7 +263,8 @@ class Rerouter:


elif host is not None: elif host is not None:
client_addr = flow.client_conn.address[0] client_addr = flow.client_conn.address[0]
if is_flex_domain(client_addr, host):
server_addr= flow.server_conn.address[0]
if is_flex_domain(client_addr, server_addr, [host]):
flex_flow = new_flex_flow(client_addr, host, flow) flex_flow = new_flex_flow(client_addr, host, flow)
add_flow_ctx(flow, CTX_BUBBLE_FLEX, flex_flow) add_flow_ctx(flow, CTX_BUBBLE_FLEX, flex_flow)
if bubble_log.isEnabledFor(DEBUG): if bubble_log.isEnabledFor(DEBUG):


Loading…
Cancel
Save