blob: 46c9faa24b89beb001f5d4e07379cbc13e4df569 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import org.junit.Test;
import static org.junit.Assert.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.Service;
import static org.apache.hadoop.yarn.service.Service.STATE.*;
public class TestAuxServices {
private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
static class LightService extends AbstractService
implements AuxServices.AuxiliaryService {
private final char idef;
private final int expected_appId;
private int remaining_init;
private int remaining_stop;
private ByteBuffer meta = null;
private ArrayList<Integer> stoppedApps;
LightService(String name, char idef, int expected_appId) {
this(name, idef, expected_appId, null);
}
LightService(String name, char idef, int expected_appId, ByteBuffer meta) {
super(name);
this.idef = idef;
this.expected_appId = expected_appId;
this.meta = meta;
this.stoppedApps = new ArrayList<Integer>();
}
public ArrayList<Integer> getAppIdsStopped() {
return (ArrayList)this.stoppedApps.clone();
}
@Override
public void init(Configuration conf) {
remaining_init = conf.getInt(idef + ".expected.init", 0);
remaining_stop = conf.getInt(idef + ".expected.stop", 0);
super.init(conf);
}
@Override
public void stop() {
assertEquals(0, remaining_init);
assertEquals(0, remaining_stop);
super.stop();
}
@Override
public void initApp(String user, ApplicationId appId, ByteBuffer data) {
assertEquals(idef, data.getChar());
assertEquals(expected_appId, data.getInt());
assertEquals(expected_appId, appId.getId());
}
@Override
public void stopApp(ApplicationId appId) {
stoppedApps.add(appId.getId());
}
@Override
public ByteBuffer getMeta() {
return meta;
}
}
static class ServiceA extends LightService {
public ServiceA() {
super("A", 'A', 65, ByteBuffer.wrap("A".getBytes()));
}
}
static class ServiceB extends LightService {
public ServiceB() {
super("B", 'B', 66, ByteBuffer.wrap("B".getBytes()));
}
}
@Test
public void testAuxEventDispatch() {
Configuration conf = new Configuration();
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
conf.setInt("A.expected.init", 1);
conf.setInt("B.expected.stop", 1);
final AuxServices aux = new AuxServices();
aux.init(conf);
aux.start();
ApplicationId appId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
appId.setId(65);
ByteBuffer buf = ByteBuffer.allocate(6);
buf.putChar('A');
buf.putInt(65);
buf.flip();
AuxServicesEvent event = new AuxServicesEvent(
AuxServicesEventType.APPLICATION_INIT, "user0", appId, "Asrv", buf);
aux.handle(event);
appId.setId(66);
event = new AuxServicesEvent(
AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
// verify all services got the stop event
aux.handle(event);
Collection<AuxServices.AuxiliaryService> servs = aux.getServices();
for (AuxServices.AuxiliaryService serv: servs) {
ArrayList<Integer> appIds = ((LightService)serv).getAppIdsStopped();
assertEquals("app not properly stopped", 1, appIds.size());
assertTrue("wrong app stopped", appIds.contains((Integer)66));
}
}
@Test
public void testAuxServices() {
Configuration conf = new Configuration();
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
final AuxServices aux = new AuxServices();
aux.init(conf);
int latch = 1;
for (Service s : aux.getServices()) {
assertEquals(INITED, s.getServiceState());
if (s instanceof ServiceA) { latch *= 2; }
else if (s instanceof ServiceB) { latch *= 3; }
else fail("Unexpected service type " + s.getClass());
}
assertEquals("Invalid mix of services", 6, latch);
aux.start();
for (Service s : aux.getServices()) {
assertEquals(STARTED, s.getServiceState());
}
aux.stop();
for (Service s : aux.getServices()) {
assertEquals(STOPPED, s.getServiceState());
}
}
@Test
public void testAuxServicesMeta() {
Configuration conf = new Configuration();
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
final AuxServices aux = new AuxServices();
aux.init(conf);
int latch = 1;
for (Service s : aux.getServices()) {
assertEquals(INITED, s.getServiceState());
if (s instanceof ServiceA) { latch *= 2; }
else if (s instanceof ServiceB) { latch *= 3; }
else fail("Unexpected service type " + s.getClass());
}
assertEquals("Invalid mix of services", 6, latch);
aux.start();
for (Service s : aux.getServices()) {
assertEquals(STARTED, s.getServiceState());
}
Map<String, ByteBuffer> meta = aux.getMeta();
assertEquals(2, meta.size());
assertEquals("A", new String(meta.get("Asrv").array()));
assertEquals("B", new String(meta.get("Bsrv").array()));
aux.stop();
for (Service s : aux.getServices()) {
assertEquals(STOPPED, s.getServiceState());
}
}
@Test
public void testAuxUnexpectedStop() {
Configuration conf = new Configuration();
conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv"),
ServiceA.class, Service.class);
conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
ServiceB.class, Service.class);
final AuxServices aux = new AuxServices();
aux.init(conf);
aux.start();
Service s = aux.getServices().iterator().next();
s.stop();
assertEquals("Auxiliary service stopped, but AuxService unaffected.",
STOPPED, aux.getServiceState());
assertTrue(aux.getServices().isEmpty());
}
}