blob: 19662445350e7f3d1ea472246428cb206e73b95c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.master;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.JsonHelper;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.parser.sql.SQLAnalyzer;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.*;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.DDLExecutor;
import org.apache.tajo.master.exec.QueryExecutor;
import org.apache.tajo.metrics.Master;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.logical.InsertNode;
import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.*;
import org.apache.tajo.session.Session;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
public class GlobalEngine extends AbstractService {
/** Class Logger */
private final static Log LOG = LogFactory.getLog(GlobalEngine.class);
private final MasterContext context;
private SQLAnalyzer analyzer;
private CatalogService catalog;
private PreLogicalPlanVerifier preVerifier;
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
private LogicalPlanVerifier annotatedPlanVerifier;
private PostLogicalPlanVerifier postLogicalPlanVerifier;
private QueryExecutor queryExecutor;
private DDLExecutor ddlExecutor;
public GlobalEngine(final MasterContext context) {
super(GlobalEngine.class.getName());
this.context = context;
this.catalog = context.getCatalog();
this.ddlExecutor = new DDLExecutor(context);
this.queryExecutor = new QueryExecutor(context, ddlExecutor);
}
public void start() {
try {
analyzer = new SQLAnalyzer();
preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance());
// Access path rewriter is enabled only in QueryMasterTask
optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog());
annotatedPlanVerifier = new LogicalPlanVerifier();
postLogicalPlanVerifier = new PostLogicalPlanVerifier();
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
throw new RuntimeException(t);
}
super.start();
}
public void stop() {
super.stop();
}
@VisibleForTesting
public SQLAnalyzer getAnalyzer() {
return analyzer;
}
@VisibleForTesting
public PreLogicalPlanVerifier getPreLogicalPlanVerifier() {
return preVerifier;
}
@VisibleForTesting
public LogicalPlanner getLogicalPlanner() {
return planner;
}
@VisibleForTesting
public LogicalOptimizer getLogicalOptimizer() {
return optimizer;
}
public LogicalPlanVerifier getLogicalPlanVerifier() {
return annotatedPlanVerifier;
}
public DDLExecutor getDDLExecutor() {
return ddlExecutor;
}
public QueryExecutor getQueryExecutor() {
return queryExecutor;
}
private QueryContext createQueryContext(Session session) {
QueryContext newQueryContext = new QueryContext(context.getConf(), session);
// Set default space uri and its root uri
newQueryContext.setDefaultSpaceUri(TablespaceManager.getDefault().getUri());
newQueryContext.setDefaultSpaceRootUri(TablespaceManager.getDefault().getRootUri());
if (TajoConstants.IS_TEST_MODE) {
newQueryContext.putAll(CommonTestingUtil.getSessionVarsForTest());
}
// Set queryCache in session
int queryCacheSize = context.getConf().getIntVar(TajoConf.ConfVars.QUERY_SESSION_QUERY_CACHE_SIZE);
if (queryCacheSize > 0 && session.getQueryCache() == null) {
Weigher<String, Expr> weighByLength = new Weigher<String, Expr>() {
public int weigh(String key, Expr expr) {
return key.length();
}
};
LoadingCache<String, Expr> cache = CacheBuilder.newBuilder()
.maximumWeight(queryCacheSize * 1024)
.weigher(weighByLength)
.expireAfterAccess(1, TimeUnit.HOURS)
.build(new CacheLoader<String, Expr>() {
public Expr load(String sql) throws SQLSyntaxError {
return analyzer.parse(sql);
}
});
session.setQueryCache(cache);
}
return newQueryContext;
}
public SubmitQueryResponse executeQuery(Session session, String query, boolean isJson) {
LOG.info("Query: " + query);
QueryContext queryContext = createQueryContext(session);
Expr planningContext;
try {
context.getMetrics().counter(Master.Query.SUBMITTED).inc();
if (isJson) {
planningContext = buildExpressionFromJson(query);
} else {
planningContext = buildExpressionFromSql(query, session);
}
String jsonExpr = planningContext.toJson();
LogicalPlan plan = createLogicalPlan(queryContext, planningContext);
SubmitQueryResponse response = queryExecutor.execute(queryContext, session, query, jsonExpr, plan);
return response;
} catch (Throwable t) {
ExceptionUtil.printStackTraceIfError(LOG, t);
context.getMetrics().counter(Master.Query.ERROR).inc();
SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME));
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
responseBuilder.setState(ReturnStateUtil.returnError(t));
return responseBuilder.build();
}
}
public Expr buildExpressionFromJson(String json) {
return JsonHelper.fromJson(json, Expr.class);
}
public Expr buildExpressionFromSql(String sql, Session session) throws TajoException {
try {
if (session.getQueryCache() == null) {
return analyzer.parse(sql);
} else {
try {
return (Expr) session.getQueryCache().get(sql.trim()).clone();
} catch (ExecutionException e) {
throw e.getCause();
}
}
} catch (Throwable t) {
if (t instanceof TajoException) {
throw (TajoException)t;
} else if (t instanceof TajoRuntimeException) {
throw (TajoException)t.getCause();
} else {
throw new TajoInternalError(t);
}
}
}
public QueryId updateQuery(QueryContext queryContext, String sql, boolean isJson) throws Throwable {
try {
LOG.info("SQL: " + sql);
Expr expr;
if (isJson) {
expr = JsonHelper.fromJson(sql, Expr.class);
} else {
// parse the query
expr = analyzer.parse(sql);
}
LogicalPlan plan = createLogicalPlan(queryContext, expr);
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
if (!PlannerUtil.checkIfDDLPlan(rootNode)) {
throw new SQLException("This is not update query:\n" + sql);
} else {
ddlExecutor.execute(queryContext, plan);
return QueryIdFactory.NULL_QUERY_ID;
}
} catch (Throwable e) {
ExceptionUtil.printStackTraceIfError(LOG, e);
throw e;
}
}
private LogicalPlan createLogicalPlan(QueryContext queryContext, Expr expression) throws Throwable {
VerificationState state = new VerificationState();
preVerifier.verify(queryContext, state, expression);
if (!state.verified()) {
for (Throwable error : state.getErrors()) {
throw error;
}
}
LogicalPlan plan = planner.createPlan(queryContext, expression);
if (LOG.isDebugEnabled()) {
LOG.debug("=============================================");
LOG.debug("Non Optimized Query: \n" + plan.toString());
LOG.debug("=============================================");
}
LOG.info("Non Optimized Query: \n" + plan.toString());
optimizer.optimize(queryContext, plan);
LOG.info("=============================================");
LOG.info("Optimized Query: \n" + plan.toString());
LOG.info("=============================================");
annotatedPlanVerifier.verify(state, plan);
verifyInsertTableSchema(state, plan);
if (!state.verified()) {
for (Throwable error : state.getErrors()) {
throw error;
}
}
postLogicalPlanVerifier.verify(queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD), state, plan);
if (!state.verified()) {
for (Throwable error : state.getErrors()) {
throw error;
}
}
return plan;
}
private void verifyInsertTableSchema(VerificationState state, LogicalPlan plan) {
String dataFormat = PlannerUtil.getDataFormat(plan);
if (dataFormat != null) {
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
if (rootNode.getChild().getType() == NodeType.INSERT) {
try {
TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
InsertNode iNode = rootNode.getChild();
Schema outSchema = iNode.getChild().getOutSchema();
TablespaceManager.get(tableDesc.getUri()).verifySchemaToWrite(tableDesc, outSchema);
} catch (TajoException t) {
state.addVerification(t);
} catch (TajoRuntimeException t) {
state.addVerification(t);
} catch (Throwable t) {
state.addVerification(SyntaxErrorUtil.makeSyntaxError(t.getMessage()));
}
}
}
}
}