| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.master; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| import com.google.common.cache.Weigher; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.tajo.QueryId; |
| import org.apache.tajo.QueryIdFactory; |
| import org.apache.tajo.SessionVars; |
| import org.apache.tajo.TajoConstants; |
| import org.apache.tajo.algebra.Expr; |
| import org.apache.tajo.algebra.JsonHelper; |
| import org.apache.tajo.catalog.CatalogService; |
| import org.apache.tajo.catalog.Schema; |
| import org.apache.tajo.catalog.TableDesc; |
| import org.apache.tajo.conf.TajoConf; |
| import org.apache.tajo.parser.sql.SQLAnalyzer; |
| import org.apache.tajo.engine.query.QueryContext; |
| import org.apache.tajo.exception.*; |
| import org.apache.tajo.master.TajoMaster.MasterContext; |
| import org.apache.tajo.master.exec.DDLExecutor; |
| import org.apache.tajo.master.exec.QueryExecutor; |
| import org.apache.tajo.metrics.Master; |
| import org.apache.tajo.plan.LogicalOptimizer; |
| import org.apache.tajo.plan.LogicalPlan; |
| import org.apache.tajo.plan.LogicalPlanner; |
| import org.apache.tajo.plan.logical.InsertNode; |
| import org.apache.tajo.plan.logical.LogicalRootNode; |
| import org.apache.tajo.plan.logical.NodeType; |
| import org.apache.tajo.plan.util.PlannerUtil; |
| import org.apache.tajo.plan.verifier.*; |
| import org.apache.tajo.session.Session; |
| import org.apache.tajo.storage.TablespaceManager; |
| import org.apache.tajo.util.CommonTestingUtil; |
| |
| import java.sql.SQLException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; |
| |
| public class GlobalEngine extends AbstractService { |
| /** Class Logger */ |
| private final static Log LOG = LogFactory.getLog(GlobalEngine.class); |
| |
| private final MasterContext context; |
| |
| private SQLAnalyzer analyzer; |
| private CatalogService catalog; |
| private PreLogicalPlanVerifier preVerifier; |
| private LogicalPlanner planner; |
| private LogicalOptimizer optimizer; |
| private LogicalPlanVerifier annotatedPlanVerifier; |
| private PostLogicalPlanVerifier postLogicalPlanVerifier; |
| |
| private QueryExecutor queryExecutor; |
| private DDLExecutor ddlExecutor; |
| |
| public GlobalEngine(final MasterContext context) { |
| super(GlobalEngine.class.getName()); |
| this.context = context; |
| this.catalog = context.getCatalog(); |
| |
| this.ddlExecutor = new DDLExecutor(context); |
| this.queryExecutor = new QueryExecutor(context, ddlExecutor); |
| } |
| |
| public void start() { |
| try { |
| analyzer = new SQLAnalyzer(); |
| preVerifier = new PreLogicalPlanVerifier(context.getCatalog()); |
| planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance()); |
| // Access path rewriter is enabled only in QueryMasterTask |
| optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog()); |
| annotatedPlanVerifier = new LogicalPlanVerifier(); |
| postLogicalPlanVerifier = new PostLogicalPlanVerifier(); |
| } catch (Throwable t) { |
| LOG.error(t.getMessage(), t); |
| throw new RuntimeException(t); |
| } |
| super.start(); |
| } |
| |
| public void stop() { |
| super.stop(); |
| } |
| |
| @VisibleForTesting |
| public SQLAnalyzer getAnalyzer() { |
| return analyzer; |
| } |
| |
| @VisibleForTesting |
| public PreLogicalPlanVerifier getPreLogicalPlanVerifier() { |
| return preVerifier; |
| } |
| |
| @VisibleForTesting |
| public LogicalPlanner getLogicalPlanner() { |
| return planner; |
| } |
| |
| @VisibleForTesting |
| public LogicalOptimizer getLogicalOptimizer() { |
| return optimizer; |
| } |
| |
| public LogicalPlanVerifier getLogicalPlanVerifier() { |
| return annotatedPlanVerifier; |
| } |
| |
| public DDLExecutor getDDLExecutor() { |
| return ddlExecutor; |
| } |
| |
| public QueryExecutor getQueryExecutor() { |
| return queryExecutor; |
| } |
| |
| private QueryContext createQueryContext(Session session) { |
| QueryContext newQueryContext = new QueryContext(context.getConf(), session); |
| |
| // Set default space uri and its root uri |
| newQueryContext.setDefaultSpaceUri(TablespaceManager.getDefault().getUri()); |
| newQueryContext.setDefaultSpaceRootUri(TablespaceManager.getDefault().getRootUri()); |
| |
| if (TajoConstants.IS_TEST_MODE) { |
| newQueryContext.putAll(CommonTestingUtil.getSessionVarsForTest()); |
| } |
| |
| // Set queryCache in session |
| int queryCacheSize = context.getConf().getIntVar(TajoConf.ConfVars.QUERY_SESSION_QUERY_CACHE_SIZE); |
| if (queryCacheSize > 0 && session.getQueryCache() == null) { |
| Weigher<String, Expr> weighByLength = new Weigher<String, Expr>() { |
| public int weigh(String key, Expr expr) { |
| return key.length(); |
| } |
| }; |
| LoadingCache<String, Expr> cache = CacheBuilder.newBuilder() |
| .maximumWeight(queryCacheSize * 1024) |
| .weigher(weighByLength) |
| .expireAfterAccess(1, TimeUnit.HOURS) |
| .build(new CacheLoader<String, Expr>() { |
| public Expr load(String sql) throws SQLSyntaxError { |
| return analyzer.parse(sql); |
| } |
| }); |
| session.setQueryCache(cache); |
| } |
| return newQueryContext; |
| } |
| |
| public SubmitQueryResponse executeQuery(Session session, String query, boolean isJson) { |
| LOG.info("Query: " + query); |
| QueryContext queryContext = createQueryContext(session); |
| Expr planningContext; |
| |
| try { |
| context.getMetrics().counter(Master.Query.SUBMITTED).inc(); |
| |
| if (isJson) { |
| planningContext = buildExpressionFromJson(query); |
| } else { |
| planningContext = buildExpressionFromSql(query, session); |
| } |
| |
| String jsonExpr = planningContext.toJson(); |
| LogicalPlan plan = createLogicalPlan(queryContext, planningContext); |
| SubmitQueryResponse response = queryExecutor.execute(queryContext, session, query, jsonExpr, plan); |
| return response; |
| |
| |
| } catch (Throwable t) { |
| ExceptionUtil.printStackTraceIfError(LOG, t); |
| |
| context.getMetrics().counter(Master.Query.ERROR).inc(); |
| |
| SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder(); |
| responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME)); |
| responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); |
| responseBuilder.setState(ReturnStateUtil.returnError(t)); |
| return responseBuilder.build(); |
| } |
| } |
| |
| public Expr buildExpressionFromJson(String json) { |
| return JsonHelper.fromJson(json, Expr.class); |
| } |
| |
| public Expr buildExpressionFromSql(String sql, Session session) throws TajoException { |
| try { |
| |
| if (session.getQueryCache() == null) { |
| return analyzer.parse(sql); |
| |
| } else { |
| try { |
| return (Expr) session.getQueryCache().get(sql.trim()).clone(); |
| } catch (ExecutionException e) { |
| throw e.getCause(); |
| } |
| } |
| |
| } catch (Throwable t) { |
| if (t instanceof TajoException) { |
| throw (TajoException)t; |
| } else if (t instanceof TajoRuntimeException) { |
| throw (TajoException)t.getCause(); |
| } else { |
| throw new TajoInternalError(t); |
| } |
| } |
| } |
| |
| public QueryId updateQuery(QueryContext queryContext, String sql, boolean isJson) throws Throwable { |
| try { |
| LOG.info("SQL: " + sql); |
| |
| Expr expr; |
| if (isJson) { |
| expr = JsonHelper.fromJson(sql, Expr.class); |
| } else { |
| // parse the query |
| expr = analyzer.parse(sql); |
| } |
| |
| LogicalPlan plan = createLogicalPlan(queryContext, expr); |
| LogicalRootNode rootNode = plan.getRootBlock().getRoot(); |
| |
| if (!PlannerUtil.checkIfDDLPlan(rootNode)) { |
| throw new SQLException("This is not update query:\n" + sql); |
| } else { |
| ddlExecutor.execute(queryContext, plan); |
| return QueryIdFactory.NULL_QUERY_ID; |
| } |
| } catch (Throwable e) { |
| ExceptionUtil.printStackTraceIfError(LOG, e); |
| throw e; |
| } |
| } |
| |
| private LogicalPlan createLogicalPlan(QueryContext queryContext, Expr expression) throws Throwable { |
| |
| VerificationState state = new VerificationState(); |
| preVerifier.verify(queryContext, state, expression); |
| if (!state.verified()) { |
| for (Throwable error : state.getErrors()) { |
| throw error; |
| } |
| } |
| |
| LogicalPlan plan = planner.createPlan(queryContext, expression); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("============================================="); |
| LOG.debug("Non Optimized Query: \n" + plan.toString()); |
| LOG.debug("============================================="); |
| } |
| LOG.info("Non Optimized Query: \n" + plan.toString()); |
| optimizer.optimize(queryContext, plan); |
| LOG.info("============================================="); |
| LOG.info("Optimized Query: \n" + plan.toString()); |
| LOG.info("============================================="); |
| |
| annotatedPlanVerifier.verify(state, plan); |
| verifyInsertTableSchema(state, plan); |
| |
| if (!state.verified()) { |
| for (Throwable error : state.getErrors()) { |
| throw error; |
| } |
| } |
| |
| postLogicalPlanVerifier.verify(queryContext.getLong(SessionVars.BROADCAST_CROSS_JOIN_THRESHOLD), state, plan); |
| if (!state.verified()) { |
| for (Throwable error : state.getErrors()) { |
| throw error; |
| } |
| } |
| |
| return plan; |
| } |
| |
| private void verifyInsertTableSchema(VerificationState state, LogicalPlan plan) { |
| String dataFormat = PlannerUtil.getDataFormat(plan); |
| if (dataFormat != null) { |
| LogicalRootNode rootNode = plan.getRootBlock().getRoot(); |
| if (rootNode.getChild().getType() == NodeType.INSERT) { |
| try { |
| TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); |
| InsertNode iNode = rootNode.getChild(); |
| Schema outSchema = iNode.getChild().getOutSchema(); |
| |
| TablespaceManager.get(tableDesc.getUri()).verifySchemaToWrite(tableDesc, outSchema); |
| |
| } catch (TajoException t) { |
| state.addVerification(t); |
| } catch (TajoRuntimeException t) { |
| state.addVerification(t); |
| } catch (Throwable t) { |
| state.addVerification(SyntaxErrorUtil.makeSyntaxError(t.getMessage())); |
| } |
| } |
| } |
| } |
| } |