Pārlūkot izejas kodu

refactor and simplify filtering api

tags/v0.6.0
Jonathan Cobb pirms 5 gadiem
vecāks
revīzija
ffee1e747e
14 mainītis faili ar 191 papildinājumiem un 118 dzēšanām
  1. +1
    -1
      automation/roles/mitmproxy/files/bubble_api.py
  2. +6
    -6
      automation/roles/mitmproxy/files/bubble_modify.py
  3. +10
    -6
      automation/roles/mitmproxy/files/dns_spoofing.py
  4. +2
    -2
      bubble-server/src/main/java/bubble/ApiConstants.java
  5. +11
    -7
      bubble-server/src/main/java/bubble/resources/stream/FilterHttpRequest.java
  6. +84
    -40
      bubble-server/src/main/java/bubble/resources/stream/FilterHttpResource.java
  7. +5
    -2
      bubble-server/src/main/java/bubble/resources/stream/FilterMatchersRequest.java
  8. +1
    -0
      bubble-server/src/main/java/bubble/resources/stream/FilterMatchersResponse.java
  9. +19
    -11
      bubble-server/src/main/java/bubble/resources/stream/ReverseProxyResource.java
  10. +1
    -1
      bubble-server/src/main/java/bubble/rule/analytics/TrafficAnalyticsRuleDriver.java
  11. +2
    -4
      bubble-server/src/main/java/bubble/rule/analytics/TrafficRecord.java
  12. +12
    -9
      bubble-server/src/main/java/bubble/rule/bblock/BubbleBlockRuleDriver.java
  13. +36
    -29
      bubble-server/src/main/java/bubble/service/stream/StandardRuleEngineService.java
  14. +1
    -0
      bubble-server/src/main/resources/logback.xml

+ 1
- 1
automation/roles/mitmproxy/files/bubble_api.py Parādīt failu

@@ -47,7 +47,7 @@ def bubble_matchers (req_id, remote_addr, flow, host):
'referer': referer,
'remoteAddr': remote_addr
}
response = requests.post('http://127.0.0.1:'+bubble_port+'/api/filter/matchers', headers=headers, json=data)
response = requests.post('http://127.0.0.1:'+bubble_port+'/api/filter/matchers/'+req_id, headers=headers, json=data)
if response.ok:
return response.json()
bubble_log('bubble_matchers response not OK, returning empty matchers array: '+str(response.status_code)+' / '+repr(response.text))


+ 6
- 6
automation/roles/mitmproxy/files/bubble_modify.py Parādīt failu

@@ -91,6 +91,7 @@ def responseheaders(flow):
elif HEADER_BUBBLE_MATCHERS in flow.request.headers and HEADER_BUBBLE_REQUEST_ID in flow.request.headers:
req_id = flow.request.headers[HEADER_BUBBLE_REQUEST_ID]
matchers = flow.request.headers[HEADER_BUBBLE_MATCHERS]
bubble_log('responseheaders: req_id='+req_id+' with matchers: '+matchers)
if HEADER_CONTENT_TYPE in flow.response.headers:
content_type = flow.response.headers[HEADER_CONTENT_TYPE]
if matchers:
@@ -98,16 +99,15 @@ def responseheaders(flow):
content_encoding = flow.response.headers[HEADER_CONTENT_ENCODING]
else:
content_encoding = None
bubble_log("responseheaders: content_encoding="+repr(content_encoding)
+ ", content_type="+repr(content_type)
+", req_id=" + req_id)
bubble_log('responseheaders: req_id='+req_id+' content_encoding='+repr(content_encoding)
+ ', content_type='+repr(content_type))
flow.response.stream = bubble_modify(req_id, content_encoding, content_type)
else:
bubble_log("responseheaders: no matchers, passing thru")
bubble_log('responseheaders: no matchers, passing thru')
pass
else:
bubble_log("responseheaders: no "+HEADER_CONTENT_TYPE +" header, passing thru")
bubble_log('responseheaders: no '+HEADER_CONTENT_TYPE +' header, passing thru')
pass
else:
bubble_log("responseheaders: no "+HEADER_BUBBLE_MATCHERS +" header, passing thru")
bubble_log('responseheaders: no '+HEADER_BUBBLE_MATCHERS +' header, passing thru')
pass

+ 10
- 6
automation/roles/mitmproxy/files/dns_spoofing.py Parādīt failu

@@ -26,20 +26,23 @@ class Rerouter:
try:
host = host.decode()
except (UnicodeDecodeError, AttributeError):
bubble_log("get_matchers: host "+str(host)+" could not be decoded, type="+str(type(host)))
return None
try:
host = str(host)
except Exception as e:
bubble_log("get_matchers: host "+repr(host)+" could not be decoded, type="+str(type(host))+" e="+repr(e))
return None

if host == bubble_host or host == bubble_host_alias:
bubble_log("get_matchers: request is for bubble itself ("+host+"), not matching")
return None

req_id = str(uuid.uuid4()) + '.' + str(time.time())
req_id = str(host) + '.' + str(uuid.uuid4()) + '.' + str(time.time())
resp = bubble_matchers(req_id, remote_addr, flow, host)
if resp and 'abort' in resp and resp['abort'] is not None:
bubble_log("get_matchers: received abort code for remote_addr/host: "+remote_addr+'/'+str(host)+': '+str(resp['abort']))
return {'abort': resp['abort']}

