[STORM-3709] Topology without spout should fail submission. (#3343)

diff --git a/.travis.yml b/.travis.yml
index e0ba311..8e947ca 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,6 +13,10 @@
   hosts:
     - node1
 
+arch:
+  - amd64
+  - arm64-graviton2
+
 env:
   - MODULES=Client
   - MODULES=Server
@@ -37,8 +41,6 @@
   include:
     - arch: s390x
       jdk: openjdk11
-    - arch: arm64
-      jdk: openjdk11
 
 before_install:
   - rvm reload
diff --git a/DEPENDENCY-LICENSES b/DEPENDENCY-LICENSES
index 2346f54..0557962 100644
--- a/DEPENDENCY-LICENSES
+++ b/DEPENDENCY-LICENSES
@@ -597,7 +597,7 @@
     Common Development and Distribution License
 
         * Expression Language 3.0 (org.glassfish:javax.el:3.0.0 - http://el-spec.java.net)
-        * Expression Language 3.0 (org.glassfish:javax.el:3.0.1-b11 - http://uel.java.net)
+        * Expression Language 3.0 (org.glassfish:javax.el:3.0.1-b12 - http://uel.java.net)
         * JavaServer Pages(TM) API (javax.servlet.jsp:javax.servlet.jsp-api:2.3.1 - http://jsp.java.net)
         * Java Servlet API (javax.servlet:javax.servlet-api:3.1.0 - http://servlet-spec.java.net)
         * javax.annotation API (javax.annotation:javax.annotation-api:1.3.2 - http://jcp.org/en/jsr/detail?id=250)
diff --git a/LICENSE-binary b/LICENSE-binary
index 7888f6f..827cb1a 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -966,7 +966,7 @@
     Common Development and Distribution License
 
         * Expression Language 3.0 (org.glassfish:javax.el:3.0.0 - http://el-spec.java.net)
-        * Expression Language 3.0 (org.glassfish:javax.el:3.0.1-b11 - http://uel.java.net)
+        * Expression Language 3.0 (org.glassfish:javax.el:3.0.1-b12 - http://uel.java.net)
         * JavaServer Pages(TM) API (javax.servlet.jsp:javax.servlet.jsp-api:2.3.1 - http://jsp.java.net)
         * Java Servlet API (javax.servlet:javax.servlet-api:3.1.0 - http://servlet-spec.java.net)
         * JSP implementation (org.glassfish.web:javax.servlet.jsp:2.3.2 - http://jsp.java.net)
diff --git a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
index 16c25d2..74ec2ef 100644
--- a/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
+++ b/external/storm-hdfs-blobstore/src/main/java/org/apache/storm/hdfs/blobstore/HdfsBlobStore.java
@@ -22,12 +22,15 @@
 import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
 import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import java.io.ByteArrayOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import javax.security.auth.Subject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -79,6 +82,8 @@
     private HdfsBlobStoreImpl hbs;
     private Subject localSubject;
     private Map<String, Object> conf;
+    private Cache<String, SettableBlobMeta> cacheMetas = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build();
+    private Cache<String, Integer> cachedReplicationCount = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build();
 
     /**
      * If who is null then we want to use the user hadoop says we are.
@@ -152,6 +157,7 @@
             outputStream = null;
             BlobStoreFile dataFile = hbs.write(DATA_PREFIX + key, true);
             dataFile.setMetadata(meta);
+            cacheMetas.put(key, meta);
             return new BlobStoreFileOutputStream(dataFile);
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -170,7 +176,7 @@
     public AtomicOutputStream updateBlob(String key, Subject who)
             throws AuthorizationException, KeyNotFoundException {
         who = checkAndGetSubject(who);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
+        SettableBlobMeta meta = extractBlobMeta(key);
         validateKey(key);
         aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
         try {
@@ -199,7 +205,8 @@
             }
             in.close();
             in = null;
-            return Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
+            SettableBlobMeta blobMeta = Utils.thriftDeserialize(SettableBlobMeta.class, out.toByteArray());
+            return blobMeta;
         } catch (IOException e) {
             throw new RuntimeException(e);
         } finally {
@@ -218,7 +225,7 @@
             throws AuthorizationException, KeyNotFoundException {
         who = checkAndGetSubject(who);
         validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
+        SettableBlobMeta meta = extractBlobMeta(key);
         aclHandler.validateUserCanReadMeta(meta.get_acl(), who, key);
         ReadableBlobMeta rbm = new ReadableBlobMeta();
         rbm.set_settable(meta);
@@ -251,7 +258,7 @@
         validateKey(key);
         aclHandler.normalizeSettableBlobMeta(key,  meta, who, ADMIN);
         BlobStoreAclHandler.validateSettableACLs(key, meta.get_acl());
-        SettableBlobMeta orig = getStoredBlobMeta(key);
+        SettableBlobMeta orig = extractBlobMeta(key);
         aclHandler.hasPermissions(orig.get_acl(), ADMIN, who, key);
         writeMetadata(key, meta);
     }
@@ -261,7 +268,7 @@
             throws AuthorizationException, KeyNotFoundException {
         who = checkAndGetSubject(who);
         validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
+        SettableBlobMeta meta = extractBlobMeta(key);
         aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
         try {
             hbs.deleteKey(DATA_PREFIX + key);
@@ -269,6 +276,9 @@
         } catch (IOException e) {
             throw new RuntimeException(e);
-        }
+        } finally {
+            // Drop the cached entries even when the delete fails part-way, so that a stale
+            // cache entry can never outlive a blob that was (partially) removed from the store.
+            cacheMetas.invalidate(key);
+            cachedReplicationCount.invalidate(key);
+        }
     }
 
     @Override
@@ -276,7 +285,7 @@
             throws AuthorizationException, KeyNotFoundException {
         who = checkAndGetSubject(who);
         validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
+        SettableBlobMeta meta = extractBlobMeta(key);
         aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
         try {
             return new BlobStoreFileInputStream(hbs.read(DATA_PREFIX + key));
@@ -296,7 +305,7 @@
         try {
             who = checkAndGetSubject(who);
             validateKey(key);
-            SettableBlobMeta meta = getStoredBlobMeta(key);
+            SettableBlobMeta meta = extractBlobMeta(key);
             aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
         } catch (KeyNotFoundException e) {
             return false;
@@ -322,25 +331,58 @@
     public int getBlobReplication(String key, Subject who) throws AuthorizationException, KeyNotFoundException {
         who = checkAndGetSubject(who);
         validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
+        SettableBlobMeta meta = extractBlobMeta(key);
         aclHandler.hasAnyPermissions(meta.get_acl(), READ | WRITE | ADMIN, who, key);
         try {
-            return hbs.getBlobReplication(DATA_PREFIX + key);
+            // Answer from the local cache when an entry is present; only query hbs on a miss.
+            Integer cachedCount = cachedReplicationCount.getIfPresent(key);
+            if (cachedCount != null) {
+                return cachedCount;
+            }
+            int blobReplication = hbs.getBlobReplication(DATA_PREFIX + key);
+            cachedReplicationCount.put(key, blobReplication);
+            return blobReplication;
         } catch (IOException exp) {
             throw new RuntimeException(exp);
         }
     }
 
+    /**
+     * Looks up the metadata of a blob, preferring the in-memory cache over a read from the store.
+     *
+     * <p>The instance handed back is the one held by the cache, so callers must not mutate it
+     * unless the change is meant to be visible to every later lookup of the same key.
+     *
+     * @param key the blob key to look up
+     * @return the cached metadata, or the metadata loaded via getStoredBlobMeta on a cache miss
+     * @throws KeyNotFoundException if key is null, or if getStoredBlobMeta raises it
+     */
+    private SettableBlobMeta extractBlobMeta(String key) throws KeyNotFoundException {
+        if (key == null) {
+            throw new WrappedKeyNotFoundException("null can not be blob key");
+        }
+        SettableBlobMeta meta = cacheMetas.getIfPresent(key);
+        if (meta == null) {
+            meta = getStoredBlobMeta(key);
+            cacheMetas.put(key, meta);
+        }
+        return meta;
+    }
+
     @Override
     public int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException {
         who = checkAndGetSubject(who);
         validateKey(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
+        SettableBlobMeta meta = extractBlobMeta(key);
-        meta.set_replication_factor(replication);
         aclHandler.hasAnyPermissions(meta.get_acl(), WRITE | ADMIN, who, key);
+        // Mutate only after the permission check: meta may be the shared cached instance, and a
+        // rejected caller must not leave a changed replication factor behind in the cache.
+        meta.set_replication_factor(replication);
         try {
             writeMetadata(key, meta);
-            return hbs.updateBlobReplication(DATA_PREFIX + key, replication);
+            int updatedReplCount = hbs.updateBlobReplication(DATA_PREFIX + key, replication);
+            cachedReplicationCount.put(key, updatedReplCount);
+            return updatedReplCount;
         } catch (IOException exp) {
             throw new RuntimeException(exp);
         }
@@ -356,6 +387,7 @@
             outputStream.write(Utils.thriftSerialize(meta));
             outputStream.close();
             outputStream = null;
+            cacheMetas.put(key, meta);
         } catch (IOException exp) {
             throw new RuntimeException(exp);
         } finally {
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index 7a8d0fe..abb5fa8 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -208,6 +208,7 @@
         assertEquals(1, assignedSlots.size());
         assertEquals(1, nodesIDs.size());
         assertEquals(2, executors.size());
+        assertFalse(cluster.needsSchedulingRas(topology1));
         assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
     }
 
@@ -257,6 +258,7 @@
         assertEquals(1, assignedSlots1.size());
         assertEquals(1, nodesIDs1.size());
         assertEquals(7, executors1.size());
+        assertFalse(cluster.needsSchedulingRas(topology1));
         assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
 
         SchedulerAssignment assignment2 = cluster.getAssignmentById(topology2.getId());
@@ -270,6 +272,7 @@
         assertEquals(1, assignedSlots2.size());
         assertEquals(1, nodesIDs2.size());
         assertEquals(2, executors2.size());
+        assertFalse(cluster.needsSchedulingRas(topology2));
         assertTrue(cluster.getStatusMap().get(topology2.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
     }
 
@@ -314,6 +317,7 @@
         assertEquals(2, executors1.size());
         assertEquals(400.0, assignedMemory, 0.001);
         assertEquals(40.0, assignedCpu, 0.001);
+        assertFalse(cluster.needsSchedulingRas(topology1));
         String expectedStatusPrefix = "Running - Fully Scheduled by DefaultResourceAwareStrategy";
         assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith(expectedStatusPrefix));
     }
@@ -401,6 +405,7 @@
         for (Map.Entry<Double, Double> entry : cpuAvailableToUsed.entrySet()) {
             assertTrue(entry.getKey() - entry.getValue() >= 0);
         }
+        assertFalse(cluster.needsSchedulingRas(topology1));
         assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
     }
 
@@ -458,6 +463,7 @@
         for (ExecutorDetails executor : healthyExecutors) {
             assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
         }
+        assertFalse(cluster.needsSchedulingRas(topology2));
         assertTrue(cluster.getStatusMap().get(topology2.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
         // end of Test1
 
@@ -513,6 +519,7 @@
         for (ExecutorDetails executor : existingExecutors) {
             assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
         }
+        assertFalse(cluster1.needsSchedulingRas(topology1));
         assertEquals("Fully Scheduled", cluster1.getStatusMap().get(topology1.getId()));
         // end of Test3
 
@@ -534,6 +541,8 @@
         for (ExecutorDetails executor : copyOfOldMapping.keySet()) {
             assertEquals(copyOfOldMapping.get(executor), newExecutorToSlot.get(executor));
         }
+        assertFalse(cluster1.needsSchedulingRas(topology1));
+        assertFalse(cluster1.needsSchedulingRas(topology2));
         String expectedStatusPrefix = "Running - Fully Scheduled by DefaultResourceAwareStrategy";
         assertTrue(cluster1.getStatusMap().get(topology1.getId()).startsWith(expectedStatusPrefix));
         assertTrue(cluster1.getStatusMap().get(topology2.getId()).startsWith(expectedStatusPrefix));
@@ -621,6 +630,10 @@
         try {
             rs.schedule(topologies, cluster);
 
+            assertFalse(cluster.needsSchedulingRas(topology1));
+            assertFalse(cluster.needsSchedulingRas(topology2));
+            assertFalse(cluster.needsSchedulingRas(topology3));
+
             String expectedMsgPrefix = "Running - Fully Scheduled by " + strategyName;
             assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith(expectedMsgPrefix));
             assertTrue(cluster.getStatusMap().get(topology2.getId()).startsWith(expectedMsgPrefix));
@@ -727,6 +740,7 @@
         rs.prepare(config1, new StormMetricsRegistry());
         try {
             rs.schedule(topologies, cluster);
+            assertFalse(cluster.needsSchedulingRas(topology1));
             assertTrue(cluster.getStatusMap().get(topology1.getId()).startsWith("Running - Fully Scheduled by DefaultResourceAwareStrategy"));
             assertEquals(4, cluster.getAssignedNumWorkers(topology1));
         } finally {
@@ -751,6 +765,7 @@
         rs.prepare(config2, new StormMetricsRegistry());
         try {
             rs.schedule(topologies, cluster);
+            assertTrue(cluster.needsSchedulingRas(topology2));
             String status = cluster.getStatusMap().get(topology2.getId());
             assert status.startsWith("Not enough resources to schedule") : status;
             //assert status.endsWith("5 executors not scheduled") : status;
@@ -989,6 +1004,7 @@
         scheduler = new ResourceAwareScheduler();
         scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
+        assertFalse(cluster.needsSchedulingRas(topo1));
         assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
     }
 
@@ -1010,6 +1026,9 @@
         scheduler = new ResourceAwareScheduler();
         scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
+        assertFalse(cluster.needsSchedulingRas(topo1));
+        assertFalse(cluster.needsSchedulingRas(topo2));
+        assertTrue(cluster.needsSchedulingRas(topo3));
         assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
         assertTrue("topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
         assertFalse("topo-3 unscheduled?", cluster.getAssignmentById(topo3.getId()) != null);
@@ -1056,6 +1075,18 @@
         scheduler = new ResourceAwareScheduler();
         scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
+
+        assertFalse(cluster.needsSchedulingRas(topo0));
+        assertFalse(cluster.needsSchedulingRas(topo1));
+        assertFalse(cluster.needsSchedulingRas(topo2));
+        assertFalse(cluster.needsSchedulingRas(topo3));
+        assertFalse(cluster.needsSchedulingRas(topo4));
+        assertFalse(cluster.needsSchedulingRas(topo5));
+        assertFalse(cluster.needsSchedulingRas(topo6));
+        assertFalse(cluster.needsSchedulingRas(topo7));
+        assertFalse(cluster.needsSchedulingRas(topo8));
+        assertTrue(cluster.needsSchedulingRas(topo9));
+
         assertTrue("topo-0 scheduled?", cluster.getAssignmentById(topo0.getId()) != null);
         assertTrue("topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
         assertTrue("topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
@@ -1085,6 +1116,7 @@
         scheduler = new ResourceAwareScheduler();
         scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
+        assertTrue(cluster.needsSchedulingRas(topo1));
         assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
     }
 
@@ -1424,6 +1456,10 @@
         scheduler.prepare(config, new StormMetricsRegistry());
         scheduler.schedule(topologies, cluster);
 
+        assertFalse(cluster.needsSchedulingRas(topo1));
+        assertTrue(cluster.needsSchedulingRas(topo2));
+        assertFalse(cluster.needsSchedulingRas(topo3));
+
         assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(topo1.getId()) != null);
         assertEquals("Topo-1 all executors scheduled?", 2, cluster.getAssignmentById(topo1.getId()).getExecutorToSlot().size());
         assertTrue("Topo-2 not scheduled", cluster.getAssignmentById(topo2.getId()) == null);