@@ -6,7 +6,7 @@ | |||
{"name": "node_uuid", "value": "[[node.uuid]]"}, | |||
{"name": "network_uuid", "value": "[[node.network]]"}, | |||
{"name": "admin_port", "value": "[[node.adminPort]]"}, | |||
{"name": "ssl_port", "value": "[[sslPort]]"}, | |||
{"name": "ssl_port", "value": "[[node.sslPort]]"}, | |||
{"name": "public_base_uri", "value": "[[publicBaseUri]]"}, | |||
{"name": "sage_node", "value": "[[sageNode]]"}, | |||
{"name": "install_type", "value": "[[installType]]"}, | |||
@@ -26,4 +26,8 @@ public class BubbleApiClient extends ApiClientBase { | |||
return HttpClientBuilder.create().setConnectionManager(cm); | |||
} | |||
@Override public HttpClientBuilder getHttpClientBuilder() { | |||
return newHttpClientBuilder(5, 5); | |||
} | |||
} |
@@ -12,17 +12,14 @@ import org.apache.http.client.methods.HttpRequestBase; | |||
import org.apache.http.entity.ContentType; | |||
import org.cobbzilla.util.http.ApiConnectionInfo; | |||
import org.cobbzilla.util.http.HttpRequestBean; | |||
import org.cobbzilla.util.http.URIUtil; | |||
import org.cobbzilla.wizard.server.config.HttpConfiguration; | |||
import java.io.IOException; | |||
import java.io.InputStream; | |||
import java.net.URI; | |||
import java.util.HashMap; | |||
import java.util.List; | |||
import java.util.Map; | |||
import static bubble.ApiConstants.isHttpsPort; | |||
import static bubble.server.BubbleServer.getRestoreKey; | |||
import static bubble.server.BubbleServer.isRestoreMode; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.die; | |||
@@ -42,21 +39,11 @@ public class BubbleNodeClient extends BubbleApiClient { | |||
private BubbleNodeKey fromKey; | |||
private BubbleNode toNode; | |||
private BubbleNodeKey toKey; | |||
private BubbleApiClient alternate; | |||
private boolean useAlternate = false; | |||
public BubbleNodeClient(BubbleNode toNode, BubbleConfiguration configuration) { | |||
// use http if connection is to localhost | |||
super(new ApiConnectionInfo(baseUri(toNode, configuration))); | |||
initKeys(toNode, configuration); | |||
alternate = getAlternate(toNode, configuration); | |||
} | |||
// ensure we have at least one valid key so others can talk to us | |||
public BubbleNodeClient(BubbleNode toNode, BubbleConfiguration configuration, boolean alternate) { | |||
super(new ApiConnectionInfo(baseUri(toNode, configuration))); | |||
initKeys(toNode, configuration); | |||
this.alternate = null; | |||
} | |||
public void initKeys(BubbleNode toNode, BubbleConfiguration configuration) { | |||
@@ -78,18 +65,9 @@ public class BubbleNodeClient extends BubbleApiClient { | |||
this.toNode = toNode; | |||
} | |||
public BubbleNodeClient getAlternate(BubbleNode node, BubbleConfiguration configuration) { | |||
return new BubbleNodeClient(node, configuration, true); | |||
} | |||
private static String baseUri(BubbleNode node, BubbleConfiguration configuration) { | |||
final HttpConfiguration http = configuration.getHttp(); | |||
if (node.getUuid().equals(configuration.getThisNode().getUuid())) { | |||
return "http://127.0.0.1:"+ http.getPort()+ http.getBaseUri(); | |||
} | |||
return (isHttpsPort(node.getSslPort()) ? "https://" : "http://") | |||
+ node.getFqdn() + ":" + node.getSslPort() + http.getBaseUri(); | |||
return "https://" + node.getFqdn() + ":" + node.getSslPort() + http.getBaseUri(); | |||
} | |||
@Override protected <T> void setRequestEntity(HttpEntityEnclosingRequest entityRequest, T data, ContentType contentType) { | |||
@@ -116,31 +94,12 @@ public class BubbleNodeClient extends BubbleApiClient { | |||
} | |||
@Override public HttpResponse execute(HttpClient client, HttpRequestBase request) throws IOException { | |||
if (useAlternate) { | |||
log.info("execute: useAlternate true, using alternate..."); | |||
return alternate.execute(client, request); | |||
} | |||
try { | |||
log.debug("execute: attempting request..."); | |||
return super.execute(client, request); | |||
} catch (Exception e) { | |||
log.info("execute("+request+"): error: "+e); | |||
if (alternate == null) throw e; | |||
final String uri = (isHttpsPort(toNode.getSslPort()) ? "https://" : "http://") | |||
+ toNode.getIp4() + ":" + toNode.getAdminPort() + URIUtil.getPath(request.getURI().toString()); | |||
request.setURI(URI.create(uri)); | |||
log.info("execute: api call failed, trying alternate..."); | |||
final HttpResponse response = alternate.execute(client, request); | |||
useAlternate = true; | |||
log.info("execute: api call failed, alternate succeeded, will continue using that"); | |||
return response; | |||
return die("execute("+request+"): error: "+e); | |||
} | |||
} | |||
@Override public void close() { | |||
super.close(); | |||
if (alternate != null) alternate.close(); | |||
} | |||
} |
@@ -18,15 +18,6 @@ public class BubbleNodeDownloadClient extends BubbleNodeClient { | |||
init(); | |||
} | |||
public BubbleNodeDownloadClient(BubbleNode node, BubbleConfiguration configuration, boolean alternate) { | |||
super(node, configuration, alternate); | |||
init(); | |||
} | |||
@Override public BubbleNodeClient getAlternate(BubbleNode node, BubbleConfiguration configuration) { | |||
return new BubbleNodeDownloadClient(node, configuration, true); | |||
} | |||
public void init() { | |||
setNumTries(NUM_TRIES); | |||
setConnectTimeout(DL_CONNECT_TIMEOUT); | |||
@@ -17,15 +17,6 @@ public class BubbleNodeQuickClient extends BubbleNodeClient { | |||
init(); | |||
} | |||
public BubbleNodeQuickClient(BubbleNode node, BubbleConfiguration configuration, boolean alternate) { | |||
super(node, configuration, alternate); | |||
init(); | |||
} | |||
@Override public BubbleNodeClient getAlternate(BubbleNode node, BubbleConfiguration configuration) { | |||
return new BubbleNodeQuickClient(node, configuration, true); | |||
} | |||
public void init() { | |||
setNumTries(NUM_TRIES); | |||
setConnectTimeout(QUICK_CONNECT_TIMEOUT); | |||
@@ -21,8 +21,8 @@ import static org.cobbzilla.util.system.Sleep.sleep; | |||
@Slf4j | |||
public class NodeReaper extends SimpleDaemon { | |||
private static final long STARTUP_DELAY = MINUTES.toMillis(1); | |||
private static final long KILL_CHECK_INTERVAL = MINUTES.toMillis(10); | |||
private static final long STARTUP_DELAY = MINUTES.toMillis(30); | |||
private static final long KILL_CHECK_INTERVAL = MINUTES.toMillis(30); | |||
private final ComputeServiceDriverBase compute; | |||
@@ -22,6 +22,7 @@ import bubble.model.cloud.CloudService; | |||
import bubble.resources.account.AccountOwnedResource; | |||
import bubble.server.BubbleConfiguration; | |||
import bubble.service.AuthenticatorService; | |||
import bubble.service.cloud.GeoService; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.wizard.validation.ValidationResult; | |||
import org.glassfish.grizzly.http.server.Request; | |||
@@ -54,6 +55,7 @@ public class AccountPlansResource extends AccountOwnedResource<AccountPlan, Acco | |||
@Autowired private AccountPaymentMethodDAO paymentMethodDAO; | |||
@Autowired private BubbleConfiguration configuration; | |||
@Autowired private AuthenticatorService authenticatorService; | |||
@Autowired private GeoService geoService; | |||
public AccountPlansResource(Account account) { super(account); } | |||
@@ -78,7 +80,7 @@ public class AccountPlansResource extends AccountOwnedResource<AccountPlan, Acco | |||
final String remoteHost = getRemoteHost(req); | |||
for (CloudService geo : geoLocationServices) { | |||
try { | |||
final GeoLocation location = geo.getGeoLocateDriver(configuration).geolocate(remoteHost); | |||
final GeoLocation location = geoService.locate(request.getAccount(), remoteHost); | |||
if (configuration.isDisallowed(location.getCountry())) throw invalidEx("err.accountPlan.callerCountryDisallowed"); | |||
break; | |||
} catch (Exception e) { | |||
@@ -1,26 +1,16 @@ | |||
package bubble.resources.notify; | |||
import bubble.dao.cloud.BubbleNetworkDAO; | |||
import bubble.dao.cloud.BubbleNodeDAO; | |||
import bubble.dao.cloud.BubbleNodeKeyDAO; | |||
import bubble.dao.cloud.notify.ReceivedNotificationDAO; | |||
import bubble.model.cloud.BubbleNetwork; | |||
import bubble.model.cloud.BubbleNetworkState; | |||
import bubble.model.cloud.BubbleNode; | |||
import bubble.model.cloud.BubbleNodeKey; | |||
import bubble.model.cloud.notify.NotificationReceipt; | |||
import bubble.model.cloud.notify.ReceivedNotification; | |||
import bubble.model.cloud.notify.SentNotification; | |||
import bubble.notify.storage.StorageStreamRequest; | |||
import bubble.server.BubbleConfiguration; | |||
import bubble.service.backup.RestoreService; | |||
import bubble.service.cloud.StorageStreamService; | |||
import bubble.service.notify.NotificationService; | |||
import bubble.service.notify.InboundNotification; | |||
import bubble.service.notify.NotificationReceiverService; | |||
import com.fasterxml.jackson.databind.JsonNode; | |||
import lombok.Getter; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.util.security.RsaMessage; | |||
import org.cobbzilla.util.string.StringUtil; | |||
import org.glassfish.grizzly.http.server.Request; | |||
import org.glassfish.jersey.server.ContainerRequest; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
@@ -30,21 +20,17 @@ import javax.ws.rs.*; | |||
import javax.ws.rs.core.Context; | |||
import javax.ws.rs.core.Response; | |||
import java.io.InputStream; | |||
import java.util.List; | |||
import java.util.Set; | |||
import static bubble.ApiConstants.*; | |||
import static bubble.client.BubbleNodeClient.*; | |||
import static bubble.model.cloud.BubbleNodeKey.TOKEN_GENERATION_LIMIT; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.die; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.empty; | |||
import static org.cobbzilla.util.http.HttpContentTypes.APPLICATION_JSON; | |||
import static org.cobbzilla.util.http.HttpContentTypes.APPLICATION_OCTET_STREAM; | |||
import static org.cobbzilla.util.json.JsonUtil.json; | |||
import static org.cobbzilla.util.network.NetworkUtil.configuredIpsAndExternalIp; | |||
import static org.cobbzilla.util.network.NetworkUtil.isLocalHost; | |||
import static org.cobbzilla.util.string.StringUtil.truncate; | |||
import static org.cobbzilla.util.time.TimeUtil.formatDuration; | |||
import static org.cobbzilla.wizard.resources.ResourceUtil.*; | |||
@Consumes(APPLICATION_JSON) | |||
@@ -53,14 +39,9 @@ import static org.cobbzilla.wizard.resources.ResourceUtil.*; | |||
@Service @Slf4j | |||
public class InboundNotifyResource { | |||
@Autowired private ReceivedNotificationDAO receivedNotificationDAO; | |||
@Autowired private BubbleConfiguration configuration; | |||
@Autowired private BubbleNetworkDAO networkDAO; | |||
@Autowired private BubbleNodeDAO nodeDAO; | |||
@Autowired private BubbleNodeKeyDAO nodeKeyDAO; | |||
@Autowired private NotificationService notificationService; | |||
@Autowired private StorageStreamService storageStreamService; | |||
@Autowired private RestoreService restoreService; | |||
@Autowired private NotificationReceiverService notificationReceiverService; | |||
@Getter(lazy=true) private final Set<String> localIps = configuredIpsAndExternalIp(); | |||
@@ -69,9 +50,11 @@ public class InboundNotifyResource { | |||
@Context ContainerRequest ctx, | |||
JsonNode jsonNode) { | |||
try { | |||
log.debug("_notify:\n<<<<< RECEIVED NOTIFICATION from "+getRemoteHost(req)+" <<<<<\n" | |||
+ (jsonNode == null ? "null" : truncate(json(jsonNode), MAX_NOTIFY_LOG)) | |||
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"); | |||
if (log.isTraceEnabled()) { | |||
log.trace("_notify:\n<<<<< RECEIVED NOTIFICATION from " + getRemoteHost(req) + " <<<<<\n" | |||
+ (jsonNode == null ? "null" : truncate(json(jsonNode), MAX_NOTIFY_LOG)) | |||
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"); | |||
} | |||
final RsaMessage message = json(jsonNode, RsaMessage.class); | |||
final String remoteHost = getRemoteHost(req); | |||
@@ -98,8 +81,8 @@ public class InboundNotifyResource { | |||
} | |||
final String restoreKey = req.getHeader(H_BUBBLE_RESTORE_KEY); | |||
log.debug("receiveNotification: header value for "+H_BUBBLE_RESTORE_KEY+"="+restoreKey); | |||
final BubbleNodeKey fromKey = findFromKey(fromNode, fromKeyUuid, remoteHost, restoreKey, message); | |||
// Find our key as receiver | |||
final String toKeyUuid = req.getHeader(H_BUBBLE_TO_NODE_KEY); | |||
@@ -107,36 +90,15 @@ public class InboundNotifyResource { | |||
log.warn("receiveNotification: missing " + H_BUBBLE_TO_NODE_KEY + " request header"); | |||
return forbidden(); | |||
} | |||
final BubbleNode thisNode = configuration.getThisNode(); | |||
final BubbleNodeKey toKey = nodeKeyDAO.findByNodeAndUuid(thisNode.getUuid(), toKeyUuid); | |||
if (toKey == null) { | |||
log.warn("receiveNotification: node key " + toKeyUuid + " not found"); | |||
return forbidden(); | |||
} | |||
// If the message is not from ourselves, check the remoteHost | |||
if (!toKeyUuid.equals(fromKeyUuid) && !remoteHost.equals(fromKey.getRemoteHost())) { | |||
log.warn("receiveNotification: remoteHost mismatch: request="+remoteHost+", key="+fromKey.getRemoteHost()); | |||
return forbidden(); | |||
} | |||
// Decrypt message | |||
log.debug("decrypting message with key: "+toKey.getUuid()); | |||
final String json = toKey.decrypt(message, fromKey.getRsaKey()); | |||
log.debug("_notify:\n<<<<< DECRYPTED NOTIFICATION <<<<<\n" | |||
+ truncate(json, MAX_NOTIFY_LOG) | |||
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"); | |||
final SentNotification notification = json(json, SentNotification.class); | |||
final NotificationReceipt receipt = new NotificationReceipt(); | |||
if (notification.isResolveNodes()) { | |||
receipt.setResolvedSender(nodeDAO.findByUuid(fromNodeUuid)); | |||
receipt.setResolvedRecipient(configuration.getThisNode()); | |||
} | |||
receivedNotificationDAO.create(new ReceivedNotification(notification).setReceipt(receipt)); | |||
notificationService.checkInbox(); | |||
final NotificationReceipt receipt = notificationReceiverService.receive(new InboundNotification() | |||
.setMessage(message) | |||
.setRemoteHost(remoteHost) | |||
.setFromNodeUuid(fromNodeUuid) | |||
.setFromKeyUuid(fromKeyUuid) | |||
.setToKeyUuid(toKeyUuid) | |||
.setRestoreKey(restoreKey) | |||
); | |||
return ok(receipt); | |||
} catch (Exception e) { | |||
@@ -145,104 +107,6 @@ public class InboundNotifyResource { | |||
} | |||
} | |||
private BubbleNodeKey findFromKey(BubbleNode fromNode, String fromKeyUuid, String remoteHost, String restoreKey, RsaMessage message) { | |||
final String fromNodeUuid = fromNode.getUuid(); | |||
BubbleNodeKey fromKey = nodeKeyDAO.findByNodeAndUuid(fromNodeUuid, fromKeyUuid); | |||
if (fromKey != null) { | |||
if (!fromKey.getRemoteHost().equals(remoteHost)) { | |||
// if request is from 127.0.0.1, check to see if fromKey is for a local address | |||
if (isLocalHost(remoteHost) && getLocalIps().contains(fromKey.getRemoteHost())) { | |||
log.debug("findFromKey: request from 127.0.0.1 is OK, key is local: "+fromKey.getRemoteHost()+ " (ips="+ StringUtil.toString(getLocalIps())+")"); | |||
} else { | |||
log.warn("findFromKey: remoteHost for for node " + fromNodeUuid + " (key=" + fromKeyUuid + ", remoteHost=" + fromKey.getRemoteHost() + ") does not match request: " + remoteHost+ " (ips="+ StringUtil.toString(getLocalIps())+")"); | |||
throw forbiddenEx(); | |||
} | |||
} | |||
return fromKey; | |||
} | |||
// Do we have any other keys for this node? | |||
final List<BubbleNodeKey> currentKeys = nodeKeyDAO.findByNode(fromNodeUuid); | |||
// Ensure remote host matches | |||
final String currentRemoteHost = nodeKeyDAO.findRemoteHostForNode(fromNodeUuid); | |||
if (currentRemoteHost != null && !remoteHost.equals(currentRemoteHost)) { | |||
log.warn("findFromKey: new key provided for node "+fromNodeUuid+" but remoteHost does not match: "+remoteHost); | |||
throw forbiddenEx(); | |||
} | |||
// Do we have the current key? | |||
fromKey = currentKeys.stream() | |||
.filter(k -> k.getPublicKey().equals(message.getPublicKey())) | |||
.findFirst() | |||
.orElse(null); | |||
if (fromKey != null) return fromKey; | |||
// Create a record for this key, no private key because only the node knows that. | |||
// Record remoteHost, future requests must match | |||
if (currentKeys.isEmpty()) { | |||
// verify old keys match remoteHost | |||
fromKey = createFromKey(fromNode, fromKeyUuid, remoteHost, message); | |||
log.info("findFromKey: registered new node key: " + fromKeyUuid + " for node: " + fromNodeUuid); | |||
return fromKey; | |||
} | |||
// we have current keys, why are they not using one of those? | |||
// maybe because they are all about to expire? | |||
if (currentKeys.stream().allMatch(k -> k.expiresInLessThan(TOKEN_GENERATION_LIMIT))) { | |||
// OK, we'll create it since all other keys have less than 24 hours left | |||
fromKey = createFromKey(fromNode, fromKeyUuid, remoteHost, message); | |||
log.info("findFromKey: due to expiring current key, registered new node key: " + fromKeyUuid + " for node: " + fromNodeUuid); | |||
return fromKey; | |||
} else if (!empty(restoreKey) && isValidRestoreKey(fromNode, restoreKey, remoteHost)) { | |||
fromKey = createFromKey(fromNode, fromKeyUuid, remoteHost, message); | |||
log.info("findFromKey: accepting key with valid restoreKey ("+restoreKey+"): registered new node key: " + fromKeyUuid + " for node: " + fromNodeUuid); | |||
return fromKey; | |||
} else { | |||
// todo: send verify_key synchronous message to node, if it can verify the key, then we'll accept it | |||
log.warn("findFromKey: new key not accepted, current keys exist that are not expiring soon, node should use one of those"); | |||
throw forbiddenEx(); | |||
} | |||
} | |||
private boolean isValidRestoreKey(BubbleNode fromNode, String restoreKey, String remoteHost) { | |||
final BubbleNetwork network = networkDAO.findByUuid(fromNode.getNetwork()); | |||
if (network == null) { | |||
log.info("isValidRestoreKey: network not found ("+fromNode.getNetwork()+"), returning false"); | |||
return false; | |||
} | |||
if (network.getState() != BubbleNetworkState.restoring) { | |||
log.info("isValidRestoreKey: network ("+network.getUuid()+") is not in 'restoring' state ("+network.getState()+"), returning false"); | |||
return false; | |||
} | |||
if (network.getMtimeAge() > RestoreService.RESTORE_WINDOW) { | |||
log.info("isValidRestoreKey: network ("+network.getUuid()+") has been in 'restoring' state too long ("+formatDuration(network.getMtimeAge())+"), must stop network and retry restore, returning false"); | |||
return false; | |||
} | |||
if (!restoreService.isValidRestoreKey(restoreKey)) { | |||
log.info("isValidRestoreKey: restoreKey ("+restoreKey+") is not valid, returning false"); | |||
return false; | |||
} | |||
if (!fromNode.hasSameIp(remoteHost)) { | |||
log.info("isValidRestoreKey: remoteHost ("+remoteHost+") does not match IP of restoring node ("+fromNode.id()+"), returning false"); | |||
return false; | |||
} | |||
return true; | |||
} | |||
private final Object createKeyLock = new Object(); | |||
private BubbleNodeKey createFromKey(BubbleNode fromNode, String fromKeyUuid, String remoteHost, RsaMessage message) { | |||
synchronized (createKeyLock) { | |||
final BubbleNodeKey existing = nodeKeyDAO.findByUuid(fromKeyUuid); | |||
return existing != null | |||
? existing | |||
: nodeKeyDAO.create(new BubbleNodeKey(fromKeyUuid, fromNode, message.getPublicKey(), remoteHost)); | |||
} | |||
} | |||
@GET @Path(EP_READ+"/{token}") | |||
public Response readStorage(@Context Request req, | |||
@Context ContainerRequest ctx, | |||
@@ -10,6 +10,7 @@ import bubble.model.cloud.BubbleNode; | |||
import bubble.server.listener.BubbleFirstTimeListener; | |||
import bubble.service.boot.ActivationService; | |||
import bubble.service.boot.StandardSelfNodeService; | |||
import bubble.service.notify.LocalNotificationStrategy; | |||
import com.fasterxml.jackson.annotation.JsonIgnore; | |||
import com.github.jknack.handlebars.Handlebars; | |||
import lombok.Getter; | |||
@@ -70,14 +71,18 @@ public class BubbleConfiguration extends PgRestServerConfiguration | |||
public static final String TAG_LOCALES = "locales"; | |||
public static final String TAG_CLOUD_CONFIGS = "cloudConfigs"; | |||
public static final String TAG_LOCKED = "locked"; | |||
public static final String TAG_NGINX_PORT = "nginxPort"; | |||
public static final String TAG_SSL_PORT = "sslPort"; | |||
public static final String DEFAULT_LOCAL_STORAGE_DIR = HOME_DIR + "/.bubble_local_storage"; | |||
public BubbleConfiguration (BubbleConfiguration other) { copy(this, other); } | |||
@Getter @Setter private int nginxPort = 1443; | |||
@Getter @Setter private int mitmPort = 8888; | |||
@Getter @Setter private int defaultNodeSslPort = 1443; | |||
@Getter @Setter private LocalNotificationStrategy localNotificationStrategy = LocalNotificationStrategy.inline; | |||
public LocalNotificationStrategy localNotificationStrategy() { | |||
return localNotificationStrategy == null ? LocalNotificationStrategy.inline : localNotificationStrategy; | |||
} | |||
@Getter @Setter private Boolean backupsEnabled = true; | |||
public boolean backupsEnabled() { return backupsEnabled == null || backupsEnabled; } | |||
@@ -250,6 +255,7 @@ public class BubbleConfiguration extends PgRestServerConfiguration | |||
public Map<String, Object> getPublicSystemConfigs () { | |||
synchronized (publicSystemConfigs) { | |||
if (publicSystemConfigs.get() == null) { | |||
final BubbleNode thisNode = getThisNode(); | |||
final BubbleNetwork thisNetwork = getThisNetwork(); | |||
final AccountDAO accountDAO = getBean(AccountDAO.class); | |||
final ActivationService activationService = getBean(ActivationService.class); | |||
@@ -263,7 +269,7 @@ public class BubbleConfiguration extends PgRestServerConfiguration | |||
{TAG_LOCALES, getAllLocales()}, | |||
{TAG_CLOUD_CONFIGS, accountDAO.activated() ? null : activationService.getCloudDefaults()}, | |||
{TAG_LOCKED, accountDAO.locked()}, | |||
{TAG_NGINX_PORT, getNginxPort()} | |||
{TAG_SSL_PORT, thisNode == null ? null : thisNode.getSslPort()} | |||
})); | |||
} | |||
return publicSystemConfigs.get(); | |||
@@ -7,6 +7,7 @@ import bubble.model.cloud.BubbleNode; | |||
import bubble.model.cloud.CloudService; | |||
import bubble.server.BubbleConfiguration; | |||
import bubble.service.boot.SelfNodeService; | |||
import bubble.service.notify.LocalNotificationStrategy; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.wizard.server.RestServer; | |||
import org.cobbzilla.wizard.server.RestServerLifecycleListenerBase; | |||
@@ -23,6 +24,8 @@ import static org.cobbzilla.util.time.TimeUtil.DATE_FORMAT_YYYY_MM_DD_HH_mm_ss; | |||
@Slf4j | |||
public class NodeInitializerListener extends RestServerLifecycleListenerBase<BubbleConfiguration> { | |||
private static final int MIN_WORKER_THREADS_FOR_LOCAL_HTTP_NOTIFY = 10; | |||
@Override public void beforeStart(RestServer server) { | |||
final BubbleConfiguration c = (BubbleConfiguration) server.getConfiguration(); | |||
@@ -33,6 +36,14 @@ public class NodeInitializerListener extends RestServerLifecycleListenerBase<Bub | |||
// ensure locales were loaded correctly | |||
final String[] allLocales = c.getAllLocales(); | |||
if (empty(allLocales)) die("beforeStart: no locales found"); // should never happen | |||
// if we are using the 'http' localNotificationStrategy, ensure we have enough worker threads | |||
if (c.localNotificationStrategy() == LocalNotificationStrategy.http) { | |||
if (!c.getHttp().hasWorkerThreads() || c.getHttp().getWorkerThreads() < MIN_WORKER_THREADS_FOR_LOCAL_HTTP_NOTIFY) { | |||
log.info("beforeStart: http.workerThreads="+c.getHttp().getWorkerThreads()+" is not set or too low, increasing to "+MIN_WORKER_THREADS_FOR_LOCAL_HTTP_NOTIFY); | |||
c.getHttp().setWorkerThreads(MIN_WORKER_THREADS_FOR_LOCAL_HTTP_NOTIFY); | |||
} | |||
} | |||
} | |||
@Override public void onStart(RestServer server) { | |||
@@ -71,7 +71,7 @@ public class AnsiblePrepService { | |||
ctx.put("restoreTimeoutSeconds", RESTORE_MONITOR_SCRIPT_TIMEOUT_SECONDS); | |||
} | |||
final int sslPort = installType == AnsibleInstallType.sage ? 443 : configuration.getNginxPort(); | |||
final int sslPort = node.getSslPort(); | |||
ctx.put("sslPort", sslPort); | |||
final String publicBaseUri = sslPort == 443 | |||
? "https://"+network.getNetworkDomain()+"/" | |||
@@ -17,9 +17,11 @@ import bubble.model.cloud.BubbleNetwork; | |||
import bubble.model.cloud.CloudService; | |||
import bubble.model.cloud.NetLocation; | |||
import bubble.server.BubbleConfiguration; | |||
import lombok.Getter; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.apache.commons.lang.ArrayUtils; | |||
import org.cobbzilla.util.collection.ExpirationMap; | |||
import org.cobbzilla.wizard.cache.redis.RedisService; | |||
import org.cobbzilla.wizard.validation.SimpleViolationException; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.stereotype.Service; | |||
@@ -33,8 +35,11 @@ import java.util.stream.Collectors; | |||
import static bubble.model.cloud.RegionalServiceDriver.findClosestRegions; | |||
import static java.util.concurrent.TimeUnit.DAYS; | |||
import static java.util.concurrent.TimeUnit.MINUTES; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.*; | |||
import static org.cobbzilla.util.json.JsonUtil.json; | |||
import static org.cobbzilla.util.string.LocaleUtil.getDefaultLocales; | |||
import static org.cobbzilla.wizard.cache.redis.RedisService.EX; | |||
import static org.cobbzilla.wizard.resources.ResourceUtil.invalidEx; | |||
import static org.cobbzilla.wizard.resources.ResourceUtil.notFoundEx; | |||
@@ -48,29 +53,35 @@ public class GeoService { | |||
@Autowired private CloudServiceDAO cloudDAO; | |||
@Autowired private BubbleFootprintDAO footprintDAO; | |||
@Autowired private BubbleConfiguration configuration; | |||
@Autowired private RedisService redis; | |||
private final Map<String, GeoLocation> locateCache = new ExpirationMap<>(DAYS.toMillis(1)); | |||
private static final long REDIS_CACHE_TIME = DAYS.toSeconds(1); | |||
private static final long MEMORY_CACHE_TIME = MINUTES.toSeconds(20); | |||
@Getter(lazy=true) private final RedisService locateRedis = redis.prefixNamespace(getClass().getName()+".locate"); | |||
private final Map<String, GeoLocation> locateCache = new ExpirationMap<>(MEMORY_CACHE_TIME); | |||
public GeoLocation locate (String accountUuid, String ip) { | |||
final String cacheKey = hashOf(accountUuid, ip); | |||
return locateCache.computeIfAbsent(cacheKey, k -> { | |||
final String found = getLocateRedis().get(cacheKey); | |||
if (found != null) return json(found, GeoLocation.class); | |||
List<CloudService> candidateGeoLocationServices = null; | |||
if (accountUuid != null) { | |||
candidateGeoLocationServices = cloudDAO.findByAccountAndType(accountUuid, CloudServiceType.geoLocation); | |||
} | |||
if (empty(candidateGeoLocationServices)) { | |||
// try to find using admin | |||
final Account admin = accountDAO.getFirstAdmin(); | |||
if (admin != null && !admin.getUuid().equals(accountUuid)) { | |||
candidateGeoLocationServices = cloudDAO.findByAccountAndType(admin.getUuid(), CloudServiceType.geoLocation); | |||
List<CloudService> geoLocationServices = null; | |||
if (accountUuid != null) { | |||
geoLocationServices = cloudDAO.findByAccountAndType(accountUuid, CloudServiceType.geoLocation); | |||
} | |||
if (empty(geoLocationServices)) { | |||
// try to find using admin | |||
final Account admin = accountDAO.getFirstAdmin(); | |||
if (admin != null && !admin.getUuid().equals(accountUuid)) { | |||
geoLocationServices = cloudDAO.findByAccountAndType(admin.getUuid(), CloudServiceType.geoLocation); | |||
} | |||
} | |||
if (empty(geoLocationServices)) { | |||
throw new SimpleViolationException("err.geoLocateService.notFound"); | |||
} | |||
} | |||
if (empty(candidateGeoLocationServices)) { | |||
throw new SimpleViolationException("err.geoLocateService.notFound"); | |||
} | |||
final List<CloudService> geoLocationServices = candidateGeoLocationServices; | |||
final String cacheKey = hashOf(accountUuid, ip, geoLocationServices); | |||
return locateCache.computeIfAbsent(cacheKey, k -> { | |||
log.info("locate: resolving IP: "+ip+" for cacheKey: "+cacheKey); | |||
final List<GeoLocation> resolved = new ArrayList<>(); | |||
GeoCodeServiceDriver geoCodeDriver = null; | |||
@@ -106,7 +117,10 @@ public class GeoService { | |||
} | |||
} | |||
return getGeoLocation(ip, geoLocationServices, resolved); | |||
final GeoLocation geoLocation = getGeoLocation(ip, geoLocationServices, resolved); | |||
getLocateRedis().set(cacheKey, json(geoLocation), EX, REDIS_CACHE_TIME); | |||
return geoLocation; | |||
}); | |||
} | |||
@@ -150,10 +164,15 @@ public class GeoService { | |||
return resolved.get(0); | |||
} | |||
private Map<String, GeoTimeZone> timezoneCache = new ExpirationMap<>(DAYS.toMillis(1)); | |||
@Getter(lazy=true) private final RedisService timezoneRedis = redis.prefixNamespace(getClass().getName()+".timezone"); | |||
private Map<String, GeoTimeZone> timezoneCache = new ExpirationMap<>(MEMORY_CACHE_TIME); | |||
public GeoTimeZone getTimeZone (final Account account, String ip) { | |||
final AtomicReference<Account> acct = new AtomicReference<>(account); | |||
return timezoneCache.computeIfAbsent(ip, k -> { | |||
final String found = getTimezoneRedis().get(ip); | |||
if (found != null) return json(found, GeoTimeZone.class); | |||
if (acct.get() == null) acct.set(accountDAO.getFirstAdmin()); | |||
List<CloudService> geoServices = cloudDAO.findByAccountAndType(acct.get().getUuid(), CloudServiceType.geoTime); | |||
if (geoServices.isEmpty() && !account.admin()) { | |||
@@ -176,7 +195,9 @@ public class GeoService { | |||
location.setLon(code.getLon()); | |||
} | |||
return geoServices.get(0).getGeoTimeDriver(configuration).getTimezone(location.getLat(), location.getLon()); | |||
final GeoTimeZone timezone = geoServices.get(0).getGeoTimeDriver(configuration).getTimezone(location.getLat(), location.getLon()); | |||
getTimezoneRedis().set(ip, json(timezone), EX, REDIS_CACHE_TIME); | |||
return timezone; | |||
}); | |||
} | |||
@@ -237,6 +258,7 @@ public class GeoService { | |||
} | |||
private Map<String, List<String>> localesCache = new ExpirationMap<>(DAYS.toMillis(1)); | |||
public List<String> getSupportedLocales(Account caller, String remoteHost, String langHeader) { | |||
return localesCache.computeIfAbsent((caller==null?"null":caller.getUuid())+remoteHost+"\t"+langHeader, k -> { | |||
final List<String> locales = new ArrayList<>(); | |||
@@ -183,6 +183,7 @@ public class StandardNetworkService implements NetworkService { | |||
.setHost(nn.getHost()) | |||
.setState(BubbleNodeState.created) | |||
.setSageNode(nn.fork() ? null : configuration.getThisNode().getUuid()) | |||
.setSslPort(network.getInstallType() == AnsibleInstallType.sage ? 443 : configuration.getDefaultNodeSslPort()) | |||
.setNetwork(network.getUuid()) | |||
.setDomain(network.getDomain()) | |||
.setAccount(network.getAccount()) | |||
@@ -0,0 +1,19 @@ | |||
package bubble.service.notify; | |||
import lombok.Getter; | |||
import lombok.NoArgsConstructor; | |||
import lombok.Setter; | |||
import lombok.experimental.Accessors; | |||
import org.cobbzilla.util.security.RsaMessage; | |||
@NoArgsConstructor @Accessors(chain=true) | |||
public class InboundNotification { | |||
@Getter @Setter private RsaMessage message; | |||
@Getter @Setter private String remoteHost; | |||
@Getter @Setter private String fromNodeUuid; | |||
@Getter @Setter private String fromKeyUuid; | |||
@Getter @Setter private String toKeyUuid; | |||
@Getter @Setter private String restoreKey; | |||
} |
@@ -0,0 +1,13 @@ | |||
package bubble.service.notify; | |||
import com.fasterxml.jackson.annotation.JsonCreator; | |||
import static bubble.ApiConstants.enumFromString; | |||
public enum LocalNotificationStrategy { | |||
http, queue, inline; | |||
@JsonCreator public LocalNotificationStrategy fromString (String v) { return enumFromString(LocalNotificationStrategy.class, v); } | |||
} |
@@ -38,6 +38,12 @@ public class NotificationInboxProcessor implements Runnable { | |||
} | |||
private void processNotification(ReceivedNotification n, Map<String, SynchronousNotification> syncRequests) { | |||
processNotification(n, syncRequests, configuration); | |||
} | |||
public static void processNotification(ReceivedNotification n, | |||
Map<String, SynchronousNotification> syncRequests, | |||
BubbleConfiguration configuration) { | |||
final ReceivedNotificationHandler handler = n.getType().getHandler(configuration); | |||
if (n.getType().isResponse()) { | |||
final SynchronousNotificationReply reply = json(n.getPayloadJson(), SynchronousNotificationReply.class); | |||
@@ -0,0 +1,208 @@ | |||
package bubble.service.notify; | |||
import bubble.dao.cloud.BubbleNetworkDAO; | |||
import bubble.dao.cloud.BubbleNodeDAO; | |||
import bubble.dao.cloud.BubbleNodeKeyDAO; | |||
import bubble.dao.cloud.notify.ReceivedNotificationDAO; | |||
import bubble.model.cloud.BubbleNetwork; | |||
import bubble.model.cloud.BubbleNetworkState; | |||
import bubble.model.cloud.BubbleNode; | |||
import bubble.model.cloud.BubbleNodeKey; | |||
import bubble.model.cloud.notify.NotificationReceipt; | |||
import bubble.model.cloud.notify.ReceivedNotification; | |||
import bubble.model.cloud.notify.SentNotification; | |||
import bubble.server.BubbleConfiguration; | |||
import bubble.service.backup.RestoreService; | |||
import lombok.Getter; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.cobbzilla.util.security.RsaMessage; | |||
import org.cobbzilla.util.string.StringUtil; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.stereotype.Service; | |||
import java.util.List; | |||
import java.util.Set; | |||
import static bubble.ApiConstants.MAX_NOTIFY_LOG; | |||
import static bubble.model.cloud.BubbleNodeKey.TOKEN_GENERATION_LIMIT; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.die; | |||
import static org.cobbzilla.util.daemon.ZillaRuntime.empty; | |||
import static org.cobbzilla.util.json.JsonUtil.json; | |||
import static org.cobbzilla.util.network.NetworkUtil.configuredIpsAndExternalIp; | |||
import static org.cobbzilla.util.network.NetworkUtil.isLocalHost; | |||
import static org.cobbzilla.util.string.StringUtil.truncate; | |||
import static org.cobbzilla.util.time.TimeUtil.formatDuration; | |||
import static org.cobbzilla.wizard.resources.ResourceUtil.forbiddenEx; | |||
@Service @Slf4j | |||
public class NotificationReceiverService { | |||
@Autowired private ReceivedNotificationDAO receivedNotificationDAO; | |||
@Autowired private BubbleConfiguration configuration; | |||
@Autowired private BubbleNetworkDAO networkDAO; | |||
@Autowired private BubbleNodeDAO nodeDAO; | |||
@Autowired private BubbleNodeKeyDAO nodeKeyDAO; | |||
@Autowired private NotificationService notificationService; | |||
@Autowired private RestoreService restoreService; | |||
public NotificationReceipt receive (InboundNotification n) { | |||
final RsaMessage message = n.getMessage(); | |||
final String remoteHost = n.getRemoteHost(); | |||
final String fromNodeUuid = n.getFromNodeUuid(); | |||
final String fromKeyUuid = n.getFromKeyUuid(); | |||
final String toKeyUuid = n.getToKeyUuid(); | |||
final String restoreKey = n.getRestoreKey(); | |||
final BubbleNode fromNode = nodeDAO.findByUuid(fromNodeUuid); | |||
if (fromNode == null) { | |||
log.warn("receiveNotification: fromNode not found: "+fromNodeUuid); | |||
throw forbiddenEx(); | |||
} | |||
final BubbleNodeKey fromKey = findFromKey(fromNode, fromKeyUuid, remoteHost, restoreKey, message); | |||
// Find our key as receiver | |||
final BubbleNode thisNode = configuration.getThisNode(); | |||
final BubbleNodeKey toKey = nodeKeyDAO.findByNodeAndUuid(thisNode.getUuid(), toKeyUuid); | |||
if (toKey == null) { | |||
log.warn("receiveNotification: node key " + toKeyUuid + " not found"); | |||
throw forbiddenEx(); | |||
} | |||
// If the message is not from ourselves, check the remoteHost | |||
if (!toKeyUuid.equals(fromKeyUuid) && !remoteHost.equals(fromKey.getRemoteHost())) { | |||
log.warn("receiveNotification: remoteHost mismatch: request="+remoteHost+", key="+fromKey.getRemoteHost()); | |||
throw forbiddenEx(); | |||
} | |||
// Decrypt message | |||
try { | |||
log.debug("decrypting message with key: "+toKey.getUuid()); | |||
final String json = toKey.decrypt(message, fromKey.getRsaKey()); | |||
if (log.isDebugEnabled()) { | |||
log.debug("_notify:\n<<<<< DECRYPTED NOTIFICATION <<<<<\n" | |||
+ truncate(json, MAX_NOTIFY_LOG) | |||
+ "\n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<"); | |||
} | |||
final SentNotification notification = json(json, SentNotification.class); | |||
return receive(fromNodeUuid, notification); | |||
} catch (Exception e) { | |||
return die("receiveNotification: "+e, e); | |||
} | |||
} | |||
public NotificationReceipt receive(String fromNodeUuid, SentNotification notification) { | |||
final NotificationReceipt receipt = new NotificationReceipt(); | |||
if (notification.isResolveNodes()) { | |||
receipt.setResolvedSender(nodeDAO.findByUuid(fromNodeUuid)); | |||
receipt.setResolvedRecipient(configuration.getThisNode()); | |||
} | |||
receivedNotificationDAO.create(new ReceivedNotification(notification).setReceipt(receipt)); | |||
notificationService.checkInbox(); | |||
return receipt; | |||
} | |||
@Getter(lazy=true) private final Set<String> localIps = configuredIpsAndExternalIp(); | |||
private BubbleNodeKey findFromKey(BubbleNode fromNode, String fromKeyUuid, String remoteHost, String restoreKey, RsaMessage message) { | |||
final String fromNodeUuid = fromNode.getUuid(); | |||
BubbleNodeKey fromKey = nodeKeyDAO.findByNodeAndUuid(fromNodeUuid, fromKeyUuid); | |||
if (fromKey != null) { | |||
if (!fromKey.getRemoteHost().equals(remoteHost)) { | |||
// if request is from 127.0.0.1, check to see if fromKey is for a local address | |||
if (isLocalHost(remoteHost) && getLocalIps().contains(fromKey.getRemoteHost())) { | |||
log.debug("findFromKey: request from 127.0.0.1 is OK, key is local: "+fromKey.getRemoteHost()+ " (ips="+ StringUtil.toString(getLocalIps())+")"); | |||
} else { | |||
log.warn("findFromKey: remoteHost for for node " + fromNodeUuid + " (key=" + fromKeyUuid + ", remoteHost=" + fromKey.getRemoteHost() + ") does not match request: " + remoteHost+ " (ips="+ StringUtil.toString(getLocalIps())+")"); | |||
throw forbiddenEx(); | |||
} | |||
} | |||
return fromKey; | |||
} | |||
// Do we have any other keys for this node? | |||
final List<BubbleNodeKey> currentKeys = nodeKeyDAO.findByNode(fromNodeUuid); | |||
// Ensure remote host matches | |||
final String currentRemoteHost = nodeKeyDAO.findRemoteHostForNode(fromNodeUuid); | |||
if (currentRemoteHost != null && !remoteHost.equals(currentRemoteHost)) { | |||
log.warn("findFromKey: new key provided for node "+fromNodeUuid+" but remoteHost does not match: "+remoteHost); | |||
throw forbiddenEx(); | |||
} | |||
// Do we have the current key? | |||
fromKey = currentKeys.stream() | |||
.filter(k -> k.getPublicKey().equals(message.getPublicKey())) | |||
.findFirst() | |||
.orElse(null); | |||
if (fromKey != null) return fromKey; | |||
// Create a record for this key, no private key because only the node knows that. | |||
// Record remoteHost, future requests must match | |||
if (currentKeys.isEmpty()) { | |||
// verify old keys match remoteHost | |||
fromKey = createFromKey(fromNode, fromKeyUuid, remoteHost, message); | |||
log.info("findFromKey: registered new node key: " + fromKeyUuid + " for node: " + fromNodeUuid); | |||
return fromKey; | |||
} | |||
// we have current keys, why are they not using one of those? | |||
// maybe because they are all about to expire? | |||
if (currentKeys.stream().allMatch(k -> k.expiresInLessThan(TOKEN_GENERATION_LIMIT))) { | |||
// OK, we'll create it since all other keys have less than 24 hours left | |||
fromKey = createFromKey(fromNode, fromKeyUuid, remoteHost, message); | |||
log.info("findFromKey: due to expiring current key, registered new node key: " + fromKeyUuid + " for node: " + fromNodeUuid); | |||
return fromKey; | |||
} else if (!empty(restoreKey) && isValidRestoreKey(fromNode, restoreKey, remoteHost)) { | |||
fromKey = createFromKey(fromNode, fromKeyUuid, remoteHost, message); | |||
log.info("findFromKey: accepting key with valid restoreKey ("+restoreKey+"): registered new node key: " + fromKeyUuid + " for node: " + fromNodeUuid); | |||
return fromKey; | |||
} else { | |||
// todo: send verify_key synchronous message to node, if it can verify the key, then we'll accept it | |||
log.warn("findFromKey: new key not accepted, current keys exist that are not expiring soon, node should use one of those"); | |||
throw forbiddenEx(); | |||
} | |||
} | |||
private boolean isValidRestoreKey(BubbleNode fromNode, String restoreKey, String remoteHost) { | |||
final BubbleNetwork network = networkDAO.findByUuid(fromNode.getNetwork()); | |||
if (network == null) { | |||
log.info("isValidRestoreKey: network not found ("+fromNode.getNetwork()+"), returning false"); | |||
return false; | |||
} | |||
if (network.getState() != BubbleNetworkState.restoring) { | |||
log.info("isValidRestoreKey: network ("+network.getUuid()+") is not in 'restoring' state ("+network.getState()+"), returning false"); | |||
return false; | |||
} | |||
if (network.getMtimeAge() > RestoreService.RESTORE_WINDOW) { | |||
log.info("isValidRestoreKey: network ("+network.getUuid()+") has been in 'restoring' state too long ("+formatDuration(network.getMtimeAge())+"), must stop network and retry restore, returning false"); | |||
return false; | |||
} | |||
if (!restoreService.isValidRestoreKey(restoreKey)) { | |||
log.info("isValidRestoreKey: restoreKey ("+restoreKey+") is not valid, returning false"); | |||
return false; | |||
} | |||
if (!fromNode.hasSameIp(remoteHost)) { | |||
log.info("isValidRestoreKey: remoteHost ("+remoteHost+") does not match IP of restoring node ("+fromNode.id()+"), returning false"); | |||
return false; | |||
} | |||
return true; | |||
} | |||
private final Object createKeyLock = new Object(); | |||
private BubbleNodeKey createFromKey(BubbleNode fromNode, String fromKeyUuid, String remoteHost, RsaMessage message) { | |||
synchronized (createKeyLock) { | |||
final BubbleNodeKey existing = nodeKeyDAO.findByUuid(fromKeyUuid); | |||
return existing != null | |||
? existing | |||
: nodeKeyDAO.create(new BubbleNodeKey(fromKeyUuid, fromNode, message.getPublicKey(), remoteHost)); | |||
} | |||
} | |||
} |
@@ -42,6 +42,7 @@ public class NotificationService { | |||
@Autowired private ReceivedNotificationDAO receivedNotificationDAO; | |||
@Autowired private BubbleConfiguration configuration; | |||
@Autowired private BubbleNodeKeyDAO nodeKeyDAO; | |||
@Autowired private NotificationReceiverService notificationReceiverService; | |||
public NotificationReceipt notify(String sender, ApiClientBase api, NotificationType type, Object payload) { | |||
return notify(sender, api, null, type, payload); | |||
@@ -72,46 +73,63 @@ public class NotificationService { | |||
return _notify(sender, api, toNodeUuid, type, payload, true); | |||
} | |||
public NotificationReceipt _notify(String sender, ApiClientBase api, String toNodeUuid, NotificationType type, Object payload, boolean enhancedReceipt) { | |||
final BubbleNode thisNode = configuration.getThisNode(); | |||
final boolean isLocal = toNodeUuid != null && toNodeUuid.equals(thisNode.getUuid()); | |||
final SentNotification notification = sentNotificationDAO.create((SentNotification) new SentNotification() | |||
.setNotificationId(getNotificationId(payload)) | |||
.setAccount(sender) | |||
.setType(type) | |||
.setFromNode(configuration.getThisNode().getUuid()) | |||
.setFromNode(thisNode.getUuid()) | |||
.setToNode(toNodeUuid != null ? toNodeUuid : api.getBaseUri()) | |||
.setUri(api.getBaseUri()+NOTIFY_ENDPOINT) | |||
.setUri(api.getBaseUri() + NOTIFY_ENDPOINT) | |||
.setPayloadJson(payload == null ? null : json(payload))); | |||
notification.setStatus(NotificationSendStatus.sending); | |||
sentNotificationDAO.update(notification); | |||
try { | |||
final String json = json(notification); | |||
if (log.isDebugEnabled()) { | |||
log.debug("_notify:\n>>>>> SENDING to " + api.getConnectionInfo().getBaseUri() + " >>>>>\n" | |||
+ truncate(json, MAX_NOTIFY_LOG) | |||
+ "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); | |||
} else { | |||
log.info("_notify: >>>>> SENDING "+notification.getType()+" to " + api.getConnectionInfo().getBaseUri() + " >>>>>"); | |||
} | |||
final RestResponse response = api.doPost(NOTIFY_ENDPOINT, json); | |||
final NotificationReceipt receipt = json(response.json, NotificationReceipt.class); | |||
notification.setStatus(NotificationSendStatus.sent); | |||
notification.setReceipt(receipt); | |||
sentNotificationDAO.update(notification); | |||
log.debug("_notify: <<<<< RECEIPT <<<<<< "+json(receipt, COMPACT_MAPPER)+" <<<<<<<<<<<<<<<<<<"); | |||
if (isLocal && configuration.localNotificationStrategy() == LocalNotificationStrategy.inline) { | |||
final NotificationReceipt receipt = new NotificationReceipt(); | |||
final ReceivedNotification n = new ReceivedNotification(notification).setReceipt(receipt); | |||
NotificationInboxProcessor.processNotification(n, syncRequests, configuration); | |||
return receipt; | |||
} catch (ConnectException | ConnectTimeoutException | ApiException e) { | |||
notification.setStatus(NotificationSendStatus.error); | |||
notification.setException(e); | |||
} else { | |||
notification.setStatus(NotificationSendStatus.sending); | |||
sentNotificationDAO.update(notification); | |||
return die("_notify: " + e); | |||
} catch (Exception e) { | |||
notification.setStatus(NotificationSendStatus.error); | |||
notification.setException(e); | |||
sentNotificationDAO.update(notification); | |||
return die("_notify: " + e, e); | |||
try { | |||
final NotificationReceipt receipt; | |||
if (isLocal && configuration.localNotificationStrategy() == LocalNotificationStrategy.queue) { | |||
log.info("_notify: >>>>> SENDING " + notification.getType() + " to SELF via NotificationReceiverService >>>>>"); | |||
receipt = notificationReceiverService.receive(thisNode.getUuid(), notification); | |||
} else { | |||
final String json = json(notification); | |||
if (log.isTraceEnabled()) { | |||
log.trace("_notify:\n>>>>> SENDING to " + api.getConnectionInfo().getBaseUri() + " >>>>>\n" | |||
+ truncate(json, MAX_NOTIFY_LOG) | |||
+ "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); | |||
} else { | |||
log.info("_notify: >>>>> SENDING " + notification.getType() + " to " + api.getConnectionInfo().getBaseUri() + " >>>>>"); | |||
} | |||
final RestResponse response = api.doPost(NOTIFY_ENDPOINT, json); | |||
receipt = json(response.json, NotificationReceipt.class); | |||
} | |||
notification.setStatus(NotificationSendStatus.sent); | |||
notification.setReceipt(receipt); | |||
sentNotificationDAO.update(notification); | |||
log.debug("_notify: <<<<< RECEIPT <<<<<< " + json(receipt, COMPACT_MAPPER) + " <<<<<<<<<<<<<<<<<<"); | |||
return receipt; | |||
} catch (ConnectException | ConnectTimeoutException | ApiException e) { | |||
notification.setStatus(NotificationSendStatus.error); | |||
notification.setException(e); | |||
sentNotificationDAO.update(notification); | |||
return die("_notify: " + e); | |||
} catch (Exception e) { | |||
notification.setStatus(NotificationSendStatus.error); | |||
notification.setException(e); | |||
sentNotificationDAO.update(notification); | |||
return die("_notify: " + e, e); | |||
} | |||
} | |||
} | |||
@@ -172,9 +190,11 @@ public class NotificationService { | |||
n.setProcessingStatus(NotificationProcessingStatus.processing); | |||
receivedNotificationDAO.update(n); | |||
receivedNotificationDAO.flush(); | |||
log.debug("checkInbox: spawning NotificationInboxProcessor for "+n.getType()+" notificationId="+n.getNotificationId()); | |||
daemon(new NotificationInboxProcessor(n, syncRequests, configuration, receivedNotificationDAO)); | |||
} | |||
} | |||
log.debug("checkInbox: finished"); | |||
} catch (Exception e) { | |||
log.error("checkInbox: "+e, e); | |||
} | |||
@@ -42,6 +42,8 @@ staticAssets: | |||
http: | |||
port: {{#exists BUBBLE_SERVER_PORT}}{{BUBBLE_SERVER_PORT}}{{else}}8090{{/exists}} | |||
baseUri: /api | |||
{{#exists BUBBLE_SELECTOR_THREADS}}selectorThreads: {{BUBBLE_SELECTOR_THREADS}}{{/exists}} | |||
{{#exists BUBBLE_WORKER_THREADS}}workerThreads: {{BUBBLE_WORKER_THREADS}}{{/exists}} | |||
jersey: | |||
resourcePackages: | |||
@@ -64,6 +66,8 @@ errorApi: | |||
key: {{ERRBIT_KEY}} | |||
env: {{ERRBIT_ENV}} | |||
localNotificationStrategy: {{#exists BUBBLE_LOCAL_NOTIFY}}{{BUBBLE_LOCAL_NOTIFY}}{{else}}inline{{/exists}} | |||
letsencryptEmail: {{LETSENCRYPT_EMAIL}} | |||
localStorageDir: {{LOCALSTORAGE_BASE_DIR}} | |||
@@ -1 +1 @@ | |||
Subproject commit 879203f920a120f0cded72acb5305719aeb1730a | |||
Subproject commit 519e0e4948ae275886913a8e4e956bd3b9d7e38b |
@@ -1 +1 @@ | |||
Subproject commit c9a8ff0a5cab48c3220b525ee94e430f745ea052 | |||
Subproject commit 486d2bb39781234ff3a5f603612ff2c21d8b4226 |
@@ -1 +1 @@ | |||
Subproject commit aeafb1ccbdcc6c742922b079fa42135a78515727 | |||
Subproject commit c9c0ae2823641b31ff33f6e710b50745c181ebb2 |