if (not resp) or (not 'matchers' in resp):
if (not resp) or (not 'matchers' in resp) or (resp['matchers'] is None):
bubble_log("get_matchers: no matchers for remote_addr/host: "+remote_addr+'/'+str(host))
return None
matcher_ids = []
@@ -87,9 +90,10 @@ class Rerouter:
elif ('matchers' in matcher_response
and 'request_id' in matcher_response
and len(matcher_response['matchers']) > 0):
bubble_log("dns_spoofing.request: found request_id: " + ' '.join(matcher_response['matchers']))
req_id = matcher_response['request_id']
bubble_log("dns_spoofing.request: found request_id: " + req_id + ' with matchers: ' + ' '.join(matcher_response['matchers']))
flow.request.headers[HEADER_BUBBLE_MATCHERS] = json.dumps(matcher_response['matchers'])
flow.request.headers[HEADER_BUBBLE_REQUEST_ID] = matcher_response['request_id']
flow.request.headers[HEADER_BUBBLE_REQUEST_ID] = req_id
else:
bubble_log('dns_spoofing.request: no rules returned, passing thru...')
else:


+ 2
- 2
bubble-server/src/main/java/bubble/ApiConstants.java Parādīt failu

@@ -21,8 +21,7 @@ import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
import static org.apache.http.HttpHeaders.ACCEPT_LANGUAGE;
import static org.apache.http.HttpHeaders.USER_AGENT;
import static org.apache.http.HttpHeaders.*;
import static org.cobbzilla.util.daemon.ZillaRuntime.die;
import static org.cobbzilla.util.io.FileUtil.abs;
import static org.cobbzilla.util.io.StreamUtil.stream2string;
@@ -234,6 +233,7 @@ public class ApiConstants {
}

public static String getUserAgent(ContainerRequest ctx) { return ctx.getHeaderString(USER_AGENT); }
public static String getReferer(ContainerRequest ctx) { return ctx.getHeaderString(REFERER); }

public static final String DETECT_LOCALE = "detect";



+ 11
- 7
bubble-server/src/main/java/bubble/resources/stream/FilterHttpRequest.java Parādīt failu

@@ -1,15 +1,18 @@
package bubble.resources.stream;

import bubble.model.account.Account;
import bubble.model.app.AppMatcher;
import bubble.model.device.Device;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.commons.lang.ArrayUtils;
import org.cobbzilla.util.collection.NameAndValue;
import org.cobbzilla.util.http.HttpContentEncodingType;

import java.util.List;

import static org.cobbzilla.util.daemon.ZillaRuntime.empty;

@NoArgsConstructor @Accessors(chain=true)
@@ -23,12 +26,13 @@ public class FilterHttpRequest {
@Getter @Setter private NameAndValue[] meta;
@Getter @Setter private String contentType;

@Getter @Setter private String[] matchers;

public boolean hasMatchers() { return !empty(matchers); }

public boolean hasMatcher (String matcherId) {
if (empty(matcherId) || empty(matchers)) return false;
return ArrayUtils.contains(matchers, matcherId);
if (empty(matcherId) || !hasMatchers()) return false;
return matchersResponse.getMatchers().stream().anyMatch(m -> m.getUuid().equals(matcherId));
}

public boolean hasMatchers() { return matchersResponse != null && matchersResponse.hasMatchers(); }

@JsonIgnore public List<AppMatcher> getMatchers() { return !hasMatchers() ? null : matchersResponse.getMatchers(); }

}

+ 84
- 40
bubble-server/src/main/java/bubble/resources/stream/FilterHttpResource.java Parādīt failu

@@ -88,16 +88,20 @@ public class FilterHttpResource {
final RedisService cache = getMatchersCache();

final String requestId = filterRequest.getRequestId();
final String prefix = "getMatchersResponse("+requestId+"): ";
final String cacheKey = filterRequest.cacheKey();
final String matchersJson = cache.get(cacheKey);
if (matchersJson != null) {
return json(matchersJson, FilterMatchersResponse.class)
.setRequestId(requestId);
final FilterMatchersResponse cached = json(matchersJson, FilterMatchersResponse.class);
cache.set(requestId, json(cached.setRequestId(requestId), COMPACT_MAPPER), EX, MATCHERS_CACHE_TIMEOUT);
if (log.isTraceEnabled()) log.trace(prefix+"found cached response for cacheKey="+cacheKey+" and set for requestId "+requestId+": "+json(cached, COMPACT_MAPPER));
return cached;
}

final FilterMatchersResponse response = findMatchers(filterRequest, req, request);
cache.set(cacheKey, json(response), EX, MATCHERS_CACHE_TIMEOUT);
cache.set(requestId, json(response), EX, MATCHERS_CACHE_TIMEOUT);
if (log.isTraceEnabled()) log.trace(prefix+"writing cache-miss to redis under keys "+cacheKey+" and "+requestId+": "+json(response, COMPACT_MAPPER));
cache.set(cacheKey, json(response, COMPACT_MAPPER), EX, MATCHERS_CACHE_TIMEOUT);
cache.set(requestId, json(response, COMPACT_MAPPER), EX, MATCHERS_CACHE_TIMEOUT);
return response;
}

@@ -105,56 +109,68 @@ public class FilterHttpResource {
final RedisService cache = getMatchersCache();
final String matchersJson = cache.get(requestId);
if (matchersJson != null) return json(matchersJson, FilterMatchersResponse.class);
if (log.isTraceEnabled()) log.trace("getMatchersResponseByRequestId: no FilterMatchersResponse for requestId: "+requestId);
return null;
}

