@@ -22,8 +22,8 @@ import static org.cobbzilla.util.network.NetworkUtil.IPv4_ALL_ADDRS; | |||
public abstract class DnsDriverBase<T> extends CloudServiceDriverBase<T> implements DnsServiceDriver { | |||
private static final long DNS_LOCK_TIMEOUT = TimeUnit.MINUTES.toSeconds(1); | |||
private static final long DNS_DEADLOCK_TIMEOUT = TimeUnit.MINUTES.toSeconds(5); | |||
private static final long DNS_LOCK_TIMEOUT = TimeUnit.MINUTES.toMillis(1); | |||
private static final long DNS_DEADLOCK_TIMEOUT = TimeUnit.MINUTES.toMillis(5); | |||
@Autowired protected BubbleDomainDAO domainDAO; | |||
@Autowired protected BubbleNetworkDAO networkDAO; | |||
@@ -7,6 +7,7 @@ import bubble.model.cloud.StorageMetadata; | |||
import bubble.notify.storage.StorageListing; | |||
import lombok.Cleanup; | |||
import org.apache.commons.io.IOUtils; | |||
import org.cobbzilla.util.daemon.ExceptionHandler; | |||
import org.cobbzilla.util.string.Base64; | |||
import org.slf4j.Logger; | |||
import org.slf4j.LoggerFactory; | |||
@@ -60,7 +61,7 @@ public interface StorageServiceDriver extends CloudServiceDriver { | |||
Logger log = LoggerFactory.getLogger(StorageServiceDriver.class); | |||
default ExceptionRunnable getExceptionRunnable() { return exceptionRunnable(getFatalExceptionClasses()); } | |||
default ExceptionHandler getExceptionRunnable() { return ExceptionHandler.exceptionRunnable(getFatalExceptionClasses()); } | |||
default Class[] getFatalExceptionClasses() { return new Class[0]; } | |||
default int getMaxTries() { return 5; } | |||
@@ -4,7 +4,6 @@ import bubble.main.rekey.RekeyOptions; | |||
import bubble.main.rekey.RekeyReaderMain; | |||
import bubble.main.rekey.RekeyWriterMain; | |||
import org.apache.commons.exec.CommandLine; | |||
import org.cobbzilla.util.daemon.ZillaRuntime; | |||
import org.cobbzilla.util.main.BaseMain; | |||
import org.cobbzilla.util.system.Command; | |||
import org.cobbzilla.util.system.CommandResult; | |||
@@ -53,9 +52,9 @@ public class RekeyDatabaseMain extends BaseMain<RekeyDatabaseOptions> { | |||
try { | |||
writeResult.set(CommandShell.exec(writerCommand(options, env))); | |||
} catch (Exception e) { | |||
ZillaRuntime.die("WRITE ERROR: " + e); | |||
writeResult.set(new CommandResult(e).setExitStatus(-1)); | |||
} | |||
}); | |||
}, e -> writeResult.set(new CommandResult(e).setExitStatus(-1))); | |||
} | |||
public static Command readerCommand(RekeyDatabaseOptions options, Map<String, String> env) { | |||
@@ -15,6 +15,7 @@ import java.io.IOException; | |||
import java.net.ServerSocket; | |||
import java.net.Socket; | |||
import java.util.Iterator; | |||
import java.util.concurrent.atomic.AtomicReference; | |||
@Slf4j | |||
public class RekeyReaderMain extends BaseMain<RekeyOptions> { | |||
@@ -35,7 +36,8 @@ public class RekeyReaderMain extends BaseMain<RekeyOptions> { | |||
final RestServerHarness<BubbleConfiguration, BubbleDbFilterServer> fromHarness = getOptions().getServer(); | |||
final BubbleConfiguration fromConfig = fromHarness.getConfiguration(); | |||
final boolean debugEnabled = log.isDebugEnabled(); | |||
final Iterator<Identifiable> producer = getEntityProducer(fromConfig); | |||
final AtomicReference<Exception> error = new AtomicReference<>(); | |||
final Iterator<Identifiable> producer = getEntityProducer(fromConfig, error); | |||
while (producer.hasNext()) { | |||
final Identifiable from = producer.next(); | |||
if (from instanceof EndOfEntityStream) break; | |||
@@ -50,8 +52,8 @@ public class RekeyReaderMain extends BaseMain<RekeyOptions> { | |||
out("READER: complete"); | |||
} | |||
protected Iterator<Identifiable> getEntityProducer(BubbleConfiguration fromConfig) { | |||
return new FullEntityIterator(fromConfig); | |||
protected Iterator<Identifiable> getEntityProducer(BubbleConfiguration fromConfig, AtomicReference<Exception> error) { | |||
return new FullEntityIterator(fromConfig, error); | |||
} | |||
} |
@@ -47,4 +47,7 @@ public class ActivationRequest { | |||
@Getter @Setter private Boolean createDefaultObjects = true; | |||
public boolean createDefaultObjects () { return createDefaultObjects != null && createDefaultObjects; }; | |||
@Getter @Setter private Boolean skipTests = false; | |||
public boolean skipTests () { return skipTests != null && skipTests; }; | |||
} |
@@ -300,12 +300,16 @@ public class CloudService extends IdentifiableBaseParentEntity implements Accoun | |||
} | |||
@Transient @JsonIgnore @Getter @Setter private Object testArg = null; | |||
@Transient @JsonIgnore @Getter @Setter private Boolean skipTest = false; | |||
public boolean skipTest () { return skipTest != null && skipTest; }; | |||
public static ValidationResult testDriver(CloudService cloud, BubbleConfiguration configuration) { | |||
return testDriver(cloud, configuration, new ValidationResult()); | |||
} | |||
public static ValidationResult testDriver(CloudService cloud, BubbleConfiguration configuration, ValidationResult errors) { | |||
if (cloud.skipTest()) return errors; | |||
final String prefix = cloud.getName()+": "; | |||
final Object arg = cloud.getTestArg(); | |||
final String argString = arg != null ? " with arg=" + arg : ""; | |||
@@ -67,7 +67,11 @@ public class NetworkActionsResource { | |||
// any nodes? | |||
final List<BubbleNode> nodes = nodeDAO.findByNetwork(network.getUuid()); | |||
if (!nodes.isEmpty()) return invalid("err.network.alreadyStarted"); | |||
if (!nodes.isEmpty()) { | |||
if (nodes.stream().anyMatch(BubbleNode::isRunning)) { | |||
return invalid("err.network.alreadyStarted"); | |||
} | |||
} | |||
if (!network.getState().canStartNetwork()) return invalid("err.network.cannotStartInCurrentState"); | |||
@@ -67,8 +67,8 @@ public class BackupService extends SimpleDaemon { | |||
// todo: make these configurable. maybe tags on the BubbleNetwork? | |||
public static final long BACKUP_INTERVAL = DAYS.toMillis(1) + MINUTES.toMillis(10); | |||
public static final long BR_STATE_LOCK_TIMEOUT = MINUTES.toSeconds(30); | |||
public static final long BR_STATE_DEADLOCK_TIMEOUT = MINUTES.toSeconds(25); | |||
public static final long BR_STATE_LOCK_TIMEOUT = MINUTES.toMillis(30); | |||
public static final long BR_STATE_DEADLOCK_TIMEOUT = MINUTES.toMillis(25); | |||
public static final long BR_CHECK_INTERVAL = HOURS.toMillis(1); | |||
public static final long STARTUP_DELAY = MINUTES.toMillis(5); | |||
@@ -46,8 +46,8 @@ public class RestoreService { | |||
// API is started (in role bubble_finalizer) | |||
public static final long RESTORE_MONITOR_SCRIPT_TIMEOUT_SECONDS = RESTORE_WINDOW_SECONDS + MINUTES.toSeconds(5); | |||
private static final long RESTORE_LOCK_TIMEOUT = MINUTES.toSeconds(31); | |||
private static final long RESTORE_DEADLOCK_TIMEOUT = MINUTES.toSeconds(30); | |||
private static final long RESTORE_LOCK_TIMEOUT = MINUTES.toMillis(31); | |||
private static final long RESTORE_DEADLOCK_TIMEOUT = MINUTES.toMillis(30); | |||
@Autowired private RedisService redis; | |||
@Getter(lazy=true) private final RedisService restoreKeys = redis.prefixNamespace(getClass().getSimpleName()); | |||
@@ -138,7 +138,8 @@ public class ActivationService { | |||
.setTemplate(true) | |||
.setEnabled(true) | |||
.setAccount(account.getUuid()) | |||
.setTestArg(testArg)); | |||
.setTestArg(testArg) | |||
.setSkipTest(request.skipTests())); | |||
} | |||
if (errors.isInvalid()) throw invalidEx(errors); | |||
@@ -87,8 +87,8 @@ public class StandardNetworkService implements NetworkService { | |||
public static final int MAX_ANSIBLE_TRIES = 5; | |||
public static final int RESTORE_KEY_LEN = 6; | |||
private static final long NET_LOCK_TIMEOUT = MINUTES.toSeconds(21); | |||
private static final long NET_DEADLOCK_TIMEOUT = MINUTES.toSeconds(20); | |||
private static final long NET_LOCK_TIMEOUT = MINUTES.toMillis(21); | |||
private static final long NET_DEADLOCK_TIMEOUT = MINUTES.toMillis(20); | |||
private static final long DNS_TIMEOUT = MINUTES.toMillis(60); | |||
private static final long PLAN_ENABLE_TIMEOUT = PURCHASE_DELAY + SECONDS.toMillis(10); | |||
@@ -403,6 +403,15 @@ public class StandardNetworkService implements NetworkService { | |||
} | |||
node.setState(BubbleNodeState.error_stopped); | |||
if (node.hasUuid()) nodeDAO.update(node); | |||
// if there are no running nodes, and network was in 'setup' state, put it network back into 'created' state | |||
final BubbleNetwork network = networkDAO.findByUuid(node.getNetwork()); | |||
if (network.getState() == BubbleNetworkState.setup) { | |||
if (nodeDAO.findByNetwork(node.getNetwork()).stream().noneMatch(BubbleNode::isRunning)) { | |||
networkDAO.update(network.setState(BubbleNetworkState.created)); | |||
} | |||
} | |||
return node; | |||
} | |||
@@ -486,7 +495,7 @@ public class StandardNetworkService implements NetworkService { | |||
lock = lockNetwork(network.getUuid()); | |||
// sanity checks | |||
if (!nodeDAO.findByNetwork(network.getUuid()).isEmpty()) { | |||
if (nodeDAO.findByNetwork(network.getUuid()).stream().anyMatch(BubbleNode::isRunning)) { | |||
throw invalidEx("err.network.alreadyStarted"); | |||
} | |||
if (!network.getState().canStartNetwork()) { | |||
@@ -38,6 +38,7 @@ import static org.cobbzilla.wizard.server.config.PgRestServerConfiguration.ENV_P | |||
@Service @Slf4j | |||
public class DatabaseFilterService { | |||
public static final long DB_FILTER_CHECK = SECONDS.toMillis(1); | |||
public static final long DB_FILTER_TIMEOUT = SECONDS.toMillis(120); | |||
public static final long THREAD_KILL_TIMEOUT = SECONDS.toMillis(10); | |||
@@ -83,6 +84,7 @@ public class DatabaseFilterService { | |||
// start a RekeyReader to send objects to RekeyWriter. | |||
// the RekeyReader will run in-process and receive objects from this method, instead of doing its own queries | |||
final AtomicReference<Exception> readerError = new AtomicReference<>(); | |||
final RekeyOptions readerOptions = new RekeyOptions() { | |||
@Override public Map<String, String> getEnv() { return env; } | |||
} | |||
@@ -93,12 +95,12 @@ public class DatabaseFilterService { | |||
.setPort(port); | |||
reader = new RekeyReaderMain() { | |||
@Override public RekeyOptions getOptions() { return readerOptions; } | |||
@Override protected Iterator<Identifiable> getEntityProducer(BubbleConfiguration fromConfig) { | |||
@Override protected Iterator<Identifiable> getEntityProducer(BubbleConfiguration fromConfig, AtomicReference<Exception> error) { | |||
return fork | |||
? new FullEntityIterator(configuration) | |||
: new FilteredEntityIterator(configuration, account, node); | |||
? new FullEntityIterator(configuration, readerError) | |||
: new FilteredEntityIterator(configuration, account, node, readerError); | |||
} | |||
}.runInBackground(); | |||
}.runInBackground(readerError::set); | |||
// start a RekeyWriter to pull objects from RekeyReader | |||
final AtomicReference<CommandResult> writeResult = new AtomicReference<>(); | |||
@@ -113,8 +115,15 @@ public class DatabaseFilterService { | |||
.setJar(abs(configuration.getBubbleJar())); | |||
writer = RekeyDatabaseMain.runWriter(writerOptions, writeResult, env); | |||
reader.join(DB_FILTER_TIMEOUT); | |||
writer.join(DB_FILTER_TIMEOUT); | |||
final long start = now(); | |||
while ((reader.isAlive() || writer.isAlive()) && now() - start < DB_FILTER_TIMEOUT) { | |||
reader.join(DB_FILTER_CHECK); | |||
writer.join(DB_FILTER_CHECK); | |||
if (readerError.get() != null) die("copyDatabase: reader error: "+shortError(readerError.get())); | |||
if (writeResult.get() != null && writeResult.get().hasException()) { | |||
die("copyDatabase: writer error: "+shortError(writeResult.get().getException())); | |||
} | |||
} | |||
if (reader.isAlive() || writer.isAlive()) { | |||
log.error("copyDatabase: reader/writer taking too long, stopping them (dbName="+dbName+")"); | |||
stopThread(reader, node, "reader"); | |||
@@ -174,7 +183,7 @@ public class DatabaseFilterService { | |||
log.info("copyDatabase: "+name+" thread finished! we are OK."); | |||
return true; | |||
} else { | |||
return die("copyDatabase: "+name+" thread timed out for node: "+node.getUuid()); | |||
return die("copyDatabase: "+name+" error copying database for node: "+node.getUuid()); | |||
} | |||
} | |||
} |
@@ -12,11 +12,11 @@ import java.util.List; | |||
import java.util.NoSuchElementException; | |||
import java.util.concurrent.BlockingQueue; | |||
import java.util.concurrent.LinkedBlockingQueue; | |||
import java.util.concurrent.atomic.AtomicReference; | |||
import static bubble.cloud.storage.local.LocalStorageDriver.LOCAL_STORAGE_STANDARD_BASE_DIR; | |||
import static bubble.service.dbfilter.EndOfEntityStream.END_OF_ENTITY_STREAM; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.background; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.die; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.*; | |||
import static org.cobbzilla.util.json.JsonUtil.json; | |||
public abstract class EntityIterator implements Iterator<Identifiable> { | |||
@@ -26,14 +26,20 @@ public abstract class EntityIterator implements Iterator<Identifiable> { | |||
private final BlockingQueue<Identifiable> queue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE); | |||
@Getter private final Thread thread; | |||
@Getter private final AtomicReference<Exception> error; | |||
public EntityIterator() { | |||
this.thread = background(this::_iterate); | |||
public EntityIterator(AtomicReference<Exception> error) { | |||
this.error = error; | |||
this.thread = background(this::_iterate, this.error::set); | |||
} | |||
@Override public boolean hasNext() { return !(queue.peek() instanceof EndOfEntityStream); } | |||
@Override public boolean hasNext() { | |||
checkError(); | |||
return !(queue.peek() instanceof EndOfEntityStream); | |||
} | |||
@Override public Identifiable next() { | |||
checkError(); | |||
if (!hasNext()) { | |||
throw new NoSuchElementException("iterator has been exhausted"); | |||
} | |||
@@ -55,6 +61,7 @@ public abstract class EntityIterator implements Iterator<Identifiable> { | |||
try { | |||
queue.put(from); | |||
} catch (InterruptedException e) { | |||
error.set(e); | |||
die("add: queue.put interrupted"); | |||
} | |||
} | |||
@@ -75,4 +82,12 @@ public abstract class EntityIterator implements Iterator<Identifiable> { | |||
return cloudService.setDriverConfigJson(json(localConfig.setBaseDir(LOCAL_STORAGE_STANDARD_BASE_DIR))); | |||
} | |||
private void checkError() { | |||
final Exception ex = error.get(); | |||
if (ex != null) { | |||
if (ex instanceof RuntimeException) throw (RuntimeException) ex; | |||
die(getClass().getName()+": "+shortError(ex)); | |||
} | |||
} | |||
} |
@@ -10,19 +10,19 @@ import bubble.model.cloud.BubbleNode; | |||
import bubble.model.cloud.BubbleNodeKey; | |||
import bubble.model.device.Device; | |||
import bubble.server.BubbleConfiguration; | |||
import lombok.AllArgsConstructor; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.wizard.dao.DAO; | |||
import org.cobbzilla.wizard.model.Identifiable; | |||
import java.util.ArrayList; | |||
import java.util.List; | |||
import java.util.concurrent.atomic.AtomicReference; | |||
import static bubble.model.device.Device.UNINITIALIZED_DEVICE; | |||
import static java.util.UUID.randomUUID; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.die; | |||
@AllArgsConstructor @Slf4j | |||
@Slf4j | |||
public class FilteredEntityIterator extends EntityIterator { | |||
public static final List<Class<? extends Identifiable>> POST_COPY_ENTITIES = new ArrayList<>(); | |||
@@ -39,11 +39,24 @@ public class FilteredEntityIterator extends EntityIterator { | |||
private final Account account; | |||
private final BubbleNode node; | |||
public FilteredEntityIterator (BubbleConfiguration configuration, | |||
Account account, | |||
BubbleNode node, | |||
AtomicReference<Exception> error) { | |||
super(error); | |||
this.configuration = configuration; | |||
this.account = account; | |||
this.node = node; | |||
} | |||
@Override protected void iterate() { | |||
// in the new DB, the admin on this system is NOT an admin, | |||
// and the new/initial user IS the admin | |||
final Account sageAccount = configuration.getBean(AccountDAO.class).findByUuid(account.getParent()); | |||
add(Account.sageMask(sageAccount)); | |||
if (account.hasParent()) { | |||
final Account sageAccount = configuration.getBean(AccountDAO.class).findByUuid(account.getParent()); | |||
if (sageAccount == null) die(getClass().getName()+": iterate: account parent not found: "+account.getParent()); | |||
add(Account.sageMask(sageAccount)); | |||
} | |||
add(account.setAdmin(true)); | |||
configuration.getEntityClasses().forEach(c -> { | |||
@@ -3,12 +3,15 @@ package bubble.service.dbfilter; | |||
import bubble.server.BubbleConfiguration; | |||
import lombok.extern.slf4j.Slf4j; | |||
import java.util.concurrent.atomic.AtomicReference; | |||
@Slf4j | |||
public class FullEntityIterator extends EntityIterator { | |||
private final BubbleConfiguration config; | |||
public FullEntityIterator (BubbleConfiguration config) { | |||
public FullEntityIterator (BubbleConfiguration config, AtomicReference<Exception> error) { | |||
super(error); | |||
this.config = config; | |||
} | |||
@@ -1 +1 @@ | |||
Subproject commit 634f93b9b15d2f8cedd746679069d693f63082b0 | |||
Subproject commit 655df0d9c0714a408fb85cd822cbfff48375f49b |
@@ -1 +1 @@ | |||
Subproject commit 7a2ee8f8268d135177a83132ccf9f9833bc61fd1 | |||
Subproject commit 8d96a767b86889e5aefe4c3979078bc0746cee5d |