@@ -1,22 +1,24 @@ | |||
import requests | |||
import traceback | |||
import sys | |||
import datetime | |||
from bubble_config import bubble_network, bubble_port | |||
HEADER_USER_AGENT = 'User-Agent' | |||
HEADER_REFERER = 'Referer' | |||
HEADER_BUBBLE_MATCHERS='X-Bubble-Matchers' | |||
HEADER_BUBBLE_DEVICE='X-Bubble-Device' | |||
HEADER_BUBBLE_ABORT='X-Bubble-Abort' | |||
HEADER_BUBBLE_REQUEST_ID='X-Bubble-RequestId' | |||
CTX_BUBBLE_MATCHERS='X-Bubble-Matchers' | |||
CTX_BUBBLE_ABORT='X-Bubble-Abort' | |||
CTX_BUBBLE_REQUEST_ID='X-Bubble-RequestId' | |||
CTX_CONTENT_LENGTH='X-Bubble-Content-Length' | |||
CTX_CONTENT_LENGTH_SENT='X-Bubble-Content-Length-Sent' | |||
BUBBLE_URI_PREFIX='/__bubble/' | |||
def bubble_log (message): | |||
print(message, file=sys.stderr) | |||
def bubble_log(message): | |||
print(str(datetime.datetime.time(datetime.datetime.now()))+': ' + message, file=sys.stderr) | |||
def bubble_matchers (req_id, remote_addr, flow, host): | |||
def bubble_matchers(req_id, remote_addr, flow, host): | |||
headers = { | |||
'X-Forwarded-For': remote_addr, | |||
'Accept' : 'application/json', | |||
@@ -55,3 +57,15 @@ def bubble_matchers (req_id, remote_addr, flow, host): | |||
bubble_log('bubble_matchers API call failed: '+repr(e)) | |||
traceback.print_exc() | |||
return None | |||
def add_flow_ctx(flow, name, value): | |||
if not hasattr(flow, 'bubble_ctx'): | |||
flow.bubble_ctx = {} | |||
flow.bubble_ctx[name] = value | |||
def get_flow_ctx(flow, name): | |||
if not hasattr(flow, 'bubble_ctx'): | |||
return None | |||
if not name in flow.bubble_ctx: | |||
return None | |||
return flow.bubble_ctx[name] |
@@ -1,10 +1,10 @@ | |||
import requests | |||
import urllib | |||
import json | |||
from mitmproxy import http | |||
import traceback | |||
from mitmproxy.net.http import Headers | |||
from bubble_config import bubble_port, bubble_host_alias | |||
from bubble_api import HEADER_BUBBLE_MATCHERS, HEADER_BUBBLE_ABORT, BUBBLE_URI_PREFIX, HEADER_BUBBLE_REQUEST_ID, bubble_log | |||
from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, BUBBLE_URI_PREFIX, \ | |||
CTX_BUBBLE_REQUEST_ID, CTX_CONTENT_LENGTH, CTX_CONTENT_LENGTH_SENT, bubble_log, get_flow_ctx, add_flow_ctx | |||
BUFFER_SIZE = 4096 | |||
HEADER_CONTENT_TYPE = 'Content-Type' | |||
@@ -14,16 +14,24 @@ HEADER_TRANSFER_ENCODING = 'Transfer-Encoding' | |||
BINARY_DATA_HEADER = {HEADER_CONTENT_TYPE: 'application/octet-stream'} | |||
def filter_chunk(chunk, req_id, content_encoding=None, content_type=None): | |||
def filter_chunk(chunk, req_id, last, content_encoding=None, content_type=None, content_length=None): | |||
url = 'http://127.0.0.1:' + bubble_port + '/api/filter/apply/' + req_id | |||
if content_type and chunk: | |||
params_added = False | |||
if chunk and content_type: | |||
params_added = True | |||
url = (url | |||
+ '?contentType=' + urllib.parse.quote_plus(content_type)) | |||
+ '?type=' + urllib.parse.quote_plus(content_type)) | |||
if content_encoding: | |||
url = url + '&encoding=' + urllib.parse.quote_plus(content_encoding) | |||
elif not chunk: | |||
url = url + '?last=true' | |||
bubble_log("filter_chunk: url="+url) | |||
if content_length: | |||
url = url + '&length=' + str(content_length) | |||
if last: | |||
if params_added: | |||
url = url + '&last=true' | |||
else: | |||
url = url + '?last=true' | |||
bubble_log('filter_chunk: url='+url) | |||
response = requests.post(url, data=chunk, headers=BINARY_DATA_HEADER) | |||
if not response.ok: | |||
@@ -34,22 +42,37 @@ def filter_chunk(chunk, req_id, content_encoding=None, content_type=None): | |||
return response.content | |||
def bubble_filter_chunks(flow, chunks, req_id, content_encoding, content_type): | |||
def bubble_filter_chunks(flow, chunks, req_id, content_encoding, content_type, content_length): | |||
""" | |||
chunks is a generator that can be used to iterate over all chunks. | |||
""" | |||
first = True | |||
for chunk in chunks: | |||
if first: | |||
yield filter_chunk(chunk, req_id, content_encoding, content_type) | |||
first = False | |||
else: | |||
yield filter_chunk(chunk, req_id) | |||
yield filter_chunk(None, req_id) # get the last bits of data | |||
content_length = get_flow_ctx(flow, CTX_CONTENT_LENGTH) | |||
try: | |||
for chunk in chunks: | |||
if content_length: | |||
bytes_sent = get_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT) | |||
chunk_len = len(chunk) | |||
last = chunk_len + bytes_sent >= content_length | |||
add_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT, bytes_sent + chunk_len) | |||
else: | |||
last = False | |||
if first: | |||
yield filter_chunk(chunk, req_id, last, content_encoding, content_type, content_length) | |||
first = False | |||
else: | |||
yield filter_chunk(chunk, req_id, last) | |||
if not content_length: | |||
yield filter_chunk(None, req_id, True) # get the last bits of data | |||
except Exception as e: | |||
bubble_log('bubble_filter_chunks: exception='+repr(e)) | |||
traceback.print_exc() | |||
yield None | |||
def bubble_modify(flow, req_id, content_encoding, content_type, content_length): | |||
return lambda chunks: bubble_filter_chunks(flow, chunks, req_id, content_encoding, content_type, content_length) | |||
def bubble_modify(flow, req_id, content_encoding, content_type): | |||
return lambda chunks: bubble_filter_chunks(flow, chunks, req_id, content_encoding, content_type) | |||
def send_bubble_response(response): | |||
for chunk in response.iter_content(8192): | |||
@@ -59,8 +82,7 @@ def send_bubble_response(response): | |||
def responseheaders(flow): | |||
if flow.request.path and flow.request.path.startswith(BUBBLE_URI_PREFIX): | |||
bubble_log('responseheaders: request path starts with '+BUBBLE_URI_PREFIX+', sending to bubble') | |||
uri = 'https://' + bubble_host_alias + ':1443/' + flow.request.path[len(BUBBLE_URI_PREFIX):] | |||
uri = 'https://' + bubble_host_alias + ':' + str(bubble_ssl_port) + '/' + flow.request.path[len(BUBBLE_URI_PREFIX):] | |||
bubble_log('responseheaders: sending special bubble request to '+uri) | |||
headers = { | |||
'Accept' : 'application/json', | |||
@@ -83,35 +105,55 @@ def responseheaders(flow): | |||
flow.response.status_code = response.status_code | |||
flow.response.stream = lambda chunks: send_bubble_response(response) | |||
elif HEADER_BUBBLE_ABORT in flow.request.headers: | |||
abort_code = int(flow.request.headers[HEADER_BUBBLE_ABORT]) | |||
bubble_log('responseheaders: aborting request with HTTP status '+abort_code) | |||
flow.response.headers = Headers() | |||
flow.response.status_code = abort_code | |||
flow.response.stream = lambda chunks: None | |||
elif HEADER_BUBBLE_MATCHERS in flow.request.headers and HEADER_BUBBLE_REQUEST_ID in flow.request.headers: | |||
req_id = flow.request.headers[HEADER_BUBBLE_REQUEST_ID] | |||
matchers = flow.request.headers[HEADER_BUBBLE_MATCHERS] | |||
bubble_log('responseheaders: req_id='+req_id+' with matchers: '+matchers) | |||
if HEADER_CONTENT_TYPE in flow.response.headers: | |||
content_type = flow.response.headers[HEADER_CONTENT_TYPE] | |||
if matchers: | |||
if HEADER_CONTENT_ENCODING in flow.response.headers: | |||
content_encoding = flow.response.headers[HEADER_CONTENT_ENCODING] | |||
else: | |||
content_encoding = None | |||
else: | |||
abort_code = get_flow_ctx(flow, CTX_BUBBLE_ABORT) | |||
if abort_code is not None: | |||
bubble_log('responseheaders: aborting request with HTTP status '+str(abort_code)) | |||
flow.response.headers = Headers() | |||
flow.response.status_code = abort_code | |||
flow.response.stream = lambda chunks: None | |||
flow.response.headers.pop(HEADER_CONTENT_LENGTH, None) | |||
bubble_log('responseheaders: req_id='+req_id+' content_encoding='+repr(content_encoding) + ', content_type='+repr(content_type)) | |||
# flow.response.headers[HEADER_TRANSFER_ENCODING] = 'chunked' | |||
flow.response.stream = bubble_modify(flow, req_id, content_encoding, content_type) | |||
else: | |||
req_id = get_flow_ctx(flow, CTX_BUBBLE_REQUEST_ID) | |||
matchers = get_flow_ctx(flow, CTX_BUBBLE_MATCHERS) | |||
if req_id is not None and matchers is not None: | |||
bubble_log('responseheaders: req_id='+req_id+' with matchers: '+repr(matchers)) | |||
if HEADER_CONTENT_TYPE in flow.response.headers: | |||
content_type = flow.response.headers[HEADER_CONTENT_TYPE] | |||
if matchers: | |||
if HEADER_CONTENT_ENCODING in flow.response.headers: | |||
content_encoding = flow.response.headers[HEADER_CONTENT_ENCODING] | |||
else: | |||
content_encoding = None | |||
content_length_value = flow.response.headers.pop(HEADER_CONTENT_LENGTH, None) | |||
bubble_log('responseheaders: req_id='+req_id+' content_encoding='+repr(content_encoding) + ', content_type='+repr(content_type)) | |||
flow.response.stream = bubble_modify(flow, req_id, content_encoding, content_type, content_length_value) | |||
if content_length_value: | |||
flow.response.headers['transfer-encoding'] = 'chunked' | |||
# find server_conn to set fake_chunks on | |||
if flow.live and flow.live.ctx: | |||
ctx = flow.live.ctx | |||
while not hasattr(ctx, 'server_conn'): | |||
if hasattr(ctx, 'ctx'): | |||
ctx = ctx.ctx | |||
else: | |||
bubble_log('responseheaders: error finding server_conn. last ctx has no further ctx. type='+str(type(ctx))+' vars='+str(vars(ctx))) | |||
return | |||
if not hasattr(ctx, 'server_conn'): | |||
bubble_log('responseheaders: error finding server_conn. ctx type='+str(type(ctx))+' vars='+str(vars(ctx))) | |||
return | |||
content_length = int(content_length_value) | |||
ctx.server_conn.rfile.fake_chunks = content_length | |||
add_flow_ctx(flow, CTX_CONTENT_LENGTH, content_length) | |||
add_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT, 0) | |||
else: | |||
bubble_log('responseheaders: no matchers, passing thru') | |||
pass | |||
else: | |||
bubble_log('responseheaders: no '+HEADER_CONTENT_TYPE +' header, passing thru') | |||
pass | |||
else: | |||
bubble_log('responseheaders: no matchers, passing thru') | |||
bubble_log('responseheaders: no '+CTX_BUBBLE_MATCHERS +' in ctx, passing thru') | |||
pass | |||
else: | |||
bubble_log('responseheaders: no '+HEADER_CONTENT_TYPE +' header, passing thru') | |||
pass | |||
else: | |||
bubble_log('responseheaders: no '+HEADER_BUBBLE_MATCHERS +' header, passing thru') | |||
pass |
@@ -1,10 +1,8 @@ | |||
import json | |||
import re | |||
import time | |||
import uuid | |||
from bubble_api import bubble_matchers, bubble_log, HEADER_BUBBLE_MATCHERS, BUBBLE_URI_PREFIX, HEADER_BUBBLE_ABORT, HEADER_BUBBLE_REQUEST_ID | |||
from bubble_api import bubble_matchers, bubble_log, CTX_BUBBLE_MATCHERS, BUBBLE_URI_PREFIX, CTX_BUBBLE_ABORT, CTX_BUBBLE_REQUEST_ID, add_flow_ctx | |||
from bubble_config import bubble_host, bubble_host_alias | |||
from mitmproxy import ctx | |||
# This regex extracts splits the host header into host and port. | |||
# Handles the edge case of IPv6 addresses containing colons. | |||
@@ -29,21 +27,21 @@ class Rerouter: | |||
try: | |||
host = str(host) | |||
except Exception as e: | |||
bubble_log("get_matchers: host "+repr(host)+" could not be decoded, type="+str(type(host))+" e="+repr(e)) | |||
bubble_log('get_matchers: host '+repr(host)+' could not be decoded, type='+str(type(host))+' e='+repr(e)) | |||
return None | |||
if host == bubble_host or host == bubble_host_alias: | |||
bubble_log("get_matchers: request is for bubble itself ("+host+"), not matching") | |||
bubble_log('get_matchers: request is for bubble itself ('+host+'), not matching') | |||
return None | |||
req_id = str(host) + '.' + str(uuid.uuid4()) + '.' + str(time.time()) | |||
resp = bubble_matchers(req_id, remote_addr, flow, host) | |||
if resp and 'abort' in resp and resp['abort'] is not None: | |||
bubble_log("get_matchers: received abort code for remote_addr/host: "+remote_addr+'/'+str(host)+': '+str(resp['abort'])) | |||
bubble_log('get_matchers: received abort code for remote_addr/host: '+remote_addr+'/'+str(host)+': '+str(resp['abort'])) | |||
return {'abort': resp['abort']} | |||
if (not resp) or (not 'matchers' in resp) or (resp['matchers'] is None): | |||
bubble_log("get_matchers: no matchers for remote_addr/host: "+remote_addr+'/'+str(host)) | |||
bubble_log('get_matchers: no matchers for remote_addr/host: '+remote_addr+'/'+str(host)) | |||
return None | |||
matcher_ids = [] | |||
for m in resp['matchers']: | |||
@@ -87,15 +85,15 @@ class Rerouter: | |||
if matcher_response: | |||
if 'abort' in matcher_response and matcher_response['abort'] is not None: | |||
bubble_log('dns_spoofing.request: found abort code: ' + str(matcher_response['abort']) + ', aborting') | |||
flow.request.headers[HEADER_BUBBLE_ABORT] = str(matcher_response['abort']) | |||
add_flow_ctx(flow, CTX_BUBBLE_ABORT, matcher_response['abort']) | |||
elif ('matchers' in matcher_response | |||
and 'request_id' in matcher_response | |||
and len(matcher_response['matchers']) > 0): | |||
req_id = matcher_response['request_id'] | |||
bubble_log("dns_spoofing.request: found request_id: " + req_id + ' with matchers: ' + ' '.join(matcher_response['matchers'])) | |||
flow.request.headers[HEADER_BUBBLE_MATCHERS] = json.dumps(matcher_response['matchers']) | |||
flow.request.headers[HEADER_BUBBLE_REQUEST_ID] = req_id | |||
add_flow_ctx(flow, CTX_BUBBLE_MATCHERS, matcher_response['matchers']) | |||
add_flow_ctx(flow, CTX_BUBBLE_REQUEST_ID, req_id) | |||
else: | |||
bubble_log('dns_spoofing.request: no rules returned, passing thru...') | |||
else: | |||
@@ -8,7 +8,7 @@ mitmdump \ | |||
--listen-port ${MITM_PORT} \ | |||
--showhost \ | |||
--no-http2 \ | |||
--set block_global=false \ | |||
--set block_global=true \ | |||
--set block_private=false \ | |||
--set termlog_verbosity=debug \ | |||
--set flow_detail=3 \ | |||
@@ -0,0 +1,21 @@ | |||
#!/bin/bash | |||
MITM_DIR=${1:?no mitm dir specified} | |||
CERT_NAME=${2:?no cert name specified} | |||
if [[ ! -d "${MITM_DIR}" ]] ; then | |||
echo "mitm dir does not exist or is not a directory: ${MITM_DIR}" | |||
exit 1 | |||
fi | |||
OPTIONS_FILE="${MITM_DIR}/mitmproxy/options.py" | |||
if [[ ! -f "${OPTIONS_FILE}" ]] ; then | |||
echo "options.py not found in mitm dir: ${MITM_DIR}" | |||
exit 1 | |||
fi | |||
if [[ $(cat "${OPTIONS_FILE}" | egrep '^CONF_BASENAME =' | grep "${CERT_NAME}" | wc -l | tr -d ' ') -eq 0 ]] ; then | |||
temp="$(mktemp /tmp/options.py.XXXXXXX)" | |||
cat "${OPTIONS_FILE}" | sed -e 's/^CONF_BASENAME\s*=.*/CONF_BASENAME = "'"${CERT_NAME}"'"/' > "${temp}" | |||
mv "${temp}" "${OPTIONS_FILE}" | |||
fi |
@@ -46,9 +46,6 @@ | |||
- bubble_modify.py | |||
- run_mitmdump.sh | |||
- name: Set ownership of mitmproxy files | |||
shell: chown -R mitmproxy /home/mitmproxy/mitmproxy | |||
- name: Install cert helper scripts | |||
copy: | |||
src: "{{ item }}" | |||
@@ -58,8 +55,15 @@ | |||
mode: 0500 | |||
with_items: | |||
- install_cert.sh | |||
- set_cert_name.sh | |||
- reuse_bubble_mitm_certs.sh | |||
- name: Set the cert name | |||
shell: set_cert_name.sh /home/mitmproxy/mitmproxy {{ server_alias }} | |||
- name: Set ownership of mitmproxy files | |||
shell: chown -R mitmproxy /home/mitmproxy/mitmproxy | |||
- name: Reuse bubble mitm certs if available | |||
shell: reuse_bubble_mitm_certs.sh | |||
@@ -2,4 +2,4 @@ bubble_network = '{{ bubble_network }}' | |||
bubble_port = '{{ admin_port }}'; | |||
bubble_host = '{{ server_name }}' | |||
bubble_host_alias = '{{ server_alias }}' | |||
bubble_ssl_port = '{{ ssl_port }}' | |||
bubble_ssl_port = {{ ssl_port }} |
@@ -24,6 +24,9 @@ public class FilterHttpRequest { | |||
@Getter @Setter private Account account; | |||
@Getter @Setter private String contentType; | |||
@Getter @Setter private Long contentLength; | |||
public boolean hasContentLength () { return contentLength != null; } | |||
public boolean hasMatcher (String matcherId) { | |||
if (empty(matcherId) || !hasMatchers()) return false; | |||
return matchersResponse.getMatchers().stream().anyMatch(m -> m.getUuid().equals(matcherId)); | |||
@@ -39,6 +39,7 @@ import static org.cobbzilla.util.http.HttpContentTypes.APPLICATION_JSON; | |||
import static org.cobbzilla.util.json.JsonUtil.COMPACT_MAPPER; | |||
import static org.cobbzilla.util.json.JsonUtil.json; | |||
import static org.cobbzilla.util.network.NetworkUtil.isLocalIpv4; | |||
import static org.cobbzilla.util.string.StringUtil.trimQuotes; | |||
import static org.cobbzilla.wizard.cache.redis.RedisService.EX; | |||
import static org.cobbzilla.wizard.model.NamedEntity.names; | |||
import static org.cobbzilla.wizard.resources.ResourceUtil.*; | |||
@@ -259,7 +260,8 @@ public class FilterHttpResource { | |||
@Context ContainerRequest request, | |||
@PathParam("requestId") String requestId, | |||
@QueryParam("encoding") String contentEncoding, | |||
@QueryParam("contentType") String contentType, | |||
@QueryParam("type") String contentType, | |||
@QueryParam("length") Long contentLength, | |||
@QueryParam("last") Boolean last) throws IOException { | |||
// only mitmproxy is allowed to call us, and this should always be a local address | |||
@@ -269,6 +271,7 @@ public class FilterHttpResource { | |||
return forbidden(); | |||
} | |||
requestId = trimQuotes(requestId); | |||
if (empty(requestId)) { | |||
if (log.isDebugEnabled()) log.debug("filterHttp: no requestId provided, returning passthru"); | |||
return passthru(request); | |||
@@ -292,12 +295,12 @@ public class FilterHttpResource { | |||
// mitmproxy provides Content-Length, which helps us right-size the input byte buffer | |||
final String contentLengthHeader = req.getHeader(CONTENT_LENGTH); | |||
Integer contentLength; | |||
Integer chunkLength; | |||
try { | |||
contentLength = empty(contentLengthHeader) ? null : Integer.parseInt(contentLengthHeader); | |||
chunkLength = empty(contentLengthHeader) ? null : Integer.parseInt(contentLengthHeader); | |||
} catch (Exception e) { | |||
if (log.isDebugEnabled()) log.debug(prefix+"error parsing Content-Length ("+contentLengthHeader+"): "+shortError(e)); | |||
contentLength = null; | |||
chunkLength = null; | |||
} | |||
final FilterMatchersResponse matchersResponse = getMatchersResponseByRequestId(requestId); | |||
@@ -368,7 +371,8 @@ public class FilterHttpResource { | |||
.setDevice(device) | |||
.setAccount(caller) | |||
.setEncoding(encoding) | |||
.setContentType(contentType); | |||
.setContentType(contentType) | |||
.setContentLength(contentLength); | |||
if (log.isDebugEnabled()) log.trace(prefix+"start filterRequest="+json(filterRequest, COMPACT_MAPPER)); | |||
getActiveRequestCache().set(requestId, json(filterRequest, COMPACT_MAPPER), EX, ACTIVE_REQUEST_TIMEOUT); | |||
} else { | |||
@@ -381,7 +385,7 @@ public class FilterHttpResource { | |||
} | |||
} | |||
return ruleEngine.applyRulesToChunkAndSendResponse(request, filterRequest, contentLength, isLast); | |||
return ruleEngine.applyRulesToChunkAndSendResponse(request, filterRequest, chunkLength, isLast); | |||
} | |||
public Response passthru(@Context ContainerRequest request) { return ruleEngine.passthru(request); } | |||
@@ -25,9 +25,8 @@ import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; | |||
@Slf4j | |||
class ActiveStreamState { | |||
public static final int DEFAULT_BYTE_BUFFER_SIZE = (int) (8 * Bytes.KB); | |||
public static final int MAX_BYTE_BUFFER_SIZE = (int) (64 * Bytes.KB); | |||
public static final int MAX_PENDING_STREAMS = 5; | |||
public static final long DEFAULT_BYTE_BUFFER_SIZE = (8 * Bytes.KB); | |||
public static final long MAX_BYTE_BUFFER_SIZE = (64 * Bytes.KB); | |||
private FilterHttpRequest request; | |||
private String requestId; | |||
@@ -48,27 +47,32 @@ class ActiveStreamState { | |||
private String prefix(String s) { return s+"("+requestId+"): "; } | |||
public static byte[] toBytes(InputStream in, Integer contentLength) throws IOException { | |||
public static byte[] toBytes(InputStream in, Integer chunkLength) throws IOException { | |||
if (in == null) return EMPTY_BYTE_ARRAY; | |||
final ByteArrayOutputStream bout = new ByteArrayOutputStream(contentLength == null ? DEFAULT_BYTE_BUFFER_SIZE : Math.min(contentLength, MAX_BYTE_BUFFER_SIZE)); | |||
final ByteArrayOutputStream bout = new ByteArrayOutputStream((int) (chunkLength == null ? DEFAULT_BYTE_BUFFER_SIZE : Math.min(chunkLength, MAX_BYTE_BUFFER_SIZE))); | |||
IOUtils.copyLarge(in, bout); | |||
return bout.toByteArray(); | |||
} | |||
public void addChunk(InputStream in, Integer contentLength) throws IOException { | |||
final byte[] chunk = toBytes(in, contentLength); | |||
if (log.isDebugEnabled()) log.debug(prefix("addChunk")+"adding "+chunk.length+" bytes"); | |||
totalBytesWritten += chunk.length; | |||
if (multiStream == null) { | |||
multiStream = new MultiStream(new ByteArrayInputStream(chunk)); | |||
output = outputStream(firstRule.getDriver().filterResponse(request, inputStream(multiStream))); | |||
public void addChunk(InputStream in, Integer chunkLength) throws IOException { | |||
if (request.hasContentLength() && totalBytesWritten + chunkLength >= request.getContentLength()) { | |||
if (log.isDebugEnabled()) log.debug(prefix("addChunk")+"detected lastChunk, calling addLastChunk"); | |||
addLastChunk(in, chunkLength); | |||
} else { | |||
multiStream.addStream(new ByteArrayInputStream(chunk)); | |||
final byte[] chunk = toBytes(in, chunkLength); | |||
if (log.isDebugEnabled()) log.debug(prefix("addChunk") + "adding " + chunk.length + " bytes"); | |||
totalBytesWritten += chunk.length; | |||
if (multiStream == null) { | |||
multiStream = new MultiStream(new ByteArrayInputStream(chunk)); | |||
output = outputStream(firstRule.getDriver().filterResponse(request, inputStream(multiStream))); | |||
} else { | |||
multiStream.addStream(new ByteArrayInputStream(chunk)); | |||
} | |||
} | |||
} | |||
public void addLastChunk(InputStream in, Integer contentLength) throws IOException { | |||
final byte[] chunk = toBytes(in, contentLength); | |||
public void addLastChunk(InputStream in, Integer chunkLength) throws IOException { | |||
final byte[] chunk = toBytes(in, chunkLength); | |||
if (log.isDebugEnabled()) log.debug(prefix("addLastChunk")+"adding "+chunk.length+" bytes"); | |||
totalBytesWritten += chunk.length; | |||
if (multiStream == null) { | |||
@@ -88,26 +92,15 @@ class ActiveStreamState { | |||
return output; | |||
} | |||
// if we have 5 streams pending, send *something* now, or MITM proxy will not send us anything more | |||
final int bytesToRead; | |||
if (multiStream.pendingStreamCount() >= MAX_PENDING_STREAMS) { | |||
// do we have at least 8k to send? | |||
if (totalBytesWritten - totalBytesRead >= 8*Bytes.KB) { | |||
bytesToRead = (int) (8*Bytes.KB); | |||
if (log.isDebugEnabled()) log.debug(prefix + "pendingStreamCount ("+multiStream.pendingStreamCount()+") >= "+MAX_PENDING_STREAMS+" and > 8K bytes available, returning everything to read: " + bytesToRead+" bytes"); | |||
} else { | |||
// send 20% of what we have | |||
bytesToRead = (int) (totalBytesWritten - totalBytesRead)/5; | |||
if (log.isDebugEnabled()) log.debug(prefix + "pendingStreamCount ("+multiStream.pendingStreamCount()+") >= "+MAX_PENDING_STREAMS+" and < 8K bytes available, returning 20% read: " + bytesToRead+" bytes"); | |||
} | |||
} else { | |||
// try to read as many bytes as we have written, and have not yet read, less a safety buffer | |||
bytesToRead = (int) (totalBytesWritten - totalBytesRead - (2 * MAX_BYTE_BUFFER_SIZE)); | |||
if (bytesToRead < 0) { | |||
// we shouldn't try to read yet, less than 1024 bytes have been written | |||
if (log.isDebugEnabled()) log.debug(prefix + "not enough data written (bytesToRead=" + bytesToRead + "), can't read anything yet"); | |||
return NullInputStream.instance; | |||
} | |||
if (request.hasContentLength() && totalBytesWritten >= request.getContentLength()) { | |||
if (log.isDebugEnabled()) log.debug(prefix+"all bytes written, returning full output"); | |||
return output; | |||
} | |||
final int bytesToRead = (int) (totalBytesWritten - totalBytesRead - (2 * MAX_BYTE_BUFFER_SIZE)); | |||
if (bytesToRead < 0) { | |||
// we shouldn't try to read yet, less than 1024 bytes have been written | |||
if (log.isDebugEnabled()) log.debug(prefix + "not enough data written (bytesToRead=" + bytesToRead + "), can't read anything yet"); | |||
return NullInputStream.instance; | |||
} | |||
if (log.isDebugEnabled()) log.debug(prefix+"trying to read "+bytesToRead+" bytes from output="+output.getClass().getSimpleName()); | |||
@@ -138,7 +138,7 @@ public class StandardRuleEngineService implements RuleEngineService { | |||
public Response applyRulesToChunkAndSendResponse(ContainerRequest request, | |||
FilterHttpRequest filterRequest, | |||
Integer contentLength, | |||
Integer chunkLength, | |||
boolean last) throws IOException { | |||
final String prefix = "applyRulesToChunkAndSendResponse("+filterRequest.getId()+"): "; | |||
if (!filterRequest.hasMatchers()) { | |||
@@ -151,10 +151,10 @@ public class StandardRuleEngineService implements RuleEngineService { | |||
k -> new ActiveStreamState(filterRequest, initRules(filterRequest))); | |||
if (last) { | |||
if (log.isDebugEnabled()) log.debug(prefix+"adding LAST stream"); | |||
state.addLastChunk(request.getEntityStream(), contentLength); | |||
state.addLastChunk(request.getEntityStream(), chunkLength); | |||
} else { | |||
if (log.isDebugEnabled()) log.debug(prefix+"adding a stream"); | |||
state.addChunk(request.getEntityStream(), contentLength); | |||
state.addChunk(request.getEntityStream(), chunkLength); | |||
} | |||
if (log.isDebugEnabled()) log.debug(prefix+"sending as much filtered data as we can right now (which may be nothing)"); | |||
@@ -43,6 +43,7 @@ | |||
<logger name="bubble.service.stream.StandardRuleEngineService" level="DEBUG" /> | |||
<logger name="bubble.service.stream.ActiveStreamState" level="DEBUG" /> | |||
<logger name="bubble.resources.stream" level="DEBUG" /> | |||
<!-- <logger name="bubble.resources.stream.FilterHttpResource" level="TRACE" />--> | |||
<logger name="bubble.service.stream" level="INFO" /> | |||
<logger name="bubble.resources.message" level="INFO" /> | |||
<logger name="bubble.app.analytics" level="DEBUG" /> | |||
@@ -1 +1 @@ | |||
Subproject commit cab44ebab3b60ac9a848add899c1db156b904c3d | |||
Subproject commit 14be79545b1b6dfb1a3d7e5ecd85428109155c7f |