Parcourir la source

fix support for filtering flex-routed responses

tags/v1.1.4
Jonathan Cobb il y a 4 ans
Parent
révision
9514ef03e5
6 fichiers modifiés avec 139 ajouts et 63 suppressions
  1. +31
    -21
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py
  2. +4
    -0
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py
  3. +9
    -1
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_debug.py
  4. +3
    -3
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py
  5. +89
    -32
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py
  6. +3
    -6
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py

+ 31
- 21
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_api.py Voir le fichier

@@ -17,14 +17,15 @@ import nest_asyncio
import redis
from bubble_vpn4 import wireguard_network_ipv4
from bubble_vpn6 import wireguard_network_ipv6
from bubble_debug import get_stack
from netaddr import IPAddress, IPNetwork

from bubble_debug import get_stack
from bubble_config import bubble_port, debug_capture_fqdn, \
bubble_host, bubble_host_alias, bubble_sage_host, bubble_sage_ip4, bubble_sage_ip6
from mitmproxy import http
from mitmproxy.net.http import headers as nheaders
from mitmproxy.proxy.protocol.async_stream_body import AsyncStreamBody
from mitmproxy.proxy.protocol.request_capture import RequestCapture

bubble_log = logging.getLogger(__name__)

@@ -165,7 +166,7 @@ async def _bubble_async(name, url,
timeout=5,
max_redirects=0):
if client is not None:
return await _async_stream(client, name, url, headers=headers, method=method, data=data, json=json, timeout=timeout, max_redirects=max_redirects)
return await async_response(client, name, url, headers=headers, method=method, data=data, json=json)
else:
async with async_client(proxies=proxies, timeout=timeout, max_redirects=max_redirects) as client:
return await async_response(client, name, url, headers=headers, method=method, data=data, json=json)
@@ -192,7 +193,7 @@ def bubble_async(name, url,
timeout=timeout,
max_redirects=max_redirects))
except Exception as e:
bubble_log.error('bubble_async('+name+'): error: '+repr(e)+' from '+get_stack(e))
bubble_log.error('bubble_async('+name+'): error: '+repr(e))


def bubble_async_request_json(name, url, headers, method='GET', json=None):
@@ -211,16 +212,24 @@ def bubble_async_request_json(name, url, headers, method='GET', json=None):
def cleanup_async(url, loop, client, response):
def cleanup():
errors = False
try:
loop.run_until_complete(response.aclose())
except Exception as e:
bubble_log.error('cleanup_async: error closing response: '+repr(e))
errors = True
try:
loop.run_until_complete(client.aclose())
except Exception as e:
bubble_log.error('cleanup_async: error: '+repr(e))
errors = True
if response is not None:
try:
loop.run_until_complete(response.aclose())
except Exception as e:
bubble_log.error('cleanup_async: error closing response: '+repr(e))
errors = True
if client is not None:
try:
loop.run_until_complete(client.aclose())
except Exception as e:
bubble_log.error('cleanup_async: error closing client: '+repr(e))
errors = True
if loop is not None:
try:
loop.close()
except Exception as e:
bubble_log.error('cleanup_async: error closing loop: '+repr(e))
errors = True
if not errors:
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('cleanup_async: successfully completed: '+url)
@@ -375,7 +384,7 @@ def get_flow_ctx(flow, name):

def is_bubble_request(ip, fqdns):
# return ip in LOCAL_IPS
return ip in LOCAL_IPS and (bubble_host in fqdns or bubble_host_alias in fqdns)
return ip in LOCAL_IPS and fqdns and (bubble_host in fqdns or bubble_host_alias in fqdns)


def is_bubble_special_path(path):
@@ -466,9 +475,10 @@ def special_bubble_response(flow):
if is_bubble_health_check(path):
health_check_response(flow)
return

uri = make_bubble_special_path(path)
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('special_bubble_response: sending special bubble request to '+uri+' from '+get_stack())
bubble_log.debug('special_bubble_response: sending special bubble request to '+uri)
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
@@ -476,16 +486,16 @@ def special_bubble_response(flow):
if flow.request.method == 'GET':
loop = asyncio.new_event_loop()
client = async_client(timeout=30)
response = bubble_async(name, uri, client=client, loop=loop, headers=headers)
response = async_stream(client, name, uri, headers=headers, loop=loop)

elif flow.request.method == 'POST':
loop = asyncio.new_event_loop()
client = async_client(timeout=30)
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('special_bubble_response: special bubble request: POST content is '+str(flow.request.content))
if flow.request.content:
headers['Content-Length'] = str(len(flow.request.content))
response = bubble_async(name, uri, client=client, loop=loop, json=flow.request.content, headers=headers)
data = None
if flow.request.content and flow.request.content:
headers[HEADER_CONTENT_LENGTH] = str(len(flow.request.content))
data = flow.request.content
response = async_stream(client, name, uri, headers=headers, method='POST', data=data, loop=loop)

