blob: 1b900809aebdcfbb263269fcc46e90acc1478352 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.querymaster;
import com.google.common.collect.Maps;
import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tajo.QueryId;
import org.apache.tajo.ResourceProtos.TajoHeartbeatRequest;
import org.apache.tajo.ResourceProtos.TajoHeartbeatResponse;
import org.apache.tajo.ResourceProtos.WorkerConnectionsResponse;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.ReturnStateUtil;
import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService;
import org.apache.tajo.master.event.QueryStartEvent;
import org.apache.tajo.master.event.QueryStopEvent;
import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.RpcParameterFactory;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.history.HistoryWriter.WriterFuture;
import org.apache.tajo.util.history.HistoryWriter.WriterHolder;
import org.apache.tajo.util.history.QueryHistory;
import org.apache.tajo.worker.TajoWorker;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Worker-side service that manages one {@link QueryMasterTask} per running query.
 *
 * <p>Query start/stop requests arrive as {@link QueryStartEvent}/{@link QueryStopEvent} and are
 * routed through an internal {@link AsyncDispatcher}. While started, this service also runs two
 * background threads:
 * <ul>
 *   <li>{@link QueryHeartbeatThread}: sends a status heartbeat for every active query to the
 *       master (over {@link QueryCoordinatorProtocol}) roughly every 2 seconds;</li>
 *   <li>{@link ClientSessionTimeoutCheckThread}: roughly every second, expires queries whose
 *       client has not heartbeated within {@code QUERY_SESSION_TIMEOUT} seconds.</li>
 * </ul>
 * Stopped queries are moved into a bounded LRU cache so they stay queryable; an entry is dropped
 * from that cache once the query's history has been written.
 */
