blob: 7fc67806011511280e11d452a6d5c7b499b40c03 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.annotation.ThreadSafe;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.ipc.ClientProtos.*;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
import org.apache.tajo.jdbc.SQLStates;
import org.apache.tajo.jdbc.TajoResultSet;
import org.apache.tajo.rpc.*;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.rpc.ServerCallable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ThreadSafe
public class TajoClient {
private final Log LOG = LogFactory.getLog(TajoClient.class);
private final TajoConf conf;
private Map<QueryId, InetSocketAddress> queryMasterMap = new ConcurrentHashMap<QueryId, InetSocketAddress>();
private InetSocketAddress tajoMasterAddr;
private RpcConnectionPool connPool;
public TajoClient(TajoConf conf) throws IOException {
this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)));
}
public TajoClient(TajoConf conf, InetSocketAddress addr) throws IOException {
this.conf = conf;
this.conf.set("tajo.disk.scheduler.report.interval", "0");
this.tajoMasterAddr = addr;
int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
//Don't share connection pool per client
connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum);
}
public TajoClient(InetSocketAddress addr) throws IOException {
this(new TajoConf(), addr);
}
public TajoClient(String hostname, int port) throws IOException {
this(new TajoConf(), NetUtils.createSocketAddr(hostname, port));
}
public void close() {
if(connPool != null) {
connPool.shutdown();
}
queryMasterMap.clear();
}
public TajoConf getConf() {
return conf;
}
/**
* Call to QueryMaster closing query resources
* @param queryId
*/
public void closeQuery(final QueryId queryId) {
if(queryMasterMap.containsKey(queryId)) {
NettyClientBase qmClient = null;
try {
qmClient = connPool.getConnection(queryMasterMap.get(queryId), QueryMasterClientProtocol.class, false);
QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
queryMasterService.closeQuery(null, queryId.getProto());
} catch (Exception e) {
LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
} finally {
connPool.closeConnection(qmClient);
queryMasterMap.remove(queryId);
}
}
}
public ExplainQueryResponse explainQuery(final String sql) throws ServiceException {
return new ServerCallable<ExplainQueryResponse>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public ExplainQueryResponse call(NettyClientBase client) throws ServiceException {
final ExplainQueryRequest.Builder builder = ExplainQueryRequest.newBuilder();
builder.setQuery(sql);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
return tajoMasterService.explainQuery(null, builder.build());
}
}.withRetries();
}
/**
* It submits a query statement and get a response immediately.
* The response only contains a query id, and submission status.
* In order to get the result, you should use {@link #getQueryResult(org.apache.tajo.QueryId)}
* or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}.
*/
public GetQueryStatusResponse executeQuery(final String sql) throws ServiceException {
return new ServerCallable<GetQueryStatusResponse>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public GetQueryStatusResponse call(NettyClientBase client) throws ServiceException {
final QueryRequest.Builder builder = QueryRequest.newBuilder();
builder.setQuery(sql);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
return tajoMasterService.submitQuery(null, builder.build());
}
}.withRetries();
}
/**
* It submits a query statement and get a response.
* The main difference from {@link #executeQuery(String)}
* is a blocking method. So, this method is wait for
* the finish of the submitted query.
*
* @return If failed, return null.
*/
public ResultSet executeQueryAndGetResult(final String sql)
throws ServiceException, IOException {
GetQueryStatusResponse response = new ServerCallable<GetQueryStatusResponse>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public GetQueryStatusResponse call(NettyClientBase client) throws ServiceException {
final QueryRequest.Builder builder = QueryRequest.newBuilder();
builder.setQuery(sql);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
return tajoMasterService.submitQuery(null, builder.build());
}
}.withRetries();
QueryId queryId = new QueryId(response.getQueryId());
if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
return this.createNullResultSet(queryId);
} else {
return this.getQueryResultAndWait(queryId);
}
}
public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException {
GetQueryStatusRequest.Builder builder
= GetQueryStatusRequest.newBuilder();
builder.setQueryId(queryId.getProto());
GetQueryStatusResponse res = null;
if(queryMasterMap.containsKey(queryId)) {
NettyClientBase qmClient = null;
try {
qmClient = connPool.getConnection(queryMasterMap.get(queryId),
QueryMasterClientProtocol.class, false);
QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
res = queryMasterService.getQueryStatus(null, builder.build());
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
} finally {
connPool.releaseConnection(qmClient);
}
} else {
NettyClientBase tmClient = null;
try {
tmClient = connPool.getConnection(tajoMasterAddr,
TajoMasterClientProtocol.class, false);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
res = tajoMasterService.getQueryStatus(null, builder.build());
String queryMasterHost = res.getQueryMasterHost();
if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
NettyClientBase qmClient = null;
try {
InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort());
qmClient = connPool.getConnection(
qmAddr, QueryMasterClientProtocol.class, false);
QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
res = queryMasterService.getQueryStatus(null, builder.build());
queryMasterMap.put(queryId, qmAddr);
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
} finally {
connPool.releaseConnection(qmClient);
}
}
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
} finally {
connPool.releaseConnection(tmClient);
}
}
return new QueryStatus(res);
}
public static boolean isQueryRunnning(QueryState state) {
return state == QueryState.QUERY_NEW ||
state == QueryState.QUERY_RUNNING ||
state == QueryState.QUERY_MASTER_LAUNCHED ||
state == QueryState.QUERY_MASTER_INIT ||
state == QueryState.QUERY_NOT_ASSIGNED;
}
public ResultSet getQueryResult(QueryId queryId)
throws ServiceException, IOException {
if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
return createNullResultSet(queryId);
}
GetQueryResultResponse response = getResultResponse(queryId);
TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc());
conf.setVar(ConfVars.USERNAME, response.getTajoUserName());
return new TajoResultSet(this, queryId, conf, tableDesc);
}
public ResultSet getQueryResultAndWait(QueryId queryId)
throws ServiceException, IOException {
if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
return createNullResultSet(queryId);
}
QueryStatus status = getQueryStatus(queryId);
while(status != null && isQueryRunnning(status.getState())) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
status = getQueryStatus(queryId);
}
if (status.getState() == QueryState.QUERY_SUCCEEDED) {
if (status.hasResult()) {
return getQueryResult(queryId);
} else {
return createNullResultSet(queryId);
}
} else {
LOG.warn("Query (" + status.getQueryId() + ") failed: " + status.getState());
//TODO throw SQLException(?)
return createNullResultSet(queryId);
}
}
public ResultSet createNullResultSet(QueryId queryId) throws IOException {
return new TajoResultSet(this, queryId);
}
public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException {
if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
return null;
}
NettyClientBase client = null;
try {
InetSocketAddress queryMasterAddr = queryMasterMap.get(queryId);
if(queryMasterAddr == null) {
LOG.warn("No Connection to QueryMaster for " + queryId);
return null;
}
client = connPool.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false);
QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder();
builder.setQueryId(queryId.getProto());
GetQueryResultResponse response = queryMasterService.getQueryResult(null,
builder.build());
return response;
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
} finally {
connPool.releaseConnection(client);
}
}
public boolean updateQuery(final String sql) throws ServiceException {
return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
QueryRequest.Builder builder = QueryRequest.newBuilder();
builder.setQuery(sql);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
if (response.getResultCode() == ResultCode.OK) {
return true;
} else {
if (response.hasErrorMessage()) {
LOG.error(response.getErrorMessage());
}
return false;
}
}
}.withRetries();
}
/**
* Test for the existence of table in catalog data.
* <p/>
* This will return true if table exists, false if not.
* @param name
* @return
* @throws ServiceException
*/
public boolean existTable(final String name) throws ServiceException {
return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
StringProto.Builder builder = StringProto.newBuilder();
builder.setValue(name);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
return tajoMasterService.existTable(null, builder.build()).getValue();
}
}.withRetries();
}
public TableDesc createExternalTable(final String name, final Schema schema, final Path path, final TableMeta meta)
throws SQLException, ServiceException {
return new ServerCallable<TableDesc>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
builder.setName(name);
builder.setSchema(schema.getProto());
builder.setMeta(meta.getProto());
builder.setPath(path.toUri().toString());
TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
if (res.getResultCode() == ResultCode.OK) {
return CatalogUtil.newTableDesc(res.getTableDesc());
} else {
throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
}
}
}.withRetries();
}
public boolean dropTable(final String tableName) throws ServiceException {
return dropTable(tableName, false);
}
/**
* Deletes table schema from catalog data and deletes data file in hdfs
* @param tableName
* @return
* @throws ServiceException
*/
public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public Boolean call(NettyClientBase client) throws ServiceException {
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
DropTableRequest.Builder builder = DropTableRequest.newBuilder();
builder.setName(tableName);
builder.setPurge(purge);
return tajoMasterService.dropTable(null, builder.build()).getValue();
}
}.withRetries();
}
public List<BriefQueryInfo> getRunningQueryList() throws ServiceException {
return new ServerCallable<List<BriefQueryInfo>>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
GetQueryListRequest.Builder builder = GetQueryListRequest.newBuilder();
GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
return res.getQueryListList();
}
}.withRetries();
}
public List<BriefQueryInfo> getFinishedQueryList() throws ServiceException {
return new ServerCallable<List<BriefQueryInfo>>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
GetQueryListRequest.Builder builder = GetQueryListRequest.newBuilder();
GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
return res.getQueryListList();
}
}.withRetries();
}
public List<WorkerResourceInfo> getClusterInfo() throws ServiceException {
return new ServerCallable<List<WorkerResourceInfo>>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public List<WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
GetClusterInfoRequest.Builder builder = GetClusterInfoRequest.newBuilder();
GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
return res.getWorkerListList();
}
}.withRetries();
}
/**
* Get a list of table names. All table and column names are
* represented as lower-case letters.
*/
public List<String> getTableList() throws ServiceException {
return new ServerCallable<List<String>>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public List<String> call(NettyClientBase client) throws ServiceException {
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
GetTableListRequest.Builder builder = GetTableListRequest.newBuilder();
GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
return res.getTablesList();
}
}.withRetries();
}
public TableDesc getTableDesc(final String tableName) throws SQLException, ServiceException {
return new ServerCallable<TableDesc>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
GetTableDescRequest.Builder build = GetTableDescRequest.newBuilder();
build.setTableName(tableName);
TableResponse res = tajoMasterService.getTableDesc(null, build.build());
if (res.getResultCode() == ResultCode.OK) {
return CatalogUtil.newTableDesc(res.getTableDesc());
} else {
throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
}
}
}.withRetries();
}
public boolean killQuery(final QueryId queryId)
throws ServiceException, IOException {
QueryStatus status = getQueryStatus(queryId);
NettyClientBase tmClient = null;
try {
/* send a kill to the TM */
tmClient = connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
tajoMasterService.killQuery(null, queryId.getProto());
long currentTimeMillis = System.currentTimeMillis();
long timeKillIssued = currentTimeMillis;
while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() != QueryState.QUERY_KILLED)) {
try {
Thread.sleep(100L);
} catch(InterruptedException ie) {
break;
}
currentTimeMillis = System.currentTimeMillis();
status = getQueryStatus(queryId);
}
return status.getState() == QueryState.QUERY_KILLED;
} catch(Exception e) {
LOG.debug("Error when checking for application status", e);
return false;
} finally {
connPool.releaseConnection(tmClient);
}
}
public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connPool, tajoMasterAddr,
TajoMasterClientProtocol.class, false, true) {
public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
String paramFunctionName = functionName == null ? "" : functionName;
FunctionResponse res = tajoMasterService.getFunctionList(null,
StringProto.newBuilder().setValue(paramFunctionName).build());
if (res.getResultCode() == ResultCode.OK) {
return res.getFunctionsList();
} else {
throw new SQLException(res.getErrorMessage());
}
}
}.withRetries();
}
}