@@ -22,6 +22,8 @@ import java.net.Socket; | |||
import java.util.Iterator; | |||
import java.util.concurrent.atomic.AtomicReference; | |||
import static org.cobbzilla.util.json.JsonUtil.COMPACT_MAPPER; | |||
import static org.cobbzilla.util.json.JsonUtil.json; | |||
import static org.cobbzilla.util.network.NetworkUtil.IPv4_LOCALHOST; | |||
@Slf4j | |||
@@ -31,8 +33,9 @@ public class RekeyReaderMain extends BaseMain<RekeyOptions> { | |||
@Override protected void run() throws Exception { | |||
final boolean debugEnabled = log.isDebugEnabled(); | |||
final RekeyOptions options = getOptions(); | |||
// log.info("run: options=\n"+json(options)); | |||
if (debugEnabled) out("READER: options="+json(options, COMPACT_MAPPER)); | |||
@Cleanup final ServerSocket sock = new ServerSocket(options.getPort(), 10, InetAddress.getByName(IPv4_LOCALHOST)); | |||
out("READER: awaiting connection from WRITER..."); | |||
@@ -42,9 +45,11 @@ public class RekeyReaderMain extends BaseMain<RekeyOptions> { | |||
final RestServerHarness<BubbleConfiguration, BubbleDbFilterServer> fromHarness = getOptions().getServer(); | |||
final BubbleConfiguration fromConfig = fromHarness.getConfiguration(); | |||
final boolean debugEnabled = log.isDebugEnabled(); | |||
final AtomicReference<Exception> error = new AtomicReference<>(); | |||
if (debugEnabled) out("READER: creating entity producer..."); | |||
final Iterator<Identifiable> producer = getEntityProducer(fromConfig, error); | |||
if (debugEnabled) out("READER: created entity producer, iterating..."); | |||
while (producer.hasNext()) { | |||
final Identifiable from = producer.next(); | |||
if (from instanceof EndOfEntityStream) break; | |||
@@ -24,6 +24,8 @@ import java.util.HashMap; | |||
import static java.util.concurrent.TimeUnit.SECONDS; | |||
import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace; | |||
import static org.cobbzilla.util.json.JsonUtil.COMPACT_MAPPER; | |||
import static org.cobbzilla.util.json.JsonUtil.json; | |||
import static org.cobbzilla.util.network.NetworkUtil.IPv4_LOCALHOST; | |||
import static org.cobbzilla.util.system.Sleep.sleep; | |||
@@ -36,15 +38,15 @@ public class RekeyWriterMain extends BaseMain<RekeyOptions> { | |||
@Override protected void run() throws Exception { | |||
final boolean debugEnabled = log.isDebugEnabled(); | |||
final RekeyOptions options = getOptions(); | |||
// log.info("run: options=\n"+json(options)); | |||
if (debugEnabled) out("WRITER: options="+json(options, COMPACT_MAPPER)); | |||
final RestServerHarness<BubbleConfiguration, BubbleDbFilterServer> toHarness = options.getServer(); | |||
final BubbleConfiguration toConfig = toHarness.getConfiguration(); | |||
IdentifiableBase.getEnforceNullUuidOnCreate().set(false); | |||
AbstractCRUDDAO.getRawMode().set(true); | |||
final boolean debugEnabled = log.isDebugEnabled(); | |||
final var daoMap = new HashMap<Class<? extends Identifiable>, DAO>(); | |||
while (true) { | |||
@@ -52,6 +54,7 @@ public class RekeyWriterMain extends BaseMain<RekeyOptions> { | |||
@Cleanup final Socket clientSocket = new Socket(IPv4_LOCALHOST, options.getPort()); | |||
@Cleanup BufferedReader inFromServer = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); | |||
String line; | |||
if (debugEnabled) out("WRITER: reading first object from reader..."); | |||
while ((line = inFromServer.readLine()) != null) { | |||
if (debugEnabled) out("WRITER<<< received json: " + line); | |||
final Identifiable entity = Identifiable.deserialize(line); | |||
@@ -25,6 +25,7 @@ import org.cobbzilla.wizard.model.IdentifiableBase; | |||
import java.util.*; | |||
import java.util.concurrent.BlockingQueue; | |||
import java.util.concurrent.LinkedBlockingQueue; | |||
import java.util.concurrent.atomic.AtomicBoolean; | |||
import java.util.concurrent.atomic.AtomicReference; | |||
import java.util.stream.Collectors; | |||
@@ -47,6 +48,9 @@ public abstract class EntityIterator implements Iterator<Identifiable> { | |||
private List<BubbleApp> userApps; | |||
private final Map<CloudServiceType, CloudService> noopClouds = new HashMap<>(); | |||
private final AtomicBoolean iterating = new AtomicBoolean(false); | |||
public boolean iterating () { return iterating.get(); } | |||
public EntityIterator(AtomicReference<Exception> error) { | |||
this.error = error; | |||
this.thread = background(this::_iterate, "EntityIterator", this.error::set); | |||
@@ -71,8 +75,16 @@ public abstract class EntityIterator implements Iterator<Identifiable> { | |||
} | |||
private void _iterate() { | |||
iterate(); | |||
add(END_OF_ENTITY_STREAM); | |||
try { | |||
iterating.set(true); | |||
iterate(); | |||
add(END_OF_ENTITY_STREAM); | |||
} catch (Exception e) { | |||
error.set(e); | |||
die("_iterate: "+shortError(e), e); | |||
} finally { | |||
iterating.set(false); | |||
} | |||
} | |||
protected abstract void iterate(); | |||
@@ -71,6 +71,8 @@ public class FilteredEntityIterator extends EntityIterator { | |||
} | |||
@Override protected void iterate() { | |||
final String prefix = "iterate(" + (network == null ? "no-network" : network.getUuid()) + "): "; | |||
// in the new DB, the admin on this system is NOT an admin, | |||
// and the new/initial user IS the admin | |||
if (account.hasParent()) { | |||
@@ -83,10 +85,9 @@ public class FilteredEntityIterator extends EntityIterator { | |||
configuration.getEntityClasses().forEach(c -> { | |||
final DAO dao = configuration.getDaoForEntityClass(c); | |||
if (!AccountOwnedEntityDAO.class.isAssignableFrom(dao.getClass())) { | |||
log.debug("iterate: skipping entity, not an AccountOwnedEntityDAO: " + c.getSimpleName()); | |||
log.debug(prefix+"skipping entity, not an AccountOwnedEntityDAO: " + c.getSimpleName()); | |||
} else if (isNotDefaultCopyEntity(c)) { | |||
log.debug("iterate: skipping " + c.getSimpleName() | |||
+ ", may copy some of these after default objects are copied"); | |||
log.debug(prefix+"skipping " + c.getSimpleName() + ", may copy some of these after default objects are copied"); | |||
} else { | |||
// copy entities. this is how the re-keying works (decrypt using current spring config, | |||
// encrypt using new config) | |||
@@ -112,7 +113,7 @@ public class FilteredEntityIterator extends EntityIterator { | |||
// add an initial device so that algo starts properly the first time | |||
// name and totp key will be overwritten when the device is initialized for use | |||
log.info("iterate: creating a single dummy device for algo to start properly"); | |||
log.debug(prefix+"creating a single dummy device for algo to start properly"); | |||
final var initDevice = newUninitializedDevice(network.getUuid(), account.getUuid()); | |||
add(configuration.getBean(DeviceDAO.class).create(initDevice)); | |||
@@ -14,6 +14,8 @@ import lombok.extern.slf4j.Slf4j; | |||
import java.util.concurrent.atomic.AtomicReference; | |||
import static bubble.model.device.Device.newUninitializedDevice; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.die; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.shortError; | |||
import static org.cobbzilla.wizard.dao.AbstractCRUDDAO.ORDER_CTIME_ASC; | |||
@Slf4j | |||
@@ -37,17 +39,23 @@ public class FullEntityIterator extends EntityIterator { | |||
} | |||
protected void iterate() { | |||
config.getEntityClasses() | |||
.forEach(c -> addEntities(true, c, config.getDaoForEntityClass(c).findAll(ORDER_CTIME_ASC), | |||
network, null, null)); | |||
if (account != null && launchType != null && launchType == LaunchType.fork_node) { | |||
// add an initial device so that algo starts properly the first time | |||
// name and totp key will be overwritten when the device is initialized for use | |||
log.info("iterate: creating a single dummy device for algo to start properly"); | |||
final var initDevice = newUninitializedDevice(network.getUuid(), account.getUuid()); | |||
add(config.getBean(DeviceDAO.class).create(initDevice)); | |||
final String prefix = "iterate(" + (network == null ? "no-network" : network.getUuid()) + "): "; | |||
try { | |||
config.getEntityClasses() | |||
.forEach(c -> addEntities(true, c, config.getDaoForEntityClass(c).findAll(ORDER_CTIME_ASC), | |||
network, null, null)); | |||
if (account != null && launchType != null && launchType == LaunchType.fork_node) { | |||
// add an initial device so that algo starts properly the first time | |||
// name and totp key will be overwritten when the device is initialized for use | |||
log.info(prefix+"creating a single dummy device for algo to start properly"); | |||
final var initDevice = newUninitializedDevice(network.getUuid(), account.getUuid()); | |||
add(config.getBean(DeviceDAO.class).create(initDevice)); | |||
} | |||
log.debug(prefix+"completed"); | |||
} catch (Exception e) { | |||
die(prefix+"error: "+shortError(e), e); | |||
} | |||
log.info("iterate: completed"); | |||
} | |||
} |
@@ -80,7 +80,7 @@ | |||
<logger name="bubble.resources.notify" level="WARN" /> | |||
<!-- <logger name="bubble.resources.notify.InboundNotifyResource" level="TRACE" />--> | |||
<logger name="bubble.client" level="WARN" /> | |||
<logger name="bubble.main.rekey" level="INFO" /> | |||
<logger name="bubble.main.rekey" level="DEBUG" /> | |||
<logger name="bubble" level="INFO" /> | |||
<root level="INFO"> | |||