| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.slider.server.appmaster.model.mock |
| |
| import groovy.transform.CompileStatic |
| import groovy.util.logging.Slf4j |
| import org.apache.hadoop.conf.Configuration |
| import org.apache.hadoop.fs.FileSystem as HadoopFS |
| import org.apache.hadoop.fs.Path |
| import org.apache.hadoop.yarn.api.records.Container |
| import org.apache.hadoop.yarn.api.records.ContainerId |
| import org.apache.hadoop.yarn.api.records.ContainerState |
| import org.apache.hadoop.yarn.api.records.ContainerStatus |
| import org.apache.hadoop.yarn.conf.YarnConfiguration |
| import org.apache.slider.common.tools.SliderFileSystem |
| import org.apache.slider.common.tools.SliderUtils |
| import org.apache.slider.core.conf.AggregateConf |
| import org.apache.slider.core.main.LauncherExitCodes |
| import org.apache.slider.server.appmaster.operations.AbstractRMOperation |
| import org.apache.slider.server.appmaster.state.* |
| import org.apache.slider.test.SliderTestBase |
| import org.junit.Before |
| |
| @CompileStatic |
| @Slf4j |
| abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles { |
| public static final int RM_MAX_RAM = 4096 |
| public static final int RM_MAX_CORES = 64 |
| MockFactory factory = new MockFactory() |
| MockAppState appState |
| MockYarnEngine engine |
| protected HadoopFS fs |
| protected SliderFileSystem sliderFileSystem |
| protected File historyWorkDir |
| protected Path historyPath; |
| protected MockApplicationId applicationId; |
| protected MockApplicationAttemptId applicationAttemptId; |
| |
| @Override |
| void setup() { |
| super.setup() |
| YarnConfiguration conf = SliderUtils.createConfiguration() |
| fs = HadoopFS.get(new URI("file:///"), conf) |
| sliderFileSystem = new SliderFileSystem(fs, conf) |
| engine = createYarnEngine() |
| } |
| |
| /** |
| * Override point: called in setup() to create the YARN engine; can |
| * be changed for different sizes and options |
| * @return |
| */ |
| public MockYarnEngine createYarnEngine() { |
| return new MockYarnEngine(64, 1) |
| } |
| |
| @Before |
| void initApp(){ |
| |
| String historyDirName = testName; |
| |
| |
| YarnConfiguration conf = SliderUtils.createConfiguration() |
| applicationId = new MockApplicationId(id: 1, clusterTimestamp: 0) |
| applicationAttemptId = new MockApplicationAttemptId( |
| applicationId: applicationId, |
| attemptId: 1) |
| |
| fs = HadoopFS.get(new URI("file:///"), conf) |
| historyWorkDir = new File("target/history", historyDirName) |
| historyPath = new Path(historyWorkDir.toURI()) |
| fs.delete(historyPath, true) |
| appState = new MockAppState() |
| appState.setContainerLimits(RM_MAX_RAM, RM_MAX_CORES) |
| appState.buildInstance( |
| buildInstanceDefinition(), |
| new Configuration(), |
| new Configuration(false), |
| factory.ROLES, |
| fs, |
| historyPath, |
| null, null, |
| new SimpleReleaseSelector()) |
| } |
| |
| /** |
| * Override point, define the instance definition |
| * @return |
| */ |
| public AggregateConf buildInstanceDefinition() { |
| factory.newInstanceDefinition(0, 0, 0) |
| } |
| |
| /** |
| * Get the test name ... defaults to method name |
| * @return |
| */ |
| String getTestName() { |
| methodName.methodName; |
| } |
| |
| public RoleStatus getRole0Status() { |
| return appState.lookupRoleStatus(ROLE0) |
| } |
| |
| public RoleStatus getRole1Status() { |
| return appState.lookupRoleStatus(ROLE1) |
| } |
| |
| public RoleStatus getRole2Status() { |
| return appState.lookupRoleStatus(ROLE2) |
| } |
| |
| /** |
| * Build a role instance from a container assignment |
| * @param assigned |
| * @return |
| */ |
| RoleInstance roleInstance(ContainerAssignment assigned) { |
| Container target = assigned.container |
| RoleInstance ri = new RoleInstance(target) |
| ri.roleId = assigned.role.priority |
| ri.role = assigned.role.name |
| return ri |
| } |
| |
| |
| public NodeInstance nodeInstance(long age, int live0, int live1=0, int live2=0) { |
| NodeInstance ni = new NodeInstance("age${age}live[${live0},${live1},$live2]", |
| MockFactory.ROLE_COUNT) |
| ni.getOrCreate(0).lastUsed = age |
| ni.getOrCreate(0).live = live0; |
| if (live1 > 0) { |
| ni.getOrCreate(1).live = live1; |
| } |
| if (live2 > 0) { |
| ni.getOrCreate(2).live = live2; |
| } |
| return ni |
| } |
| |
| /** |
| * Create a container status event |
| * @param c container |
| * @return a status |
| */ |
| ContainerStatus containerStatus(Container c) { |
| return containerStatus(c.id) |
| } |
| |
| /** |
| * Create a container status instance for the given ID, declaring |
| * that it was shut down by the application itself |
| * @param cid container Id |
| * @return the instance |
| */ |
| public ContainerStatus containerStatus(ContainerId cid) { |
| ContainerStatus status = containerStatus(cid, |
| LauncherExitCodes.EXIT_CLIENT_INITIATED_SHUTDOWN) |
| return status |
| } |
| |
| public ContainerStatus containerStatus(ContainerId cid, int exitCode) { |
| ContainerStatus status = ContainerStatus.newInstance( |
| cid, |
| ContainerState.COMPLETE, |
| "", |
| exitCode) |
| return status |
| } |
| |
| /** |
| * Create nodes and bring them to the started state |
| * @return a list of roles |
| */ |
| protected List<RoleInstance> createAndStartNodes() { |
| return createStartAndStopNodes([]) |
| } |
| |
| |
| /** |
| * Create, Start and stop nodes |
| * @param completionResults List filled in with the status on all completed nodes |
| * @return the nodes |
| */ |
| public List<RoleInstance> createStartAndStopNodes( |
| List<AppState.NodeCompletionResult> completionResults) { |
| List<ContainerId> released = [] |
| List<RoleInstance> instances = createAndSubmitNodes(released) |
| processSubmissionOperations(instances, completionResults, released) |
| return instances |
| } |
| |
| /** |
| * Process the start/stop operations from |
| * @param instances |
| * @param completionResults |
| * @param released |
| */ |
| public void processSubmissionOperations( |
| List<RoleInstance> instances, |
| List<AppState.NodeCompletionResult> completionResults, |
| List<ContainerId> released) { |
| for (RoleInstance instance : instances) { |
| assert appState.onNodeManagerContainerStarted(instance.containerId) |
| } |
| releaseContainers(completionResults, |
| released, |
| ContainerState.COMPLETE, |
| "released", |
| 0 |
| ) |
| } |
| |
| /** |
| * Release a list of containers, updating the completion results |
| * @param completionResults |
| * @param containerIds |
| * @param containerState |
| * @param exitText |
| * @param containerExitCode |
| * @return |
| */ |
| public def releaseContainers( |
| List<AppState.NodeCompletionResult> completionResults, |
| List<ContainerId> containerIds, |
| ContainerState containerState, |
| String exitText, |
| int containerExitCode) { |
| containerIds.each { ContainerId id -> |
| ContainerStatus status = ContainerStatus.newInstance(id, |
| containerState, |
| exitText, |
| containerExitCode) |
| completionResults << appState.onCompletedNode(status) |
| |
| } |
| } |
| |
| /** |
| * Create nodes and submit them |
| * @return a list of roles |
| */ |
| public List<RoleInstance> createAndSubmitNodes() { |
| return createAndSubmitNodes([]) |
| } |
| |
| /** |
| * Create nodes and submit them |
| * @param released a list that is built up of all released nodes |
| * @return a list of roles allocated |
| */ |
| public List<RoleInstance> createAndSubmitNodes( |
| List<ContainerId> released) { |
| List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() |
| return submitOperations(ops, released) |
| } |
| |
| /** |
| * Process the RM operations and send <code>onContainersAllocated</code> |
| * events to the app state |
| * @param ops |
| * @param released |
| * @return |
| */ |
| public List<RoleInstance> submitOperations( |
| List<AbstractRMOperation> ops, |
| List<ContainerId> released) { |
| List<Container> allocatedContainers = engine.execute(ops, released) |
| List<ContainerAssignment> assignments = []; |
| List<AbstractRMOperation> operations = [] |
| appState.onContainersAllocated(allocatedContainers, assignments, operations) |
| List<RoleInstance> instances = [] |
| for (ContainerAssignment assigned : assignments) { |
| Container container = assigned.container |
| RoleInstance ri = roleInstance(assigned) |
| instances << ri |
| //tell the app it arrived |
| appState.containerStartSubmitted(container, ri); |
| } |
| return instances |
| } |
| |
| /** |
| * Add the AM to the app state |
| */ |
| protected void addAppMastertoAppState() { |
| appState.buildAppMasterNode( |
| new MockContainerId(applicationAttemptId, 999999L), |
| "appmaster", |
| 0, |
| null) |
| } |
| |
| /** |
| * Extract the list of container IDs from the list of role instances |
| * @param instances instance list |
| * @param role role to look up |
| * @return the list of CIDs |
| */ |
| List<ContainerId> extractContainerIds( |
| List<RoleInstance> instances, |
| int role) { |
| List<ContainerId> cids = [] |
| instances.each { RoleInstance instance -> |
| if (instance.roleId == role) { |
| cids << instance.id |
| } |
| } |
| return cids |
| } |
| |
| } |