/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.actionmanager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyCollectionOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.actionmanager.ActionScheduler.RoleStats;
import org.apache.ambari.server.agent.ActionQueue;
import org.apache.ambari.server.agent.AgentCommand;
import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.apache.ambari.server.agent.CancelCommand;
import org.apache.ambari.server.agent.CommandReport;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.HostsMap;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.dao.HostDAO;
import org.apache.ambari.server.orm.entities.HostEntity;
import org.apache.ambari.server.orm.entities.RequestEntity;
import org.apache.ambari.server.serveraction.MockServerAction;
import org.apache.ambari.server.serveraction.ServerActionExecutor;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.ServiceComponentHostEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
import org.apache.ambari.server.utils.StageUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.reflect.TypeToken;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.persist.PersistService;
import com.google.inject.persist.UnitOfWork;
import junit.framework.Assert;
public class TestActionScheduler {
private static final Logger log = LoggerFactory.getLogger(TestActionScheduler.class);
private static final String CLUSTER_HOST_INFO = "{all_hosts=[c6403.ambari.apache.org," +
" c6401.ambari.apache.org, c6402.ambari.apache.org], slave_hosts=[c6403.ambari.apache.org," +
" c6401.ambari.apache.org, c6402.ambari.apache.org]}";
private static final String CLUSTER_HOST_INFO_UPDATED = "{all_hosts=[c6401.ambari.apache.org,"
+ " c6402.ambari.apache.org], slave_hosts=[c6401.ambari.apache.org,"
+ " c6402.ambari.apache.org]}";
private static Injector injector;
private final String hostname = "ahost.ambari.apache.org";
private final int MAX_CYCLE_ITERATIONS = 100;
@Inject
HostRoleCommandFactory hostRoleCommandFactory;
@Inject
StageFactory stageFactory;
@Inject
StageUtils stageUtils;
@Inject
HostDAO hostDAO;
@Before
public void setup() throws Exception {
injector = Guice.createInjector(new InMemoryDefaultTestModule());
injector.getInstance(GuiceJpaInitializer.class);
injector.injectMembers(this);
}
@After
public void teardown() {
injector.getInstance(PersistService.class).stop();
injector = null;
}
/**
* This test sends a new action to the action scheduler and verifies that the action
* shows up in the action queue.
*/
@Test
public void testActionSchedule() throws Exception {
Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
Map<String, Set<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(fsm.getClusterById(anyLong())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(oneClusterMock.getClusterId()).thenReturn(Long.valueOf(1L));
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
Host host = mock(Host.class);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname, sch);
HostEntity hostEntity = new HostEntity();
hostEntity.setHostName(hostname);
hostDAO.merge(hostEntity);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
when(fsm.getHost(anyString())).thenReturn(host);
when(host.getState()).thenReturn(HostState.HEALTHY);
when(host.getHostName()).thenReturn(hostname);
ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
List<Stage> stages = new ArrayList<Stage>();
Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
stages.add(s);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
//Keep a large number of attempts so that the task never expires
//Small action timeout to test rescheduling
ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm,
10000, new HostsMap((String) null), unitOfWork, null, conf);
scheduler.setTaskTimeoutAdjustment(false);
List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
assertTrue(ac.get(0) instanceof ExecutionCommand);
assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId());
assertEquals(clusterHostInfo, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
//The action status has not changed, so it should be queued again.
ac = waitForQueueSize(hostname, aq, 1, scheduler);
assertTrue(ac.get(0) instanceof ExecutionCommand);
assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId());
assertEquals(clusterHostInfo, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
//Now change the action status
s.setHostRoleStatus(hostname, "NAMENODE", HostRoleStatus.COMPLETED);
ac = aq.dequeueAll(hostname);
//Wait for some time; it shouldn't be scheduled this time.
ac = waitForQueueSize(hostname, aq, 0, scheduler);
}
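/**
 * Repeatedly runs the scheduler and drains the agent queue for the given host until the
 * queue reaches the expected size, failing the test if the queue ever grows larger than
 * expected. Returns null if the expected size is not reached within MAX_CYCLE_ITERATIONS.
 */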
private List<AgentCommand> waitForQueueSize(String hostname, ActionQueue aq,
int expectedQueueSize, ActionScheduler scheduler) {
int cycleCount = 0;
while (cycleCount++ <= MAX_CYCLE_ITERATIONS) {
List<AgentCommand> ac = aq.dequeueAll(hostname);
if (ac != null) {
if (ac.size() == expectedQueueSize) {
return ac;
} else if (ac.size() > expectedQueueSize) {
Assert.fail("Expected size : " + expectedQueueSize + " Actual size="
+ ac.size());
}
}
try {
scheduler.doWork();
} catch (AmbariException e) {
Assert.fail("Ambari exception : " + e.getMessage() + e.getStackTrace());
}
}
return null;
}
/**
* Test whether the scheduler times out an action
*/
@Test
public void testActionTimeout() throws Exception {
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
Host host = mock(Host.class);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
when(fsm.getHost(anyString())).thenReturn(host);
when(host.getState()).thenReturn(HostState.HEALTHY);
when(host.getHostName()).thenReturn(hostname);
HostEntity hostEntity = new HostEntity();
hostEntity.setHostName(hostname);
hostDAO.create(hostEntity);
List<Stage> stages = new ArrayList<Stage>();
final Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
s.addHostRoleExecutionCommand(hostname, Role.SECONDARY_NAMENODE, RoleCommand.INSTALL,
new ServiceComponentHostInstallEvent("SECONDARY_NAMENODE", hostname, System.currentTimeMillis(), "HDP-1.2.0"),
"cluster1", "HDFS", false, false);
s.setHostRoleStatus(hostname, "SECONDARY_NAMENODE", HostRoleStatus.IN_PROGRESS);
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
String host = (String) invocation.getArguments()[0];
String role = (String) invocation.getArguments()[3];
HostRoleCommand command = s.getHostRoleCommand(host, role);
command.setStatus(HostRoleStatus.TIMEDOUT);
return null;
}
}).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString());
//Small action timeout to test rescheduling
ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
scheduler.setTaskTimeoutAdjustment(false);
// Start the thread
int cycleCount = 0;
scheduler.doWork();
//Check that in_progress command is rescheduled
assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname, "SECONDARY_NAMENODE"));
//Switch command back to IN_PROGRESS status and check that other command is not rescheduled
stages.get(0).setHostRoleStatus(hostname, "SECONDARY_NAMENODE", HostRoleStatus.IN_PROGRESS);
scheduler.doWork();
assertEquals(1, stages.get(0).getAttemptCount(hostname, "NAMENODE"));
assertEquals(2, stages.get(0).getAttemptCount(hostname, "SECONDARY_NAMENODE"));
while (!stages.get(0).getHostRoleStatus(hostname, "SECONDARY_NAMENODE")
.equals(HostRoleStatus.TIMEDOUT) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
scheduler.doWork();
}
assertEquals(HostRoleStatus.TIMEDOUT,
stages.get(0).getHostRoleStatus(hostname, "SECONDARY_NAMENODE"));
verify(db, times(1)).startRequest(eq(1L));
verify(db, times(1)).abortOperation(1L);
}
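/**
 * Tests that a command targeting a host in HEARTBEAT_LOST state is timed out and that the
 * scheduler cancels the corresponding host role commands.
 */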
@Test
public void testActionTimeoutForLostHost() throws Exception {
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
Host host = mock(Host.class);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
when(fsm.getHost(anyString())).thenReturn(host);
when(host.getState()).thenReturn(HostState.HEARTBEAT_LOST);
when(host.getHostName()).thenReturn(hostname);
List<Stage> stages = new ArrayList<Stage>();
final Stage s = StageUtils.getATestStage(1, 977, hostname, CLUSTER_HOST_INFO,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
String host = (String) invocation.getArguments()[0];
String role = (String) invocation.getArguments()[3];
HostRoleCommand command = s.getHostRoleCommand(host, role);
command.setStatus(HostRoleStatus.TIMEDOUT);
return null;
}
}).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString());
//Small action timeout to test rescheduling
AmbariEventPublisher aep = EasyMock.createNiceMock(AmbariEventPublisher.class);
ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
withConstructor((long) 100, (long) 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, aep, conf).
addMockedMethod("cancelHostRoleCommands").
createMock();
scheduler.cancelHostRoleCommands((Collection<HostRoleCommand>)EasyMock.anyObject(),EasyMock.anyObject(String.class));
EasyMock.expectLastCall();
EasyMock.replay(scheduler);
scheduler.setTaskTimeoutAdjustment(false);
int cycleCount=0;
while (!stages.get(0).getHostRoleStatus(hostname, "NAMENODE")
.equals(HostRoleStatus.TIMEDOUT) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
scheduler.doWork();
}
Assert.assertEquals(HostRoleStatus.TIMEDOUT,stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
EasyMock.verify(scheduler);
}
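/**
 * Tests that when a task on a heartbeat-lost host times out and the remaining task in the
 * stage is aborted, an OpFailed event is raised for both service component hosts.
 */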
@Test
public void testOpFailedEventRaisedForAbortedHostRole() throws Exception {
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch1 = mock(ServiceComponentHost.class);
ServiceComponentHost sch2 = mock(ServiceComponentHost.class);
String hostname1 = "host1";
String hostname2 = "host2";
Host host1 = mock(Host.class);
Host host2 = mock(Host.class);
HostEntity hostEntity1 = new HostEntity();
hostEntity1.setHostName(hostname1);
HostEntity hostEntity2 = new HostEntity();
hostEntity2.setHostName(hostname2);
hostDAO.merge(hostEntity1);
hostDAO.merge(hostEntity2);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname1, sch1);
hosts.put(hostname2, sch2);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
when(fsm.getHost(hostname1)).thenReturn(host1);
when(fsm.getHost(hostname2)).thenReturn(host2);
when(host1.getState()).thenReturn(HostState.HEARTBEAT_LOST);
when(host2.getState()).thenReturn(HostState.HEALTHY);
when(host1.getHostName()).thenReturn(hostname1);
when(host2.getHostName()).thenReturn(hostname2);
when(scomp.getServiceComponentHost(hostname1)).thenReturn(sch1);
when(scomp.getServiceComponentHost(hostname2)).thenReturn(sch2);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
final List<Stage> stages = new ArrayList<Stage>();
Stage stage = stageFactory.createNew(1, "/tmp", "cluster1", 1L, "stageWith2Tasks",
CLUSTER_HOST_INFO, "{\"command_param\":\"param_value\"}", "{\"host_param\":\"param_value\"}");
addInstallTaskToStage(stage, hostname1, "cluster1", Role.DATANODE,
RoleCommand.INSTALL, Service.Type.HDFS, 1);
addInstallTaskToStage(stage, hostname2, "cluster1", Role.NAMENODE,
RoleCommand.INSTALL, Service.Type.HDFS, 2);
stages.add(stage);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
String host = (String) invocation.getArguments()[0];
String role = (String) invocation.getArguments()[3];
//HostRoleCommand command = stages.get(0).getHostRoleCommand(host, role);
for (HostRoleCommand command : stages.get(0).getOrderedHostRoleCommands()) {
if (command.getHostName().equals(host) && command.getRole().name()
.equals(role)) {
command.setStatus(HostRoleStatus.TIMEDOUT);
}
}
return null;
}
}).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Long requestId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId())) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (command.getStatus() == HostRoleStatus.QUEUED ||
command.getStatus() == HostRoleStatus.IN_PROGRESS ||
command.getStatus() == HostRoleStatus.PENDING) {
command.setStatus(HostRoleStatus.ABORTED);
}
}
}
}
return null;
}
}).when(db).abortOperation(anyLong());
ArgumentCaptor<ServiceComponentHostEvent> eventsCapture1 =
ArgumentCaptor.forClass(ServiceComponentHostEvent.class);
ArgumentCaptor<ServiceComponentHostEvent> eventsCapture2 =
ArgumentCaptor.forClass(ServiceComponentHostEvent.class);
// Make sure the NN install doesn't time out
ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
scheduler.setTaskTimeoutAdjustment(false);
int cycleCount=0;
while (!(stages.get(0).getHostRoleStatus(hostname1, "DATANODE")
.equals(HostRoleStatus.TIMEDOUT) && stages.get(0).getHostRoleStatus
(hostname2, "NAMENODE").equals(HostRoleStatus.ABORTED)) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
scheduler.doWork();
}
Assert.assertEquals(HostRoleStatus.TIMEDOUT,
stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
Assert.assertEquals(HostRoleStatus.ABORTED,
stages.get(0).getHostRoleStatus(hostname2, "NAMENODE"));
verify(sch1, atLeastOnce()).handleEvent(eventsCapture1.capture());
verify(sch2, atLeastOnce()).handleEvent(eventsCapture2.capture());
List<ServiceComponentHostEvent> eventTypes = eventsCapture1.getAllValues();
eventTypes.addAll(eventsCapture2.getAllValues());
Assert.assertNotNull(eventTypes);
ServiceComponentHostOpFailedEvent datanodeFailedEvent = null;
ServiceComponentHostOpFailedEvent namenodeFailedEvent = null;
for (ServiceComponentHostEvent eventType : eventTypes) {
if (eventType instanceof ServiceComponentHostOpFailedEvent) {
ServiceComponentHostOpFailedEvent event =
(ServiceComponentHostOpFailedEvent) eventType;
if (event.getServiceComponentName().equals("DATANODE")) {
datanodeFailedEvent = event;
} else if (event.getServiceComponentName().equals("NAMENODE")) {
namenodeFailedEvent = event;
}
}
}
Assert.assertNotNull("Datanode should be in Install failed state.",
datanodeFailedEvent);
Assert.assertNotNull("Namenode should be in Install failed state.",
namenodeFailedEvent);
}
/**
* Test server action
*/
@Test
public void testServerAction() throws Exception {
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
List<Stage> stages = new ArrayList<Stage>();
Map<String, String> payload = new HashMap<String, String>();
final Stage s = getStageWithServerAction(1, 977, payload, "test", 1200);
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
String host = (String) invocation.getArguments()[0];
String role = (String) invocation.getArguments()[3];
CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
HostRoleCommand command = null;
if (null == host) {
command = s.getHostRoleCommand(null, role);
} else {
command = s.getHostRoleCommand(host, role);
}
command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
return null;
}
}).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
doAnswer(new Answer<HostRoleCommand>() {
@Override
public HostRoleCommand answer(InvocationOnMock invocation) throws Throwable {
return s.getHostRoleCommand(null, "AMBARI_SERVER_ACTION");
}
}).when(db).getTask(anyLong());
doAnswer(new Answer<List<HostRoleCommand>>() {
@Override
public List<HostRoleCommand> answer(InvocationOnMock invocation) throws Throwable {
String role = (String) invocation.getArguments()[1];
HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
HostRoleCommand task = s.getHostRoleCommand(null, role);
if (task.getStatus() == status) {
return Arrays.asList(task);
} else {
return Collections.emptyList();
}
}
}).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
ServerActionExecutor.init(injector);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
int cycleCount = 0;
while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
.equals(HostRoleStatus.COMPLETED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
scheduler.doWork();
scheduler.getServerActionExecutor().doWork();
}
assertEquals(HostRoleStatus.COMPLETED,
stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION"));
}
/**
* Test server actions in multiple requests.
*
* This is used to make sure the server-side actions do not get filtered out from
* {@link org.apache.ambari.server.actionmanager.ActionScheduler#filterParallelPerHostStages(java.util.List)}
*/
@Test
public void testServerActionInMultipleRequests() throws Exception {
ActionQueue aq = new ActionQueue();
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
String clusterName = "cluster1";
String hostname1 = "ahost.ambari.apache.org";
String hostname2 = "bhost.ambari.apache.org";
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname1, sch);
hosts.put(hostname2, sch);
hosts.put(Stage.INTERNAL_HOSTNAME, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
List<Stage> stages = new ArrayList<Stage>();
Stage stage01 = createStage(clusterName, 0, 1);
addTask(stage01, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 1);
Stage stage11 = createStage("cluster1", 1, 1);
addTask(stage11, hostname1, clusterName, Role.KERBEROS_CLIENT, RoleCommand.CUSTOM_COMMAND, "KERBEROS", 2);
Stage stage02 = createStage("cluster1", 0, 2);
addTask(stage02, Stage.INTERNAL_HOSTNAME, clusterName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, "AMBARI", 3);
Stage stage12 = createStage("cluster1", 1, 2);
addTask(stage12, hostname2, clusterName, Role.KERBEROS_CLIENT, RoleCommand.CUSTOM_COMMAND, "KERBEROS", 4);
stages.add(stage01);
stages.add(stage11);
stages.add(stage02);
stages.add(stage12);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf);
scheduler.doWork();
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(Stage.INTERNAL_HOSTNAME, Role.AMBARI_SERVER_ACTION.name()));
Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, Role.KERBEROS_CLIENT.name()));
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(Stage.INTERNAL_HOSTNAME, Role.AMBARI_SERVER_ACTION.name()));
Assert.assertEquals(HostRoleStatus.PENDING, stages.get(3).getHostRoleStatus(hostname2, Role.KERBEROS_CLIENT.name()));
}
/**
* Test server action timeout
*/
@Test
public void testServerActionTimeOut() throws Exception {
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
List<Stage> stages = new ArrayList<Stage>();
Map<String, String> payload = new HashMap<String, String>();
payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "timeout");
final Stage s = getStageWithServerAction(1, 977, payload, "test", 2);
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
String host = (String) invocation.getArguments()[0];
String role = (String) invocation.getArguments()[3];
CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
HostRoleCommand command = null;
if (null == host) {
command = s.getHostRoleCommand(null, role);
} else {
command = s.getHostRoleCommand(host, role);
}
command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
return null;
}
}).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
doAnswer(new Answer<HostRoleCommand>() {
@Override
public HostRoleCommand answer(InvocationOnMock invocation) throws Throwable {
return s.getHostRoleCommand(null, "AMBARI_SERVER_ACTION");
}
}).when(db).getTask(anyLong());
doAnswer(new Answer<List<HostRoleCommand>>() {
@Override
public List<HostRoleCommand> answer(InvocationOnMock invocation) throws Throwable {
String role = (String) invocation.getArguments()[1];
HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
HostRoleCommand task = s.getHostRoleCommand(null, role);
if (task.getStatus() == status) {
return Arrays.asList(task);
} else {
return Collections.emptyList();
}
}
}).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
ServerActionExecutor.init(injector);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
int cycleCount = 0;
while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION").isCompletedState()
&& cycleCount++ <= MAX_CYCLE_ITERATIONS) {
scheduler.doWork();
scheduler.getServerActionExecutor().doWork();
}
assertEquals(HostRoleStatus.TIMEDOUT,
stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION"));
}
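/**
 * Tests that a server action which throws an exception ends up in the FAILED state.
 */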
@Test
public void testServerActionFailed() throws Exception {
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
List<Stage> stages = new ArrayList<Stage>();
Map<String, String> payload = new HashMap<String, String>();
payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "exception");
final Stage s = getStageWithServerAction(1, 977, payload, "test", 300);
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
String host = (String) invocation.getArguments()[0];
String role = (String) invocation.getArguments()[3];
CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
HostRoleCommand command = null;
if (null == host) {
command = s.getHostRoleCommand(null, role);
} else {
command = s.getHostRoleCommand(host, role);
}
command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
return null;
}
}).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
doAnswer(new Answer<HostRoleCommand>() {
@Override
public HostRoleCommand answer(InvocationOnMock invocation) throws Throwable {
return s.getHostRoleCommand(null, "AMBARI_SERVER_ACTION");
}
}).when(db).getTask(anyLong());
doAnswer(new Answer<List<HostRoleCommand>>() {
@Override
public List<HostRoleCommand> answer(InvocationOnMock invocation) throws Throwable {
String role = (String) invocation.getArguments()[1];
HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
HostRoleCommand task = s.getHostRoleCommand(null, role);
if (task.getStatus() == status) {
return Arrays.asList(task);
} else {
return Collections.emptyList();
}
}
}).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
int cycleCount = 0;
while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
.equals(HostRoleStatus.FAILED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
scheduler.doWork();
scheduler.getServerActionExecutor().doWork();
}
assertEquals(HostRoleStatus.FAILED,
stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION"));
assertEquals("test", stages.get(0).getRequestContext());
}
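/**
 * Builds a single stage containing one AMBARI_SERVER_ACTION task backed by
 * {@link MockServerAction} with the given payload and timeout.
 */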
private Stage getStageWithServerAction(long requestId, long stageId,
Map<String, String> payload, String requestContext,
int timeout) {
Stage stage = stageFactory.createNew(requestId, "/tmp", "cluster1", 1L, requestContext, CLUSTER_HOST_INFO,
"{}", "{}");
stage.setStageId(stageId);
stage.addServerActionCommand(MockServerAction.class.getName(), null,
Role.AMBARI_SERVER_ACTION,
RoleCommand.EXECUTE, "cluster1",
new ServiceComponentHostServerActionEvent(null, System.currentTimeMillis()),
payload,
null, null, timeout, false, false);
return stage;
}
/**
* Verifies that stages that are executed on different hosts and
* belong to different requests are scheduled to be executed in parallel
*/
@Test
public void testIndependentStagesExecution() throws Exception {
ActionQueue aq = new ActionQueue();
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
String hostname1 = "ahost.ambari.apache.org";
String hostname2 = "bhost.ambari.apache.org";
String hostname3 = "chost.ambari.apache.org";
String hostname4 = "chost.ambari.apache.org";
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname1, sch);
hosts.put(hostname2, sch);
hosts.put(hostname3, sch);
hosts.put(hostname4, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
List<Stage> stages = new ArrayList<Stage>();
stages.add(
getStageWithSingleTask(
hostname1, "cluster1", Role.DATANODE,
RoleCommand.START, Service.Type.HDFS, 1, 1, 1));
stages.add( // Stage with the same hostname, should not be scheduled
getStageWithSingleTask(
hostname1, "cluster1", Role.GANGLIA_MONITOR,
RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
stages.add(
getStageWithSingleTask(
hostname2, "cluster1", Role.DATANODE,
RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
stages.add(
getStageWithSingleTask(
hostname3, "cluster1", Role.DATANODE,
RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
stages.add( // Stage with the same request id, should not be scheduled
getStageWithSingleTask(
hostname4, "cluster1", Role.GANGLIA_MONITOR,
RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
scheduler.doWork();
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "DATANODE"));
Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, "GANGLIA_MONITOR"));
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(3).getHostRoleStatus(hostname3, "DATANODE"));
Assert.assertEquals(HostRoleStatus.PENDING, stages.get(4).getHostRoleStatus(hostname4, "GANGLIA_MONITOR"));
}
/**
* Verifies that ActionScheduler respects the "disable parallel stage execution" option
*/
@Test
public void testIndependentStagesExecutionDisabled() throws Exception {
ActionQueue aq = new ActionQueue();
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
String hostname1 = "ahost.ambari.apache.org";
String hostname2 = "bhost.ambari.apache.org";
String hostname3 = "chost.ambari.apache.org";
String hostname4 = "chost.ambari.apache.org";
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname1, sch);
hosts.put(hostname2, sch);
hosts.put(hostname3, sch);
hosts.put(hostname4, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
List<Stage> stages = new ArrayList<Stage>();
Stage stage = getStageWithSingleTask(
hostname1, "cluster1", Role.HIVE_CLIENT,
RoleCommand.INSTALL, Service.Type.HIVE, 1, 1, 1);
Map<String, String> hiveSite = new TreeMap<String, String>();
hiveSite.put("javax.jdo.option.ConnectionPassword", "password");
hiveSite.put("hive.server2.thrift.port", "10000");
Map<String, Map<String, String>> configurations =
new TreeMap<String, Map<String, String>>();
configurations.put("hive-site", hiveSite);
stage.getExecutionCommands(hostname1).get(0).getExecutionCommand().setConfigurations(configurations);
stages.add(stage);
stages.add( // Stage with the same hostname, should not be scheduled
getStageWithSingleTask(
hostname1, "cluster1", Role.GANGLIA_MONITOR,
RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
stages.add(
getStageWithSingleTask(
hostname2, "cluster1", Role.HIVE_CLIENT,
RoleCommand.INSTALL, Service.Type.HIVE, 3, 3, 3));
stages.add(
getStageWithSingleTask(
hostname3, "cluster1", Role.DATANODE,
RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
stages.add( // Stage with the same request id, should not be scheduled
getStageWithSingleTask(
hostname4, "cluster1", Role.GANGLIA_MONITOR,
RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, null, conf);
scheduler.doWork();
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "HIVE_CLIENT"));
Assert.assertEquals(HostRoleStatus.PENDING, stages.get(1).getHostRoleStatus(hostname1, "GANGLIA_MONITOR"));
Assert.assertEquals(HostRoleStatus.PENDING, stages.get(2).getHostRoleStatus(hostname2, "HIVE_CLIENT"));
Assert.assertEquals(HostRoleStatus.PENDING, stages.get(3).getHostRoleStatus(hostname3, "DATANODE"));
Assert.assertEquals(HostRoleStatus.PENDING, stages.get(4).getHostRoleStatus(hostname4, "GANGLIA_MONITOR"));
Assert.assertFalse(stages.get(0).getExecutionCommands(hostname1).get(0).getExecutionCommand().
getConfigurations().containsKey("javax.jdo.option.ConnectionPassword"));
}
/**
* Verifies that ActionScheduler allows background tasks to be executed in parallel
*/
@Test
public void testBackgroundStagesExecutionEnable() throws Exception {
ActionQueue aq = new ActionQueue();
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
String hostname1 = "ahost.ambari.apache.org";
String hostname2 = "bhost.ambari.apache.org";
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname1, sch);
hosts.put(hostname2, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
List<Stage> stages = new ArrayList<Stage>();
Stage backgroundStage = null;
stages.add(//stage with background command
backgroundStage = getStageWithSingleTask(
hostname1, "cluster1", Role.NAMENODE, RoleCommand.CUSTOM_COMMAND, "REBALANCEHDFS", Service.Type.HDFS, 1, 1, 1));
Assert.assertEquals(AgentCommandType.BACKGROUND_EXECUTION_COMMAND, backgroundStage.getExecutionCommands(hostname1).get(0).getExecutionCommand().getCommandType());
stages.add( // Stage with the same hostname, should be scheduled
getStageWithSingleTask(
hostname1, "cluster1", Role.GANGLIA_MONITOR,
RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
stages.add(
getStageWithSingleTask(
hostname2, "cluster1", Role.DATANODE,
RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
Properties properties = new Properties();
properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, null, conf);
scheduler.doWork();
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(1).getHostRoleStatus(hostname1, "GANGLIA_MONITOR"));
}
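/**
 * Tests that when a task in the first stage fails, the remaining stages of the request are
 * aborted and the pending commands are passed to cancelHostRoleCommands.
 */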
@Test
public void testRequestFailureOnStageFailure() throws Exception {
ActionQueue aq = new ActionQueue();
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
RequestFactory requestFactory = mock(RequestFactory.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
final List<Stage> stages = new ArrayList<Stage>();
stages.add(
getStageWithSingleTask(
hostname, "cluster1", Role.NAMENODE, RoleCommand.UPGRADE, Service.Type.HDFS, 1, 1, 1));
stages.add(
getStageWithSingleTask(
hostname, "cluster1", Role.DATANODE, RoleCommand.UPGRADE, Service.Type.HDFS, 2, 2, 1));
Host host = mock(Host.class);
when(fsm.getHost(anyString())).thenReturn(host);
when(host.getState()).thenReturn(HostState.HEALTHY);
when(host.getHostName()).thenReturn(hostname);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
for (CommandReport report : reports) {
String actionId = report.getActionId();
long[] requestStageIds = StageUtils.getRequestStage(actionId);
Long requestId = requestStageIds[0];
Long stageId = requestStageIds[1];
Long id = report.getTaskId();
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
if (hostRoleCommand.getTaskId() == id) {
hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
}
}
}
}
}
return null;
}
}).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Long taskId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (taskId.equals(command.getTaskId())) {
return command;
}
}
}
return null;
}
});
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Long requestId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId())) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (command.getStatus() == HostRoleStatus.QUEUED ||
command.getStatus() == HostRoleStatus.IN_PROGRESS ||
command.getStatus() == HostRoleStatus.PENDING) {
command.setStatus(HostRoleStatus.ABORTED);
}
}
}
}
return null;
}
}).when(db).abortOperation(anyLong());
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Capture<Collection<HostRoleCommand>> cancelCommandList = new Capture<Collection<HostRoleCommand>>();
ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
withConstructor((long)100, (long)50, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf).
addMockedMethod("cancelHostRoleCommands").
createMock();
scheduler.cancelHostRoleCommands(EasyMock.capture(cancelCommandList),
EasyMock.eq(ActionScheduler.FAILED_TASK_ABORT_REASONING));
EasyMock.expectLastCall().once();
EasyMock.replay(scheduler);
ActionManager am = new ActionManager(
2, 2, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf,
EasyMock.createNiceMock(AmbariEventPublisher.class));
scheduler.doWork();
List<CommandReport> reports = new ArrayList<CommandReport>();
reports.add(getCommandReport(HostRoleStatus.FAILED, Role.NAMENODE, Service.Type.HDFS, "1-1", 1));
am.processTaskResponse(hostname, reports, stages.get(0).getOrderedHostRoleCommands());
scheduler.doWork();
Assert.assertEquals(HostRoleStatus.FAILED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
Assert.assertEquals(1, cancelCommandList.getValue().size());
EasyMock.verify(scheduler);
}
/**
* Tests that the whole request is aborted when there are no QUEUED tasks for a role and
* the success factor is not met. As long as there is at least one QUEUED task, the request
* is not aborted.
* @throws Exception
*/
@Test
public void testRequestAbortsOnlyWhenNoQueuedTaskAndSuccessFactorUnmet() throws Exception {
ActionQueue aq = new ActionQueue();
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
String host1 = "host1";
String host2 = "host2";
Host host = mock(Host.class);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(host1, sch);
hosts.put(host2, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
when(fsm.getHost(anyString())).thenReturn(host);
when(host.getState()).thenReturn(HostState.HEALTHY);
when(host.getHostName()).thenReturn(host1);
HostEntity hostEntity1 = new HostEntity();
HostEntity hostEntity2 = new HostEntity();
hostEntity1.setHostName(host1);
hostEntity2.setHostName(host2);
hostDAO.create(hostEntity1);
hostDAO.create(hostEntity2);
final List<Stage> stages = new ArrayList<Stage>();
long now = System.currentTimeMillis();
Stage stage = stageFactory.createNew(1, "/tmp", "cluster1", 1L,
"testRequestFailureBasedOnSuccessFactor", CLUSTER_HOST_INFO, "", "");
stage.setStageId(1);
addHostRoleExecutionCommand(now, stage, Role.SQOOP, Service.Type.SQOOP,
RoleCommand.INSTALL, host1, "cluster1");
addHostRoleExecutionCommand(now, stage, Role.OOZIE_CLIENT, Service.Type.OOZIE,
RoleCommand.INSTALL, host1, "cluster1");
addHostRoleExecutionCommand(now, stage, Role.MAPREDUCE_CLIENT, Service.Type.MAPREDUCE,
RoleCommand.INSTALL, host1, "cluster1");
addHostRoleExecutionCommand(now, stage, Role.HBASE_CLIENT, Service.Type.HBASE,
RoleCommand.INSTALL, host1, "cluster1");
addHostRoleExecutionCommand(now, stage, Role.GANGLIA_MONITOR, Service.Type.GANGLIA,
RoleCommand.INSTALL, host1, "cluster1");
addHostRoleExecutionCommand(now, stage, Role.HBASE_CLIENT, Service.Type.HBASE,
RoleCommand.INSTALL, host2, "cluster1");
addHostRoleExecutionCommand(now, stage, Role.GANGLIA_MONITOR, Service.Type.GANGLIA,
RoleCommand.INSTALL, host2, "cluster1");
stages.add(stage);
HostRoleStatus[] statusesAtIterOne = {HostRoleStatus.QUEUED, HostRoleStatus.QUEUED,
HostRoleStatus.QUEUED, HostRoleStatus.QUEUED, HostRoleStatus.FAILED,
HostRoleStatus.FAILED, HostRoleStatus.QUEUED, HostRoleStatus.QUEUED};
for (int index = 0; index < stage.getOrderedHostRoleCommands().size(); index++) {
stage.getOrderedHostRoleCommands().get(index).setTaskId(index + 1);
stage.getOrderedHostRoleCommands().get(index).setStatus(statusesAtIterOne[index]);
}
stage.setLastAttemptTime(host1, Role.SQOOP.toString(), now);
stage.setLastAttemptTime(host1, Role.MAPREDUCE_CLIENT.toString(), now);
stage.setLastAttemptTime(host1, Role.OOZIE_CLIENT.toString(), now);
stage.setLastAttemptTime(host1, Role.GANGLIA_MONITOR.toString(), now);
stage.setLastAttemptTime(host1, Role.HBASE_CLIENT.toString(), now);
stage.setLastAttemptTime(host2, Role.GANGLIA_MONITOR.toString(), now);
stage.setLastAttemptTime(host2, Role.HBASE_CLIENT.toString(), now);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
String host = (String) invocation.getArguments()[0];
Long requestId = (Long) invocation.getArguments()[1];
Long stageId = (Long) invocation.getArguments()[2];
String role = (String) invocation.getArguments()[3];
CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
HostRoleCommand command = stage.getHostRoleCommand(host, role);
command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
}
}
return null;
}
}).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Long taskId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (taskId.equals(command.getTaskId())) {
return command;
}
}
}
return null;
}
});
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Long requestId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId())) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (command.getStatus() == HostRoleStatus.QUEUED ||
command.getStatus() == HostRoleStatus.IN_PROGRESS ||
command.getStatus() == HostRoleStatus.PENDING) {
command.setStatus(HostRoleStatus.ABORTED);
}
}
}
}
return null;
}
}).when(db).abortOperation(anyLong());
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, null, conf);
scheduler.doWork();
// Request is not aborted because all roles are in progress
HostRoleStatus[] expectedStatusesAtIterOne = {HostRoleStatus.QUEUED, HostRoleStatus.QUEUED,
HostRoleStatus.QUEUED, HostRoleStatus.QUEUED, HostRoleStatus.FAILED,
HostRoleStatus.FAILED, HostRoleStatus.QUEUED, HostRoleStatus.QUEUED};
for (int index = 0; index < stage.getOrderedHostRoleCommands().size(); index++) {
log.info(stage.getOrderedHostRoleCommands().get(index).toString());
Assert.assertEquals(expectedStatusesAtIterOne[index],
stage.getOrderedHostRoleCommands().get(index).getStatus());
}
HostRoleStatus[] statusesAtIterTwo = {HostRoleStatus.QUEUED, HostRoleStatus.QUEUED,
HostRoleStatus.QUEUED, HostRoleStatus.QUEUED, HostRoleStatus.FAILED,
HostRoleStatus.FAILED, HostRoleStatus.QUEUED, HostRoleStatus.COMPLETED};
for (int index = 0; index < stage.getOrderedHostRoleCommands().size(); index++) {
stage.getOrderedHostRoleCommands().get(index).setStatus(statusesAtIterTwo[index]);
}
scheduler.doWork();
// Request is not aborted because GANGLIA_MONITOR's success factor (0.5) is met
HostRoleStatus[] expectedStatusesAtIterTwo = {HostRoleStatus.QUEUED, HostRoleStatus.QUEUED,
HostRoleStatus.QUEUED, HostRoleStatus.QUEUED, HostRoleStatus.FAILED,
HostRoleStatus.FAILED, HostRoleStatus.QUEUED, HostRoleStatus.COMPLETED};
for (int index = 0; index < stage.getOrderedHostRoleCommands().size(); index++) {
log.info(stage.getOrderedHostRoleCommands().get(index).toString());
Assert.assertEquals(expectedStatusesAtIterTwo[index],
stage.getOrderedHostRoleCommands().get(index).getStatus());
}
HostRoleStatus[] statusesAtIterThree = {HostRoleStatus.QUEUED, HostRoleStatus.QUEUED,
HostRoleStatus.QUEUED, HostRoleStatus.QUEUED, HostRoleStatus.FAILED,
HostRoleStatus.FAILED, HostRoleStatus.FAILED, HostRoleStatus.COMPLETED};
for (int index = 0; index < stage.getOrderedHostRoleCommands().size(); index++) {
stage.getOrderedHostRoleCommands().get(index).setStatus(statusesAtIterThree[index]);
}
// Fails because HostRoleCommand doesn't have a hostName
scheduler.doWork();
// Request is aborted because HBASE_CLIENT's success factor (1) is not met
HostRoleStatus[] expectedStatusesAtIterThree = {HostRoleStatus.ABORTED, HostRoleStatus.ABORTED,
HostRoleStatus.ABORTED, HostRoleStatus.ABORTED, HostRoleStatus.FAILED,
HostRoleStatus.FAILED, HostRoleStatus.FAILED, HostRoleStatus.COMPLETED};
for (int index = 0; index < stage.getOrderedHostRoleCommands().size(); index++) {
log.info(stage.getOrderedHostRoleCommands().get(index).toString());
Assert.assertEquals(expectedStatusesAtIterThree[index],
stage.getOrderedHostRoleCommands().get(index).getStatus());
}
}
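/**
* Adds a host role execution command (driven by a ServiceComponentHostInstallEvent) for the
* given role and host to the stage and materializes its ExecutionCommand via the wrapper.
*/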
private void addHostRoleExecutionCommand(long now, Stage stage, Role role, Service.Type service,
RoleCommand command, String host, String cluster) {
stage.addHostRoleExecutionCommand(host, role, command,
new ServiceComponentHostInstallEvent(role.toString(), host, now, "HDP-0.2"),
cluster, service.toString(), false, false);
stage.getExecutionCommandWrapper(host,
role.toString()).getExecutionCommand();
}
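/**
* Verifies that a request is aborted when a role's success factor is not met: two of the
* three DATANODE upgrade tasks are reported FAILED (only one COMPLETED), so the dependent
* HDFS_CLIENT task in the follow-up stage is expected to end up ABORTED.
*/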
@Test
public void testRequestFailureBasedOnSuccessFactor() throws Exception {
ActionQueue aq = new ActionQueue();
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
RequestFactory requestFactory = mock(RequestFactory.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
final List<Stage> stages = new ArrayList<Stage>();
long now = System.currentTimeMillis();
Stage stage = stageFactory.createNew(1, "/tmp", "cluster1", 1L, "testRequestFailureBasedOnSuccessFactor",
CLUSTER_HOST_INFO, "", "");
stage.setStageId(1);
stage.addHostRoleExecutionCommand("host1", Role.DATANODE, RoleCommand.UPGRADE,
new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(), "host1", now, "HDP-0.2"),
"cluster1", Service.Type.HDFS.toString(), false, false);
stage.getExecutionCommandWrapper("host1",
Role.DATANODE.toString()).getExecutionCommand();
stage.addHostRoleExecutionCommand("host2", Role.DATANODE, RoleCommand.UPGRADE,
new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(), "host2", now, "HDP-0.2"),
"cluster1", Service.Type.HDFS.toString(), false, false);
stage.getExecutionCommandWrapper("host2",
Role.DATANODE.toString()).getExecutionCommand();
stage.addHostRoleExecutionCommand("host3", Role.DATANODE, RoleCommand.UPGRADE,
new ServiceComponentHostUpgradeEvent(Role.DATANODE.toString(), "host3", now, "HDP-0.2"),
"cluster1", Service.Type.HDFS.toString(), false, false);
stage.getExecutionCommandWrapper("host3",
Role.DATANODE.toString()).getExecutionCommand();
stages.add(stage);
stage.getOrderedHostRoleCommands().get(0).setTaskId(1);
stage.getOrderedHostRoleCommands().get(1).setTaskId(2);
stage.getOrderedHostRoleCommands().get(2).setTaskId(3);
stages.add(
getStageWithSingleTask(
"host1", "cluster1", Role.HDFS_CLIENT, RoleCommand.UPGRADE, Service.Type.HDFS, 4, 2, 1));
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
for (CommandReport report : reports) {
String actionId = report.getActionId();
long[] requestStageIds = StageUtils.getRequestStage(actionId);
Long requestId = requestStageIds[0];
Long stageId = requestStageIds[1];
Long id = report.getTaskId();
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
if (hostRoleCommand.getTaskId() == id) {
hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
}
}
}
}
}
return null;
}
}).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Long taskId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (taskId.equals(command.getTaskId())) {
return command;
}
}
}
return null;
}
});
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Long requestId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId())) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (command.getStatus() == HostRoleStatus.QUEUED ||
command.getStatus() == HostRoleStatus.IN_PROGRESS ||
command.getStatus() == HostRoleStatus.PENDING) {
command.setStatus(HostRoleStatus.ABORTED);
}
}
}
}
return null;
}
}).when(db).abortOperation(anyLong());
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, null, conf);
ActionManager am = new ActionManager(
2, 2, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf, null);
scheduler.doWork();
List<CommandReport> reports = new ArrayList<CommandReport>();
reports.add(getCommandReport(HostRoleStatus.FAILED, Role.DATANODE, Service.Type.HDFS, "1-1", 1));
am.processTaskResponse("host1", reports, stage.getOrderedHostRoleCommands());
reports.clear();
reports.add(getCommandReport(HostRoleStatus.FAILED, Role.DATANODE, Service.Type.HDFS, "1-1", 2));
am.processTaskResponse("host2", reports, stage.getOrderedHostRoleCommands());
reports.clear();
reports.add(getCommandReport(HostRoleStatus.COMPLETED, Role.DATANODE, Service.Type.HDFS, "1-1", 3));
am.processTaskResponse("host3", reports, stage.getOrderedHostRoleCommands());
scheduler.doWork();
Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus("host1", "HDFS_CLIENT"));
}
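/**
* Builds a CommandReport with the given status, role, service, action id and task id,
* using a fixed exit code (999) and empty stdout/stderr.
*/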
private CommandReport getCommandReport(HostRoleStatus status, Role role, Service.Type service, String actionId,
int taskId) {
CommandReport report = new CommandReport();
report.setExitCode(999);
report.setStdErr("");
report.setStdOut("");
report.setStatus(status.toString());
report.setRole(role.toString());
report.setServiceName(service.toString());
report.setActionId(actionId);
report.setTaskId(taskId);
return report;
}
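/**
* Creates a new stage for the given cluster, stage id and request id using the shared
* CLUSTER_HOST_INFO and fixed host/stage parameter JSON.
*/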
private Stage createStage(String clusterName, int stageId, int requestId) {
Stage stage = stageFactory.createNew(requestId, "/tmp", clusterName, 1L, "getStageWithSingleTask",
CLUSTER_HOST_INFO, "{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
stage.setStageId(stageId);
return stage;
}
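/**
* Adds a single host role execution command (driven by an upgrade event) to the stage and
* assigns the given task id to the first ordered command, so it is intended for
* single-task stages.
*/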
private Stage addTask(Stage stage, String hostname, String clusterName, Role role,
RoleCommand roleCommand, String serviceName, int taskId) {
stage.addHostRoleExecutionCommand(hostname, role, roleCommand,
new ServiceComponentHostUpgradeEvent(role.toString(), hostname, System.currentTimeMillis(), "HDP-0.2"),
clusterName, serviceName, false, false);
stage.getExecutionCommandWrapper(hostname,
role.toString()).getExecutionCommand();
stage.getOrderedHostRoleCommands().get(0).setTaskId(taskId);
return stage;
}
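/**
* Convenience method that creates a stage and adds a single task for the given host,
* role, command and service.
*/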
private Stage getStageWithSingleTask(String hostname, String clusterName, Role role,
RoleCommand roleCommand, Service.Type service, int taskId,
int stageId, int requestId) {
Stage stage = createStage(clusterName, stageId, requestId);
return addTask(stage, hostname, clusterName, role, roleCommand, service.name(), taskId);
}
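/**
* Same as the basic single-task variant, but also sets a custom command name on the task
* and marks its execution command as a BACKGROUND_EXECUTION_COMMAND.
*/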
private Stage getStageWithSingleTask(String hostname, String clusterName, Role role, RoleCommand roleCommand,
String customCommandName, Service.Type service, int taskId, int stageId, int requestId) {
Stage stage = getStageWithSingleTask(hostname, clusterName, role, roleCommand, service, taskId, stageId, requestId);
HostRoleCommand cmd = stage.getHostRoleCommand(hostname, role.name());
if (cmd != null) {
cmd.setCustomCommandName(customCommandName);
}
stage.getExecutionCommandWrapper(hostname, role.toString()).getExecutionCommand().setCommandType(AgentCommandType.BACKGROUND_EXECUTION_COMMAND);
return stage;
}
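/**
* Adds an INSTALL host role execution command to the stage and sets the given task id on
* both the ExecutionCommand and the matching HostRoleCommand.
*/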
private void addInstallTaskToStage(Stage stage, String hostname,
String clusterName, Role role,
RoleCommand roleCommand, Service.Type service,
int taskId) {
stage.addHostRoleExecutionCommand(hostname, role, roleCommand,
new ServiceComponentHostInstallEvent(role.toString(), hostname,
System.currentTimeMillis(), "HDP-0.2"), clusterName, service.toString(), false, false);
ExecutionCommand command = stage.getExecutionCommandWrapper
(hostname, role.toString()).getExecutionCommand();
command.setTaskId(taskId);
for (HostRoleCommand cmd :stage.getOrderedHostRoleCommands()) {
if (cmd.getHostName().equals(hostname) && cmd.getRole().equals(role)) {
cmd.setTaskId(taskId);
}
}
}
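/**
* Checks the default success factors of a test stage: 0.5 for DATANODE, TASKTRACKER,
* GANGLIA_MONITOR and HBASE_REGIONSERVER, and 1.0 for NAMENODE and GANGLIA_SERVER.
*/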
@Test
public void testSuccessFactors() {
Stage s = StageUtils.getATestStage(1, 1, CLUSTER_HOST_INFO,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
assertEquals(new Float(0.5), new Float(s.getSuccessFactor(Role.DATANODE)));
assertEquals(new Float(0.5), new Float(s.getSuccessFactor(Role.TASKTRACKER)));
assertEquals(new Float(0.5), new Float(s.getSuccessFactor(Role.GANGLIA_MONITOR)));
assertEquals(new Float(0.5), new Float(s.getSuccessFactor(Role.HBASE_REGIONSERVER)));
assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.NAMENODE)));
assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.GANGLIA_SERVER)));
}
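/**
* Checks RoleStats.isSuccessFactorMet(): based on the expectations below, the factor is
* met only when the fraction of succeeded tasks reaches the configured success factor.
*/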
@Test
public void testSuccessCriteria() {
RoleStats rs1 = new RoleStats(1, (float)0.5);
rs1.numSucceeded = 1;
assertTrue(rs1.isSuccessFactorMet());
rs1.numSucceeded = 0;
assertFalse(rs1.isSuccessFactorMet());
RoleStats rs2 = new RoleStats(2, (float)0.5);
rs2.numSucceeded = 1;
assertTrue(rs2.isSuccessFactorMet());
RoleStats rs3 = new RoleStats(3, (float)0.5);
rs3.numSucceeded = 2;
assertTrue(rs3.isSuccessFactorMet());
rs3.numSucceeded = 1;
assertFalse(rs3.isSuccessFactorMet());
RoleStats rs4 = new RoleStats(3, (float)1.0);
rs4.numSucceeded = 2;
assertFalse(rs4.isSuccessFactorMet());
}
/**
* Verifies that ActionScheduler returns up-to-date cluster host info and that caching works correctly.
*/
@Test
public void testClusterHostInfoCache() throws Exception {
Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
//Data for stages
Map<String, Set<String>> clusterHostInfo1 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
Map<String, Set<String>> clusterHostInfo2 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type);
int stageId = 1;
int requestId1 = 1;
int requestId2 = 2;
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
Host host = mock(Host.class);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
when(fsm.getHost(anyString())).thenReturn(host);
when(host.getState()).thenReturn(HostState.HEALTHY);
when(host.getHostName()).thenReturn(hostname);
ActionDBAccessor db = mock(ActionDBAccessorImpl.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
Stage s1 = StageUtils.getATestStage(requestId1, stageId, hostname, CLUSTER_HOST_INFO,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
Stage s2 = StageUtils.getATestStage(requestId2, stageId, hostname, CLUSTER_HOST_INFO_UPDATED,
"{\"host_param\":\"param_value\"}", "{\"stage_param\":\"param_value\"}");
when(db.getCommandsInProgressCount()).thenReturn(1);
when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s1));
// Keep a large number of attempts so that the task does not eventually expire
// Use a small action timeout to test rescheduling
ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
10000, new HostsMap((String) null), unitOfWork, null, conf);
scheduler.setTaskTimeoutAdjustment(false);
List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
assertTrue(ac.get(0) instanceof ExecutionCommand);
assertEquals(String.valueOf(requestId1) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId());
assertEquals(clusterHostInfo1, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
when(db.getCommandsInProgressCount()).thenReturn(1);
when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2));
// Verify that ActionScheduler does not return a cached value of cluster host info for the new requestId
ac = waitForQueueSize(hostname, aq, 1, scheduler);
assertTrue(ac.get(0) instanceof ExecutionCommand);
assertEquals(String.valueOf(requestId2) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId());
assertEquals(clusterHostInfo2, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
}
/**
* Checks what happens when a stage has an execution command for a
* host component that has been recently deleted.
* @throws Exception
*/
@Test
public void testCommandAbortForDeletedComponent() throws Exception {
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponent scWithDeletedSCH = mock(ServiceComponent.class);
ServiceComponentHost sch1 = mock(ServiceComponentHost.class);
String hostname1 = "host1";
Host host1 = mock(Host.class);
when(fsm.getHost(hostname1)).thenReturn(host1);
when(host1.getState()).thenReturn(HostState.HEALTHY);
when(host1.getHostName()).thenReturn(hostname1);
when(scomp.getServiceComponentHost(hostname1)).thenReturn(sch1);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname1, sch1);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
HostEntity hostEntity = new HostEntity();
hostEntity.setHostName(hostname1);
hostDAO.create(hostEntity);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(Role.HBASE_MASTER.toString())).
thenReturn(scWithDeletedSCH);
when(serviceObj.getServiceComponent(Role.HBASE_REGIONSERVER.toString())).
thenReturn(scomp);
when(scWithDeletedSCH.getServiceComponentHost(anyString())).
thenThrow(new ServiceComponentHostNotFoundException("dummyCluster",
"dummyService", "dummyComponent", "dummyHostname"));
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
final List<Stage> stages = new ArrayList<Stage>();
Stage stage1 = stageFactory.createNew(1, "/tmp", "cluster1", 1L, "stageWith2Tasks",
CLUSTER_HOST_INFO, "", "");
addInstallTaskToStage(stage1, hostname1, "cluster1", Role.HBASE_MASTER,
RoleCommand.INSTALL, Service.Type.HBASE, 1);
addInstallTaskToStage(stage1, hostname1, "cluster1", Role.HBASE_REGIONSERVER,
RoleCommand.INSTALL, Service.Type.HBASE, 2);
stages.add(stage1);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
final CountDownLatch abortCalls = new CountDownLatch(2);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Long requestId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId())) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (command.getStatus() == HostRoleStatus.QUEUED ||
command.getStatus() == HostRoleStatus.IN_PROGRESS ||
command.getStatus() == HostRoleStatus.PENDING) {
command.setStatus(HostRoleStatus.ABORTED);
}
}
}
}
abortCalls.countDown();
return null;
}
}).when(db).abortOperation(anyLong());
scheduler.setTaskTimeoutAdjustment(false);
// Start the thread
scheduler.start();
long timeout = 60;
abortCalls.await(timeout, TimeUnit.SECONDS);
Assert.assertEquals(HostRoleStatus.ABORTED,
stages.get(0).getHostRoleStatus(hostname1, "HBASE_MASTER"));
Assert.assertEquals(HostRoleStatus.ABORTED,
stages.get(0).getHostRoleStatus(hostname1, "HBASE_REGIONSERVER"));
// If a regression occurred, the scheduler thread would fail with an exception
// instead of aborting the request
verify(db, times(2)).abortOperation(anyLong());
scheduler.stop();
}
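/**
* Runs a stage containing a single server action (AMBARI_SERVER_ACTION without a service)
* through the scheduler and the ServerActionExecutor and expects it to reach COMPLETED.
*/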
@Test
public void testServerActionWOService() throws Exception {
ActionQueue aq = new ActionQueue();
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
Clusters fsm = mock(Clusters.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
List<Stage> stages = new ArrayList<Stage>();
Map<String, String> payload = new HashMap<String, String>();
final Stage s = getStageWithServerAction(1, 977, payload, "test", 300);
stages.add(s);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
String host = (String) invocation.getArguments()[0];
String role = (String) invocation.getArguments()[3];
CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
HostRoleCommand command = null;
if (null == host) {
command = s.getHostRoleCommand(null, role);
} else {
command = s.getHostRoleCommand(host, role);
}
command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
return null;
}
}).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
doAnswer(new Answer<List<HostRoleCommand>>() {
@Override
public List<HostRoleCommand> answer(InvocationOnMock invocation) throws Throwable {
String role = (String) invocation.getArguments()[1];
HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
HostRoleCommand task = s.getHostRoleCommand(null, role);
if (task.getStatus() == status) {
return Arrays.asList(task);
} else {
return Collections.emptyList();
}
}
}).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
doAnswer(new Answer<HostRoleCommand>() {
@Override
public HostRoleCommand answer(InvocationOnMock invocation) throws Throwable {
return s.getHostRoleCommand(null, "AMBARI_SERVER_ACTION");
}
}).when(db).getTask(anyLong());
ServerActionExecutor.init(injector);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
int cycleCount = 0;
while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
.equals(HostRoleStatus.COMPLETED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
scheduler.doWork();
scheduler.getServerActionExecutor().doWork();
}
assertEquals(HostRoleStatus.COMPLETED,
stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION"));
}
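/**
* Cancels a request with two single-task stages (NAMENODE START, DATANODE START) and
* verifies that both tasks are ABORTED and that a single CancelCommand targeting the
* NAMENODE task is queued for the host.
*/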
@Test
public void testCancelRequests() throws Exception {
ActionQueue aq = new ActionQueue();
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
HostEntity hostEntity = new HostEntity();
hostEntity.setHostName(hostname);
hostDAO.create(hostEntity);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
hosts.put(hostname, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
long requestId = 1;
final List<Stage> stages = new ArrayList<Stage>();
int namenodeCmdTaskId = 1;
stages.add(
getStageWithSingleTask(
hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId));
stages.add(
getStageWithSingleTask(
hostname, "cluster1", Role.DATANODE, RoleCommand.START,
Service.Type.HDFS, 2, 2, (int)requestId));
Host host = mock(Host.class);
when(fsm.getHost(anyString())).thenReturn(host);
when(host.getState()).thenReturn(HostState.HEALTHY);
when(host.getHostName()).thenReturn(hostname);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
for (Stage stage : stages) {
requestTasks.addAll(stage.getOrderedHostRoleCommands());
}
when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
when(db.getAllStages(anyLong())).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
for (CommandReport report : reports) {
String actionId = report.getActionId();
long[] requestStageIds = StageUtils.getRequestStage(actionId);
Long requestId = requestStageIds[0];
Long stageId = requestStageIds[1];
Long id = report.getTaskId();
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
if (hostRoleCommand.getTaskId() == id) {
hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
}
}
}
}
}
return null;
}
}).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Long taskId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (taskId.equals(command.getTaskId())) {
return command;
}
}
}
return null;
}
});
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Long requestId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId())) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (command.getStatus() == HostRoleStatus.QUEUED ||
command.getStatus() == HostRoleStatus.IN_PROGRESS ||
command.getStatus() == HostRoleStatus.PENDING) {
command.setStatus(HostRoleStatus.ABORTED);
}
}
}
}
return null;
}
}).when(db).abortOperation(anyLong());
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
scheduler.doWork();
String reason = "Some reason";
scheduler.scheduleCancellingRequest(requestId, reason);
scheduler.doWork();
Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
Assert.assertEquals(1, aq.size(hostname)); // Cancel commands should be generated only for 1 stage
CancelCommand cancelCommand = (CancelCommand) aq.dequeue(hostname);
Assert.assertEquals(namenodeCmdTaskId, cancelCommand.getTargetTaskId());
Assert.assertEquals(reason, cancelCommand.getReason());
}
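/**
* Verifies that an exclusive request (request 2) is not started while another request is
* still in progress, and that request 3 is not started until the exclusive request has run.
*/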
@Test
public void testExclusiveRequests() throws Exception {
ActionQueue aq = new ActionQueue();
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
HashMap<String, ServiceComponentHost> hosts =
new HashMap<String, ServiceComponentHost>();
String hostname1 = "hostname1";
String hostname2 = "hostname2";
String hostname3 = "hostname3";
hosts.put(hostname1, sch);
hosts.put(hostname2, sch);
hosts.put(hostname3, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
long requestId1 = 1;
long requestId2 = 2;
long requestId3 = 3;
final List<Stage> stagesInProgress = new ArrayList<Stage>();
int namenodeCmdTaskId = 1;
stagesInProgress.add(
getStageWithSingleTask(
hostname1, "cluster1", Role.NAMENODE, RoleCommand.START,
Service.Type.HDFS, namenodeCmdTaskId, 1, (int) requestId1));
stagesInProgress.add(
getStageWithSingleTask(
hostname1, "cluster1", Role.DATANODE, RoleCommand.START,
Service.Type.HDFS, 2, 2, (int) requestId1));
stagesInProgress.add(
getStageWithSingleTask(
hostname2, "cluster1", Role.DATANODE, RoleCommand.STOP, //Exclusive
Service.Type.HDFS, 3, 3, (int) requestId2));
stagesInProgress.add(
getStageWithSingleTask(
hostname3, "cluster1", Role.DATANODE, RoleCommand.START,
Service.Type.HDFS, 4, 4, (int) requestId3));
Host host1 = mock(Host.class);
when(fsm.getHost(anyString())).thenReturn(host1);
when(host1.getState()).thenReturn(HostState.HEALTHY);
when(host1.getHostName()).thenReturn(hostname);
Host host2 = mock(Host.class);
when(fsm.getHost(anyString())).thenReturn(host2);
when(host2.getState()).thenReturn(HostState.HEALTHY);
when(host2.getHostName()).thenReturn(hostname);
Host host3 = mock(Host.class);
when(fsm.getHost(anyString())).thenReturn(host3);
when(host3.getState()).thenReturn(HostState.HEALTHY);
when(host3.getHostName()).thenReturn(hostname);
ActionDBAccessor db = mock(ActionDBAccessor.class);
when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
when(db.getStagesInProgress()).thenReturn(stagesInProgress);
List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
for (Stage stage : stagesInProgress) {
requestTasks.addAll(stage.getOrderedHostRoleCommands());
}
when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
when(db.getAllStages(anyLong())).thenReturn(stagesInProgress);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
for (CommandReport report : reports) {
String actionId = report.getActionId();
long[] requestStageIds = StageUtils.getRequestStage(actionId);
Long requestId = requestStageIds[0];
Long stageId = requestStageIds[1];
Long id = report.getTaskId();
for (Stage stage : stagesInProgress) {
if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
if (hostRoleCommand.getTaskId() == id) {
hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
}
}
}
}
}
return null;
}
}).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Long taskId = (Long) invocation.getArguments()[0];
for (Stage stage : stagesInProgress) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (taskId.equals(command.getTaskId())) {
return command;
}
}
}
return null;
}
});
final Map<Long, Boolean> startedRequests = new HashMap<Long, Boolean>();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
startedRequests.put((Long)invocation.getArguments()[0], true);
return null;
}
}).when(db).startRequest(anyLong());
RequestEntity request1 = mock(RequestEntity.class);
when(request1.isExclusive()).thenReturn(false);
RequestEntity request2 = mock(RequestEntity.class);
when(request2.isExclusive()).thenReturn(true);
RequestEntity request3 = mock(RequestEntity.class);
when(request3.isExclusive()).thenReturn(false);
when(db.getRequestEntity(requestId1)).thenReturn(request1);
when(db.getRequestEntity(requestId2)).thenReturn(request2);
when(db.getRequestEntity(requestId3)).thenReturn(request3);
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
// Execution of request 1
scheduler.doWork();
Assert.assertTrue(startedRequests.containsKey(requestId1));
Assert.assertFalse(startedRequests.containsKey(requestId2));
Assert.assertFalse(startedRequests.containsKey(requestId3));
stagesInProgress.remove(0);
scheduler.doWork();
Assert.assertTrue(startedRequests.containsKey(requestId1));
Assert.assertFalse(startedRequests.containsKey(requestId2));
Assert.assertFalse(startedRequests.containsKey(requestId3));
// Execution of request 2
stagesInProgress.remove(0);
scheduler.doWork();
Assert.assertTrue(startedRequests.containsKey(requestId1));
Assert.assertTrue(startedRequests.containsKey(requestId2));
Assert.assertFalse(startedRequests.containsKey(requestId3));
// Execution of request 3
stagesInProgress.remove(0);
scheduler.doWork();
Assert.assertTrue(startedRequests.containsKey(requestId1));
Assert.assertTrue(startedRequests.containsKey(requestId2));
Assert.assertTrue(startedRequests.containsKey(requestId3));
}
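/**
* Verifies that cancelHostRoleCommands() aborts a HOLDING server action task by calling
* abortHostRole on the DB (the only call the EasyMock mock expects).
*/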
@Test
public void testAbortHolding() {
UnitOfWork unitOfWork = EasyMock.createMock(UnitOfWork.class);
ActionDBAccessor db = EasyMock.createMock(ActionDBAccessor.class);
ActionQueue aq = new ActionQueue();
Clusters fsm = EasyMock.createMock(Clusters.class);
Configuration conf = new Configuration(new Properties());
HostEntity hostEntity1 = new HostEntity();
hostEntity1.setHostName("h1");
hostDAO.merge(hostEntity1);
db.abortHostRole("h1", -1L, -1L, "AMBARI_SERVER_ACTION");
EasyMock.expectLastCall();
EasyMock.replay(db);
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null),
unitOfWork, null, conf);
HostRoleCommand hrc1 = hostRoleCommandFactory.create("h1", Role.NAMENODE, null, RoleCommand.EXECUTE);
hrc1.setStatus(HostRoleStatus.COMPLETED);
HostRoleCommand hrc3 = hostRoleCommandFactory.create("h1", Role.AMBARI_SERVER_ACTION, null, RoleCommand.CUSTOM_COMMAND);
hrc3.setStatus(HostRoleStatus.HOLDING);
HostRoleCommand hrc4 = hostRoleCommandFactory.create("h1", Role.FLUME_HANDLER, null, RoleCommand.EXECUTE);
hrc4.setStatus(HostRoleStatus.PENDING);
List<HostRoleCommand> hostRoleCommands = Arrays.asList(hrc1, hrc3, hrc4);
scheduler.cancelHostRoleCommands(hostRoleCommands, "foo");
EasyMock.verify(db);
}
/**
* Tests that command failures in skippable stages do not cause the request to
* be aborted.
*/
@Test
public void testSkippableCommandFailureDoesNotAbortRequest() throws Exception {
Properties properties = new Properties();
Configuration conf = new Configuration(properties);
ActionQueue aq = new ActionQueue();
Clusters fsm = mock(Clusters.class);
Cluster oneClusterMock = mock(Cluster.class);
Service serviceObj = mock(Service.class);
ServiceComponent scomp = mock(ServiceComponent.class);
ServiceComponentHost sch = mock(ServiceComponentHost.class);
UnitOfWork unitOfWork = mock(UnitOfWork.class);
when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
when(serviceObj.getCluster()).thenReturn(oneClusterMock);
String hostname1 = "ahost.ambari.apache.org";
HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>();
hosts.put(hostname1, sch);
when(scomp.getServiceComponentHosts()).thenReturn(hosts);
// create 1 stage with 2 commands and then another stage with 1 command
Stage stage = null;
Stage stage2 = null;
final List<Stage> stages = new ArrayList<Stage>();
stages.add(stage = getStageWithSingleTask(hostname1, "cluster1", Role.NAMENODE,
RoleCommand.STOP, Service.Type.HDFS, 1, 1, 1));
addInstallTaskToStage(stage, hostname1, "cluster1", Role.HBASE_MASTER, RoleCommand.INSTALL,
Service.Type.HBASE, 1);
stages.add(stage2 = getStageWithSingleTask(hostname1, "cluster1", Role.DATANODE,
RoleCommand.STOP, Service.Type.HDFS, 1, 1, 1));
// !!! this is the test; make the stages skippable so that when their
// commands fail, the entire request is not aborted
for (Stage stageToMakeSkippable : stages) {
stageToMakeSkippable.setSkippable(true);
}
// fail the first task - normally this would cause an abort, except that our stages
// are now skippable, so it should not
HostRoleCommand command = stage.getOrderedHostRoleCommands().iterator().next();
command.setStatus(HostRoleStatus.FAILED);
ActionDBAccessor db = mock(ActionDBAccessor.class);
RequestEntity request = mock(RequestEntity.class);
when(request.isExclusive()).thenReturn(false);
when(db.getRequestEntity(anyLong())).thenReturn(request);
when(db.getCommandsInProgressCount()).thenReturn(stages.size());
when(db.getStagesInProgress()).thenReturn(stages);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
for (CommandReport report : reports) {
String actionId = report.getActionId();
long[] requestStageIds = StageUtils.getRequestStage(actionId);
Long requestId = requestStageIds[0];
Long stageId = requestStageIds[1];
Long id = report.getTaskId();
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
if (hostRoleCommand.getTaskId() == id) {
hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
}
}
}
}
}
return null;
}
}).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));
when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Long taskId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (taskId.equals(command.getTaskId())) {
return command;
}
}
}
return null;
}
});
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Long requestId = (Long) invocation.getArguments()[0];
for (Stage stage : stages) {
if (requestId.equals(stage.getRequestId())) {
for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
if (command.getStatus() == HostRoleStatus.QUEUED
|| command.getStatus() == HostRoleStatus.IN_PROGRESS
|| command.getStatus() == HostRoleStatus.PENDING) {
command.setStatus(HostRoleStatus.ABORTED);
}
}
}
}
return null;
}
}).when(db).abortOperation(anyLong());
ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
new HostsMap((String) null), unitOfWork, null, conf);
scheduler.doWork();
Assert.assertEquals(HostRoleStatus.FAILED,
stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
// the remaining tasks should NOT have been aborted since the stage is
// skippable - these tasks would normally be ABORTED if the stage was not
// skippable
Assert.assertEquals(HostRoleStatus.QUEUED,
stages.get(0).getHostRoleStatus(hostname1, "HBASE_MASTER"));
Assert.assertEquals(HostRoleStatus.PENDING,
stages.get(1).getHostRoleStatus(hostname1, "DATANODE"));
}
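/**
* Guice module that binds Clusters to a mock instance for the tests in this class.
*/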
public static class MockModule extends AbstractModule {
@Override
protected void configure() {
bind(Clusters.class).toInstance(mock(Clusters.class));
}
}
}