blob: f8f948dd88f0c7f87602870c77e5ea0984aab3e0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.TestServiceManager;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.util.Iterator;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants
.CONTAINER_STATE_REPORT_AS_SERVICE_STATE;
/**
* Tests for {@link Component}.
*/
public class TestComponent {
static final Logger LOG = Logger.getLogger(TestComponent.class);
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testComponentUpgrade() throws Exception {
ServiceContext context = createTestContext(rule, "testComponentUpgrade");
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
ComponentEvent upgradeEvent = new ComponentEvent(comp.getName(),
ComponentEventType.UPGRADE);
comp.handle(upgradeEvent);
Assert.assertEquals("component not in need upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
}
@Test
public void testCheckState() throws Exception {
String serviceName = "testCheckState";
ServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
"val1")).setUpgradeVersion("v2"));
// one instance finished upgrading
comp.getUpgradeStatus().decContainersThatNeedUpgrade();
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in need upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
// second instance finished upgrading
comp.getUpgradeStatus().decContainersThatNeedUpgrade();
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in stable state",
ComponentState.STABLE, comp.getComponentSpec().getState());
Assert.assertEquals("component did not upgrade successfully", "val1",
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
@Test
public void testContainerCompletedWhenUpgrading() throws Exception {
String serviceName = "testContainerCompletedWhenUpgrading";
MockRunningServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
"val1")).setUpgradeVersion("v2"));
comp.getAllComponentInstances().forEach(instance ->
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE)));
// reinitialization of a container failed
for(ComponentInstance instance : comp.getAllComponentInstances()) {
ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED)
.setInstance(instance)
.setContainerId(instance.getContainer().getId());
comp.handle(stopEvent);
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), STOP));
}
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in needs upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
}
@Test
public void testCancelUpgrade() throws Exception {
ServiceContext context = createTestContext(rule, "testCancelUpgrade");
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
ComponentEvent upgradeEvent = new ComponentEvent(comp.getName(),
ComponentEventType.CANCEL_UPGRADE);
comp.handle(upgradeEvent);
Assert.assertEquals("component not in need upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState
.CANCEL_UPGRADING, comp.getState());
}
@Test
public void testContainerCompletedCancelUpgrade() throws Exception {
String serviceName = "testContainerCompletedCancelUpgrade";
MockRunningServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
// upgrade completes
comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
"val1")).setUpgradeVersion("v2"));
comp.getAllComponentInstances().forEach(instance ->
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.UPGRADE)));
// reinitialization of a container done
for(ComponentInstance instance : comp.getAllComponentInstances()) {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), START));
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), BECOME_READY));
}
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CANCEL_UPGRADE)
.setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
"val0")).setUpgradeVersion("v1"));
comp.getAllComponentInstances().forEach(instance ->
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.CANCEL_UPGRADE)));
Iterator<ComponentInstance> iter = comp.getAllComponentInstances()
.iterator();
// cancel upgrade failed of a container
ComponentInstance instance1 = iter.next();
ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED)
.setInstance(instance1)
.setContainerId(instance1.getContainer().getId());
comp.handle(stopEvent);
instance1.handle(new ComponentInstanceEvent(
instance1.getContainer().getId(), STOP));
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState
.CANCEL_UPGRADING, comp.getState());
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in needs upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState
.CANCEL_UPGRADING, comp.getState());
// second instance finished upgrading
ComponentInstance instance2 = iter.next();
instance2.handle(new ComponentInstanceEvent(
instance2.getContainer().getId(), ComponentInstanceEventType.START));
instance2.handle(new ComponentInstanceEvent(
instance2.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in flexing state",
ComponentState.FLEXING, comp.getComponentSpec().getState());
// new container get allocated
context.assignNewContainer(context.attemptId, 10, comp);
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in stable state",
ComponentState.STABLE, comp.getComponentSpec().getState());
Assert.assertEquals("cancel upgrade failed", "val0",
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
@Test
public void testCancelUpgradeSuccessWhileUpgrading() throws Exception {
String serviceName = "testCancelUpgradeWhileUpgrading";
MockRunningServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
cancelUpgradeWhileUpgrading(context, comp);
// cancel upgrade successful for both instances
for(ComponentInstance instance : comp.getAllComponentInstances()) {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.START));
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
}
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in stable state",
ComponentState.STABLE, comp.getComponentSpec().getState());
Assert.assertEquals("cancel upgrade failed", "val0",
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
@Test
public void testCancelUpgradeFailureWhileUpgrading() throws Exception {
String serviceName = "testCancelUpgradeFailureWhileUpgrading";
MockRunningServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
cancelUpgradeWhileUpgrading(context, comp);
// cancel upgrade failed for both instances
for(ComponentInstance instance : comp.getAllComponentInstances()) {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.STOP));
}
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in flexing state",
ComponentState.FLEXING, comp.getComponentSpec().getState());
for (ComponentInstance instance : comp.getAllComponentInstances()) {
// new container get allocated
context.assignNewContainer(context.attemptId, 10, comp);
}
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in stable state",
ComponentState.STABLE, comp.getComponentSpec().getState());
Assert.assertEquals("cancel upgrade failed", "val0",
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
private void cancelUpgradeWhileUpgrading(
MockRunningServiceContext context, Component comp)
throws Exception {
comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(createSpecWithEnv(context.service.getName(),
comp.getName(), "key1", "val1")).setUpgradeVersion("v0"));
Iterator<ComponentInstance> iter = comp.getAllComponentInstances()
.iterator();
ComponentInstance instance1 = iter.next();
// instance1 is triggered to upgrade
instance1.handle(new ComponentInstanceEvent(
instance1.getContainer().getId(), ComponentInstanceEventType.UPGRADE));
// component upgrade is cancelled
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CANCEL_UPGRADE)
.setTargetSpec(createSpecWithEnv(context.service.getName(),
comp.getName(), "key1",
"val0")).setUpgradeVersion("v0"));
// all instances upgrade is cancelled.
comp.getAllComponentInstances().forEach(instance ->
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(),
ComponentInstanceEventType.CANCEL_UPGRADE)));
// regular upgrade failed for instance 1
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED).setInstance(instance1)
.setContainerId(instance1.getContainer().getId()));
instance1.handle(new ComponentInstanceEvent(
instance1.getContainer().getId(), STOP));
// component should be in cancel upgrade
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState
.CANCEL_UPGRADING, comp.getState());
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in needs upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState
.CANCEL_UPGRADING, comp.getState());
}
@Test
public void testComponentStateReachesStableStateWithTerminatingComponents()
throws
Exception {
final String serviceName =
"testComponentStateUpdatesWithTerminatingComponents";
Service testService = ServiceTestUtils.createTerminatingJobExample(
serviceName);
TestServiceManager.createDef(serviceName, testService);
ServiceContext context = new MockRunningServiceContext(rule, testService);
for (Component comp : context.scheduler.getAllComponents().values()) {
Iterator<ComponentInstance> instanceIter = comp.
getAllComponentInstances().iterator();
ComponentInstance componentInstance = instanceIter.next();
Container instanceContainer = componentInstance.getContainer();
Assert.assertEquals(0, comp.getNumSucceededInstances());
Assert.assertEquals(0, comp.getNumFailedInstances());
Assert.assertEquals(2, comp.getNumRunningInstances());
Assert.assertEquals(2, comp.getNumReadyInstances());
Assert.assertEquals(0, comp.getPendingInstances().size());
//stop 1 container
ContainerStatus containerStatus = ContainerStatus.newInstance(
instanceContainer.getId(),
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
"successful", 0);
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)
.setContainerId(instanceContainer.getId()));
componentInstance.handle(
new ComponentInstanceEvent(componentInstance.getContainer().getId(),
ComponentInstanceEventType.STOP).setStatus(containerStatus));
Assert.assertEquals(1, comp.getNumSucceededInstances());
Assert.assertEquals(0, comp.getNumFailedInstances());
Assert.assertEquals(1, comp.getNumRunningInstances());
Assert.assertEquals(1, comp.getNumReadyInstances());
Assert.assertEquals(0, comp.getPendingInstances().size());
org.apache.hadoop.yarn.service.component.ComponentState componentState =
Component.checkIfStable(comp);
Assert.assertEquals(
org.apache.hadoop.yarn.service.component.ComponentState.STABLE,
componentState);
}
}
@Test
public void testComponentStateUpdatesWithTerminatingComponents()
throws
Exception {
final String serviceName =
"testComponentStateUpdatesWithTerminatingComponents";
Service testService = ServiceTestUtils.createTerminatingJobExample(
serviceName);
TestServiceManager.createDef(serviceName, testService);
ServiceContext context = new MockRunningServiceContext(rule, testService);
for (Component comp : context.scheduler.getAllComponents().values()) {
Iterator<ComponentInstance> instanceIter = comp.
getAllComponentInstances().iterator();
while (instanceIter.hasNext()) {
ComponentInstance componentInstance = instanceIter.next();
Container instanceContainer = componentInstance.getContainer();
//stop 1 container
ContainerStatus containerStatus = ContainerStatus.newInstance(
instanceContainer.getId(),
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
"successful", 0);
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)
.setContainerId(instanceContainer.getId()));
componentInstance.handle(
new ComponentInstanceEvent(componentInstance.getContainer().getId(),
ComponentInstanceEventType.STOP).setStatus(containerStatus));
}
ComponentState componentState =
comp.getComponentSpec().getState();
Assert.assertEquals(
ComponentState.SUCCEEDED,
componentState);
}
ServiceState serviceState =
testService.getState();
Assert.assertEquals(
ServiceState.SUCCEEDED,
serviceState);
}
@Test
public void testComponentStateUpdatesWithTerminatingDominantComponents()
throws Exception {
final String serviceName =
"testComponentStateUpdatesWithTerminatingServiceStateComponents";
Service testService =
ServiceTestUtils.createTerminatingDominantComponentJobExample(
serviceName);
TestServiceManager.createDef(serviceName, testService);
ServiceContext context = new MockRunningServiceContext(rule, testService);
for (Component comp : context.scheduler.getAllComponents().values()) {
boolean componentIsDominant = comp.getComponentSpec()
.getConfiguration().getPropertyBool(
CONTAINER_STATE_REPORT_AS_SERVICE_STATE, false);
if (componentIsDominant) {
Iterator<ComponentInstance> instanceIter = comp.
getAllComponentInstances().iterator();
while (instanceIter.hasNext()) {
ComponentInstance componentInstance = instanceIter.next();
Container instanceContainer = componentInstance.getContainer();
//stop 1 container
ContainerStatus containerStatus = ContainerStatus.newInstance(
instanceContainer.getId(),
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
"successful", 0);
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)
.setContainerId(instanceContainer.getId()));
componentInstance.handle(
new ComponentInstanceEvent(componentInstance.getContainer().
getId(), ComponentInstanceEventType.STOP).
setStatus(containerStatus));
}
ComponentState componentState =
comp.getComponentSpec().getState();
Assert.assertEquals(
ComponentState.SUCCEEDED,
componentState);
}
}
ServiceState serviceState =
testService.getState();
Assert.assertEquals(
ServiceState.SUCCEEDED,
serviceState);
}
private static org.apache.hadoop.yarn.service.api.records.Component
createSpecWithEnv(String serviceName, String compName, String key,
String val) {
Service service = TestServiceManager.createBaseDef(serviceName);
org.apache.hadoop.yarn.service.api.records.Component spec =
service.getComponent(compName);
spec.getConfiguration().getEnv().put(key, val);
return spec;
}
public static MockRunningServiceContext createTestContext(
ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName)
throws Exception {
return new MockRunningServiceContext(fsWatcher,
TestServiceManager.createBaseDef(serviceName));
}
}