|
|
@@ -24,6 +24,7 @@ import org.cobbzilla.wizard.model.IdentifiableBase; |
|
|
|
|
|
|
|
import java.util.*; |
|
|
|
import java.util.concurrent.BlockingQueue; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.LinkedBlockingQueue; |
|
|
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
|
|
import java.util.concurrent.atomic.AtomicReference; |
|
|
@@ -61,16 +62,22 @@ public abstract class EntityIterator implements Iterator<Identifiable> { |
|
|
|
return !(queue.peek() instanceof EndOfEntityStream); |
|
|
|
} |
|
|
|
|
|
|
|
private final Map<Long, Thread> nextWaiters = new ConcurrentHashMap<>(1); |
|
|
|
|
|
|
|
@Override public Identifiable next() { |
|
|
|
checkError(); |
|
|
|
if (!hasNext()) { |
|
|
|
throw new NoSuchElementException("iterator has been exhausted"); |
|
|
|
} |
|
|
|
long threadId = Thread.currentThread().getId(); |
|
|
|
try { |
|
|
|
nextWaiters.computeIfAbsent(threadId, id -> Thread.currentThread()); |
|
|
|
return queue.take(); |
|
|
|
} catch (InterruptedException e) { |
|
|
|
error.set(e); |
|
|
|
return die("next: queue.take interrupted"); |
|
|
|
} finally { |
|
|
|
nextWaiters.remove(threadId); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@@ -81,6 +88,7 @@ public abstract class EntityIterator implements Iterator<Identifiable> { |
|
|
|
add(END_OF_ENTITY_STREAM); |
|
|
|
} catch (Exception e) { |
|
|
|
error.set(e); |
|
|
|
nextWaiters.values().forEach(Thread::interrupt); |
|
|
|
die("_iterate: "+shortError(e), e); |
|
|
|
} finally { |
|
|
|
iterating.set(false); |
|
|
@@ -146,14 +154,14 @@ public abstract class EntityIterator implements Iterator<Identifiable> { |
|
|
|
|
|
|
|
} else if (Account.class.isAssignableFrom(c)) { |
|
|
|
entities.forEach(e -> { |
|
|
|
if (network.hasAdminEmail() && network.getAccount().equals(e.getUuid())) { |
|
|
|
if (network != null && network.hasAdminEmail() && network.getAccount().equals(e.getUuid())) { |
|
|
|
final Account a = (Account) e; |
|
|
|
a.setEmail(network.getAdminEmail()); |
|
|
|
} |
|
|
|
add(((Account) e).setPreferredPlan(null)); |
|
|
|
}); |
|
|
|
|
|
|
|
} else if (AccountPolicy.class.isAssignableFrom(c) && network.hasAdminEmail()) { |
|
|
|
} else if (AccountPolicy.class.isAssignableFrom(c) && network != null && network.hasAdminEmail()) { |
|
|
|
entities.forEach(e -> { |
|
|
|
if (network.hasAdminEmail()) { |
|
|
|
final AccountPolicy p = (AccountPolicy) e; |
|
|
|