blob: 0d67c13b64734d97f12fe94c5199210c865afd9e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import static org.apache.hadoop.service.Service.STATE.INITED;
import static org.apache.hadoop.service.Service.STATE.STARTED;
import static org.apache.hadoop.service.Service.STATE.STOPPED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecord;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecords;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ApplicationClassLoader;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceFile;
import org.junit.Assert;
import org.junit.Test;
/**
* Test for auxiliary services. Parameter 0 tests the Configuration-based aux
* services and parameter 1 tests manifest-based aux services.
*/
@RunWith(value = Parameterized.class)
public class TestAuxServices {
private static final Logger LOG =
LoggerFactory.getLogger(TestAuxServices.class);
private static final File TEST_DIR = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestAuxServices.class.getName());
private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER =
mock(AuxiliaryLocalPathHandler.class);
private final static Context MOCK_CONTEXT = mock(Context.class);
private final static DeletionService MOCK_DEL_SERVICE = mock(
DeletionService.class);
private final Boolean useManifest;
private File rootDir = GenericTestUtils.getTestDir(getClass()
.getSimpleName());
private File manifest = new File(rootDir, "manifest.txt");
private ObjectMapper mapper = new ObjectMapper();
@Parameterized.Parameters
public static Collection<Boolean> getParams() {
return Arrays.asList(false, true);
}
@Before
public void setup() {
if (!rootDir.exists()) {
rootDir.mkdirs();
}
}
@After
public void cleanup() {
if (useManifest) {
manifest.delete();
}
rootDir.delete();
}
public TestAuxServices(Boolean useManifest) {
this.useManifest = useManifest;
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
static class LightService extends AuxiliaryService implements Service
{
private final char idef;
private final int expected_appId;
private int remaining_init;
private int remaining_stop;
private ByteBuffer meta = null;
private ArrayList<Integer> stoppedApps;
private ContainerId containerId;
private Resource resource;
LightService(String name, char idef, int expected_appId) {
this(name, idef, expected_appId, null);
}
LightService(String name, char idef, int expected_appId, ByteBuffer meta) {
super(name);
this.idef = idef;
this.expected_appId = expected_appId;
this.meta = meta;
this.stoppedApps = new ArrayList<Integer>();
}
@SuppressWarnings("unchecked")
public ArrayList<Integer> getAppIdsStopped() {
return (ArrayList<Integer>)this.stoppedApps.clone();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
remaining_init = conf.getInt(idef + ".expected.init", 0);
remaining_stop = conf.getInt(idef + ".expected.stop", 0);
super.serviceInit(conf);
}
@Override
protected void serviceStop() throws Exception {
assertEquals(0, remaining_init);
assertEquals(0, remaining_stop);
super.serviceStop();
}
@Override
public void initializeApplication(ApplicationInitializationContext context) {
ByteBuffer data = context.getApplicationDataForService();
assertEquals(idef, data.getChar());
assertEquals(expected_appId, data.getInt());
assertEquals(expected_appId, context.getApplicationId().getId());
}
@Override
public void stopApplication(ApplicationTerminationContext context) {
stoppedApps.add(context.getApplicationId().getId());
}
@Override
public ByteBuffer getMetaData() {
return meta;
}
@Override
public void initializeContainer(
ContainerInitializationContext initContainerContext) {
containerId = initContainerContext.getContainerId();
resource = initContainerContext.getResource();
}
@Override
public void stopContainer(
ContainerTerminationContext stopContainerContext) {
containerId = stopContainerContext.getContainerId();
resource = stopContainerContext.getResource();
}
}
static class ServiceA extends LightService {
public ServiceA() {
super("A", 'A', 65, ByteBuffer.wrap("A".getBytes()));
}
}
static class ServiceB extends LightService {
public ServiceB() {
super("B", 'B', 66, ByteBuffer.wrap("B".getBytes()));
}
}
// Override getMetaData() method to return current
// class path. This class would be used for
// testCustomizedAuxServiceClassPath.
static class ServiceC extends LightService {
public ServiceC() {
super("C", 'C', 66, ByteBuffer.wrap("C".getBytes()));
}
@Override
public ByteBuffer getMetaData() {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
try {
URL[] urls = ((URLClassLoader) loader).getURLs();
String joinedString = Arrays.stream(urls)
.map(URL::toString)
.collect(Collectors.joining(","));
return ByteBuffer.wrap(joinedString.getBytes());
} catch (ClassCastException e) {
// In Java 11+, Thread.currentThread().getContextClassLoader()
// returns jdk.internal.loader.ClassLoaders$AppClassLoader
// by default and it cannot cast to URLClassLoader.
// This exception does not happen when the custom class loader is
// used from AuxiliaryServiceWithCustomClassLoader.
return super.meta;
}
}
}
private void writeManifestFile(AuxServiceRecords services, Configuration
conf) throws IOException {
conf.setBoolean(YarnConfiguration.NM_AUX_SERVICES_MANIFEST_ENABLED, true);
conf.set(YarnConfiguration.NM_AUX_SERVICES_MANIFEST, manifest
.getAbsolutePath());
mapper.writeValue(manifest, services);
}
@SuppressWarnings("resource")
@Test
public void testRemoteAuxServiceClassPath() throws Exception {
Configuration conf = new YarnConfiguration();
FileSystem fs = FileSystem.get(conf);
AuxServiceRecord serviceC =
AuxServices.newAuxService("ServiceC", ServiceC.class.getName());
AuxServiceRecords services = new AuxServiceRecords().serviceList(serviceC);
if (!useManifest) {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[]{"ServiceC"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
"ServiceC"), ServiceC.class, Service.class);
}
Context mockContext2 = mock(Context.class);
LocalDirsHandlerService mockDirsHandler = mock(
LocalDirsHandlerService.class);
String root = "target/LocalDir";
Path rootAuxServiceDirPath = new Path(root, "nmAuxService");
when(mockDirsHandler.getLocalPathForWrite(anyString())).thenReturn(
rootAuxServiceDirPath);
when(mockContext2.getLocalDirsHandler()).thenReturn(mockDirsHandler);
DeletionService mockDelService2 = mock(DeletionService.class);
AuxServices aux = null;
File testJar = null;
try {
// the remote jar file should not be be writable by group or others.
try {
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
"test-runjar.jar", 2048, ServiceC.class.getName());
// Give group a write permission.
// We should not load the auxservice from remote jar file.
Set<PosixFilePermission> perms = new HashSet<PosixFilePermission>();
perms.add(PosixFilePermission.OWNER_READ);
perms.add(PosixFilePermission.OWNER_WRITE);
perms.add(PosixFilePermission.GROUP_WRITE);
Files.setPosixFilePermissions(Paths.get(testJar.getAbsolutePath()),
perms);
if (useManifest) {
AuxServices.setClasspath(serviceC, testJar.getAbsolutePath());
writeManifestFile(services, conf);
} else {
conf.set(String.format(
YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
testJar.getAbsolutePath());
}
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, mockDelService2);
aux.init(conf);
Assert.fail("The permission of the jar is wrong."
+ "Should throw out exception.");
} catch (YarnRuntimeException ex) {
Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(
"The remote jarfile should not be writable by group or others"));
}
Files.delete(Paths.get(testJar.getAbsolutePath()));
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
"test-runjar.jar", 2048, ServiceC.class.getName());
if (useManifest) {
AuxServices.setClasspath(serviceC, testJar.getAbsolutePath());
writeManifestFile(services, conf);
} else {
conf.set(String.format(
YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
testJar.getAbsolutePath());
}
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, mockDelService2);
aux.init(conf);
aux.start();
Map<String, ByteBuffer> meta = aux.getMetaData();
String auxName = "";
Assert.assertTrue(meta.size() == 1);
for(Entry<String, ByteBuffer> i : meta.entrySet()) {
auxName = i.getKey();
}
Assert.assertEquals("ServiceC", auxName);
aux.serviceStop();
FileStatus[] status = fs.listStatus(rootAuxServiceDirPath);
Assert.assertTrue(status.length == 1);
// initialize the same auxservice again, and make sure that we did not
// re-download the jar from remote directory.
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, mockDelService2);
aux.init(conf);
aux.start();
meta = aux.getMetaData();
Assert.assertTrue(meta.size() == 1);
for(Entry<String, ByteBuffer> i : meta.entrySet()) {
auxName = i.getKey();
}
Assert.assertEquals("ServiceC", auxName);
verify(mockDelService2, times(0)).delete(any(FileDeletionTask.class));
status = fs.listStatus(rootAuxServiceDirPath);
Assert.assertTrue(status.length == 1);
aux.serviceStop();
// change the last modification time for remote jar,
// we will re-download the jar and clean the old jar
long time = System.currentTimeMillis() + 3600*1000;
FileTime fileTime = FileTime.fromMillis(time);
Files.setLastModifiedTime(Paths.get(testJar.getAbsolutePath()),
fileTime);
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, mockDelService2);
aux.init(conf);
aux.start();
verify(mockDelService2, times(1)).delete(any(FileDeletionTask.class));
status = fs.listStatus(rootAuxServiceDirPath);
Assert.assertTrue(status.length == 2);
aux.serviceStop();
} finally {
if (testJar != null) {
testJar.delete();
}
if (fs.exists(new Path(root))) {
fs.delete(new Path(root), true);
}
}
}
// To verify whether we could load class from customized class path.
// We would use ServiceC in this test. Also create a separate jar file
// including ServiceC class, and add this jar to customized directory.
// By setting some proper configurations, we should load ServiceC class
// from customized class path.
@Test (timeout = 15000)
public void testCustomizedAuxServiceClassPath() throws Exception {
// verify that we can load AuxService Class from default Class path
Configuration conf = new YarnConfiguration();
AuxServiceRecord serviceC =
AuxServices.newAuxService("ServiceC", ServiceC.class.getName());
AuxServiceRecords services = new AuxServiceRecords().serviceList(serviceC);
if (useManifest) {
writeManifestFile(services, conf);
} else {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[]{"ServiceC"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
"ServiceC"), ServiceC.class, Service.class);
}
@SuppressWarnings("resource")
AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
Map<String, ByteBuffer> meta = aux.getMetaData();
String auxName = "";
Set<String> defaultAuxClassPath = null;
Assert.assertTrue(meta.size() == 1);
for(Entry<String, ByteBuffer> i : meta.entrySet()) {
auxName = i.getKey();
String auxClassPath = Charsets.UTF_8.decode(i.getValue()).toString();
defaultAuxClassPath = new HashSet<String>(Arrays.asList(StringUtils
.getTrimmedStrings(auxClassPath)));
}
Assert.assertEquals("ServiceC", auxName);
aux.serviceStop();
// create a new jar file, and configure it as customized class path
// for this AuxService, and make sure that we could load the class
// from this configured customized class path
File testJar = null;
try {
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
"test-runjar.jar", 2048, ServiceC.class.getName(), LightService
.class.getName());
conf = new YarnConfiguration();
// remove "-org.apache.hadoop." from system classes
String systemClasses = "-org.apache.hadoop." + "," +
ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
if (useManifest) {
AuxServices.setClasspath(serviceC, testJar.getAbsolutePath());
AuxServices.setSystemClasses(serviceC, systemClasses);
writeManifestFile(services, conf);
} else {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[]{"ServiceC"});
conf.set(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
"ServiceC"), ServiceC.class.getName());
conf.set(String.format(
YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, "ServiceC"),
testJar.getAbsolutePath());
conf.set(String.format(
YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
"ServiceC"), systemClasses);
}
Context mockContext2 = mock(Context.class);
LocalDirsHandlerService mockDirsHandler = mock(
LocalDirsHandlerService.class);
String root = "target/LocalDir";
Path rootAuxServiceDirPath = new Path(root, "nmAuxService");
when(mockDirsHandler.getLocalPathForWrite(anyString())).thenReturn(
rootAuxServiceDirPath);
when(mockContext2.getLocalDirsHandler()).thenReturn(mockDirsHandler);
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
mockContext2, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
meta = aux.getMetaData();
Assert.assertTrue(meta.size() == 1);
Set<String> customizedAuxClassPath = null;
for(Entry<String, ByteBuffer> i : meta.entrySet()) {
Assert.assertTrue(auxName.equals(i.getKey()));
String classPath = Charsets.UTF_8.decode(i.getValue()).toString();
customizedAuxClassPath = new HashSet<String>(Arrays.asList(StringUtils
.getTrimmedStrings(classPath)));
Assert.assertTrue(classPath.contains(testJar.getName()));
}
aux.stop();
// Verify that we do not have any overlap between customized class path
// and the default class path.
Set<String> mutalClassPath = Sets.intersection(defaultAuxClassPath,
customizedAuxClassPath);
Assert.assertTrue(mutalClassPath.isEmpty());
} finally {
if (testJar != null) {
testJar.delete();
}
}
}
@Test (timeout = 15000)
public void testReuseLocalizedAuxiliaryJar() throws Exception {
File testJar = null;
AuxServices aux = null;
Configuration conf = new YarnConfiguration();
FileSystem fs = FileSystem.get(conf);
String root = "target/LocalDir";
try {
testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
"test-runjar.jar", 2048, ServiceB.class.getName(), LightService
.class.getName());
Context mockContext = mock(Context.class);
LocalDirsHandlerService mockDirsHandler = mock(
LocalDirsHandlerService.class);
Path rootAuxServiceDirPath = new Path(root, "nmAuxService");
when(mockDirsHandler.getLocalPathForWrite(anyString())).thenReturn(
rootAuxServiceDirPath);
when(mockContext.getLocalDirsHandler()).thenReturn(mockDirsHandler);
aux = new AuxServices(MOCK_AUX_PATH_HANDLER, mockContext,
MOCK_DEL_SERVICE);
// First Time the jar gets localized
Path path = aux.maybeDownloadJars("ServiceB", ServiceB.class.getName(),
testJar.getAbsolutePath(), AuxServiceFile.TypeEnum.STATIC, conf);
// Validate the path on reuse of localized jar
path = aux.maybeDownloadJars("ServiceB", ServiceB.class.getName(),
testJar.getAbsolutePath(), AuxServiceFile.TypeEnum.STATIC, conf);
assertFalse("Failed to reuse the localized jar",
path.toString().endsWith("/*"));
} finally {
if (testJar != null) {
testJar.delete();
}
if (fs.exists(new Path(root))) {
fs.delete(new Path(root), true);
}
}
}
@Test
public void testAuxEventDispatch() throws IOException {
Configuration conf = new Configuration();
if (useManifest) {
AuxServiceRecord serviceA =
AuxServices.newAuxService("Asrv", ServiceA.class.getName());
serviceA.getConfiguration().setProperty("A.expected.init", "1");
AuxServiceRecord serviceB =
AuxServices.newAuxService("Bsrv", ServiceB.class.getName());
serviceB.getConfiguration().setProperty("B.expected.stop", "1");
writeManifestFile(new AuxServiceRecords().serviceList(serviceA,
serviceB), conf);
} else {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[]{"Asrv",
"Bsrv"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
conf.setInt("A.expected.init", 1);
conf.setInt("B.expected.stop", 1);
}
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
ApplicationId appId1 = ApplicationId.newInstance(0, 65);
ByteBuffer buf = ByteBuffer.allocate(6);
buf.putChar('A');
buf.putInt(65);
buf.flip();
AuxServicesEvent event = new AuxServicesEvent(
AuxServicesEventType.APPLICATION_INIT, "user0", appId1, "Asrv", buf);
aux.handle(event);
ApplicationId appId2 = ApplicationId.newInstance(0, 66);
event = new AuxServicesEvent(
AuxServicesEventType.APPLICATION_STOP, "user0", appId2, "Bsrv", null);
// verify all services got the stop event
aux.handle(event);
Collection<AuxiliaryService> servs = aux.getServices();
for (AuxiliaryService serv: servs) {
ArrayList<Integer> appIds = ((LightService)serv).getAppIdsStopped();
assertEquals("app not properly stopped", 1, appIds.size());
assertTrue("wrong app stopped", appIds.contains((Integer)66));
}
for (AuxiliaryService serv : servs) {
assertNull(((LightService) serv).containerId);
assertNull(((LightService) serv).resource);
}
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId1, 1);
ContainerTokenIdentifier cti = new ContainerTokenIdentifier(
ContainerId.newContainerId(attemptId, 1), "", "",
Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0);
Context context = mock(Context.class);
Container container = new ContainerImpl(new YarnConfiguration(), null, null, null,
null, cti, context);
ContainerId containerId = container.getContainerId();
Resource resource = container.getResource();
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container);
aux.handle(event);
for (AuxiliaryService serv : servs) {
assertEquals(containerId, ((LightService) serv).containerId);
assertEquals(resource, ((LightService) serv).resource);
((LightService) serv).containerId = null;
((LightService) serv).resource = null;
}
event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_STOP, container);
aux.handle(event);
for (AuxiliaryService serv : servs) {
assertEquals(containerId, ((LightService) serv).containerId);
assertEquals(resource, ((LightService) serv).resource);
}
}
private Configuration getABConf() throws
IOException {
return getABConf("Asrv", "Bsrv", ServiceA.class, ServiceB.class);
}
private Configuration getABConf(String aName, String bName,
Class<? extends Service> aClass, Class<? extends Service> bClass) throws
IOException {
Configuration conf = new Configuration();
if (useManifest) {
AuxServiceRecord serviceA =
AuxServices.newAuxService(aName, aClass.getName());
AuxServiceRecord serviceB =
AuxServices.newAuxService(bName, bClass.getName());
writeManifestFile(new AuxServiceRecords().serviceList(serviceA, serviceB),
conf);
} else {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[]{aName,
bName});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, aName),
aClass, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, bName),
bClass, Service.class);
}
return conf;
}
@Test
public void testAuxServices() throws IOException {
Configuration conf = getABConf();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
int latch = 1;
for (Service s : aux.getServices()) {
assertEquals(INITED, s.getServiceState());
if (s instanceof ServiceA) { latch *= 2; }
else if (s instanceof ServiceB) { latch *= 3; }
else fail("Unexpected service type " + s.getClass());
}
assertEquals("Invalid mix of services", 6, latch);
aux.start();
for (AuxiliaryService s : aux.getServices()) {
assertEquals(STARTED, s.getServiceState());
assertEquals(s.getAuxiliaryLocalPathHandler(),
MOCK_AUX_PATH_HANDLER);
}
aux.stop();
for (Service s : aux.getServices()) {
assertEquals(STOPPED, s.getServiceState());
}
}
@Test
public void testAuxServicesMeta() throws IOException {
Configuration conf = getABConf();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
int latch = 1;
for (Service s : aux.getServices()) {
assertEquals(INITED, s.getServiceState());
if (s instanceof ServiceA) { latch *= 2; }
else if (s instanceof ServiceB) { latch *= 3; }
else fail("Unexpected service type " + s.getClass());
}
assertEquals("Invalid mix of services", 6, latch);
aux.start();
for (Service s : aux.getServices()) {
assertEquals(STARTED, s.getServiceState());
}
Map<String, ByteBuffer> meta = aux.getMetaData();
assertEquals(2, meta.size());
assertEquals("A", new String(meta.get("Asrv").array()));
assertEquals("B", new String(meta.get("Bsrv").array()));
aux.stop();
for (Service s : aux.getServices()) {
assertEquals(STOPPED, s.getServiceState());
}
}
@Test
public void testAuxUnexpectedStop() throws IOException {
// AuxServices no longer expected to stop when services stop
Configuration conf = getABConf();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
Service s = aux.getServices().iterator().next();
s.stop();
assertEquals("Auxiliary service stop caused AuxServices stop",
STARTED, aux.getServiceState());
assertEquals(2, aux.getServices().size());
}
@Test
public void testValidAuxServiceName() throws IOException {
Configuration conf = getABConf("Asrv1", "Bsrv_2", ServiceA.class,
ServiceB.class);
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
try {
aux.init(conf);
} catch (Exception ex) {
Assert.fail("Should not receive the exception.");
}
//Test bad auxService Name
final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
if (useManifest) {
AuxServiceRecord serviceA =
AuxServices.newAuxService("1Asrv1", ServiceA.class.getName());
writeManifestFile(new AuxServiceRecords().serviceList(serviceA), conf);
} else {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[]
{"1Asrv1"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
"1Asrv1"), ServiceA.class, Service.class);
}
try {
aux1.init(conf);
Assert.fail("Should receive the exception.");
} catch (Exception ex) {
assertTrue("Wrong message: " + ex.getMessage(),
ex.getMessage().contains("The auxiliary service name: 1Asrv1 is " +
"invalid. The valid service name should only contain a-zA-Z0-9_" +
" and cannot start with numbers."));
}
}
@Test
public void testAuxServiceRecoverySetup() throws IOException {
Configuration conf = getABConf("Asrv", "Bsrv", RecoverableServiceA.class,
RecoverableServiceB.class);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.NM_RECOVERY_DIR, TEST_DIR.toString());
try {
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
Assert.assertEquals(2, aux.getServices().size());
File auxStorageDir = new File(TEST_DIR,
AuxServices.STATE_STORE_ROOT_NAME);
Assert.assertEquals(2, auxStorageDir.listFiles().length);
aux.close();
} finally {
FileUtil.fullyDelete(TEST_DIR);
}
}
static class RecoverableAuxService extends AuxiliaryService {
static final FsPermission RECOVERY_PATH_PERMS =
new FsPermission((short)0700);
String auxName;
RecoverableAuxService(String name, String auxName) {
super(name);
this.auxName = auxName;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
Path storagePath = getRecoveryPath();
Assert.assertNotNull("Recovery path not present when aux service inits",
storagePath);
Assert.assertTrue(storagePath.toString().contains(auxName));
FileSystem fs = FileSystem.getLocal(conf);
Assert.assertTrue("Recovery path does not exist",
fs.exists(storagePath));
Assert.assertEquals("Recovery path has wrong permissions",
new FsPermission((short)0700),
fs.getFileStatus(storagePath).getPermission());
}
@Override
public void initializeApplication(
ApplicationInitializationContext initAppContext) {
}
@Override
public void stopApplication(ApplicationTerminationContext stopAppContext) {
}
@Override
public ByteBuffer getMetaData() {
return null;
}
}
static class RecoverableServiceA extends RecoverableAuxService {
RecoverableServiceA() {
super("RecoverableServiceA", "Asrv");
}
}
static class RecoverableServiceB extends RecoverableAuxService {
RecoverableServiceB() {
super("RecoverableServiceB", "Bsrv");
}
}
static class ConfChangeAuxService extends AuxiliaryService
implements Service {
ConfChangeAuxService() {
super("ConfChangeAuxService");
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
conf.set("dummyConfig", "changedTestValue");
super.serviceInit(conf);
}
@Override
public void initializeApplication(
ApplicationInitializationContext initAppContext) {
}
@Override
public void stopApplication(ApplicationTerminationContext stopAppContext) {
}
@Override
public ByteBuffer getMetaData() {
return null;
}
}
@Test
public void testAuxServicesConfChange() throws IOException {
Configuration conf = new Configuration();
if (useManifest) {
AuxServiceRecord service =
AuxServices.newAuxService("ConfChangeAuxService",
ConfChangeAuxService.class.getName());
service.getConfiguration().setProperty("dummyConfig", "testValue");
writeManifestFile(new AuxServiceRecords().serviceList(service), conf);
} else {
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
new String[]{"ConfChangeAuxService"});
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
"ConfChangeAuxService"), ConfChangeAuxService.class, Service.class);
conf.set("dummyConfig", "testValue");
}
AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER, MOCK_CONTEXT,
MOCK_DEL_SERVICE);
aux.init(conf);
aux.start();
for (AuxiliaryService s : aux.getServices()) {
assertEquals(STARTED, s.getServiceState());
if (useManifest) {
assertNull(conf.get("dummyConfig"));
} else {
assertEquals("testValue", conf.get("dummyConfig"));
}
assertEquals("changedTestValue", s.getConfig().get("dummyConfig"));
}
aux.stop();
}
@Test
public void testAuxServicesManifestPermissions() throws IOException {
Assume.assumeTrue(useManifest);
Configuration conf = getABConf();
FileSystem fs = FileSystem.get(conf);
fs.setPermission(new Path(manifest.getAbsolutePath()), FsPermission
.createImmutable((short) 0777));
AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
assertEquals(0, aux.getServices().size());
fs.setPermission(new Path(manifest.getAbsolutePath()), FsPermission
.createImmutable((short) 0775));
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
assertEquals(0, aux.getServices().size());
fs.setPermission(new Path(manifest.getAbsolutePath()), FsPermission
.createImmutable((short) 0755));
fs.setPermission(new Path(rootDir.getAbsolutePath()), FsPermission
.createImmutable((short) 0775));
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
assertEquals(0, aux.getServices().size());
fs.setPermission(new Path(rootDir.getAbsolutePath()), FsPermission
.createImmutable((short) 0755));
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
assertEquals(2, aux.getServices().size());
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "");
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
assertEquals(0, aux.getServices().size());
conf.set(YarnConfiguration.YARN_ADMIN_ACL, UserGroupInformation
.getCurrentUser().getShortUserName());
aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
assertEquals(2, aux.getServices().size());
}
@Test
public void testRemoveManifest() throws IOException {
Assume.assumeTrue(useManifest);
Configuration conf = getABConf();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
assertEquals(2, aux.getServices().size());
manifest.delete();
aux.loadManifest(conf, false);
assertEquals(0, aux.getServices().size());
}
@Test
public void testManualReload() throws IOException {
Assume.assumeTrue(useManifest);
Configuration conf = getABConf();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
try {
aux.reload(null);
Assert.fail("Should receive the exception.");
} catch (IOException e) {
assertTrue("Wrong message: " + e.getMessage(),
e.getMessage().equals("Auxiliary services have not been started " +
"yet, please retry later"));
}
aux.start();
assertEquals(2, aux.getServices().size());
aux.reload(null);
assertEquals(2, aux.getServices().size());
aux.reload(new AuxServiceRecords());
assertEquals(0, aux.getServices().size());
aux.stop();
}
@Test
public void testReloadWhenDisabled() throws IOException {
Configuration conf = new Configuration();
final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
MOCK_CONTEXT, MOCK_DEL_SERVICE);
aux.init(conf);
try {
aux.reload(null);
Assert.fail("Should receive the exception.");
} catch (IOException e) {
assertTrue("Wrong message: " + e.getMessage(),
e.getMessage().equals("Dynamic reloading is not enabled via " +
YarnConfiguration.NM_AUX_SERVICES_MANIFEST_ENABLED));
}
try {
aux.reloadManifest();
Assert.fail("Should receive the exception.");
} catch (IOException e) {
assertTrue("Wrong message: " + e.getMessage(),
e.getMessage().equals("Dynamic reloading is not enabled via " +
YarnConfiguration.NM_AUX_SERVICES_MANIFEST_ENABLED));
}
}
}