blob: dc70f23bf6e7861b48de291f3d5ff1271d12bb16 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.master;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.*;
import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.querymaster.QueryInProgress;
import org.apache.tajo.master.querymaster.QueryInfo;
import org.apache.tajo.master.querymaster.QueryJobEvent;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
import org.apache.tajo.util.NetUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
public class TajoMasterClientService extends AbstractService {
private final static Log LOG = LogFactory.getLog(TajoMasterClientService.class);
private final MasterContext context;
private final TajoConf conf;
private final CatalogService catalog;
private final TajoMasterClientProtocolServiceHandler clientHandler;
private BlockingRpcServer server;
private InetSocketAddress bindAddress;
private final BoolProto BOOL_TRUE =
BoolProto.newBuilder().setValue(true).build();
private final BoolProto BOOL_FALSE =
BoolProto.newBuilder().setValue(false).build();
public TajoMasterClientService(MasterContext context) {
super(TajoMasterClientService.class.getName());
this.context = context;
this.conf = context.getConf();
this.catalog = context.getCatalog();
this.clientHandler = new TajoMasterClientProtocolServiceHandler();
}
@Override
public void start() {
// start the rpc server
String confClientServiceAddr = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
int workerNum = conf.getIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
try {
server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa, workerNum);
} catch (Exception e) {
LOG.error(e);
throw new RuntimeException(e);
}
server.start();
bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);
super.start();
}
@Override
public void stop() {
if (server != null) {
server.shutdown();
}
super.stop();
}
public InetSocketAddress getBindAddress() {
return this.bindAddress;
}
public int getHttpPort() {
return 0;
}
/////////////////////////////////////////////////////////////////////////////
// TajoMasterClientProtocolService
/////////////////////////////////////////////////////////////////////////////
public class TajoMasterClientProtocolServiceHandler implements TajoMasterClientProtocolService.BlockingInterface {
@Override
public BoolProto updateSessionVariables(RpcController controller,
UpdateSessionVariableRequest request)
throws ServiceException {
return null;
}
@Override
public ExplainQueryResponse explainQuery(RpcController controller,
ExplainQueryRequest request)
throws ServiceException {
try {
if(LOG.isDebugEnabled()) {
LOG.debug("ExplainQuery [" + request.getQuery() + "]");
}
ClientProtos.ExplainQueryResponse.Builder responseBuilder = ClientProtos.ExplainQueryResponse.newBuilder();
responseBuilder.setResultCode(ResultCode.OK);
String plan = context.getGlobalEngine().explainQuery(request.getQuery());
if(LOG.isDebugEnabled()) {
LOG.debug("ExplainQuery [" + plan + "]");
}
responseBuilder.setExplain(plan);
return responseBuilder.build();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
ClientProtos.ExplainQueryResponse.Builder responseBuilder = ClientProtos.ExplainQueryResponse.newBuilder();
responseBuilder.setResultCode(ResultCode.ERROR);
responseBuilder.setErrorMessage(e.getMessage());
return responseBuilder.build();
}
}
@Override
public GetQueryStatusResponse submitQuery(RpcController controller,
QueryRequest request)
throws ServiceException {
try {
if(LOG.isDebugEnabled()) {
LOG.debug("Query [" + request.getQuery() + "] is submitted");
}
return context.getGlobalEngine().executeQuery(request.getQuery());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
ClientProtos.GetQueryStatusResponse.Builder responseBuilder = ClientProtos.GetQueryStatusResponse.newBuilder();
responseBuilder.setResultCode(ResultCode.ERROR);
if (e.getMessage() != null) {
responseBuilder.setErrorMessage(ExceptionUtils.getStackTrace(e));
} else {
responseBuilder.setErrorMessage("Internal Error");
}
return responseBuilder.build();
}
}
@Override
public UpdateQueryResponse updateQuery(RpcController controller,
QueryRequest request)
throws ServiceException {
UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
try {
context.getGlobalEngine().updateQuery(request.getQuery());
builder.setResultCode(ResultCode.OK);
return builder.build();
} catch (Exception e) {
builder.setResultCode(ResultCode.ERROR);
if (e.getMessage() == null) {
builder.setErrorMessage(ExceptionUtils.getStackTrace(e));
}
return builder.build();
}
}
@Override
public GetQueryResultResponse getQueryResult(RpcController controller,
GetQueryResultRequest request)
throws ServiceException {
QueryId queryId = new QueryId(request.getQueryId());
QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
QueryInfo queryInfo = queryInProgress.getQueryInfo();
GetQueryResultResponse.Builder builder
= GetQueryResultResponse.newBuilder();
try {
//TODO After implementation Tajo's user security feature, Should be modified.
builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
} catch (IOException e) {
LOG.warn("Can't get current user name");
}
switch (queryInfo.getQueryState()) {
case QUERY_SUCCEEDED:
// TODO check this logic needed
//builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
break;
case QUERY_FAILED:
case QUERY_ERROR:
builder.setErrorMessage("Query " + queryId + " is failed");
default:
builder.setErrorMessage("Query " + queryId + " is still running");
}
return builder.build();
}
@Override
public GetQueryListResponse getRunningQueryList(RpcController controller,
GetQueryListRequest request)
throws ServiceException {
GetQueryListResponse.Builder builder
= GetQueryListResponse.newBuilder();
Collection<QueryInProgress> queries
= context.getQueryJobManager().getRunningQueries();
BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
for (QueryInProgress queryInProgress : queries) {
QueryInfo queryInfo = queryInProgress.getQueryInfo();
infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
infoBuilder.setState(queryInfo.getQueryState());
infoBuilder.setQuery(queryInfo.getSql());
infoBuilder.setStartTime(queryInfo.getStartTime());
long endTime = (queryInfo.getFinishTime() == 0) ?
System.currentTimeMillis() : queryInfo.getFinishTime();
infoBuilder.setFinishTime(endTime);
infoBuilder.setProgress(queryInfo.getProgress());
infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
builder.addQueryList(infoBuilder.build());
}
GetQueryListResponse result = builder.build();
return result;
}
@Override
public GetQueryListResponse getFinishedQueryList(RpcController controller,
GetQueryListRequest request)
throws ServiceException {
GetQueryListResponse.Builder builder
= GetQueryListResponse.newBuilder();
Collection<QueryInProgress> queries
= context.getQueryJobManager().getFinishedQueries();
BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
for (QueryInProgress queryInProgress : queries) {
QueryInfo queryInfo = queryInProgress.getQueryInfo();
infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
infoBuilder.setState(queryInfo.getQueryState());
infoBuilder.setQuery(queryInfo.getSql());
infoBuilder.setStartTime(queryInfo.getStartTime());
long endTime = (queryInfo.getFinishTime() == 0) ?
System.currentTimeMillis() : queryInfo.getFinishTime();
infoBuilder.setFinishTime(endTime);
infoBuilder.setProgress(queryInfo.getProgress());
infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
builder.addQueryList(infoBuilder.build());
}
GetQueryListResponse result = builder.build();
return result;
}
@Override
public GetQueryStatusResponse getQueryStatus(RpcController controller,
GetQueryStatusRequest request)
throws ServiceException {
GetQueryStatusResponse.Builder builder
= GetQueryStatusResponse.newBuilder();
QueryId queryId = new QueryId(request.getQueryId());
builder.setQueryId(request.getQueryId());
if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
builder.setResultCode(ResultCode.OK);
builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
} else {
QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
if (queryInProgress != null) {
QueryInfo queryInfo = queryInProgress.getQueryInfo();
builder.setResultCode(ResultCode.OK);
builder.setState(queryInfo.getQueryState());
builder.setProgress(queryInfo.getProgress());
builder.setSubmitTime(queryInfo.getStartTime());
if(queryInfo.getQueryMasterHost() != null) {
builder.setQueryMasterHost(queryInfo.getQueryMasterHost());
builder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
}
//builder.setInitTime(queryJobManager.getInitializationTime());
//builder.setHasResult(!queryJobManager.isCreateTableStmt());
if (queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
builder.setFinishTime(queryInfo.getFinishTime());
} else {
builder.setFinishTime(System.currentTimeMillis());
}
} else {
builder.setResultCode(ResultCode.ERROR);
builder.setErrorMessage("No such query: " + queryId.toString());
}
}
return builder.build();
}
/**
* It is invoked by TajoContainerProxy.
*/
@Override
public BoolProto killQuery(RpcController controller, TajoIdProtos.QueryIdProto request) throws ServiceException {
QueryId queryId = new QueryId(request);
QueryJobManager queryJobManager = context.getQueryJobManager();
queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
new QueryInfo(queryId)));
return BOOL_TRUE;
}
@Override
public GetClusterInfoResponse getClusterInfo(RpcController controller,
GetClusterInfoRequest request)
throws ServiceException {
GetClusterInfoResponse.Builder builder
= GetClusterInfoResponse.newBuilder();
Map<String, Worker> workers = context.getResourceManager().getWorkers();
List<String> wokerKeys = new ArrayList<String>(workers.keySet());
Collections.sort(wokerKeys);
WorkerResourceInfo.Builder workerBuilder
= WorkerResourceInfo.newBuilder();
for(Worker worker: workers.values()) {
WorkerResource workerResource = worker.getResource();
workerBuilder.setAllocatedHost(worker.getHostName());
workerBuilder.setDiskSlots(workerResource.getDiskSlots());
workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
workerBuilder.setMemoryMB(workerResource.getMemoryMB());
workerBuilder.setLastHeartbeat(worker.getLastHeartbeatTime());
workerBuilder.setUsedMemoryMB(workerResource.getUsedMemoryMB());
workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots());
workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots());
workerBuilder.setWorkerStatus(worker.getState().toString());
workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
workerBuilder.setPeerRpcPort(worker.getPeerRpcPort());
workerBuilder.setQueryMasterPort(worker.getQueryMasterPort());
workerBuilder.setClientPort(worker.getClientPort());
workerBuilder.setPullServerPort(worker.getPullServerPort());
workerBuilder.setHttpPort(worker.getHttpPort());
workerBuilder.setMaxHeap(workerResource.getMaxHeap());
workerBuilder.setFreeHeap(workerResource.getFreeHeap());
workerBuilder.setTotalHeap(workerResource.getTotalHeap());
workerBuilder.setNumRunningTasks(workerResource.getNumRunningTasks());
workerBuilder.setNumQueryMasterTasks(workerResource.getNumQueryMasterTasks());
builder.addWorkerList(workerBuilder.build());
}
return builder.build();
}
@Override
public BoolProto existTable(RpcController controller,
StringProto tableNameProto)
throws ServiceException {
String tableName = tableNameProto.getValue();
if (catalog.existsTable(tableName)) {
return BOOL_TRUE;
} else {
return BOOL_FALSE;
}
}
@Override
public GetTableListResponse getTableList(RpcController controller,
GetTableListRequest request)
throws ServiceException {
Collection<String> tableNames = catalog.getAllTableNames();
GetTableListResponse.Builder builder = GetTableListResponse.newBuilder();
builder.addAllTables(tableNames);
return builder.build();
}
@Override
public TableResponse getTableDesc(RpcController controller,
GetTableDescRequest request)
throws ServiceException {
String name = request.getTableName();
if (catalog.existsTable(name)) {
return TableResponse.newBuilder()
.setResultCode(ResultCode.OK)
.setTableDesc(catalog.getTableDesc(name).getProto())
.build();
} else {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
.setErrorMessage("ERROR: no such a table: " + request.getTableName())
.build();
}
}
@Override
public TableResponse createExternalTable(RpcController controller, CreateTableRequest request)
throws ServiceException {
try {
Path path = new Path(request.getPath());
FileSystem fs = path.getFileSystem(conf);
if (!fs.exists(path)) {
throw new IOException("No such a directory: " + path);
}
Schema schema = new Schema(request.getSchema());
TableMeta meta = new TableMeta(request.getMeta());
PartitionMethodDesc partitionDesc = null;
if (request.hasPartition()) {
partitionDesc = new PartitionMethodDesc(request.getPartition());
}
TableDesc desc;
try {
desc = context.getGlobalEngine().createTableOnPath(request.getName(), schema,
meta, path, false, partitionDesc);
} catch (Exception e) {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
.setErrorMessage(e.getMessage()).build();
}
return TableResponse.newBuilder()
.setResultCode(ResultCode.OK)
.setTableDesc(desc.getProto()).build();
} catch (IOException ioe) {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
.setErrorMessage(ioe.getMessage()).build();
}
}
@Override
public BoolProto dropTable(RpcController controller, DropTableRequest dropTable) throws ServiceException {
context.getGlobalEngine().dropTable(dropTable.getName(), dropTable.getPurge());
return BOOL_TRUE;
}
@Override
public FunctionResponse getFunctionList(RpcController controller, StringProto request) throws ServiceException {
String functionName = request.getValue();
Collection<FunctionDesc> functions = catalog.getFunctions();
List<CatalogProtos.FunctionDescProto> functionProtos = new ArrayList<CatalogProtos.FunctionDescProto>();
for (FunctionDesc eachFunction: functions) {
if (functionName == null || functionName.isEmpty()) {
functionProtos.add(eachFunction.getProto());
} else {
if(functionName.equals(eachFunction.getSignature())) {
functionProtos.add(eachFunction.getProto());
}
}
}
return FunctionResponse.newBuilder()
.setResultCode(ResultCode.OK)
.addAllFunctions(functionProtos)
.build();
}
}
}