STORM-3545 don't update unused blobs (#3174)
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index fdd130b..48ad7e8 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -262,22 +262,26 @@
while (!done) {
try {
synchronized (blob) {
- long localVersion = blob.getLocalVersion();
- long remoteVersion = blob.getRemoteVersion(blobStore);
- if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
- if (blob.isFullyDownloaded()) {
- //Avoid case of different blob version
- // when blob is not downloaded (first time download)
- numBlobUpdateVersionChanged.mark();
+ if (blob.isUsed()) {
+ long localVersion = blob.getLocalVersion();
+ long remoteVersion = blob.getRemoteVersion(blobStore);
+ if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
+ if (blob.isFullyDownloaded()) {
+ //Avoid case of different blob version
+ // when blob is not downloaded (first time download)
+ numBlobUpdateVersionChanged.mark();
+ }
+ Timer.Context t = singleBlobLocalizationDuration.time();
+ try {
+ long newVersion = blob.fetchUnzipToTemp(blobStore);
+ blob.informReferencesAndCommitNewVersion(newVersion);
+ t.stop();
+ } finally {
+ blob.cleanupOrphanedData();
+ }
}
- Timer.Context t = singleBlobLocalizationDuration.time();
- try {
- long newVersion = blob.fetchUnzipToTemp(blobStore);
- blob.informReferencesAndCommitNewVersion(newVersion);
- t.stop();
- } finally {
- blob.cleanupOrphanedData();
- }
+ } else {
+ LOG.debug("Skipping update of unused blob {}", blob);
}
}
done = true;
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index d3c1d74..f97c868 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -126,18 +126,21 @@
when(jarBlob.getLocalVersion()).thenReturn(-1L);
when(jarBlob.getRemoteVersion(any())).thenReturn(100L);
when(jarBlob.fetchUnzipToTemp(any())).thenReturn(100L);
+ when(jarBlob.isUsed()).thenReturn(true);
LocallyCachedTopologyBlob codeBlob = mock(LocallyCachedTopologyBlob.class);
doReturn(codeBlob).when(victim).getTopoCode(topoId, localAssignment.get_owner());
when(codeBlob.getLocalVersion()).thenReturn(-1L);
when(codeBlob.getRemoteVersion(any())).thenReturn(200L);
when(codeBlob.fetchUnzipToTemp(any())).thenReturn(200L);
+ when(codeBlob.isUsed()).thenReturn(true);
LocallyCachedTopologyBlob confBlob = mock(LocallyCachedTopologyBlob.class);
doReturn(confBlob).when(victim).getTopoConf(topoId, localAssignment.get_owner());
when(confBlob.getLocalVersion()).thenReturn(-1L);
when(confBlob.getRemoteVersion(any())).thenReturn(300L);
when(confBlob.fetchUnzipToTemp(any())).thenReturn(300L);
+ when(confBlob.isUsed()).thenReturn(true);
when(mockedReflectionUtils.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);