diff --git a/bubble-server/src/main/java/bubble/rule/AppRuleDriver.java b/bubble-server/src/main/java/bubble/rule/AppRuleDriver.java index 8a6552bf..3cdf100d 100644 --- a/bubble-server/src/main/java/bubble/rule/AppRuleDriver.java +++ b/bubble-server/src/main/java/bubble/rule/AppRuleDriver.java @@ -43,12 +43,12 @@ public interface AppRuleDriver { default InputStream doFilterRequest(InputStream in) { return in; } - default InputStream filterResponse(InputStream in) { - if (hasNext()) return doFilterResponse(getNext().filterResponse(in)); - return doFilterResponse(in); + default InputStream filterResponse(String requestId, InputStream in) { + if (hasNext()) return doFilterResponse(requestId, getNext().filterResponse(requestId, in)); + return doFilterResponse(requestId, in); } - default InputStream doFilterResponse(InputStream in) { return in; } + default InputStream doFilterResponse(String requestId, InputStream in) { return in; } default String resolveResource(String res, Map ctx) { final String resource = locateResource(res); diff --git a/bubble-server/src/main/java/bubble/rule/social/block/UserBlocker.java b/bubble-server/src/main/java/bubble/rule/social/block/UserBlocker.java index 693e7efe..f289ff7f 100644 --- a/bubble-server/src/main/java/bubble/rule/social/block/UserBlocker.java +++ b/bubble-server/src/main/java/bubble/rule/social/block/UserBlocker.java @@ -62,8 +62,8 @@ public class UserBlocker extends AbstractAppRuleDriver { protected UserBlockerConfig configObject() { return json(getFullConfig(), UserBlockerConfig.class); } - @Override public InputStream doFilterResponse(InputStream in) { - final UserBlockerStreamFilter filter = new UserBlockerStreamFilter(matcher, rule); + @Override public InputStream doFilterResponse(String requestId, InputStream in) { + final UserBlockerStreamFilter filter = new UserBlockerStreamFilter(requestId, matcher, rule); filter.configure(getFullConfig()); filter.setDataDAO(appDataDAO); RegexFilterReader reader = new RegexFilterReader(in, RESPONSE_BUFSIZ, filter).setName("mainFilterReader"); diff --git a/bubble-server/src/main/java/bubble/rule/social/block/UserBlockerStreamFilter.java b/bubble-server/src/main/java/bubble/rule/social/block/UserBlockerStreamFilter.java index 2cbef9a3..6699caa8 100644 --- a/bubble-server/src/main/java/bubble/rule/social/block/UserBlockerStreamFilter.java +++ b/bubble-server/src/main/java/bubble/rule/social/block/UserBlockerStreamFilter.java @@ -10,6 +10,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.ning.http.util.Base64; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.cobbzilla.util.collection.ExpirationEvictionPolicy; +import org.cobbzilla.util.collection.ExpirationMap; import org.cobbzilla.util.handlebars.HandlebarsUtil; import org.cobbzilla.util.io.regex.RegexChunk; import org.cobbzilla.util.io.regex.RegexChunkStreamer; @@ -21,6 +23,7 @@ import java.util.Map; import java.util.regex.Matcher; import static bubble.rule.social.block.UserBlockerConfig.STANDARD_JS_ENGINE; +import static java.util.concurrent.TimeUnit.MINUTES; import static org.cobbzilla.util.json.JsonUtil.COMPACT_MAPPER; import static org.cobbzilla.util.json.JsonUtil.json; @@ -33,11 +36,13 @@ public class UserBlockerStreamFilter implements RegexStreamFilter { public static final String PROP_UNBLOCK_URL = "unblockUrl"; public static final String PROP_CHUNK_START_REGEX = "chunkStartRegex"; + private String requestId; private AppMatcher matcher; private AppRule rule; @Setter private AppDataDAO dataDAO; - public UserBlockerStreamFilter(AppMatcher matcher, AppRule rule) { + public UserBlockerStreamFilter(String requestId, AppMatcher matcher, AppRule rule) { + this.requestId = requestId; this.matcher = matcher; this.rule = rule; } @@ -75,7 +80,7 @@ public class UserBlockerStreamFilter implements RegexStreamFilter { if (userId == null) { log.warn("apply: no userId found for chunk: "+pageChunk); append(result, pageChunk.getData()); - } else if (!isUserBlocked(userId)) { + } else if (!isUserBlocked(requestId, userId)) { // add controls to unblocked comment, to allow it to be blocked append(result, decorate(pageChunk)); } else { @@ -141,15 +146,16 @@ public class UserBlockerStreamFilter implements RegexStreamFilter { return HandlebarsUtil.apply(config.getHandlebars(), config.getBlockedCommentReplacement(), ctx); } - protected boolean isUserBlocked(String userId) { - // todo: cache these lookups for a while - if (userId == null) return false; - final String data = dataDAO.findValueByAppAndSiteAndKey(config.getApp(), matcher.getSite(), userId); - return Boolean.parseBoolean(data); + private Map blockCache = new ExpirationMap<>(MINUTES.toMillis(1), ExpirationEvictionPolicy.atime); + protected boolean isUserBlocked(String requestId, String userId) { + return blockCache.computeIfAbsent(requestId+":"+userId, k -> { + if (userId == null) return false; + final String data = dataDAO.findValueByAppAndSiteAndKey(config.getApp(), matcher.getSite(), userId); + return Boolean.parseBoolean(data); + }); } private boolean isCommentBlocked(RegexChunk blockedComment, RegexChunk candidateComment) { - // if it's the same user, of course it is blocked final String blockedUser = blockedComment.getProperty(PROP_USER_ID); final String thisUser = candidateComment.getProperty(PROP_USER_ID); diff --git a/bubble-server/src/main/java/bubble/service/stream/RuleEngine.java b/bubble-server/src/main/java/bubble/service/stream/RuleEngine.java index 28940bfd..5ac18873 100644 --- a/bubble-server/src/main/java/bubble/service/stream/RuleEngine.java +++ b/bubble-server/src/main/java/bubble/service/stream/RuleEngine.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import static bubble.client.BubbleApiClient.newHttpClientBuilder; +import static java.util.UUID.randomUUID; import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MINUTES; import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH; @@ -125,7 +126,8 @@ public class RuleEngine { final CloseableHttpResponse proxyResponse = httpClient.execute(get); // filter response. when stream is closed, close http client - final InputStream responseEntity = firstRule.getDriver().filterResponse(new HttpClosingFilterInputStream(httpClient, proxyResponse)); + final String requestId = randomUUID().toString(); + final InputStream responseEntity = firstRule.getDriver().filterResponse(requestId, new HttpClosingFilterInputStream(httpClient, proxyResponse)); // send response return sendResponse(responseEntity, proxyResponse); @@ -151,7 +153,7 @@ public class RuleEngine { if (empty(matcherIds)) return passthru(request); // have we seen this request before? - final ActiveStreamState state = activeProcessors.computeIfAbsent(requestId, k -> new ActiveStreamState(initRules(account, device, matcherIds))); + final ActiveStreamState state = activeProcessors.computeIfAbsent(requestId, k -> new ActiveStreamState(k, initRules(account, device, matcherIds))); final byte[] chunk = toBytes(request.getEntityStream()); if (last) { if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: adding LAST stream"); @@ -251,20 +253,24 @@ public class RuleEngine { private static class ActiveStreamState { + private String requestId; private MultiStream multiStream; private AppRuleHarness firstRule; private InputStream output = null; private long totalBytesWritten = 0; private long totalBytesRead = 0; - public ActiveStreamState(List rules) { this.firstRule = rules.get(0); } + public ActiveStreamState(String requestId, List rules) { + this.requestId = requestId; + this.firstRule = rules.get(0); + } public void addChunk(byte[] chunk) { if (log.isDebugEnabled()) log.debug("addChunk: adding "+chunk.length+" bytes"); totalBytesWritten += chunk.length; if (multiStream == null) { multiStream = new MultiStream(new ByteArrayInputStream(chunk)); - output = firstRule.getDriver().filterResponse(multiStream); + output = firstRule.getDriver().filterResponse(requestId, multiStream); } else { multiStream.addStream(new ByteArrayInputStream(chunk)); } @@ -275,7 +281,7 @@ public class RuleEngine { totalBytesWritten += chunk.length; if (multiStream == null) { multiStream = new MultiStream(new ByteArrayInputStream(chunk), true); - output = firstRule.getDriver().filterResponse(multiStream); + output = firstRule.getDriver().filterResponse(requestId, multiStream); } else { multiStream.addLastStream(new ByteArrayInputStream(chunk)); } diff --git a/utils/cobbzilla-utils b/utils/cobbzilla-utils index 5091e557..e1a46c09 160000 --- a/utils/cobbzilla-utils +++ b/utils/cobbzilla-utils @@ -1 +1 @@ -Subproject commit 5091e557f6c01d8e0191d458174613249d1f1631 +Subproject commit e1a46c09d4414954a84e91f96f7c70894f60dd3c