@POST @Path(EP_MATCHERS)
@POST @Path(EP_MATCHERS+"/{requestId}")
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
public Response selectMatchers(@Context Request req,
@Context ContainerRequest request,
@PathParam("requestId") String requestId,
FilterMatchersRequest filterRequest) {
final String remoteHost = getRemoteHost(req);
final String mitmAddr = req.getRemoteAddr();
if (log.isDebugEnabled()) log.debug("selectMatchers: starting for remoteHost=" + remoteHost + ", mitmAddr=" + mitmAddr+", filterRequest="+json(filterRequest, COMPACT_MAPPER));

if (filterRequest == null || !filterRequest.hasRequestId() || empty(requestId) || !requestId.equals(filterRequest.getRequestId())) {
if (log.isDebugEnabled()) log.debug("selectMatchers: no filterRequest, missing requestId, or mismatch, returning forbidden");
return forbidden();
}
// only mitmproxy is allowed to call us, and this should always be a local address
if (!isLocalIpv4(mitmAddr)) return forbidden();
if (!isLocalIpv4(mitmAddr)) {
if (log.isDebugEnabled()) log.debug("selectMatchers: mitmAddr ("+mitmAddr+") was not local IPv4 for filterRequest ("+filterRequest.getRequestId()+"), returning forbidden");
return forbidden();
}

final String vpnAddr = filterRequest.getRemoteAddr();
if (empty(vpnAddr)) {
if (log.isDebugEnabled()) log.debug("findMatchers: no VPN address provided, returning no matchers");
final String prefix = "selectMatchers("+filterRequest.getRequestId()+"): ";
if (log.isDebugEnabled()) log.debug(prefix+"starting for filterRequest="+json(filterRequest, COMPACT_MAPPER));

if (!filterRequest.hasRemoteAddr()) {
if (log.isDebugEnabled()) log.debug(prefix+"no VPN address provided, returning no matchers");
return ok(NO_MATCHERS);
}

final String vpnAddr = filterRequest.getRemoteAddr();
final Device device = deviceIdService.findDeviceByIp(vpnAddr);
if (device == null) {
if (log.isDebugEnabled()) log.debug("findMatchers: device not found for IP "+vpnAddr+", returning no matchers");
if (log.isDebugEnabled()) log.debug(prefix+"device not found for IP "+vpnAddr+", returning no matchers");
return ok(NO_MATCHERS);
} else if (log.isDebugEnabled()) {
log.debug("findMatchers: found device "+device.id()+" for IP "+vpnAddr);
log.debug(prefix+"found device "+device.id()+" for IP "+vpnAddr);
}
filterRequest.setDevice(device.getUuid());
return ok(getMatchersResponse(filterRequest, req, request));
final FilterMatchersResponse response = getMatchersResponse(filterRequest, req, request);
if (log.isDebugEnabled()) log.debug(prefix+"returning response: "+json(response, COMPACT_MAPPER));
return ok(response);
}

private FilterMatchersResponse findMatchers(FilterMatchersRequest filterRequest, Request req, ContainerRequest request) {

final String prefix = "findMatchers("+filterRequest.getRequestId()+"): ";
final Device device = findDevice(filterRequest.getDevice());
if (device == null) {
if (log.isDebugEnabled()) log.debug("findMatchers: findDevice("+ filterRequest.getDevice() +") returned null, returning no matchers");
if (log.isDebugEnabled()) log.debug(prefix+"findDevice("+ filterRequest.getDevice() +") returned null, returning no matchers");
return NO_MATCHERS;
}
final String accountUuid = device.getAccount();
final Account caller = findCaller(accountUuid);
if (caller == null) {
if (log.isDebugEnabled()) log.debug("findMatchers: account "+ accountUuid +" not found for device "+device.id()+", returning no matchers");
if (log.isDebugEnabled()) log.debug(prefix+"account "+ accountUuid +" not found for device "+device.id()+", returning no matchers");
return NO_MATCHERS;
}

final String fqdn = filterRequest.getFqdn();
final List<AppMatcher> matchers = matcherDAO.findByAccountAndFqdnAndEnabled(accountUuid, fqdn);
if (log.isDebugEnabled()) log.debug("findMatchers: found "+matchers.size()+" candidate matchers");
if (log.isDebugEnabled()) log.debug(prefix+"found "+matchers.size()+" candidate matchers");
final List<AppMatcher> removeMatchers;
if (matchers.isEmpty()) {
removeMatchers = Collections.emptyList();
@@ -163,7 +179,7 @@ public class FilterHttpResource {
removeMatchers = new ArrayList<>();
for (AppMatcher matcher : matchers) {
if (matcher.matches(uri)) {
final FilterMatchDecision matchResponse = ruleEngine.preprocess(filterRequest, req, request, caller, device, matcher.getUuid());
final FilterMatchDecision matchResponse = ruleEngine.preprocess(filterRequest, req, request, caller, device, matcher);
switch (matchResponse) {
case abort_ok: return FilterMatchersResponse.ABORT_OK;
case abort_not_found: return FilterMatchersResponse.ABORT_NOT_FOUND;
@@ -178,9 +194,10 @@ public class FilterHttpResource {
}
matchers.removeAll(removeMatchers);

if (log.isDebugEnabled()) log.debug("findMatchers: after pre-processing, returning "+matchers.size()+" matchers");
if (log.isDebugEnabled()) log.debug(prefix+"after pre-processing, returning "+matchers.size()+" matchers");
return new FilterMatchersResponse()
.setDecision(empty(matchers) ? FilterMatchDecision.no_match : FilterMatchDecision.match)
.setRequest(filterRequest)
.setMatchers(matchers);
}

@@ -194,16 +211,18 @@ public class FilterHttpResource {
@QueryParam("contentType") String contentType,
@QueryParam("last") Boolean last) throws IOException {

final String remoteHost = getRemoteHost(req);
final String mitmAddr = req.getRemoteAddr();

// only mitmproxy is allowed to call us, and this should always be a local address
if (!isLocalIpv4(mitmAddr)) return forbidden();
final String mitmAddr = req.getRemoteAddr();
if (!isLocalIpv4(mitmAddr)) {
if (log.isDebugEnabled()) log.debug("filterHttp: mitmAddr ("+mitmAddr+") was not local IPv4, returning forbidden");
return forbidden();
}

if (empty(requestId)) {
if (log.isDebugEnabled()) log.debug("filterHttp: no requestId provided, returning passthru");
return passthru(request);
}
final String prefix = "filterHttp("+requestId+"): ";

final boolean isLast = bool(last);

@@ -215,7 +234,7 @@ public class FilterHttpResource {
try {
encoding = HttpContentEncodingType.fromString(contentEncoding);
} catch (Exception e) {
if (log.isWarnEnabled()) log.warn("filterHttp: invalid encoding ("+contentEncoding+"), returning passthru");
if (log.isWarnEnabled()) log.warn(prefix+"invalid encoding ("+contentEncoding+"), returning passthru");
return passthru(request);
}
}
@@ -226,46 +245,70 @@ public class FilterHttpResource {
try {
contentLength = empty(contentLengthHeader) ? null : Integer.parseInt(contentLengthHeader);
} catch (Exception e) {
if (log.isDebugEnabled()) log.debug("filterHttp: error parsing Content-Length ("+contentLengthHeader+"): "+shortError(e));
if (log.isDebugEnabled()) log.debug(prefix+"error parsing Content-Length ("+contentLengthHeader+"): "+shortError(e));
contentLength = null;
}

final FilterMatchersResponse matchersResponse = getMatchersResponseByRequestId(requestId);
if (matchersResponse == null) {
if (log.isWarnEnabled()) log.warn("filterHttp: FilterMatchersResponse not found, returning passthru");
if (log.isWarnEnabled()) log.warn(prefix+"FilterMatchersResponse not found, returning passthru");
return passthru(request);

} else if (matchersResponse.hasAbort()) {
if (log.isWarnEnabled()) log.warn("filterHttp: FilterMatchersResponse has abort code "+matchersResponse.httpStatus()+", MITM should have aborted. We are aborting now.");
}
if (log.isDebugEnabled()) log.debug(prefix+"found FilterMatchersResponse: "+json(matchersResponse, COMPACT_MAPPER));
if (matchersResponse.hasAbort()) {
if (log.isWarnEnabled()) log.warn(prefix+"FilterMatchersResponse has abort code "+matchersResponse.httpStatus()+", MITM should have aborted. We are aborting now.");
return status(matchersResponse.httpStatus());

} else if (!matchersResponse.getRequest().hasDevice()) {
if (log.isWarnEnabled()) log.warn("filterHttp: FilterMatchersResponse has no device, returning passthru");
} else if (!matchersResponse.hasMatchers()) {
if (log.isWarnEnabled()) log.warn(prefix+"FilterMatchersResponse has no matchers, returning passthru");
return passthru(request);

} else if (!matchersResponse.hasMatchers()) {
if (log.isWarnEnabled()) log.warn("filterHttp: FilterMatchersResponse has no matchers, returning passthru");
} else if (!matchersResponse.hasRequest()) {
if (log.isWarnEnabled()) log.warn(prefix + "FilterMatchersResponse has no request, returning passthru");
return passthru(request);

} else {
final FilterMatchDecision decision = matchersResponse.getDecision();
if (decision != FilterMatchDecision.match) {
switch (decision) {
case no_match:
if (log.isWarnEnabled()) log.warn(prefix + "FilterMatchersResponse decision was not match: "+ decision +", returning passthru");
return passthru(request);
case abort_not_found:
case abort_ok:
if (log.isWarnEnabled()) log.warn(prefix + "FilterMatchersResponse decision was not match: "+ decision +", returning "+matchersResponse.httpStatus());
return status(matchersResponse.httpStatus());
default:
if (log.isWarnEnabled()) log.warn(prefix + "FilterMatchersResponse decision was unknown: "+ decision +", returning passthru");
return passthru(request);
}

} else if (!matchersResponse.getRequest().hasDevice()) {
if (log.isWarnEnabled()) log.warn(prefix+"FilterMatchersResponse has no device, returning passthru");
return passthru(request);
}
}
matchersResponse.setRequestId(requestId);

FilterHttpRequest filterRequest = getActiveRequest(requestId);
if (filterRequest == null) {
if (log.isDebugEnabled()) log.debug(prefix+"filterRequest not found, initiating...");
if (empty(contentType)) {
if (log.isDebugEnabled()) log.debug("filterHttp: filter request not found, and contentType provided, returning passthru");
if (log.isDebugEnabled()) log.debug(prefix+"filter request not found, and no contentType provided, returning passthru");
return passthru(request);
}

final Device device = findDevice(matchersResponse.getRequest().getDevice());
if (device == null) {
if (log.isDebugEnabled()) log.debug("filterHttp: device "+matchersResponse.getRequest().getDevice()+" not found, returning passthru");
if (log.isDebugEnabled()) log.debug(prefix+"device "+matchersResponse.getRequest().getDevice()+" not found, returning passthru");
return passthru(request);
} else {
if (log.isDebugEnabled()) log.debug("filterHttp: found device: "+device.id()+" ... ");
if (log.isDebugEnabled()) log.debug(prefix+"found device: "+device.id()+" ... ");
}
final Account caller = findCaller(device.getAccount());
if (caller == null) {
if (log.isDebugEnabled()) log.debug("filterHttp: account "+device.getAccount()+" not found, returning passthru");
if (log.isDebugEnabled()) log.debug(prefix+"account "+device.getAccount()+" not found, returning passthru");
return passthru(request);
}
filterRequest = new FilterHttpRequest()
@@ -275,14 +318,15 @@ public class FilterHttpResource {
.setAccount(caller)
.setEncoding(encoding)
.setContentType(contentType);
if (log.isDebugEnabled()) log.trace("filterHttp: start filterRequest="+json(filterRequest, COMPACT_MAPPER));
if (log.isDebugEnabled()) log.trace(prefix+"start filterRequest="+json(filterRequest, COMPACT_MAPPER));
getActiveRequestCache().set(requestId, json(filterRequest, COMPACT_MAPPER), EX, ACTIVE_REQUEST_TIMEOUT);
} else {
if (log.isDebugEnabled()) log.debug(prefix+"filterRequest found, continuing...");
if (log.isTraceEnabled()) {
if (isLast) {
log.trace("filterHttp: last filterRequest=" + json(filterRequest, COMPACT_MAPPER));
log.trace(prefix+"last filterRequest=" + json(filterRequest, COMPACT_MAPPER));
} else {
log.trace("filterHttp: continuing filterRequest=" + json(filterRequest, COMPACT_MAPPER));
log.trace(prefix+"continuing filterRequest=" + json(filterRequest, COMPACT_MAPPER));
}
}
}


+ 5
- 2
bubble-server/src/main/java/bubble/resources/stream/FilterMatchersRequest.java Parādīt failu

@@ -13,12 +13,17 @@ import static org.cobbzilla.util.daemon.ZillaRuntime.hashOf;
public class FilterMatchersRequest {

@Getter @Setter private String requestId;
public boolean hasRequestId () { return !empty(requestId); }

@Getter @Setter private String device;
public boolean hasDevice() { return !empty(device); }

@Getter @Setter private String fqdn;
@Getter @Setter private String uri;
@Getter @Setter private String userAgent;
@Getter @Setter private String referer;
@Getter @Setter private String remoteAddr;
public boolean hasRemoteAddr() { return !empty(remoteAddr); }

// note: we do *not* include the requestId in the cache, if we did then the
// FilterHttpResource.matchersCache cache would be useless, since every cache entry would be unique
@@ -26,6 +31,4 @@ public class FilterMatchersRequest {

@JsonIgnore public String getUrl() { return fqdn + "/" + uri; }

public boolean hasDevice() { return !empty(device); }

}

+ 1
- 0
bubble-server/src/main/java/bubble/resources/stream/FilterMatchersResponse.java Parādīt failu

@@ -20,6 +20,7 @@ public class FilterMatchersResponse {
public static final FilterMatchersResponse ABORT_NOT_FOUND = new FilterMatchersResponse().setDecision(FilterMatchDecision.abort_not_found);

@Getter @Setter private FilterMatchersRequest request;
public boolean hasRequest () { return request != null; }

@Getter @Setter private FilterMatchDecision decision;



+ 19
- 11
bubble-server/src/main/java/bubble/resources/stream/ReverseProxyResource.java Parādīt failu

@@ -5,6 +5,7 @@ import bubble.dao.app.AppRuleDAO;
import bubble.model.account.Account;
import bubble.model.app.AppMatcher;
import bubble.model.device.Device;
import bubble.rule.FilterMatchDecision;
import bubble.server.BubbleConfiguration;
import bubble.service.cloud.DeviceIdService;
import bubble.service.stream.StandardRuleEngineService;
@@ -22,19 +23,14 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.*;

import static bubble.ApiConstants.PROXY_ENDPOINT;
import static bubble.ApiConstants.getRemoteHost;
import static bubble.ApiConstants.*;
import static java.util.UUID.randomUUID;
import static org.cobbzilla.util.daemon.ZillaRuntime.die;
import static org.cobbzilla.util.daemon.ZillaRuntime.empty;
import static org.cobbzilla.util.http.HttpContentTypes.CONTENT_TYPE_ANY;
import static org.cobbzilla.util.json.JsonUtil.json;
import static org.cobbzilla.util.string.StringUtil.EMPTY_ARRAY;
import static org.cobbzilla.wizard.resources.ResourceUtil.*;

@Path(PROXY_ENDPOINT)
@@ -69,7 +65,7 @@ public class ReverseProxyResource {
return ruleEngine.passthru(ub, request);
} else {
// find rules by regex
final Set<String> matcherIds = new TreeSet<>();
final Map<String, AppMatcher> matchedMatchers = new HashMap<>();
for (AppMatcher m : matchers) {
// check for regex match
if (m.matches(ub.getFullPath())) {
@@ -78,15 +74,27 @@ public class ReverseProxyResource {
log.debug("get: matcher("+m.getUuid()+") blocks request, returning 404 Not Found for "+ub.getFullPath());
return notFound(ub.getFullPath());
}
matcherIds.add(m.getUuid());
matchedMatchers.put(m.getUuid(), m);
}
}

final String id = randomUUID().toString();
final FilterHttpRequest filterRequest = new FilterHttpRequest()
.setId(randomUUID().toString())
.setId(id)
.setAccount(account)
.setDevice(device)
.setMatchers(matcherIds.toArray(EMPTY_ARRAY));
.setMatchersResponse(new FilterMatchersResponse()
.setRequest(new FilterMatchersRequest()
.setRequestId(id)
.setFqdn(ub.getHost())
.setUri(ub.getFullPath())
.setUserAgent(getUserAgent(request))
.setReferer(getReferer(request))
.setRemoteAddr(remoteHost)
.setDevice(device.getUuid()))
.setRequestId(id)
.setDecision(FilterMatchDecision.match)
.setMatchers(new ArrayList<>(matchedMatchers.values())));

filterHttpResource.getActiveRequestCache().set(filterRequest.getId(), json(filterRequest));



+ 1
- 1
bubble-server/src/main/java/bubble/rule/analytics/TrafficAnalyticsRuleDriver.java Parādīt failu

@@ -43,7 +43,7 @@ public class TrafficAnalyticsRuleDriver extends AbstractAppRuleDriver {
final String site = ruleHarness.getMatcher().getSite();
final String fqdn = filter.getFqdn();

final TrafficRecord rec = new TrafficRecord(filter, account, device, req);
final TrafficRecord rec = new TrafficRecord(filter, account, device);
recordRecentTraffic(rec);
incrementCounters(account, device, app, site, fqdn);
return FilterMatchDecision.no_match; // we are done, don't need to look at/modify stream


+ 2
- 4
bubble-server/src/main/java/bubble/rule/analytics/TrafficRecord.java Parādīt failu

@@ -7,9 +7,7 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.glassfish.grizzly.http.server.Request;

import static bubble.ApiConstants.getRemoteHost;
import static java.util.UUID.randomUUID;
import static org.cobbzilla.util.daemon.ZillaRuntime.now;

@@ -28,12 +26,12 @@ public class TrafficRecord {
@Getter @Setter private String userAgent;
@Getter @Setter private String referer;

public TrafficRecord(FilterMatchersRequest filter, Account account, Device device, Request req) {
public TrafficRecord(FilterMatchersRequest filter, Account account, Device device) {
setAccountName(account == null ? null : account.getName());
setAccountUuid(account == null ? null : account.getUuid());
setDeviceName(device == null ? null : device.getName());
setDeviceUuid(device == null ? null : device.getUuid());
setIp(getRemoteHost(req));
setIp(filter.getRemoteAddr());
setFqdn(filter.getFqdn());
setUri(filter.getUri());
setUserAgent(filter.getUserAgent());


+ 12
- 9
bubble-server/src/main/java/bubble/rule/bblock/BubbleBlockRuleDriver.java Parādīt failu

@@ -96,23 +96,24 @@ public class BubbleBlockRuleDriver extends TrafficAnalyticsRuleDriver {
final String app = ruleHarness.getRule().getApp();
final String site = ruleHarness.getMatcher().getSite();
final String fqdn = filter.getFqdn();
final String prefix = "preprocess("+filter.getRequestId()+"): ";

final BlockDecision decision = getDecision(filter.getFqdn(), filter.getUri());
switch (decision.getDecisionType()) {
case block:
if (log.isDebugEnabled()) log.debug("preprocess: decision is BLOCK");
if (log.isDebugEnabled()) log.debug(prefix+"decision is BLOCK");
incrementCounters(account, device, app, site, fqdn);
return FilterMatchDecision.abort_not_found; // block this request

case allow: default:
if (log.isDebugEnabled()) log.debug("preprocess: decision is ALLOW");
if (log.isDebugEnabled()) log.debug(prefix+"decision is ALLOW");
return FilterMatchDecision.no_match;

case filter:
if (log.isDebugEnabled()) log.debug("preprocess: decision is FILTER");
if (log.isDebugEnabled()) log.debug(prefix+"decision is FILTER");
final List<BlockSpec> specs = decision.getSpecs();
if (empty(specs)) {
log.warn("getFilterMatchResponse: decision was 'filter' but no specs were found, returning no_match");
log.warn(prefix+"decision was 'filter' but no specs were found, returning no_match");
return FilterMatchDecision.no_match;
} else {
return FilterMatchDecision.match;
@@ -127,32 +128,34 @@ public class BubbleBlockRuleDriver extends TrafficAnalyticsRuleDriver {
@Override public InputStream doFilterResponse(FilterHttpRequest filterRequest, InputStream in) {

final FilterMatchersRequest request = filterRequest.getMatchersResponse().getRequest();
final String prefix = "doFilterResponse("+filterRequest.getId()+"): ";

// Now that we know the content type, re-check the BlockList
final String contentType = filterRequest.getContentType();
final BlockDecision decision = blockList.getDecision(request.getFqdn(), request.getUri(), contentType, true);
if (log.isDebugEnabled()) log.debug(prefix+"preprocess decision was "+decision+", but now we know contentType="+contentType);
switch (decision.getDecisionType()) {
case block:
log.warn("doFilterRequest: preprocessed request was filtered, but ultimate decision was block (contentType="+contentType+"), returning EMPTY_STREAM");
log.warn(prefix+"preprocessed request was filtered, but ultimate decision was block (contentType="+contentType+"), returning EMPTY_STREAM");
return EMPTY_STREAM;
case allow:
log.warn("doFilterRequest: preprocessed request was filtered, but ultimate decision was allow (contentType="+contentType+"), returning as-is");
log.warn(prefix+"preprocessed request was filtered, but ultimate decision was allow (contentType="+contentType+"), returning as-is");
return in;
case filter:
if (!decision.hasSpecs()) {
// should never happen
log.warn("doFilterRequest: preprocessed request was filtered, but ultimate decision was filtered (contentType="+contentType+"), but no filters provided, returning as-is");
log.warn(prefix+"preprocessed request was filtered, but ultimate decision was filtered (contentType="+contentType+"), but no filters provided, returning as-is");
return in;
}
break;
default:
// should never happen
log.warn("doFilterRequest: preprocessed request was filtered, but ultimate decision was invalid, returning EMPTY_STREAM");
log.warn(prefix+"preprocessed request was filtered, but ultimate decision was invalid, returning EMPTY_STREAM");
return EMPTY_STREAM;
}

if (!HttpContentTypes.isHtml(contentType)) {
log.warn("doFilterRequest: cannot request non-html response ("+request.getUrl()+"), returning as-is: "+contentType);
log.warn(prefix+"cannot request non-html response ("+request.getUrl()+"), returning as-is: "+contentType);
return in;
}



+ 36
- 29
bubble-server/src/main/java/bubble/service/stream/StandardRuleEngineService.java Parādīt failu

@@ -29,6 +29,7 @@ import org.apache.http.impl.client.HttpClientBuilder;
import org.cobbzilla.util.collection.ExpirationEvictionPolicy;
import org.cobbzilla.util.collection.ExpirationMap;
import org.cobbzilla.util.collection.NameAndValue;
import org.cobbzilla.util.collection.SingletonList;
import org.cobbzilla.util.http.HttpClosingFilterInputStream;
import org.cobbzilla.util.http.HttpContentEncodingType;
import org.cobbzilla.util.http.HttpMethods;
@@ -55,6 +56,7 @@ import java.util.List;
import java.util.Map;

import static bubble.client.BubbleApiClient.newHttpClientBuilder;
import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH;
@@ -78,8 +80,8 @@ public class StandardRuleEngineService implements RuleEngineService {
ContainerRequest request,
Account account,
Device device,
String matcherUuid) {
final AppRuleHarness ruleHarness = initRules(account, device, new String[]{ matcherUuid }).get(0);
AppMatcher matcher) {
final AppRuleHarness ruleHarness = initRules(account, device, new SingletonList<AppMatcher>(matcher)).get(0);
return ruleHarness.getDriver().preprocess(ruleHarness, filter, account, device, req, request);
}

@@ -147,22 +149,25 @@ public class StandardRuleEngineService implements RuleEngineService {
FilterHttpRequest filterRequest,
Integer contentLength,
boolean last) throws IOException {

if (!filterRequest.hasMatchers()) return passthru(request);
final String prefix = "applyRulesToChunkAndSendResponse("+filterRequest.getId()+"): ";
if (!filterRequest.hasMatchers()) {
if (log.isDebugEnabled()) log.debug(prefix+"adding no matchers, returning passthru");
return passthru(request);
}

// have we seen this request before?
final ActiveStreamState state = activeProcessors.computeIfAbsent(filterRequest.getId(),
k -> new ActiveStreamState(filterRequest, initRules(filterRequest)));
final byte[] chunk = toBytes(request.getEntityStream(), contentLength);
if (last) {
if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: adding LAST stream");
if (log.isDebugEnabled()) log.debug(prefix+"adding LAST stream");
state.addLastChunk(chunk);
} else {
if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: adding a stream");
if (log.isDebugEnabled()) log.debug(prefix+"adding a stream");
state.addChunk(chunk);
}

if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: sending as much filtered data as we can right now (which may be nothing)");
if (log.isDebugEnabled()) log.debug(prefix+"sending as much filtered data as we can right now (which may be nothing)");
// return sendResponse(new ByteArrayInputStream(chunk)); // noop for testing
return sendResponse(state.getResponseStream(last));
}
@@ -185,13 +190,10 @@ public class StandardRuleEngineService implements RuleEngineService {
return initRules(filterRequest.getAccount(), filterRequest.getDevice(), filterRequest.getMatchers());
}

private List<AppRuleHarness> initRules(Account account, Device device, String[] matcherIds) {
final String cacheKey = hashOf(account.getUuid(), device.getUuid(), matcherIds);
private List<AppRuleHarness> initRules(Account account, Device device, List<AppMatcher> matchers) {
if (empty(matchers)) return emptyList();
final String cacheKey = hashOf(account.getUuid(), device.getUuid(), matchers);
return ruleCache.computeIfAbsent(cacheKey, k -> {
final List<AppMatcher> matchers = matcherDAO.findByUuids(matcherIds);
if (matchers.size() != matcherIds.length) {
log.warn("initRules: duplicate rules, or could not resolve some rule(s)");
}
final List<AppRuleHarness> ruleHarnesses = new ArrayList<>();
for (AppMatcher m : matchers) {
final AppRule rule = ruleDAO.findByUuid(m.getRule());
@@ -201,11 +203,11 @@ public class StandardRuleEngineService implements RuleEngineService {
}
ruleHarnesses.add(new AppRuleHarness(m, rule));
}
return initRules(account, device, ruleHarnesses);
return initRuleHarnesses(account, device, ruleHarnesses);
});
}

private List<AppRuleHarness> initRules(Account account, Device device, List<AppRuleHarness> rules) {
private List<AppRuleHarness> initRuleHarnesses(Account account, Device device, List<AppRuleHarness> rules) {
for (AppRuleHarness h : rules) {
final RuleDriver ruleDriver = driverDAO.findByUuid(h.getRule().getDriver());
if (ruleDriver == null) {
@@ -282,8 +284,10 @@ public class StandardRuleEngineService implements RuleEngineService {
this.firstRule = rules.get(0);
}

private String prefix(String s) { return s+"("+requestId+"): "; }

public void addChunk(byte[] chunk) throws IOException {
if (log.isDebugEnabled()) log.debug("addChunk: adding "+chunk.length+" bytes");
if (log.isDebugEnabled()) log.debug(prefix("addChunk")+"adding "+chunk.length+" bytes");
totalBytesWritten += chunk.length;
if (multiStream == null) {
multiStream = new MultiStream(new ByteArrayInputStream(chunk));
@@ -294,7 +298,7 @@ public class StandardRuleEngineService implements RuleEngineService {
}

public void addLastChunk(byte[] chunk) throws IOException {
if (log.isDebugEnabled()) log.debug("addLastChunk: adding "+chunk.length+" bytes");
if (log.isDebugEnabled()) log.debug(prefix("addLastChunk")+"adding "+chunk.length+" bytes");
totalBytesWritten += chunk.length;
if (multiStream == null) {
multiStream = new MultiStream(new ByteArrayInputStream(chunk), true);
@@ -305,10 +309,11 @@ public class StandardRuleEngineService implements RuleEngineService {
}

public InputStream getResponseStream(boolean last) throws IOException {
if (log.isDebugEnabled()) log.debug("getResponseStream: starting with last="+last+", totalBytesWritten="+totalBytesWritten+", totalBytesRead="+totalBytesRead);
final String prefix = prefix("getResponseStream");
if (log.isDebugEnabled()) log.debug(prefix+"starting with last="+last+", totalBytesWritten="+totalBytesWritten+", totalBytesRead="+totalBytesRead);
// read to end of all streams, there is no more data coming in
if (last) {
if (log.isDebugEnabled()) log.debug("getResponseStream: last==true, returning full output");
if (log.isDebugEnabled()) log.debug(prefix+"last==true, returning full output");
return output;
}

@@ -316,47 +321,49 @@ public class StandardRuleEngineService implements RuleEngineService {
final int bytesToRead = (int) (totalBytesWritten - totalBytesRead - (2*MAX_BYTE_BUFFER_SIZE));
if (bytesToRead < 0) {
// we shouldn't try to read yet, less than 1024 bytes have been written
if (log.isDebugEnabled()) log.debug("getResponseStream: not enough data written (bytesToRead="+bytesToRead+"), can't read anything yet");
if (log.isDebugEnabled()) log.debug(prefix+"not enough data written (bytesToRead="+bytesToRead+"), can't read anything yet");
return NullInputStream.instance;
}

if (log.isDebugEnabled()) log.debug("getResponseStream: trying to read "+bytesToRead+" bytes");
if (log.isDebugEnabled()) log.debug(prefix+"trying to read "+bytesToRead+" bytes");
final byte[] buffer = new byte[bytesToRead];
final int bytesRead = output.read(buffer);
if (log.isDebugEnabled()) log.debug("getResponseStream: actually read "+bytesRead+" bytes");
if (log.isDebugEnabled()) log.debug(prefix+"actually read "+bytesRead+" bytes");
if (bytesRead == -1) {
// nothing to return
if (log.isDebugEnabled()) log.debug("getResponseStream: end of stream, returning NullInputStream");
if (log.isDebugEnabled()) log.debug(prefix+"end of stream, returning NullInputStream");
return NullInputStream.instance;
}

if (log.isDebugEnabled()) log.debug("getResponseStream: read "+bytesRead+", returning buffer");
if (log.isDebugEnabled()) log.debug(prefix+"read "+bytesRead+", returning buffer");
totalBytesRead += bytesRead;
return new ByteArrayInputStream(buffer, 0, bytesRead);
}

private InputStream inputStream(MultiStream baseStream) throws IOException {
final String prefix = prefix("inputStream");
if (encoding == null) {
if (log.isDebugEnabled()) log.debug("inputStream: returning baseStream unmodified");
if (log.isDebugEnabled()) log.debug(prefix+"returning baseStream unmodified");
return baseStream;
}
try {
final InputStream wrapped = encoding.wrapInput(baseStream);
if (log.isDebugEnabled()) log.debug("inputStream: returning baseStream wrapped in " + wrapped.getClass().getSimpleName());
if (log.isDebugEnabled()) log.debug(prefix+"returning baseStream wrapped in " + wrapped.getClass().getSimpleName());
return wrapped;
} catch (IOException e) {
if (log.isWarnEnabled()) log.warn("inputStream: error wrapping with "+encoding+", sending as-is (perhaps missing a byte or two)");
if (log.isWarnEnabled()) log.warn(prefix+"error wrapping with "+encoding+", sending as-is (perhaps missing a byte or two)");
return baseStream;
}
}

private InputStream outputStream(InputStream in) throws IOException {
final String prefix = prefix("outputStream");
if (encoding == null) {
if (log.isDebugEnabled()) log.debug("outputStream: returning baseStream unmodified");
if (log.isDebugEnabled()) log.debug(prefix+"returning baseStream unmodified");
return in;
}
final FilterInputStreamViaOutputStream wrapped = encoding.wrapInputAsOutput(in);
if (log.isDebugEnabled()) log.debug("outputStream: returning baseStream wrapped in "+wrapped.getOutputStreamClass().getSimpleName());
if (log.isDebugEnabled()) log.debug(prefix+"returning baseStream wrapped in "+wrapped.getOutputStreamClass().getSimpleName());
return wrapped;
}
}


+ 1
- 0
bubble-server/src/main/resources/logback.xml Parādīt failu

@@ -37,6 +37,7 @@
<logger name="org.cobbzilla.wizard.server.listener.BrowserLauncherListener" level="INFO" />
<logger name="bubble.service.notify.NotificationService" level="WARN" />
<logger name="bubble.rule.bblock.BubbleBlockRuleDriver" level="DEBUG" />
<logger name="bubble.service.stream.StandardRuleEngineService" level="DEBUG" />
<logger name="bubble.resources.stream" level="DEBUG" />
<logger name="bubble.service.stream" level="INFO" />
<logger name="bubble.resources.message" level="INFO" />


Notiek ielāde…
Atcelt
Saglabāt