Ver a proveniência

add tooling to replay streams for debugging. pass bufsiz to stream wrappers. add initial fb shadowban js

pull/44/head
Jonathan Cobb há 4 anos
ascendente
cometimento
44f0f214f4
9 ficheiros alterados com 219 adições e 43 eliminações
  1. +26
    -0
      bin/bpyvenv.sh
  2. +61
    -0
      bin/breplay_stream.py
  3. +2
    -2
      bubble-server/src/main/java/bubble/service/stream/ActiveStreamState.java
  4. +2
    -1
      bubble-server/src/main/java/bubble/service/stream/StandardRuleEngineService.java
  5. +2
    -0
      bubble-server/src/main/resources/ansible/roles/mitmproxy/templates/bubble_config.py.j2
  6. +1
    -1
      bubble-server/src/main/resources/bubble-config.yml
  7. +72
    -0
      bubble-server/src/main/resources/bubble/rule/social/block/site/FB.js.hbs
  8. +52
    -38
      bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py
  9. +1
    -1
      utils/cobbzilla-utils

+ 26
- 0
bin/bpyvenv.sh Ver ficheiro

@@ -0,0 +1,26 @@
#!/bin/bash
#
# Set up python venv to run scripts in bin
#

# Print an error to stderr and abort.
function die {
    echo 1>&2 "$0: ${1}"
    exit 1
}

# Resolve the project root (the parent of this script's directory).
# Quoted expansions so paths containing spaces do not word-split.
BUBBLE_HOME="$(cd "$(dirname "${0}")/.." && pwd)"

cd "${BUBBLE_HOME}" || die "Error changing to ${BUBBLE_HOME} dir"

# Create the venv only on first run.
if [[ ! -d "${BUBBLE_HOME}/.venv" ]] ; then
    python3 -m venv ./.venv || die "Error creating venv"
fi
. "${BUBBLE_HOME}/.venv/bin/activate" || die "Error activating bubble venv"
python3 -m pip install requests || die "Error installing pip packages"

# With a script argument: print the python command for running it in the venv.
# NOTE(review): this echoes the command instead of executing it — presumably
# intentional (the caller copies or evals the printed line); confirm before
# changing it to an exec.
if [[ -n "${1}" ]] ; then
    script=${1}
    shift
    echo python3 "${script}" "${@}"
else
    echo "venv successfully set up"
fi

+ 61
- 0
bin/breplay_stream.py Ver ficheiro

@@ -0,0 +1,61 @@
#!/usr/bin/python3
import glob
import json
import requests
import sys

HEADER_FILTER_PASSTHRU = 'X-Bubble-Passthru'


def log(message):
    """Write *message* to stderr, flushing immediately so replay progress is visible."""
    sys.stderr.write(str(message) + '\n')
    sys.stderr.flush()


def replay_stream(prefix, out):
    """Replay captured stream chunks whose file names start with *prefix*.

    Each captured chunk is a trio of files sharing a base name: a ``.url``
    file (the filter endpoint), a ``.headers.json`` file (the request
    headers), and a ``.data`` file (the raw chunk bytes). Chunks are
    replayed in sorted (capture) order and the filtered bytes are written
    to the *out* stream.
    """
    url_files = sorted(glob.glob(prefix + '*.url'))
    if not url_files:
        log('No files found matching prefix: ' + prefix)
        return

    for url_file in url_files:
        data_file = replace_suffix(url_file, '.data')
        headers_file = replace_suffix(url_file, '.headers.json')
        with open(url_file, mode='r') as f:
            url = f.read()
        with open(headers_file, mode='r') as f:
            headers = json.load(f)
        with open(data_file, mode='rb') as f:
            chunk = f.read()
        log('sending ' + str(len(chunk)) + ' bytes to ' + url)
        try:
            response_data = replay_request(url, headers, chunk)
        except Exception as e:
            log('error sending filter request: ' + repr(e))
            raise
        log('received ' + str(len(response_data)) + ' bytes')
        if response_data:
            out.write(response_data)


def replace_suffix(f, suffix):
    """Return *f* with its final dot-extension replaced by *suffix*.

    E.g. ``replace_suffix('x.url', '.data')`` -> ``'x.data'``.

    BUG FIX: the original did ``f[0:f.rfind('.')] + suffix`` unconditionally;
    when *f* contains no dot, ``rfind`` returns -1 and the slice silently
    chopped the last character. Now the suffix is simply appended in that case.
    """
    dot = f.rfind('.')
    return (f[:dot] if dot != -1 else f) + suffix


