From 0725e9b0b5404ecc751d87aae7d5df3c866188f7 Mon Sep 17 00:00:00 2001 From: Jonathan Cobb Date: Sat, 18 Jul 2020 16:16:16 -0400 Subject: [PATCH] add imagerefs to packer job, notify all waiters --- .../java/bubble/model/cloud/BubbleNode.java | 12 +++---- .../java/bubble/service/packer/PackerJob.java | 33 ++++++++++++++----- .../bubble/service/packer/PackerService.java | 20 +++++------ 3 files changed, 40 insertions(+), 25 deletions(-) diff --git a/bubble-server/src/main/java/bubble/model/cloud/BubbleNode.java b/bubble-server/src/main/java/bubble/model/cloud/BubbleNode.java index 288fa82f..7f1465ad 100644 --- a/bubble-server/src/main/java/bubble/model/cloud/BubbleNode.java +++ b/bubble-server/src/main/java/bubble/model/cloud/BubbleNode.java @@ -182,7 +182,7 @@ public class BubbleNode extends IdentifiableBase implements HasNetwork, HasBubbl @ECSearchable(filter=true) @ECField(type=EntityFieldType.ip4, index=130) @ECIndex(unique=true) @Column(length=IP4_MAXLEN) - @Getter @Setter private String ip4; + @Getter @Setter private volatile String ip4; public boolean hasIp4() { return ip4 != null; } public boolean localIp4() { return ip4 == null || isLocalIpv4(ip4); } @@ -190,7 +190,7 @@ public class BubbleNode extends IdentifiableBase implements HasNetwork, HasBubbl @ECSearchable(filter=true) @ECField(type=EntityFieldType.ip6, index=140) @ECIndex(unique=true) @Column(length=IP6_MAXLEN) - @Getter @Setter private String ip6; + @Getter @Setter private volatile String ip6; public boolean hasIp6() { return ip6 != null; } public boolean hasSameIp(BubbleNode other) { @@ -262,8 +262,8 @@ public class BubbleNode extends IdentifiableBase implements HasNetwork, HasBubbl public void waitForIpAddresses(NodeProgressMeter progressMeter) throws TimeoutException, IOException { final long start = now(); - long lastWrite = start; - while ((!hasIp4() || !hasIp6()) && !hasLaunchException() && now() - start < getIpTimeout()) { + long lastWrite = 0; + while ((!hasIp4() || !hasIp6()) && !hasLaunchException() && now() - start < ipTimeout()) { sleep(SECONDS.toMillis(2), "waiting for node to have IP addresses"); if (progressMeter != null && isPackerImageCreation() && now() - lastWrite > LAUNCH_ACTIVITY_TIMEOUT/2) { log.info("waitForIpAddresses: packerImageCreation is true, keeping progress meter alive"); @@ -275,9 +275,7 @@ public class BubbleNode extends IdentifiableBase implements HasNetwork, HasBubbl if (!hasIp4() || !hasIp6()) throw new TimeoutException("waitForIpAddresses: timeout"); } - private long getIpTimeout() { - return isPackerImageCreation() ? PACKER_IP_ADDR_TIMEOUT : IP_ADDR_TIMEOUT; - } + private long ipTimeout() { return isPackerImageCreation() ? PACKER_IP_ADDR_TIMEOUT : IP_ADDR_TIMEOUT; } @Transient @Getter @Setter private BubbleVersionInfo sageVersion; public boolean hasSageVersion () { return sageVersion != null && sageVersion.valid(); } diff --git a/bubble-server/src/main/java/bubble/service/packer/PackerJob.java b/bubble-server/src/main/java/bubble/service/packer/PackerJob.java index 1acacfdd..f92c36f9 100644 --- a/bubble-server/src/main/java/bubble/service/packer/PackerJob.java +++ b/bubble-server/src/main/java/bubble/service/packer/PackerJob.java @@ -80,15 +80,32 @@ public class PackerJob implements Callable> { @Autowired private GeoService geoService; @Autowired private PackerService packerService; - @Getter private CloudService cloud; - @Getter private AnsibleInstallType installType; - @Getter private AtomicReference> imagesRef; + @Getter private final CloudService cloud; + @Getter private final AnsibleInstallType installType; + @Getter private final List>> imagesRefList = new ArrayList<>(); @Getter private List images = new ArrayList<>(); - public PackerJob(CloudService cloud, AnsibleInstallType installType, AtomicReference> imagesRef) { + public PackerJob(CloudService cloud, AnsibleInstallType installType) { this.cloud = cloud; this.installType = installType; - this.imagesRef = imagesRef; + } + + public void addImagesRef(AtomicReference> imagesRef) { + synchronized (imagesRefList) { + if (!images.isEmpty()) { + imagesRef.set(images); + } else { + imagesRefList.add(imagesRef); + } + } + } + + private void setImagesRefs() { + synchronized (imagesRefList) { + for (AtomicReference> ref : imagesRefList) { + ref.set(images); + } + } } public String cacheKey() { return PackerService.cacheKey(cloud, installType); } @@ -157,7 +174,7 @@ public class PackerJob implements Callable> { // this image is for all regions log.info("packer image already exists for "+installType+" for all regions"); images = existingForInstallType; - if (imagesRef != null) imagesRef.set(images); + setImagesRefs(); return images; } else { @@ -175,7 +192,7 @@ public class PackerJob implements Callable> { if (empty(imagesToCreate)) { log.info("packer image already exists for "+installType+" for all regions"); images = existingForInstallType; - if (imagesRef != null) imagesRef.set(images); + setImagesRefs(); return images; } ctx.put(IMAGE_REGIONS_VAR, toInnerStringList(imagesToCreate)); @@ -257,7 +274,7 @@ public class PackerJob implements Callable> { images.addAll(finalizedImages); } - if (imagesRef != null) imagesRef.set(images); + setImagesRefs(); log.info("packer images created in "+formatDuration(now() - start)+": "+images); return images; } diff --git a/bubble-server/src/main/java/bubble/service/packer/PackerService.java b/bubble-server/src/main/java/bubble/service/packer/PackerService.java index 06e3a0f3..589886d1 100644 --- a/bubble-server/src/main/java/bubble/service/packer/PackerService.java +++ b/bubble-server/src/main/java/bubble/service/packer/PackerService.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -43,24 +42,25 @@ public class PackerService { .stream().filter(s -> !empty(s)).collect(Collectors.toList()); public static final String PACKER_KEY_NAME = "packer_rsa"; - private final Map>> activeJobs = new ConcurrentHashMap<>(16); + private final Map activeJobs = new ConcurrentHashMap<>(16); private final Map> completedJobs = new ConcurrentHashMap<>(16); private final ExecutorService pool = DaemonThreadFactory.fixedPool(5); @Autowired private BubbleConfiguration configuration; - public List writePackerImages(CloudService cloud, - AnsibleInstallType installType, - AtomicReference> imagesRef) { + public void writePackerImages(CloudService cloud, + AnsibleInstallType installType, + AtomicReference> imagesRef) { final String cacheKey = cacheKey(cloud, installType); synchronized (activeJobs) { final List images = completedJobs.get(cacheKey); - if (images != null) return images; - activeJobs.computeIfAbsent(cacheKey, k -> { - final PackerJob packerJob = configuration.autowire(new PackerJob(cloud, installType, imagesRef)); - return pool.submit(packerJob); + if (images != null) return; + final PackerJob job = activeJobs.computeIfAbsent(cacheKey, k -> { + final PackerJob packerJob = configuration.autowire(new PackerJob(cloud, installType)); + pool.submit(packerJob); + return packerJob; }); - return null; + job.addImagesRef(imagesRef); } }