@@ -32,7 +32,7 @@ public class BufferedRunDaemon implements Runnable { | |||||
protected long getIdleSyncInterval() { return IDLE_SYNC_INTERVAL; } | protected long getIdleSyncInterval() { return IDLE_SYNC_INTERVAL; } | ||||
protected long getMinSyncWait () { return MIN_SYNC_WAIT; } | protected long getMinSyncWait () { return MIN_SYNC_WAIT; } | ||||
public void start () { daemonThread.set(background(this)); } | |||||
public void start () { daemonThread.set(background(this, "BufferedRunDaemon")); } | |||||
protected void interrupt() { if (daemonThread.get() != null) daemonThread.get().interrupt(); } | protected void interrupt() { if (daemonThread.get() != null) daemonThread.get().interrupt(); } | ||||
@@ -24,6 +24,8 @@ import java.math.RoundingMode; | |||||
import java.security.SecureRandom; | import java.security.SecureRandom; | ||||
import java.util.*; | import java.util.*; | ||||
import java.util.concurrent.Callable; | import java.util.concurrent.Callable; | ||||
import java.util.concurrent.ConcurrentHashMap; | |||||
import java.util.concurrent.atomic.AtomicInteger; | |||||
import java.util.concurrent.atomic.AtomicLong; | import java.util.concurrent.atomic.AtomicLong; | ||||
import java.util.concurrent.atomic.AtomicReference; | import java.util.concurrent.atomic.AtomicReference; | ||||
import java.util.function.Function; | import java.util.function.Function; | ||||
@@ -105,9 +107,16 @@ public class ZillaRuntime { | |||||
public static boolean bool(Boolean b) { return b != null && b; } | public static boolean bool(Boolean b) { return b != null && b; } | ||||
public static boolean bool(Boolean b, boolean val) { return b != null ? b : val; } | public static boolean bool(Boolean b, boolean val) { return b != null ? b : val; } | ||||
private static final AtomicInteger backgroundCounter = new AtomicInteger(0); | |||||
private static final Map<String, AtomicInteger> backgroundNameCounters = new ConcurrentHashMap<>(50); | |||||
public static Thread background (Runnable r) { return background(r, DEFAULT_EX_RUNNABLE); } | public static Thread background (Runnable r) { return background(r, DEFAULT_EX_RUNNABLE); } | ||||
public static Thread background (Runnable r, ExceptionHandler ex) { | |||||
public static Thread background (Runnable r, ExceptionHandler ex) { return background(r, null, ex); } | |||||
public static Thread background (Runnable r, String name) { return background(r, name, DEFAULT_EX_RUNNABLE); } | |||||
public static Thread background (Runnable r, String name, ExceptionHandler ex) { | |||||
final Thread t = new Thread(() -> { | final Thread t = new Thread(() -> { | ||||
try { | try { | ||||
r.run(); | r.run(); | ||||
@@ -115,6 +124,11 @@ public class ZillaRuntime { | |||||
ex.handle(e); | ex.handle(e); | ||||
} | } | ||||
}); | }); | ||||
final AtomicInteger counter = name == null | |||||
? backgroundCounter | |||||
: backgroundNameCounters.computeIfAbsent(name, k -> new AtomicInteger(0)); | |||||
if (name == null) name = "background"; | |||||
t.setName(name+"-"+counter.incrementAndGet()); | |||||
t.start(); | t.start(); | ||||
return t; | return t; | ||||
} | } | ||||
@@ -91,7 +91,7 @@ public class FilterInputStreamViaOutputStream extends PipedInputStream implement | |||||
closeQuietly(out); | closeQuietly(out); | ||||
closeQuietly(pipeOut); | closeQuietly(pipeOut); | ||||
if (this.thread.isAlive()) { | if (this.thread.isAlive()) { | ||||
background(() -> terminate(this.thread, THREAD_TERMINATE_TIMEOUT)); | |||||
background(() -> terminate(this.thread, THREAD_TERMINATE_TIMEOUT), "FilterInputStreamViaOutputStream.close"); | |||||
} | } | ||||
} | } | ||||
@@ -109,6 +109,10 @@ public abstract class BaseMain<OPT extends BaseMainOptions> { | |||||
return background(new RunWithHandler(this, errorHandler), errorHandler); | return background(new RunWithHandler(this, errorHandler), errorHandler); | ||||
} | } | ||||
public Thread runInBackground (String name, ExceptionHandler errorHandler) { | |||||
return background(new RunWithHandler(this, errorHandler), name, errorHandler); | |||||
} | |||||
@AllArgsConstructor | @AllArgsConstructor | ||||
private static class RunWithHandler implements Runnable { | private static class RunWithHandler implements Runnable { | ||||
private final BaseMain runnable; | private final BaseMain runnable; | ||||
@@ -92,7 +92,7 @@ public class RegexFilterReaderTest { | |||||
} catch (IOException e) { | } catch (IOException e) { | ||||
die("Error copying in background: "+e, e); | die("Error copying in background: "+e, e); | ||||
} | } | ||||
}); | |||||
}, "RegexFilterReaderTest.testMultiStreamRegexReader"); | |||||
sleep(500); | sleep(500); | ||||
multiReader.addReader(reader2); | multiReader.addReader(reader2); | ||||
@@ -132,7 +132,7 @@ public class RegexFilterReaderTest { | |||||
} catch (IOException e) { | } catch (IOException e) { | ||||
die("Error copying in background: "+e, e); | die("Error copying in background: "+e, e); | ||||
} | } | ||||
}); | |||||
}, "RegexFilterReaderTest.testMultiStreamRegexReaderWithRegexAcrossBoundary"); | |||||
sleep(500); | sleep(500); | ||||
multiReader.addReader(reader2); | multiReader.addReader(reader2); | ||||
@@ -168,7 +168,7 @@ public class RegexFilterReaderTest { | |||||
} catch (IOException e) { | } catch (IOException e) { | ||||
exRef.set(e); | exRef.set(e); | ||||
} | } | ||||
}); | |||||
}, "RegexFilterReaderTest.testMultiReaderUnderflow"); | |||||
sleep(multiReader.getUnderflow().getMaxUnderflowSleep()); | sleep(multiReader.getUnderflow().getMaxUnderflowSleep()); | ||||
log.info("adding reader2..."); | log.info("adding reader2..."); | ||||
@@ -212,7 +212,7 @@ public class RegexFilterReaderTest { | |||||
} catch (Exception e) { | } catch (Exception e) { | ||||
exRef.set(e); | exRef.set(e); | ||||
} | } | ||||
}); | |||||
}, "RegexFilterReaderTest.testMultiStreamUnderflow"); | |||||
sleep(multiStream.getUnderflow().getUnderflowTimeout()*2); | sleep(multiStream.getUnderflow().getUnderflowTimeout()*2); | ||||