blob: 3592dbec22183c05aef62669e0413517653533cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.metadata.query;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.Singletons;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.TimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.Setter;
import lombok.val;
public class RDBMSQueryHistoryDAO implements QueryHistoryDAO {
private static final Logger logger = LoggerFactory.getLogger(RDBMSQueryHistoryDAO.class);
@Setter
private String queryMetricMeasurement;
private final String realizationMetricMeasurement;
private final JdbcQueryHistoryStore jdbcQueryHisStore;
public static final String WEEK = "week";
public static final String DAY = "day";
public static RDBMSQueryHistoryDAO getInstance() {
return Singletons.getInstance(RDBMSQueryHistoryDAO.class);
}
public RDBMSQueryHistoryDAO() throws Exception {
val config = KylinConfig.getInstanceFromEnv();
if (!UnitOfWork.isAlreadyInTransaction()) {
logger.info("Initializing RDBMSQueryHistoryDAO with KylinConfig Id: {} ", System.identityHashCode(config));
}
String metadataIdentifier = StorageURL.replaceUrl(config.getMetadataUrl());
this.queryMetricMeasurement = metadataIdentifier + "_" + QueryHistory.QUERY_MEASUREMENT_SURFIX;
this.realizationMetricMeasurement = metadataIdentifier + "_" + QueryHistory.REALIZATION_MEASUREMENT_SURFIX;
jdbcQueryHisStore = new JdbcQueryHistoryStore(config);
}
@Override
public String getQueryMetricMeasurement() {
return queryMetricMeasurement;
}
@Override
public String getRealizationMetricMeasurement() {
return realizationMetricMeasurement;
}
public List<QueryDailyStatistic> getQueryDailyStatistic(long startTime, long endTime) {
return jdbcQueryHisStore.queryHistoryDailyStatistic(startTime, endTime);
}
public int insert(QueryMetrics metrics) {
return jdbcQueryHisStore.insert(metrics);
}
public void insert(List<QueryMetrics> metricsList) {
jdbcQueryHisStore.insert(metricsList);
}
public void dropQueryHistoryTable() throws SQLException {
jdbcQueryHisStore.dropQueryHistoryTable();
}
public void deleteAllQueryHistory() {
jdbcQueryHisStore.deleteQueryHistory();
}
public void deleteQueryHistoryByProject(String project) {
jdbcQueryHisStore.deleteQueryHistory(project);
}
public void deleteAllQueryHistoryRealizationForProject(String project) {
jdbcQueryHisStore.deleteQueryHistoryRealization(project);
}
public QueryHistory getByQueryId(String queryId) {
return jdbcQueryHisStore.queryByQueryId(queryId);
}
public void deleteQueryHistoriesIfMaxSizeReached() {
long maxSize = KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize();
long totalCount = jdbcQueryHisStore.getCountOnQueryHistory();
if (totalCount > maxSize) {
deleteQueryHistoryAndRealization((int) (totalCount - maxSize));
}
}
public void deleteQueryHistoriesIfRetainTimeReached() {
long rangeOutCount = jdbcQueryHisStore.getCountOnQueryHistory(getRetainTime());
if (rangeOutCount > 0) {
deleteQueryHistoryAndRealization((int) rangeOutCount);
}
}
public void deleteQueryHistoryAndRealization(int deleteCount) {
int singleLimit = KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize();
largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> {
QueryHistory queryHistory = jdbcQueryHisStore.getOldestQueryHistory(currentCount);
int deletedRows = jdbcQueryHisStore.deleteQueryHistory(queryHistory.getId());
jdbcQueryHisStore.deleteQueryHistoryRealization(queryHistory.getQueryTime());
return deletedRows;
}, "Cleanup all query history");
}
public void deleteOldestQueryHistoriesByProject(String project, int deleteCount) {
int singleLimit = KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize();
largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> {
QueryHistory queryHistory = jdbcQueryHisStore.getOldestQueryHistory(project, currentCount);
int deletedRows = jdbcQueryHisStore.deleteQueryHistory(project, queryHistory.getId());
jdbcQueryHisStore.deleteQueryHistoryRealization(project, queryHistory.getQueryTime());
return deletedRows;
}, "Cleanup project<" + project + "> query history");
}
public void batchUpdateQueryHistoriesInfo(List<Pair<Long, QueryHistoryInfo>> idToQHInfoList) {
jdbcQueryHisStore.updateQueryHistoryInfo(idToQHInfoList);
}
public static long getRetainTime() {
return new Date(
System.currentTimeMillis() - KylinConfig.getInstanceFromEnv().getQueryHistorySurvivalThreshold())
.getTime();
}
public void dropProjectMeasurement(String project) {
jdbcQueryHisStore.deleteQueryHistory(project);
jdbcQueryHisStore.deleteQueryHistoryRealization(project);
}
public List<QueryHistory> getAllQueryHistories() {
return jdbcQueryHisStore.queryAllQueryHistories();
}
public List<QueryHistory> queryQueryHistoriesByIdOffset(long idOffset, int batchSize, String project) {
return jdbcQueryHisStore.queryQueryHistoriesByIdOffset(idOffset, batchSize, project);
}
public List<QueryHistory> getQueryHistoriesByConditions(QueryHistoryRequest request, int limit, int page) {
return jdbcQueryHisStore.queryQueryHistoriesByConditions(request, limit, page * limit);
}
public List<QueryHistory> getQueryHistoriesByConditionsWithOffset(QueryHistoryRequest request, int limit,
int offset) {
return jdbcQueryHisStore.queryQueryHistoriesByConditions(request, limit, offset);
}
public long getQueryHistoriesSize(QueryHistoryRequest request, String project) {
return jdbcQueryHisStore.queryQueryHistoriesSize(request).getCount();
}
public List<QueryHistory> getQueryHistoriesSubmitters(QueryHistoryRequest request, int size) {
return jdbcQueryHisStore.queryQueryHistoriesSubmitters(request, size);
}
public List<QueryStatistics> getQueryHistoriesModelIds(QueryHistoryRequest request) {
return jdbcQueryHisStore.queryQueryHistoriesModelIds(request);
}
public QueryStatistics getQueryCountAndAvgDuration(long startTime, long endTime, String project) {
List<QueryStatistics> result = jdbcQueryHisStore.queryCountAndAvgDuration(startTime, endTime, project);
if (CollectionUtils.isEmpty(result))
return new QueryStatistics();
return result.get(0);
}
public QueryStatistics getQueryCountAndAvgDurationRealization(long startTime, long endTime, String project) {
List<QueryStatistics> result = jdbcQueryHisStore.queryCountAndAvgDurationRealization(startTime, endTime,
project);
if (CollectionUtils.isEmpty(result))
return new QueryStatistics();
return result.get(0);
}
public List<QueryStatistics> getQueryCountByModel(long startTime, long endTime, String project) {
return jdbcQueryHisStore.queryCountByModel(startTime, endTime, project);
}
public QueryStatistics getQueryCountByRange(long startTime, long endTime, String project) {
return jdbcQueryHisStore.queryRecentQueryCount(startTime, endTime, project);
}
public long getQueryHistoryCountBeyondOffset(long offset, String project) {
return jdbcQueryHisStore.queryQueryHistoryCountBeyondOffset(offset, project);
}
public long getQueryHistoryMaxId(String project) {
return jdbcQueryHisStore.queryQueryHistoryMaxId(project);
}
public List<QueryStatistics> getQueryCountByTime(long startTime, long endTime, String timeDimension,
String project) {
return jdbcQueryHisStore.queryCountByTime(startTime, endTime, timeDimension, project);
}
public List<QueryStatistics> getQueryCountRealizationByTime(long startTime, long endTime, String timeDimension,
String project) {
return jdbcQueryHisStore.queryCountRealizationByTime(startTime, endTime, timeDimension, project);
}
public List<QueryStatistics> getAvgDurationByModel(long startTime, long endTime, String project) {
return jdbcQueryHisStore.queryAvgDurationByModel(startTime, endTime, project);
}
public List<QueryStatistics> getAvgDurationByTime(long startTime, long endTime, String timeDimension,
String project) {
return jdbcQueryHisStore.queryAvgDurationByTime(startTime, endTime, timeDimension, project);
}
public List<QueryStatistics> getAvgDurationRealizationByTime(long startTime, long endTime, String timeDimension,
String project) {
return jdbcQueryHisStore.queryAvgDurationRealizationByTime(startTime, endTime, timeDimension, project);
}
@Override
public Map<String, Long> getQueryCountByProject() {
return jdbcQueryHisStore.getCountGroupByProject();
}
public static void fillZeroForQueryStatistics(List<QueryStatistics> queryStatistics, long startTime, long endTime,
String dimension) {
if (!dimension.equalsIgnoreCase(DAY) && !dimension.equalsIgnoreCase(WEEK)) {
return;
}
if (dimension.equalsIgnoreCase(WEEK)) {
startTime = TimeUtil.getWeekStart(startTime);
endTime = TimeUtil.getWeekStart(endTime);
}
Set<Instant> instantSet = queryStatistics.stream().map(QueryStatistics::getTime).collect(Collectors.toSet());
int rawOffsetTime = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()).getRawOffset();
long startOffSetTime = Instant.ofEpochMilli(startTime).plusMillis(rawOffsetTime).toEpochMilli();
Instant startInstant = Instant.ofEpochMilli(startOffSetTime - startOffSetTime % (1000 * 60 * 60 * 24));
long endOffSetTime = Instant.ofEpochMilli(endTime).plusMillis(rawOffsetTime).toEpochMilli();
Instant endInstant = Instant.ofEpochMilli(endOffSetTime - endOffSetTime % (1000 * 60 * 60 * 24));
while (!startInstant.isAfter(endInstant)) {
if (!instantSet.contains(startInstant)) {
QueryStatistics zeroStatistics = new QueryStatistics();
zeroStatistics.setCount(0);
zeroStatistics.setTime(startInstant);
queryStatistics.add(zeroStatistics);
}
if (dimension.equalsIgnoreCase(DAY)) {
startInstant = startInstant.plus(Duration.ofDays(1));
} else if (dimension.equalsIgnoreCase(WEEK)) {
startInstant = startInstant.plus(Duration.ofDays(7));
}
}
}
public static void largeSplitToSmallTask(int totalCount, int singleSize, IntFunction<Integer> function,
String description) {
int retainCount = totalCount;
while (retainCount > 0) {
int currentCount = Math.min(retainCount, singleSize);
int actualCount = function.apply(currentCount);
if (currentCount != actualCount && logger.isWarnEnabled()) {
logger.warn("The task {} was not performed as expected, expect:{}, actual:{}", description,
currentCount, actualCount);
}
retainCount -= currentCount;
}
}
}