@@ -26,7 +26,8 @@ def filter_chunk(chunk, req_id, content_type=None, device=None, matchers=None): | |||
if not response.ok: | |||
err_message = "filter_chunk: Error fetching " + url + ", HTTP status " + str(response.status_code) | |||
bubble_log(err_message) | |||
raise RuntimeError(err_message) | |||
return b'' | |||
# raise RuntimeError(err_message) | |||
return response.content | |||
# NOOP code: | |||
@@ -45,7 +45,7 @@ class Rerouter: | |||
host = host.decode() | |||
except (UnicodeDecodeError, AttributeError): | |||
bubble_log("get_matchers: host "+str(host)+" could not be decoded, type="+str(type(host))) | |||
pass | |||
return None | |||
resp = bubble_matchers(remote_addr, flow, host) | |||
if (not resp) or (not 'matchers' in resp) or (not 'device' in resp): | |||
@@ -7,6 +7,7 @@ import bubble.model.account.Account; | |||
import bubble.model.app.AppMatcher; | |||
import bubble.model.device.Device; | |||
import bubble.service.cloud.DeviceIdService; | |||
import bubble.service.stream.RuleEngine; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.util.collection.ArrayUtil; | |||
import org.cobbzilla.util.collection.ExpirationMap; | |||
@@ -189,14 +190,6 @@ public class FilterHttpResource { | |||
log.debug("filterHttp: starting with requestId="+requestId+", deviceId="+deviceId+", matchersJson="+matchersJson+", contentType="+contentType+", last="+last); | |||
} | |||
// // for now, just try to return unmodified... | |||
// if (last != null && last) { | |||
// log.debug("filterHttp: DEBUG: last chunk detected, returning empty response"); | |||
// return ok(); // no response body | |||
// } else { | |||
// log.debug("filterHttp: DEBUG: chunk detected, returning chunk as passthru response"); | |||
// return passthru(request); | |||
// } | |||
final boolean isLast = last != null && last; | |||
return ruleEngine.applyRulesToChunkAndSendResponse(request, | |||
filterRequest.getId(), filterRequest.getAccount(), filterRequest.getDevice(), | |||
@@ -7,10 +7,10 @@ import bubble.model.app.AppMatcher; | |||
import bubble.model.device.Device; | |||
import bubble.server.BubbleConfiguration; | |||
import bubble.service.cloud.DeviceIdService; | |||
import bubble.service.stream.RuleEngine; | |||
import lombok.Getter; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.util.http.URIBean; | |||
import org.cobbzilla.util.string.StringUtil; | |||
import org.glassfish.grizzly.http.server.Request; | |||
import org.glassfish.jersey.server.ContainerRequest; | |||
import org.glassfish.jersey.server.ContainerResponse; | |||
@@ -4,7 +4,7 @@ import bubble.model.account.Account; | |||
import bubble.model.app.AppMatcher; | |||
import bubble.model.app.AppRule; | |||
import bubble.model.device.Device; | |||
import bubble.resources.stream.AppRuleHarness; | |||
import bubble.service.stream.AppRuleHarness; | |||
import bubble.resources.stream.FilterMatchersRequest; | |||
import com.fasterxml.jackson.databind.JsonNode; | |||
import com.github.jknack.handlebars.Handlebars; | |||
@@ -50,13 +50,6 @@ public interface AppRuleDriver { | |||
default InputStream doFilterResponse(InputStream in) { return in; } | |||
default InputStream filterResponseChunk(String requestId, InputStream in, boolean last) { | |||
if (hasNext()) return doFilterResponseChunk(requestId, getNext().filterResponseChunk(requestId, in, last), last); | |||
return doFilterResponse(in); | |||
} | |||
default InputStream doFilterResponseChunk(String requestId, InputStream in, boolean last) { return in; } | |||
default String resolveResource(String res, Map<String, Object> ctx) { | |||
final String resource = locateResource(res); | |||
if (resource == null || resource.trim().length() == 0) return ""; | |||
@@ -4,7 +4,7 @@ import bubble.dao.app.AppDataDAO; | |||
import bubble.model.account.Account; | |||
import bubble.model.app.AppData; | |||
import bubble.resources.stream.FilterMatchersRequest; | |||
import bubble.resources.stream.AppRuleHarness; | |||
import bubble.service.stream.AppRuleHarness; | |||
import bubble.rule.AbstractAppRuleDriver; | |||
import org.glassfish.grizzly.http.server.Request; | |||
import org.glassfish.jersey.server.ContainerRequest; | |||
@@ -7,6 +7,7 @@ import bubble.service.cloud.RequestCoordinationService; | |||
import com.fasterxml.jackson.databind.JsonNode; | |||
import com.github.jknack.handlebars.Handlebars; | |||
import lombok.Getter; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.apache.commons.io.input.ReaderInputStream; | |||
import org.cobbzilla.util.io.regex.RegexFilterReader; | |||
import org.cobbzilla.util.io.regex.RegexInsertionFilter; | |||
@@ -25,16 +26,19 @@ import static org.cobbzilla.util.daemon.ZillaRuntime.die; | |||
import static org.cobbzilla.util.json.JsonUtil.json; | |||
import static org.cobbzilla.util.string.StringUtil.UTF8cs; | |||
@Slf4j | |||
public class UserBlocker extends AbstractAppRuleDriver { | |||
private static final int RESPONSE_BUFSIZ = (int) (64 * Bytes.KB); | |||
@Autowired private RequestCoordinationService requestService; | |||
@Autowired private AppDataDAO appDataDAO; | |||
private final String requestId = randomUUID().toString().replace("-", "_"); | |||
// This gets called after autowiring, so `configuration` object will be non-null by now | |||
@Getter(lazy=true) private final JsonNode fullConfig = initFullConfig(); | |||
private JsonNode initFullConfig() { | |||
UserBlockerConfig userBlockerConfig; | |||
try { | |||
@@ -52,14 +56,16 @@ public class UserBlocker extends AbstractAppRuleDriver { | |||
final String json = json(userBlockerConfig); | |||
requestService.set(UserBlocker.class.getName(), requestId, json); | |||
return json(json, JsonNode.class); | |||
} | |||
protected UserBlockerConfig configObject() { return json(getFullConfig(), UserBlockerConfig.class); } | |||
@Override public InputStream doFilterResponse(InputStream in) { | |||
final UserBlockerStreamFilter filter = new UserBlockerStreamFilter(matcher, rule, configuration.getBean(AppDataDAO.class)); | |||
final UserBlockerStreamFilter filter = new UserBlockerStreamFilter(matcher, rule); | |||
filter.configure(getFullConfig()); | |||
filter.setDataDAO(appDataDAO); | |||
RegexFilterReader reader = new RegexFilterReader(in, RESPONSE_BUFSIZ, filter).setName("mainFilterReader"); | |||
final UserBlockerConfig config = configObject(); | |||
@@ -8,6 +8,7 @@ import bubble.model.app.AppMatcher; | |||
import bubble.model.app.AppRule; | |||
import com.fasterxml.jackson.databind.JsonNode; | |||
import com.ning.http.util.Base64; | |||
import lombok.Setter; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.util.handlebars.HandlebarsUtil; | |||
import org.cobbzilla.util.io.regex.RegexChunk; | |||
@@ -34,12 +35,11 @@ public class UserBlockerStreamFilter implements RegexStreamFilter { | |||
private AppMatcher matcher; | |||
private AppRule rule; | |||
private AppDataDAO dataDAO; | |||
@Setter private AppDataDAO dataDAO; | |||
public UserBlockerStreamFilter(AppMatcher matcher, AppRule rule, AppDataDAO dataDAO) { | |||
public UserBlockerStreamFilter(AppMatcher matcher, AppRule rule) { | |||
this.matcher = matcher; | |||
this.rule = rule; | |||
this.dataDAO = dataDAO; | |||
} | |||
private enum UserBlockerStreamState { seeking_comments, blocking_comments } | |||
@@ -1,4 +1,4 @@ | |||
package bubble.resources.stream; | |||
package bubble.service.stream; | |||
import bubble.model.app.AppMatcher; | |||
import bubble.model.app.RuleDriver; |
@@ -1,4 +1,4 @@ | |||
package bubble.resources.stream; | |||
package bubble.service.stream; | |||
import bubble.dao.app.AppMatcherDAO; | |||
import bubble.dao.app.AppRuleDAO; | |||
@@ -8,6 +8,7 @@ import bubble.model.app.AppMatcher; | |||
import bubble.model.app.AppRule; | |||
import bubble.model.app.RuleDriver; | |||
import bubble.model.device.Device; | |||
import bubble.resources.stream.FilterMatchersRequest; | |||
import bubble.rule.AppRuleDriver; | |||
import bubble.server.BubbleConfiguration; | |||
import lombok.Cleanup; | |||
@@ -23,9 +24,13 @@ import org.apache.http.client.methods.HttpRequestBase; | |||
import org.apache.http.entity.InputStreamEntity; | |||
import org.apache.http.impl.client.CloseableHttpClient; | |||
import org.apache.http.impl.client.HttpClientBuilder; | |||
import org.cobbzilla.util.collection.ExpirationEvictionPolicy; | |||
import org.cobbzilla.util.collection.ExpirationMap; | |||
import org.cobbzilla.util.http.HttpClosingFilterInputStream; | |||
import org.cobbzilla.util.http.HttpMethods; | |||
import org.cobbzilla.util.http.URIBean; | |||
import org.cobbzilla.util.io.NullInputStream; | |||
import org.cobbzilla.util.io.multi.MultiStream; | |||
import org.cobbzilla.wizard.stream.ByteStreamingOutput; | |||
import org.cobbzilla.wizard.stream.SendableResource; | |||
import org.cobbzilla.wizard.stream.StreamStreamingOutput; | |||
@@ -35,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.stereotype.Service; | |||
import javax.ws.rs.core.Response; | |||
import java.io.ByteArrayInputStream; | |||
import java.io.ByteArrayOutputStream; | |||
import java.io.IOException; | |||
import java.io.InputStream; | |||
@@ -43,9 +49,12 @@ import java.util.List; | |||
import java.util.Map; | |||
import static bubble.client.BubbleApiClient.newHttpClientBuilder; | |||
import static java.util.concurrent.TimeUnit.MINUTES; | |||
import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH; | |||
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.empty; | |||
import static org.cobbzilla.util.http.HttpStatusCodes.OK; | |||
import static org.cobbzilla.util.io.StreamUtil.DEFAULT_BUFFER_SIZE; | |||
import static org.cobbzilla.wizard.resources.ResourceUtil.send; | |||
@Service @Slf4j | |||
@@ -128,6 +137,8 @@ public class RuleEngine { | |||
public Response passthru(ContainerRequest request) { return passthru(request.getEntityStream()); } | |||
private final Map<String, ActiveStreamState> activeProcessors = new ExpirationMap<>(MINUTES.toMillis(5), ExpirationEvictionPolicy.atime); | |||
public Response applyRulesToChunkAndSendResponse(ContainerRequest request, | |||
String requestId, | |||
Account account, | |||
@@ -135,14 +146,29 @@ public class RuleEngine { | |||
String[] matcherIds, | |||
boolean last) throws IOException { | |||
if (empty(matcherIds)) return passthru(null, request); | |||
if (empty(matcherIds)) return passthru(request); | |||
// have we seen this request before? | |||
ActiveStreamState state = activeProcessors.computeIfAbsent(requestId, k -> new ActiveStreamState(initRules(account, device, matcherIds))); | |||
final byte[] chunk = toBytes(request.getEntityStream()); | |||
if (last) { | |||
if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: adding LAST stream"); | |||
state.addLastChunk(chunk); | |||
} else { | |||
if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: adding a stream"); | |||
state.addChunk(chunk); | |||
} | |||
// init rules | |||
final List<AppRuleHarness> ruleHarnesses = initRules(account, device, matcherIds); | |||
final AppRuleHarness firstRule = ruleHarnesses.get(0); | |||
if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: sending as much filtered data as we can right now (which may be nothing)"); | |||
// return sendResponse(new ByteArrayInputStream(chunk)); // noop for testing | |||
return sendResponse(state.getResponseStream(last)); | |||
} | |||
final InputStream responseEntity = firstRule.getDriver().filterResponseChunk(requestId, request.getEntityStream(), last); | |||
return sendResponse(responseEntity); | |||
public byte[] toBytes(InputStream entityStream) throws IOException { | |||
if (entityStream == null) return EMPTY_BYTE_ARRAY; | |||
final ByteArrayOutputStream bout = new ByteArrayOutputStream(8192); | |||
IOUtils.copyLarge(entityStream, bout); | |||
return bout.toByteArray(); | |||
} | |||
private List<AppRuleHarness> initRules(Account account, Device device, String[] matcherIds) { | |||
@@ -217,4 +243,68 @@ public class RuleEngine { | |||
@Getter(lazy=true) private final HttpClientBuilder httpClientBuilder = newHttpClientBuilder(1000, 50); | |||
public CloseableHttpClient newHttpConn() { return getHttpClientBuilder().build(); } | |||
private static class ActiveStreamState { | |||
private MultiStream multiStream; | |||
private AppRuleHarness firstRule; | |||
private InputStream output = null; | |||
private long totalBytesWritten = 0; | |||
private long totalBytesRead = 0; | |||
public ActiveStreamState(List<AppRuleHarness> rules) { this.firstRule = rules.get(0); } | |||
public void addChunk(byte[] chunk) { | |||
if (log.isDebugEnabled()) log.debug("addChunk: adding "+chunk.length+" bytes"); | |||
totalBytesWritten += chunk.length; | |||
if (multiStream == null) { | |||
multiStream = new MultiStream(new ByteArrayInputStream(chunk)); | |||
output = firstRule.getDriver().filterResponse(multiStream); | |||
} else { | |||
multiStream.addStream(new ByteArrayInputStream(chunk)); | |||
} | |||
} | |||
public void addLastChunk(byte[] chunk) { | |||
if (log.isDebugEnabled()) log.debug("addLastChunk: adding "+chunk.length+" bytes"); | |||
totalBytesWritten += chunk.length; | |||
if (multiStream == null) { | |||
multiStream = new MultiStream(new ByteArrayInputStream(chunk), true); | |||
output = firstRule.getDriver().filterResponse(multiStream); | |||
} else { | |||
multiStream.addLastStream(new ByteArrayInputStream(chunk)); | |||
} | |||
} | |||
public InputStream getResponseStream(boolean last) throws IOException { | |||
if (log.isDebugEnabled()) log.debug("getResponseStream: starting with last="+last+", totalBytesWritten="+totalBytesWritten+", totalBytesRead="+totalBytesRead); | |||
// read to end of all streams, there is no more data coming in | |||
if (last) { | |||
if (log.isDebugEnabled()) log.debug("getResponseStream: last==true, returning full output"); | |||
return output; | |||
} | |||
// try to read as many bytes as we have written, and have not yet read, less a safety buffer | |||
final int bytesToRead = (int) (totalBytesWritten - totalBytesRead - (4*DEFAULT_BUFFER_SIZE)); | |||
if (bytesToRead < 0) { | |||
// we shouldn't try to read yet, less than 1024 bytes have been written | |||
if (log.isDebugEnabled()) log.debug("getResponseStream: not enough data written (bytesToRead="+bytesToRead+"), can't read anything yet"); | |||
return NullInputStream.instance; | |||
} | |||
if (log.isDebugEnabled()) log.debug("getResponseStream: trying to read "+bytesToRead+" bytes"); | |||
final byte[] buffer = new byte[bytesToRead]; | |||
final int bytesRead = output.read(buffer); | |||
if (log.isDebugEnabled()) log.debug("getResponseStream: actually read "+bytesRead+" bytes"); | |||
if (bytesRead == -1) { | |||
// nothing to return | |||
if (log.isDebugEnabled()) log.debug("getResponseStream: end of stream, returning NullInputStream"); | |||
return NullInputStream.instance; | |||
} | |||
if (log.isDebugEnabled()) log.debug("getResponseStream: read "+bytesRead+", returning buffer"); | |||
totalBytesRead += bytesRead; | |||
return new ByteArrayInputStream(buffer, 0, bytesRead); | |||
} | |||
} | |||
} |
@@ -36,7 +36,10 @@ | |||
<logger name="org.cobbzilla.wizard.dao.AbstractCRUDDAO" level="WARN" /> | |||
<logger name="org.cobbzilla.wizard.server.listener.BrowserLauncherListener" level="INFO" /> | |||
<logger name="bubble.service.notify.NotificationService" level="WARN" /> | |||
<logger name="bubble.resources.stream" level="DEBUG" /> | |||
<logger name="bubble.resources.stream" level="INFO" /> | |||
<logger name="org.cobbzilla.util.io.regex" level="INFO" /> | |||
<logger name="org.cobbzilla.util.io.multi" level="INFO" /> | |||
<logger name="bubble.rule.social.block" level="INFO" /> | |||
<!-- <logger name="bubble.service.cloud.StandardNetworkService" level="INFO" />--> | |||
<logger name="bubble.resources.notify" level="WARN" /> | |||
<logger name="bubble.client" level="WARN" /> | |||
@@ -1 +1 @@ | |||
Subproject commit 2c28296705e818833b2a1325bc2201ef19fe4bf1 | |||
Subproject commit 5091e557f6c01d8e0191d458174613249d1f1631 |