Browse Source

fix http timeouts, use thread pool for concurrent pings

pull/51/head
Jonathan Cobb 4 years ago
parent
commit
8fa2d0e158
1 changed files with 22 additions and 6 deletions
  1. +22
    -6
      bubble-server/src/main/java/bubble/service/device/FlexRouterService.java

+ 22
- 6
bubble-server/src/main/java/bubble/service/device/FlexRouterService.java View File

@@ -10,6 +10,7 @@ import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig; import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClientBuilder;
import org.cobbzilla.util.daemon.AwaitResult;
import org.cobbzilla.util.daemon.SimpleDaemon; import org.cobbzilla.util.daemon.SimpleDaemon;
import org.cobbzilla.util.http.HttpRequestBean; import org.cobbzilla.util.http.HttpRequestBean;
import org.cobbzilla.util.http.HttpResponseBean; import org.cobbzilla.util.http.HttpResponseBean;
@@ -17,11 +18,16 @@ import org.cobbzilla.util.http.HttpUtil;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;


import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


import static bubble.model.device.FlexRouterPing.MAX_PING_AGE; import static bubble.model.device.FlexRouterPing.MAX_PING_AGE;
import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static org.cobbzilla.util.daemon.Await.awaitAll;
import static org.cobbzilla.util.daemon.DaemonThreadFactory.fixedPool;
import static org.cobbzilla.util.daemon.ZillaRuntime.shortError; import static org.cobbzilla.util.daemon.ZillaRuntime.shortError;
import static org.cobbzilla.util.http.HttpMethods.POST; import static org.cobbzilla.util.http.HttpMethods.POST;
import static org.cobbzilla.util.json.JsonUtil.json; import static org.cobbzilla.util.json.JsonUtil.json;
@@ -33,8 +39,8 @@ public class FlexRouterService extends SimpleDaemon {
public static final int MAX_PING_TRIES = 5; public static final int MAX_PING_TRIES = 5;
private static final long PING_SLEEP_FACTOR = SECONDS.toMillis(2); private static final long PING_SLEEP_FACTOR = SECONDS.toMillis(2);


public static final int DEFAULT_PING_TIMEOUT = (int) SECONDS.toMillis(MAX_PING_AGE/2);
// HttpClient timeouts are in seconds
public static final int DEFAULT_PING_TIMEOUT = (int) SECONDS.toSeconds(MAX_PING_AGE/2);
public static final RequestConfig DEFAULT_PING_REQUEST_CONFIG = RequestConfig.custom() public static final RequestConfig DEFAULT_PING_REQUEST_CONFIG = RequestConfig.custom()
.setConnectTimeout(DEFAULT_PING_TIMEOUT) .setConnectTimeout(DEFAULT_PING_TIMEOUT)
.setSocketTimeout(DEFAULT_PING_TIMEOUT) .setSocketTimeout(DEFAULT_PING_TIMEOUT)
@@ -46,6 +52,8 @@ public class FlexRouterService extends SimpleDaemon {
.build(); .build();
} }


private static final long PING_ALL_TIMEOUT = MINUTES.toMillis(2);

@Getter private final long sleepTime = MINUTES.toMillis(2); @Getter private final long sleepTime = MINUTES.toMillis(2);


@Autowired private FlexRouterDAO flexRouterDAO; @Autowired private FlexRouterDAO flexRouterDAO;
@@ -54,12 +62,20 @@ public class FlexRouterService extends SimpleDaemon {
try { try {
@Cleanup final CloseableHttpClient httpClient = getHttpClient(); @Cleanup final CloseableHttpClient httpClient = getHttpClient();
final List<FlexRouter> routers = flexRouterDAO.findEnabledAndRegistered(); final List<FlexRouter> routers = flexRouterDAO.findEnabledAndRegistered();
final List<Future<?>> futures = new ArrayList<>();
@Cleanup("shutdownNow") final ExecutorService exec = fixedPool(5);
for (FlexRouter router : routers) { for (FlexRouter router : routers) {
boolean active = pingFlexRouter(router, flexRouterDAO, httpClient);
if (active != router.active()) {
flexRouterDAO.update(router.setActive(active));
}
futures.add(exec.submit(() -> {
boolean active = pingFlexRouter(router, flexRouterDAO, httpClient);
if (active != router.active()) {
flexRouterDAO.update(router.setActive(active));
}
return active;
}));
} }
final AwaitResult<Boolean> awaitResult = awaitAll(futures, PING_ALL_TIMEOUT);
log.debug("process: awaitResult="+awaitResult);

} catch (Exception e) { } catch (Exception e) {
log.error("process: "+shortError(e)); log.error("process: "+shortError(e));
} }


Loading…
Cancel
Save