diff --git a/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java b/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java index 2e16835..5548a71 100644 --- a/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java +++ b/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java @@ -101,7 +101,8 @@ public class MultiStream extends InputStream { } @Override public void close() throws IOException { - if (log.isTraceEnabled()) log.trace("close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). this="+this); + if (log.isInfoEnabled()) log.info("close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). name="+underflow.getHandlerName()); + if (log.isTraceEnabled()) log.trace("close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). name="+underflow.getHandlerName()); if (currentStream != null) currentStream.close(); underflow.close(); } diff --git a/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java b/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java index fd62ccc..ba402f2 100644 --- a/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java +++ b/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java @@ -8,6 +8,7 @@ import org.cobbzilla.util.daemon.SimpleDaemon; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import static java.util.concurrent.TimeUnit.SECONDS; import static org.cobbzilla.util.daemon.ZillaRuntime.now; @@ -20,9 +21,11 @@ public class MultiUnderflowHandlerMonitor extends SimpleDaemon { private final Map handlers = new ConcurrentHashMap<>(); - @Getter @Setter private static long checkInterval = SECONDS.toMillis(20); + @Getter @Setter private long checkInterval = SECONDS.toMillis(20); private static final long TERMINATE_TIMEOUT = SECONDS.toMillis(2); + @Getter @Setter private Function terminateThreadFunc = null; + @Override protected long getSleepTime() { return checkInterval; } @Override protected void process() { @@ -39,8 +42,12 @@ public class MultiUnderflowHandlerMonitor extends SimpleDaemon { } else if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) { iter.remove(); - if (log.isErrorEnabled()) log.error("process: underflow timed out, terminating: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); - terminate(underflow.getThread(), TERMINATE_TIMEOUT); + if (terminateThreadFunc == null || terminateThreadFunc.apply(underflow.getThread())) { + if (log.isErrorEnabled()) log.error("process: underflow timed out, terminating: name=" + underflow.getHandlerName() + " thread=" + underflow.getThread()); + terminate(underflow.getThread(), TERMINATE_TIMEOUT); + } else { + if (log.isErrorEnabled()) log.error("process: underflow timed out, removing but NOT terminating: name=" + underflow.getHandlerName() + " thread=" + underflow.getThread()); + } } } } diff --git a/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java b/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java index 0b4905b..699e62d 100644 --- a/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java +++ b/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java @@ -5,7 +5,6 @@ import org.apache.commons.io.IOUtils; import org.cobbzilla.util.io.BlockedInputStream; import org.cobbzilla.util.io.multi.MultiReader; import org.cobbzilla.util.io.multi.MultiStream; -import org.cobbzilla.util.io.multi.MultiUnderflowHandlerMonitor; import org.junit.Test; import java.io.*; @@ -13,6 +12,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.cobbzilla.util.daemon.ZillaRuntime.background; import static org.cobbzilla.util.daemon.ZillaRuntime.die; +import static org.cobbzilla.util.io.multi.MultiUnderflowHandlerMonitor.DEFAULT_UNDERFLOW_MONITOR; import static org.cobbzilla.util.io.regex.RegexReplacementFilter.DEFAULT_PREFIX_REPLACEMENT_WITH_MATCH; import static org.cobbzilla.util.system.Sleep.sleep; import static org.junit.Assert.*; @@ -192,7 +192,7 @@ public class RegexFilterReaderTest { final InputStream stream2 = new ByteArrayInputStream("some test data2 ".repeat(1000).getBytes()); final InputStream stream3 = new BlockedInputStream(); - MultiUnderflowHandlerMonitor.setCheckInterval(1000); + DEFAULT_UNDERFLOW_MONITOR.setCheckInterval(1000); final MultiStream multiStream = new MultiStream(stream1); multiStream.getUnderflow() .setMinUnderflowSleep(1000)