blob: d7b516add2c48e736c328b0c7014243d24a38d9e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.rm;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.AllocatedTask;
import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.AsyncDelegateRequestHandler;
import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.LocalContainerFactory;
import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.SchedulerRequest;
public class TestLocalTaskScheduler {
@Test(timeout = 5000)
public void maxTasksAllocationsCannotBeExceeded() {
final int MAX_TASKS = 4;
TezConfiguration tezConf = new TezConfiguration();
tezConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, MAX_TASKS);
ApplicationId appId = ApplicationId.newInstance(2000, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
TaskSchedulerContext
mockContext = TestTaskSchedulerHelpers.setupMockTaskSchedulerContext("", 0, "", true,
appAttemptId, 1000l, null, new Configuration());
LocalContainerFactory containerFactory = new LocalContainerFactory(appAttemptId, 1000);
HashMap<Object, AllocatedTask> taskAllocations = new LinkedHashMap<>();
LinkedBlockingQueue<SchedulerRequest> clientRequestQueue = new LinkedBlockingQueue<>();
// Object under test
AsyncDelegateRequestHandler requestHandler =
new AsyncDelegateRequestHandler(clientRequestQueue,
containerFactory,
taskAllocations,
mockContext,
tezConf);
// Allocate up to max tasks
for (int i = 0; i < MAX_TASKS; i++) {
Priority priority = Priority.newInstance(20);
requestHandler.addAllocateTaskRequest(new Long(i), null, priority, null);
requestHandler.dispatchRequest();
requestHandler.allocateTask();
}
// Only MAX_TASKS number of tasks should have been allocated
Assert.assertEquals("Wrong number of allocate tasks", MAX_TASKS, taskAllocations.size());
Assert.assertTrue("Another allocation should not fit", !requestHandler.shouldProcess());
// Deallocate down to zero
for (int i = 0; i < MAX_TASKS; i++) {
requestHandler.addDeallocateTaskRequest(new Long(i));
requestHandler.dispatchRequest();
}
// All allocated tasks should have been removed
Assert.assertEquals("Wrong number of allocate tasks", 0, taskAllocations.size());
}
}