Sfoglia il codice sorgente

improve support for underflow/hung multi streams

tags/2.0.1
Jonathan Cobb 4 anni fa
parent
commit
184d01f38d
7 ha cambiato i file con 242 aggiunte e 15 eliminazioni
  1. +1
    -1
      src/main/java/org/cobbzilla/util/daemon/SimpleDaemon.java
  2. +21
    -0
      src/main/java/org/cobbzilla/util/io/BlockedInputStream.java
  3. +7
    -1
      src/main/java/org/cobbzilla/util/io/multi/MultiReader.java
  4. +19
    -4
      src/main/java/org/cobbzilla/util/io/multi/MultiStream.java
  5. +88
    -0
      src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandler.java
  6. +13
    -0
      src/main/java/org/cobbzilla/util/io/regex/MultiUnderflowException.java
  7. +93
    -9
      src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java

+ 1
- 1
src/main/java/org/cobbzilla/util/daemon/SimpleDaemon.java Vedi File

@@ -17,7 +17,7 @@ public abstract class SimpleDaemon implements Runnable {

public SimpleDaemon (String name) { this.name = name; }

@Getter private String name;
@Getter private final String name;
@Getter private long lastProcessTime = 0;

private volatile Thread mainThread = null;


+ 21
- 0
src/main/java/org/cobbzilla/util/io/BlockedInputStream.java Vedi File

@@ -0,0 +1,21 @@
package org.cobbzilla.util.io;

import java.io.IOException;
import java.io.InputStream;

import static java.util.concurrent.TimeUnit.DAYS;
import static org.cobbzilla.util.system.Sleep.sleep;

public class BlockedInputStream extends InputStream {

@Override public int read() throws IOException {
sleep(DAYS.toMillis(100), "blocking");
return -1;
}

@Override public int read(byte[] b, int off, int len) throws IOException {
sleep(DAYS.toMillis(100), "blocking");
return super.read(b, off, len);
}

}

+ 7
- 1
src/main/java/org/cobbzilla/util/io/multi/MultiReader.java Vedi File

@@ -1,5 +1,6 @@
package org.cobbzilla.util.io.multi;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
@@ -16,6 +17,7 @@ public class MultiReader extends Reader {
private Reader currentReader;
private int readerIndex = 0;
private boolean endOfReaders = false;
@Getter private final MultiUnderflowHandler underflow = new MultiUnderflowHandler();

public MultiReader (Reader r, boolean last) {
if (last) {
@@ -53,17 +55,21 @@ public class MultiReader extends Reader {
int count = currentReader.read(buf, off, len);
if (count == -1) {
if (readerIndex == readers.size()-1) {
return endOfReaders ? -1 : 0;
if (endOfReaders) return -1;
underflow.handleUnderflow();
return 0;
}
currentReader.close();
readerIndex++;
currentReader = readers.get(readerIndex);
return read(buf, off, len);
}
underflow.handleSuccessfulRead();
return count;
}

@Override public void close() throws IOException {
if (currentReader != null) currentReader.close();
underflow.close();
}
}

+ 19
- 4
src/main/java/org/cobbzilla/util/io/multi/MultiStream.java Vedi File

@@ -1,5 +1,6 @@
package org.cobbzilla.util.io.multi;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
@@ -14,8 +15,12 @@ public class MultiStream extends InputStream {
private InputStream currentStream;
private int streamIndex = 0;
private boolean endOfStreams = false;
@Getter private final MultiUnderflowHandler underflow = new MultiUnderflowHandler();

public MultiStream (InputStream r, boolean last) {
public MultiStream (InputStream r, boolean last) { this(r, last, "no-name"); }

public MultiStream (InputStream r, boolean last, String name) {
underflow.setHandlerName(name);
if (last) {
addLastStream(r);
} else {
@@ -24,10 +29,12 @@ public class MultiStream extends InputStream {
currentStream = r;
}

public int pendingStreamCount () { return streams.size() - streamIndex; }
public MultiStream (InputStream r, String name) { this(r, false, name); }

public MultiStream (InputStream r) { this(r, false); }

public int pendingStreamCount () { return streams.size() - streamIndex; }

@Override public String toString () {
return "MultiStream{"+streams.size()+" streams, index="+streamIndex+", EOS="+endOfStreams+"}";
}
@@ -52,7 +59,9 @@ public class MultiStream extends InputStream {
if (val == -1) {
if (streamIndex == streams.size()-1) {
if (log.isTraceEnabled()) log.trace("read(byte): end of all streams? this="+this);
return endOfStreams ? -1 : 0;
if (endOfStreams) return -1;
underflow.handleUnderflow();
return 0;
}
currentStream.close();
streamIndex++;
@@ -63,16 +72,20 @@ public class MultiStream extends InputStream {
} else {
if (log.isTraceEnabled()) log.trace("read(byte): one byte read. this="+this);
}
underflow.handleSuccessfulRead();
return val;
}

@Override public int read(byte[] buf, int off, int len) throws IOException {
if (log.isTraceEnabled()) log.trace("read(byte[]): trying to read "+len+" bytes. this="+this);
final int count = currentStream.read(buf, off, len);
log.error("read: got "+count+" bytes");
if (count == -1) {
if (streamIndex == streams.size()-1) {
if (log.isTraceEnabled()) log.trace("read(byte[]): end of all streams? this="+this);
return endOfStreams ? -1 : 0;
if (endOfStreams) return -1;
underflow.handleUnderflow();
return 0;
}
currentStream.close();
streamIndex++;
@@ -83,12 +96,14 @@ public class MultiStream extends InputStream {
} else {
if (log.isTraceEnabled()) log.trace("read(byte[]): "+count+" bytes read. this="+this);
}
underflow.handleSuccessfulRead();
return count;
}

@Override public void close() throws IOException {
if (log.isTraceEnabled()) log.trace("close: closing current stream ("+(currentStream == null ? "null" : currentStream.getClass().getSimpleName())+"). this="+this);
if (currentStream != null) currentStream.close();
underflow.close();
}

}

+ 88
- 0
src/main/java/org/cobbzilla/util/io/multi/MultiUnderflowHandler.java Vedi File

@@ -0,0 +1,88 @@
package org.cobbzilla.util.io.multi;

import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.cobbzilla.util.daemon.SimpleDaemon;
import org.cobbzilla.util.io.regex.MultiUnderflowException;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static java.util.UUID.randomUUID;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.cobbzilla.util.daemon.ZillaRuntime.now;
import static org.cobbzilla.util.daemon.ZillaRuntime.terminate;
import static org.cobbzilla.util.system.Sleep.sleep;

@Slf4j @Accessors(chain=true)
public class MultiUnderflowHandler extends SimpleDaemon {

private final static Map<String, MultiUnderflowHandler> handlers = new ConcurrentHashMap<>();

@Getter @Setter private static long checkInterval = SECONDS.toMillis(20);
private static final long TERMINATE_TIMEOUT = SECONDS.toMillis(2);

@Override protected long getSleepTime() { return checkInterval; }

@Override protected void process() {
for (Iterator<MultiUnderflowHandler> iter = handlers.values().iterator(); iter.hasNext(); ) {
final MultiUnderflowHandler underflow = iter.next();
if (now() - underflow.getLastRead() > underflow.getUnderflowTimeout()) {
log.error("process: underflow timed out, terminating: name="+underflow.getHandlerName()+" thread="+underflow.getThread());
iter.remove();
terminate(underflow.getThread(), TERMINATE_TIMEOUT);
}
}
}

public MultiUnderflowHandler () { start(); }

@Getter private final String id = randomUUID().toString();
@Getter @Setter private String handlerName;
@Getter private long minUnderflowSleep = 10;
public MultiUnderflowHandler setMinUnderflowSleep(long s) { underflowSleep = minUnderflowSleep = s; return this; }

@Getter @Setter private long maxUnderflowSleep = 500;

@Getter @Setter private long lastRead = now();
@Getter @Setter private long firstUnderflow = 0;
@Getter @Setter private long underflowTimeout = SECONDS.toMillis(60);
private long underflowSleep = minUnderflowSleep;
@Getter private Thread thread;

public void handleUnderflow() throws IOException {
if (thread == null) {
thread = Thread.currentThread();
handlers.put(id, this);
}
if (firstUnderflow == 0) {
if (log.isDebugEnabled()) log.debug(handlerName+": first data underflow");
firstUnderflow = now();
} else if (now() - firstUnderflow > underflowTimeout) {
if (log.isErrorEnabled()) log.error(handlerName+": underflow timeout, throwing MultiUnderflowException");
throw new MultiUnderflowException(handlerName);
}
if (log.isDebugEnabled()) log.debug(handlerName+": data underflow, sleeping for "+ underflowSleep);
sleep(underflowSleep);
underflowSleep *= 2;
if (underflowSleep > maxUnderflowSleep) underflowSleep = maxUnderflowSleep;
}

public void handleSuccessfulRead() {
if (thread == null) {
thread = Thread.currentThread();
handlers.put(id, this);
}
firstUnderflow = 0;
lastRead = now();
underflowSleep = minUnderflowSleep;
}

public void close() {

}
}

+ 13
- 0
src/main/java/org/cobbzilla/util/io/regex/MultiUnderflowException.java Vedi File

@@ -0,0 +1,13 @@
package org.cobbzilla.util.io.regex;

import lombok.AllArgsConstructor;
import lombok.Getter;

import java.io.IOException;

@AllArgsConstructor
public class MultiUnderflowException extends IOException {

@Getter private final String name;

}

+ 93
- 9
src/test/java/org/cobbzilla/util/io/regex/RegexFilterReaderTest.java Vedi File

@@ -1,20 +1,23 @@
package org.cobbzilla.util.io.regex;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.cobbzilla.util.io.BlockedInputStream;
import org.cobbzilla.util.io.multi.MultiReader;
import org.cobbzilla.util.io.multi.MultiStream;
import org.cobbzilla.util.io.multi.MultiUnderflowHandler;
import org.junit.Test;

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.*;
import java.util.concurrent.atomic.AtomicReference;

import static org.cobbzilla.util.daemon.ZillaRuntime.background;
import static org.cobbzilla.util.daemon.ZillaRuntime.die;
import static org.cobbzilla.util.io.regex.RegexReplacementFilter.DEFAULT_PREFIX_REPLACEMENT_WITH_MATCH;
import static org.cobbzilla.util.system.Sleep.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.*;

@Slf4j
public class RegexFilterReaderTest {

public static final String TEST_STRING_1 = "this is a string\nand another string with a lone a near the end\nfoo.";
@@ -43,7 +46,8 @@ public class RegexFilterReaderTest {

@Test public void testRegexReaderIncludeMatch() throws Exception {
final Reader reader = new StringReader(TEST_STRING_INCLUDE_MATCH);
final RegexStreamFilter regexStreamFilter = new RegexReplacementFilter("<html\\s+[^>]*>", "!INSERTED_DATA");
final RegexStreamFilter regexStreamFilter
= new RegexReplacementFilter("<html\\s+[^>]*>", DEFAULT_PREFIX_REPLACEMENT_WITH_MATCH+"INSERTED_DATA");
final RegexFilterReader regexFilterReader = new RegexFilterReader(reader, 1024, regexStreamFilter);
final StringWriter result = new StringWriter();
IOUtils.copyLarge(regexFilterReader, result);
@@ -55,7 +59,8 @@ public class RegexFilterReaderTest {

@Test public void testRegexReaderIncludeMatchInMiddle() throws Exception {
final Reader reader = new StringReader(TEST_STRING_INCLUDE_MATCH_MIDDLE);
final RegexStreamFilter regexStreamFilter = new RegexReplacementFilter("<html\\s+[^>]*>", "INSERTED_!DATA");
final RegexStreamFilter regexStreamFilter
= new RegexReplacementFilter("<html\\s+[^>]*>", "INSERTED_"+DEFAULT_PREFIX_REPLACEMENT_WITH_MATCH+"DATA");
final RegexFilterReader regexFilterReader = new RegexFilterReader(reader, 1024, regexStreamFilter);
final StringWriter result = new StringWriter();
IOUtils.copyLarge(regexFilterReader, result);
@@ -139,4 +144,83 @@ public class RegexFilterReaderTest {
t.join(1000);
assertFalse("Expected copy thread to finish", t.isAlive());
assertEquals("multi reader failed to get expected output", EXPECTED_MULTI_RESULT2, result.toString());
}}
}

@Test public void testMultiReaderUnderflow() throws Exception {
final StringReader reader1 = new StringReader("some test data1 ".repeat(1000));
final StringReader reader2 = new StringReader("some test data2 ".repeat(1000));
final StringReader reader3 = new StringReader("some test data3 ".repeat(1000));

final MultiReader multiReader = new MultiReader(reader1);
multiReader.getUnderflow()
.setMinUnderflowSleep(1000)
.setMaxUnderflowSleep(1000)
.setUnderflowTimeout(5000);

final RegexStreamFilter regexStreamFilter = new RegexReplacementFilter(" test ", " bogus ");
final RegexFilterReader regexFilterReader = new RegexFilterReader(multiReader, 8, regexStreamFilter);
final StringWriter result = new StringWriter();

final AtomicReference<Exception> exRef = new AtomicReference<>(null);
final Thread t = background(() -> {
try {
IOUtils.copyLarge(regexFilterReader, result);
} catch (IOException e) {
exRef.set(e);
}
});

sleep(multiReader.getUnderflow().getMaxUnderflowSleep());
log.info("adding reader2...");
multiReader.addReader(reader2);
log.info("added reader2...");
sleep(multiReader.getUnderflow().getUnderflowTimeout()*2);
log.info("adding reader3...");
multiReader.addReader(reader3);
log.info("added reader3...");

t.join(multiReader.getUnderflow().getUnderflowTimeout()+100);
assertFalse("Expected copy thread to finish", t.isAlive());
assertNotNull("Expected copy thread to have an exception", exRef.get());
assertTrue("expected multi reader failed to get data1 output", result.toString().contains(" bogus data1 "));
assertTrue("expected multi reader failed to get data2 output", result.toString().contains(" bogus data2 "));
assertFalse("expected multi reader failed to NOT get data3 output", result.toString().contains(" bogus data3 "));
}

@Test public void testMultiStreamUnderflow() throws Exception {
final InputStream stream1 = new ByteArrayInputStream("some test data1 ".repeat(1000).getBytes());
final InputStream stream2 = new ByteArrayInputStream("some test data2 ".repeat(1000).getBytes());
final InputStream stream3 = new BlockedInputStream();

MultiUnderflowHandler.setCheckInterval(1000);
final MultiStream multiStream = new MultiStream(stream1);
multiStream.getUnderflow()
.setMinUnderflowSleep(1000)
.setMaxUnderflowSleep(1000)
.setUnderflowTimeout(5000);
multiStream.addStream(stream2);
multiStream.addStream(stream3);

final RegexStreamFilter regexStreamFilter = new RegexReplacementFilter(" test ", " bogus ");
final RegexFilterReader regexFilterReader = new RegexFilterReader(multiStream, 8, regexStreamFilter);
final StringWriter result = new StringWriter();

final AtomicReference<Exception> exRef = new AtomicReference<>(null);
final Thread t = background(() -> {
try {
IOUtils.copyLarge(regexFilterReader, result);
} catch (Exception e) {
exRef.set(e);
}
});

sleep(multiStream.getUnderflow().getUnderflowTimeout()*2);

t.join(multiStream.getUnderflow().getUnderflowTimeout()+100);
assertFalse("Expected copy thread to finish", t.isAlive());
assertNotNull("Expected copy thread to have an exception", exRef.get());
assertTrue("expected multi stream failed to get data1 output", result.toString().contains(" bogus data1 "));
assertTrue("expected multi stream failed to get data2 output", result.toString().contains(" bogus data2 "));
}

}

Caricamento…
Annulla
Salva