@@ -43,12 +43,12 @@ public interface AppRuleDriver { | |||
default InputStream doFilterRequest(InputStream in) { return in; } | |||
default InputStream filterResponse(InputStream in) { | |||
if (hasNext()) return doFilterResponse(getNext().filterResponse(in)); | |||
return doFilterResponse(in); | |||
default InputStream filterResponse(String requestId, InputStream in) { | |||
if (hasNext()) return doFilterResponse(requestId, getNext().filterResponse(requestId, in)); | |||
return doFilterResponse(requestId, in); | |||
} | |||
default InputStream doFilterResponse(InputStream in) { return in; } | |||
default InputStream doFilterResponse(String requestId, InputStream in) { return in; } | |||
default String resolveResource(String res, Map<String, Object> ctx) { | |||
final String resource = locateResource(res); | |||
@@ -62,8 +62,8 @@ public class UserBlocker extends AbstractAppRuleDriver { | |||
protected UserBlockerConfig configObject() { return json(getFullConfig(), UserBlockerConfig.class); } | |||
@Override public InputStream doFilterResponse(InputStream in) { | |||
final UserBlockerStreamFilter filter = new UserBlockerStreamFilter(matcher, rule); | |||
@Override public InputStream doFilterResponse(String requestId, InputStream in) { | |||
final UserBlockerStreamFilter filter = new UserBlockerStreamFilter(requestId, matcher, rule); | |||
filter.configure(getFullConfig()); | |||
filter.setDataDAO(appDataDAO); | |||
RegexFilterReader reader = new RegexFilterReader(in, RESPONSE_BUFSIZ, filter).setName("mainFilterReader"); | |||
@@ -10,6 +10,8 @@ import com.fasterxml.jackson.databind.JsonNode; | |||
import com.ning.http.util.Base64; | |||
import lombok.Setter; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.util.collection.ExpirationEvictionPolicy; | |||
import org.cobbzilla.util.collection.ExpirationMap; | |||
import org.cobbzilla.util.handlebars.HandlebarsUtil; | |||
import org.cobbzilla.util.io.regex.RegexChunk; | |||
import org.cobbzilla.util.io.regex.RegexChunkStreamer; | |||
@@ -21,6 +23,7 @@ import java.util.Map; | |||
import java.util.regex.Matcher; | |||
import static bubble.rule.social.block.UserBlockerConfig.STANDARD_JS_ENGINE; | |||
import static java.util.concurrent.TimeUnit.MINUTES; | |||
import static org.cobbzilla.util.json.JsonUtil.COMPACT_MAPPER; | |||
import static org.cobbzilla.util.json.JsonUtil.json; | |||
@@ -33,11 +36,13 @@ public class UserBlockerStreamFilter implements RegexStreamFilter { | |||
public static final String PROP_UNBLOCK_URL = "unblockUrl"; | |||
public static final String PROP_CHUNK_START_REGEX = "chunkStartRegex"; | |||
private String requestId; | |||
private AppMatcher matcher; | |||
private AppRule rule; | |||
@Setter private AppDataDAO dataDAO; | |||
public UserBlockerStreamFilter(AppMatcher matcher, AppRule rule) { | |||
public UserBlockerStreamFilter(String requestId, AppMatcher matcher, AppRule rule) { | |||
this.requestId = requestId; | |||
this.matcher = matcher; | |||
this.rule = rule; | |||
} | |||
@@ -75,7 +80,7 @@ public class UserBlockerStreamFilter implements RegexStreamFilter { | |||
if (userId == null) { | |||
log.warn("apply: no userId found for chunk: "+pageChunk); | |||
append(result, pageChunk.getData()); | |||
} else if (!isUserBlocked(userId)) { | |||
} else if (!isUserBlocked(requestId, userId)) { | |||
// add controls to unblocked comment, to allow it to be blocked | |||
append(result, decorate(pageChunk)); | |||
} else { | |||
@@ -141,15 +146,16 @@ public class UserBlockerStreamFilter implements RegexStreamFilter { | |||
return HandlebarsUtil.apply(config.getHandlebars(), config.getBlockedCommentReplacement(), ctx); | |||
} | |||
protected boolean isUserBlocked(String userId) { | |||
// todo: cache these lookups for a while | |||
if (userId == null) return false; | |||
final String data = dataDAO.findValueByAppAndSiteAndKey(config.getApp(), matcher.getSite(), userId); | |||
return Boolean.parseBoolean(data); | |||
private Map<String, Boolean> blockCache = new ExpirationMap<>(MINUTES.toMillis(1), ExpirationEvictionPolicy.atime); | |||
protected boolean isUserBlocked(String requestId, String userId) { | |||
return blockCache.computeIfAbsent(requestId+":"+userId, k -> { | |||
if (userId == null) return false; | |||
final String data = dataDAO.findValueByAppAndSiteAndKey(config.getApp(), matcher.getSite(), userId); | |||
return Boolean.parseBoolean(data); | |||
}); | |||
} | |||
private boolean isCommentBlocked(RegexChunk blockedComment, RegexChunk candidateComment) { | |||
// if it's the same user, of course it is blocked | |||
final String blockedUser = blockedComment.getProperty(PROP_USER_ID); | |||
final String thisUser = candidateComment.getProperty(PROP_USER_ID); | |||
@@ -49,6 +49,7 @@ import java.util.List; | |||
import java.util.Map; | |||
import static bubble.client.BubbleApiClient.newHttpClientBuilder; | |||
import static java.util.UUID.randomUUID; | |||
import static java.util.concurrent.TimeUnit.HOURS; | |||
import static java.util.concurrent.TimeUnit.MINUTES; | |||
import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH; | |||
@@ -125,7 +126,8 @@ public class RuleEngine { | |||
final CloseableHttpResponse proxyResponse = httpClient.execute(get); | |||
// filter response. when stream is closed, close http client | |||
final InputStream responseEntity = firstRule.getDriver().filterResponse(new HttpClosingFilterInputStream(httpClient, proxyResponse)); | |||
final String requestId = randomUUID().toString(); | |||
final InputStream responseEntity = firstRule.getDriver().filterResponse(requestId, new HttpClosingFilterInputStream(httpClient, proxyResponse)); | |||
// send response | |||
return sendResponse(responseEntity, proxyResponse); | |||
@@ -151,7 +153,7 @@ public class RuleEngine { | |||
if (empty(matcherIds)) return passthru(request); | |||
// have we seen this request before? | |||
final ActiveStreamState state = activeProcessors.computeIfAbsent(requestId, k -> new ActiveStreamState(initRules(account, device, matcherIds))); | |||
final ActiveStreamState state = activeProcessors.computeIfAbsent(requestId, k -> new ActiveStreamState(k, initRules(account, device, matcherIds))); | |||
final byte[] chunk = toBytes(request.getEntityStream()); | |||
if (last) { | |||
if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: adding LAST stream"); | |||
@@ -251,20 +253,24 @@ public class RuleEngine { | |||
private static class ActiveStreamState { | |||
private String requestId; | |||
private MultiStream multiStream; | |||
private AppRuleHarness firstRule; | |||
private InputStream output = null; | |||
private long totalBytesWritten = 0; | |||
private long totalBytesRead = 0; | |||
public ActiveStreamState(List<AppRuleHarness> rules) { this.firstRule = rules.get(0); } | |||
public ActiveStreamState(String requestId, List<AppRuleHarness> rules) { | |||
this.requestId = requestId; | |||
this.firstRule = rules.get(0); | |||
} | |||
public void addChunk(byte[] chunk) { | |||
if (log.isDebugEnabled()) log.debug("addChunk: adding "+chunk.length+" bytes"); | |||
totalBytesWritten += chunk.length; | |||
if (multiStream == null) { | |||
multiStream = new MultiStream(new ByteArrayInputStream(chunk)); | |||
output = firstRule.getDriver().filterResponse(multiStream); | |||
output = firstRule.getDriver().filterResponse(requestId, multiStream); | |||
} else { | |||
multiStream.addStream(new ByteArrayInputStream(chunk)); | |||
} | |||
@@ -275,7 +281,7 @@ public class RuleEngine { | |||
totalBytesWritten += chunk.length; | |||
if (multiStream == null) { | |||
multiStream = new MultiStream(new ByteArrayInputStream(chunk), true); | |||
output = firstRule.getDriver().filterResponse(multiStream); | |||
output = firstRule.getDriver().filterResponse(requestId, multiStream); | |||
} else { | |||
multiStream.addLastStream(new ByteArrayInputStream(chunk)); | |||
} | |||
@@ -1 +1 @@ | |||
Subproject commit 5091e557f6c01d8e0191d458174613249d1f1631 | |||
Subproject commit e1a46c09d4414954a84e91f96f7c70894f60dd3c |