diff --git a/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java b/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java index d666645..74ea54b 100644 --- a/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java +++ b/src/main/java/org/cobbzilla/util/io/multi/MultiStream.java @@ -36,6 +36,8 @@ public class MultiStream extends InputStream { public int pendingStreamCount () { return streams.size() - streamIndex; } + public MultiStream setUnderflowTimeout(long timeout) { getUnderflow().setUnderflowTimeout(timeout); return this; } + @Override public String toString () { return "MultiStream{name="+underflow.getHandlerName()+", "+streams.size()+" streams, index="+streamIndex+", EOS="+endOfStreams+"}"; } diff --git a/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java b/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java index 84ea17d..465bb2a 100644 --- a/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java +++ b/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java @@ -19,9 +19,10 @@ public class MultiUnderflowHandlerMonitor extends SimpleDaemon { public static final MultiUnderflowHandlerMonitor DEFAULT_UNDERFLOW_MONITOR = new MultiUnderflowHandlerMonitor(); private final Map handlers = new ConcurrentHashMap<>(); + private final Map threads = new ConcurrentHashMap<>(); @Getter @Setter private long checkInterval = SECONDS.toMillis(20); - private static final long TERMINATE_TIMEOUT = SECONDS.toMillis(2); + @Getter @Setter private long terminateTimeout = SECONDS.toMillis(2); @Getter @Setter private Function terminateThreadFunc = null; @@ -44,7 +45,7 @@ public class MultiUnderflowHandlerMonitor extends SimpleDaemon { iter.remove(); if (terminateThreadFunc == null || terminateThreadFunc.apply(underflow.getThread())) { if (log.isErrorEnabled()) log.error(prefix+"underflow timed out, terminating: name=" + underflow.getHandlerName() + " thread=" + underflow.getThread()); - terminateQuietly(underflow.getThread(), TERMINATE_TIMEOUT, terminateThreadFunc); + terminateQuietly(underflow.getThread(), terminateTimeout, terminateThreadFunc); } else { if (log.isErrorEnabled()) log.error(prefix+"underflow timed out, removing but NOT terminating: name=" + underflow.getHandlerName() + " thread=" + underflow.getThread()); } @@ -52,7 +53,16 @@ public class MultiUnderflowHandlerMonitor extends SimpleDaemon { } } - public void register(MultiUnderflowHandler underflow) { handlers.put(underflow.getId(), underflow); } + public void register(MultiUnderflowHandler underflow) { + final int threadId = System.identityHashCode(underflow.getThread()); + final MultiUnderflowHandler existingHandlerForThread = threads.get(threadId); + if (existingHandlerForThread != null) { + if (log.isWarnEnabled()) log.warn("register: detected thread reuse ("+underflow.getThread()+") overwriting existing handler: "+existingHandlerForThread.getHandlerName()); + handlers.remove(existingHandlerForThread.getId()); + } + threads.put(threadId, underflow); + handlers.put(underflow.getId(), underflow); + } public void unregister(MultiUnderflowHandler underflow) { handlers.remove(underflow.getId()); }