def replay_request(url, headers, chunk):
    """POST *chunk* to the filter *url* and return the filtered bytes.

    Returns ``b''`` when the server answers with a non-2xx status, and the
    original *chunk* unchanged when the server marks the stream passthru
    via the X-Bubble-Passthru response header.
    """
    response = requests.post(url, data=chunk, headers=headers)

    if not response.ok:
        log('replay_request: Error fetching ' + url + ', HTTP status ' + str(response.status_code))
        return b''

    if HEADER_FILTER_PASSTHRU in response.headers:
        log('replay_request: server returned X-Bubble-Passthru, not filtering subsequent requests')
        return chunk

    return response.content


if __name__ == "__main__":
with open('/tmp/replay_response', mode='wb') as out:
replay_stream(sys.argv[1], out)
out.close()

+ 2
- 2
bubble-server/src/main/java/bubble/service/stream/ActiveStreamState.java Ver ficheiro

@@ -32,7 +32,7 @@ import static org.cobbzilla.util.io.NullInputStream.NULL_STREAM;
@Slf4j
class ActiveStreamState {

public static final long DEFAULT_BYTE_BUFFER_SIZE = (8 * Bytes.KB);
public static final int DEFAULT_BYTE_BUFFER_SIZE = (int) (8 * Bytes.KB);
public static final long MAX_BYTE_BUFFER_SIZE = (64 * Bytes.KB);

// do not wrap input with encoding stream until we have received at least this many bytes
@@ -194,7 +194,7 @@ class ActiveStreamState {
return baseStream;
}
try {
final InputStream wrapped = encoding.wrapInput(baseStream);
final InputStream wrapped = encoding.wrapInput(baseStream, DEFAULT_BYTE_BUFFER_SIZE);
if (log.isDebugEnabled()) log.debug(prefix+"returning baseStream wrapped in " + wrapped.getClass().getSimpleName());
return wrapped;
} catch (IOException e) {


+ 2
- 1
bubble-server/src/main/java/bubble/service/stream/StandardRuleEngineService.java Ver ficheiro

@@ -85,6 +85,7 @@ public class StandardRuleEngineService implements RuleEngineService {
@Autowired private RedisService redis;

public static final long MATCHERS_CACHE_TIMEOUT = MINUTES.toSeconds(15);
// public static final long MATCHERS_CACHE_TIMEOUT = HOURS.toSeconds(15); // extend timeout when debugging replayed streams
@Getter(lazy=true) private final RedisService matchersCache = redis.prefixNamespace(getClass().getSimpleName()+".matchers");

public FilterMatchDecision preprocess(FilterMatchersRequest filter,
@@ -167,7 +168,7 @@ public class StandardRuleEngineService implements RuleEngineService {
if (log.isDebugEnabled()) log.debug(prefix+"no request modifiers, returning passthru");
return passthru(request);
} else {
log.info(prefix+" applying matchers: "+filterRequest.getMatcherNames());
if (log.isDebugEnabled()) log.debug(prefix+" applying matchers: "+filterRequest.getMatcherNames()+" to uri: "+filterRequest.getMatchersResponse().getRequest().getUri());
}

// have we seen this request before?


+ 2
- 0
bubble-server/src/main/resources/ansible/roles/mitmproxy/templates/bubble_config.py.j2 Ver ficheiro

@@ -8,3 +8,5 @@ bubble_sage_ip4 = '{{ sage_ip4 }}'
bubble_sage_ip6 = '{{ sage_ip6 }}'
cert_validation_host = '{{ cert_validation_host }}'
debug_capture_fqdn = None
debug_stream_fqdn = None
debug_stream_uri = None

+ 1
- 1
bubble-server/src/main/resources/bubble-config.yml Ver ficheiro

@@ -13,7 +13,7 @@ testMode: {{#exists BUBBLE_TEST_MODE}}{{BUBBLE_TEST_MODE}}{{else}}false{{/exists

database:
driver: org.postgresql.Driver
url: jdbc:postgresql://127.0.0.1:5432/bubble
url: jdbc:postgresql://127.0.0.1:5432/{{#exists BUBBLE_DB_NAME}}{{BUBBLE_DB_NAME}}{{else}}bubble{{/exists}}
user: bubble
password: '{{#exists BUBBLE_PG_PASSWORD}}{{BUBBLE_PG_PASSWORD}}{{else}}{{key_file '.BUBBLE_PG_PASSWORD'}}{{/exists}}'



+ 72
- 0
bubble-server/src/main/resources/bubble/rule/social/block/site/FB.js.hbs Ver ficheiro

@@ -0,0 +1,72 @@
function {{JS_PREFIX}}_getElementsByXPath(xpath, parent) {
let results = [];
let query = document.evaluate(xpath, parent || document, null, XPathResult.ORDERED_NODE_SNAPSHOT_TYPE, null);
for (let i = 0, length = query.snapshotLength; i < length; ++i) {
results.push(query.snapshotItem(i));
}
return results;
}

const {{JS_PREFIX}}_site_host = location.protocol + '//' + window.location.hostname + '/';

function {{JS_PREFIX}}_apply_blocks(blocked_users) {
let articles = {{JS_PREFIX}}_getElementsByXPath('//div[@role="article" and @aria-posinset]');
let sitePrefix = {{JS_PREFIX}}_site_host;
for (let i=0; i<articles.length; i++) {
let article = articles[i];
let authorLink = article.getElementsByTagName('a')[0];
let authorHref = authorLink.href;
if (!authorHref.href.startsWith(sitePrefix)) continue;
let authorName = authorHref.substring(sitePrefix.length);
let qPos = authorName.indexOf('?');
if (qPos !== -1) {
authorName = authorName.substring(0, qPos);
authorLink.href = sitePrefix + authorName; // todo: clean up all links, not just this one
}
if (blocked_users !== null && blocked_users.includes(authorName)) {
console.log('removing post by author: ' + authorName);
try {
article.className = article.className + ' _bubble_blocked';
node.style = 'display: none';
} catch (e) {
console.log('error removing post by author: ' + authorName+': '+e);
}

} else {
// have we visited this article before?
if (article.className.indexOf('{{JS_PREFIX}}_bubble_block') === -1) {
console.log('VISITING article node for author: '+authorName);
const spans = authorLink.parentNode.nextSibling.getElementsByTagName('span');
let sepSpan = null;
for (let i=0; i<spans.length; i++) {
let s = spans[i];
if (s.innerHTML === ' · ') {
sepSpan = s;
break;
}
}
if (sepSpan === null) {
console.log('no insertion point found');
continue;
}
const blockControl = document.createElement('div');
blockControl.style.textAlign = 'center';
article.className = article.className += ' {{JS_PREFIX}}_bubble_block';

const line = document.createElement('hr');
const imgHolder = document.createElement('img');
imgHolder.src = '/__bubble/api/filter/assets/{{BUBBLE_REQUEST_ID}}/UserBlocker/icon?raw=true';
imgHolder.width = 16;

const blockLink = document.createElement('a');
blockLink.addEventListener("click", function (event) { {{JS_PREFIX}}_block_user(authorName); return false; });
blockLink.appendChild(line);
blockLink.appendChild(imgHolder);
blockControl.appendChild(blockLink);

sepSpan.appendChild(blockControl);
sepSpan.appendChild(document.createTextNode(' · '));
}
}
}
}

+ 52
- 38
bubble-server/src/main/resources/packer/roles/mitmproxy/files/bubble_modify.py Ver ficheiro

@@ -1,13 +1,14 @@
#
# Copyright (c) 2020 Bubble, Inc. All rights reserved. For personal (non-commercial) use, see license: https://getbubblenow.com/bubble-license/
#
import json
import re
import requests
import urllib
import uuid
import traceback
from mitmproxy.net.http import Headers
from bubble_config import bubble_port, bubble_host_alias, debug_capture_fqdn
from bubble_config import bubble_port, bubble_host_alias, debug_capture_fqdn, debug_stream_fqdn, debug_stream_uri
from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, CTX_BUBBLE_LOCATION, BUBBLE_URI_PREFIX, \
CTX_BUBBLE_REQUEST_ID, CTX_CONTENT_LENGTH, CTX_CONTENT_LENGTH_SENT, bubble_log, get_flow_ctx, add_flow_ctx, \
HEADER_USER_AGENT, HEADER_FILTER_PASSTHRU, HEADER_CONTENT_SECURITY_POLICY, REDIS, redis_set, parse_host_header
@@ -24,6 +25,7 @@ STANDARD_FILTER_HEADERS = {HEADER_CONTENT_TYPE: CONTENT_TYPE_BINARY}
REDIS_FILTER_PASSTHRU_PREFIX = '__chunk_filter_pass__'
REDIS_FILTER_PASSTHRU_DURATION = 600

DEBUG_STREAM_COUNTERS = {}

def add_csp_part(new_csp, part):
if len(new_csp) > 0:
@@ -64,26 +66,12 @@ def ensure_bubble_script_csp(csp):

def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, content_type=None, content_length=None, csp=None):
if debug_capture_fqdn:
host = None
if flow.client_conn.tls_established:
sni = flow.client_conn.connection.get_servername()
if sni:
host = str(sni)
else:
host_header = flow.request.host_header
if host_header:
m = parse_host_header.match(host_header)
if m:
host = str(m.group("host").strip("[]"))
if host:
if host.startswith("b'"):
host = host[2:-1]
if host in debug_capture_fqdn:
bubble_log('filter_chunk: debug_capture_fqdn detected, capturing: '+host)
f = open('/tmp/bubble_capture_'+req_id, mode='ab', buffering=0)
f.write(chunk)
f.close()
return chunk
if debug_capture_fqdn in req_id:
bubble_log('filter_chunk: debug_capture_fqdn detected, capturing: '+debug_capture_fqdn)
f = open('/tmp/bubble_capture_'+req_id, mode='ab', buffering=0)
f.write(chunk)
f.close()
return chunk

# should we just passthru?
redis_passthru_key = REDIS_FILTER_PASSTHRU_PREFIX + flow.request.method + '~~~' + user_agent + ':' + flow.request.url
@@ -97,8 +85,7 @@ def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, c
params_added = False
if chunk and content_type:
params_added = True
url = (url
+ '?type=' + urllib.parse.quote_plus(content_type))
url = url + '?type=' + urllib.parse.quote_plus(content_type)
if content_encoding:
url = url + '&encoding=' + urllib.parse.quote_plus(content_encoding)
if content_length:
@@ -111,15 +98,33 @@ def filter_chunk(flow, chunk, req_id, user_agent, last, content_encoding=None, c

if csp:
# bubble_log('filter_chunk: url='+url+' (csp='+csp+')')
bubble_log('filter_chunk: url='+url+' (with csp)')
bubble_log('filter_chunk: url='+url+' (with csp) (last='+str(last)+')')
filter_headers = {
HEADER_CONTENT_TYPE: CONTENT_TYPE_BINARY,
HEADER_CONTENT_SECURITY_POLICY: csp
}
else:
bubble_log('filter_chunk: url='+url+' (no csp)')
bubble_log('filter_chunk: url='+url+' (no csp) (last='+str(last)+')')
filter_headers = STANDARD_FILTER_HEADERS

if debug_stream_fqdn and debug_stream_uri and debug_stream_fqdn in req_id and flow.request.path == debug_stream_uri:
if req_id in DEBUG_STREAM_COUNTERS:
count = DEBUG_STREAM_COUNTERS[req_id] + 1
else:
count = 0
DEBUG_STREAM_COUNTERS[req_id] = count
bubble_log('filter_chunk: debug_stream detected, capturing: '+debug_stream_fqdn)
f = open('/tmp/bubble_stream_'+req_id+'_chunk'+"{:04d}".format(count)+'.data', mode='wb', buffering=0)
if chunk is not None:
f.write(chunk)
f.close()
f = open('/tmp/bubble_stream_'+req_id+'_chunk'+"{:04d}".format(count)+'.headers.json', mode='w')
f.write(json.dumps(filter_headers))
f.close()
f = open('/tmp/bubble_stream_'+req_id+'_chunk'+"{:04d}".format(count)+'.url', mode='w')
f.write(url)
f.close()

response = requests.post(url, data=chunk, headers=filter_headers)
if not response.ok:
err_message = 'filter_chunk: Error fetching ' + url + ', HTTP status ' + str(response.status_code)
@@ -174,8 +179,9 @@ def send_bubble_response(response):

def responseheaders(flow):

if flow.request.path and flow.request.path.startswith(BUBBLE_URI_PREFIX):
uri = 'http://127.0.0.1:' + bubble_port + '/' + flow.request.path[len(BUBBLE_URI_PREFIX):]
path = flow.request.path
if path and path.startswith(BUBBLE_URI_PREFIX):
uri = 'http://127.0.0.1:' + bubble_port + '/' + path[len(BUBBLE_URI_PREFIX):]
bubble_log('responseheaders: sending special bubble request to '+uri)
headers = {
'Accept' : 'application/json',
@@ -203,19 +209,27 @@ def responseheaders(flow):
if abort_code is not None:
abort_location = get_flow_ctx(flow, CTX_BUBBLE_LOCATION)
if abort_location is not None:
bubble_log('responseheaders: redirecting request with HTTP status '+str(abort_code)+' to: '+abort_location)
bubble_log('responseheaders: redirecting request with HTTP status '+str(abort_code)+' to: '+abort_location+', path was: '+path)
flow.response.headers = Headers()
flow.response.headers[HEADER_LOCATION] = abort_location
flow.response.status_code = abort_code
flow.response.stream = lambda chunks: []
else:
bubble_log('responseheaders: aborting request with HTTP status '+str(abort_code))
bubble_log('responseheaders: aborting request with HTTP status '+str(abort_code)+', path was: '+path)
flow.response.headers = Headers()
flow.response.status_code = abort_code
flow.response.stream = lambda chunks: []

elif flow.response.status_code // 100 != 2:
bubble_log('responseheaders: response had HTTP status '+str(flow.response.status_code)+', returning as-is')
bubble_log('responseheaders: response had HTTP status '+str(flow.response.status_code)+', returning as-is: '+path)
pass

elif flow.response.headers is None or len(flow.response.headers) == 0:
bubble_log('responseheaders: response had HTTP status '+str(flow.response.status_code)+', and NO response headers, returning as-is: '+path)
pass

elif HEADER_CONTENT_LENGTH in flow.response.headers and flow.response.headers[HEADER_CONTENT_LENGTH] == "0":
bubble_log('responseheaders: response had HTTP status '+str(flow.response.status_code)+', and '+HEADER_CONTENT_LENGTH+' was zero, returning as-is: '+path)
pass

else:
@@ -239,10 +253,10 @@ def responseheaders(flow):
typeRegex = '^text/html.*'
if re.match(typeRegex, content_type):
any_content_type_matches = True
bubble_log(prefix+'found at least one matcher for content_type ('+content_type+'), filtering')
bubble_log(prefix+'found at least one matcher for content_type ('+content_type+'), filtering: '+path)
break
if not any_content_type_matches:
bubble_log(prefix+'no matchers for content_type ('+content_type+'), passing thru')
bubble_log(prefix+'no matchers for content_type ('+content_type+'), passing thru: '+path)
return

if HEADER_CONTENT_ENCODING in flow.response.headers:
@@ -257,7 +271,7 @@ def responseheaders(flow):
csp = None

content_length_value = flow.response.headers.pop(HEADER_CONTENT_LENGTH, None)
bubble_log(prefix+'content_encoding='+repr(content_encoding) + ', content_type='+repr(content_type))
# bubble_log(prefix+'content_encoding='+repr(content_encoding) + ', content_type='+repr(content_type))
flow.response.stream = bubble_modify(flow, req_id, user_agent, content_encoding, content_type, csp)
if content_length_value:
flow.response.headers['transfer-encoding'] = 'chunked'
@@ -268,10 +282,10 @@ def responseheaders(flow):
if hasattr(ctx, 'ctx'):
ctx = ctx.ctx
else:
bubble_log(prefix+'error finding server_conn. last ctx has no further ctx. type='+str(type(ctx))+' vars='+str(vars(ctx)))
bubble_log(prefix+'error finding server_conn for path '+path+'. last ctx has no further ctx. type='+str(type(ctx))+' vars='+str(vars(ctx)))
return
if not hasattr(ctx, 'server_conn'):
bubble_log(prefix+'error finding server_conn. ctx type='+str(type(ctx))+' vars='+str(vars(ctx)))
bubble_log(prefix+'error finding server_conn for path '+path+'. ctx type='+str(type(ctx))+' vars='+str(vars(ctx)))
return
content_length = int(content_length_value)
ctx.server_conn.rfile.fake_chunks = content_length
@@ -279,11 +293,11 @@ def responseheaders(flow):
add_flow_ctx(flow, CTX_CONTENT_LENGTH_SENT, 0)

else:
bubble_log(prefix+'no matchers, passing thru')
bubble_log(prefix+'no matchers, passing thru: '+path)
pass
else:
bubble_log(prefix+'no '+HEADER_CONTENT_TYPE +' header, passing thru')
bubble_log(prefix+'no '+HEADER_CONTENT_TYPE +' header, passing thru: '+path)
pass
else:
bubble_log(prefix+'no '+CTX_BUBBLE_MATCHERS +' in ctx, passing thru')
bubble_log(prefix+'no '+CTX_BUBBLE_MATCHERS +' in ctx, passing thru: '+path)
pass

+ 1
- 1
utils/cobbzilla-utils

@@ -1 +1 @@
Subproject commit 74230fcc51ee1177a898de135a30b5b70a760e12
Subproject commit aecde1f41f9bbb64e2043634677b346756fb75a8

Carregando…
Cancelar
Guardar