Browse Source

allow FilterInputStreamViaOutputStream to remember a name for logging

feature/named_wrapped_streams
Jonathan Cobb 4 years ago
parent
commit
c2c9d47891
2 changed files with 48 additions and 24 deletions
  1. +10
    -4
      src/main/java/org/cobbzilla/util/http/HttpContentEncodingType.java
  2. +38
    -20
      src/main/java/org/cobbzilla/util/io/FilterInputStreamViaOutputStream.java

+ 10
- 4
src/main/java/org/cobbzilla/util/http/HttpContentEncodingType.java View File

@@ -19,7 +19,7 @@ public enum HttpContentEncodingType {


deflate (in -> new InflaterInputStream(in, new Inflater(true)), deflate (in -> new InflaterInputStream(in, new Inflater(true)),
out -> new DeflaterOutputStream(out, new Deflater(7, true)), out -> new DeflaterOutputStream(out, new Deflater(7, true)),
in -> new FilterInputStreamViaOutputStream(in, out -> new InflaterOutputStream(out, new Inflater(true)))),
(in, name) -> new FilterInputStreamViaOutputStream(in, out -> new InflaterOutputStream(out, new Inflater(true)), name)),


br (BrotliInputStream::new, BrotliOutputStream::new, BrotliOutputStream.class), br (BrotliInputStream::new, BrotliOutputStream::new, BrotliOutputStream.class),
bro (BrotliInputStream::new, BrotliOutputStream::new, BrotliOutputStream.class); bro (BrotliInputStream::new, BrotliOutputStream::new, BrotliOutputStream.class);
@@ -35,7 +35,7 @@ public enum HttpContentEncodingType {
Class<? extends OutputStream> inAsOutClass) { Class<? extends OutputStream> inAsOutClass) {
this.inputWrapper = inWrap; this.inputWrapper = inWrap;
this.outputWrapper = outWrap; this.outputWrapper = outWrap;
this.inputAsOutputWrapper = in -> new FilterInputStreamViaOutputStream(in, inAsOutClass);
this.inputAsOutputWrapper = (in, name) -> new FilterInputStreamViaOutputStream(in, inAsOutClass, name);
} }


@JsonCreator public static HttpContentEncodingType fromString (String v) { return valueOf(v.toLowerCase()); } @JsonCreator public static HttpContentEncodingType fromString (String v) { return valueOf(v.toLowerCase()); }
@@ -44,7 +44,13 @@ public enum HttpContentEncodingType {


public OutputStream wrapOutput(OutputStream out) throws IOException { return outputWrapper.wrap(out); } public OutputStream wrapOutput(OutputStream out) throws IOException { return outputWrapper.wrap(out); }


public FilterInputStreamViaOutputStream wrapInputAsOutput(InputStream in) throws IOException { return inputAsOutputWrapper.wrap(in); }
public FilterInputStreamViaOutputStream wrapInputAsOutput(InputStream in) throws IOException {
return inputAsOutputWrapper.wrap(in, null);
}

public FilterInputStreamViaOutputStream wrapInputAsOutput(InputStream in, String name) throws IOException {
return inputAsOutputWrapper.wrap(in, name);
}


public interface HttpContentEncodingInputWrapper { public interface HttpContentEncodingInputWrapper {
InputStream wrap(InputStream in) throws IOException; InputStream wrap(InputStream in) throws IOException;
@@ -55,7 +61,7 @@ public enum HttpContentEncodingType {
} }


public interface HttpContentEncodingInputAsOutputWrapper { public interface HttpContentEncodingInputAsOutputWrapper {
FilterInputStreamViaOutputStream wrap(InputStream in) throws IOException;
FilterInputStreamViaOutputStream wrap(InputStream in, String name) throws IOException;
} }


} }

+ 38
- 20
src/main/java/org/cobbzilla/util/io/FilterInputStreamViaOutputStream.java View File

@@ -18,53 +18,70 @@ public class FilterInputStreamViaOutputStream extends PipedInputStream implement
private static final int DEFAULT_COPY_BUFFER_SIZE = (int) (8 * Bytes.KB); private static final int DEFAULT_COPY_BUFFER_SIZE = (int) (8 * Bytes.KB);
private static final long THREAD_TERMINATE_TIMEOUT = TimeUnit.SECONDS.toMillis(10); private static final long THREAD_TERMINATE_TIMEOUT = TimeUnit.SECONDS.toMillis(10);


private InputStream in;
private PipedOutputStream pipeOut;
private OutputStream out;
private Thread thread;
private final InputStream in;
private final String name;
private final PipedOutputStream pipeOut;
private final OutputStream out;
private final Thread thread;


public Class<? extends OutputStream> getOutputStreamClass () { return out == null ? null : out.getClass(); } public Class<? extends OutputStream> getOutputStreamClass () { return out == null ? null : out.getClass(); }


