// blob: 3001e2de40963b060a157c70c76d50a1feeb846b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.stats;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.common.logging.impl.StatisticLevel;
import static org.apache.carbondata.core.util.CarbonUtil.printLine;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
/**
 * Driver-side implementation of {@link QueryStatisticsRecorder}.
 * <p>
 * Statistics are collected per query id through
 * {@link #recordStatisticsForDriver(QueryStatistic, String)} and written to the log as a text
 * table by {@link #logStatisticsAsTableDriver()}. The remaining recorder methods are empty in
 * this class.
 * <p>
 * Only one instance exists (see {@link #getInstance()}); every access to the per-query map is
 * made while holding a single static lock.
 */
public class DriverQueryStatisticsRecorderImpl implements QueryStatisticsRecorder {

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(DriverQueryStatisticsRecorderImpl.class.getName());

  /**
   * lock guarding every access to the query statistics map
   */
  private static final Object lock = new Object();

  /**
   * column widths of the statistics table, left to right:
   * module, operation step, total query cost, query cost
   */
  private static final int MODULE_WIDTH = 8;
  private static final int STEP_WIDTH = 20;
  private static final int TOTAL_COST_WIDTH = 21;
  private static final int COST_WIDTH = 24;

  /**
   * singleton QueryStatisticsRecorder for driver
   */
  private static final DriverQueryStatisticsRecorderImpl carbonLoadStatisticsImplInstance =
      new DriverQueryStatisticsRecorderImpl();

  /**
   * statistics recorded so far, keyed by query id
   */
  private final Map<String, List<QueryStatistic>> queryStatisticsMap;

  private DriverQueryStatisticsRecorderImpl() {
    // use ConcurrentHashMap, it is thread-safe
    queryStatisticsMap = new ConcurrentHashMap<String, List<QueryStatistic>>();
  }

  /**
   * @return the single shared recorder instance
   */
  public static DriverQueryStatisticsRecorderImpl getInstance() {
    return carbonLoadStatisticsImplInstance;
  }

  /**
   * Does nothing in this class; driver statistics are recorded through
   * {@link #recordStatisticsForDriver(QueryStatistic, String)}.
   */
  public void recordStatistics(QueryStatistic statistic) {
  }

  /**
   * Does nothing in this class; driver statistics are logged through
   * {@link #logStatisticsAsTableDriver()}.
   */
  public void logStatistics() {
  }

  /**
   * Does nothing in this class.
   *
   * @return always null
   */
  public TaskStatistics statisticsForTask(long taskId, long startTime) {
    return null;
  }

  /**
   * Does nothing in this class.
   */
  public void logStatisticsForTask(TaskStatistics task) {
  }

  /**
   * Below method will be used to add the statistics
   *
   * @param statistic statistic to append to the list kept for the query
   * @param queryId   id of the query the statistic belongs to; when null the statistic is
   *                  ignored
   */
  public void recordStatisticsForDriver(QueryStatistic statistic, String queryId) {
    if (null == queryId) {
      return;
    }
    synchronized (lock) {
      // one lookup instead of two; create the list on first use
      List<QueryStatistic> statistics = queryStatisticsMap.get(queryId);
      if (null == statistics) {
        statistics = new ArrayList<QueryStatistic>();
        queryStatisticsMap.put(queryId, statistics);
      }
      statistics.add(statistic);
    }
  }

  /**
   * Below method will be used to show statistic log as table.
   * <p>
   * Walks every recorded query: entries whose id is empty, not numeric or older than
   * {@link QueryStatisticsConstants#CLEAR_STATISTICS_TIMEOUT} are dropped; entries holding at
   * least two statistics are rendered with
   * {@link #collectDriverStatistics(List, String)} and, when a table was produced, logged at
   * {@link StatisticLevel#STATISTIC} and removed. Everything else is kept for a later call.
   */
  public void logStatisticsAsTableDriver() {
    synchronized (lock) {
      Iterator<Map.Entry<String, List<QueryStatistic>>> entries =
          queryStatisticsMap.entrySet().iterator();
      while (entries.hasNext()) {
        Map.Entry<String, List<QueryStatistic>> entry = entries.next();
        String queryId = entry.getKey();
        // clear the unknown and the timeout query statistics
        if (isUnknownOrTimedOut(queryId)) {
          entries.remove();
          continue;
        }
        // print sql_parse_t,load_meta_t,block_allocation_t,block_identification_t
        // or just print block_allocation_t,block_identification_t
        List<QueryStatistic> statistics = entry.getValue();
        if (statistics.size() < 2) {
          continue;
        }
        String tableInfo = collectDriverStatistics(statistics, queryId);
        if (null != tableInfo) {
          LOGGER.log(StatisticLevel.STATISTIC, tableInfo);
          // clear the statistics that has been printed
          entries.remove();
        }
      }
    }
  }

  /**
   * Decides whether the statistics of a query must be discarded without being printed.
   * <p>
   * The id is parsed as a long and subtracted from {@link System#nanoTime()} (presumably the
   * id is the nanoTime taken when the query started -- confirm against the callers). An id
   * that cannot be parsed can never be aged out, so it is treated like an unknown query
   * instead of letting the NumberFormatException abort the whole logging pass.
   *
   * @param queryId key of the entry being inspected
   * @return true when the entry has to be removed
   */
  private static boolean isUnknownOrTimedOut(String queryId) {
    if (StringUtils.isEmpty(queryId)) {
      return true;
    }
    try {
      long interval = System.nanoTime() - Long.parseLong(queryId);
      return interval > QueryStatisticsConstants.CLEAR_STATISTICS_TIMEOUT;
    } catch (NumberFormatException e) {
      LOGGER.warn("Discard query statistics with non-numeric query id: " + queryId);
      return true;
    }
  }

