blob: 7b32ad61a1b7b2d87995e9fd617edc88baf454f9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.querymaster;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.tajo.*;
import org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.TaskRequestImpl;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.QueryEvent;
import org.apache.tajo.master.event.QueryEventType;
import org.apache.tajo.master.event.StageEvent;
import org.apache.tajo.master.event.StageEventType;
import org.apache.tajo.parser.sql.SQLAnalyzer;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.resource.NodeResources;
import org.apache.tajo.session.Session;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.worker.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class TestKillQuery {
private static TajoTestingCluster cluster;
private static TajoConf conf;
private static TajoClient client;
private static String queryStr = "select t1.l_orderkey, t1.l_partkey, t2.c_custkey " +
"from lineitem t1 join customer t2 " +
"on t1.l_orderkey = t2.c_custkey order by t1.l_orderkey";
@BeforeClass
public static void setUp() throws Exception {
cluster = new TajoTestingCluster();
cluster.startMiniClusterInLocal(1);
conf = cluster.getConfiguration();
client = cluster.newTajoClient();
client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
+ "using text location '" + TpchTestBase.getInstance().getPath("lineitem") + "'");
assertTrue(client.existTable("default.lineitem"));
client.executeQueryAndGetResult("create external table default.customer (c_custkey int, c_name text) "
+ "using text location '" + TpchTestBase.getInstance().getPath("customer") + "'");
assertTrue(client.existTable("default.customer"));
}
@AfterClass
public static void tearDown() throws IOException {
if (client != null) client.close();
if (cluster != null) cluster.shutdownMiniCluster();
}
@Test
public final void testKillQueryFromInitState() throws Exception {
SQLAnalyzer analyzer = new SQLAnalyzer();
QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
Session session = LocalTajoTestingUtility.createDummySession();
CatalogService catalog = cluster.getMaster().getCatalog();
LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance());
Expr expr = analyzer.parse(queryStr);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
optimizer.optimize(plan);
QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
QueryContext queryContext = new QueryContext(conf);
MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
globalPlanner.build(queryContext, masterPlan);
CountDownLatch barrier = new CountDownLatch(1);
MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, StageEventType.SQ_INIT);
QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
queryId, session, defaultContext, expr.toJson(), NodeResources.createResource(512), dispatch);
queryMasterTask.init(conf);
queryMasterTask.getQueryTaskContext().getDispatcher().start();
queryMasterTask.startQuery();
try{
barrier.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
}
Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
assertNotNull(stage);
// fire kill event
queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
try {
cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
} catch (Exception e) {
e.printStackTrace();
if (stage != null) {
System.err.println(String.format("Stage: [%s] (Total: %d, Complete: %d, Success: %d, Killed: %d, Failed: %d)",
stage.getId().toString(),
stage.getTotalScheduledObjectsCount(),
stage.getCompletedTaskCount(),
stage.getSucceededObjectCount(),
stage.getKilledObjectCount(),
stage.getFailedObjectCount()));
}
throw e;
} finally {
queryMasterTask.stop();
}
}
@Test
public final void testIgnoreStageStateFromKilled() throws Exception {
SQLAnalyzer analyzer = new SQLAnalyzer();
QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
Session session = LocalTajoTestingUtility.createDummySession();
CatalogService catalog = cluster.getMaster().getCatalog();
LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog, TablespaceManager.getInstance());
Expr expr = analyzer.parse(queryStr);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
optimizer.optimize(plan);
QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
QueryContext queryContext = new QueryContext(conf);
MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
globalPlanner.build(queryContext, masterPlan);
CountDownLatch barrier = new CountDownLatch(1);
MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING);
QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
queryId, session, defaultContext, expr.toJson(), NodeResources.createResource(512), dispatch);
queryMasterTask.init(conf);
queryMasterTask.getQueryTaskContext().getDispatcher().start();
queryMasterTask.startQuery();
try{
barrier.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
}
Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
assertNotNull(stage);
// fire kill event
queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
try {
cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
} finally {
queryMasterTask.stop();
}
List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages());
Stage lastStage = stages.get(stages.size() - 1);
assertEquals(StageState.KILLED, lastStage.getSynchronizedState());
lastStage.getStateMachine().doTransition(StageEventType.SQ_START,
new StageEvent(lastStage.getId(), StageEventType.SQ_START));
lastStage.getStateMachine().doTransition(StageEventType.SQ_KILL,
new StageEvent(lastStage.getId(), StageEventType.SQ_KILL));
lastStage.getStateMachine().doTransition(StageEventType.SQ_SHUFFLE_REPORT,
new StageEvent(lastStage.getId(), StageEventType.SQ_SHUFFLE_REPORT));
lastStage.getStateMachine().doTransition(StageEventType.SQ_STAGE_COMPLETED,
new StageEvent(lastStage.getId(), StageEventType.SQ_STAGE_COMPLETED));
lastStage.getStateMachine().doTransition(StageEventType.SQ_FAILED,
new StageEvent(lastStage.getId(), StageEventType.SQ_FAILED));
}
@Test
public void testKillTask() throws Throwable {
QueryId qid = LocalTajoTestingUtility.newQueryId();
ExecutionBlockId eid = QueryIdFactory.newExecutionBlockId(qid, 1);
TaskId tid = QueryIdFactory.newTaskId(eid);
final TajoConf conf = new TajoConf();
TaskRequestImpl taskRequest = new TaskRequestImpl();
WorkerConnectionInfo queryMaster = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
TaskAttemptId attemptId = new TaskAttemptId(tid, 1);
taskRequest.set(attemptId, new ArrayList<CatalogProtos.FragmentProto>(),
null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new QueryContext(conf),
null, null, queryMaster.getHostAndQMPort());
taskRequest.setInterQuery();
ExecutionBlockContextResponse.Builder requestProtoBuilder =
ExecutionBlockContextResponse.newBuilder();
requestProtoBuilder.setExecutionBlockId(eid.getProto())
.setPlanJson("test")
.setQueryContext(new QueryContext(conf).getProto())
.setQueryOutputPath("testpath")
.setShuffleType(PlanProto.ShuffleType.HASH_SHUFFLE);
TajoWorker.WorkerContext workerContext = new MockWorkerContext() {
@Override
public TajoConf getConf() {
return conf;
}
@Override
public TaskManager getTaskManager() {
return null;
}
@Override
public TaskExecutor getTaskExecuor() {
return null;
}
@Override
public NodeResourceManager getNodeResourceManager() {
return null;
}
};
ExecutionBlockContext context = new MockExecutionBlock(workerContext, requestProtoBuilder.build()) {
@Override
public Path createBaseDir() throws IOException {
return new Path("test");
}
};
org.apache.tajo.worker.Task task = new TaskImpl(taskRequest, context);
task.kill();
assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
try {
task.run();
assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
} catch (Exception e) {
assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getTaskContext().getState());
}
}
static class MockAsyncDispatch extends AsyncDispatcher {
private CountDownLatch latch;
private Enum eventType;
MockAsyncDispatch(CountDownLatch latch, Enum eventType) {
super();
this.latch = latch;
this.eventType = eventType;
}
@Override
protected void dispatch(Event event) {
if (event.getType() == eventType) {
latch.countDown();
}
super.dispatch(event);
}
}
}