|
|
@@ -49,10 +49,12 @@ import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
|
|
|
|
import static bubble.client.BubbleApiClient.newHttpClientBuilder; |
|
|
|
import static java.util.concurrent.TimeUnit.HOURS; |
|
|
|
import static java.util.concurrent.TimeUnit.MINUTES; |
|
|
|
import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH; |
|
|
|
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; |
|
|
|
import static org.cobbzilla.util.daemon.ZillaRuntime.empty; |
|
|
|
import static org.cobbzilla.util.daemon.ZillaRuntime.hashOf; |
|
|
|
import static org.cobbzilla.util.http.HttpStatusCodes.OK; |
|
|
|
import static org.cobbzilla.util.io.StreamUtil.DEFAULT_BUFFER_SIZE; |
|
|
|
import static org.cobbzilla.wizard.resources.ResourceUtil.send; |
|
|
@@ -149,7 +151,7 @@ public class RuleEngine { |
|
|
|
if (empty(matcherIds)) return passthru(request); |
|
|
|
|
|
|
|
// have we seen this request before? |
|
|
|
ActiveStreamState state = activeProcessors.computeIfAbsent(requestId, k -> new ActiveStreamState(initRules(account, device, matcherIds))); |
|
|
|
final ActiveStreamState state = activeProcessors.computeIfAbsent(requestId, k -> new ActiveStreamState(initRules(account, device, matcherIds))); |
|
|
|
final byte[] chunk = toBytes(request.getEntityStream()); |
|
|
|
if (last) { |
|
|
|
if (log.isDebugEnabled()) log.debug("applyRulesToChunkAndSendResponse: adding LAST stream"); |
|
|
@@ -171,22 +173,26 @@ public class RuleEngine { |
|
|
|
return bout.toByteArray(); |
|
|
|
} |
|
|
|
|
|
|
|
private ExpirationMap<String, List<AppRuleHarness>> ruleCache = new ExpirationMap<>(HOURS.toMillis(1), ExpirationEvictionPolicy.atime); |
|
|
|
|
|
|
|
private List<AppRuleHarness> initRules(Account account, Device device, String[] matcherIds) { |
|
|
|
final List<AppMatcher> matchers = matcherDAO.findByUuids(matcherIds); |
|
|
|
if (matchers.size() != matcherIds.length) { |
|
|
|
log.warn("initRules: duplicate rules, or could not resolve some rule(s)"); |
|
|
|
} |
|
|
|
List<AppRuleHarness> ruleHarnesses = new ArrayList<>(); |
|
|
|
for (AppMatcher m : matchers) { |
|
|
|
final AppRule rule = ruleDAO.findByUuid(m.getRule()); |
|
|
|
if (rule == null) { |
|
|
|
log.warn("initRules: rule not found ("+m.getRule()+") for matcher: "+m.getUuid()); |
|
|
|
continue; |
|
|
|
final String cacheKey = hashOf(account.getUuid(), device.getUuid(), matcherIds); |
|
|
|
return ruleCache.computeIfAbsent(cacheKey, k -> { |
|
|
|
final List<AppMatcher> matchers = matcherDAO.findByUuids(matcherIds); |
|
|
|
if (matchers.size() != matcherIds.length) { |
|
|
|
log.warn("initRules: duplicate rules, or could not resolve some rule(s)"); |
|
|
|
} |
|
|
|
ruleHarnesses.add(new AppRuleHarness(m, rule)); |
|
|
|
} |
|
|
|
ruleHarnesses = initRules(account, device, ruleHarnesses); |
|
|
|
return ruleHarnesses; |
|
|
|
final List<AppRuleHarness> ruleHarnesses = new ArrayList<>(); |
|
|
|
for (AppMatcher m : matchers) { |
|
|
|
final AppRule rule = ruleDAO.findByUuid(m.getRule()); |
|
|
|
if (rule == null) { |
|
|
|
log.warn("initRules: rule not found ("+m.getRule()+") for matcher: "+m.getUuid()); |
|
|
|
continue; |
|
|
|
} |
|
|
|
ruleHarnesses.add(new AppRuleHarness(m, rule)); |
|
|
|
} |
|
|
|
return initRules(account, device, ruleHarnesses); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
private List<AppRuleHarness> initRules(Account account, Device device, List<AppRuleHarness> rules) { |
|
|
|