@@ -101,7 +101,8 @@ public class MultiStream extends InputStream { | |||||
} | } | ||||
@Override public void close() throws IOException { | @Override public void close() throws IOException { | ||||
if (log.isTraceEnabled()) log.trace("close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). this="+this); | |||||
if (log.isInfoEnabled()) log.info("close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). name="+underflow.getHandlerName()); | |||||
if (log.isTraceEnabled()) log.trace("close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). name="+underflow.getHandlerName()); | |||||
if (currentStream != null) currentStream.close(); | if (currentStream != null) currentStream.close(); | ||||
underflow.close(); | underflow.close(); | ||||
} | } | ||||
@@ -8,6 +8,7 @@ import org.cobbzilla.util.daemon.SimpleDaemon; | |||||
import java.util.Iterator; | import java.util.Iterator; | ||||
import java.util.Map; | import java.util.Map; | ||||
import java.util.concurrent.ConcurrentHashMap; | import java.util.concurrent.ConcurrentHashMap; | ||||
import java.util.function.Function; | |||||
import static java.util.concurrent.TimeUnit.SECONDS; | import static java.util.concurrent.TimeUnit.SECONDS; | ||||
import static org.cobbzilla.util.daemon.ZillaRuntime.now; | import static org.cobbzilla.util.daemon.ZillaRuntime.now; | ||||
@@ -20,9 +21,11 @@ public class MultiUnderflowHandlerMonitor extends SimpleDaemon { | |||||
private final Map<String, MultiUnderflowHandler> handlers = new ConcurrentHashMap<>(); | private final Map<String, MultiUnderflowHandler> handlers = new ConcurrentHashMap<>(); | ||||
@Getter @Setter private static long checkInterval = SECONDS.toMillis(20); | |||||
@Getter @Setter private long checkInterval = SECONDS.toMillis(20); | |||||
private static final long TERMINATE_TIMEOUT = SECONDS.toMillis(2); | private static final long TERMINATE_TIMEOUT = SECONDS.toMillis(2); | ||||
@Getter @Setter private Function<Thread, Boolean> terminateThreadFunc = null; | |||||
@Override protected long getSleepTime() { return checkInterval; } | @Override protected long getSleepTime() { return checkInterval; } | ||||
@Override protected void process() { | @Override protected void process() { | ||||
@@ -39,8 +42,12 @@ public class MultiUnderflowHandlerMonitor extends SimpleDaemon { | |||||
} else if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) { | } else if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) { | ||||
iter.remove(); | iter.remove(); | ||||
if (log.isErrorEnabled()) log.error("process: underflow timed out, terminating: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); | |||||
terminate(underflow.getThread(), TERMINATE_TIMEOUT); | |||||
if (terminateThreadFunc == null || terminateThreadFunc.apply(underflow.getThread())) { | |||||
if (log.isErrorEnabled()) log.error("process: underflow timed out, terminating: name=" + underflow.getHandlerName() + " thread=" + underflow.getThread()); | |||||
terminate(underflow.getThread(), TERMINATE_TIMEOUT); | |||||
} else { | |||||
if (log.isErrorEnabled()) log.error("process: underflow timed out, removing but NOT terminating: name=" + underflow.getHandlerName() + " thread=" + underflow.getThread()); | |||||
} | |||||
} | } | ||||
} | } | ||||
} | } | ||||
@@ -5,7 +5,6 @@ import org.apache.commons.io.IOUtils; | |||||
import org.cobbzilla.util.io.BlockedInputStream; | import org.cobbzilla.util.io.BlockedInputStream; | ||||
import org.cobbzilla.util.io.multi.MultiReader; | import org.cobbzilla.util.io.multi.MultiReader; | ||||
import org.cobbzilla.util.io.multi.MultiStream; | import org.cobbzilla.util.io.multi.MultiStream; | ||||
import org.cobbzilla.util.io.multi.MultiUnderflowHandlerMonitor; | |||||
import org.junit.Test; | import org.junit.Test; | ||||
import java.io.*; | import java.io.*; | ||||
@@ -13,6 +12,7 @@ import java.util.concurrent.atomic.AtomicReference; | |||||
import static org.cobbzilla.util.daemon.ZillaRuntime.background; | import static org.cobbzilla.util.daemon.ZillaRuntime.background; | ||||
import static org.cobbzilla.util.daemon.ZillaRuntime.die; | import static org.cobbzilla.util.daemon.ZillaRuntime.die; | ||||
import static org.cobbzilla.util.io.multi.MultiUnderflowHandlerMonitor.DEFAULT_UNDERFLOW_MONITOR; | |||||
import static org.cobbzilla.util.io.regex.RegexReplacementFilter.DEFAULT_PREFIX_REPLACEMENT_WITH_MATCH; | import static org.cobbzilla.util.io.regex.RegexReplacementFilter.DEFAULT_PREFIX_REPLACEMENT_WITH_MATCH; | ||||
import static org.cobbzilla.util.system.Sleep.sleep; | import static org.cobbzilla.util.system.Sleep.sleep; | ||||
import static org.junit.Assert.*; | import static org.junit.Assert.*; | ||||
@@ -192,7 +192,7 @@ public class RegexFilterReaderTest { | |||||
final InputStream stream2 = new ByteArrayInputStream("some test data2 ".repeat(1000).getBytes()); | final InputStream stream2 = new ByteArrayInputStream("some test data2 ".repeat(1000).getBytes()); | ||||
final InputStream stream3 = new BlockedInputStream(); | final InputStream stream3 = new BlockedInputStream(); | ||||
MultiUnderflowHandlerMonitor.setCheckInterval(1000); | |||||
DEFAULT_UNDERFLOW_MONITOR.setCheckInterval(1000); | |||||
final MultiStream multiStream = new MultiStream(stream1); | final MultiStream multiStream = new MultiStream(stream1); | ||||
multiStream.getUnderflow() | multiStream.getUnderflow() | ||||
.setMinUnderflowSleep(1000) | .setMinUnderflowSleep(1000) | ||||