public class QueryMaster extends CompositeService implements EventHandler {
  private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());

  /** Client session timeout in seconds; multiplied by 1000 when compared against ms. */
  private int querySessionTimeout;

  private Clock clock;

  private AsyncDispatcher dispatcher;

  private GlobalPlanner globalPlanner;

  private TajoConf systemConf;

  /** Active (not yet stopped) query master tasks; a concurrent map, safe for lock-free access. */
  private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();

  /**
   * Recently stopped tasks. LRUMap is not synchronized, so every access is guarded by the
   * map's own monitor.
   */
  private LRUMap finishedQueryMasterTasksCache;

  private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;

  /** Set by {@link #serviceStop()}; polled by the background threads to terminate. */
  private volatile boolean isStopped;

  private QueryMasterContext queryMasterContext;

  private QueryContext queryContext;

  private QueryHeartbeatThread queryHeartbeatThread;

  private TajoWorker.WorkerContext workerContext;

  private RpcClientManager manager;

  private Properties rpcClientParams;

  private ExecutorService eventExecutor;

  private ExecutorService singleEventExecutor;

  /**
   * @param workerContext context of the hosting worker (service tracker, history writer/reader,
   *                      connection info)
   */
  public QueryMaster(TajoWorker.WorkerContext workerContext) {
    super(QueryMaster.class.getName());
    this.workerContext = workerContext;
  }

  /**
   * Reads configuration, creates the dispatcher and planner, and registers the query
   * start/stop event handlers.
   *
   * @param conf must be a {@link TajoConf}
   */
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
    this.manager = RpcClientManager.getInstance();
    this.rpcClientParams = RpcParameterFactory.get(this.systemConf);

    querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
    queryMasterContext = new QueryMasterContext(systemConf);

    clock = new SystemClock();
    finishedQueryMasterTasksCache =
        new LRUMap(systemConf.getIntVar(TajoConf.ConfVars.HISTORY_QUERY_CACHE_SIZE));

    this.dispatcher = new AsyncDispatcher();
    addIfService(dispatcher);

    globalPlanner = new GlobalPlanner(systemConf, workerContext);

    dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
    dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler());

    super.serviceInit(conf);
    LOG.info("QueryMaster inited");
  }

  /**
   * Starts the heartbeat and session-timeout threads and the event executors. The executors
   * are created before {@code super.serviceStart()} starts the dispatcher, so event handlers
   * never observe them as null.
   */
  @Override
  public void serviceStart() throws Exception {
    queryHeartbeatThread = new QueryHeartbeatThread();
    queryHeartbeatThread.start();

    clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
    clientSessionTimeoutCheckThread.start();

    eventExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    singleEventExecutor = Executors.newSingleThreadExecutor();
    super.serviceStart();
    LOG.info("QueryMaster started");
  }

  /**
   * Signals and interrupts the background threads and shuts down (without awaiting) the event
   * executors, then stops child services.
   */
  @Override
  public void serviceStop() throws Exception {
    isStopped = true;

    if (queryHeartbeatThread != null) {
      queryHeartbeatThread.interrupt();
    }

    if (clientSessionTimeoutCheckThread != null) {
      clientSessionTimeoutCheckThread.interrupt();
    }

    if (eventExecutor != null) {
      eventExecutor.shutdown();
    }

    if (singleEventExecutor != null) {
      singleEventExecutor.shutdown();
    }

    super.serviceStop();
    LOG.info("QueryMaster stopped");
  }

  /**
   * Asks the master (at the service tracker's current umbilical address) for all known workers.
   *
   * @return the worker list, or an empty list if the RPC fails (the failure is logged)
   */
  public List<TajoProtos.WorkerConnectionInfoProto> getAllWorker() {
    try {
      // In TajoMaster HA mode, if the backup master becomes active, the worker may fail to
      // connect to the previously active master. Thus, if the worker can't connect to the
      // master, it should try to connect to another master and update the master address in
      // the worker context.
      ServiceTracker serviceTracker = workerContext.getServiceTracker();
      NettyClientBase rpc = manager.getClient(serviceTracker.getUmbilicalAddress(),
          QueryCoordinatorProtocol.class, true, rpcClientParams);
      QueryCoordinatorProtocolService masterService = rpc.getStub();

      CallFuture<WorkerConnectionsResponse> callBack = new CallFuture<WorkerConnectionsResponse>();
      masterService.getAllWorkers(callBack.getController(),
          PrimitiveProtos.NullProto.getDefaultInstance(), callBack);

      WorkerConnectionsResponse connectionsProto =
          callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
      return connectionsProto.getWorkerList();
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
    }

    return new ArrayList<TajoProtos.WorkerConnectionInfoProto>();
  }

  /** Forwards {@code event} to the internal dispatcher. */
  @Override
  public void handle(Event event) {
    dispatcher.getEventHandler().handle(event);
  }

  /**
   * @return the {@link Query} of the active task for {@code queryId}, or {@code null} when no
   *         active task exists (previously this threw a NullPointerException)
   */
  public Query getQuery(QueryId queryId) {
    QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId);
    return queryMasterTask == null ? null : queryMasterTask.getQuery();
  }

  /** @return the active task for {@code queryId}, or {@code null} if none */
  public QueryMasterTask getQueryMasterTask(QueryId queryId) {
    return queryMasterTasks.get(queryId);
  }

  /**
   * @param includeFinished if true, also look in the cache of recently stopped tasks
   * @return the task for {@code queryId}, or {@code null} if not found
   */
  public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) {
    QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId);

    if (queryMasterTask != null) {
      return queryMasterTask;
    }
    if (!includeFinished) {
      return null;
    }
    synchronized (finishedQueryMasterTasksCache) {
      return (QueryMasterTask) finishedQueryMasterTasksCache.get(queryId);
    }
  }

  public QueryMasterContext getContext() {
    return this.queryMasterContext;
  }

  /** @return a live view of the active tasks */
  public Collection<QueryMasterTask> getQueryMasterTasks() {
    return queryMasterTasks.values();
  }

  /**
   * Returns the history of a query: from memory if its task is active or still cached,
   * otherwise from the worker's history reader.
   */
  public QueryHistory getQueryHistory(QueryId queryId) throws IOException {
    QueryMasterTask queryMasterTask = getQueryMasterTask(queryId, true);
    if (queryMasterTask != null) {
      return queryMasterTask.getQuery().getQueryHistory();
    } else {
      return workerContext.getHistoryReader().getQueryHistory(queryId.toString());
    }
  }

  /** Accessor for shared QueryMaster state, handed to each {@link QueryMasterTask}. */
  public class QueryMasterContext {
    private TajoConf conf;

    public QueryMasterContext(TajoConf conf) {
      this.conf = conf;
    }

    public TajoConf getConf() {
      return conf;
    }

    public ExecutorService getEventExecutor() {
      return eventExecutor;
    }

    public ExecutorService getSingleEventExecutor() {
      return singleEventExecutor;
    }

    public AsyncDispatcher getDispatcher() {
      return dispatcher;
    }

    public Clock getClock() {
      return clock;
    }

    public QueryMaster getQueryMaster() {
      return QueryMaster.this;
    }

    public GlobalPlanner getGlobalPlanner() {
      return globalPlanner;
    }

    public TajoWorker.WorkerContext getWorkerContext() {
      return workerContext;
    }

    public EventHandler getEventHandler() {
      return dispatcher.getEventHandler();
    }

    /**
     * Stops a query.
     *
     * <p>The steps are:
     * <ol>
     *   <li>move its task from the active map to the finished cache;</li>
     *   <li>report its final status to the master;</li>
     *   <li>stop the task;</li>
     *   <li>queue its history for writing.</li>
     * </ol>
     * Only the first caller for a given query id performs these steps; later or concurrent
     * calls for the same id return early.
     */
    public void stopQuery(final QueryId queryId) {
      QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId);
      if (queryMasterTask == null) {
        LOG.warn("No query info:" + queryId);
        return;
      }

      // Publish to the finished cache before removing from the active map so the task is never
      // missing from both (see getQueryMasterTask(queryId, true)).
      synchronized (finishedQueryMasterTasksCache) {
        finishedQueryMasterTasksCache.put(queryId, queryMasterTask);
      }
      if (queryMasterTasks.remove(queryId) == null) {
        // Another caller already removed (and is stopping) this query.
        return;
      }

      sendFinalHeartbeat(queryMasterTask);

      try {
        queryMasterTask.stop();
      } catch (Exception e) {
        LOG.error(e.getMessage(), e);
      }

      Query query = queryMasterTask.getQuery();
      if (query != null) {
        writeQueryHistory(queryId, query);
      }
    }

    /** Synchronously reports the task's final status to the master; RPC failures are logged. */
    private void sendFinalHeartbeat(QueryMasterTask queryMasterTask) {
      TajoHeartbeatRequest queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
      CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();

      try {
        NettyClientBase tmClient = manager.getClient(
            workerContext.getServiceTracker().getUmbilicalAddress(),
            QueryCoordinatorProtocol.class, true, rpcClientParams);
        QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
        masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
        future.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS);
      } catch (Exception e) {
        // This may run on a separate thread while the cluster is shutting down, in which case
        // the master may already have closed the connection.
        LOG.error(e.getMessage(), e);
      }
    }

    /**
     * Queues the query's history (if any) with the task history writer. Once written, the
     * query's entry is dropped from the finished-task cache. Failures are logged as warnings.
     */
    private void writeQueryHistory(final QueryId queryId, Query query) {
      QueryHistory queryHistory = query.getQueryHistory();
      if (queryHistory == null) {
        return;
      }

      try {
        WriterFuture<WriterHolder> writerFuture = new WriterFuture<WriterHolder>(queryHistory) {
          @Override
          public void done(WriterHolder writerHolder) {
            super.done(writerHolder);

            // The history is persisted now; the in-memory copy is no longer needed.
            synchronized (finishedQueryMasterTasksCache) {
              finishedQueryMasterTasksCache.remove(queryId);
            }
          }
        };
        query.context.getQueryMasterContext().getWorkerContext()
            .getTaskHistoryWriter().appendHistory(writerFuture);
      } catch (Throwable e) {
        LOG.warn(e, e);
      }
    }
  }

  /**
   * Builds a status heartbeat for {@code queryMasterTask}. The heartbeat carries:
   * <ul>
   *   <li>the worker's connection info;</li>
   *   <li>the query id and state;</li>
   *   <li>if a query exists, its result descriptor (when present) and progress;</li>
   *   <li>an error message if task initialization failed.</li>
   * </ul>
   */
  private TajoHeartbeatRequest buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
    TajoHeartbeatRequest.Builder builder = TajoHeartbeatRequest.newBuilder();

    builder.setConnectionInfo(workerContext.getConnectionInfo().getProto());
    builder.setQueryId(queryMasterTask.getQueryId().getProto());
    builder.setState(queryMasterTask.getState());

    Query query = queryMasterTask.getQuery();
    if (query != null) {
      if (query.getResultDesc() != null) {
        builder.setResultDesc(query.getResultDesc().getProto());
      }
      builder.setQueryProgress(query.getProgress());
    }

    if (queryMasterTask.isInitError()) {
      builder.setStatusMessage(
          ReturnStateUtil.returnError(queryMasterTask.getInitError()).getMessage());
    }
    return builder.build();
  }

  /**
   * Creates, registers and initializes a task for a new query. The task is started if
   * initialization succeeded and stopped immediately otherwise.
   */
  private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
    @Override
    public void handle(QueryStartEvent event) {
      LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
      QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
          event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(),
          event.getAllocation());

      // queryMasterTasks is a concurrent map; no extra locking is needed.
      queryMasterTasks.put(event.getQueryId(), queryMasterTask);

      queryMasterTask.init(systemConf);
      if (!queryMasterTask.isInitError()) {
        queryMasterTask.start();
      }

      queryContext = event.getQueryContext();

      if (queryMasterTask.isInitError()) {
        queryMasterContext.stopQuery(queryMasterTask.getQueryId());
      }
    }
  }

  /** Delegates query stop requests to {@link QueryMasterContext#stopQuery(QueryId)}. */
  private class QueryStopEventHandler implements EventHandler<QueryStopEvent> {
    @Override
    public void handle(QueryStopEvent event) {
      queryMasterContext.stopQuery(event.getQueryId());
    }
  }

  /**
   * Sends a fire-and-forget status heartbeat for each active query to the master about every
   * 2 seconds, until the service is stopped or the thread is interrupted.
   */
  class QueryHeartbeatThread extends Thread {
    public QueryHeartbeatThread() {
      super("QueryHeartbeatThread");
    }

    @Override
    public void run() {
      LOG.info("Start QueryMaster heartbeat thread");
      while (!isStopped) {
        // Snapshot so tasks added/removed concurrently don't affect this round.
        List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>(queryMasterTasks.values());

        for (QueryMasterTask eachTask : tempTasks) {
          try {
            ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker();
            NettyClientBase tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(),
                QueryCoordinatorProtocol.class, true, rpcClientParams);
            QueryCoordinatorProtocolService masterClientService = tmClient.getStub();

            TajoHeartbeatRequest queryHeartbeat = buildTajoHeartBeat(eachTask);
            masterClientService.heartbeat(null, queryHeartbeat, NullCallback.get());
          } catch (Throwable t) {
            // Keep heartbeating the remaining queries; report through the logger, not stderr.
            LOG.warn("Failed to send heartbeat for query " + eachTask.getQueryId()
                + ": " + t.getMessage(), t);
          }
        }

        synchronized (this) {
          try {
            this.wait(2000);
          } catch (InterruptedException e) {
            break;
          }
        }
      }
      LOG.info("QueryMaster heartbeat thread stopped");
    }
  }

  /**
   * About once per second, expires the session of every non-stopped query whose last client
   * heartbeat is older than {@code querySessionTimeout} seconds. Queries with no recorded
   * heartbeat (value &lt;= 0) are skipped.
   */
  class ClientSessionTimeoutCheckThread extends Thread {
    public ClientSessionTimeoutCheckThread() {
      super("ClientSessionTimeoutCheckThread");
    }

    @Override
    public void run() {
      LOG.info("ClientSessionTimeoutCheckThread started");
      while (!isStopped) {
        try {
          synchronized (this) {
            this.wait(1000);
          }
        } catch (InterruptedException e) {
          break;
        }

        // Use long arithmetic: an int product overflows for timeouts above ~24.8 days.
        long timeoutMillis = querySessionTimeout * 1000L;

        List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>(queryMasterTasks.values());
        for (QueryMasterTask eachTask : tempTasks) {
          if (eachTask.isStopped()) {
            continue;
          }
          try {
            long lastHeartbeat = eachTask.getLastClientHeartbeat();
            long time = System.currentTimeMillis() - lastHeartbeat;
            if (lastHeartbeat > 0 && time > timeoutMillis) {
              LOG.warn("Query " + eachTask.getQueryId()
                  + " stopped cause query session timeout: " + time + " ms");
              eachTask.expireQuerySession();
            }
          } catch (Exception e) {
            LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
          }
        }
      }
    }
  }
}