From 025ed81b6271c503f260f3f83d22431fc90da1e1 Mon Sep 17 00:00:00 2001 From: Jonathan Cobb Date: Wed, 26 Aug 2020 10:47:03 -0400 Subject: [PATCH] split underflow monitor into its own class --- .../cobbzilla/util/daemon/ZillaRuntime.java | 2 +- .../util/io/multi/MultiUnderflowHandler.java | 43 ++++----------- .../multi/MultiUnderflowHandlerMonitor.java | 52 +++++++++++++++++++ .../util/io/regex/RegexFilterReaderTest.java | 4 +- 4 files changed, 64 insertions(+), 37 deletions(-) create mode 100644 src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java diff --git a/src/main/java/org/cobbzilla/util/daemon/ZillaRuntime.java b/src/main/java/org/cobbzilla/util/daemon/ZillaRuntime.java index ecc5b28..ec8d156 100644 --- a/src/main/java/org/cobbzilla/util/daemon/ZillaRuntime.java +++ b/src/main/java/org/cobbzilla/util/daemon/ZillaRuntime.java @@ -70,7 +70,7 @@ public class ZillaRuntime { sleep(100, "terminate: waiting for thread to exit: "+thread); } if (thread.isAlive()) { - if (verbose && log.isWarnEnabled()) log.warn("terminate: thread did not respond to interrupt, killing: "+thread+" with stack "+stacktrace(thread)+" from "+stacktrace()); + if (verbose && log.isWarnEnabled()) log.warn("terminate: thread did not respond to interrupt, killing: "+thread+" with stack "+stacktrace(thread)+"\nfrom: "+stacktrace()); thread.stop(); return false; } else { diff --git a/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandler.java b/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandler.java index 133d799..afb0708 100644 --- a/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandler.java +++ b/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandler.java @@ -4,52 +4,27 @@ import lombok.Getter; import lombok.Setter; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; -import org.cobbzilla.util.daemon.SimpleDaemon; import org.cobbzilla.util.io.regex.MultiUnderflowException; import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.UUID.randomUUID; import static java.util.concurrent.TimeUnit.SECONDS; import static org.cobbzilla.util.daemon.ZillaRuntime.now; -import static org.cobbzilla.util.daemon.ZillaRuntime.terminate; import static org.cobbzilla.util.system.Sleep.sleep; @Slf4j @Accessors(chain=true) -public class MultiUnderflowHandler extends SimpleDaemon { +public class MultiUnderflowHandler { - private final static Map handlers = new ConcurrentHashMap<>(); + private final MultiUnderflowHandlerMonitor monitor; - @Getter @Setter private static long checkInterval = SECONDS.toMillis(20); - private static final long TERMINATE_TIMEOUT = SECONDS.toMillis(2); - - @Override protected long getSleepTime() { return checkInterval; } - - @Override protected void process() { - if (log.isTraceEnabled()) log.trace("process: examining "+handlers.size()+" underflow handlers"); - for (Iterator iter = handlers.values().iterator(); iter.hasNext(); ) { - final MultiUnderflowHandler underflow = iter.next(); - if (underflow.closed()) { - if (log.isDebugEnabled()) log.debug("process: removing closed handler: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); - iter.remove(); - - } else if (underflow.getLastRead() > 0 && !underflow.getThread().isAlive()) { - if (log.isDebugEnabled()) log.debug("process: removing dead thread: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); - iter.remove(); - - } else if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) { - iter.remove(); - if (log.isErrorEnabled()) log.error("process: underflow timed out, terminating: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); - terminate(underflow.getThread(), TERMINATE_TIMEOUT); - } - } + public MultiUnderflowHandler (MultiUnderflowHandlerMonitor m) { + this.monitor = m; + m.start(); } - public MultiUnderflowHandler () { start(); } + public MultiUnderflowHandler () { this(MultiUnderflowHandlerMonitor.DEFAULT_UNDERFLOW_MONITOR); } @Getter private final String id = randomUUID().toString(); @Getter @Setter private String handlerName; @@ -70,7 +45,7 @@ public class MultiUnderflowHandler extends SimpleDaemon { if (thread == null) { thread = Thread.currentThread(); lastRead = now(); - handlers.put(id, this); + monitor.register(this); } if (firstUnderflow == 0) { if (log.isDebugEnabled()) log.debug(handlerName+": first data underflow"); @@ -89,7 +64,7 @@ public class MultiUnderflowHandler extends SimpleDaemon { lastRead = now(); if (thread == null) { thread = Thread.currentThread(); - handlers.put(id, this); + monitor.register(this); } firstUnderflow = 0; underflowSleep = minUnderflowSleep; @@ -99,7 +74,7 @@ public class MultiUnderflowHandler extends SimpleDaemon { if (!closed()) { closed.set(true); if (log.isDebugEnabled()) log.debug(handlerName + ": closing"); - handlers.remove(id); + monitor.unregister(this); } } diff --git a/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java b/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java new file mode 100644 index 0000000..fd62ccc --- /dev/null +++ b/src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java @@ -0,0 +1,52 @@ +package org.cobbzilla.util.io.multi; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.cobbzilla.util.daemon.SimpleDaemon; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.cobbzilla.util.daemon.ZillaRuntime.now; +import static org.cobbzilla.util.daemon.ZillaRuntime.terminate; + +@Slf4j +public class MultiUnderflowHandlerMonitor extends SimpleDaemon { + + public static final MultiUnderflowHandlerMonitor DEFAULT_UNDERFLOW_MONITOR = new MultiUnderflowHandlerMonitor(); + + private final Map handlers = new ConcurrentHashMap<>(); + + @Getter @Setter private static long checkInterval = SECONDS.toMillis(20); + private static final long TERMINATE_TIMEOUT = SECONDS.toMillis(2); + + @Override protected long getSleepTime() { return checkInterval; } + + @Override protected void process() { + if (log.isTraceEnabled()) log.trace("process: examining "+handlers.size()+" underflow handlers"); + for (Iterator iter = handlers.values().iterator(); iter.hasNext(); ) { + final MultiUnderflowHandler underflow = iter.next(); + if (underflow.closed()) { + if (log.isDebugEnabled()) log.debug("process: removing closed handler: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); + iter.remove(); + + } else if (underflow.getLastRead() > 0 && !underflow.getThread().isAlive()) { + if (log.isDebugEnabled()) log.debug("process: removing dead thread: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); + iter.remove(); + + } else if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) { + iter.remove(); + if (log.isErrorEnabled()) log.error("process: underflow timed out, terminating: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); + terminate(underflow.getThread(), TERMINATE_TIMEOUT); + } + } + } + + public void register(MultiUnderflowHandler underflow) { handlers.put(underflow.getId(), underflow); } + + public void unregister(MultiUnderflowHandler underflow) { handlers.remove(underflow.getId()); } + +} diff --git a/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java b/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java index 282527d..0b4905b 100644 --- a/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java +++ b/src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java @@ -5,7 +5,7 @@ import org.apache.commons.io.IOUtils; import org.cobbzilla.util.io.BlockedInputStream; import org.cobbzilla.util.io.multi.MultiReader; import org.cobbzilla.util.io.multi.MultiStream; -import org.cobbzilla.util.io.multi.MultiUnderflowHandler; +import org.cobbzilla.util.io.multi.MultiUnderflowHandlerMonitor; import org.junit.Test; import java.io.*; @@ -192,7 +192,7 @@ public class RegexFilterReaderTest { final InputStream stream2 = new ByteArrayInputStream("some test data2 ".repeat(1000).getBytes()); final InputStream stream3 = new BlockedInputStream(); - MultiUnderflowHandler.setCheckInterval(1000); + MultiUnderflowHandlerMonitor.setCheckInterval(1000); final MultiStream multiStream = new MultiStream(stream1); multiStream.getUnderflow() .setMinUnderflowSleep(1000)