瀏覽代碼

add imagerefs to packer job, notify all waiters

tags/v0.15.1
Jonathan Cobb 4 年之前
父節點
當前提交
0725e9b0b5
共有 3 個文件被更改,包括 40 次插入25 次删除
  1. +5
    -7
      bubble-server/src/main/java/bubble/model/cloud/BubbleNode.java
  2. +25
    -8
      bubble-server/src/main/java/bubble/service/packer/PackerJob.java
  3. +10
    -10
      bubble-server/src/main/java/bubble/service/packer/PackerService.java

+ 5
- 7
bubble-server/src/main/java/bubble/model/cloud/BubbleNode.java 查看文件

@@ -182,7 +182,7 @@ public class BubbleNode extends IdentifiableBase implements HasNetwork, HasBubbl

@ECSearchable(filter=true) @ECField(type=EntityFieldType.ip4, index=130)
@ECIndex(unique=true) @Column(length=IP4_MAXLEN)
@Getter @Setter private String ip4;
@Getter @Setter private volatile String ip4;
public boolean hasIp4() { return ip4 != null; }
public boolean localIp4() { return ip4 == null || isLocalIpv4(ip4); }

@@ -190,7 +190,7 @@ public class BubbleNode extends IdentifiableBase implements HasNetwork, HasBubbl

@ECSearchable(filter=true) @ECField(type=EntityFieldType.ip6, index=140)
@ECIndex(unique=true) @Column(length=IP6_MAXLEN)
@Getter @Setter private String ip6;
@Getter @Setter private volatile String ip6;
public boolean hasIp6() { return ip6 != null; }

public boolean hasSameIp(BubbleNode other) {
@@ -262,8 +262,8 @@ public class BubbleNode extends IdentifiableBase implements HasNetwork, HasBubbl

public void waitForIpAddresses(NodeProgressMeter progressMeter) throws TimeoutException, IOException {
final long start = now();
long lastWrite = start;
while ((!hasIp4() || !hasIp6()) && !hasLaunchException() && now() - start < getIpTimeout()) {
long lastWrite = 0;
while ((!hasIp4() || !hasIp6()) && !hasLaunchException() && now() - start < ipTimeout()) {
sleep(SECONDS.toMillis(2), "waiting for node to have IP addresses");
if (progressMeter != null && isPackerImageCreation() && now() - lastWrite > LAUNCH_ACTIVITY_TIMEOUT/2) {
log.info("waitForIpAddresses: packerImageCreation is true, keeping progress meter alive");
@@ -275,9 +275,7 @@ public class BubbleNode extends IdentifiableBase implements HasNetwork, HasBubbl
if (!hasIp4() || !hasIp6()) throw new TimeoutException("waitForIpAddresses: timeout");
}

private long getIpTimeout() {
return isPackerImageCreation() ? PACKER_IP_ADDR_TIMEOUT : IP_ADDR_TIMEOUT;
}
private long ipTimeout() { return isPackerImageCreation() ? PACKER_IP_ADDR_TIMEOUT : IP_ADDR_TIMEOUT; }

@Transient @Getter @Setter private BubbleVersionInfo sageVersion;
public boolean hasSageVersion () { return sageVersion != null && sageVersion.valid(); }


+ 25
- 8
bubble-server/src/main/java/bubble/service/packer/PackerJob.java 查看文件

@@ -80,15 +80,32 @@ public class PackerJob implements Callable<List<PackerImage>> {
@Autowired private GeoService geoService;
@Autowired private PackerService packerService;

@Getter private CloudService cloud;
@Getter private AnsibleInstallType installType;
@Getter private AtomicReference<List<PackerImage>> imagesRef;
@Getter private final CloudService cloud;
@Getter private final AnsibleInstallType installType;
@Getter private final List<AtomicReference<List<PackerImage>>> imagesRefList = new ArrayList<>();
@Getter private List<PackerImage> images = new ArrayList<>();

public PackerJob(CloudService cloud, AnsibleInstallType installType, AtomicReference<List<PackerImage>> imagesRef) {
public PackerJob(CloudService cloud, AnsibleInstallType installType) {
this.cloud = cloud;
this.installType = installType;
this.imagesRef = imagesRef;
}

public void addImagesRef(AtomicReference<List<PackerImage>> imagesRef) {
synchronized (imagesRefList) {
if (!images.isEmpty()) {
imagesRef.set(images);
} else {
imagesRefList.add(imagesRef);
}
}
}

private void setImagesRefs() {
synchronized (imagesRefList) {
for (AtomicReference<List<PackerImage>> ref : imagesRefList) {
ref.set(images);
}
}
}

public String cacheKey() { return PackerService.cacheKey(cloud, installType); }
@@ -157,7 +174,7 @@ public class PackerJob implements Callable<List<PackerImage>> {
// this image is for all regions
log.info("packer image already exists for "+installType+" for all regions");
images = existingForInstallType;
if (imagesRef != null) imagesRef.set(images);
setImagesRefs();
return images;

} else {
@@ -175,7 +192,7 @@ public class PackerJob implements Callable<List<PackerImage>> {
if (empty(imagesToCreate)) {
log.info("packer image already exists for "+installType+" for all regions");
images = existingForInstallType;
if (imagesRef != null) imagesRef.set(images);
setImagesRefs();
return images;
}
ctx.put(IMAGE_REGIONS_VAR, toInnerStringList(imagesToCreate));
@@ -257,7 +274,7 @@ public class PackerJob implements Callable<List<PackerImage>> {
images.addAll(finalizedImages);
}

if (imagesRef != null) imagesRef.set(images);
setImagesRefs();
log.info("packer images created in "+formatDuration(now() - start)+": "+images);
return images;
}


+ 10
- 10
bubble-server/src/main/java/bubble/service/packer/PackerService.java 查看文件

@@ -19,7 +19,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

@@ -43,24 +42,25 @@ public class PackerService {
.stream().filter(s -> !empty(s)).collect(Collectors.toList());
public static final String PACKER_KEY_NAME = "packer_rsa";

private final Map<String, Future<List<PackerImage>>> activeJobs = new ConcurrentHashMap<>(16);
private final Map<String, PackerJob> activeJobs = new ConcurrentHashMap<>(16);
private final Map<String, List<PackerImage>> completedJobs = new ConcurrentHashMap<>(16);
private final ExecutorService pool = DaemonThreadFactory.fixedPool(5);

@Autowired private BubbleConfiguration configuration;

public List<PackerImage> writePackerImages(CloudService cloud,
AnsibleInstallType installType,
AtomicReference<List<PackerImage>> imagesRef) {
public void writePackerImages(CloudService cloud,
AnsibleInstallType installType,
AtomicReference<List<PackerImage>> imagesRef) {
final String cacheKey = cacheKey(cloud, installType);
synchronized (activeJobs) {
final List<PackerImage> images = completedJobs.get(cacheKey);
if (images != null) return images;
activeJobs.computeIfAbsent(cacheKey, k -> {
final PackerJob packerJob = configuration.autowire(new PackerJob(cloud, installType, imagesRef));
return pool.submit(packerJob);
if (images != null) return;
final PackerJob job = activeJobs.computeIfAbsent(cacheKey, k -> {
final PackerJob packerJob = configuration.autowire(new PackerJob(cloud, installType));
pool.submit(packerJob);
return packerJob;
});
return null;
job.addImagesRef(imagesRef);
}
}



Loading…
取消
儲存