diff --git a/src/main/java/org/cobbzilla/util/daemon/SimpleDaemon.java b/src/main/java/org/cobbzilla/util/daemon/SimpleDaemon.java index c87697e..9087ee9 100755 --- a/src/main/java/org/cobbzilla/util/daemon/SimpleDaemon.java +++ b/src/main/java/org/cobbzilla/util/daemon/SimpleDaemon.java @@ -17,7 +17,7 @@ public abstract class SimpleDaemon implements Runnable { public SimpleDaemon (String name) { this.name = name; } - @Getter private String name; + @Getter private final String name; @Getter private long lastProcessTime = 0; private volatile Thread mainThread = null; diff --git a/src/main/java/org/cobbzilla/util/io/BlockedInputStream.java b/src/main/java/org/cobbzilla/util/io/BlockedInputStream.java new file mode 100644 index 0000000..3abb8c5 --- /dev/null +++ b/src/main/java/org/cobbzilla/util/io/BlockedInputStream.java @@ -0,0 +1,21 @@ +package org.cobbzilla.util.io; + +import java.io.IOException; +import java.io.InputStream; + +import static java.util.concurrent.TimeUnit.DAYS; +import static org.cobbzilla.util.system.Sleep.sleep; + +public class BlockedInputStream extends InputStream { + + @Override public int read() throws IOException { + sleep(DAYS.toMillis(100), "blocking"); + return -1; + } + + @Override public int read(byte[] b, int off, int len) throws IOException { + sleep(DAYS.toMillis(100), "blocking"); + return super.read(b, off, len); + } + +} diff --git a/src/main/java/org/cobbzilla/util/io/multi/MultiReader.java b/src/main/java/org/cobbzilla/util/io/multi/MultiReader.java index 56296f6..535b786 100644 --- a/src/main/java/org/cobbzilla/util/io/multi/MultiReader.java +++ b/src/main/java/org/cobbzilla/util/io/multi/MultiReader.java @@ -1,5 +1,6 @@ package org.cobbzilla.util.io.multi; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -16,6 +17,7 @@ public class MultiReader extends Reader { private Reader currentReader; private int readerIndex = 0; private boolean endOfReaders = false; + @Getter private final MultiUnderflowHandler underflow = new MultiUnderflowHandler(); public MultiReader (Reader r, boolean last) { if (last) { @@ -53,17 +55,21 @@ public class MultiReader extends Reader { int count = currentReader.read(buf, off, len); if (count == -1) { if (readerIndex == readers.size()-1) { - return endOfReaders ? -1 : 0; + if (endOfReaders) return -1; + underflow.handleUnderflow(); + return 0; } currentReader.close(); readerIndex++; currentReader = readers.get(readerIndex); return read(buf, off, len); } + underflow.handleSuccessfulRead(); return count; } @Override public void close() throws IOException { if (currentReader != null) currentReader.close(); + underflow.close(); } } diff --git a/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java b/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java index 001cf64..6437aaa 100644 --- a/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java +++ b/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java @@ -1,5 +1,6 @@ package org.cobbzilla.util.io.multi; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.io.IOException; @@ -14,8 +15,12 @@ public class MultiStream extends InputStream { private InputStream currentStream; private int streamIndex = 0; private boolean endOfStreams = false; + @Getter private final MultiUnderflowHandler underflow = new MultiUnderflowHandler(); - public MultiStream (InputStream r, boolean last) { + public MultiStream (InputStream r, boolean last) { this(r, last, "no-name"); } + + public MultiStream (InputStream r, boolean last, String name) { + underflow.setHandlerName(name); if (last) { addLastStream(r); } else { @@ -24,10 +29,12 @@ public class MultiStream extends InputStream { currentStream = r; } - public int pendingStreamCount () { return streams.size() - streamIndex; } + public MultiStream (InputStream r, String name) { this(r, false, name); } public MultiStream (InputStream r) { this(r, false); } + public int pendingStreamCount () { return streams.size() - streamIndex; } + @Override public String toString () { return "MultiStream{"+streams.size()+" streams, index="+streamIndex+", EOS="+endOfStreams+"}"; } @@ -52,7 +59,9 @@ public class MultiStream extends InputStream { if (val == -1) { if (streamIndex == streams.size()-1) { if (log.isTraceEnabled()) log.trace("read(byte): end of all streams? this="+this); - return endOfStreams ? -1 : 0; + if (endOfStreams) return -1; + underflow.handleUnderflow(); + return 0; } currentStream.close(); streamIndex++; @@ -63,16 +72,20 @@ public class MultiStream extends InputStream { } else { if (log.isTraceEnabled()) log.trace("read(byte): one byte read. this="+this); } + underflow.handleSuccessfulRead(); return val; } @Override public int read(byte[] buf, int off, int len) throws IOException { if (log.isTraceEnabled()) log.trace("read(byte[]): trying to read "+len+" bytes. this="+this); final int count = currentStream.read(buf, off, len); + log.error("read: got "+count+" bytes"); if (count == -1) { if (streamIndex == streams.size()-1) { if (log.isTraceEnabled()) log.trace("read(byte[]): end of all streams? this="+this); - return endOfStreams ? -1 : 0; + if (endOfStreams) return -1; + underflow.handleUnderflow(); + return 0; } currentStream.close(); streamIndex++; @@ -83,12 +96,14 @@ public class MultiStream extends InputStream { } else { if (log.isTraceEnabled()) log.trace("read(byte[]): "+count+" bytes read. this="+this); } + underflow.handleSuccessfulRead(); return count; } @Override public void close() throws IOException { if (log.isTraceEnabled()) log.trace("close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). this="+this); if (currentStream != null) currentStream.close(); + underflow.close(); } } diff --git a/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandler.java b/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandler.java new file mode 100644 index 0000000..ee02c26 --- /dev/null +++ b/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandler.java @@ -0,0 +1,88 @@ +package org.cobbzilla.util.io.multi; + +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; +import org.cobbzilla.util.daemon.SimpleDaemon; +import org.cobbzilla.util.io.regex.MultiUnderflowException; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static java.util.UUID.randomUUID; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.cobbzilla.util.daemon.ZillaRuntime.now; +import static org.cobbzilla.util.daemon.ZillaRuntime.terminate; +import static org.cobbzilla.util.system.Sleep.sleep; + +@Slf4j @Accessors(chain=true) +public class MultiUnderflowHandler extends SimpleDaemon { + + private final static Map handlers = new ConcurrentHashMap<>(); + + @Getter @Setter private static long checkInterval = SECONDS.toMillis(20); + private static final long TERMINATE_TIMEOUT = SECONDS.toMillis(2); + + @Override protected long getSleepTime() { return checkInterval; } + + @Override protected void process() { + for (Iterator iter = handlers.values().iterator(); iter.hasNext(); ) { + final MultiUnderflowHandler underflow = iter.next(); + if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) { + log.error("process: underflow timed out, terminating: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); + iter.remove(); + terminate(underflow.getThread(), TERMINATE_TIMEOUT); + } + } + } + + public MultiUnderflowHandler () { start(); } + + @Getter private final String id = randomUUID().toString(); + @Getter @Setter private String handlerName; + @Getter private long minUnderflowSleep = 10; + public MultiUnderflowHandler setMinUnderflowSleep(long s) { underflowSleep = minUnderflowSleep = s; return this; } + + @Getter @Setter private long maxUnderflowSleep = 500; + + @Getter @Setter private long lastRead = now(); + @Getter @Setter private long firstUnderflow = 0; + @Getter @Setter private long underflowTimeout = SECONDS.toMillis(60); + private long underflowSleep = minUnderflowSleep; + @Getter private Thread thread; + + public void handleUnderflow() throws IOException { + if (thread == null) { + thread = Thread.currentThread(); + handlers.put(id, this); + } + if (firstUnderflow == 0) { + if (log.isDebugEnabled()) log.debug(handlerName+": first data underflow"); + firstUnderflow = now(); + } else if (now() - firstUnderflow > underflowTimeout) { + if (log.isErrorEnabled()) log.error(handlerName+": underflow timeout, throwing MultiUnderflowException"); + throw new MultiUnderflowException(handlerName); + } + if (log.isDebugEnabled()) log.debug(handlerName+": data underflow, sleeping for "+ underflowSleep); + sleep(underflowSleep); + underflowSleep *= 2; + if (underflowSleep > maxUnderflowSleep) underflowSleep = maxUnderflowSleep; + } + + public void handleSuccessfulRead() { + if (thread == null) { + thread = Thread.currentThread(); + handlers.put(id, this); + } + firstUnderflow = 0; + lastRead = now(); + underflowSleep = minUnderflowSleep; + } + + public void close() { + + } +} diff --git a/src/main/java/org/cobbzilla/util/io/regex/MultiUnderflowException.java b/src/main/java/org/cobbzilla/util/io/regex/MultiUnderflowException.java new file mode 100644 index 0000000..9909fb1 --- /dev/null +++ b/src/main/java/org/cobbzilla/util/io/regex/MultiUnderflowException.java @@ -0,0 +1,13 @@ +package org.cobbzilla.util.io.regex; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.io.IOException; + +@AllArgsConstructor +public class MultiUnderflowException extends IOException { + + @Getter private final String name; + +} diff --git a/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java b/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java index eaf7c31..282527d 100644 --- a/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java +++ b/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java @@ -1,20 +1,23 @@ package org.cobbzilla.util.io.regex; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; +import org.cobbzilla.util.io.BlockedInputStream; import org.cobbzilla.util.io.multi.MultiReader; +import org.cobbzilla.util.io.multi.MultiStream; +import org.cobbzilla.util.io.multi.MultiUnderflowHandler; import org.junit.Test; -import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; -import java.io.StringWriter; +import java.io.*; +import java.util.concurrent.atomic.AtomicReference; import static org.cobbzilla.util.daemon.ZillaRuntime.background; import static org.cobbzilla.util.daemon.ZillaRuntime.die; +import static org.cobbzilla.util.io.regex.RegexReplacementFilter.DEFAULT_PREFIX_REPLACEMENT_WITH_MATCH; import static org.cobbzilla.util.system.Sleep.sleep; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.*; +@Slf4j public class RegexFilterReaderTest { public static final String TEST_STRING_1 = "this is a string\nand another string with a lone a near the end\nfoo."; @@ -43,7 +46,8 @@ public class RegexFilterReaderTest { @Test public void testRegexReaderIncludeMatch() throws Exception { final Reader reader = new StringReader(TEST_STRING_INCLUDE_MATCH); - final RegexStreamFilter regexStreamFilter = new RegexReplacementFilter("]*>", "!INSERTED_DATA"); + final RegexStreamFilter regexStreamFilter + = new RegexReplacementFilter("]*>", DEFAULT_PREFIX_REPLACEMENT_WITH_MATCH+"INSERTED_DATA"); final RegexFilterReader regexFilterReader = new RegexFilterReader(reader, 1024, regexStreamFilter); final StringWriter result = new StringWriter(); IOUtils.copyLarge(regexFilterReader, result); @@ -55,7 +59,8 @@ public class RegexFilterReaderTest { @Test public void testRegexReaderIncludeMatchInMiddle() throws Exception { final Reader reader = new StringReader(TEST_STRING_INCLUDE_MATCH_MIDDLE); - final RegexStreamFilter regexStreamFilter = new RegexReplacementFilter("]*>", "INSERTED_!DATA"); + final RegexStreamFilter regexStreamFilter + = new RegexReplacementFilter("]*>", "INSERTED_"+DEFAULT_PREFIX_REPLACEMENT_WITH_MATCH+"DATA"); final RegexFilterReader regexFilterReader = new RegexFilterReader(reader, 1024, regexStreamFilter); final StringWriter result = new StringWriter(); IOUtils.copyLarge(regexFilterReader, result); @@ -139,4 +144,83 @@ public class RegexFilterReaderTest { t.join(1000); assertFalse("Expected copy thread to finish", t.isAlive()); assertEquals("multi reader failed to get expected output", EXPECTED_MULTI_RESULT2, result.toString()); - }} + } + + @Test public void testMultiReaderUnderflow() throws Exception { + final StringReader reader1 = new StringReader("some test data1 ".repeat(1000)); + final StringReader reader2 = new StringReader("some test data2 ".repeat(1000)); + final StringReader reader3 = new StringReader("some test data3 ".repeat(1000)); + + final MultiReader multiReader = new MultiReader(reader1); + multiReader.getUnderflow() + .setMinUnderflowSleep(1000) + .setMaxUnderflowSleep(1000) + .setUnderflowTimeout(5000); + + final RegexStreamFilter regexStreamFilter = new RegexReplacementFilter(" test ", " bogus "); + final RegexFilterReader regexFilterReader = new RegexFilterReader(multiReader, 8, regexStreamFilter); + final StringWriter result = new StringWriter(); + + final AtomicReference exRef = new AtomicReference<>(null); + final Thread t = background(() -> { + try { + IOUtils.copyLarge(regexFilterReader, result); + } catch (IOException e) { + exRef.set(e); + } + }); + + sleep(multiReader.getUnderflow().getMaxUnderflowSleep()); + log.info("adding reader2..."); + multiReader.addReader(reader2); + log.info("added reader2..."); + sleep(multiReader.getUnderflow().getUnderflowTimeout()*2); + log.info("adding reader3..."); + multiReader.addReader(reader3); + log.info("added reader3..."); + + t.join(multiReader.getUnderflow().getUnderflowTimeout()+100); + assertFalse("Expected copy thread to finish", t.isAlive()); + assertNotNull("Expected copy thread to have an exception", exRef.get()); + assertTrue("expected multi reader failed to get data1 output", result.toString().contains(" bogus data1 ")); + assertTrue("expected multi reader failed to get data2 output", result.toString().contains(" bogus data2 ")); + assertFalse("expected multi reader failed to NOT get data3 output", result.toString().contains(" bogus data3 ")); + } + + @Test public void testMultiStreamUnderflow() throws Exception { + final InputStream stream1 = new ByteArrayInputStream("some test data1 ".repeat(1000).getBytes()); + final InputStream stream2 = new ByteArrayInputStream("some test data2 ".repeat(1000).getBytes()); + final InputStream stream3 = new BlockedInputStream(); + + MultiUnderflowHandler.setCheckInterval(1000); + final MultiStream multiStream = new MultiStream(stream1); + multiStream.getUnderflow() + .setMinUnderflowSleep(1000) + .setMaxUnderflowSleep(1000) + .setUnderflowTimeout(5000); + multiStream.addStream(stream2); + multiStream.addStream(stream3); + + final RegexStreamFilter regexStreamFilter = new RegexReplacementFilter(" test ", " bogus "); + final RegexFilterReader regexFilterReader = new RegexFilterReader(multiStream, 8, regexStreamFilter); + final StringWriter result = new StringWriter(); + + final AtomicReference exRef = new AtomicReference<>(null); + final Thread t = background(() -> { + try { + IOUtils.copyLarge(regexFilterReader, result); + } catch (Exception e) { + exRef.set(e); + } + }); + + sleep(multiStream.getUnderflow().getUnderflowTimeout()*2); + + t.join(multiStream.getUnderflow().getUnderflowTimeout()+100); + assertFalse("Expected copy thread to finish", t.isAlive()); + assertNotNull("Expected copy thread to have an exception", exRef.get()); + assertTrue("expected multi stream failed to get data1 output", result.toString().contains(" bogus data1 ")); + assertTrue("expected multi stream failed to get data2 output", result.toString().contains(" bogus data2 ")); + } + +}