else:
if bubble_log.isEnabledFor(WARNING):


+ 4
- 0
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_conn_check.py Voir le fichier

@@ -96,6 +96,10 @@ def conn_check_cache_prefix(client_addr, server_addr):


def fqdns_for_addr(client_addr, server_addr):
if server_addr is None or client_addr is None or len(client_addr) == 0 or len(server_addr) == 0:
if bubble_log.isEnabledFor(WARNING):
bubble_log.warning('fqdns_for_addr: client_addr ('+repr(client_addr)+') or server_addr ('+repr(server_addr)+') was None or empty')
return None
key = REDIS_DNS_PREFIX + server_addr + '~' + client_addr
values = REDIS.smembers(key)
if values is None or len(values) == 0:


+ 9
- 1
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_debug.py Voir le fichier

@@ -4,6 +4,7 @@
import logging
from logging import INFO, DEBUG, WARNING, ERROR, CRITICAL

import inspect
import os
import threading
import traceback
@@ -23,10 +24,17 @@ BUBBLE_LOG_LEVEL_ENV_VAR = 'BUBBLE_LOG_LEVEL'
DEFAULT_BUBBLE_LOG_LEVEL = 'INFO'
BUBBLE_LOG_LEVEL = None

STACK_LINE = "[%s:%d] %s\n"


def get_stack(e=None):
if e is None:
e = ValueError()
stack = ''
for frame in inspect.stack()[1:]:
file, line, func = frame[1:4]
stack = stack + (STACK_LINE % (file, line, func))
return stack

return "".join(traceback.TracebackException.from_exception(e).format())




+ 3
- 3
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_flex.py Voir le fichier

@@ -26,6 +26,7 @@ class FlexFlow(RequestCapture):
mitm_flow: None
router: None
request_chunks: None
response: None
response_stream: None

def __init__(self, flex_host, mitm_flow, router):
@@ -60,9 +61,8 @@ def process_no_flex(flex_flow):
reason='OK',
headers=response_headers,
content=None)
error_html = flex_flow.router['error_html']
flex_flow.response_stream = lambda chunks: error_html
flow.response.stream = lambda chunks: error_html
error_html = flex_flow.router['error_html']
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('process_no_flex: no router found, returning error_html')
return flex_flow
@@ -169,7 +169,7 @@ def process_flex(flex_flow):
if bubble_log.isEnabledFor(INFO):
bubble_log.info('process_flex: successfully requested url '+url+' from flex router, proceeding...')

flex_flow.response_stream = response
flex_flow.response = response
return flex_flow




+ 89
- 32
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py Voir le fichier

@@ -6,10 +6,14 @@ import json
import re
import urllib
import traceback

from mitmproxy.net.http import Headers
from mitmproxy.proxy.protocol.async_stream_body import AsyncStreamBody

from bubble_config import bubble_port, debug_capture_fqdn, debug_stream_fqdn, debug_stream_uri
from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, CTX_BUBBLE_FLEX, \
status_reason, get_flow_ctx, add_flow_ctx, bubble_async, \
from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, \
CTX_BUBBLE_FLEX, CTX_BUBBLE_SPECIAL, \
status_reason, get_flow_ctx, add_flow_ctx, bubble_async, async_client, cleanup_async, \
is_bubble_special_path, is_bubble_health_check, health_check_response, special_bubble_response, \
CTX_BUBBLE_REQUEST_ID, CTX_CONTENT_LENGTH, CTX_CONTENT_LENGTH_SENT, CTX_BUBBLE_FILTERED, \
HEADER_CONTENT_TYPE, HEADER_CONTENT_ENCODING, HEADER_LOCATION, HEADER_CONTENT_LENGTH, \
@@ -54,7 +58,7 @@ def ensure_bubble_script_csp(csp):
return new_csp


def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=None, content_type=None, content_length=None, csp=None):
def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=None, content_type=None, content_length=None, csp=None, client=None):
name = 'filter_chunk'
if debug_capture_fqdn:
if debug_capture_fqdn in req_id:
@@ -124,9 +128,9 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N
f.write(url)
f.close()

