|
@@ -3,11 +3,11 @@ package bubble.service.stream; |
|
|
import bubble.resources.stream.FilterHttpRequest; |
|
|
import bubble.resources.stream.FilterHttpRequest; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
import org.apache.commons.io.IOUtils; |
|
|
import org.apache.commons.io.IOUtils; |
|
|
import org.apache.commons.io.output.ChunkedOutputStream; |
|
|
|
|
|
import org.cobbzilla.util.collection.ExpirationEvictionPolicy; |
|
|
import org.cobbzilla.util.collection.ExpirationEvictionPolicy; |
|
|
import org.cobbzilla.util.collection.ExpirationMap; |
|
|
import org.cobbzilla.util.collection.ExpirationMap; |
|
|
import org.cobbzilla.util.http.HttpContentEncodingType; |
|
|
import org.cobbzilla.util.http.HttpContentEncodingType; |
|
|
import org.cobbzilla.util.io.FilterInputStreamViaOutputStream; |
|
|
import org.cobbzilla.util.io.FilterInputStreamViaOutputStream; |
|
|
|
|
|
import org.cobbzilla.util.io.FixedByteArrayInputStream; |
|
|
import org.cobbzilla.util.io.NullInputStream; |
|
|
import org.cobbzilla.util.io.NullInputStream; |
|
|
import org.cobbzilla.util.io.multi.MultiStream; |
|
|
import org.cobbzilla.util.io.multi.MultiStream; |
|
|
import org.cobbzilla.util.system.Bytes; |
|
|
import org.cobbzilla.util.system.Bytes; |
|
@@ -21,7 +21,6 @@ import java.util.Map; |
|
|
import java.util.concurrent.TimeUnit; |
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
|
|
|
|
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; |
|
|
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY; |
|
|
import static org.cobbzilla.util.http.HttpUtil.makeHttpChunk; |
|
|
|
|
|
|
|
|
|
|
|
@Slf4j |
|
|
@Slf4j |
|
|
class ActiveStreamState { |
|
|
class ActiveStreamState { |
|
@@ -86,15 +85,21 @@ class ActiveStreamState { |
|
|
// read to end of all streams, there is no more data coming in |
|
|
// read to end of all streams, there is no more data coming in |
|
|
if (last) { |
|
|
if (last) { |
|
|
if (log.isDebugEnabled()) log.debug(prefix+"last==true, returning full output"); |
|
|
if (log.isDebugEnabled()) log.debug(prefix+"last==true, returning full output"); |
|
|
return new FilterInputStreamViaOutputStream(output, ChunkedOutputStream::new); |
|
|
|
|
|
|
|
|
return output; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// if we have 5 streams pending, send *something* now, or MITM proxy will not send us anything more |
|
|
// if we have 5 streams pending, send *something* now, or MITM proxy will not send us anything more |
|
|
final int bytesToRead; |
|
|
final int bytesToRead; |
|
|
if (multiStream.pendingStreamCount() >= MAX_PENDING_STREAMS) { |
|
|
if (multiStream.pendingStreamCount() >= MAX_PENDING_STREAMS) { |
|
|
// send a measly 10% of the data, that should be OK, right? |
|
|
|
|
|
bytesToRead = (int) (totalBytesWritten - totalBytesRead)/10; |
|
|
|
|
|
if (log.isDebugEnabled()) log.debug(prefix + "pendingStreamCount ("+multiStream.pendingStreamCount()+") >= "+MAX_PENDING_STREAMS+" - setting bytesToRead=" + bytesToRead + " (max "+(totalBytesWritten - totalBytesRead)+")"); |
|
|
|
|
|
|
|
|
// do we have at least 8k to send? |
|
|
|
|
|
if (totalBytesWritten - totalBytesRead >= 8*Bytes.KB) { |
|
|
|
|
|
bytesToRead = (int) (8*Bytes.KB); |
|
|
|
|
|
if (log.isDebugEnabled()) log.debug(prefix + "pendingStreamCount ("+multiStream.pendingStreamCount()+") >= "+MAX_PENDING_STREAMS+" and > 8K bytes available, returning everything to read: " + bytesToRead+" bytes"); |
|
|
|
|
|
} else { |
|
|
|
|
|
// send 20% of what we have |
|
|
|
|
|
bytesToRead = (int) (totalBytesWritten - totalBytesRead)/5; |
|
|
|
|
|
if (log.isDebugEnabled()) log.debug(prefix + "pendingStreamCount ("+multiStream.pendingStreamCount()+") >= "+MAX_PENDING_STREAMS+" and < 8K bytes available, returning 20% read: " + bytesToRead+" bytes"); |
|
|
|
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
// try to read as many bytes as we have written, and have not yet read, less a safety buffer |
|
|
// try to read as many bytes as we have written, and have not yet read, less a safety buffer |
|
|
bytesToRead = (int) (totalBytesWritten - totalBytesRead - (2 * MAX_BYTE_BUFFER_SIZE)); |
|
|
bytesToRead = (int) (totalBytesWritten - totalBytesRead - (2 * MAX_BYTE_BUFFER_SIZE)); |
|
@@ -105,7 +110,7 @@ class ActiveStreamState { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (log.isDebugEnabled()) log.debug(prefix+"trying to read "+bytesToRead+" bytes"); |
|
|
|
|
|
|
|
|
if (log.isDebugEnabled()) log.debug(prefix+"trying to read "+bytesToRead+" bytes from output="+output.getClass().getSimpleName()); |
|
|
final byte[] buffer = new byte[bytesToRead]; |
|
|
final byte[] buffer = new byte[bytesToRead]; |
|
|
final int bytesRead = output.read(buffer); |
|
|
final int bytesRead = output.read(buffer); |
|
|
if (log.isDebugEnabled()) log.debug(prefix+"actually read "+bytesRead+" bytes"); |
|
|
if (log.isDebugEnabled()) log.debug(prefix+"actually read "+bytesRead+" bytes"); |
|
@@ -118,7 +123,7 @@ class ActiveStreamState { |
|
|
if (log.isDebugEnabled()) log.debug(prefix+"read "+bytesRead+", returning buffer"); |
|
|
if (log.isDebugEnabled()) log.debug(prefix+"read "+bytesRead+", returning buffer"); |
|
|
totalBytesRead += bytesRead; |
|
|
totalBytesRead += bytesRead; |
|
|
|
|
|
|
|
|
return new ByteArrayInputStream(makeHttpChunk(buffer, bytesRead)); |
|
|
|
|
|
|
|
|
return new FixedByteArrayInputStream(buffer, 0, bytesRead); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private Map<String, String> doNotWrap = new ExpirationMap<>(TimeUnit.DAYS.toMillis(1), ExpirationEvictionPolicy.atime); |
|
|
private Map<String, String> doNotWrap = new ExpirationMap<>(TimeUnit.DAYS.toMillis(1), ExpirationEvictionPolicy.atime); |
|
|