@@ -139,7 +139,7 @@ public class AppMatcher extends IdentifiableBase implements AppTemplateEntity, H | |||
@Getter @Setter private Boolean requestCheck; | |||
public boolean requestCheck () { return bool(requestCheck); } | |||
@ECSearchable @ECField(index=130) | |||
@ECSearchable @ECField(index=140) | |||
@Column(nullable=false) | |||
@Getter @Setter private Integer priority = 0; | |||
@@ -154,4 +154,6 @@ public interface AppRuleDriver { | |||
return ConnectionCheckResponse.noop; | |||
} | |||
default boolean couldModify(FilterHttpRequest request) { return false; } | |||
} |
@@ -33,6 +33,8 @@ public class JsUserBlockerRuleDriver extends AbstractAppRuleDriver { | |||
public static final String CTX_APPLY_BLOCKS_JS = "APPLY_BLOCKS_JS"; | |||
@Override public boolean couldModify(FilterHttpRequest request) { return true; } | |||
@Override public InputStream doFilterResponse(FilterHttpRequest filterRequest, InputStream in) { | |||
if (!isHtml(filterRequest.getContentType())) return in; | |||
@@ -30,6 +30,8 @@ import static org.cobbzilla.util.string.StringUtil.UTF8cs; | |||
@Slf4j | |||
public class UserBlockerRuleDriver extends AbstractAppRuleDriver { | |||
@Override public boolean couldModify(FilterHttpRequest request) { return true; } | |||
// This gets called after autowiring, so `configuration` object will be non-null by now | |||
@Getter(lazy=true) private final JsonNode fullConfig = initFullConfig(); | |||
@@ -5,6 +5,7 @@ | |||
package bubble.service.stream; | |||
import bubble.resources.stream.FilterHttpRequest; | |||
import lombok.Getter; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.apache.commons.io.IOUtils; | |||
import org.cobbzilla.util.collection.ExpirationEvictionPolicy; | |||
@@ -12,7 +13,6 @@ import org.cobbzilla.util.collection.ExpirationMap; | |||
import org.cobbzilla.util.http.HttpContentEncodingType; | |||
import org.cobbzilla.util.io.FilterInputStreamViaOutputStream; | |||
import org.cobbzilla.util.io.FixedByteArrayInputStream; | |||
import org.cobbzilla.util.io.NullInputStream; | |||
import org.cobbzilla.util.io.multi.MultiStream; | |||
import org.cobbzilla.util.system.Bytes; | |||
@@ -22,9 +22,12 @@ import java.io.IOException; | |||
import java.io.InputStream; | |||
import java.util.List; | |||
import java.util.Map; | |||
import java.util.concurrent.TimeUnit; | |||
import static java.util.concurrent.TimeUnit.DAYS; | |||
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.empty; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.shortError; | |||
import static org.cobbzilla.util.io.NullInputStream.NULL_STREAM; | |||
@Slf4j | |||
class ActiveStreamState { | |||
@@ -32,11 +35,16 @@ class ActiveStreamState { | |||
public static final long DEFAULT_BYTE_BUFFER_SIZE = (8 * Bytes.KB); | |||
public static final long MAX_BYTE_BUFFER_SIZE = (64 * Bytes.KB); | |||
private FilterHttpRequest request; | |||
private String requestId; | |||
// do not wrap input with encoding stream until we have received at least this many bytes | |||
// this avoids errors when creating a GZIPInputStream when only one or a few bytes are available | |||
public static final long MIN_BYTES_BEFORE_WRAP = 256; | |||
private final FilterHttpRequest request; | |||
private final String requestId; | |||
private final AppRuleHarness firstRule; | |||
@Getter private final boolean passthru; | |||
private HttpContentEncodingType encoding; | |||
private MultiStream multiStream; | |||
private AppRuleHarness firstRule; | |||
private InputStream output = null; | |||
private long totalBytesWritten = 0; | |||
private long totalBytesRead = 0; | |||
@@ -47,6 +55,31 @@ class ActiveStreamState { | |||
this.requestId = request.getId(); | |||
this.encoding = request.getEncoding(); | |||
this.firstRule = rules.get(0); | |||
final String prefix = "ActiveStreamState("+requestId+"): "; | |||
if (empty(rules)) { | |||
if (log.isDebugEnabled()) log.debug(prefix+"no rules, returning passthru"); | |||
passthru = true; | |||
} else if (noApplicableRules(rules)) { | |||
if (log.isDebugEnabled()) log.debug(prefix+"no applicable rules, returning passthru"); | |||
passthru = true; | |||
} else { | |||
passthru = false; | |||
} | |||
} | |||
public boolean noApplicableRules(List<AppRuleHarness> rules) { | |||
for (AppRuleHarness appRule : rules) { | |||
if (appRule.getDriver().couldModify(request)) { | |||
if (log.isTraceEnabled()) log.trace("noApplicableRules("+requestId+"): appRule "+appRule.getRule().getName()+"/"+appRule.getDriver().getClass().getName()+" could modify request, returning false"); | |||
return false; | |||
} else { | |||
if (log.isTraceEnabled()) log.trace("noApplicableRules("+requestId+"): appRule "+appRule.getRule().getName()+"/"+appRule.getDriver().getClass().getName()+" could NOT modify request"); | |||
} | |||
} | |||
return true; | |||
} | |||
private String prefix(String s) { return s+"("+requestId+"): "; } | |||
@@ -66,11 +99,16 @@ class ActiveStreamState { | |||
final byte[] chunk = toBytes(in, chunkLength); | |||
if (log.isDebugEnabled()) log.debug(prefix("addChunk") + "adding " + chunk.length + " bytes"); | |||
totalBytesWritten += chunk.length; | |||
final ByteArrayInputStream chunkStream = new ByteArrayInputStream(chunk); | |||
if (multiStream == null) { | |||
multiStream = new MultiStream(new ByteArrayInputStream(chunk)); | |||
output = outputStream(firstRule.getDriver().filterResponse(request, inputStream(multiStream))); | |||
multiStream = new MultiStream(chunkStream); | |||
} else { | |||
multiStream.addStream(new ByteArrayInputStream(chunk)); | |||
multiStream.addStream(chunkStream); | |||
} | |||
// do not wrap input with encoding stream until we have received at least MIN_BYTES_BEFORE_WRAP bytes | |||
// this avoids errors when creating a GZIPInputStream when only one or a few bytes are available | |||
if (output == null && totalBytesWritten > MIN_BYTES_BEFORE_WRAP) { | |||
output = outputStream(firstRule.getDriver().filterResponse(request, inputStream(multiStream))); | |||
} | |||
} | |||
} | |||
@@ -79,11 +117,14 @@ class ActiveStreamState { | |||
final byte[] chunk = toBytes(in, chunkLength); | |||
if (log.isDebugEnabled()) log.debug(prefix("addLastChunk")+"adding "+chunk.length+" bytes"); | |||
totalBytesWritten += chunk.length; | |||
final ByteArrayInputStream chunkStream = new ByteArrayInputStream(chunk); | |||
if (multiStream == null) { | |||
multiStream = new MultiStream(new ByteArrayInputStream(chunk), true); | |||
output = outputStream(firstRule.getDriver().filterResponse(request, inputStream(multiStream))); | |||
multiStream = new MultiStream(chunkStream, true); | |||
} else { | |||
multiStream.addLastStream(new ByteArrayInputStream(chunk)); | |||
multiStream.addLastStream(chunkStream); | |||
} | |||
if (output == null) { | |||
output = outputStream(firstRule.getDriver().filterResponse(request, inputStream(multiStream))); | |||
} | |||
} | |||
@@ -93,18 +134,22 @@ class ActiveStreamState { | |||
// read to end of all streams, there is no more data coming in | |||
if (last) { | |||
if (log.isDebugEnabled()) log.debug(prefix+"last==true, returning full output"); | |||
return output; | |||
return finalOutputOrNullStream(); | |||
} | |||
if (request.hasContentLength() && totalBytesWritten >= request.getContentLength()) { | |||
if (log.isDebugEnabled()) log.debug(prefix+"all bytes written, returning full output"); | |||
return output; | |||
return finalOutputOrNullStream(); | |||
} | |||
final int bytesToRead = (int) (totalBytesWritten - totalBytesRead - (2 * MAX_BYTE_BUFFER_SIZE)); | |||
if (bytesToRead < 0) { | |||
// we shouldn't try to read yet, less than 1024 bytes have been written | |||
if (log.isDebugEnabled()) log.debug(prefix + "not enough data written (bytesToRead=" + bytesToRead + "), can't read anything yet"); | |||
return NullInputStream.instance; | |||
return NULL_STREAM; | |||
} | |||
if (output == null) { | |||
if (log.isDebugEnabled()) log.debug(prefix + "not enough data written (output is null and bytesToRead=" + bytesToRead + "), can't read anything yet"); | |||
return NULL_STREAM; | |||
} | |||
if (log.isDebugEnabled()) log.debug(prefix+"trying to read "+bytesToRead+" bytes from output="+output.getClass().getSimpleName()); | |||
@@ -114,7 +159,7 @@ class ActiveStreamState { | |||
if (bytesRead == -1) { | |||
// nothing to return | |||
if (log.isDebugEnabled()) log.debug(prefix+"end of stream, returning NullInputStream"); | |||
return NullInputStream.instance; | |||
return NULL_STREAM; | |||
} | |||
if (log.isDebugEnabled()) log.debug(prefix+"read "+bytesRead+", returning buffer"); | |||
@@ -123,7 +168,15 @@ class ActiveStreamState { | |||
return new FixedByteArrayInputStream(buffer, 0, bytesRead); | |||
} | |||
private Map<String, String> doNotWrap = new ExpirationMap<>(TimeUnit.DAYS.toMillis(1), ExpirationEvictionPolicy.atime); | |||
public InputStream finalOutputOrNullStream() { | |||
if (output == null) { | |||
if (log.isErrorEnabled()) log.error("finalOutputOrNullStream: output was null!"); | |||
return NULL_STREAM; | |||
} | |||
return output; | |||
} | |||
private final Map<String, String> doNotWrap = new ExpirationMap<>(DAYS.toMillis(1), ExpirationEvictionPolicy.atime); | |||
private InputStream inputStream(MultiStream baseStream) throws IOException { | |||
final String prefix = prefix("inputStream"); | |||
@@ -145,7 +198,7 @@ class ActiveStreamState { | |||
if (log.isDebugEnabled()) log.debug(prefix+"returning baseStream wrapped in " + wrapped.getClass().getSimpleName()); | |||
return wrapped; | |||
} catch (IOException e) { | |||
if (log.isWarnEnabled()) log.warn(prefix+"error wrapping with "+encoding+", sending as-is (perhaps missing a byte or two)"); | |||
if (log.isWarnEnabled()) log.warn(prefix+"error wrapping with "+encoding+", sending as-is (perhaps missing a byte or two): "+shortError(e), e); | |||
doNotWrap.put(url, url); | |||
return baseStream; | |||
} | |||
@@ -46,9 +46,7 @@ import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.stereotype.Service; | |||
import javax.ws.rs.core.Response; | |||
import java.io.ByteArrayOutputStream; | |||
import java.io.IOException; | |||
import java.io.InputStream; | |||
import java.io.*; | |||
import java.util.ArrayList; | |||
import java.util.List; | |||
import java.util.Map; | |||
@@ -63,12 +61,15 @@ import static org.apache.http.HttpHeaders.TRANSFER_ENCODING; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.empty; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.hashOf; | |||
import static org.cobbzilla.util.http.HttpStatusCodes.OK; | |||
import static org.cobbzilla.util.io.FileUtil.abs; | |||
import static org.cobbzilla.wizard.cache.redis.RedisService.ALL_KEYS; | |||
import static org.cobbzilla.wizard.resources.ResourceUtil.send; | |||
@Service @Slf4j | |||
public class StandardRuleEngineService implements RuleEngineService { | |||
public static final String HEADER_PASSTHRU = "X-Bubble-Passthru"; | |||
@Autowired private AppRuleDAO ruleDAO; | |||
@Autowired private RuleDriverDAO driverDAO; | |||
@Autowired private BubbleConfiguration configuration; | |||
@@ -139,6 +140,7 @@ public class StandardRuleEngineService implements RuleEngineService { | |||
public Response passthru(InputStream stream) { | |||
final SendableResource response = new SendableResource(new StreamStreamingOutput(stream)) | |||
.addHeader(HEADER_PASSTHRU, HEADER_PASSTHRU) | |||
.setStatus(OK); | |||
return send(response); | |||
} | |||
@@ -147,6 +149,8 @@ public class StandardRuleEngineService implements RuleEngineService { | |||
private final Map<String, ActiveStreamState> activeProcessors = new ExpirationMap<>(MINUTES.toMillis(5), ExpirationEvictionPolicy.atime); | |||
private static final boolean DEBUG_CAPTURE = false; | |||
public Response applyRulesToChunkAndSendResponse(ContainerRequest request, | |||
FilterHttpRequest filterRequest, | |||
Integer chunkLength, | |||
@@ -155,21 +159,37 @@ public class StandardRuleEngineService implements RuleEngineService { | |||
if (!filterRequest.hasMatchers()) { | |||
if (log.isDebugEnabled()) log.debug(prefix+"adding no matchers, returning passthru"); | |||
return passthru(request); | |||
} else { | |||
log.info(prefix+" applying "); | |||
} | |||
// for debugging problematic requests | |||
if (DEBUG_CAPTURE) { | |||
final byte[] bytes = IOUtils.toByteArray(request.getEntityStream()); | |||
final File temp = new File("/tmp/"+filterRequest.getId()+".raw"); | |||
try (FileOutputStream out = new FileOutputStream(temp, true)) { | |||
log.debug(prefix+"stashed "+bytes.length+" bytes in "+abs(temp)); | |||
IOUtils.copy(new ByteArrayInputStream(bytes), out); | |||
} | |||
return sendResponse(new ByteArrayInputStream(bytes)); // noop for testing | |||
} | |||
// have we seen this request before? | |||
final ActiveStreamState state = activeProcessors.computeIfAbsent(filterRequest.getId(), | |||
k -> new ActiveStreamState(filterRequest, initRules(filterRequest))); | |||
if (state.isPassthru()) { | |||
if (log.isDebugEnabled()) log.debug(prefix+"state is passthru"); | |||
return passthru(request); | |||
} | |||
if (last) { | |||
if (log.isDebugEnabled()) log.debug(prefix+"adding LAST stream"); | |||
if (log.isDebugEnabled()) log.debug(prefix+"adding LAST stream with length="+chunkLength); | |||
state.addLastChunk(request.getEntityStream(), chunkLength); | |||
} else { | |||
if (log.isDebugEnabled()) log.debug(prefix+"adding a stream"); | |||
if (log.isDebugEnabled()) log.debug(prefix+"adding a stream with length="+chunkLength); | |||
state.addChunk(request.getEntityStream(), chunkLength); | |||
} | |||
if (log.isDebugEnabled()) log.debug(prefix+"sending as much filtered data as we can right now (which may be nothing)"); | |||
// return sendResponse(new ByteArrayInputStream(chunk)); // noop for testing | |||
return sendResponse(state.getResponseStream(last)); | |||
} | |||
@@ -45,9 +45,9 @@ | |||
<!-- <logger name="bubble.filters.BubbleRateLimitFilter" level="TRACE" />--> | |||
<!-- <logger name="org.cobbzilla.wizard.filters.RateLimitFilter" level="TRACE" />--> | |||
<!-- <logger name="bubble.service.stream.StandardRuleEngineService" level="DEBUG" />--> | |||
<logger name="bubble.service.stream.ActiveStreamState" level="INFO" /> | |||
<logger name="bubble.resources.stream" level="DEBUG" /> | |||
<logger name="bubble.resources.stream.FilterHttpResource" level="TRACE" /> | |||
<logger name="bubble.service.stream.ActiveStreamState" level="WARN" /> | |||
<logger name="bubble.resources.stream" level="WARN" /> | |||
<logger name="bubble.resources.stream.FilterHttpResource" level="WARN" /> | |||
<!-- <logger name="bubble.resources.stream.FilterHttpResource" level="INFO" />--> | |||
<logger name="bubble.service.stream" level="INFO" /> | |||
<!-- <logger name="bubble.service.account.StandardAccountMessageService" level="DEBUG" />--> | |||
@@ -20,6 +20,7 @@ MITMDUMP_PID_FILE.close() | |||
HEADER_USER_AGENT = 'User-Agent' | |||
HEADER_REFERER = 'Referer' | |||
HEADER_FILTER_PASSTHRU = 'X-Bubble-Passthru' | |||
CTX_BUBBLE_MATCHERS='X-Bubble-Matchers' | |||
CTX_BUBBLE_ABORT='X-Bubble-Abort' | |||
@@ -27,9 +27,8 @@ from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer | |||
from mitmproxy.exceptions import TlsProtocolException | |||
from mitmproxy.net import tls as net_tls | |||
from bubble_api import bubble_log, bubble_conn_check, bubble_activity_log, redis_set | |||
from bubble_api import bubble_log, bubble_conn_check, bubble_activity_log, REDIS, redis_set | |||
from bubble_config import bubble_sage_host, bubble_sage_ip4, bubble_sage_ip6, cert_validation_host | |||
import redis | |||
import json | |||
import subprocess | |||
import traceback | |||
@@ -39,8 +38,6 @@ REDIS_CONN_CHECK_PREFIX = 'bubble_conn_check_' | |||
REDIS_CHECK_DURATION = 60 * 60 # 1 hour timeout | |||
REDIS_KEY_DEVICE_SECURITY_LEVEL_PREFIX = 'bubble_device_security_level_' # defined in StandardDeviceIdService | |||
REDIS = redis.Redis(host='127.0.0.1', port=6379, db=0) | |||
FORCE_PASSTHRU = {'passthru': True} | |||
FORCE_BLOCK = {'block': True} | |||
@@ -114,8 +111,8 @@ class TlsFeedback(TlsLayer): | |||
except TlsProtocolException as e: | |||
tb = traceback.format_exc() | |||
if 'OpenSSL.SSL.ZeroReturnError' in tb: | |||
bubble_log('_establish_tls_with_client: TLS error for '+str(server_address)+', ignoring SSL zero return error for client '+client_address) | |||
return | |||
bubble_log('_establish_tls_with_client: TLS error for '+str(server_address)+'/fqdns='+str(self.fqdns)+', raising SSL zero return error for client '+client_address) | |||
raise e | |||
elif self.fqdns is not None and len(self.fqdns) > 0: | |||
for fqdn in self.fqdns: | |||
@@ -8,7 +8,8 @@ import traceback | |||
from mitmproxy.net.http import Headers | |||
from bubble_config import bubble_port, bubble_host_alias | |||
from bubble_api import CTX_BUBBLE_MATCHERS, CTX_BUBBLE_ABORT, BUBBLE_URI_PREFIX, \ | |||
CTX_BUBBLE_REQUEST_ID, CTX_CONTENT_LENGTH, CTX_CONTENT_LENGTH_SENT, bubble_log, get_flow_ctx, add_flow_ctx | |||
CTX_BUBBLE_REQUEST_ID, CTX_CONTENT_LENGTH, CTX_CONTENT_LENGTH_SENT, bubble_log, get_flow_ctx, add_flow_ctx, \ | |||
HEADER_FILTER_PASSTHRU, REDIS, redis_set | |||
BUFFER_SIZE = 4096 | |||
HEADER_CONTENT_TYPE = 'Content-Type' | |||
@@ -17,8 +18,19 @@ HEADER_CONTENT_ENCODING = 'Content-Encoding' | |||
HEADER_TRANSFER_ENCODING = 'Transfer-Encoding' | |||
BINARY_DATA_HEADER = {HEADER_CONTENT_TYPE: 'application/octet-stream'} | |||
REDIS_FILTER_PASSTHRU_PREFIX = '__chunk_filter_pass__' | |||
REDIS_FILTER_PASSTHRU_DURATION = 600 | |||
def filter_chunk(flow, chunk, req_id, last, content_encoding=None, content_type=None, content_length=None): | |||
# should we just passthru? | |||
redis_passthru_key = REDIS_FILTER_PASSTHRU_PREFIX + flow.request.method + ':' + flow.request.url | |||
do_pass = REDIS.get(redis_passthru_key) | |||
if do_pass: | |||
# bubble_log('filter_chunk: req_id='+req_id+': passthru found in redis, returning chunk') | |||
REDIS.touch(redis_passthru_key) | |||
return chunk | |||
def filter_chunk(chunk, req_id, last, content_encoding=None, content_type=None, content_length=None): | |||
url = 'http://127.0.0.1:' + bubble_port + '/api/filter/apply/' + req_id | |||
params_added = False | |||
if chunk and content_type: | |||
@@ -43,6 +55,11 @@ def filter_chunk(chunk, req_id, last, content_encoding=None, content_type=None, | |||
bubble_log(err_message) | |||
return b'' | |||
elif HEADER_FILTER_PASSTHRU in response.headers: | |||
bubble_log('filter_chunk: server returned X-Bubble-Passthru, not filtering subsequent requests') | |||
redis_set(redis_passthru_key, 'passthru', ex=REDIS_FILTER_PASSTHRU_DURATION) | |||
return chunk | |||
return response.content | |||
@@ -63,12 +80,12 @@ def bubble_filter_chunks(flow, chunks, req_id, content_encoding, content_type): | |||
else: | |||
last = False | |||
if first: | |||
yield filter_chunk(chunk, req_id, last, content_encoding, content_type, content_length) | |||
yield filter_chunk(flow, chunk, req_id, last, content_encoding, content_type, content_length) | |||
first = False | |||
else: | |||
yield filter_chunk(chunk, req_id, last) | |||
yield filter_chunk(flow, chunk, req_id, last) | |||
if not content_length: | |||
yield filter_chunk(None, req_id, True) # get the last bits of data | |||
yield filter_chunk(flow, None, req_id, True) # get the last bits of data | |||
except Exception as e: | |||
bubble_log('bubble_filter_chunks: exception='+repr(e)) | |||
traceback.print_exc() | |||
@@ -1 +1 @@ | |||
Subproject commit fcf3243c2cd2e514cb66799b4990962c9901cc9f | |||
Subproject commit 6095552f8359790d073a891eb4bcbd31bb1db3ed |
@@ -1 +1 @@ | |||
Subproject commit c9c5f373876228aba3ac6bb4eed0beff8b936198 | |||
Subproject commit 77a8875358d5878da929783862d70d023bb3ad40 |