| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.kylin.rest.service; |
| |
| import static org.apache.kylin.common.util.CheckUtil.checkCondition; |
| |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.net.UnknownHostException; |
| import java.sql.Connection; |
| import java.sql.DatabaseMetaData; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.ResultSetMetaData; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.sql.Time; |
| import java.sql.Timestamp; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| import javax.annotation.PostConstruct; |
| |
| import org.apache.calcite.avatica.ColumnMetaData.Rep; |
| import org.apache.calcite.config.CalciteConnectionConfig; |
| import org.apache.calcite.jdbc.CalcitePrepare; |
| import org.apache.calcite.prepare.CalcitePrepareImpl; |
| import org.apache.calcite.prepare.OnlyPrepareEarlyAbortException; |
| import org.apache.calcite.rel.type.RelDataTypeField; |
| import org.apache.calcite.sql.type.BasicSqlType; |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.lang3.exception.ExceptionUtils; |
| import org.apache.commons.pool2.BaseKeyedPooledObjectFactory; |
| import org.apache.commons.pool2.PooledObject; |
| import org.apache.commons.pool2.impl.DefaultPooledObject; |
| import org.apache.commons.pool2.impl.GenericKeyedObjectPool; |
| import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; |
| import org.apache.kylin.cache.cachemanager.MemcachedCacheManager; |
| import org.apache.kylin.common.KylinConfig; |
| import org.apache.kylin.common.MissingMeasureSegment; |
| import org.apache.kylin.common.QueryContext; |
| import org.apache.kylin.common.QueryContextFacade; |
| import org.apache.kylin.common.debug.BackdoorToggles; |
| import org.apache.kylin.common.exceptions.ResourceLimitExceededException; |
| import org.apache.kylin.common.persistence.ResourceStore; |
| import org.apache.kylin.common.persistence.RootPersistentEntity; |
| import org.apache.kylin.common.persistence.Serializer; |
| import org.apache.kylin.common.util.DBUtils; |
| import org.apache.kylin.common.util.JsonUtil; |
| import org.apache.kylin.common.util.Pair; |
| import org.apache.kylin.common.util.SetThreadName; |
| import org.apache.kylin.common.util.StringUtil; |
| import org.apache.kylin.cube.CubeInstance; |
| import org.apache.kylin.cube.CubeManager; |
| import org.apache.kylin.cube.cuboid.Cuboid; |
| import org.apache.kylin.metadata.badquery.BadQueryEntry; |
| import org.apache.kylin.metadata.model.DataModelDesc; |
| import org.apache.kylin.metadata.model.JoinDesc; |
| import org.apache.kylin.metadata.model.JoinTableDesc; |
| import org.apache.kylin.metadata.model.ModelDimensionDesc; |
| import org.apache.kylin.metadata.model.TableRef; |
| import org.apache.kylin.metadata.project.ProjectInstance; |
| import org.apache.kylin.metadata.project.ProjectManager; |
| import org.apache.kylin.metadata.querymeta.ColumnMeta; |
| import org.apache.kylin.metadata.querymeta.ColumnMetaWithType; |
| import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; |
| import org.apache.kylin.metadata.querymeta.TableMeta; |
| import org.apache.kylin.metadata.querymeta.TableMetaWithType; |
| import org.apache.kylin.metadata.realization.IRealization; |
| import org.apache.kylin.metrics.MetricsManager; |
| import org.apache.kylin.query.QueryConnection; |
| import org.apache.kylin.query.relnode.OLAPContext; |
| import org.apache.kylin.query.util.PushDownUtil; |
| import org.apache.kylin.query.util.QueryUtil; |
| import org.apache.kylin.query.util.TempStatementUtil; |
| import org.apache.kylin.rest.constant.Constant; |
| import org.apache.kylin.rest.exception.BadRequestException; |
| import org.apache.kylin.rest.exception.InternalErrorException; |
| import org.apache.kylin.rest.metrics.QueryMetrics2Facade; |
| import org.apache.kylin.rest.metrics.QueryMetricsFacade; |
| import org.apache.kylin.rest.model.Query; |
| import org.apache.kylin.rest.msg.Message; |
| import org.apache.kylin.rest.msg.MsgPicker; |
| import org.apache.kylin.rest.request.PrepareSqlRequest; |
| import org.apache.kylin.rest.request.SQLRequest; |
| import org.apache.kylin.rest.response.SQLResponse; |
| import org.apache.kylin.rest.util.AclEvaluate; |
| import org.apache.kylin.rest.util.AclPermissionUtil; |
| import org.apache.kylin.rest.util.QueryRequestLimits; |
| import org.apache.kylin.rest.util.SQLResponseSignatureUtil; |
| import org.apache.kylin.rest.util.TableauInterceptor; |
| import org.apache.kylin.storage.hybrid.HybridInstance; |
| import org.apache.kylin.storage.hybrid.HybridManager; |
| import org.apache.kylin.storage.stream.StreamStorageQuery; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.beans.factory.annotation.Qualifier; |
| import org.springframework.cache.Cache; |
| import org.springframework.cache.CacheManager; |
| import org.springframework.security.core.GrantedAuthority; |
| import org.springframework.security.core.context.SecurityContextHolder; |
| import org.springframework.stereotype.Component; |
| |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Strings; |
| import com.google.common.collect.Lists; |
| |
| /** |
| * @author xduo |
| */ |
| @Component("queryService") |
| public class QueryService extends BasicService { |
| |
| public static final String QUERY_CACHE = "StorageCache"; |
| public static final String QUERY_STORE_PATH_PREFIX = "/query/"; |
| private static final Logger logger = LoggerFactory.getLogger(QueryService.class); |
| final BadQueryDetector badQueryDetector = new BadQueryDetector(); |
| final ResourceStore queryStore; |
| |
| @Autowired |
| protected CacheManager cacheManager; |
| |
| @Autowired |
| @Qualifier("cacheService") |
| private CacheService cacheService; |
| |
| @Autowired |
| @Qualifier("modelMgmtService") |
| private ModelService modelService; |
| |
| @Autowired |
| private AclEvaluate aclEvaluate; |
| |
| private GenericKeyedObjectPool<PreparedContextKey, PreparedContext> preparedContextPool; |
| |
| public QueryService() { |
| queryStore = ResourceStore.getStore(getConfig()); |
| preparedContextPool = createPreparedContextPool(); |
| badQueryDetector.start(); |
| } |
| |
| private GenericKeyedObjectPool<PreparedContextKey, PreparedContext> createPreparedContextPool() { |
| PreparedContextFactory factory = new PreparedContextFactory(); |
| KylinConfig kylinConfig = getConfig(); |
| GenericKeyedObjectPoolConfig config = new GenericKeyedObjectPoolConfig(); |
| config.setMaxTotalPerKey(kylinConfig.getQueryMaxCacheStatementInstancePerKey()); |
| config.setMaxTotal(kylinConfig.getQueryMaxCacheStatementNum()); |
| config.setBlockWhenExhausted(false); |
| config.setMinEvictableIdleTimeMillis(10 * 60 * 1000L); // cached statement will be evict if idle for 10 minutes |
| GenericKeyedObjectPool<PreparedContextKey, PreparedContext> pool = new GenericKeyedObjectPool<>(factory, |
| config); |
| return pool; |
| } |
| |
| protected static void close(ResultSet resultSet, Statement stat, Connection conn) { |
| OLAPContext.clearParameter(); |
| DBUtils.closeQuietly(resultSet); |
| DBUtils.closeQuietly(stat); |
| DBUtils.closeQuietly(conn); |
| } |
| |
| private static String getQueryKeyById(String creator) { |
| return QUERY_STORE_PATH_PREFIX + creator; |
| } |
| |
| @PostConstruct |
| public void init() throws IOException { |
| Preconditions.checkNotNull(cacheManager, "cacheManager is not injected yet"); |
| } |
| |
| public List<TableMeta> getMetadata(String project) throws SQLException { |
| return getMetadata(getCubeManager(), project); |
| } |
| |
| public SQLResponse query(SQLRequest sqlRequest, String queryId) throws Exception { |
| SQLResponse ret = null; |
| try { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| badQueryDetector.queryStart(Thread.currentThread(), sqlRequest, user, queryId); |
| |
| ret = queryWithSqlMassage(sqlRequest); |
| return ret; |
| |
| } finally { |
| String badReason = (ret != null && ret.isPushDown()) ? BadQueryEntry.ADJ_PUSHDOWN : null; |
| badQueryDetector.queryEnd(Thread.currentThread(), badReason); |
| Thread.interrupted(); //reset if interrupted |
| } |
| } |
| |
| public SQLResponse update(SQLRequest sqlRequest) throws Exception { |
| // non select operations, only supported when enable pushdown |
| logger.debug("Query pushdown enabled, redirect the non-select query to pushdown engine."); |
| Connection conn = null; |
| try { |
| conn = QueryConnection.getConnection(sqlRequest.getProject()); |
| Pair<List<List<String>>, List<SelectedColumnMeta>> r = PushDownUtil.tryPushDownNonSelectQuery( |
| sqlRequest.getProject(), sqlRequest.getSql(), conn.getSchema(), BackdoorToggles.getPrepareOnly()); |
| |
| List<SelectedColumnMeta> columnMetas = Lists.newArrayList(); |
| columnMetas.add(new SelectedColumnMeta(false, false, false, false, 1, false, Integer.MAX_VALUE, "c0", "c0", |
| null, null, null, Integer.MAX_VALUE, 128, 1, "char", false, false, false)); |
| |
| return buildSqlResponse(sqlRequest.getProject(), true, r.getFirst(), columnMetas); |
| |
| } catch (Exception e) { |
| logger.info("pushdown engine failed to finish current non-select query"); |
| throw e; |
| } finally { |
| close(null, null, conn); |
| } |
| } |
| |
| public void saveQuery(final String creator, final Query query) throws IOException { |
| List<Query> queries = getQueries(creator); |
| queries.add(query); |
| Query[] queryArray = new Query[queries.size()]; |
| QueryRecord record = new QueryRecord(queries.toArray(queryArray)); |
| queryStore.putResource(getQueryKeyById(creator), record, System.currentTimeMillis(), |
| QueryRecordSerializer.getInstance()); |
| return; |
| } |
| |
| public void removeQuery(final String creator, final String id) throws IOException { |
| List<Query> queries = getQueries(creator); |
| Iterator<Query> queryIter = queries.iterator(); |
| |
| boolean changed = false; |
| while (queryIter.hasNext()) { |
| Query temp = queryIter.next(); |
| if (temp.getId().equals(id)) { |
| queryIter.remove(); |
| changed = true; |
| break; |
| } |
| } |
| |
| if (!changed) { |
| return; |
| } |
| Query[] queryArray = new Query[queries.size()]; |
| QueryRecord record = new QueryRecord(queries.toArray(queryArray)); |
| queryStore.putResource(getQueryKeyById(creator), record, System.currentTimeMillis(), |
| QueryRecordSerializer.getInstance()); |
| return; |
| } |
| |
| public List<Query> getQueries(final String creator) throws IOException { |
| return getQueries(creator, null); |
| } |
| |
| public List<Query> getQueries(final String creator, final String project) throws IOException { |
| if (null == creator) { |
| return null; |
| } |
| List<Query> queries = new ArrayList<>(); |
| QueryRecord record = queryStore.getResource(getQueryKeyById(creator), QueryRecordSerializer.getInstance()); |
| if (record != null) { |
| for (Query query : record.getQueries()) { |
| if (project == null || query.getProject().equals(project)) |
| queries.add(query); |
| } |
| } |
| return queries; |
| } |
| |
| public void logQuery(final String queryId, final SQLRequest request, final SQLResponse response) { |
| final String user = aclEvaluate.getCurrentUserName(); |
| final List<String> realizationNames = new LinkedList<>(); |
| final Set<Long> cuboidIds = new HashSet<Long>(); |
| float duration = response.getDuration() / (float) 1000; |
| boolean storageCacheUsed = response.isStorageCacheUsed(); |
| boolean isPushDown = response.isPushDown(); |
| |
| if (!response.isHitExceptionCache() && null != OLAPContext.getThreadLocalContexts()) { |
| for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) { |
| Cuboid cuboid = ctx.storageContext.getCuboid(); |
| if (cuboid != null) { |
| //Some queries do not involve cuboid, e.g. lookup table query |
| cuboidIds.add(cuboid.getId()); |
| } |
| |
| if (ctx.realization != null) { |
| realizationNames.add(ctx.realization.getCanonicalName()); |
| } |
| |
| } |
| } |
| |
| if (realizationNames.isEmpty()) { |
| if (!Strings.isNullOrEmpty(response.getCube())) { |
| realizationNames.addAll(Lists.newArrayList(StringUtil.splitByComma(response.getCube()))); |
| } |
| } |
| |
| int resultRowCount = 0; |
| if (!response.getIsException() && response.getResults() != null) { |
| resultRowCount = response.getResults().size(); |
| } |
| |
| String newLine = System.getProperty("line.separator"); |
| StringBuilder stringBuilder = new StringBuilder(); |
| stringBuilder.append(newLine); |
| stringBuilder.append("==========================[QUERY]===============================").append(newLine); |
| stringBuilder.append("Query Id: ").append(queryId).append(newLine); |
| stringBuilder.append("SQL: ").append(request.getSql()).append(newLine); |
| stringBuilder.append("User: ").append(user).append(newLine); |
| stringBuilder.append("Success: ").append((null == response.getExceptionMessage())).append(newLine); |
| stringBuilder.append("Duration: ").append(duration).append(newLine); |
| stringBuilder.append("Project: ").append(request.getProject()).append(newLine); |
| stringBuilder.append("Realization Names: ").append(realizationNames).append(newLine); |
| stringBuilder.append("Cuboid Ids: ").append(cuboidIds).append(newLine); |
| stringBuilder.append("Total scan count: ").append(response.getTotalScanCount()).append(newLine); |
| stringBuilder.append("Total scan bytes: ").append(response.getTotalScanBytes()).append(newLine); |
| stringBuilder.append("Result row count: ").append(resultRowCount).append(newLine); |
| stringBuilder.append("Accept Partial: ").append(request.isAcceptPartial()).append(newLine); |
| stringBuilder.append("Is Partial Result: ").append(response.isPartial()).append(newLine); |
| stringBuilder.append("Hit Exception Cache: ").append(response.isHitExceptionCache()).append(newLine); |
| stringBuilder.append("Storage cache used: ").append(storageCacheUsed).append(newLine); |
| stringBuilder.append("Is Query Push-Down: ").append(isPushDown).append(newLine); |
| stringBuilder.append("Is Prepare: ").append(BackdoorToggles.getPrepareOnly()).append(newLine); |
| stringBuilder.append("Trace URL: ").append(response.getTraceUrl()).append(newLine); |
| stringBuilder.append("Message: ").append(response.getExceptionMessage()).append(newLine); |
| stringBuilder.append("==========================[QUERY]===============================").append(newLine); |
| |
| logger.info(stringBuilder.toString()); |
| } |
| |
| public SQLResponse querySystemCube(String sql) { |
| SQLRequest sqlRequest = new SQLRequest(); |
| sqlRequest.setProject(MetricsManager.SYSTEM_PROJECT); |
| sqlRequest.setSql(sql); |
| return doQueryWithCache(sqlRequest, false); |
| } |
| |
| public SQLResponse doQueryWithCache(SQLRequest sqlRequest) { |
| long t = System.currentTimeMillis(); |
| aclEvaluate.checkProjectReadPermission(sqlRequest.getProject()); |
| logger.info("Check query permission in " + (System.currentTimeMillis() - t) + " ms."); |
| return doQueryWithCache(sqlRequest, false); |
| } |
| |
| public SQLResponse doQueryWithCache(SQLRequest sqlRequest, boolean isQueryInspect) { |
| Message msg = MsgPicker.getMsg(); |
| sqlRequest.setUsername(getUserName()); |
| |
| KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); |
| String serverMode = kylinConfig.getServerMode(); |
| if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase(Locale.ROOT)) |
| || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase(Locale.ROOT)))) { |
| throw new BadRequestException(String.format(Locale.ROOT, msg.getQUERY_NOT_ALLOWED(), serverMode)); |
| } |
| if (StringUtils.isBlank(sqlRequest.getProject())) { |
| throw new BadRequestException(msg.getEMPTY_PROJECT_NAME()); |
| } |
| // project not found |
| ProjectManager mgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()); |
| if (mgr.getProject(sqlRequest.getProject()) == null) { |
| throw new BadRequestException( |
| String.format(Locale.ROOT, msg.getPROJECT_NOT_FOUND(), sqlRequest.getProject())); |
| } |
| if (StringUtils.isBlank(sqlRequest.getSql())) { |
| throw new BadRequestException(msg.getNULL_EMPTY_SQL()); |
| } |
| |
| if (sqlRequest.getBackdoorToggles() != null) |
| BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles()); |
| |
| final QueryContext queryContext = QueryContextFacade.current(); |
| |
| try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) { |
| SQLResponse sqlResponse = null; |
| String sql = sqlRequest.getSql(); |
| String project = sqlRequest.getProject(); |
| boolean isQueryCacheEnabled = isQueryCacheEnabled(kylinConfig); |
| logger.info("Using project: " + project); |
| logger.info("The original query: " + sql); |
| |
| sql = QueryUtil.removeCommentInSql(sql); |
| |
| Pair<Boolean, String> result = TempStatementUtil.handleTempStatement(sql, kylinConfig); |
| boolean isCreateTempStatement = result.getFirst(); |
| sql = result.getSecond(); |
| sqlRequest.setSql(sql); |
| |
| // try some cheap executions |
| if (sqlResponse == null && isQueryInspect) { |
| sqlResponse = new SQLResponse(null, null, 0, false, sqlRequest.getSql()); |
| } |
| |
| if (sqlResponse == null && isCreateTempStatement) { |
| sqlResponse = new SQLResponse(null, null, 0, false, null); |
| } |
| |
| if (sqlResponse == null && isQueryCacheEnabled) { |
| sqlResponse = searchQueryInCache(sqlRequest); |
| } |
| |
| // real execution if required |
| if (sqlResponse == null) { |
| try (QueryRequestLimits limit = new QueryRequestLimits(sqlRequest.getProject())) { |
| sqlResponse = queryAndUpdateCache(sqlRequest, isQueryCacheEnabled); |
| } |
| } |
| |
| sqlResponse.setDuration(queryContext.getAccumulatedMillis()); |
| logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse); |
| try { |
| recordMetric(sqlRequest, sqlResponse); |
| } catch (Throwable th) { |
| logger.warn("Write metric error.", th); |
| } |
| if (sqlResponse.getIsException()) |
| throw new InternalErrorException(sqlResponse.getExceptionMessage()); |
| // add hint about the missing measure within the query |
| List<MissingMeasureSegment> missingMeasureSegmentList = queryContext.getMissingMeasureSegments(); |
| List<String> missMeasureMsgs = missingMeasureSegmentList.stream() |
| .map(mms -> mms.getMissingMsg()) |
| .collect(Collectors.toList()); |
| sqlResponse.setMissMeasureMessage(missMeasureMsgs); |
| return sqlResponse; |
| |
| } finally { |
| BackdoorToggles.cleanToggles(); |
| QueryContextFacade.resetCurrent(); |
| } |
| } |
| |
| private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, boolean queryCacheEnabled) { |
| KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); |
| Message msg = MsgPicker.getMsg(); |
| final QueryContext queryContext = QueryContextFacade.current(); |
| |
| boolean isDummpyResponseEnabled = queryCacheEnabled && kylinConfig.isLazyQueryEnabled(); |
| SQLResponse sqlResponse = null; |
| try { |
| // Add dummy response which will be updated or evicted when query finishes |
| if (isDummpyResponseEnabled) { |
| SQLResponse dummyResponse = new SQLResponse(); |
| dummyResponse.setLazyQueryStartTime(System.currentTimeMillis()); |
| cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), dummyResponse); |
| } |
| |
| final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql()); |
| if (isSelect) { |
| sqlResponse = query(sqlRequest, queryContext.getQueryId()); |
| } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) { |
| sqlResponse = update(sqlRequest); |
| } else { |
| logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled"); |
| throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); |
| } |
| |
| long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); |
| long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); |
| long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold(); |
| sqlResponse.setDuration(queryContext.getAccumulatedMillis()); |
| logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // |
| String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), |
| String.valueOf(sqlResponse.getTotalScanCount())); |
| |
| boolean realtimeQuery = false; |
| Collection<OLAPContext> olapContexts = OLAPContext.getThreadLocalContexts(); |
| if (olapContexts != null) { |
| for (OLAPContext ctx : olapContexts) { |
| try { |
| if (ctx.storageContext.getStorageQuery() instanceof StreamStorageQuery) { |
| realtimeQuery = true; |
| logger.debug("Shutdown query cache for realtime."); |
| } |
| } catch (Exception e) { |
| logger.error("Error", e); |
| } |
| |
| } |
| } |
| |
| if (checkCondition(queryCacheEnabled, "query cache is disabled") // |
| && checkCondition(!Strings.isNullOrEmpty(sqlResponse.getCube()), |
| "query does not hit cube nor hybrid") // |
| && checkCondition(!sqlResponse.getIsException(), "query has exception") // |
| && checkCondition( |
| !(sqlResponse.isPushDown() |
| && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)), |
| "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") // |
| && checkCondition( |
| cacheManager.getCache(QUERY_CACHE) instanceof MemcachedCacheManager.MemCachedCacheAdaptor |
| || sqlResponse.getDuration() > durationThreshold |
| || sqlResponse.getTotalScanCount() > scanCountThreshold |
| || sqlResponse.getTotalScanBytes() > scanBytesThreshold, // |
| "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})", |
| sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(), |
| scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold) |
| && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(), |
| "query response is too large: {} ({})", sqlResponse.getResults().size(), |
| kylinConfig.getLargeQueryThreshold())) { |
| |
| if (!realtimeQuery) { |
| cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), sqlResponse); |
| } |
| } else if (isDummpyResponseEnabled) { |
| cacheManager.getCache(QUERY_CACHE).evict(sqlRequest.getCacheKey()); |
| } |
| |
| } catch (Throwable e) { // calcite may throw AssertError |
| queryContext.stop(e); |
| |
| logger.error("Exception while executing query", e); |
| String errMsg = makeErrorMsgUserFriendly(e); |
| |
| sqlResponse = buildSqlResponse(sqlRequest.getProject(), false, null, null, true, errMsg); |
| sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e)); |
| |
| if (queryCacheEnabled && e.getCause() != null |
| && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) { |
| Cache exceptionCache = cacheManager.getCache(QUERY_CACHE); |
| exceptionCache.put(sqlRequest.getCacheKey(), sqlResponse); |
| } |
| } |
| return sqlResponse; |
| } |
| |
| private boolean isQueryCacheEnabled(KylinConfig kylinConfig) { |
| return checkCondition(kylinConfig.isQueryCacheEnabled(), "query cache disabled in KylinConfig") && // |
| checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); |
| } |
| |
| protected void recordMetric(SQLRequest sqlRequest, SQLResponse sqlResponse) throws UnknownHostException { |
| QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse); |
| QueryMetrics2Facade.updateMetrics(sqlRequest, sqlResponse); |
| } |
| |
| private String getUserName() { |
| String username = SecurityContextHolder.getContext().getAuthentication().getName(); |
| if (StringUtils.isEmpty(username)) { |
| username = ""; |
| } |
| return username; |
| } |
| |
| public SQLResponse searchQueryInCache(SQLRequest sqlRequest) { |
| Cache cache = cacheManager.getCache(QUERY_CACHE); |
| Cache.ValueWrapper wrapper = cache.get(sqlRequest.getCacheKey()); |
| if (wrapper == null) { |
| return null; |
| } |
| SQLResponse response = (SQLResponse) wrapper.get(); |
| if (response == null) { |
| return null; |
| } |
| |
| // Check whether duplicate query exists |
| while (response.isRunning()) { |
| // Wait at most one minute |
| if (System.currentTimeMillis() - response.getLazyQueryStartTime() >= getConfig() |
| .getLazyQueryWaitingTimeoutMilliSeconds()) { |
| cache.evict(sqlRequest.getCacheKey()); |
| return null; |
| } |
| logger.info("Duplicated SQL request is running, waiting..."); |
| try { |
| Thread.sleep(100L); |
| } catch (InterruptedException e) { |
| } |
| wrapper = cache.get(sqlRequest.getCacheKey()); |
| if (wrapper == null) { |
| return null; |
| } |
| response = (SQLResponse) wrapper.get(); |
| if (response == null) { |
| return null; |
| } |
| } |
| logger.info("The sqlResponse is found in QUERY_CACHE"); |
| if (getConfig().isQueryCacheSignatureEnabled() |
| && !SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) { |
| logger.info("The sql response signature is changed. Remove it from QUERY_CACHE."); |
| cache.evict(sqlRequest.getCacheKey()); |
| return null; |
| } |
| if (response.getIsException()) { |
| response.setHitExceptionCache(true); |
| } else { |
| response.setStorageCacheUsed(true); |
| } |
| return response; |
| } |
| |
| private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception { |
| Connection conn = null; |
| boolean isPrepareRequest = isPrepareStatementWithParams(sqlRequest); |
| boolean borrowPrepareContext = false; |
| PreparedContextKey preparedContextKey = null; |
| PreparedContext preparedContext = null; |
| |
| try { |
| conn = QueryConnection.getConnection(sqlRequest.getProject()); |
| String userInfo = SecurityContextHolder.getContext().getAuthentication().getName(); |
| QueryContext context = QueryContextFacade.current(); |
| context.setUsername(userInfo); |
| context.setGroups(AclPermissionUtil.getCurrentUserGroups()); |
| final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext() |
| .getAuthentication().getAuthorities(); |
| for (GrantedAuthority grantedAuthority : grantedAuthorities) { |
| userInfo += ","; |
| userInfo += grantedAuthority.getAuthority(); |
| } |
| |
| SQLResponse fakeResponse = TableauInterceptor.tableauIntercept(sqlRequest.getSql()); |
| if (null != fakeResponse) { |
| logger.debug("Return fake response, is exception? " + fakeResponse.getIsException()); |
| return fakeResponse; |
| } |
| String correctedSql = QueryUtil.massageSql(sqlRequest.getSql(), sqlRequest.getProject(), |
| sqlRequest.getLimit(), sqlRequest.getOffset(), conn.getSchema(), Constant.FakeCatalogName); |
| if (!correctedSql.equals(sqlRequest.getSql())) { |
| logger.info("The corrected query: " + correctedSql); |
| |
| //CAUTION: should not change sqlRequest content! |
| //sqlRequest.setSql(correctedSql); |
| } |
| |
| // add extra parameters into olap context, like acceptPartial |
| Map<String, String> parameters = new HashMap<String, String>(); |
| parameters.put(OLAPContext.PRM_USER_AUTHEN_INFO, userInfo); |
| parameters.put(OLAPContext.PRM_ACCEPT_PARTIAL_RESULT, String.valueOf(sqlRequest.isAcceptPartial())); |
| OLAPContext.setParameters(parameters); |
| // force clear the query context before a new query |
| OLAPContext.clearThreadLocalContexts(); |
| |
| // special case for prepare query. |
| List<List<String>> results = Lists.newArrayList(); |
| List<SelectedColumnMeta> columnMetas = Lists.newArrayList(); |
| if (BackdoorToggles.getPrepareOnly()) { |
| return getPrepareOnlySqlResponse(sqlRequest.getProject(), correctedSql, conn, false, results, |
| columnMetas); |
| } |
| if (!isPrepareRequest) { |
| return executeRequest(correctedSql, sqlRequest, conn); |
| } else { |
| long prjLastModifyTime = getProjectManager().getProject(sqlRequest.getProject()).getLastModified(); |
| preparedContextKey = new PreparedContextKey(sqlRequest.getProject(), prjLastModifyTime, correctedSql); |
| PrepareSqlRequest prepareSqlRequest = (PrepareSqlRequest) sqlRequest; |
| if (getConfig().isQueryPreparedStatementCacheEnable() && prepareSqlRequest.isEnableStatementCache()) { |
| try { |
| preparedContext = preparedContextPool.borrowObject(preparedContextKey); |
| borrowPrepareContext = true; |
| } catch (NoSuchElementException noElementException) { |
| borrowPrepareContext = false; |
| preparedContext = createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql()); |
| } |
| for (OLAPContext olapContext : preparedContext.olapContexts) { |
| resetRealizationInContext(olapContext); |
| OLAPContext.registerContext(olapContext); |
| } |
| } else { |
| preparedContext = createPreparedContext(sqlRequest.getProject(), sqlRequest.getSql()); |
| } |
| return executePrepareRequest(correctedSql, prepareSqlRequest, preparedContext); |
| } |
| |
| } finally { |
| DBUtils.closeQuietly(conn); |
| if (preparedContext != null) { |
| if (borrowPrepareContext) { |
| preparedContextPool.returnObject(preparedContextKey, preparedContext); |
| } else { |
| preparedContext.close(); |
| } |
| } |
| } |
| } |
| |
| private void resetRealizationInContext(OLAPContext olapContext) { |
| IRealization realization = olapContext.realization; |
| if (realization == null) { |
| return; |
| } |
| KylinConfig config = getConfig(); |
| HybridInstance hybridInstance = HybridManager.getInstance(config).getHybridInstance(realization.getName()); |
| if (hybridInstance != null) { |
| olapContext.realization = hybridInstance; |
| return; |
| } |
| CubeInstance cubeInstance = CubeManager.getInstance(config).getCube(realization.getName()); |
| if (cubeInstance != null) { |
| olapContext.realization = cubeInstance; |
| } |
| } |
| |
| protected List<TableMeta> getMetadata(CubeManager cubeMgr, String project) throws SQLException { |
| |
| Connection conn = null; |
| ResultSet columnMeta = null; |
| List<TableMeta> tableMetas = null; |
| if (StringUtils.isBlank(project)) { |
| return Collections.emptyList(); |
| } |
| ResultSet JDBCTableMeta = null; |
| try { |
| conn = QueryConnection.getConnection(project); |
| DatabaseMetaData metaData = conn.getMetaData(); |
| |
| JDBCTableMeta = metaData.getTables(null, null, null, null); |
| |
| tableMetas = new LinkedList<TableMeta>(); |
| Map<String, TableMeta> tableMap = new HashMap<String, TableMeta>(); |
| while (JDBCTableMeta.next()) { |
| String catalogName = JDBCTableMeta.getString(1); |
| String schemaName = JDBCTableMeta.getString(2); |
| |
| // Not every JDBC data provider offers full 10 columns, e.g., PostgreSQL has only 5 |
| TableMeta tblMeta = new TableMeta(catalogName == null ? Constant.FakeCatalogName : catalogName, |
| schemaName == null ? Constant.FakeSchemaName : schemaName, JDBCTableMeta.getString(3), |
| JDBCTableMeta.getString(4), JDBCTableMeta.getString(5), null, null, null, null, null); |
| |
| if (!"metadata".equalsIgnoreCase(tblMeta.getTABLE_SCHEM())) { |
| tableMetas.add(tblMeta); |
| tableMap.put(tblMeta.getTABLE_SCHEM() + "#" + tblMeta.getTABLE_NAME(), tblMeta); |
| } |
| } |
| |
| columnMeta = metaData.getColumns(null, null, null, null); |
| |
| while (columnMeta.next()) { |
| String catalogName = columnMeta.getString(1); |
| String schemaName = columnMeta.getString(2); |
| |
| // kylin(optiq) is not strictly following JDBC specification |
| ColumnMeta colmnMeta = new ColumnMeta(catalogName == null ? Constant.FakeCatalogName : catalogName, |
| schemaName == null ? Constant.FakeSchemaName : schemaName, columnMeta.getString(3), |
| columnMeta.getString(4), columnMeta.getInt(5), columnMeta.getString(6), columnMeta.getInt(7), |
| getInt(columnMeta.getString(8)), columnMeta.getInt(9), columnMeta.getInt(10), |
| columnMeta.getInt(11), columnMeta.getString(12), columnMeta.getString(13), |
| getInt(columnMeta.getString(14)), getInt(columnMeta.getString(15)), columnMeta.getInt(16), |
| columnMeta.getInt(17), columnMeta.getString(18), columnMeta.getString(19), |
| columnMeta.getString(20), columnMeta.getString(21), getShort(columnMeta.getString(22)), |
| columnMeta.getString(23)); |
| |
| if (!"metadata".equalsIgnoreCase(colmnMeta.getTABLE_SCHEM()) |
| && !colmnMeta.getCOLUMN_NAME().toUpperCase(Locale.ROOT).startsWith("_KY_")) { |
| tableMap.get(colmnMeta.getTABLE_SCHEM() + "#" + colmnMeta.getTABLE_NAME()).addColumn(colmnMeta); |
| } |
| } |
| } finally { |
| close(columnMeta, null, conn); |
| if (JDBCTableMeta != null) { |
| JDBCTableMeta.close(); |
| } |
| } |
| |
| return tableMetas; |
| } |
| |
| public List<TableMetaWithType> getMetadataV2(String project) throws SQLException, IOException { |
| return getMetadataV2(getCubeManager(), project); |
| } |
| |
| @SuppressWarnings("checkstyle:methodlength") |
| protected List<TableMetaWithType> getMetadataV2(CubeManager cubeMgr, String project) |
| throws SQLException, IOException { |
| //Message msg = MsgPicker.getMsg(); |
| |
| Connection conn = null; |
| ResultSet columnMeta = null; |
| List<TableMetaWithType> tableMetas = null; |
| Map<String, TableMetaWithType> tableMap = null; |
| Map<String, ColumnMetaWithType> columnMap = null; |
| if (StringUtils.isBlank(project)) { |
| return Collections.emptyList(); |
| } |
| ResultSet JDBCTableMeta = null; |
| try { |
| conn = QueryConnection.getConnection(project); |
| DatabaseMetaData metaData = conn.getMetaData(); |
| |
| JDBCTableMeta = metaData.getTables(null, null, null, null); |
| |
| tableMetas = new LinkedList<TableMetaWithType>(); |
| tableMap = new HashMap<String, TableMetaWithType>(); |
| columnMap = new HashMap<String, ColumnMetaWithType>(); |
| while (JDBCTableMeta.next()) { |
| String catalogName = JDBCTableMeta.getString(1); |
| String schemaName = JDBCTableMeta.getString(2); |
| |
| // Not every JDBC data provider offers full 10 columns, e.g., PostgreSQL has only 5 |
| TableMetaWithType tblMeta = new TableMetaWithType( |
| catalogName == null ? Constant.FakeCatalogName : catalogName, |
| schemaName == null ? Constant.FakeSchemaName : schemaName, JDBCTableMeta.getString(3), |
| JDBCTableMeta.getString(4), JDBCTableMeta.getString(5), null, null, null, null, null); |
| |
| if (!"metadata".equalsIgnoreCase(tblMeta.getTABLE_SCHEM())) { |
| tableMetas.add(tblMeta); |
| tableMap.put(tblMeta.getTABLE_SCHEM() + "#" + tblMeta.getTABLE_NAME(), tblMeta); |
| } |
| } |
| |
| columnMeta = metaData.getColumns(null, null, null, null); |
| |
| while (columnMeta.next()) { |
| String catalogName = columnMeta.getString(1); |
| String schemaName = columnMeta.getString(2); |
| |
| // kylin(optiq) is not strictly following JDBC specification |
| ColumnMetaWithType colmnMeta = new ColumnMetaWithType( |
| catalogName == null ? Constant.FakeCatalogName : catalogName, |
| schemaName == null ? Constant.FakeSchemaName : schemaName, columnMeta.getString(3), |
| columnMeta.getString(4), columnMeta.getInt(5), columnMeta.getString(6), columnMeta.getInt(7), |
| getInt(columnMeta.getString(8)), columnMeta.getInt(9), columnMeta.getInt(10), |
| columnMeta.getInt(11), columnMeta.getString(12), columnMeta.getString(13), |
| getInt(columnMeta.getString(14)), getInt(columnMeta.getString(15)), columnMeta.getInt(16), |
| columnMeta.getInt(17), columnMeta.getString(18), columnMeta.getString(19), |
| columnMeta.getString(20), columnMeta.getString(21), getShort(columnMeta.getString(22)), |
| columnMeta.getString(23)); |
| |
| if (!"metadata".equalsIgnoreCase(colmnMeta.getTABLE_SCHEM()) |
| && !colmnMeta.getCOLUMN_NAME().toUpperCase(Locale.ROOT).startsWith("_KY_")) { |
| tableMap.get(colmnMeta.getTABLE_SCHEM() + "#" + colmnMeta.getTABLE_NAME()).addColumn(colmnMeta); |
| columnMap.put(colmnMeta.getTABLE_SCHEM() + "#" + colmnMeta.getTABLE_NAME() + "#" |
| + colmnMeta.getCOLUMN_NAME(), colmnMeta); |
| } |
| } |
| |
| } finally { |
| close(columnMeta, null, conn); |
| if (JDBCTableMeta != null) { |
| JDBCTableMeta.close(); |
| } |
| } |
| |
| ProjectInstance projectInstance = getProjectManager().getProject(project); |
| for (String modelName : projectInstance.getModels()) { |
| |
| DataModelDesc dataModelDesc = modelService.getModel(modelName, project); |
| if (dataModelDesc != null && !dataModelDesc.isDraft()) { |
| |
| // update table type: FACT |
| for (TableRef factTable : dataModelDesc.getFactTables()) { |
| String factTableName = factTable.getTableIdentity().replace('.', '#'); |
| if (tableMap.containsKey(factTableName)) { |
| tableMap.get(factTableName).getTYPE().add(TableMetaWithType.tableTypeEnum.FACT); |
| } else { |
| // should be used after JDBC exposes all tables and columns |
| // throw new BadRequestException(msg.getTABLE_META_INCONSISTENT()); |
| } |
| } |
| |
| // update table type: LOOKUP |
| for (TableRef lookupTable : dataModelDesc.getLookupTables()) { |
| String lookupTableName = lookupTable.getTableIdentity().replace('.', '#'); |
| if (tableMap.containsKey(lookupTableName)) { |
| tableMap.get(lookupTableName).getTYPE().add(TableMetaWithType.tableTypeEnum.LOOKUP); |
| } else { |
| // throw new BadRequestException(msg.getTABLE_META_INCONSISTENT()); |
| } |
| } |
| |
| // update column type: PK and FK |
| for (JoinTableDesc joinTableDesc : dataModelDesc.getJoinTables()) { |
| JoinDesc joinDesc = joinTableDesc.getJoin(); |
| for (String pk : joinDesc.getPrimaryKey()) { |
| String columnIdentity = (dataModelDesc.findTable(pk.substring(0, pk.indexOf("."))) |
| .getTableIdentity() + pk.substring(pk.indexOf("."))).replace('.', '#'); |
| if (columnMap.containsKey(columnIdentity)) { |
| columnMap.get(columnIdentity).getTYPE().add(ColumnMetaWithType.columnTypeEnum.PK); |
| } else { |
| // throw new BadRequestException(msg.getCOLUMN_META_INCONSISTENT()); |
| } |
| } |
| |
| for (String fk : joinDesc.getForeignKey()) { |
| String columnIdentity = (dataModelDesc.findTable(fk.substring(0, fk.indexOf("."))) |
| .getTableIdentity() + fk.substring(fk.indexOf("."))).replace('.', '#'); |
| if (columnMap.containsKey(columnIdentity)) { |
| columnMap.get(columnIdentity).getTYPE().add(ColumnMetaWithType.columnTypeEnum.FK); |
| } else { |
| // throw new BadRequestException(msg.getCOLUMN_META_INCONSISTENT()); |
| } |
| } |
| } |
| |
| // update column type: DIMENSION AND MEASURE |
| List<ModelDimensionDesc> dimensions = dataModelDesc.getDimensions(); |
| for (ModelDimensionDesc dimension : dimensions) { |
| for (String column : dimension.getColumns()) { |
| String columnIdentity = (dataModelDesc.findTable(dimension.getTable()).getTableIdentity() + "." |
| + column).replace('.', '#'); |
| if (columnMap.containsKey(columnIdentity)) { |
| columnMap.get(columnIdentity).getTYPE().add(ColumnMetaWithType.columnTypeEnum.DIMENSION); |
| } else { |
| // throw new BadRequestException(msg.getCOLUMN_META_INCONSISTENT()); |
| } |
| |
| } |
| } |
| |
| String[] measures = dataModelDesc.getMetrics(); |
| for (String measure : measures) { |
| String columnIdentity = (dataModelDesc.findTable(measure.substring(0, measure.indexOf("."))) |
| .getTableIdentity() + measure.substring(measure.indexOf("."))).replace('.', '#'); |
| if (columnMap.containsKey(columnIdentity)) { |
| columnMap.get(columnIdentity).getTYPE().add(ColumnMetaWithType.columnTypeEnum.MEASURE); |
| } else { |
| // throw new BadRequestException(msg.getCOLUMN_META_INCONSISTENT()); |
| } |
| } |
| } |
| } |
| |
| return tableMetas; |
| } |
| |
| protected void processStatementAttr(Statement s, SQLRequest sqlRequest) throws SQLException { |
| Integer statementMaxRows = BackdoorToggles.getStatementMaxRows(); |
| if (statementMaxRows != null) { |
| logger.info("Setting current statement's max rows to {}", statementMaxRows); |
| s.setMaxRows(statementMaxRows); |
| } |
| } |
| |
| /** |
| * @param correctedSql |
| * @param sqlRequest |
| * @return |
| * @throws Exception |
| */ |
| private SQLResponse executeRequest(String correctedSql, SQLRequest sqlRequest, Connection conn) throws Exception { |
| Statement stat = null; |
| ResultSet resultSet = null; |
| boolean isPushDown = false; |
| |
| Pair<List<List<String>>, List<SelectedColumnMeta>> r = null; |
| try { |
| stat = conn.createStatement(); |
| processStatementAttr(stat, sqlRequest); |
| resultSet = stat.executeQuery(correctedSql); |
| |
| r = createResponseFromResultSet(resultSet); |
| |
| } catch (SQLException sqlException) { |
| r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException); |
| if (r == null) |
| throw sqlException; |
| |
| isPushDown = true; |
| } finally { |
| close(resultSet, stat, null); //conn is passed in, not my duty to close |
| } |
| |
| return buildSqlResponse(sqlRequest.getProject(), isPushDown, r.getFirst(), r.getSecond()); |
| } |
| |
| private SQLResponse executePrepareRequest(String correctedSql, PrepareSqlRequest sqlRequest, |
| PreparedContext preparedContext) throws Exception { |
| ResultSet resultSet = null; |
| boolean isPushDown = false; |
| |
| Pair<List<List<String>>, List<SelectedColumnMeta>> r = null; |
| Connection conn = preparedContext.conn; |
| try { |
| PreparedStatement preparedStatement = preparedContext.preparedStatement; |
| processStatementAttr(preparedStatement, sqlRequest); |
| for (int i = 0; i < sqlRequest.getParams().length; i++) { |
| setParam(preparedStatement, i + 1, (sqlRequest.getParams()[i])); |
| } |
| resultSet = preparedStatement.executeQuery(); |
| r = createResponseFromResultSet(resultSet); |
| } catch (SQLException sqlException) { |
| r = pushDownQuery(sqlRequest, correctedSql, conn, sqlException); |
| if (r == null) |
| throw sqlException; |
| |
| isPushDown = true; |
| } finally { |
| DBUtils.closeQuietly(resultSet); |
| } |
| |
| return buildSqlResponse(sqlRequest.getProject(), isPushDown, r.getFirst(), r.getSecond()); |
| } |
| |
| private Pair<List<List<String>>, List<SelectedColumnMeta>> pushDownQuery(SQLRequest sqlRequest, String correctedSql, |
| Connection conn, SQLException sqlException) throws Exception { |
| try { |
| return PushDownUtil.tryPushDownSelectQuery(sqlRequest.getProject(), correctedSql, conn.getSchema(), |
| sqlException, BackdoorToggles.getPrepareOnly()); |
| } catch (Exception e2) { |
| logger.error("pushdown engine failed current query too", e2); |
| //exception in pushdown, throw it instead of exception in calcite |
| throw e2; |
| } |
| } |
| |
| private Pair<List<List<String>>, List<SelectedColumnMeta>> createResponseFromResultSet(ResultSet resultSet) |
| throws Exception { |
| List<List<String>> results = Lists.newArrayList(); |
| List<SelectedColumnMeta> columnMetas = Lists.newArrayList(); |
| |
| ResultSetMetaData metaData = resultSet.getMetaData(); |
| int columnCount = metaData.getColumnCount(); |
| |
| // Fill in selected column meta |
| for (int i = 1; i <= columnCount; ++i) { |
| columnMetas.add(new SelectedColumnMeta(metaData.isAutoIncrement(i), metaData.isCaseSensitive(i), |
| metaData.isSearchable(i), metaData.isCurrency(i), metaData.isNullable(i), metaData.isSigned(i), |
| metaData.getColumnDisplaySize(i), metaData.getColumnLabel(i), metaData.getColumnName(i), |
| metaData.getSchemaName(i), metaData.getCatalogName(i), metaData.getTableName(i), |
| metaData.getPrecision(i), metaData.getScale(i), metaData.getColumnType(i), |
| metaData.getColumnTypeName(i), metaData.isReadOnly(i), metaData.isWritable(i), |
| metaData.isDefinitelyWritable(i))); |
| } |
| |
| // fill in results |
| while (resultSet.next()) { |
| List<String> oneRow = Lists.newArrayListWithCapacity(columnCount); |
| for (int i = 0; i < columnCount; i++) { |
| oneRow.add((resultSet.getString(i + 1))); |
| } |
| |
| results.add(oneRow); |
| } |
| |
| return new Pair<>(results, columnMetas); |
| } |
| |
| protected String makeErrorMsgUserFriendly(Throwable e) { |
| return QueryUtil.makeErrorMsgUserFriendly(e); |
| } |
| |
| private SQLResponse getPrepareOnlySqlResponse(String projectName, String correctedSql, Connection conn, |
| Boolean isPushDown, List<List<String>> results, List<SelectedColumnMeta> columnMetas) throws SQLException { |
| |
| CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(true); |
| |
| PreparedStatement preparedStatement = null; |
| try { |
| preparedStatement = conn.prepareStatement(correctedSql); |
| throw new IllegalStateException("Should have thrown OnlyPrepareEarlyAbortException"); |
| } catch (Exception e) { |
| Throwable rootCause = ExceptionUtils.getRootCause(e); |
| if (rootCause != null && rootCause instanceof OnlyPrepareEarlyAbortException) { |
| OnlyPrepareEarlyAbortException abortException = (OnlyPrepareEarlyAbortException) rootCause; |
| CalcitePrepare.Context context = abortException.getContext(); |
| CalcitePrepare.ParseResult preparedResult = abortException.getPreparedResult(); |
| List<RelDataTypeField> fieldList = preparedResult.rowType.getFieldList(); |
| |
| CalciteConnectionConfig config = context.config(); |
| |
| // Fill in selected column meta |
| for (int i = 0; i < fieldList.size(); ++i) { |
| |
| RelDataTypeField field = fieldList.get(i); |
| String columnName = field.getKey(); |
| |
| if (columnName.startsWith("_KY_")) { |
| continue; |
| } |
| BasicSqlType basicSqlType = (BasicSqlType) field.getValue(); |
| |
| columnMetas.add(new SelectedColumnMeta(false, config.caseSensitive(), false, false, |
| basicSqlType.isNullable() ? 1 : 0, true, basicSqlType.getPrecision(), columnName, |
| columnName, null, null, null, basicSqlType.getPrecision(), |
| basicSqlType.getScale() < 0 ? 0 : basicSqlType.getScale(), |
| basicSqlType.getSqlTypeName().getJdbcOrdinal(), basicSqlType.getSqlTypeName().getName(), |
| true, false, false)); |
| } |
| |
| } else { |
| throw e; |
| } |
| } finally { |
| CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(false); |
| DBUtils.closeQuietly(preparedStatement); |
| } |
| |
| return buildSqlResponse(projectName, isPushDown, results, columnMetas); |
| } |
| |
| private boolean isPrepareStatementWithParams(SQLRequest sqlRequest) { |
| if (sqlRequest instanceof PrepareSqlRequest && ((PrepareSqlRequest) sqlRequest).getParams() != null |
| && ((PrepareSqlRequest) sqlRequest).getParams().length > 0) |
| return true; |
| return false; |
| } |
| |
| private SQLResponse buildSqlResponse(String projectName, Boolean isPushDown, List<List<String>> results, |
| List<SelectedColumnMeta> columnMetas) { |
| return buildSqlResponse(projectName, isPushDown, results, columnMetas, false, null); |
| } |
| |
| private SQLResponse buildSqlResponse(String projectName, Boolean isPushDown, List<List<String>> results, |
| List<SelectedColumnMeta> columnMetas, boolean isException, String exceptionMessage) { |
| |
| boolean isPartialResult = false; |
| |
| List<String> realizations = Lists.newLinkedList(); |
| StringBuilder cubeSb = new StringBuilder(); |
| StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: "); |
| QueryContext queryContext = QueryContextFacade.current(); |
| if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for' |
| for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) { |
| String realizationName = "NULL"; |
| int realizationType = -1; |
| if (ctx.realization != null) { |
| isPartialResult |= ctx.storageContext.isPartialResultReturned(); |
| if (cubeSb.length() > 0) { |
| cubeSb.append(","); |
| } |
| cubeSb.append(ctx.realization.getCanonicalName()); |
| logSb.append(ctx.storageContext.getProcessedRowCount()).append(" "); |
| |
| realizationName = ctx.realization.getName(); |
| realizationType = ctx.realization.getStorageType(); |
| |
| realizations.add(realizationName); |
| } |
| queryContext.setContextRealization(ctx.id, realizationName, realizationType); |
| } |
| } |
| logger.info(logSb.toString()); |
| |
| SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException, |
| exceptionMessage, isPartialResult, isPushDown); |
| response.setTotalScanCount(queryContext.getScannedRows()); |
| response.setTotalScanBytes(queryContext.getScannedBytes()); |
| response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList()); |
| if (getConfig().isQueryCacheSignatureEnabled()) { |
| response.setSignature(SQLResponseSignatureUtil.createSignature(getConfig(), response, projectName)); |
| } |
| return response; |
| } |
| |
| /** |
| * @param preparedState |
| * @param param |
| * @throws SQLException |
| */ |
| private void setParam(PreparedStatement preparedState, int index, PrepareSqlRequest.StateParam param) |
| throws SQLException { |
| boolean isNull = (null == param.getValue()); |
| |
| Class<?> clazz; |
| try { |
| clazz = Class.forName(param.getClassName()); |
| } catch (ClassNotFoundException e) { |
| throw new InternalErrorException(e); |
| } |
| |
| Rep rep = Rep.of(clazz); |
| |
| switch (rep) { |
| case PRIMITIVE_CHAR: |
| case CHARACTER: |
| case STRING: |
| preparedState.setString(index, isNull ? null : String.valueOf(param.getValue())); |
| break; |
| case PRIMITIVE_INT: |
| case INTEGER: |
| preparedState.setInt(index, isNull ? 0 : Integer.parseInt(param.getValue())); |
| break; |
| case PRIMITIVE_SHORT: |
| case SHORT: |
| preparedState.setShort(index, isNull ? 0 : Short.parseShort(param.getValue())); |
| break; |
| case PRIMITIVE_LONG: |
| case LONG: |
| preparedState.setLong(index, isNull ? 0 : Long.parseLong(param.getValue())); |
| break; |
| case PRIMITIVE_FLOAT: |
| case FLOAT: |
| preparedState.setFloat(index, isNull ? 0 : Float.parseFloat(param.getValue())); |
| break; |
| case PRIMITIVE_DOUBLE: |
| case DOUBLE: |
| preparedState.setDouble(index, isNull ? 0 : Double.parseDouble(param.getValue())); |
| break; |
| case PRIMITIVE_BOOLEAN: |
| case BOOLEAN: |
| preparedState.setBoolean(index, !isNull && Boolean.parseBoolean(param.getValue())); |
| break; |
| case PRIMITIVE_BYTE: |
| case BYTE: |
| preparedState.setByte(index, isNull ? 0 : Byte.parseByte(param.getValue())); |
| break; |
| case JAVA_UTIL_DATE: |
| case JAVA_SQL_DATE: |
| preparedState.setDate(index, isNull ? null : java.sql.Date.valueOf(param.getValue())); |
| break; |
| case JAVA_SQL_TIME: |
| preparedState.setTime(index, isNull ? null : Time.valueOf(param.getValue())); |
| break; |
| case JAVA_SQL_TIMESTAMP: |
| preparedState.setTimestamp(index, isNull ? null : Timestamp.valueOf(param.getValue())); |
| break; |
| default: |
| preparedState.setObject(index, isNull ? null : param.getValue()); |
| } |
| } |
| |
| protected int getInt(String content) { |
| try { |
| return Integer.parseInt(content); |
| } catch (Exception e) { |
| return -1; |
| } |
| } |
| |
| protected short getShort(String content) { |
| try { |
| return Short.parseShort(content); |
| } catch (Exception e) { |
| return -1; |
| } |
| } |
| |
| public void setCacheManager(CacheManager cacheManager) { |
| this.cacheManager = cacheManager; |
| } |
| |
| private static PreparedContext createPreparedContext(String project, String sql) throws Exception { |
| Connection conn = QueryConnection.getConnection(project); |
| PreparedStatement preparedStatement = conn.prepareStatement(sql); |
| Collection<OLAPContext> olapContexts = OLAPContext.getThreadLocalContexts(); |
| return new PreparedContext(conn, preparedStatement, olapContexts); |
| } |
| |
| private static class QueryRecordSerializer implements Serializer<QueryRecord> { |
| |
| private static final QueryRecordSerializer serializer = new QueryRecordSerializer(); |
| |
| QueryRecordSerializer() { |
| |
| } |
| |
| public static QueryRecordSerializer getInstance() { |
| return serializer; |
| } |
| |
| @Override |
| public void serialize(QueryRecord record, DataOutputStream out) throws IOException { |
| String jsonStr = JsonUtil.writeValueAsString(record); |
| out.writeUTF(jsonStr); |
| } |
| |
| @Override |
| public QueryRecord deserialize(DataInputStream in) throws IOException { |
| String jsonStr = in.readUTF(); |
| return JsonUtil.readValue(jsonStr, QueryRecord.class); |
| } |
| } |
| |
| private static class PreparedContextFactory |
| extends BaseKeyedPooledObjectFactory<PreparedContextKey, PreparedContext> { |
| |
| @Override |
| public PreparedContext create(PreparedContextKey key) throws Exception { |
| return createPreparedContext(key.project, key.sql); |
| } |
| |
| @Override |
| public PooledObject<PreparedContext> wrap(PreparedContext value) { |
| return new DefaultPooledObject<>(value); |
| } |
| |
| @Override |
| public void destroyObject(final PreparedContextKey key, final PooledObject<PreparedContext> p) { |
| PreparedContext cachedContext = p.getObject(); |
| cachedContext.close(); |
| } |
| |
| @Override |
| public boolean validateObject(final PreparedContextKey key, final PooledObject<PreparedContext> p) { |
| return true; |
| } |
| } |
| |
| private static class PreparedContextKey { |
| private String project; |
| private long prjLastModifyTime; |
| private String sql; |
| |
| public PreparedContextKey(String project, long prjLastModifyTime, String sql) { |
| this.project = project; |
| this.prjLastModifyTime = prjLastModifyTime; |
| this.sql = sql; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| if (o == null || getClass() != o.getClass()) |
| return false; |
| |
| PreparedContextKey that = (PreparedContextKey) o; |
| |
| if (prjLastModifyTime != that.prjLastModifyTime) |
| return false; |
| if (project != null ? !project.equals(that.project) : that.project != null) |
| return false; |
| return sql != null ? sql.equals(that.sql) : that.sql == null; |
| |
| } |
| |
| @Override |
| public int hashCode() { |
| int result = project != null ? project.hashCode() : 0; |
| result = 31 * result + (int) (prjLastModifyTime ^ (prjLastModifyTime >>> 32)); |
| result = 31 * result + (sql != null ? sql.hashCode() : 0); |
| return result; |
| } |
| } |
| |
| private static class PreparedContext { |
| private Connection conn; |
| private PreparedStatement preparedStatement; |
| private Collection<OLAPContext> olapContexts; |
| |
| public PreparedContext(Connection conn, PreparedStatement preparedStatement, |
| Collection<OLAPContext> olapContexts) { |
| this.conn = conn; |
| this.preparedStatement = preparedStatement; |
| this.olapContexts = olapContexts; |
| } |
| |
| public void close() { |
| if (conn != null) { |
| DBUtils.closeQuietly(conn); |
| } |
| if (preparedStatement != null) { |
| DBUtils.closeQuietly(preparedStatement); |
| } |
| } |
| } |
| |
| } |
| |
| @SuppressWarnings("serial") |
| class QueryRecord extends RootPersistentEntity { |
| |
| @JsonProperty() |
| private Query[] queries; |
| |
| public QueryRecord() { |
| |
| } |
| |
| public QueryRecord(Query[] queries) { |
| this.queries = queries; |
| } |
| |
| public Query[] getQueries() { |
| return queries; |
| } |
| |
| public void setQueries(Query[] queries) { |
| this.queries = queries; |
| } |
| |
| } |