@@ -0,0 +1,93 @@ | |||
package org.cobbzilla.util.io; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.util.system.Bytes; | |||
import java.io.*; | |||
import java.util.concurrent.TimeUnit; | |||
import java.util.function.Function; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.*; | |||
import static org.cobbzilla.util.reflect.ReflectionUtil.closeQuietly; | |||
import static org.cobbzilla.util.reflect.ReflectionUtil.instantiate; | |||
@Slf4j | |||
public class FilterInputStreamViaOutputStream extends PipedInputStream implements Runnable { | |||
private static final int DEFAULT_PIPE_BUFFER_SIZE = (int) (64 * Bytes.KB); | |||
private static final long THREAD_TERMINATE_TIMEOUT = TimeUnit.SECONDS.toMillis(10); | |||
private InputStream in; | |||
private PipedOutputStream pipeOut; | |||
private OutputStream out; | |||
private Thread thread; | |||
public FilterInputStreamViaOutputStream(InputStream in, Class<? extends OutputStream> outStreamClass) { | |||
super(DEFAULT_PIPE_BUFFER_SIZE); | |||
this.in = in; | |||
try { | |||
this.pipeOut = new PipedOutputStream(this); | |||
} catch (Exception e) { | |||
die("FilterInputStreamViaOutputStream: error creating pipeOut: "+shortError(e)); | |||
} | |||
this.out = instantiate(outStreamClass, this.pipeOut); | |||
start(); | |||
} | |||
public FilterInputStreamViaOutputStream(InputStream in, Function<OutputStream, OutputStream> outFactory) { | |||
super(DEFAULT_PIPE_BUFFER_SIZE); | |||
this.in = in; | |||
try { | |||
this.pipeOut = new PipedOutputStream(this); | |||
} catch (Exception e) { | |||
die("FilterInputStreamViaOutputStream: error creating pipeOut: "+shortError(e)); | |||
} | |||
try { | |||
this.out = outFactory.apply(this.pipeOut); | |||
} catch (Exception e) { | |||
die("FilterInputStreamViaOutputStream: error creating out: "+shortError(e)); | |||
} | |||
start(); | |||
} | |||
private void start() { | |||
this.thread = new Thread(this); | |||
this.thread.setName(getClass().getSimpleName()+"/"+System.identityHashCode(this)); | |||
this.thread.setDaemon(true); | |||
this.thread.start(); | |||
} | |||
@Override public void run() { | |||
try { | |||
int c; | |||
int counter = 0; | |||
while((c = in.read()) >= 0) { | |||
out.write(c); | |||
counter++; | |||
} | |||
out.flush(); | |||
} catch (IOException e) { | |||
log.error("run: error copying bytes: "+shortError(e)); | |||
throw new RuntimeException(e); | |||
} finally { | |||
closeQuietly(out); | |||
} | |||
} | |||
@Override public void close() { | |||
log.info("close called from "+stacktrace()); | |||
try { | |||
super.close(); | |||
} catch (Exception e) { | |||
log.warn("close: error closing pipeIn: "+shortError(e)); | |||
} | |||
closeQuietly(in); | |||
closeQuietly(out); | |||
closeQuietly(pipeOut); | |||
if (this.thread.isAlive()) { | |||
background(() -> terminate(this.thread, THREAD_TERMINATE_TIMEOUT)); | |||
} | |||
} | |||
} |
@@ -0,0 +1,57 @@ | |||
package org.cobbzilla.util.io; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.apache.commons.io.IOUtils; | |||
import org.apache.commons.lang3.RandomStringUtils; | |||
import org.cobbzilla.util.system.Bytes; | |||
import org.junit.Test; | |||
import java.io.ByteArrayInputStream; | |||
import java.io.ByteArrayOutputStream; | |||
import java.io.InputStream; | |||
import java.io.OutputStream; | |||
import java.util.zip.GZIPInputStream; | |||
import java.util.zip.GZIPOutputStream; | |||
import static org.junit.Assert.assertArrayEquals; | |||
import static org.junit.Assert.assertEquals; | |||
@Slf4j | |||
public class FilterInputStreamViaOutputStreamTest { | |||
@Test public void testGzipFilterInputStream () throws Exception { | |||
final String testData = RandomStringUtils.random((int) (256 * Bytes.KB)); | |||
log.info("testData has "+testData.getBytes().length+ " bytes"); | |||
// compress testData to byte array the normal way, using a GZIPOutputStream | |||
final ByteArrayOutputStream expected = new ByteArrayOutputStream(); | |||
try (OutputStream gzout = new GZIPOutputStream(expected)) { | |||
IOUtils.copyLarge(new ByteArrayInputStream(testData.getBytes()), gzout); | |||
} | |||
log.info("expected byte array has "+expected.toByteArray().length+" bytes"); | |||
// sanity check that standard decompression gets us back to where we started | |||
final ByteArrayOutputStream check = new ByteArrayOutputStream(); | |||
try (InputStream checkIn = new GZIPInputStream(new ByteArrayInputStream(expected.toByteArray()))) { | |||
IOUtils.copyLarge(checkIn, check); | |||
} | |||
assertArrayEquals("uncompressed data did not match testData", check.toByteArray(), testData.getBytes()); | |||
log.info("(check) uncompressed data has "+check.toByteArray().length+" bytes"); | |||
// Read uncompressed data using FilterInputStreamViaOutputStream, should yield same compressed bytes as the buffer | |||
final FilterInputStreamViaOutputStream filter = new FilterInputStreamViaOutputStream(new ByteArrayInputStream(testData.getBytes()), GZIPOutputStream.class); | |||
final ByteArrayOutputStream actual = new ByteArrayOutputStream(); | |||
final long copied = IOUtils.copyLarge(filter, actual); | |||
log.info("copied "+copied+" bytes, actual has "+actual.toByteArray().length+" bytes"); | |||
// Decompress what we just read, we should end up back at testData | |||
final ByteArrayOutputStream finalCheck = new ByteArrayOutputStream(); | |||
try (InputStream finalCheckIn = new GZIPInputStream(new ByteArrayInputStream(actual.toByteArray()))) { | |||
IOUtils.copyLarge(finalCheckIn, finalCheck); | |||
} | |||
assertEquals("testData was not preserved", testData, new String(finalCheck.toByteArray())); | |||
} | |||
} |