|
|
@@ -11,6 +11,7 @@ import java.io.IOException; |
|
|
|
import java.util.Iterator; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
|
|
|
|
|
|
import static java.util.UUID.randomUUID; |
|
|
|
import static java.util.concurrent.TimeUnit.SECONDS; |
|
|
@@ -29,11 +30,20 @@ public class MultiUnderflowHandler extends SimpleDaemon { |
|
|
|
@Override protected long getSleepTime() { return checkInterval; } |
|
|
|
|
|
|
|
@Override protected void process() { |
|
|
|
if (log.isTraceEnabled()) log.trace("process: examining "+handlers.size()+" underflow handlers"); |
|
|
|
for (Iterator<MultiUnderflowHandler> iter = handlers.values().iterator(); iter.hasNext(); ) { |
|
|
|
final MultiUnderflowHandler underflow = iter.next(); |
|
|
|
if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) { |
|
|
|
log.error("process: underflow timed out, terminating: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); |
|
|
|
if (underflow.closed()) { |
|
|
|
if (log.isDebugEnabled()) log.debug("process: removing closed handler: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); |
|
|
|
iter.remove(); |
|
|
|
|
|
|
|
} else if (underflow.getLastRead() > 0 && !underflow.getThread().isAlive()) { |
|
|
|
if (log.isDebugEnabled()) log.debug("process: removing dead thread: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); |
|
|
|
iter.remove(); |
|
|
|
|
|
|
|
} else if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) { |
|
|
|
iter.remove(); |
|
|
|
if (log.isErrorEnabled()) log.error("process: underflow timed out, terminating: name="+underflow.getHandlerName()+" thread="+underflow.getThread()); |
|
|
|
terminate(underflow.getThread(), TERMINATE_TIMEOUT); |
|
|
|
} |
|
|
|
} |
|
|
@@ -48,15 +58,18 @@ public class MultiUnderflowHandler extends SimpleDaemon { |
|
|
|
|
|
|
|
@Getter @Setter private long maxUnderflowSleep = 500; |
|
|
|
|
|
|
|
@Getter @Setter private long lastRead = now(); |
|
|
|
@Getter @Setter private long lastRead = 0; |
|
|
|
@Getter @Setter private long firstUnderflow = 0; |
|
|
|
@Getter @Setter private long underflowTimeout = SECONDS.toMillis(60); |
|
|
|
private long underflowSleep = minUnderflowSleep; |
|
|
|
@Getter private Thread thread; |
|
|
|
private final AtomicBoolean closed = new AtomicBoolean(false); |
|
|
|
public boolean closed () { return closed.get(); } |
|
|
|
|
|
|
|
public void handleUnderflow() throws IOException { |
|
|
|
if (thread == null) { |
|
|
|
thread = Thread.currentThread(); |
|
|
|
lastRead = now(); |
|
|
|
handlers.put(id, this); |
|
|
|
} |
|
|
|
if (firstUnderflow == 0) { |
|
|
@@ -73,16 +86,21 @@ public class MultiUnderflowHandler extends SimpleDaemon { |
|
|
|
} |
|
|
|
|
|
|
|
public void handleSuccessfulRead() { |
|
|
|
lastRead = now(); |
|
|
|
if (thread == null) { |
|
|
|
thread = Thread.currentThread(); |
|
|
|
handlers.put(id, this); |
|
|
|
} |
|
|
|
firstUnderflow = 0; |
|
|
|
lastRead = now(); |
|
|
|
underflowSleep = minUnderflowSleep; |
|
|
|
} |
|
|
|
|
|
|
|
public void close() { |
|
|
|
|
|
|
|
if (!closed()) { |
|
|
|
closed.set(true); |
|
|
|
if (log.isDebugEnabled()) log.debug(handlerName + ": closing"); |
|
|
|
handlers.remove(id); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |