STORM-3545 don't update unused blobs (#3174)

diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index fdd130b..48ad7e8 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -262,22 +262,26 @@
                     while (!done) {
                         try {
                             synchronized (blob) {
-                                long localVersion = blob.getLocalVersion();
-                                long remoteVersion = blob.getRemoteVersion(blobStore);
-                                if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
-                                    if (blob.isFullyDownloaded()) {
-                                        //Avoid case of different blob version
-                                        // when blob is not downloaded (first time download)
-                                        numBlobUpdateVersionChanged.mark();
+                                if (blob.isUsed()) {
+                                    long localVersion = blob.getLocalVersion();
+                                    long remoteVersion = blob.getRemoteVersion(blobStore);
+                                    if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
+                                        if (blob.isFullyDownloaded()) {
+                                            //Avoid case of different blob version
+                                            // when blob is not downloaded (first time download)
+                                            numBlobUpdateVersionChanged.mark();
+                                        }
+                                        Timer.Context t = singleBlobLocalizationDuration.time();
+                                        try {
+                                            long newVersion = blob.fetchUnzipToTemp(blobStore);
+                                            blob.informReferencesAndCommitNewVersion(newVersion);
+                                            t.stop();
+                                        } finally {
+                                            blob.cleanupOrphanedData();
+                                        }
                                     }
-                                    Timer.Context t = singleBlobLocalizationDuration.time();
-                                    try {
-                                        long newVersion = blob.fetchUnzipToTemp(blobStore);
-                                        blob.informReferencesAndCommitNewVersion(newVersion);
-                                        t.stop();
-                                    } finally {
-                                        blob.cleanupOrphanedData();
-                                    }
+                                } else {
+                                    LOG.debug("Skipping update of unused blob {}", blob);
                                 }
                             }
                             done = true;
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index d3c1d74..f97c868 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -126,18 +126,21 @@
             when(jarBlob.getLocalVersion()).thenReturn(-1L);
             when(jarBlob.getRemoteVersion(any())).thenReturn(100L);
             when(jarBlob.fetchUnzipToTemp(any())).thenReturn(100L);
+            when(jarBlob.isUsed()).thenReturn(true);
 
             LocallyCachedTopologyBlob codeBlob = mock(LocallyCachedTopologyBlob.class);
             doReturn(codeBlob).when(victim).getTopoCode(topoId, localAssignment.get_owner());
             when(codeBlob.getLocalVersion()).thenReturn(-1L);
             when(codeBlob.getRemoteVersion(any())).thenReturn(200L);
             when(codeBlob.fetchUnzipToTemp(any())).thenReturn(200L);
+            when(codeBlob.isUsed()).thenReturn(true);
 
             LocallyCachedTopologyBlob confBlob = mock(LocallyCachedTopologyBlob.class);
             doReturn(confBlob).when(victim).getTopoConf(topoId, localAssignment.get_owner());
             when(confBlob.getLocalVersion()).thenReturn(-1L);
             when(confBlob.getRemoteVersion(any())).thenReturn(300L);
             when(confBlob.fetchUnzipToTemp(any())).thenReturn(300L);
+            when(confBlob.isUsed()).thenReturn(true);
 
             when(mockedReflectionUtils.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);