From b2cd78c8c197fef66f1e092201fae35f83e1984f Mon Sep 17 00:00:00 2001 From: Jonathan Cobb Date: Thu, 23 Jan 2020 03:54:22 -0500 Subject: [PATCH] mitm interception and filtering now works --- .../roles/mitmproxy/files/bubble_modify.py | 3 +- .../roles/mitmproxy/files/dns_spoofing.py | 2 +- .../resources/stream/FilterHttpResource.java | 9 +- .../stream/ReverseProxyResource.java | 2 +- .../main/java/bubble/rule/AppRuleDriver.java | 9 +- .../rule/analytics/TrafficAnalytics.java | 2 +- .../bubble/rule/social/block/UserBlocker.java | 8 +- .../social/block/UserBlockerStreamFilter.java | 6 +- .../stream/AppRuleHarness.java | 2 +- .../stream/RuleEngine.java | 104 ++++++++++++++++-- bubble-server/src/main/resources/logback.xml | 5 +- utils/cobbzilla-utils | 2 +- 12 files changed, 120 insertions(+), 34 deletions(-) rename bubble-server/src/main/java/bubble/{resources => service}/stream/AppRuleHarness.java (95%) rename bubble-server/src/main/java/bubble/{resources => service}/stream/RuleEngine.java (64%) diff --git a/automation/roles/mitmproxy/files/bubble_modify.py b/automation/roles/mitmproxy/files/bubble_modify.py index ffe57e71..5be107f0 100644 --- a/automation/roles/mitmproxy/files/bubble_modify.py +++ b/automation/roles/mitmproxy/files/bubble_modify.py @@ -26,7 +26,8 @@ def filter_chunk(chunk, req_id, content_type=None, device=None, matchers=None): if not response.ok: err_message = "filter_chunk: Error fetching " + url + ", HTTP status " + str(response.status_code) bubble_log(err_message) - raise RuntimeError(err_message) + return b'' + # raise RuntimeError(err_message) return response.content # NOOP code: diff --git a/automation/roles/mitmproxy/files/dns_spoofing.py b/automation/roles/mitmproxy/files/dns_spoofing.py index 79bce207..16e81069 100644 --- a/automation/roles/mitmproxy/files/dns_spoofing.py +++ b/automation/roles/mitmproxy/files/dns_spoofing.py @@ -45,7 +45,7 @@ class Rerouter: host = host.decode() except (UnicodeDecodeError, AttributeError): bubble_log("get_matchers: host "+str(host)+" could not be decoded, type="+str(type(host))) - pass + return None resp = bubble_matchers(remote_addr, flow, host) if (not resp) or (not 'matchers' in resp) or (not 'device' in resp): diff --git a/bubble-server/src/main/java/bubble/resources/stream/FilterHttpResource.java b/bubble-server/src/main/java/bubble/resources/stream/FilterHttpResource.java index 7e04e13d..f85bbaaf 100644 --- a/bubble-server/src/main/java/bubble/resources/stream/FilterHttpResource.java +++ b/bubble-server/src/main/java/bubble/resources/stream/FilterHttpResource.java @@ -7,6 +7,7 @@ import bubble.model.account.Account; import bubble.model.app.AppMatcher; import bubble.model.device.Device; import bubble.service.cloud.DeviceIdService; +import bubble.service.stream.RuleEngine; import lombok.extern.slf4j.Slf4j; import org.cobbzilla.util.collection.ArrayUtil; import org.cobbzilla.util.collection.ExpirationMap; @@ -189,14 +190,6 @@ public class FilterHttpResource { log.debug("filterHttp: starting with requestId="+requestId+", deviceId="+deviceId+", matchersJson="+matchersJson+", contentType="+contentType+", last="+last); } -// // for now, just try to return unmodified... -// if (last != null && last) { -// log.debug("filterHttp: DEBUG: last chunk detected, returning empty response"); -// return ok(); // no response body -// } else { -// log.debug("filterHttp: DEBUG: chunk detected, returning chunk as passthru response"); -// return passthru(request); -// } final boolean isLast = last != null && last; return ruleEngine.applyRulesToChunkAndSendResponse(request, filterRequest.getId(), filterRequest.getAccount(), filterRequest.getDevice(), diff --git a/bubble-server/src/main/java/bubble/resources/stream/ReverseProxyResource.java b/bubble-server/src/main/java/bubble/resources/stream/ReverseProxyResource.java index fbcddd1d..5195c209 100644 --- a/bubble-server/src/main/java/bubble/resources/stream/ReverseProxyResource.java +++ b/bubble-server/src/main/java/bubble/resources/stream/ReverseProxyResource.java @@ -7,10 +7,10 @@ import bubble.model.app.AppMatcher; import bubble.model.device.Device; import bubble.server.BubbleConfiguration; import bubble.service.cloud.DeviceIdService; +import bubble.service.stream.RuleEngine; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.cobbzilla.util.http.URIBean; -import org.cobbzilla.util.string.StringUtil; import org.glassfish.grizzly.http.server.Request; import org.glassfish.jersey.server.ContainerRequest; import org.glassfish.jersey.server.ContainerResponse; diff --git a/bubble-server/src/main/java/bubble/rule/AppRuleDriver.java b/bubble-server/src/main/java/bubble/rule/AppRuleDriver.java index 0121f287..8a6552bf 100644 --- a/bubble-server/src/main/java/bubble/rule/AppRuleDriver.java +++ b/bubble-server/src/main/java/bubble/rule/AppRuleDriver.java @@ -4,7 +4,7 @@ import bubble.model.account.Account; import bubble.model.app.AppMatcher; import bubble.model.app.AppRule; import bubble.model.device.Device; -import bubble.resources.stream.AppRuleHarness; +import bubble.service.stream.AppRuleHarness; import bubble.resources.stream.FilterMatchersRequest; import com.fasterxml.jackson.databind.JsonNode; import com.github.jknack.handlebars.Handlebars; @@ -50,13 +50,6 @@ public interface AppRuleDriver { default InputStream doFilterResponse(InputStream in) { return in; } - default InputStream filterResponseChunk(String requestId, InputStream in, boolean last) { - if (hasNext()) return doFilterResponseChunk(requestId, getNext().filterResponseChunk(requestId, in, last), last); - return doFilterResponse(in); - } - - default InputStream doFilterResponseChunk(String requestId, InputStream in, boolean last) { return in; } - default String resolveResource(String res, Map ctx) { final String resource = locateResource(res); if (resource == null || resource.trim().length() == 0) return ""; diff --git a/bubble-server/src/main/java/bubble/rule/analytics/TrafficAnalytics.java b/bubble-server/src/main/java/bubble/rule/analytics/TrafficAnalytics.java index 5a05f71f..07f2554f 100644 --- a/bubble-server/src/main/java/bubble/rule/analytics/TrafficAnalytics.java +++ b/bubble-server/src/main/java/bubble/rule/analytics/TrafficAnalytics.java @@ -4,7 +4,7 @@ import bubble.dao.app.AppDataDAO; import bubble.model.account.Account; import bubble.model.app.AppData; import bubble.resources.stream.FilterMatchersRequest; -import bubble.resources.stream.AppRuleHarness; +import bubble.service.stream.AppRuleHarness; import bubble.rule.AbstractAppRuleDriver; import org.glassfish.grizzly.http.server.Request; import org.glassfish.jersey.server.ContainerRequest; diff --git a/bubble-server/src/main/java/bubble/rule/social/block/UserBlocker.java b/bubble-server/src/main/java/bubble/rule/social/block/UserBlocker.java index f4614bdc..693e7efe 100644 --- a/bubble-server/src/main/java/bubble/rule/social/block/UserBlocker.java +++ b/bubble-server/src/main/java/bubble/rule/social/block/UserBlocker.java @@ -7,6 +7,7 @@ import bubble.service.cloud.RequestCoordinationService; import com.fasterxml.jackson.databind.JsonNode; import com.github.jknack.handlebars.Handlebars; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.input.ReaderInputStream; import org.cobbzilla.util.io.regex.RegexFilterReader; import org.cobbzilla.util.io.regex.RegexInsertionFilter; @@ -25,16 +26,19 @@ import static org.cobbzilla.util.daemon.ZillaRuntime.die; import static org.cobbzilla.util.json.JsonUtil.json; import static org.cobbzilla.util.string.StringUtil.UTF8cs; +@Slf4j public class UserBlocker extends AbstractAppRuleDriver { private static final int RESPONSE_BUFSIZ = (int) (64 * Bytes.KB); @Autowired private RequestCoordinationService requestService; + @Autowired private AppDataDAO appDataDAO; private final String requestId = randomUUID().toString().replace("-", "_"); // This gets called after autowiring, so `configuration` object will be non-null by now @Getter(lazy=true) private final JsonNode fullConfig = initFullConfig(); + private JsonNode initFullConfig() { UserBlockerConfig userBlockerConfig; try { @@ -52,14 +56,16 @@ public class UserBlocker extends AbstractAppRuleDriver { final String json = json(userBlockerConfig); requestService.set(UserBlocker.class.getName(), requestId, json); + return json(json, JsonNode.class); } protected UserBlockerConfig configObject() { return json(getFullConfig(), UserBlockerConfig.class); } @Override public InputStream doFilterResponse(InputStream in) { - final UserBlockerStreamFilter filter = new UserBlockerStreamFilter(matcher, rule, configuration.getBean(AppDataDAO.class)); + final UserBlockerStreamFilter filter = new UserBlockerStreamFilter(matcher, rule); filter.configure(getFullConfig()); + filter.setDataDAO(appDataDAO); RegexFilterReader reader = new RegexFilterReader(in, RESPONSE_BUFSIZ, filter).setName("mainFilterReader"); final UserBlockerConfig config = configObject(); diff --git a/bubble-server/src/main/java/bubble/rule/social/block/UserBlockerStreamFilter.java b/bubble-server/src/main/java/bubble/rule/social/block/UserBlockerStreamFilter.java index f8b29ac6..2cbef9a3 100644 --- a/bubble-server/src/main/java/bubble/rule/social/block/UserBlockerStreamFilter.java +++ b/bubble-server/src/main/java/bubble/rule/social/block/UserBlockerStreamFilter.java @@ -8,6 +8,7 @@ import bubble.model.app.AppMatcher; import bubble.model.app.AppRule; import com.fasterxml.jackson.databind.JsonNode; import com.ning.http.util.Base64; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.cobbzilla.util.handlebars.HandlebarsUtil; import org.cobbzilla.util.io.regex.RegexChunk; @@ -34,12 +35,11 @@ public class UserBlockerStreamFilter implements RegexStreamFilter { private AppMatcher matcher; private AppRule rule; - private AppDataDAO dataDAO; + @Setter private AppDataDAO dataDAO; - public UserBlockerStreamFilter(AppMatcher matcher, AppRule rule, AppDataDAO dataDAO) { + public UserBlockerStreamFilter(AppMatcher matcher, AppRule rule) { this.matcher = matcher; this.rule = rule; - this.dataDAO = dataDAO; } private enum UserBlockerStreamState { seeking_comments, blocking_comments } diff --git a/bubble-server/src/main/java/bubble/resources/stream/AppRuleHarness.java b/bubble-server/src/main/java/bubble/service/stream/AppRuleHarness.java similarity index 95% rename from bubble-server/src/main/java/bubble/resources/stream/AppRuleHarness.java rename to bubble-server/src/main/java/bubble/service/stream/AppRuleHarness.java index b577f221..8ca2ca37 100644 --- a/bubble-server/src/main/java/bubble/resources/stream/AppRuleHarness.java +++ b/bubble-server/src/main/java/bubble/service/stream/AppRuleHarness.java @@ -1,4 +1,4 @@ -package bubble.resources.stream; +package bubble.service.stream; import bubble.model.app.AppMatcher; import bubble.model.app.RuleDriver; diff --git a/bubble-server/src/main/java/bubble/resources/stream/RuleEngine.java b/bubble-server/src/main/java/bubble/service/stream/RuleEngine.java similarity index 64% rename from bubble-server/src/main/java/bubble/resources/stream/RuleEngine.java rename to bubble-server/src/main/java/bubble/service/stream/RuleEngine.java index 13ec5458..b4a07b22 100644 --- a/bubble-server/src/main/java/bubble/resources/stream/RuleEngine.java +++ b/bubble-server/src/main/java/bubble/service/stream/RuleEngine.java @@ -1,4 +1,4 @@ -package bubble.resources.stream; +package bubble.service.stream; import bubble.dao.app.AppMatcherDAO; import bubble.dao.app.AppRuleDAO; @@ -8,6 +8,7 @@ import bubble.model.app.AppMatcher; import bubble.model.app.AppRule; import bubble.model.app.RuleDriver; import bubble.model.device.Device; +import bubble.resources.stream.FilterMatchersRequest; import bubble.rule.AppRuleDriver; import bubble.server.BubbleConfiguration; import lombok.Cleanup; @@ -23,9 +24,13 @@ import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.entity.InputStreamEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; +import org.cobbzilla.util.collection.ExpirationEvictionPolicy; +import org.cobbzilla.util.collection.ExpirationMap; import org.cobbzilla.util.http.HttpClosingFilterInputStream; import org.cobbzilla.util.http.HttpMethods; import org.cobbzilla.util.http.URIBean; +import org.cobbzilla.util.io.NullInputStream; +import org.cobbzilla.util.io.multi.MultiStream; import org.cobbzilla.wizard.stream.ByteStreamingOutput; import org.cobbzilla.wizard.stream.SendableResource; import org.cobbzilla.wizard.stream.StreamStreamingOutput; @@ -35,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.ws.rs.core.Response; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -43,9 +49,12 @@ import java.util.List; import java.util.Map; import static bubble.client.BubbleApiClient.newHttpClientBuilder; +import static java.util.concurrent.TimeUnit.MINUTES; import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH; +import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; import static org.cobbzilla.util.daemon.ZillaRuntime.empty; import static org.cobbzilla.util.http.HttpStatusCodes.OK; +import static org.cobbzilla.util.io.StreamUtil.DEFAULT_BUFFER_SIZE; import static org.cobbzilla.wizard.resources.ResourceUtil.send; @Service @Slf4j @@ -128,6 +137,8 @@ public class RuleEngine { public Response passthru(ContainerRequest request) { return passthru(request.getEntityStream()); } + private final Map activeProcessors = new ExpirationMap<>(MINUTES.toMillis(5), ExpirationEvictionPolicy.atime); + public Response applyRulesToChunkAndSendResponse(ContainerRequest request, String requestId, Account account, @@ -135,14 +146,29 @@ public class RuleEngine { String[] matcherIds, boolean last) throws IOException { - if (empty(matcherIds)) return passthru(null, request); + if (empty(matcherIds)) return passthru(request); + + // have we seen this request before? + ActiveStreamState state = activeProcessors.computeIfAbsent(requestId, k -> new ActiveStreamState(initRules(account, device, matcherIds))); + final byte[] chunk = toBytes(request.getEntityStream()); + if (last) { + if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: adding LAST stream"); + state.addLastChunk(chunk); + } else { + if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: adding a stream"); + state.addChunk(chunk); + } - // init rules - final List ruleHarnesses = initRules(account, device, matcherIds); - final AppRuleHarness firstRule = ruleHarnesses.get(0); + if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: sending as much filtered data as we can right now (which may be nothing)"); +// return sendResponse(new ByteArrayInputStream(chunk)); // noop for testing + return sendResponse(state.getResponseStream(last)); + } - final InputStream responseEntity = firstRule.getDriver().filterResponseChunk(requestId, request.getEntityStream(), last); - return sendResponse(responseEntity); + public byte[] toBytes(InputStream entityStream) throws IOException { + if (entityStream == null) return EMPTY_BYTE_ARRAY; + final ByteArrayOutputStream bout = new ByteArrayOutputStream(8192); + IOUtils.copyLarge(entityStream, bout); + return bout.toByteArray(); } private List initRules(Account account, Device device, String[] matcherIds) { @@ -217,4 +243,68 @@ public class RuleEngine { @Getter(lazy=true) private final HttpClientBuilder httpClientBuilder = newHttpClientBuilder(1000, 50); public CloseableHttpClient newHttpConn() { return getHttpClientBuilder().build(); } + private static class ActiveStreamState { + + private MultiStream multiStream; + private AppRuleHarness firstRule; + private InputStream output = null; + private long totalBytesWritten = 0; + private long totalBytesRead = 0; + + public ActiveStreamState(List rules) { this.firstRule = rules.get(0); } + + public void addChunk(byte[] chunk) { + if (log.isDebugEnabled()) log.debug("addChunk: adding "+chunk.length+" bytes"); + totalBytesWritten += chunk.length; + if (multiStream == null) { + multiStream = new MultiStream(new ByteArrayInputStream(chunk)); + output = firstRule.getDriver().filterResponse(multiStream); + } else { + multiStream.addStream(new ByteArrayInputStream(chunk)); + } + } + + public void addLastChunk(byte[] chunk) { + if (log.isDebugEnabled()) log.debug("addLastChunk: adding "+chunk.length+" bytes"); + totalBytesWritten += chunk.length; + if (multiStream == null) { + multiStream = new MultiStream(new ByteArrayInputStream(chunk), true); + output = firstRule.getDriver().filterResponse(multiStream); + } else { + multiStream.addLastStream(new ByteArrayInputStream(chunk)); + } + } + + public InputStream getResponseStream(boolean last) throws IOException { + if (log.isDebugEnabled()) log.debug("getResponseStream: starting with last="+last+", totalBytesWritten="+totalBytesWritten+", totalBytesRead="+totalBytesRead); + // read to end of all streams, there is no more data coming in + if (last) { + if (log.isDebugEnabled()) log.debug("getResponseStream: last==true, returning full output"); + return output; + } + + // try to read as many bytes as we have written, and have not yet read, less a safety buffer + final int bytesToRead = (int) (totalBytesWritten - totalBytesRead - (4*DEFAULT_BUFFER_SIZE)); + if (bytesToRead < 0) { + // we shouldn't try to read yet, less than 1024 bytes have been written + if (log.isDebugEnabled()) log.debug("getResponseStream: not enough data written (bytesToRead="+bytesToRead+"), can't read anything yet"); + return NullInputStream.instance; + } + + if (log.isDebugEnabled()) log.debug("getResponseStream: trying to read "+bytesToRead+" bytes"); + final byte[] buffer = new byte[bytesToRead]; + final int bytesRead = output.read(buffer); + if (log.isDebugEnabled()) log.debug("getResponseStream: actually read "+bytesRead+" bytes"); + if (bytesRead == -1) { + // nothing to return + if (log.isDebugEnabled()) log.debug("getResponseStream: end of stream, returning NullInputStream"); + return NullInputStream.instance; + } + + if (log.isDebugEnabled()) log.debug("getResponseStream: read "+bytesRead+", returning buffer"); + totalBytesRead += bytesRead; + return new ByteArrayInputStream(buffer, 0, bytesRead); + } + } + } diff --git a/bubble-server/src/main/resources/logback.xml b/bubble-server/src/main/resources/logback.xml index 88c6b869..273a6a6f 100644 --- a/bubble-server/src/main/resources/logback.xml +++ b/bubble-server/src/main/resources/logback.xml @@ -36,7 +36,10 @@ - + + + + diff --git a/utils/cobbzilla-utils b/utils/cobbzilla-utils index 2c282967..5091e557 160000 --- a/utils/cobbzilla-utils +++ b/utils/cobbzilla-utils @@ -1 +1 @@ -Subproject commit 2c28296705e818833b2a1325bc2201ef19fe4bf1 +Subproject commit 5091e557f6c01d8e0191d458174613249d1f1631