blob: 05e7165e336188e619dfc7f949a374bd31c05b9d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.querymaster;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.tajo.*;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.master.event.*;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.resource.NodeResource;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.session.Session;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TajoWorker;
import org.apache.tajo.worker.event.QMResourceAllocateEvent;
import java.net.InetSocketAddress;
import static org.apache.tajo.ResourceProtos.*;
/**
 * RPC endpoint of a worker's query master.
 *
 * <p>This service implements the {@link QueryMasterProtocol} service interface. It owns
 * the {@link AsyncRpcServer} that receives the calls and the {@link QueryMaster} child
 * service that runs the queries. Most handlers look up the {@link QueryMasterTask} of the
 * query named in the request and forward the request to it as an event.</p>
 */
public class QueryMasterManagerService extends CompositeService
    implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
  private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName());

  private AsyncRpcServer rpcServer;
  private InetSocketAddress bindAddr;
  private String addr;
  /** Requested port until {@link #serviceInit(Configuration)} runs, then the actually bound port. */
  private int port;

  private QueryMaster queryMaster;

  private final TajoWorker.WorkerContext workerContext;

  /**
   * @param workerContext context of the worker hosting this service
   * @param port          port the RPC server is asked to listen on
   */
  public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) {
    super(QueryMasterManagerService.class.getName());
    this.workerContext = workerContext;
    this.port = port;
  }

  /**
   * @return the query master created during {@link #serviceInit(Configuration)};
   *         {@code null} before the service has been initialized
   */
  public QueryMaster getQueryMaster() {
    return queryMaster;
  }

  /**
   * Starts the RPC server on all interfaces, creates the {@link QueryMaster} child
   * service and records the bound address in the configuration.
   *
   * @param conf configuration; must be a {@link TajoConf}
   * @throws Exception if the RPC server or a child service fails to initialize
   */
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);

    // Setup RPC server
    InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port);
    if (initIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initIsa);
    }

    int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
    this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum);
    this.rpcServer.start();

    // Remember the address the server really listens on; it replaces the requested port.
    this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
    this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
    this.port = bindAddr.getPort();

    queryMaster = new QueryMaster(workerContext);
    addService(queryMaster);

    // Publish the bound address through the configuration.
    LOG.info("QueryMasterManagerService is bind to " + addr);
    tajoConf.setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);

    super.serviceInit(conf);
  }

  /**
   * Shuts down the RPC server (if it was created) and then stops the child services.
   *
   * @throws Exception if stopping a child service fails
   */
  @Override
  public void serviceStop() throws Exception {
    if (rpcServer != null) {
      rpcServer.shutdown();
    }
    LOG.info("QueryMasterManagerService stopped");
    super.serviceStop();
  }

  /**
   * @return the address computed from the RPC server's listen address;
   *         {@code null} before {@link #serviceInit(Configuration)} has run
   */
  public InetSocketAddress getBindAddr() {
    return bindAddr;
  }

  /**
   * Receives a task attempt status report.
   *
   * <p>A {@code TA_KILLED} report is delivered directly to the attempt as a
   * {@code TA_LOCAL_KILLED} event; any other state is forwarded to the query's event
   * handler. Reports for an unknown query, stage, task or attempt are skipped. The
   * callback is always completed with the null proto.</p>
   *
   * @param controller RPC controller (unused)
   * @param request    status report of one task attempt
   * @param done       callback completing the RPC
   */
  @Override
  public void statusUpdate(RpcController controller, TaskStatusProto request,
                           RpcCallback<PrimitiveProtos.NullProto> done) {
    QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
    TaskAttemptId attemptId = new TaskAttemptId(request.getId());
    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);

    if (queryMasterTask != null) {
      dispatchStatusUpdate(queryMasterTask, attemptId, request);
    }

    done.run(TajoWorker.NULL_PROTO);
  }

  /**
   * Routes one status report to the attempt or the query event handler, skipping
   * reports whose stage, task or attempt cannot be resolved.
   */
  private void dispatchStatusUpdate(QueryMasterTask queryMasterTask, TaskAttemptId attemptId,
                                    TaskStatusProto request) {
    // Resolve stage -> task -> attempt step by step; any of them may be missing for a
    // stale report, and dereferencing a missing one would abort the call before the
    // RPC callback is completed.
    Stage stage = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
    Task task = (stage == null) ? null : stage.getTask(attemptId.getTaskId());
    TaskAttempt attempt = (task == null) ? null : task.getAttempt(attemptId.getId());

    if (attempt == null) {
      LOG.warn("Ignoring status update for unknown task attempt: " + attemptId);
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
    }

    if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
      LOG.warn(attemptId + " Killed");
      attempt.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_LOCAL_KILLED));
    } else {
      queryMasterTask.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId, request));
    }
  }

  /**
   * Acknowledges a ping; the request content is ignored.
   *
   * @param controller   RPC controller (unused)
   * @param requestProto execution block id sent with the ping (unused)
   * @param done         callback completing the RPC
   */
  @Override
  public void ping(RpcController controller,
                   TajoIdProtos.ExecutionBlockIdProto requestProto,
                   RpcCallback<PrimitiveProtos.NullProto> done) {
    done.run(TajoWorker.NULL_PROTO);
  }

  /**
   * Passes a fatal task error to the owning query, or logs a warning when the query is
   * not known to this query master.
   *
   * @param controller RPC controller (unused)
   * @param report     fatal error report of a task attempt
   * @param done       callback completing the RPC
   */
  @Override
  public void fatalError(RpcController controller, TaskFatalErrorReport report,
                         RpcCallback<PrimitiveProtos.NullProto> done) {
    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
        new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));

    if (queryMasterTask != null) {
      queryMasterTask.handleTaskFailed(report);
    } else {
      LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
    }

    done.run(TajoWorker.NULL_PROTO);
  }

  /**
   * Forwards a task completion report to the owning query as a
   * {@link TaskCompletionEvent}; reports for unknown queries are dropped.
   *
   * @param controller RPC controller (unused)
   * @param report     completion report of a task attempt
   * @param done       callback completing the RPC
   */
  @Override
  public void done(RpcController controller, TaskCompletionReport report,
                   RpcCallback<PrimitiveProtos.NullProto> done) {
    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
        new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));

    if (queryMasterTask != null) {
      queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
    }

    done.run(TajoWorker.NULL_PROTO);
  }

  /**
   * Forwards an execution block report to the owning query as a
   * {@link StageShuffleReportEvent}; reports for unknown queries are dropped.
   *
   * @param controller RPC controller (unused)
   * @param request    report for one execution block
   * @param done       callback completing the RPC
   */
  @Override
  public void doneExecutionBlock(
      RpcController controller, ExecutionBlockReport request,
      RpcCallback<PrimitiveProtos.NullProto> done) {
    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));

    if (queryMasterTask != null) {
      ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
      queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request));
    }

    done.run(TajoWorker.NULL_PROTO);
  }

  /**
   * Returns what a worker needs to run an execution block (query context, output path,
   * plan JSON and shuffle type) and records the requesting worker on the stage.
   *
   * <p>When the query or the stage cannot be found the controller is marked failed and
   * the callback is completed without a payload, so the RPC is always answered.</p>
   *
   * @param controller RPC controller; marked failed when the query or stage is unknown
   * @param request    execution block id plus the requesting worker's connection info
   * @param done       callback completing the RPC
   */
  @Override
  public void getExecutionBlockContext(RpcController controller,
                                       ExecutionBlockContextRequest request,
                                       RpcCallback<ExecutionBlockContextResponse> done) {
    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
        new QueryId(request.getExecutionBlockId().getQueryId()));

    if (queryMasterTask == null) {
      controller.setFailed("Can't find query. request: " + request);
      // The failure travels on the controller, so the callback carries no payload.
      // NOTE(review): assumes the RPC server accepts a null response message - confirm.
      done.run(null);
      return;
    }

    Stage stage = queryMasterTask.getQuery().getStage(new ExecutionBlockId(request.getExecutionBlockId()));
    if (stage == null) {
      controller.setFailed("Can't find stage. request: " + request);
      done.run(null);
      return;
    }

    // first request with starting ExecutionBlock
    PlanProto.ShuffleType shuffleType = stage.getDataChannel().getShuffleType();

    ExecutionBlockContextResponse.Builder ebRequestProto = ExecutionBlockContextResponse.newBuilder();
    ebRequestProto.setExecutionBlockId(request.getExecutionBlockId())
        .setQueryContext(stage.getContext().getQueryContext().getProto())
        .setQueryOutputPath(stage.getContext().getStagingDir().toString())
        .setPlanJson(CoreGsonHelper.toJson(stage.getBlock().getPlan(), LogicalNode.class))
        .setShuffleType(shuffleType);

    // Set assigned worker to stage (only the first request of a worker is recorded)
    if (!stage.getAssignedWorkerMap().containsKey(request.getWorker().getId())) {
      stage.getAssignedWorkerMap().put(request.getWorker().getId(),
          NetUtils.createSocketAddr(request.getWorker().getHost(), request.getWorker().getPeerRpcPort()));
    }

    done.run(ebRequestProto.build());
  }

  /**
   * Sends a {@code KILL} event to the query if it is known to this query master.
   *
   * @param controller RPC controller (unused)
   * @param request    id of the query to kill
   * @param done       callback completing the RPC
   */
  @Override
  public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
                        RpcCallback<PrimitiveProtos.NullProto> done) {
    QueryId queryId = new QueryId(request);
    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);

    if (queryMasterTask != null) {
      queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
    }

    done.run(TajoWorker.NULL_PROTO);
  }

  /**
   * Hands a new query to the query master as a {@link QueryStartEvent} built from the
   * request's session, query context, expression JSON, logical plan JSON and allocated
   * resource.
   *
   * @param controller RPC controller (unused)
   * @param request    query execution request
   * @param done       callback completing the RPC
   */
  @Override
  public void executeQuery(RpcController controller,
                           QueryExecutionRequest request,
                           RpcCallback<PrimitiveProtos.NullProto> done) {
    QueryId queryId = new QueryId(request.getQueryId());
    LOG.info("Receive executeQuery request:" + queryId);

    queryMaster.handle(new QueryStartEvent(queryId,
        new Session(request.getSession()),
        new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
            request.getQueryContext()), request.getExprInJson().getValue(),
        request.getLogicalPlanJson().getValue(), new NodeResource(request.getAllocation().getResource())));

    done.run(TajoWorker.NULL_PROTO);
  }

  /**
   * Asks the node resource manager to allocate resources for a query master and
   * answers with its decision.
   *
   * <p>If waiting for the decision fails, the controller is marked failed and the
   * callback is completed with {@code false}. The callback is invoked exactly once.</p>
   *
   * @param controller RPC controller; marked failed when the allocation cannot be resolved
   * @param request    resource to allocate
   * @param done       callback receiving the allocation result
   */
  @Override
  public void allocateQueryMaster(RpcController controller,
                                  AllocationResourceProto request,
                                  RpcCallback<PrimitiveProtos.BoolProto> done) {
    CallFuture<PrimitiveProtos.BoolProto> callFuture = new CallFuture<PrimitiveProtos.BoolProto>();
    workerContext.getNodeResourceManager().handle(new QMResourceAllocateEvent(request, callFuture));

    PrimitiveProtos.BoolProto result;
    try {
      result = callFuture.get();
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Keep the interruption visible to the thread that owns this call.
        Thread.currentThread().interrupt();
      }
      LOG.error("Failed to allocate query master resource", e);
      // getMessage() may be null; fall back to the exception's string form.
      controller.setFailed(e.getMessage() != null ? e.getMessage() : e.toString());
      result = TajoWorker.FALSE_PROTO;
    }

    // Completed outside the try block so a failing callback is not invoked a second time.
    done.run(result);
  }
}