diff --git a/bubble-server/src/main/java/bubble/service/dbfilter/EntityIterator.java b/bubble-server/src/main/java/bubble/service/dbfilter/EntityIterator.java index 3abb963c..d3f2e3b5 100644 --- a/bubble-server/src/main/java/bubble/service/dbfilter/EntityIterator.java +++ b/bubble-server/src/main/java/bubble/service/dbfilter/EntityIterator.java @@ -24,6 +24,7 @@ import org.cobbzilla.wizard.model.IdentifiableBase; import java.util.*; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -61,16 +62,22 @@ public abstract class EntityIterator implements Iterator { return !(queue.peek() instanceof EndOfEntityStream); } + private final Map nextWaiters = new ConcurrentHashMap<>(1); + @Override public Identifiable next() { checkError(); if (!hasNext()) { throw new NoSuchElementException("iterator has been exhausted"); } + long threadId = Thread.currentThread().getId(); try { + nextWaiters.computeIfAbsent(threadId, id -> Thread.currentThread()); return queue.take(); } catch (InterruptedException e) { error.set(e); return die("next: queue.take interrupted"); + } finally { + nextWaiters.remove(threadId); } } @@ -81,6 +88,7 @@ public abstract class EntityIterator implements Iterator { add(END_OF_ENTITY_STREAM); } catch (Exception e) { error.set(e); + nextWaiters.values().forEach(Thread::interrupt); die("_iterate: "+shortError(e), e); } finally { iterating.set(false); @@ -146,14 +154,14 @@ public abstract class EntityIterator implements Iterator { } else if (Account.class.isAssignableFrom(c)) { entities.forEach(e -> { - if (network.hasAdminEmail() && network.getAccount().equals(e.getUuid())) { + if (network != null && network.hasAdminEmail() && network.getAccount().equals(e.getUuid())) { final Account a = (Account) e; a.setEmail(network.getAdminEmail()); } add(((Account) e).setPreferredPlan(null)); }); - } else if (AccountPolicy.class.isAssignableFrom(c) && network.hasAdminEmail()) { + } else if (AccountPolicy.class.isAssignableFrom(c) && network != null && network.hasAdminEmail()) { entities.forEach(e -> { if (network.hasAdminEmail()) { final AccountPolicy p = (AccountPolicy) e; diff --git a/bubble-server/src/main/java/bubble/service/dbfilter/FullEntityIterator.java b/bubble-server/src/main/java/bubble/service/dbfilter/FullEntityIterator.java index bd7b77dd..d9ff5448 100644 --- a/bubble-server/src/main/java/bubble/service/dbfilter/FullEntityIterator.java +++ b/bubble-server/src/main/java/bubble/service/dbfilter/FullEntityIterator.java @@ -44,7 +44,7 @@ public class FullEntityIterator extends EntityIterator { config.getEntityClasses() .forEach(c -> addEntities(true, c, config.getDaoForEntityClass(c).findAll(ORDER_CTIME_ASC), network, null, null)); - if (account != null && launchType != null && launchType == LaunchType.fork_node) { + if (account != null && network != null && launchType != null && launchType == LaunchType.fork_node) { // add an initial device so that algo starts properly the first time // name and totp key will be overwritten when the device is initialized for use log.info(prefix+"creating a single dummy device for algo to start properly");