@@ -70,7 +70,7 @@ public class ZillaRuntime { | |||
sleep(100, "terminate: waiting for thread to exit: "+thread); | |||
} | |||
if (thread.isAlive()) { | |||
if (verbose && log.isWarnEnabled()) log.warn("terminate: thread did not respond to interrupt, killing: "+thread+" with stack "+stacktrace(thread)+" from "+stacktrace()); | |||
if (verbose && log.isWarnEnabled()) log.warn("terminate: thread did not respond to interrupt, killing: "+thread+" with stack "+stacktrace(thread)+"\nfrom: "+stacktrace()); | |||
thread.stop(); | |||
return false; | |||
} else { | |||
@@ -4,52 +4,27 @@ import lombok.Getter; | |||
import lombok.Setter; | |||
import lombok.experimental.Accessors; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.util.daemon.SimpleDaemon; | |||
import org.cobbzilla.util.io.regex.MultiUnderflowException; | |||
import java.io.IOException; | |||
import java.util.Iterator; | |||
import java.util.Map; | |||
import java.util.concurrent.ConcurrentHashMap; | |||
import java.util.concurrent.atomic.AtomicBoolean; | |||
import static java.util.UUID.randomUUID; | |||
import static java.util.concurrent.TimeUnit.SECONDS; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.now; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.terminate; | |||
import static org.cobbzilla.util.system.Sleep.sleep; | |||
@Slf4j @Accessors(chain=true) | |||
public class MultiUnderflowHandler extends SimpleDaemon { | |||
public class MultiUnderflowHandler { | |||
private final static Map<String, MultiUnderflowHandler> handlers = new ConcurrentHashMap<>(); | |||
private final MultiUnderflowHandlerMonitor monitor; | |||
@Getter @Setter private static long checkInterval = SECONDS.toMillis(20); | |||
private static final long TERMINATE_TIMEOUT = SECONDS.toMillis(2); | |||
@Override protected long getSleepTime() { return checkInterval; } | |||
@Override protected void process() { | |||
if (log.isTraceEnabled()) log.trace("process: examining "+handlers.size()+" underflow handlers"); | |||
for (Iterator<MultiUnderflowHandler> iter = handlers.values().iterator(); iter.hasNext(); ) { | |||
final MultiUnderflowHandler underflow = iter.next(); | |||
if (underflow.closed()) { | |||
if (log.isDebugEnabled()) log.debug("process: removing closed handler: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); | |||
iter.remove(); | |||
} else if (underflow.getLastRead() > 0 && !underflow.getThread().isAlive()) { | |||
if (log.isDebugEnabled()) log.debug("process: removing dead thread: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); | |||
iter.remove(); | |||
} else if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) { | |||
iter.remove(); | |||
if (log.isErrorEnabled()) log.error("process: underflow timed out, terminating: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); | |||
terminate(underflow.getThread(), TERMINATE_TIMEOUT); | |||
} | |||
} | |||
public MultiUnderflowHandler (MultiUnderflowHandlerMonitor m) { | |||
this.monitor = m; | |||
m.start(); | |||
} | |||
public MultiUnderflowHandler () { start(); } | |||
public MultiUnderflowHandler () { this(MultiUnderflowHandlerMonitor.DEFAULT_UNDERFLOW_MONITOR); } | |||
@Getter private final String id = randomUUID().toString(); | |||
@Getter @Setter private String handlerName; | |||
@@ -70,7 +45,7 @@ public class MultiUnderflowHandler extends SimpleDaemon { | |||
if (thread == null) { | |||
thread = Thread.currentThread(); | |||
lastRead = now(); | |||
handlers.put(id, this); | |||
monitor.register(this); | |||
} | |||
if (firstUnderflow == 0) { | |||
if (log.isDebugEnabled()) log.debug(handlerName+": first data underflow"); | |||
@@ -89,7 +64,7 @@ public class MultiUnderflowHandler extends SimpleDaemon { | |||
lastRead = now(); | |||
if (thread == null) { | |||
thread = Thread.currentThread(); | |||
handlers.put(id, this); | |||
monitor.register(this); | |||
} | |||
firstUnderflow = 0; | |||
underflowSleep = minUnderflowSleep; | |||
@@ -99,7 +74,7 @@ public class MultiUnderflowHandler extends SimpleDaemon { | |||
if (!closed()) { | |||
closed.set(true); | |||
if (log.isDebugEnabled()) log.debug(handlerName + ": closing"); | |||
handlers.remove(id); | |||
monitor.unregister(this); | |||
} | |||
} | |||
@@ -0,0 +1,52 @@ | |||
package org.cobbzilla.util.io.multi; | |||
import lombok.Getter; | |||
import lombok.Setter; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.util.daemon.SimpleDaemon; | |||
import java.util.Iterator; | |||
import java.util.Map; | |||
import java.util.concurrent.ConcurrentHashMap; | |||
import static java.util.concurrent.TimeUnit.SECONDS; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.now; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.terminate; | |||
@Slf4j | |||
public class MultiUnderflowHandlerMonitor extends SimpleDaemon { | |||
public static final MultiUnderflowHandlerMonitor DEFAULT_UNDERFLOW_MONITOR = new MultiUnderflowHandlerMonitor(); | |||
private final Map<String, MultiUnderflowHandler> handlers = new ConcurrentHashMap<>(); | |||
@Getter @Setter private static long checkInterval = SECONDS.toMillis(20); | |||
private static final long TERMINATE_TIMEOUT = SECONDS.toMillis(2); | |||
@Override protected long getSleepTime() { return checkInterval; } | |||
@Override protected void process() { | |||
if (log.isTraceEnabled()) log.trace("process: examining "+handlers.size()+" underflow handlers"); | |||
for (Iterator<MultiUnderflowHandler> iter = handlers.values().iterator(); iter.hasNext(); ) { | |||
final MultiUnderflowHandler underflow = iter.next(); | |||
if (underflow.closed()) { | |||
if (log.isDebugEnabled()) log.debug("process: removing closed handler: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); | |||
iter.remove(); | |||
} else if (underflow.getLastRead() > 0 && !underflow.getThread().isAlive()) { | |||
if (log.isDebugEnabled()) log.debug("process: removing dead thread: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); | |||
iter.remove(); | |||
} else if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) { | |||
iter.remove(); | |||
if (log.isErrorEnabled()) log.error("process: underflow timed out, terminating: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); | |||
terminate(underflow.getThread(), TERMINATE_TIMEOUT); | |||
} | |||
} | |||
} | |||
public void register(MultiUnderflowHandler underflow) { handlers.put(underflow.getId(), underflow); } | |||
public void unregister(MultiUnderflowHandler underflow) { handlers.remove(underflow.getId()); } | |||
} |
@@ -5,7 +5,7 @@ import org.apache.commons.io.IOUtils; | |||
import org.cobbzilla.util.io.BlockedInputStream; | |||
import org.cobbzilla.util.io.multi.MultiReader; | |||
import org.cobbzilla.util.io.multi.MultiStream; | |||
import org.cobbzilla.util.io.multi.MultiUnderflowHandler; | |||
import org.cobbzilla.util.io.multi.MultiUnderflowHandlerMonitor; | |||
import org.junit.Test; | |||
import java.io.*; | |||
@@ -192,7 +192,7 @@ public class RegexFilterReaderTest { | |||
final InputStream stream2 = new ByteArrayInputStream("some test data2 ".repeat(1000).getBytes()); | |||
final InputStream stream3 = new BlockedInputStream(); | |||
MultiUnderflowHandler.setCheckInterval(1000); | |||
MultiUnderflowHandlerMonitor.setCheckInterval(1000); | |||
final MultiStream multiStream = new MultiStream(stream1); | |||
multiStream.getUnderflow() | |||
.setMinUnderflowSleep(1000) | |||