Переглянути джерело

update logs, terminate underflows quietly

tags/2.0.1
Jonathan Cobb 4 роки тому
джерело
коміт
6259eb3590
2 змінених файлів з 19 додано та 17 видалено
  1. +17
    -14
      src/main/java/org/cobbzilla/util/io/multi/MultiStream.java
  2. +2
    -3
      src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java

+ 17
- 14
src/main/java/org/cobbzilla/util/io/multi/MultiStream.java Переглянути файл

@@ -27,6 +27,7 @@ public class MultiStream extends InputStream {
addStream(r); addStream(r);
} }
currentStream = r; currentStream = r;
if (log.isInfoEnabled()) log.info(logPrefix()+"created with initial stream="+r+", last="+last);
} }


public MultiStream (InputStream r, String name) { this(r, false, name); } public MultiStream (InputStream r, String name) { this(r, false, name); }
@@ -36,29 +37,31 @@ public class MultiStream extends InputStream {
public int pendingStreamCount () { return streams.size() - streamIndex; } public int pendingStreamCount () { return streams.size() - streamIndex; }


@Override public String toString () { @Override public String toString () {
return "MultiStream{"+streams.size()+" streams, index="+streamIndex+", EOS="+endOfStreams+"}";
return "MultiStream{name="+underflow.getHandlerName()+", "+streams.size()+" streams, index="+streamIndex+", EOS="+endOfStreams+"}";
} }


private String logPrefix () { return this + ": "; }

public void addStream (InputStream in) { public void addStream (InputStream in) {
if (endOfStreams) { if (endOfStreams) {
log.warn("addStream: endOfStreams is true, not adding InputStream");
if (log.isWarnEnabled()) log.warn(logPrefix()+"addStream: endOfStreams is true, not adding InputStream");
} else { } else {
streams.add(in); streams.add(in);
if (log.isTraceEnabled()) log.trace("addStream: added stream ("+in.getClass().getSimpleName()+"). this="+this);
if (log.isTraceEnabled()) log.trace(logPrefix()+"addStream: added stream ("+in.getClass().getSimpleName()+")");
} }
} }


public void addLastStream (InputStream in) { public void addLastStream (InputStream in) {
addStream(in); addStream(in);
endOfStreams = true; endOfStreams = true;
if (log.isTraceEnabled()) log.trace("addLastStream: added last stream ("+in.getClass().getSimpleName()+"). this="+this);
if (log.isTraceEnabled()) log.trace(logPrefix()+"addLastStream: added last stream ("+in.getClass().getSimpleName()+")");
} }


@Override public int read() throws IOException { @Override public int read() throws IOException {
final int val = currentStream.read(); final int val = currentStream.read();
if (val == -1) { if (val == -1) {
if (streamIndex == streams.size()-1) { if (streamIndex == streams.size()-1) {
if (log.isTraceEnabled()) log.trace("read(byte): end of all streams? this="+this);
if (log.isTraceEnabled()) log.trace(logPrefix()+"read(byte): end of all streams? this="+this);
if (endOfStreams) return -1; if (endOfStreams) return -1;
underflow.handleUnderflow(); underflow.handleUnderflow();
return 0; return 0;
@@ -66,23 +69,23 @@ public class MultiStream extends InputStream {
currentStream.close(); currentStream.close();
streamIndex++; streamIndex++;
currentStream = streams.get(streamIndex); currentStream = streams.get(streamIndex);
if (log.isTraceEnabled()) log.trace("read(byte): end of all stream, advanced to next stream ("+currentStream.getClass().getSimpleName()+"). this="+this);
if (log.isTraceEnabled()) log.trace(logPrefix()+"read(byte): end of all stream, advanced to next stream ("+currentStream.getClass().getSimpleName()+")");
return read(); return read();


} else { } else {
if (log.isTraceEnabled()) log.trace("read(byte): one byte read. this="+this);
if (log.isTraceEnabled()) log.trace(logPrefix()+"read(byte): one byte read");
} }
underflow.handleSuccessfulRead(); underflow.handleSuccessfulRead();
return val; return val;
} }


@Override public int read(byte[] buf, int off, int len) throws IOException { @Override public int read(byte[] buf, int off, int len) throws IOException {
if (log.isTraceEnabled()) log.trace("read(byte[]): trying to read "+len+" bytes. this="+this);
if (log.isTraceEnabled()) log.trace(logPrefix()+"read(byte[]): trying to read "+len+" bytes");
final int count = currentStream.read(buf, off, len); final int count = currentStream.read(buf, off, len);
if (log.isTraceEnabled()) log.trace("read(byte[]): trying to read "+count+" bytes");
if (log.isTraceEnabled()) log.trace(logPrefix()+"read(byte[]): trying to read "+count+" bytes");
if (count == -1) { if (count == -1) {
if (streamIndex == streams.size()-1) { if (streamIndex == streams.size()-1) {
if (log.isTraceEnabled()) log.trace("read(byte[]): end of all streams? this="+this);
if (log.isTraceEnabled()) log.trace(logPrefix()+"read(byte[]): end of all streams?");
if (endOfStreams) return -1; if (endOfStreams) return -1;
underflow.handleUnderflow(); underflow.handleUnderflow();
return 0; return 0;
@@ -90,19 +93,19 @@ public class MultiStream extends InputStream {
currentStream.close(); currentStream.close();
streamIndex++; streamIndex++;
currentStream = streams.get(streamIndex); currentStream = streams.get(streamIndex);
if (log.isTraceEnabled()) log.trace("read(byte[]): end of all stream, advanced to next stream ("+currentStream.getClass().getSimpleName()+"). this="+this);
if (log.isTraceEnabled()) log.trace(logPrefix()+"read(byte[]): end of all stream, advanced to next stream ("+currentStream.getClass().getSimpleName()+")");
return read(buf, off, len); return read(buf, off, len);


} else { } else {
if (log.isTraceEnabled()) log.trace("read(byte[]): "+count+" bytes read. this="+this);
if (log.isTraceEnabled()) log.trace(logPrefix()+"read(byte[]): "+count+" bytes read");
} }
underflow.handleSuccessfulRead(); underflow.handleSuccessfulRead();
return count; return count;
} }


@Override public void close() throws IOException { @Override public void close() throws IOException {
if (log.isInfoEnabled()) log.info("close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). name="+underflow.getHandlerName());
if (log.isTraceEnabled()) log.trace("close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). name="+underflow.getHandlerName());
if (log.isInfoEnabled()) log.info(logPrefix()+"close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). name="+underflow.getHandlerName());
else if (log.isTraceEnabled()) log.trace(logPrefix()+"close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). name="+underflow.getHandlerName());
if (currentStream != null) currentStream.close(); if (currentStream != null) currentStream.close();
underflow.close(); underflow.close();
} }


+ 2
- 3
src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandlerMonitor.java Переглянути файл

@@ -11,8 +11,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function; import java.util.function.Function;


import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static org.cobbzilla.util.daemon.ZillaRuntime.now;
import static org.cobbzilla.util.daemon.ZillaRuntime.terminate;
import static org.cobbzilla.util.daemon.ZillaRuntime.*;


@Slf4j @Slf4j
public class MultiUnderflowHandlerMonitor extends SimpleDaemon { public class MultiUnderflowHandlerMonitor extends SimpleDaemon {
@@ -45,7 +44,7 @@ public class MultiUnderflowHandlerMonitor extends SimpleDaemon {
iter.remove(); iter.remove();
if (terminateThreadFunc == null || terminateThreadFunc.apply(underflow.getThread())) { if (terminateThreadFunc == null || terminateThreadFunc.apply(underflow.getThread())) {
if (log.isErrorEnabled()) log.error(prefix+"underflow timed out, terminating: name=" + underflow.getHandlerName() + " thread=" + underflow.getThread()); if (log.isErrorEnabled()) log.error(prefix+"underflow timed out, terminating: name=" + underflow.getHandlerName() + " thread=" + underflow.getThread());
terminate(underflow.getThread(), TERMINATE_TIMEOUT, terminateThreadFunc);
terminateQuietly(underflow.getThread(), TERMINATE_TIMEOUT, terminateThreadFunc);
} else { } else {
if (log.isErrorEnabled()) log.error(prefix+"underflow timed out, removing but NOT terminating: name=" + underflow.getHandlerName() + " thread=" + underflow.getThread()); if (log.isErrorEnabled()) log.error(prefix+"underflow timed out, removing but NOT terminating: name=" + underflow.getHandlerName() + " thread=" + underflow.getThread());
} }


Завантаження…
Відмінити
Зберегти