public FilterInputStreamViaOutputStream(InputStream in, Class<? extends OutputStream> outStreamClass) { public FilterInputStreamViaOutputStream(InputStream in, Class<? extends OutputStream> outStreamClass) {
this(in, outStreamClass, null);
}

public FilterInputStreamViaOutputStream(InputStream in, Class<? extends OutputStream> outStreamClass, String name) {
super(DEFAULT_PIPE_BUFFER_SIZE); super(DEFAULT_PIPE_BUFFER_SIZE);
this.in = in; this.in = in;
this.name = name;
try { try {
this.pipeOut = new PipedOutputStream(this);
pipeOut = new PipedOutputStream(this);
} catch (Exception e) { } catch (Exception e) {
die("FilterInputStreamViaOutputStream: error creating pipeOut: "+shortError(e));
throw new IllegalStateException("FilterInputStreamViaOutputStream: error creating pipeOut: "+shortError(e));
} }
this.out = instantiate(outStreamClass, this.pipeOut);
start();
out = instantiate(outStreamClass, pipeOut);
thread = start();
if (log.isDebugEnabled()) log.debug(logPrefix()+"started based on in="+in.getClass().getSimpleName());
} }


public FilterInputStreamViaOutputStream(InputStream in, Function<OutputStream, OutputStream> outFactory) { public FilterInputStreamViaOutputStream(InputStream in, Function<OutputStream, OutputStream> outFactory) {
this(in, outFactory, null);
}

public FilterInputStreamViaOutputStream(InputStream in, Function<OutputStream, OutputStream> outFactory, String name) {
super(DEFAULT_PIPE_BUFFER_SIZE); super(DEFAULT_PIPE_BUFFER_SIZE);
this.in = in; this.in = in;
this.name = name;
try { try {
this.pipeOut = new PipedOutputStream(this);
pipeOut = new PipedOutputStream(this);
} catch (Exception e) { } catch (Exception e) {
die("FilterInputStreamViaOutputStream: error creating pipeOut: "+shortError(e));
throw new IllegalStateException("FilterInputStreamViaOutputStream: error creating pipeOut: "+shortError(e));
} }
try { try {
this.out = outFactory.apply(this.pipeOut);
out = outFactory.apply(pipeOut);
} catch (Exception e) { } catch (Exception e) {
die("FilterInputStreamViaOutputStream: error creating out: "+shortError(e));
throw new IllegalStateException("FilterInputStreamViaOutputStream: error creating out: "+shortError(e));
} }
start();
thread = start();
if (log.isDebugEnabled()) log.debug(logPrefix()+"started based on in="+in.getClass().getSimpleName());
} }


private void start() {
this.thread = new Thread(this);
this.thread.setName(getClass().getSimpleName()+"/"+System.identityHashCode(this));
this.thread.setDaemon(true);
this.thread.start();
private Thread start() {
final Thread t = new Thread(this);
t.setName(!empty(name) ? name : getClass().getSimpleName()+"/"+System.identityHashCode(this));
t.setDaemon(true);
t.start();
return t;
} }


private String logPrefix() { return (thread != null ? thread.getName() : !empty(name) ? name : "no-name")+": "; }

@Override public void run() { @Override public void run() {
try { try {
final byte[] buf = new byte[DEFAULT_COPY_BUFFER_SIZE]; final byte[] buf = new byte[DEFAULT_COPY_BUFFER_SIZE];
int bytesRead; int bytesRead;
while ((bytesRead = in.read(buf)) >= 0) { while ((bytesRead = in.read(buf)) >= 0) {
if (log.isDebugEnabled()) log.debug(logPrefix()+"run: pumping "+bytesRead+" bytes read from in -> out");
out.write(buf, 0, bytesRead); out.write(buf, 0, bytesRead);
} }
out.flush(); out.flush();
@@ -80,6 +97,7 @@ public class FilterInputStreamViaOutputStream extends PipedInputStream implement
} }


@Override public void close() { @Override public void close() {
if (log.isDebugEnabled()) log.debug(logPrefix()+"close");
try { try {
super.close(); super.close();
} catch (Exception e) { } catch (Exception e) {
@@ -88,8 +106,8 @@ public class FilterInputStreamViaOutputStream extends PipedInputStream implement
closeQuietly(in); closeQuietly(in);
closeQuietly(out); closeQuietly(out);
closeQuietly(pipeOut); closeQuietly(pipeOut);
if (this.thread.isAlive()) {
background(() -> terminate(this.thread, THREAD_TERMINATE_TIMEOUT));
if (thread.isAlive()) {
background(() -> terminate(thread, THREAD_TERMINATE_TIMEOUT));
} }
} }
} }

Loading…
Cancel
Save