blob: 9f5b23c0d258c691e118607811ad1cd2cb8697f8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.junit.Test;
/**
* This class tests the clean up of local caches the node manager uses for the
* purpose of resource localization.
*/
public class TestLocalCacheCleanup {
@Test
public void testBasicCleanup() {
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
addResource(publicRsrc, "/pub-resource1.txt", 5, 20, 0);
addResource(publicRsrc, "/pub-resource2.txt", 3, 20, 0);
addResource(publicRsrc, "/pub-resource3.txt", 15, 20, 0);
ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
new ConcurrentHashMap<String, LocalResourcesTracker>();
ConcurrentMap<LocalResourceRequest, LocalizedResource> user1rsrcs =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
addResource(user1rsrcs, "/private-u1-resource4.txt", 1, 20, 0);
LocalResourcesTracker user1Tracker =
new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs);
privateRsrc.put("user1", user1Tracker);
ConcurrentMap<LocalResourceRequest, LocalizedResource> user2rsrcs =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
addResource(user2rsrcs, "/private-u2-resource5.txt", 2, 20, 0);
LocalResourcesTracker user2Tracker =
new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs);
privateRsrc.put("user2", user2Tracker);
ResourceLocalizationService rls =
createLocService(publicRsrc, privateRsrc, 0);
LocalCacheCleanerStats stats = rls.handleCacheCleanup();
assertEquals(0, ((StubbedLocalResourcesTrackerImpl) rls.publicRsrc)
.getLocalRsrc().size());
assertEquals(0,
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1"))
.getLocalRsrc().size());
assertEquals(0,
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
.getLocalRsrc().size());
assertEquals(100, stats.getTotalDelSize());
assertEquals(100, rls.metrics.getTotalBytesDeleted());
assertEquals(60, stats.getPublicDelSize());
assertEquals(60, rls.metrics.getPublicBytesDeleted());
assertEquals(40, stats.getPrivateDelSize());
assertEquals(40, rls.metrics.getPrivateBytesDeleted());
assertEquals(100, rls.metrics.getCacheSizeBeforeClean());
}
@Test
public void testPositiveRefCount() {
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
// Oldest resource with a positive ref count the other with a ref count
// equal to 0.
LocalResourceRequest survivor =
addResource(publicRsrc, "/pub-resource1.txt", 1, 20, 1);
addResource(publicRsrc, "/pub-resource2.txt", 5, 20, 0);
ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
new ConcurrentHashMap<String, LocalResourcesTracker>();
ResourceLocalizationService rls =
createLocService(publicRsrc, privateRsrc, 0);
LocalCacheCleanerStats stats = rls.handleCacheCleanup();
StubbedLocalResourcesTrackerImpl resources =
(StubbedLocalResourcesTrackerImpl) rls.publicRsrc;
assertEquals(1, resources.getLocalRsrc().size());
assertTrue(resources.getLocalRsrc().containsKey(survivor));
assertEquals(20, stats.getTotalDelSize());
assertEquals(20, rls.metrics.getTotalBytesDeleted());
assertEquals(20, stats.getPublicDelSize());
assertEquals(20, rls.metrics.getPublicBytesDeleted());
assertEquals(0, stats.getPrivateDelSize());
assertEquals(0, rls.metrics.getPrivateBytesDeleted());
assertEquals(40, rls.metrics.getCacheSizeBeforeClean());
}
@Test
public void testLRUAcrossTrackers() {
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrc =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourceRequest pubSurviver1 =
addResource(publicRsrc, "/pub-resource1.txt", 8, 20, 0);
LocalResourceRequest pubSurviver2 =
addResource(publicRsrc, "/pub-resource2.txt", 7, 20, 0);
addResource(publicRsrc, "/pub-resource3.txt", 1, 20, 0);
ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
new ConcurrentHashMap<String, LocalResourcesTracker>();
ConcurrentMap<LocalResourceRequest, LocalizedResource> user1rsrcs =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourceRequest usr1Surviver1 =
addResource(user1rsrcs, "/private-u1-resource1.txt", 6, 20, 0);
addResource(user1rsrcs, "/private-u1-resource2.txt", 2, 20, 0);
LocalResourcesTracker user1Tracker =
new StubbedLocalResourcesTrackerImpl("user1", user1rsrcs);
privateRsrc.put("user1", user1Tracker);
ConcurrentMap<LocalResourceRequest, LocalizedResource> user2rsrcs =
new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
LocalResourceRequest usr2Surviver1 =
addResource(user2rsrcs, "/private-u2-resource1.txt", 5, 20, 0);
addResource(user2rsrcs, "/private-u2-resource2.txt", 3, 20, 0);
addResource(user2rsrcs, "/private-u2-resource3.txt", 4, 20, 0);
LocalResourcesTracker user2Tracker =
new StubbedLocalResourcesTrackerImpl("user2", user2rsrcs);
privateRsrc.put("user2", user2Tracker);
ResourceLocalizationService rls =
createLocService(publicRsrc, privateRsrc, 80);
LocalCacheCleanerStats stats = rls.handleCacheCleanup();
Map<LocalResourceRequest, LocalizedResource> pubLocalRsrc =
((StubbedLocalResourcesTrackerImpl) rls.publicRsrc).getLocalRsrc();
assertEquals(2, pubLocalRsrc.size());
assertTrue(pubLocalRsrc.containsKey(pubSurviver1));
assertTrue(pubLocalRsrc.containsKey(pubSurviver2));
Map<LocalResourceRequest, LocalizedResource> usr1LocalRsrc =
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user1"))
.getLocalRsrc();
assertEquals(1, usr1LocalRsrc.size());
assertTrue(usr1LocalRsrc.containsKey(usr1Surviver1));
Map<LocalResourceRequest, LocalizedResource> usr2LocalRsrc =
((StubbedLocalResourcesTrackerImpl) privateRsrc.get("user2"))
.getLocalRsrc();
assertEquals(1, usr2LocalRsrc.size());
assertTrue(usr2LocalRsrc.containsKey(usr2Surviver1));
assertEquals(80, stats.getTotalDelSize());
assertEquals(80, rls.metrics.getTotalBytesDeleted());
assertEquals(20, stats.getPublicDelSize());
assertEquals(20, rls.metrics.getPublicBytesDeleted());
assertEquals(60, stats.getPrivateDelSize());
assertEquals(60, rls.metrics.getPrivateBytesDeleted());
assertEquals(160, rls.metrics.getCacheSizeBeforeClean());
}
private ResourceLocalizationService createLocService(
ConcurrentMap<LocalResourceRequest, LocalizedResource> publicRsrcs,
ConcurrentMap<String, LocalResourcesTracker> privateRsrcs,
long targetCacheSize) {
Context mockedContext = mock(Context.class);
when(mockedContext.getNMStateStore()).thenReturn(null);
NodeManagerMetrics metrics = NodeManagerMetrics.create();
ResourceLocalizationService rls =
new ResourceLocalizationService(null, null, null, null, mockedContext,
metrics);
// We set the following members directly so we don't have to deal with
// mocking out the service init method.
rls.publicRsrc = new StubbedLocalResourcesTrackerImpl(null, publicRsrcs);
rls.cacheTargetSize = targetCacheSize;
rls.privateRsrc.putAll(privateRsrcs);
return rls;
}
private LocalResourceRequest addResource(
ConcurrentMap<LocalResourceRequest, LocalizedResource> resources,
String path, long timestamp, long size, int refCount) {
LocalResourceRequest request = createLocalResourceRequest(path, timestamp);
LocalizedResource resource =
createLocalizedResource(size, refCount, timestamp, request);
resources.put(request, resource);
return request;
}
private LocalResourceRequest createLocalResourceRequest(String path,
long timestamp) {
return new LocalResourceRequest(new Path(path), timestamp,
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, null);
}
private LocalizedResource createLocalizedResource(long size, int refCount,
long timestamp, LocalResourceRequest req) {
LocalizedResource lr = mock(LocalizedResource.class);
when(lr.getSize()).thenReturn(size);
when(lr.getRefCount()).thenReturn(refCount);
when(lr.getTimestamp()).thenReturn(timestamp);
when(lr.getState()).thenReturn(ResourceState.LOCALIZED);
when(lr.getRequest()).thenReturn(req);
return lr;
}
class StubbedLocalResourcesTrackerImpl extends LocalResourcesTrackerImpl {
StubbedLocalResourcesTrackerImpl(String user,
ConcurrentMap<LocalResourceRequest, LocalizedResource> rsrcs) {
super(user, null, null, rsrcs, false, new Configuration(), null, null);
}
@Override
public boolean remove(LocalizedResource rem, DeletionService delService) {
LocalizedResource r = localrsrc.remove(rem.getRequest());
if (r != null) {
LOG.info("Removed " + rem.getRequest().getPath()
+ " from localized cache");
return true;
}
return false;
}
Map<LocalResourceRequest, LocalizedResource> getLocalRsrc() {
return Collections.unmodifiableMap(localrsrc);
}
}
}