blob: 6d67ad32fb72490a68160930ecc77e2a8b2aec02 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager.store;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
import org.apache.hadoop.yarn.server.sharedcachemanager.DummyAppChecker;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestInMemorySCMStore extends SCMStoreBaseTest {
private InMemorySCMStore store;
private AppChecker checker;
@Override
Class<? extends SCMStore> getStoreClass() {
return InMemorySCMStore.class;
}
@Before
public void setup() {
this.checker = spy(new DummyAppChecker());
this.store = spy(new InMemorySCMStore(checker));
}
@After
public void cleanup() {
if (this.store != null) {
this.store.stop();
}
}
private void startEmptyStore() throws Exception {
doReturn(new ArrayList<ApplicationId>()).when(checker)
.getActiveApplications();
doReturn(new HashMap<String, String>()).when(store)
.getInitialCachedResources(isA(FileSystem.class),
isA(Configuration.class));
this.store.init(new Configuration());
this.store.start();
}
private Map<String, String> startStoreWithResources() throws Exception {
Map<String, String> initialCachedResources = new HashMap<String, String>();
int count = 10;
for (int i = 0; i < count; i++) {
String key = String.valueOf(i);
String fileName = key + ".jar";
initialCachedResources.put(key, fileName);
}
doReturn(new ArrayList<ApplicationId>()).when(checker)
.getActiveApplications();
doReturn(initialCachedResources).when(store).getInitialCachedResources(
isA(FileSystem.class), isA(Configuration.class));
this.store.init(new Configuration());
this.store.start();
return initialCachedResources;
}
private void startStoreWithApps() throws Exception {
ArrayList<ApplicationId> list = new ArrayList<ApplicationId>();
int count = 5;
for (int i = 0; i < count; i++) {
list.add(createAppId(i, i));
}
doReturn(list).when(checker).getActiveApplications();
doReturn(new HashMap<String, String>()).when(store)
.getInitialCachedResources(isA(FileSystem.class),
isA(Configuration.class));
this.store.init(new Configuration());
this.store.start();
}
@Test
public void testAddResourceConcurrency() throws Exception {
startEmptyStore();
final String key = "key1";
int count = 5;
ExecutorService exec = HadoopExecutors.newFixedThreadPool(count);
List<Future<String>> futures = new ArrayList<Future<String>>(count);
final CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < count; i++) {
final String fileName = "foo-" + i + ".jar";
Callable<String> task = new Callable<String>() {
public String call() throws Exception {
start.await();
String result = store.addResource(key, fileName);
System.out.println("fileName: " + fileName + ", result: " + result);
return result;
}
};
futures.add(exec.submit(task));
}
// start them all at the same time
start.countDown();
// check the result; they should all agree with the value
Set<String> results = new HashSet<String>();
for (Future<String> future: futures) {
results.add(future.get());
}
assertSame(1, results.size());
exec.shutdown();
}
@Test
public void testAddResourceRefNonExistentResource() throws Exception {
startEmptyStore();
String key = "key1";
ApplicationId id = createAppId(1, 1L);
// try adding an app id without adding the key first
assertNull(store.addResourceReference(key,
new SharedCacheResourceReference(id, "user")));
}
@Test
public void testRemoveResourceEmptyRefs() throws Exception {
startEmptyStore();
String key = "key1";
String fileName = "foo.jar";
// first add resource
store.addResource(key, fileName);
// try removing the resource; it should return true
assertTrue(store.removeResource(key));
}
@Test
public void testAddResourceRefRemoveResource() throws Exception {
startEmptyStore();
String key = "key1";
ApplicationId id = createAppId(1, 1L);
String user = "user";
// add the resource, and then add a resource ref
store.addResource(key, "foo.jar");
store.addResourceReference(key, new SharedCacheResourceReference(id, user));
// removeResource should return false
assertTrue(!store.removeResource(key));
// the resource and the ref should be intact
Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
assertTrue(refs != null);
assertEquals(Collections.singleton(new SharedCacheResourceReference(id, user)), refs);
}
@Test
public void testAddResourceRefConcurrency() throws Exception {
startEmptyStore();
final String key = "key1";
final String user = "user";
String fileName = "foo.jar";
// first add the resource
store.addResource(key, fileName);
// make concurrent addResourceRef calls (clients)
int count = 5;
ExecutorService exec = HadoopExecutors.newFixedThreadPool(count);
List<Future<String>> futures = new ArrayList<Future<String>>(count);
final CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < count; i++) {
final ApplicationId id = createAppId(i, i);
Callable<String> task = new Callable<String>() {
public String call() throws Exception {
start.await();
return store.addResourceReference(key,
new SharedCacheResourceReference(id, user));
}
};
futures.add(exec.submit(task));
}
// start them all at the same time
start.countDown();
// check the result
Set<String> results = new HashSet<String>();
for (Future<String> future: futures) {
results.add(future.get());
}
// they should all have the same file name
assertSame(1, results.size());
assertEquals(Collections.singleton(fileName), results);
// there should be 5 refs as a result
Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
assertSame(count, refs.size());
exec.shutdown();
}
@Test
public void testAddResourceRefAddResourceConcurrency() throws Exception {
startEmptyStore();
final String key = "key1";
final String fileName = "foo.jar";
final String user = "user";
final ApplicationId id = createAppId(1, 1L);
// add the resource and add the resource ref at the same time
ExecutorService exec = HadoopExecutors.newFixedThreadPool(2);
final CountDownLatch start = new CountDownLatch(1);
Callable<String> addKeyTask = new Callable<String>() {
public String call() throws Exception {
start.await();
return store.addResource(key, fileName);
}
};
Callable<String> addAppIdTask = new Callable<String>() {
public String call() throws Exception {
start.await();
return store.addResourceReference(key,
new SharedCacheResourceReference(id, user));
}
};
Future<String> addAppIdFuture = exec.submit(addAppIdTask);
Future<String> addKeyFuture = exec.submit(addKeyTask);
// start them at the same time
start.countDown();
// get the results
String addKeyResult = addKeyFuture.get();
String addAppIdResult = addAppIdFuture.get();
assertEquals(fileName, addKeyResult);
System.out.println("addAppId() result: " + addAppIdResult);
// it may be null or the fileName depending on the timing
assertTrue(addAppIdResult == null || addAppIdResult.equals(fileName));
exec.shutdown();
}
@Test
public void testRemoveRef() throws Exception {
startEmptyStore();
String key = "key1";
String fileName = "foo.jar";
String user = "user";
// first add the resource
store.addResource(key, fileName);
// add a ref
ApplicationId id = createAppId(1, 1L);
SharedCacheResourceReference myRef = new SharedCacheResourceReference(id, user);
String result = store.addResourceReference(key, myRef);
assertEquals(fileName, result);
Collection<SharedCacheResourceReference> refs = store.getResourceReferences(key);
assertSame(1, refs.size());
assertEquals(Collections.singleton(myRef), refs);
// remove the same ref
store.removeResourceReferences(key, Collections.singleton(myRef), true);
Collection<SharedCacheResourceReference> newRefs = store.getResourceReferences(key);
assertTrue(newRefs == null || newRefs.isEmpty());
}
@Test
public void testBootstrapping() throws Exception {
Map<String, String> initialCachedResources = startStoreWithResources();
int count = initialCachedResources.size();
ApplicationId id = createAppId(1, 1L);
// the entries from the cached entries should now exist
for (int i = 0; i < count; i++) {
String key = String.valueOf(i);
String fileName = key + ".jar";
String result =
store.addResourceReference(key, new SharedCacheResourceReference(id,
"user"));
// the value should not be null (i.e. it has the key) and the filename should match
assertEquals(fileName, result);
// the initial input should be emptied
assertTrue(initialCachedResources.isEmpty());
}
}
@Test
public void testEvictableWithInitialApps() throws Exception {
startStoreWithApps();
assertFalse(store.isResourceEvictable("key", mock(FileStatus.class)));
}
private ApplicationId createAppId(int id, long timestamp) {
return ApplicationId.newInstance(timestamp, id);
}
}