瀏覽代碼

better fault tolerance on getting device location

tags/v0.15.1
Jonathan Cobb 4 年之前
父節點
當前提交
cb42ac8946
共有 2 個檔案被更改,包括 75 行新增48 行删除
  1. +1
    -1
      bubble-server/src/main/java/bubble/model/device/DeviceStatus.java
  2. +74
    -47
      bubble-server/src/main/java/bubble/service/cloud/GeoService.java

+ 1
- 1
bubble-server/src/main/java/bubble/model/device/DeviceStatus.java 查看文件

@@ -54,7 +54,7 @@ public class DeviceStatus {
setPort(Integer.parseInt(endpoint.substring(lastColon + 1)));
if (geoService != null) {
try {
setLocation(geoService.locate(geoAccount, getIp()));
setLocation(geoService.locate(geoAccount, getIp(), true));
} catch (Exception e) {
log.error("DeviceStatus: error calling geoService for ip="+getIp()+": "+shortError(e));
}


+ 74
- 47
bubble-server/src/main/java/bubble/service/cloud/GeoService.java 查看文件

@@ -24,6 +24,7 @@ import bubble.server.BubbleConfiguration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ArrayUtils;
import org.cobbzilla.util.cache.CacheLookupException;
import org.cobbzilla.util.collection.ExpirationMap;
import org.cobbzilla.wizard.cache.redis.RedisService;
import org.cobbzilla.wizard.validation.SimpleViolationException;
@@ -31,12 +32,16 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static bubble.model.cloud.RegionalServiceDriver.findClosestRegions;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.cobbzilla.util.daemon.DaemonThreadFactory.fixedPool;
import static org.cobbzilla.util.daemon.ZillaRuntime.*;
import static org.cobbzilla.util.json.JsonUtil.json;
import static org.cobbzilla.util.string.LocaleUtil.getDefaultLocales;
@@ -63,70 +68,92 @@ public class GeoService {
private final Map<String, GeoLocation> locateCache = new ExpirationMap<>(MEMORY_CACHE_TIME);

public GeoLocation locate (String accountUuid, String ip) {
return locate(accountUuid, ip, false);
}

private final Map<String, Future<GeoLocation>> backgroundLookups = new ConcurrentHashMap<>();
private final ExecutorService backgroundLookupExec = fixedPool(5);

public GeoLocation locate (String accountUuid, String ip, boolean cacheOnly) {
final String cacheKey = hashOf(accountUuid, ip);
return locateCache.computeIfAbsent(cacheKey, k -> {
final String found = getLocateRedis().get(cacheKey);
if (found != null) return json(found, GeoLocation.class);

List<CloudService> geoLocationServices = null;
if (accountUuid != null) {
geoLocationServices = cloudDAO.findByAccountAndType(accountUuid, CloudServiceType.geoLocation);
}
if (empty(geoLocationServices)) {
// try to find using admin
final Account admin = accountDAO.getFirstAdmin();
if (admin != null && !admin.getUuid().equals(accountUuid)) {
geoLocationServices = cloudDAO.findByAccountAndType(admin.getUuid(), CloudServiceType.geoLocation);
}
if (cacheOnly) {
backgroundLookups.computeIfAbsent(cacheKey, key -> backgroundLookupExec.submit(() -> {
try {
return getLocation(accountUuid, ip, cacheKey);
} finally {
backgroundLookups.remove(key);
}
}));
throw new CacheLookupException(ip);
}
if (empty(geoLocationServices)) {
throw new SimpleViolationException("err.geoLocateService.notFound");

return getLocation(accountUuid, ip, cacheKey);
});
}

private GeoLocation getLocation(String accountUuid, String ip, String cacheKey) {
List<CloudService> geoLocationServices = null;
if (accountUuid != null) {
geoLocationServices = cloudDAO.findByAccountAndType(accountUuid, CloudServiceType.geoLocation);
}
if (empty(geoLocationServices)) {
// try to find using admin
final Account admin = accountDAO.getFirstAdmin();
if (admin != null && !admin.getUuid().equals(accountUuid)) {
geoLocationServices = cloudDAO.findByAccountAndType(admin.getUuid(), CloudServiceType.geoLocation);
}
}
if (empty(geoLocationServices)) {
throw new SimpleViolationException("err.geoLocateService.notFound");
}

log.info("locate: resolving IP: "+ip+" for cacheKey: "+cacheKey);
final List<GeoLocation> resolved = new ArrayList<>();
GeoCodeServiceDriver geoCodeDriver = null;
for (CloudService geo : geoLocationServices) {
try {
final GeoLocation result = geo.getGeoLocateDriver(configuration).geolocate(ip);
if (result != null) {
if (!result.hasLatLon()) {
if (geoCodeDriver == null) {
List<CloudService> geoCodeServices = null;
if (accountUuid != null) {
geoCodeServices = cloudDAO.findByAccountAndType(accountUuid, CloudServiceType.geoCode);
}
if (empty(geoCodeServices)) {
final Account admin = accountDAO.getFirstAdmin();
if (admin != null && !admin.getUuid().equals(accountUuid)) {
geoCodeServices = cloudDAO.findByAccountAndType(admin.getUuid(), CloudServiceType.geoCode);
}
}
if (empty(geoCodeServices)) {
throw new SimpleViolationException("err.geoCodeService.notFound");
log.info("locate: resolving IP: "+ip+" for cacheKey: "+cacheKey);
final List<GeoLocation> resolved = new ArrayList<>();
GeoCodeServiceDriver geoCodeDriver = null;
for (CloudService geo : geoLocationServices) {
try {
final GeoLocation result = geo.getGeoLocateDriver(configuration).geolocate(ip);
if (result != null) {
if (!result.hasLatLon()) {
if (geoCodeDriver == null) {
List<CloudService> geoCodeServices = null;
if (accountUuid != null) {
geoCodeServices = cloudDAO.findByAccountAndType(accountUuid, CloudServiceType.geoCode);
}
if (empty(geoCodeServices)) {
final Account admin = accountDAO.getFirstAdmin();
if (admin != null && !admin.getUuid().equals(accountUuid)) {
geoCodeServices = cloudDAO.findByAccountAndType(admin.getUuid(), CloudServiceType.geoCode);
}
geoCodeDriver = geoCodeServices.get(0).getGeoCodeDriver(configuration);
}
final GeoCodeResult code = geoCodeDriver.lookup(result);
if (code == null) {
log.info("locate: driver lookup returned null result, skipping");
continue;
if (empty(geoCodeServices)) {
throw new SimpleViolationException("err.geoCodeService.notFound");
}
result.setLat(code.getLat());
result.setLon(code.getLon());
geoCodeDriver = geoCodeServices.get(0).getGeoCodeDriver(configuration);
}
final GeoCodeResult code = geoCodeDriver.lookup(result);
if (code == null) {
log.info("locate: driver lookup returned null result, skipping");
continue;
}
resolved.add(result.setCloud(geo));
result.setLat(code.getLat());
result.setLon(code.getLon());
}
} catch (Exception e) {
log.warn("locate: "+e, e);
resolved.add(result.setCloud(geo));
}
} catch (Exception e) {
log.warn("locate: "+e, e);
}
}

final GeoLocation geoLocation = getGeoLocation(ip, geoLocationServices, resolved);
getLocateRedis().set(cacheKey, json(geoLocation), EX, REDIS_CACHE_TIME);
final GeoLocation geoLocation = getGeoLocation(ip, geoLocationServices, resolved);
getLocateRedis().set(cacheKey, json(geoLocation), EX, REDIS_CACHE_TIME);

return geoLocation;
});
return geoLocation;
}

public GeoLocation getGeoLocation(String ip, List<CloudService> geoLocationServices, List<GeoLocation> resolved) {


Loading…
取消
儲存