  /**
   * Below method will parse the statistics of one query and put the time into a table.
   * <p>
   * When SQL parse, load meta, block allocation and block identification times are all
   * present the full five-row table is produced; when only block allocation and block
   * identification are present a three-row table is produced.
   *
   * @param statisticsList statistics recorded for the query
   * @param queryId        id printed in the heading above the table
   * @return the heading and the table, null when block allocation or block identification
   * time is missing, or a failure message when any exception is raised while building
   */
  public String collectDriverStatistics(List<QueryStatistic> statisticsList, String queryId) {
    String splitChar = " ";
    StringBuilder sqlParseTime = new StringBuilder();
    StringBuilder loadMetaTime = new StringBuilder();
    StringBuilder blockAllocationTime = new StringBuilder();
    StringBuilder blockIdentificationTime = new StringBuilder();
    // sum over every driver step
    long allStepsTotal = 0L;
    // sum over the block steps only, shown when sql parse / load meta are missing
    long blockStepsTotal = 0L;
    long loadBlocksTotal = 0L;
    try {
      // get statistic time from the QueryStatistic
      for (QueryStatistic statistic : statisticsList) {
        switch (statistic.getMessage()) {
          case QueryStatisticsConstants.SQL_PARSE:
            sqlParseTime.append(statistic.getTimeTaken()).append(splitChar);
            allStepsTotal += statistic.getTimeTaken();
            break;
          case QueryStatisticsConstants.LOAD_META:
            loadMetaTime.append(statistic.getTimeTaken()).append(splitChar);
            allStepsTotal += statistic.getTimeTaken();
            break;
          case QueryStatisticsConstants.LOAD_BLOCKS_DRIVER:
            // multi segments will generate multi load_blocks_time
            loadBlocksTotal += statistic.getTimeTaken();
            allStepsTotal += statistic.getTimeTaken();
            blockStepsTotal += statistic.getTimeTaken();
            break;
          case QueryStatisticsConstants.BLOCK_ALLOCATION:
            blockAllocationTime.append(statistic.getTimeTaken()).append(splitChar);
            allStepsTotal += statistic.getTimeTaken();
            blockStepsTotal += statistic.getTimeTaken();
            break;
          case QueryStatisticsConstants.BLOCK_IDENTIFICATION:
            blockIdentificationTime.append(statistic.getTimeTaken()).append(splitChar);
            allStepsTotal += statistic.getTimeTaken();
            blockStepsTotal += statistic.getTimeTaken();
            break;
          default:
            break;
        }
      }
      // both layouts need block allocation and block identification time
      if (blockAllocationTime.length() == 0 || blockIdentificationTime.length() == 0) {
        return null;
      }
      String loadBlocksTime = loadBlocksTotal + splitChar;
      // full border, and the inner separator that leaves the merged columns open
      String line = "+" + printLine("-", MODULE_WIDTH) + "+" + printLine("-", STEP_WIDTH) + "+" +
          printLine("-", TOTAL_COST_WIDTH) + "+" + printLine("-", COST_WIDTH) + "+";
      String line2 = "|" + printLine(" ", MODULE_WIDTH) + "+" + printLine("-", STEP_WIDTH) + "+" +
          printLine(" ", TOTAL_COST_WIDTH) + "+" + printLine("-", COST_WIDTH) + "+";
      // structure the query statistics info table, starting with the table header
      StringBuilder tableInfo = new StringBuilder();
      tableInfo.append(line).append("\n");
      tableInfo.append(row("Module", "Operation Step", "Total Query Cost", "Query Cost"));
      tableInfo.append(line).append("\n");
      if (sqlParseTime.length() > 0 && loadMetaTime.length() > 0) {
        // print sql_parse_t,load_meta_t,block_allocation_t,block_identification_t
        String driverPartTime = allStepsTotal + splitChar;
        tableInfo.append(row("", "SQL parse", "", sqlParseTime.toString()));
        tableInfo.append(line2).append("\n");
        tableInfo.append(row("Driver", "Load meta data", driverPartTime,
            loadMetaTime.toString()));
        tableInfo.append(line2).append("\n");
        tableInfo.append(row("Part", "Load blocks driver", "", loadBlocksTime));
        tableInfo.append(line2).append("\n");
        tableInfo.append(row("", "Block allocation", "", blockAllocationTime.toString()));
      } else {
        // when we can't get sql parse time, we only print the last two
        String driverPartTime = blockStepsTotal + splitChar;
        tableInfo.append(row("Driver", "Load blocks driver", "", loadBlocksTime));
        tableInfo.append(line2).append("\n");
        tableInfo.append(row("Part", "Block allocation", driverPartTime,
            blockAllocationTime.toString()));
      }
      tableInfo.append(line2).append("\n");
      tableInfo.append(row("", "Block identification", "", blockIdentificationTime.toString()));
      tableInfo.append(line).append("\n");
      // show query statistic as "query id" + "table"
      return "Print query statistic for query id: " + queryId + "\n" + tableInfo.toString();
    } catch (Exception ex) {
      return "Put statistics into table failed, catch exception: " + ex.getMessage();
    }
  }

  /**
   * Builds one table row, newline included, from the text of its four cells.
   */
  private static String row(String module, String step, String totalCost, String cost) {
    return "|" + cell(module, MODULE_WIDTH) + "|" + cell(step, STEP_WIDTH) + "|" +
        cell(totalCost, TOTAL_COST_WIDTH) + "|" + cell(cost, COST_WIDTH) + "|" + "\n";
  }

  /**
   * Right-aligns text in a cell by putting (width - text length) blanks in front of it.
   */
  private static String cell(String text, int width) {
    return printLine(" ", width - text.length()) + text;
  }
}