[STORM-3709] Topology without spout should fail submission. (#3343)
diff --git a/.travis.yml b/.travis.yml
index e0ba311..8e947ca 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,6 +13,10 @@
hosts:
- node1
+arch:
+ - amd64
+ - arm64-graviton2
+
env:
- MODULES=Client
- MODULES=Server
@@ -37,8 +41,6 @@
include:
- arch: s390x
jdk: openjdk11
- - arch: arm64
- jdk: openjdk11
before_install:
- rvm reload
diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES
index 2346f54..0557962 100644
--- a/DEPENDENCY-LICENSES
+++ b/DEPENDENCY-LICENSES
@@ -597,7 +597,7 @@
Common Development and Distribution License
* Expression Language 3.0 (org.glassfish:javax.el:3.0.0 - http://el-spec.java.net)
- * Expression Language 3.0 (org.glassfish:javax.el:3.0.1-b11 - http://uel.java.net)
+ * Expression Language 3.0 (org.glassfish:javax.el:3.0.1-b12 - http://uel.java.net)
* JavaServer Pages(TM) API (javax.servlet.jsp:javax.servlet.jsp-api:2.3.1 - http://jsp.java.net)
* Java Servlet API (javax.servlet:javax.servlet-api:3.1.0 - http://servlet-spec.java.net)
* javax.annotation API (javax.annotation:javax.annotation-api:1.3.2 - http://jcp.org/en/jsr/detail?id=250)
diff --git a/LICENSE-binary b/LICENSE-binary
index 7888f6f..827cb1a 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -966,7 +966,7 @@
Common Development and Distribution License
* Expression Language 3.0 (org.glassfish:javax.el:3.0.0 - http://el-spec.java.net)
- * Expression Language 3.0 (org.glassfish:javax.el:3.0.1-b11 - http://uel.java.net)
+ * Expression Language 3.0 (org.glassfish:javax.el:3.0.1-b12 - http://uel.java.net)
* JavaServer Pages(TM) API (javax.servlet.jsp:javax.servlet.jsp-api:2.3.1 - http://jsp.java.net)
* Java Servlet API (javax.servlet:javax.servlet-api:3.1.0 - http://servlet-spec.java.net)
* JSP implementation (org.glassfish.web:javax.servlet.jsp:2.3.2 - http://jsp.java.net)
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
index 16c25d2..74ec2ef 100644
--- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -22,12 +22,15 @@
import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.security.auth.Subject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -79,6 +82,8 @@
private HdfsBlobStoreImpl hbs;
private Subject localSubject;
private Map<String, Object> conf;
+ private Cache<String, SettableBlobMeta> cacheMetas = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build();
+ private Cache<String, Integer> cachedReplicationCount = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build();
/**
* If who is null then we want to use the user hadoop says we are.
@@ -152,6 +157,7 @@
outputStream = null;
BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, true);
dataFile.setMetadata(meta);
+ cacheMetas.put(key, meta);
return new BlobStoreFileOutputStream(dataFile);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -170,7 +176,7 @@
public AtomicOutputStream updateBlob(String key, Subject who)
throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
- SettableBlobMeta meta = getStoredBlobMeta(key);
+ SettableBlobMeta meta = extractBlobMeta(key);
validateKey(key);
aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
try {
@@ -199,7 +205,8 @@
}
in.close();
in = null;
- return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
+ SettableBlobMeta blobMeta = Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
+ return blobMeta;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
@@ -218,7 +225,7 @@
throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
+ SettableBlobMeta meta = extractBlobMeta(key);
aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
ReadableBlobMeta rbm = new ReadableBlobMeta();
rbm.set_settable(meta);
@@ -251,7 +258,7 @@
validateKey(key);
aclHandler.normalizeSettableBlobMeta(key, meta, who, ADMIN);
BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
- SettableBlobMeta orig = getStoredBlobMeta(key);
+ SettableBlobMeta orig = extractBlobMeta(key);
aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
writeMetadata(key, meta);
}
@@ -261,7 +268,7 @@
throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
+ SettableBlobMeta meta = extractBlobMeta(key);
aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
try {
hbs.deleteKey(DATA_PREFIX + key);
@@ -269,6 +276,8 @@
} catch (IOException e) {
throw new RuntimeException(e);
}
+ cacheMetas.invalidate(key);
+ cachedReplicationCount.invalidate(key);
}
@Override
@@ -276,7 +285,7 @@
throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
+ SettableBlobMeta meta = extractBlobMeta(key);
aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
try {
return new BlobStoreFileInputStream(hbs.read(DATA_PREFIX + key));
@@ -296,7 +305,7 @@
try {
who = checkAndGetSubject(who);
validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
+ SettableBlobMeta meta = extractBlobMeta(key);
aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
} catch (KeyNotFoundException e) {
return false;
@@ -322,25 +331,47 @@
public int getBlobReplication(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
+ SettableBlobMeta meta = extractBlobMeta(key);
aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
try {
- return hbs.getBlobReplication(DATA_PREFIX + key);
+ Integer cachedCount = cachedReplicationCount.getIfPresent(key);
+ int blobReplication = 0;
+ if (cachedCount != null) {
+ blobReplication = cachedCount.intValue();
+ } else {
+ blobReplication = hbs.getBlobReplication(DATA_PREFIX + key);
+ cachedReplicationCount.put(key, blobReplication);
+ }
+ return blobReplication;
} catch (IOException exp) {
throw new RuntimeException(exp);
}
}
+ private SettableBlobMeta extractBlobMeta(String key) throws KeyNotFoundException {
+ if (key == null) {
+ throw new WrappedKeyNotFoundException("null can not be blob key");
+ }
+ SettableBlobMeta meta = cacheMetas.getIfPresent(key);
+ if (meta == null) {
+ meta = getStoredBlobMeta(key);
+ cacheMetas.put(key, meta);
+ }
+ return meta;
+ }
+
@Override
public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException {
who = checkAndGetSubject(who);
validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
+ SettableBlobMeta meta = extractBlobMeta(key);
meta.set_replication_factor(replication);
aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key);
try {
writeMetadata(key, meta);
- return hbs.updateBlobReplication(DATA_PREFIX + key, replication);
+ int updatedReplCount = hbs.updateBlobReplication(DATA_PREFIX + key, replication);
+ cachedReplicationCount.put(key, updatedReplCount);
+ return updatedReplCount;
} catch (IOException exp) {
throw new RuntimeException(exp);
}
@@ -356,6 +387,7 @@
outputStream.write(Utils.thriftSerialize(meta));
outputStream.close();
outputStream = null;
+ cacheMetas.put(key, meta);
} catch (IOException exp) {
throw new RuntimeException(exp);
} finally {
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 7a8d0fe..abb5fa8 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -208,6 +208,7 @@
assertEquals(1, assignedSlots.size());
assertEquals(1, nodesIDs.size());
assertEquals(2, executors.size());
+ assertFalse(cluster.needsSchedulingRas(topology1));
assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
}
@@ -257,6 +258,7 @@
assertEquals(1, assignedSlots1.size());
assertEquals(1, nodesIDs1.size());
assertEquals(7, executors1.size());
+ assertFalse(cluster.needsSchedulingRas(topology1));
assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId());
@@ -270,6 +272,7 @@
assertEquals(1, assignedSlots2.size());
assertEquals(1, nodesIDs2.size());
assertEquals(2, executors2.size());
+ assertFalse(cluster.needsSchedulingRas(topology2));
assertTrue(cluster.getStatusMap().get(topology2.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
}
@@ -314,6 +317,7 @@
assertEquals(2, executors1.size());
assertEquals(400.0, assignedMemory, 0.001);
assertEquals(40.0, assignedCpu, 0.001);
+ assertFalse(cluster.needsSchedulingRas(topology1));
String expectedStatusPrefix = "Running - Fully Scheduled by DefaultResourceAwareStrategy";
assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith(expectedStatusPrefix));
}
@@ -401,6 +405,7 @@
for (Map.Entry<Double, Double> entry : cpuAvailableToUsed.entrySet()) {
assertTrue(entry.getKey() - entry.getValue() >= 0);
}
+ assertFalse(cluster.needsSchedulingRas(topology1));
assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
}
@@ -458,6 +463,7 @@
for (ExecutorDetails executor : healthyExecutors) {
assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
}
+ assertFalse(cluster.needsSchedulingRas(topology2));
assertTrue(cluster.getStatusMap().get(topology2.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
// end of Test1
@@ -513,6 +519,7 @@
for (ExecutorDetails executor : existingExecutors) {
assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
}
+ assertFalse(cluster1.needsSchedulingRas(topology1));
assertEquals("Fully Scheduled", cluster1.getStatusMap().get(topology1.getId()));
// end of Test3
@@ -534,6 +541,8 @@
for (ExecutorDetails executor : copyOfOldMapping.keySet()) {
assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
}
+ assertFalse(cluster1.needsSchedulingRas(topology1));
+ assertFalse(cluster1.needsSchedulingRas(topology2));
String expectedStatusPrefix = "Running - Fully Scheduled by DefaultResourceAwareStrategy";
assertTrue(cluster1.getStatusMap().get(topology1.getId()).startsWith(expectedStatusPrefix));
assertTrue(cluster1.getStatusMap().get(topology2.getId()).startsWith(expectedStatusPrefix));
@@ -621,6 +630,10 @@
try {
rs.schedule(topologies, cluster);
+ assertFalse(cluster.needsSchedulingRas(topology1));
+ assertFalse(cluster.needsSchedulingRas(topology2));
+ assertFalse(cluster.needsSchedulingRas(topology3));
+
String expectedMsgPrefix = "Running - Fully Scheduled by " + strategyName;
assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith(expectedMsgPrefix));
assertTrue(cluster.getStatusMap().get(topology2.getId()).startsWith(expectedMsgPrefix));
@@ -727,6 +740,7 @@
rs.prepare(config1, new StormMetricsRegistry());
try {
rs.schedule(topologies, cluster);
+ assertFalse(cluster.needsSchedulingRas(topology1));
assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
assertEquals(4, cluster.getAssignedNumWorkers(topology1));
} finally {
@@ -751,6 +765,7 @@
rs.prepare(config2, new StormMetricsRegistry());
try {
rs.schedule(topologies, cluster);
+ assertTrue(cluster.needsSchedulingRas(topology2));
String status = cluster.getStatusMap().get(topology2.getId());
assert status.startsWith("Not enough resources to schedule") : status;
//assert status.endsWith("5 executors not scheduled") : status;
@@ -989,6 +1004,7 @@
scheduler = new ResourceAwareScheduler();
scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
+ assertFalse(cluster.needsSchedulingRas(topo1));
assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
}
@@ -1010,6 +1026,9 @@
scheduler = new ResourceAwareScheduler();
scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
+ assertFalse(cluster.needsSchedulingRas(topo1));
+ assertFalse(cluster.needsSchedulingRas(topo2));
+ assertTrue(cluster.needsSchedulingRas(topo3));
assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
assertTrue("topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
assertFalse("topo-3 unscheduled?", cluster.getAssignmentById(topo3.getId()) != null);
@@ -1056,6 +1075,18 @@
scheduler = new ResourceAwareScheduler();
scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
+
+ assertFalse(cluster.needsSchedulingRas(topo0));
+ assertFalse(cluster.needsSchedulingRas(topo1));
+ assertFalse(cluster.needsSchedulingRas(topo2));
+ assertFalse(cluster.needsSchedulingRas(topo3));
+ assertFalse(cluster.needsSchedulingRas(topo4));
+ assertFalse(cluster.needsSchedulingRas(topo5));
+ assertFalse(cluster.needsSchedulingRas(topo6));
+ assertFalse(cluster.needsSchedulingRas(topo7));
+ assertFalse(cluster.needsSchedulingRas(topo8));
+ assertTrue(cluster.needsSchedulingRas(topo9));
+
assertTrue("topo-0 scheduled?", cluster.getAssignmentById(topo0.getId()) != null);
assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
assertTrue("topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
@@ -1085,6 +1116,7 @@
scheduler = new ResourceAwareScheduler();
scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
+ assertTrue(cluster.needsSchedulingRas(topo1));
assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
}
@@ -1424,6 +1456,10 @@
scheduler.prepare(config, new StormMetricsRegistry());
scheduler.schedule(topologies, cluster);
+ assertFalse(cluster.needsSchedulingRas(topo1));
+ assertTrue(cluster.needsSchedulingRas(topo2));
+ assertFalse(cluster.needsSchedulingRas(topo3));
+
assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
assertEquals("Topo-1 all executors scheduled?", 2, cluster.getAssignmentById(topo1.getId()).getExecutorToSlot().size());
assertTrue("Topo-2 not scheduled", cluster.getAssignmentById(topo2.getId()) == null);