|
|
@@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicReference; |
|
|
|
|
|
|
|
import static org.apache.commons.lang3.ArrayUtils.addAll; |
|
|
|
import static org.cobbzilla.util.daemon.ZillaRuntime.die; |
|
|
|
import static org.cobbzilla.util.string.StringUtil.EMPTY_CHAR_ARRAY; |
|
|
|
|
|
|
|
@Slf4j @Accessors(chain=true) |
|
|
|
public class RegexFilterReader extends BufferedReader { |
|
|
@@ -20,6 +21,7 @@ public class RegexFilterReader extends BufferedReader { |
|
|
|
|
|
|
|
private final int bufsiz; |
|
|
|
private RegexStreamFilter filter; |
|
|
|
@Getter @Setter private Integer maxMatches; |
|
|
|
@Getter @Setter private String name; // for debugging/identifying which reader |
|
|
|
|
|
|
|
public RegexFilterReader(Reader in, RegexStreamFilter filter) { |
|
|
@@ -43,7 +45,7 @@ public class RegexFilterReader extends BufferedReader { |
|
|
|
} |
|
|
|
|
|
|
|
// Lombok generates getFilterResponse() for this field; with lazy=true the initializer
// expression (initFilterResponse()) is evaluated on the first getter call and then cached.
@Getter(lazy=true) private final FilterResponse filterResponse = initFilterResponse(); |
|
|
|
// Builds the FilterResponse from this reader's bufsiz and filter (two-argument constructor form).
// NOTE(review): this listing also contains a three-argument variant of this method;
// presumably this line is the pre-change side of the diff — confirm before relying on it.
private FilterResponse initFilterResponse() { return new FilterResponse(bufsiz, filter); } |
|
|
|
// Builds the FilterResponse from this reader's bufsiz, filter and maxMatches.
// maxMatches is read at the moment this method runs; since the filterResponse field is
// initialized lazily, a limit presumably has to be set (via setMaxMatches) before the
// first getFilterResponse() call to take effect — TODO confirm against callers.
private FilterResponse initFilterResponse() { return new FilterResponse(bufsiz, filter, maxMatches); } |
|
|
|
|
|
|
|
@Override public int read() throws IOException { |
|
|
|
final char[] c = new char[1]; |
|
|
@@ -105,10 +107,13 @@ public class RegexFilterReader extends BufferedReader { |
|
|
|
private int readPos = 0; |
|
|
|
private boolean eof = false; |
|
|
|
private RegexStreamFilter filter; |
|
|
|
private int matchCount = 0; |
|
|
|
private Integer maxMatches = null; |
|
|
|
|
|
|
|
public FilterResponse(int bufsiz, RegexStreamFilter filter) { |
|
|
|
// Creates a FilterResponse holding the given buffer size, stream filter and optional match limit.
// maxMatches may be null: the visible guard `maxMatches != null && matchCount >= maxMatches`
// only fires for a non-null limit, so null leaves filtering active for all input.
public FilterResponse(int bufsiz, RegexStreamFilter filter, Integer maxMatches) { |



this.bufsiz = bufsiz; |



this.filter = filter; |



this.maxMatches = maxMatches; |



} |
|
|
|
|
|
|
|
private final AtomicReference<char[]> unprocessed = new AtomicReference<>(); |
|
|
@@ -118,46 +123,67 @@ public class RegexFilterReader extends BufferedReader { |
|
|
|
|
|
|
|
this.eof = eof; |
|
|
|
|
|
|
|
synchronized (unprocessed) { |
|
|
|
char[] input = unprocessed.get(); |
|
|
|
if (input == null) { |
|
|
|
input = new char[len]; |
|
|
|
System.arraycopy(b, 0, input, 0, len); |
|
|
|
} else { |
|
|
|
char[] new_input = new char[input.length + len]; |
|
|
|
System.arraycopy(input, 0, new_input, 0, input.length); |
|
|
|
System.arraycopy(b, 0, new_input, input.length, len); |
|
|
|
input = new_input; |
|
|
|
} |
|
|
|
unprocessed.set(input); |
|
|
|
|
|
|
|
// process buffer with filter, it may leave a remainder |
|
|
|
final RegexFilterResult result = filter.apply(new StringBuilder().append(input), eof); |
|
|
|
|
|
|
|
// put unprocessed remainder chars back onto unprocessed array |
|
|
|
if (result.remainder > 0) { |
|
|
|
final char[] subarray = ArrayUtils.subarray(input, input.length - result.remainder, input.length); |
|
|
|
unprocessed.set(subarray); |
|
|
|
} else { |
|
|
|
unprocessed.set(null); |
|
|
|
if (maxMatches != null && matchCount >= maxMatches) { |
|
|
|
// match limit reached, no more filtering to do: append any unprocessed chars, then the new chars, to processed |
|
|
|
synchronized (unprocessed) { |
|
|
|
synchronized (processed) { |
|
|
|
char[] u = unprocessed.get(); |
|
|
|
if (u == null) u = EMPTY_CHAR_ARRAY; |
|
|
|
char[] p = processed.get(); |
|
|
|
if (p == null) p = EMPTY_CHAR_ARRAY; |
|
|
|
final int newLen = p.length + u.length + len; |
|
|
|
final char[] update = new char[newLen]; |
|
|
|
System.arraycopy(p, 0, update, 0, p.length); |
|
|
|
System.arraycopy(u, 0, update, p.length, u.length); |
|
|
|
System.arraycopy(b, 0, update, p.length + u.length, len); |
|
|
|
processed.set(update); |
|
|
|
unprocessed.set(EMPTY_CHAR_ARRAY); |
|
|
|
return u.length + len; |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
|
|
|
|
synchronized (unprocessed) { |
|
|
|
char[] input = unprocessed.get(); |
|
|
|
if (input == null) { |
|
|
|
input = new char[len]; |
|
|
|
System.arraycopy(b, 0, input, 0, len); |
|
|
|
} else { |
|
|
|
char[] new_input = new char[input.length + len]; |
|
|
|
System.arraycopy(input, 0, new_input, 0, input.length); |
|
|
|
System.arraycopy(b, 0, new_input, input.length, len); |
|
|
|
input = new_input; |
|
|
|
} |
|
|
|
unprocessed.set(input); |
|
|
|
|
|
|
|
// process buffer with filter, it may leave a remainder |
|
|
|
final RegexFilterResult result = filter.apply(new StringBuilder().append(input), eof); |
|
|
|
matchCount += result.matchCount; |
|
|
|
|
|
|
|
// put unprocessed remainder chars back onto unprocessed array |
|
|
|
if (result.remainder > 0) { |
|
|
|
final char[] subarray = ArrayUtils.subarray(input, input.length - result.remainder, input.length); |
|
|
|
unprocessed.set(subarray); |
|
|
|
} else { |
|
|
|
unprocessed.set(null); |
|
|
|
} |
|
|
|
|
|
|
|
// if it produced nothing, but gave us a remainder, return zero now, read more data |
|
|
|
if (result.buffer.length() == 0 && !eof) return 0; |
|
|
|
|
|
|
|
// append the chars produced by the filter to the processed array |
|
|
|
synchronized (processed) { |
|
|
|
final char[] newChars = result.buffer.toString().toCharArray(); |
|
|
|
if (newChars.length > 0) { |
|
|
|
if (processed.get() == null) { |
|
|
|
processed.set(newChars); |
|
|
|
} else { |
|
|
|
processed.set(addAll(processed.get(), newChars)); |
|
|
|
// if it produced nothing, but gave us a remainder, return zero now, read more data |
|
|
|
if (result.buffer.length() == 0 && !eof) return 0; |
|
|
|
|
|
|
|
// append the chars produced by the filter to the processed array |
|
|
|
synchronized (processed) { |
|
|
|
final char[] newChars = result.buffer.toString().toCharArray(); |
|
|
|
if (newChars.length > 0) { |
|
|
|
if (processed.get() == null) { |
|
|
|
processed.set(newChars); |
|
|
|
} else { |
|
|
|
processed.set(addAll(processed.get(), newChars)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return result.buffer.length(); |
|
|
|
} |
|
|
|
|
|
|
|
return result.buffer.length(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|