Selaa lähdekoodia

add FlexRouter.registered flag, unset if polls fail enough times

tags/v1.1.4
Jonathan Cobb 4 vuotta sitten
vanhempi
commit
8ffedde320
5 muutettua tiedostoa jossa 56 lisäystä ja 20 poistoa
  1. +1
    -0
      bubble-server/src/main/java/bubble/dao/device/FlexRouterDAO.java
  2. +7
    -1
      bubble-server/src/main/java/bubble/model/device/FlexRouter.java
  3. +5
    -3
      bubble-server/src/main/java/bubble/resources/device/FlexRoutersResource.java
  4. +0
    -1
      bubble-server/src/main/java/bubble/resources/stream/FilterHttpResource.java
  5. +43
    -15
      bubble-server/src/main/java/bubble/service/device/StandardFlexRouterService.java

+ 1
- 0
bubble-server/src/main/java/bubble/dao/device/FlexRouterDAO.java Näytä tiedosto

@@ -59,6 +59,7 @@ public class FlexRouterDAO extends AccountOwnedEntityDAO<FlexRouter> {
public List<FlexRouter> findEnabledAndRegistered() {
return list(criteria().add(and(
eq("enabled", true),
eq("registered", true),
gt("port", 1024),
le("port", 65535),
isNotNull("token"))));


+ 7
- 1
bubble-server/src/main/java/bubble/model/device/FlexRouter.java Näytä tiedosto

@@ -102,7 +102,13 @@ public class FlexRouter extends IdentifiableBase implements HasAccount {
public boolean active() { return bool(active); }
public boolean inactive() { return !active(); }

@ECSearchable(filter=true) @ECField(index=100)
@ECSearchable @ECField(index=100)
@ECIndex @Column(nullable=false)
@Getter @Setter private Boolean registered = false;
public boolean registered() { return bool(registered); }
public boolean notRegistered() { return !registered(); }

@ECSearchable(filter=true) @ECField(index=110)
@Type(type=ENCRYPTED_STRING) @Column(columnDefinition="varchar("+(100+ENC_PAD)+") NOT NULL")
@JsonIgnore @Getter @Setter private String token;
public boolean hasToken () { return !empty(token); }


+ 5
- 3
bubble-server/src/main/java/bubble/resources/device/FlexRoutersResource.java Näytä tiedosto

@@ -48,9 +48,9 @@ public class FlexRoutersResource extends AccountOwnedResource<FlexRouter, FlexRo
}

@GET @Path("/{id}"+EP_STATUS)
public Response update(@Context Request req,
@Context ContainerRequest ctx,
@PathParam("id") String id) {
public Response getStatus(@Context Request req,
@Context ContainerRequest ctx,
@PathParam("id") String id) {
FlexRouter router = find(req, ctx, id);
if (router == null) router = findAlternate(req, ctx, id);
if (router == null) return ok(FlexRouterStatus.deleted);
@@ -58,12 +58,14 @@ public class FlexRoutersResource extends AccountOwnedResource<FlexRouter, FlexRo
}

@Override protected Object daoCreate(FlexRouter toCreate) {
toCreate.setRegistered(true);
final Object router = super.daoCreate(toCreate);
flexRouterService.interruptSoon();
return router;
}

@Override protected Object daoUpdate(FlexRouter toUpdate) {
toUpdate.setRegistered(true);
final Object router = super.daoUpdate(toUpdate);
flexRouterService.interruptSoon();
return router;


+ 0
- 1
bubble-server/src/main/java/bubble/resources/stream/FilterHttpResource.java Näytä tiedosto

@@ -465,7 +465,6 @@ public class FilterHttpResource {
@QueryParam("type") String contentType,
@QueryParam("length") Long contentLength,
@QueryParam("last") Boolean last) throws IOException {

validateMitmCall(req);

requestId = trimQuotes(requestId);


+ 43
- 15
bubble-server/src/main/java/bubble/service/device/StandardFlexRouterService.java Näytä tiedosto

@@ -10,6 +10,7 @@ import bubble.model.device.DeviceStatus;
import bubble.model.device.FlexRouter;
import bubble.model.device.FlexRouterPing;
import bubble.service.cloud.GeoService;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpClient;
@@ -28,10 +29,12 @@ import org.springframework.stereotype.Service;

import java.io.File;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static bubble.ApiConstants.HOME_DIR;
import static bubble.model.device.FlexRouterPing.MAX_PING_AGE;
@@ -119,6 +122,9 @@ public class StandardFlexRouterService extends SimpleDaemon implements FlexRoute
private final Map<String, FlexRouterStatus> statusMap = new ConcurrentHashMap<>(DEFAULT_MAX_TUNNELS);
private final Map<String, FlexRouterInfo> activeRouters = new ConcurrentHashMap<>(DEFAULT_MAX_TUNNELS);

private final int MAX_POLL_FAILURES = 3;
private final Map<String, AtomicInteger> pollFailures = new ConcurrentHashMap<>(DEFAULT_MAX_TUNNELS);

public FlexRouterStatus status(String uuid) {
final FlexRouterStatus stat = statusMap.get(uuid);
if (stat == FlexRouterStatus.unreachable) interruptSoon();
@@ -165,21 +171,7 @@ public class StandardFlexRouterService extends SimpleDaemon implements FlexRoute
final List<Future<?>> futures = new ArrayList<>();
@Cleanup("shutdownNow") final ExecutorService exec = fixedPool(DEFAULT_MAX_TUNNELS, "StandardFlexRouterService.process");
for (FlexRouter router : routers) {
futures.add(exec.submit(() -> {
final long firstTimeDelay = now() - router.getCtime();
if (firstTimeDelay < FIRST_TIME_WAIT) {
sleep(FIRST_TIME_WAIT - firstTimeDelay, "process: waiting for flex ssh key");
}
boolean active = pingFlexRouter(router, httpClient);
if (active != router.active() || (active && router.uninitialized())) {
if (active && router.uninitialized()) {
router.setInitialized(true);
}
router.setActive(active);
flexRouterDAO.update(router);
}
return active;
}));
futures.add(exec.submit(new FlexPollJob(router, httpClient)));
}
final AwaitResult<Boolean> awaitResult = awaitAll(futures, PING_ALL_TIMEOUT);
if (log.isTraceEnabled()) log.trace("process: awaitResult="+awaitResult);
@@ -292,4 +284,40 @@ public class StandardFlexRouterService extends SimpleDaemon implements FlexRoute
return authFile;
}

@AllArgsConstructor
private class FlexPollJob implements Callable<Boolean> {
private final FlexRouter router;
private final HttpClient httpClient;

@Override public Boolean call() {
final long firstTimeDelay = now() - router.getCtime();
if (firstTimeDelay < FIRST_TIME_WAIT) {
sleep(FIRST_TIME_WAIT - firstTimeDelay, "process: waiting for flex ssh key");
}
boolean active = pingFlexRouter(router, httpClient);
boolean update = false;
if (!active) {
if (pollFailures.computeIfAbsent(router.getUuid(), k -> new AtomicInteger(0)).incrementAndGet() > MAX_POLL_FAILURES) {
log.warn("process: too many poll failures for router ("+router+"), marking unregistered");
router.setRegistered(false);
update = true;
}
} else {
pollFailures.put(router.getUuid(), new AtomicInteger(0));
if (router.notRegistered()) {
router.setRegistered(true);
update = true;
}
}
if (update || active != router.active() || (active && router.uninitialized())) {
if (active && router.uninitialized()) {
router.setInitialized(true);
}
router.setActive(active);
flexRouterDAO.update(router);
}
return active;
}
}

}

Ladataan…
Peruuta
Tallenna