diff --git a/src/main/java/org/cobbzilla/util/http/HttpContentEncodingType.java b/src/main/java/org/cobbzilla/util/http/HttpContentEncodingType.java index 834d1a9..f620790 100644 --- a/src/main/java/org/cobbzilla/util/http/HttpContentEncodingType.java +++ b/src/main/java/org/cobbzilla/util/http/HttpContentEncodingType.java @@ -19,7 +19,7 @@ public enum HttpContentEncodingType { deflate (in -> new InflaterInputStream(in, new Inflater(true)), out -> new DeflaterOutputStream(out, new Deflater(7, true)), - in -> new FilterInputStreamViaOutputStream(in, out -> new InflaterOutputStream(out, new Inflater(true)))), + (in, name) -> new FilterInputStreamViaOutputStream(in, out -> new InflaterOutputStream(out, new Inflater(true)), name)), br (BrotliInputStream::new, BrotliOutputStream::new, BrotliOutputStream.class), bro (BrotliInputStream::new, BrotliOutputStream::new, BrotliOutputStream.class); @@ -35,7 +35,7 @@ public enum HttpContentEncodingType { Class inAsOutClass) { this.inputWrapper = inWrap; this.outputWrapper = outWrap; - this.inputAsOutputWrapper = in -> new FilterInputStreamViaOutputStream(in, inAsOutClass); + this.inputAsOutputWrapper = (in, name) -> new FilterInputStreamViaOutputStream(in, inAsOutClass, name); } @JsonCreator public static HttpContentEncodingType fromString (String v) { return valueOf(v.toLowerCase()); } @@ -44,7 +44,13 @@ public enum HttpContentEncodingType { public OutputStream wrapOutput(OutputStream out) throws IOException { return outputWrapper.wrap(out); } - public FilterInputStreamViaOutputStream wrapInputAsOutput(InputStream in) throws IOException { return inputAsOutputWrapper.wrap(in); } + public FilterInputStreamViaOutputStream wrapInputAsOutput(InputStream in) throws IOException { + return inputAsOutputWrapper.wrap(in, null); + } + + public FilterInputStreamViaOutputStream wrapInputAsOutput(InputStream in, String name) throws IOException { + return inputAsOutputWrapper.wrap(in, name); + } public interface HttpContentEncodingInputWrapper { InputStream wrap(InputStream in) throws IOException; @@ -55,7 +61,7 @@ public enum HttpContentEncodingType { } public interface HttpContentEncodingInputAsOutputWrapper { - FilterInputStreamViaOutputStream wrap(InputStream in) throws IOException; + FilterInputStreamViaOutputStream wrap(InputStream in, String name) throws IOException; } } diff --git a/src/main/java/org/cobbzilla/util/io/FilterInputStreamViaOutputStream.java b/src/main/java/org/cobbzilla/util/io/FilterInputStreamViaOutputStream.java index bf2f6db..7518d51 100644 --- a/src/main/java/org/cobbzilla/util/io/FilterInputStreamViaOutputStream.java +++ b/src/main/java/org/cobbzilla/util/io/FilterInputStreamViaOutputStream.java @@ -18,53 +18,70 @@ public class FilterInputStreamViaOutputStream extends PipedInputStream implement private static final int DEFAULT_COPY_BUFFER_SIZE = (int) (8 * Bytes.KB); private static final long THREAD_TERMINATE_TIMEOUT = TimeUnit.SECONDS.toMillis(10); - private InputStream in; - private PipedOutputStream pipeOut; - private OutputStream out; - private Thread thread; + private final InputStream in; + private final String name; + private final PipedOutputStream pipeOut; + private final OutputStream out; + private final Thread thread; public Class getOutputStreamClass () { return out == null ? null : out.getClass(); } public FilterInputStreamViaOutputStream(InputStream in, Class outStreamClass) { + this(in, outStreamClass, null); + } + + public FilterInputStreamViaOutputStream(InputStream in, Class outStreamClass, String name) { super(DEFAULT_PIPE_BUFFER_SIZE); this.in = in; + this.name = name; try { - this.pipeOut = new PipedOutputStream(this); + pipeOut = new PipedOutputStream(this); } catch (Exception e) { - die("FilterInputStreamViaOutputStream: error creating pipeOut: "+shortError(e)); + throw new IllegalStateException("FilterInputStreamViaOutputStream: error creating pipeOut: "+shortError(e)); } - this.out = instantiate(outStreamClass, this.pipeOut); - start(); + out = instantiate(outStreamClass, pipeOut); + thread = start(); + if (log.isDebugEnabled()) log.debug(logPrefix()+"started based on in="+in.getClass().getSimpleName()); } public FilterInputStreamViaOutputStream(InputStream in, Function outFactory) { + this(in, outFactory, null); + } + + public FilterInputStreamViaOutputStream(InputStream in, Function outFactory, String name) { super(DEFAULT_PIPE_BUFFER_SIZE); this.in = in; + this.name = name; try { - this.pipeOut = new PipedOutputStream(this); + pipeOut = new PipedOutputStream(this); } catch (Exception e) { - die("FilterInputStreamViaOutputStream: error creating pipeOut: "+shortError(e)); + throw new IllegalStateException("FilterInputStreamViaOutputStream: error creating pipeOut: "+shortError(e)); } try { - this.out = outFactory.apply(this.pipeOut); + out = outFactory.apply(pipeOut); } catch (Exception e) { - die("FilterInputStreamViaOutputStream: error creating out: "+shortError(e)); + throw new IllegalStateException("FilterInputStreamViaOutputStream: error creating out: "+shortError(e)); } - start(); + thread = start(); + if (log.isDebugEnabled()) log.debug(logPrefix()+"started based on in="+in.getClass().getSimpleName()); } - private void start() { - this.thread = new Thread(this); - this.thread.setName(getClass().getSimpleName()+"/"+System.identityHashCode(this)); - this.thread.setDaemon(true); - this.thread.start(); + private Thread start() { + final Thread t = new Thread(this); + t.setName(!empty(name) ? name : getClass().getSimpleName()+"/"+System.identityHashCode(this)); + t.setDaemon(true); + t.start(); + return t; } + private String logPrefix() { return (thread != null ? thread.getName() : !empty(name) ? name : "no-name")+": "; } + @Override public void run() { try { final byte[] buf = new byte[DEFAULT_COPY_BUFFER_SIZE]; int bytesRead; while ((bytesRead = in.read(buf)) >= 0) { + if (log.isDebugEnabled()) log.debug(logPrefix()+"run: pumping "+bytesRead+" bytes read from in -> out"); out.write(buf, 0, bytesRead); } out.flush(); @@ -80,6 +97,7 @@ public class FilterInputStreamViaOutputStream extends PipedInputStream implement } @Override public void close() { + if (log.isDebugEnabled()) log.debug(logPrefix()+"close"); try { super.close(); } catch (Exception e) { @@ -88,8 +106,8 @@ public class FilterInputStreamViaOutputStream extends PipedInputStream implement closeQuietly(in); closeQuietly(out); closeQuietly(pipeOut); - if (this.thread.isAlive()) { - background(() -> terminate(this.thread, THREAD_TERMINATE_TIMEOUT)); + if (thread.isAlive()) { + background(() -> terminate(thread, THREAD_TERMINATE_TIMEOUT)); } } }