blob: 311f194f076c2a8aebc54bcd5fb7a704cc839f01 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.worker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.tajo.*;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.ResourceProtos.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestTaskExecutor {
private NodeResourceManager resourceManager;
private NodeStatusUpdater statusUpdater;
private TaskManager taskManager;
private MyTaskExecutor taskExecutor;
private AsyncDispatcher dispatcher;
private AsyncDispatcher taskDispatcher;
private TajoWorker.WorkerContext workerContext;
private CompositeService service;
private TajoConf conf;
private Semaphore barrier;
private Semaphore resourceManagerBarrier;
@Before
public void setup() {
conf = new TajoConf();
conf.setBoolVar(TajoConf.ConfVars.$TEST_MODE, true);
conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 2);
conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM, 2);
dispatcher = new AsyncDispatcher();
taskDispatcher = new AsyncDispatcher();
workerContext = new MockWorkerContext() {
WorkerConnectionInfo workerConnectionInfo;
@Override
public TajoConf getConf() {
return conf;
}
@Override
public TaskManager getTaskManager() {
return taskManager;
}
@Override
public org.apache.tajo.worker.TaskExecutor getTaskExecuor() {
return taskExecutor;
}
@Override
public NodeResourceManager getNodeResourceManager() {
return resourceManager;
}
@Override
public WorkerConnectionInfo getConnectionInfo() {
if (workerConnectionInfo == null) {
workerConnectionInfo = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
}
return workerConnectionInfo;
}
};
barrier = new Semaphore(0);
resourceManagerBarrier = new Semaphore(0);
taskManager = new MockTaskManager(new Semaphore(0), taskDispatcher, workerContext);
taskExecutor = new MyTaskExecutor(barrier, workerContext);
resourceManager = new MockNodeResourceManager(resourceManagerBarrier, dispatcher, workerContext);
statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), workerContext);
service = new CompositeService("MockService") {
@Override
protected void serviceInit(Configuration conf) throws Exception {
addIfService(dispatcher);
addIfService(taskDispatcher);
addIfService(taskManager);
addIfService(taskExecutor);
addIfService(resourceManager);
addIfService(statusUpdater);
super.serviceInit(conf);
}
@Override
protected void serviceStop() throws Exception {
workerContext.getMetrics().stop();
super.serviceStop();
}
};
service.init(conf);
service.start();
}
@After
public void tearDown() {
service.stop();
}
@Test
public void testTaskRequest() throws Exception {
int requestSize = 1;
QueryId qid = LocalTajoTestingUtility.newQueryId();
ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
CallFuture<BatchAllocationResponse> callFuture = new CallFuture<>();
BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
requestProto.setExecutionBlockId(ebId.getProto());
assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 10, requestSize));
dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
//verify running task
assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
assertEquals(1, taskExecutor.getRunningTasks());
assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
assertEquals(0, taskExecutor.getRunningTasks());
assertEquals(1, taskExecutor.completeTasks);
//verify the released resources
Thread.sleep(100);
assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
}
@Test
public void testTaskException() throws Exception {
int requestSize = 1;
QueryId qid = LocalTajoTestingUtility.newQueryId();
ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(qid, 1);
CallFuture<BatchAllocationResponse> callFuture = new CallFuture<>();
BatchAllocationRequest.Builder requestProto = BatchAllocationRequest.newBuilder();
requestProto.setExecutionBlockId(ebId.getProto());
assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
requestProto.addAllTaskRequest(MockNodeResourceManager.createTaskRequests(ebId, 10, requestSize));
taskExecutor.throwException.set(true);
dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture));
//verify running task
assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
assertEquals(1, taskExecutor.getRunningTasks());
assertTrue(barrier.tryAcquire(3, TimeUnit.SECONDS));
assertEquals(0, taskExecutor.getRunningTasks());
assertEquals(0, taskExecutor.completeTasks);
//verify the released resources
Thread.sleep(100);
assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource());
}
class MyTaskExecutor extends MockTaskExecutor {
int completeTasks;
AtomicBoolean throwException = new AtomicBoolean();
public MyTaskExecutor(Semaphore barrier, TajoWorker.WorkerContext workerContext) {
super(barrier, workerContext);
}
@Override
protected void stopTask(TaskAttemptId taskId) {
super.stopTask(taskId);
super.barrier.release();
}
@Override
protected Task createTask(final ExecutionBlockContext context, TaskRequestProto taskRequest) {
final TaskAttemptId taskAttemptId = new TaskAttemptId(taskRequest.getId());
final TaskAttemptContext taskAttemptContext =
new TaskAttemptContext(new QueryContext(conf), context, taskAttemptId, null, null);
return new Task() {
@Override
public void init() throws IOException {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void fetch(ExecutorService fetchExecutor) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void run() throws Exception {
Thread.sleep(50);
if(throwException.get()) throw new RuntimeException();
taskAttemptContext.stop();
taskAttemptContext.setProgress(1.0f);
taskAttemptContext.setState(TajoProtos.TaskAttemptState.TA_SUCCEEDED);
completeTasks++;
}
@Override
public void kill() {
}
@Override
public void abort() {
}
@Override
public void cleanup() {
}
@Override
public boolean hasFetchPhase() {
return false;
}
@Override
public boolean isProgressChanged() {
return false;
}
@Override
public boolean isStopped() {
return taskAttemptContext.isStopped();
}
@Override
public void updateProgress() {
}
@Override
public TaskAttemptContext getTaskContext() {
return taskAttemptContext;
}
@Override
public ExecutionBlockContext getExecutionBlockContext() {
return context;
}
@Override
public TaskStatusProto getReport() {
TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
builder.setWorkerName("localhost:0");
builder.setId(taskAttemptContext.getTaskId().getProto())
.setProgress(taskAttemptContext.getProgress())
.setState(taskAttemptContext.getState());
builder.setInputStats(new TableStats().getProto());
return builder.build();
}
@Override
public TaskHistory createTaskHistory() {
return null;
}
@Override
public List<AbstractFetcher> getFetchers() {
return null;
}
};
}
}
}