response = bubble_async(name, url, headers=headers, method='POST', data=chunk, loop=loop)
response = bubble_async(name, url, headers=headers, method='POST', data=chunk, loop=loop, client=client)
if not response.status_code == 200:
err_message = 'filter_chunk: Error fetching ' + url + ', HTTP status ' + str(response.status_code)
err_message = 'filter_chunk: Error fetching ' + url + ', HTTP status ' + str(response.status_code) + ' content='+repr(response.content)
if bubble_log.isEnabledFor(ERROR):
bubble_log.error(err_message)
return b''
@@ -140,7 +144,7 @@ def filter_chunk(loop, flow, chunk, req_id, user_agent, last, content_encoding=N
return response.content


def bubble_filter_chunks(flow, chunks, flex_flow, req_id, user_agent, content_encoding, content_type, csp):
def bubble_filter_chunks(flow, chunks, req_id, user_agent, content_encoding, content_type, csp):
loop = asyncio.new_event_loop()
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('bubble_filter_chunks: starting with content_type='+content_type)
@@ -149,14 +153,11 @@ def bubble_filter_chunks(flow, chunks, flex_flow, req_id, user_agent, content_en
content_length = get_flow_ctx(flow, CTX_CONTENT_LENGTH)
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('bubble_filter_chunks: found content_length='+str(content_length))
if flex_flow is not None:
# flex flows with errors are handled before we get here
chunks = flex_flow.response_stream.iter_content(8192)
try:
buffer = b''
for chunk in chunks:
buffer = buffer + chunk
if len(buffer) < MIN_FILTER_CHUNK_SIZE:
if not last and len(buffer) < MIN_FILTER_CHUNK_SIZE:
continue
chunk_len = len(buffer)
chunk = buffer
@@ -189,15 +190,65 @@ def bubble_filter_chunks(flow, chunks, flex_flow, req_id, user_agent, content_en
bubble_log.error('bubble_filter_chunks: exception='+repr(e))
traceback.print_exc()
yield None
finally:
loop.close()


def bubble_modify(flow, flex_flow, req_id, user_agent, content_encoding, content_type, csp):
def bubble_modify(flow, req_id, user_agent, content_encoding, content_type, csp):
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug('bubble_modify: modifying req_id='+req_id+' with content_type='+content_type)
return lambda chunks: bubble_filter_chunks(flow, chunks, flex_flow, req_id,
return lambda chunks: bubble_filter_chunks(flow, chunks, req_id,
user_agent, content_encoding, content_type, csp)


class AsyncStreamContext:
first = True
buffer = b''


def async_filter_chunk(stream_body_obj, flow, req_id, user_agent, content_encoding, content_type, csp):
client = async_client()
loop = asyncio.new_event_loop()
stream_body_obj.ctx = AsyncStreamContext()
orig_finalize = stream_body_obj.finalize

def _finalize():
bubble_log.info('_finalize: cleaning up for '+req_id+' sent '+str(stream_body_obj.total)+' bytes')
if orig_finalize is not None:
orig_finalize()
cleanup_async('_async_filter_chunk('+req_id+')', loop, client, None)

stream_body_obj.finalize = _finalize
stream_body_obj.total = 0

def _async_filter_chunk(chunk, last):
if chunk is None:
bubble_log.info('_async_filter_chunk: filtering None chunk (!!) last='+str(last))
else:
bubble_log.info('_async_filter_chunk: filtering chunk of size = '+str(len(chunk))+' last=' + str(last))
stream_body_obj.ctx.buffer = stream_body_obj.ctx.buffer + chunk
if not last and len(stream_body_obj.ctx.buffer) < MIN_FILTER_CHUNK_SIZE:
return None
chunk = stream_body_obj.ctx.buffer
stream_body_obj.ctx.buffer = b''
if stream_body_obj.ctx.first:
stream_body_obj.ctx.first = False
new_chunk = filter_chunk(loop, flow, chunk, req_id, user_agent, last,
content_encoding=content_encoding, content_type=content_type, content_length=None,
csp=csp, client=client)
else:
new_chunk = filter_chunk(loop, flow, chunk, req_id, user_agent, last, client=client)
if new_chunk is None or len(chunk) == 0:
bubble_log.info('_async_filter_chunk: filtered chunk, got back None or zero chunk (means "send more data")')
return None
else:
bubble_log.info('_async_filter_chunk: filtered chunk, got back chunk of size '+str(len(new_chunk)))
stream_body_obj.total = stream_body_obj.total + len(new_chunk)
return new_chunk

return _async_filter_chunk


EMPTY_XML = [b'<?xml version="1.0" encoding="UTF-8"?><html></html>']
EMPTY_JSON = [b'null']
EMPTY_DEFAULT = []
@@ -236,6 +287,7 @@ def bubble_filter_response(flow, flex_flow):
if is_bubble_health_check(path):
health_check_response(flow)
else:
bubble_log.info('bubble_filter_response: sending special bubble response for path: '+path)
special_bubble_response(flow)

elif flex_flow and flex_flow.is_error():
@@ -328,28 +380,33 @@ def bubble_filter_response(flow, flex_flow):
if bubble_log.isEnabledFor(DEBUG):
bubble_log.debug(prefix+'content_encoding='+repr(content_encoding) + ', content_type='+repr(content_type))

flow.response.stream = bubble_modify(flow, flex_flow, req_id,
user_agent, content_encoding, content_type, csp)
if content_length_value:
flow.response.headers['transfer-encoding'] = 'chunked'
# find server_conn to set fake_chunks on
if flow.live and flow.live.ctx:
ctx = flow.live.ctx
while not hasattr(ctx, 'server_conn'):
if hasattr(ctx, 'ctx'):
ctx = ctx.ctx
else:
if flex_flow is not None:
# flex flows with errors are handled before we get here
bubble_log.info(prefix+' filtering async stream, starting with flow.response.stream = '+repr(flow.response.stream))
flow.response.stream.filter_chunk = async_filter_chunk(flow.response.stream, flow, req_id, user_agent, content_encoding, content_type, csp)
else:
flow.response.stream = bubble_modify(flow, req_id, user_agent, content_encoding, content_type, csp)
if content_length_value:
flow.response.headers['transfer-encoding'] = 'chunked'
# find server_conn to set fake_chunks on
if flow.live and flow.live.ctx:
ctx = flow.live.ctx
while not hasattr(ctx, 'server_conn'):
if hasattr(ctx, 'ctx'):
ctx = ctx.ctx
else:
if bubble_log.isEnabledFor(ERROR):
bubble_log.error(prefix+'error finding server_conn for path '+path+'. last ctx has no further ctx. type='+str(type(ctx))+' vars='+str(vars(ctx)))
return
if not hasattr(ctx, 'server_conn'):
if bubble_log.isEnabledFor(ERROR):
bubble_log.error(prefix+'error finding server_conn for path '+path+'. last ctx has no further ctx. type='+str(type(ctx))+' vars='+str(vars(ctx)))
bubble_log.error(prefix+'error finding server_conn for path '+path+'. ctx type='+str(type(ctx))+' vars='+str(vars(ctx)))
return
if not hasattr(ctx, 'server_conn'):
if bubble_log.isEnabledFor(ERROR):
bubble_log.error(prefix+'error finding server_conn for path '+path+'. ctx type='+str(type(ctx))+' vars='+str(vars(ctx)))
return
content_length = int(content_length_value)
ctx.server_conn.rfile.fake_chunks = content_length
add_flow_ctx(flow, CTX_CONTENT_LENGTH, content_length)
add_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT, 0)
content_length = int(content_length_value)
if ctx.server_conn.rfile:
ctx.server_conn.rfile.fake_chunks = content_length
add_flow_ctx(flow, CTX_CONTENT_LENGTH, content_length)
add_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT, 0)

else:
if bubble_log.isEnabledFor(DEBUG):


+ 3
- 6
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_request.py Voir le fichier

@@ -32,7 +32,7 @@ from mitmproxy.net.http import headers as nheaders
from bubble_api import bubble_matchers, bubble_activity_log, \
HEALTH_CHECK_URI, CTX_BUBBLE_MATCHERS, CTX_BUBBLE_SPECIAL, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, \
CTX_BUBBLE_PASSTHRU, CTX_BUBBLE_FLEX, CTX_BUBBLE_REQUEST_ID, add_flow_ctx, parse_host_header, \
is_bubble_special_path, special_bubble_response, is_bubble_health_check, \
is_bubble_special_path, is_bubble_health_check, \
is_bubble_request, is_sage_request, is_not_from_vpn, is_flex_domain
from bubble_config import bubble_host, bubble_host_alias
from bubble_flex import new_flex_flow
@@ -254,16 +254,13 @@ class Rerouter:
def requestheaders(self, flow):
host = self.bubble_handle_request(flow)
path = flow.request.path
flow.request.capture_stream = True

if is_bubble_special_path(path):
# if bubble_log.isEnabledFor(DEBUG):
# bubble_log.debug('request: is_bubble_special_path('+path+') returned true, sending special bubble response')
special_bubble_response(flow)
flow.request.force_no_stream = True

elif host is not None:
client_addr = flow.client_conn.address[0]
server_addr= flow.server_conn.address[0]
server_addr = flow.server_conn.address[0]
if is_flex_domain(client_addr, server_addr, [host]):
flex_flow = new_flex_flow(client_addr, host, flow)
add_flow_ctx(flow, CTX_BUBBLE_FLEX, flex_flow)


Chargement…
Annuler
Enregistrer