/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.util;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.log4j.Logger;
/**
* A utility that provides methods for recording timing information during data loading.
*/
public class CarbonLoadStatisticsImpl implements LoadStatistics {
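/*
* Typical usage, as a sketch (the exact call sequence lives in the data loading code;
* partitionID and rowCount are placeholder names):
*   CarbonLoadStatisticsImpl stat = CarbonLoadStatisticsImpl.getInstance();
*   stat.initPartitionInfo(partitionID);
*   stat.recordCsvInputStepTime(partitionID, System.currentTimeMillis());
*   stat.recordTotalRecords(rowCount);
*   stat.printStatisticsInfo(partitionID);
*/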
private CarbonLoadStatisticsImpl() {
}
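//Process-wide singleton instance; statistics are accumulated here for a single load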
private static CarbonLoadStatisticsImpl carbonLoadStatisticsImplInstance =
new CarbonLoadStatisticsImpl();
public static CarbonLoadStatisticsImpl getInstance() {
return carbonLoadStatisticsImplInstance;
}
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbonLoadStatisticsImpl.class.getName());
/*
* We only care about the earliest start time (EST) and the latest end time (LET) among the
* threads doing the same task; LET - EST is the elapsed time of that task when it is executed
* by multiple threads.
*/
private long loadCsvFilesToDfStartTime = 0;
private long loadCsvFilesToDfCostTime = 0;
private long dicShuffleAndWriteFileTotalStartTime = 0;
//Time of the one-time LRU cache load
private double lruCacheLoadTime = 0;
//Total time to generate dictionary values, sort and write temp files, for each partition:
private ConcurrentHashMap<String, Long[]> parDictionaryValuesTotalTimeMap =
new ConcurrentHashMap<String, Long[]>();
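//CSV input step total time for each partition: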
private ConcurrentHashMap<String, Long[]> parCsvInputStepTimeMap =
new ConcurrentHashMap<String, Long[]>();
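//Dictionary value generation step total time for each partition: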
private ConcurrentHashMap<String, Long[]> parGeneratingDictionaryValuesTimeMap =
new ConcurrentHashMap<String, Long[]>();
//Sort rows step total time for each partition:
private ConcurrentHashMap<String, Long[]> parSortRowsStepTotalTimeMap =
new ConcurrentHashMap<String, Long[]>();
//MDK generate total time for each partition:
private ConcurrentHashMap<String, Long[]> parMdkGenerateTotalTimeMap =
new ConcurrentHashMap<String, Long[]>();
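//Total time to transform dictionary values to MDK and write fact files, for each partition: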
private ConcurrentHashMap<String, Long[]> parDictionaryValue2MdkAdd2FileTime =
new ConcurrentHashMap<String, Long[]>();
//Node block process information
private ConcurrentHashMap<String, Integer> hostBlockMap =
new ConcurrentHashMap<String, Integer>();
//Partition block process information
private ConcurrentHashMap<String, Integer> partitionBlockMap =
new ConcurrentHashMap<String, Integer>();
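//Overall totals used for load speed calculation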
private long totalRecords = 0;
private double totalTime = 0;
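//Create an empty [start time, elapsed time] slot in each timing map for the given partition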
@Override
public void initPartitionInfo(String partitionId) {
parDictionaryValuesTotalTimeMap.put(partitionId, new Long[2]);
parCsvInputStepTimeMap.put(partitionId, new Long[2]);
parSortRowsStepTotalTimeMap.put(partitionId, new Long[2]);
parGeneratingDictionaryValuesTimeMap.put(partitionId, new Long[2]);
parMdkGenerateTotalTimeMap.put(partitionId, new Long[2]);
parDictionaryValue2MdkAdd2FileTime.put(partitionId, new Long[2]);
}
//Record the time
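//Only the first call sets the start time point; later calls keep the earliest value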
public void recordDicShuffleAndWriteTime() {
long dicShuffleAndWriteTimePoint = System.currentTimeMillis();
if (0 == dicShuffleAndWriteFileTotalStartTime) {
dicShuffleAndWriteFileTotalStartTime = dicShuffleAndWriteTimePoint;
}
}
public void recordLoadCsvFilesToDfTime() {
long loadCsvFilesToDfTimePoint = System.currentTimeMillis();
if (0 == loadCsvFilesToDfStartTime) {
loadCsvFilesToDfStartTime = loadCsvFilesToDfTimePoint;
}
if (loadCsvFilesToDfTimePoint - loadCsvFilesToDfStartTime > loadCsvFilesToDfCostTime) {
loadCsvFilesToDfCostTime = loadCsvFilesToDfTimePoint - loadCsvFilesToDfStartTime;
}
}
public double getLruCacheLoadTime() {
return lruCacheLoadTime;
}
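/*
* Each Long[2] entry holds {earliest recorded time point, latest elapsed time}: the first value
* recorded for a partition becomes the start time, and every later value updates the elapsed
* time whenever its distance from that start is larger.
*/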
private void recordStatistics(Map<String, Long[]> statMap, String partitionID, Long value) {
Long[] statistics = statMap.get(partitionID);
if (null != statistics) {
if (null == statistics[0]) {
statistics[0] = value;
}
if (null == statistics[1] || value - statistics[0] > statistics[1]) {
statistics[1] = value - statistics[0];
}
}
}
public void recordDictionaryValuesTotalTime(String partitionID,
Long dictionaryValuesTotalTimeTimePoint) {
recordStatistics(parDictionaryValuesTotalTimeMap, partitionID,
dictionaryValuesTotalTimeTimePoint);
}
public void recordCsvInputStepTime(String partitionID, Long csvInputStepTimePoint) {
recordStatistics(parCsvInputStepTimeMap, partitionID, csvInputStepTimePoint);
}
public void recordLruCacheLoadTime(double lruCacheLoadTime) {
this.lruCacheLoadTime = lruCacheLoadTime;
}
public void recordGeneratingDictionaryValuesTime(String partitionID,
Long generatingDictionaryValuesTimePoint) {
recordStatistics(parGeneratingDictionaryValuesTimeMap, partitionID,
generatingDictionaryValuesTimePoint);
}
public void recordSortRowsStepTotalTime(String partitionID, Long sortRowsStepTotalTimePoint) {
recordStatistics(parSortRowsStepTotalTimeMap, partitionID, sortRowsStepTotalTimePoint);
}
public void recordMdkGenerateTotalTime(String partitionID, Long mdkGenerateTotalTimePoint) {
recordStatistics(parMdkGenerateTotalTimeMap, partitionID, mdkGenerateTotalTimePoint);
}
public void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
Long dictionaryValue2MdkAdd2FileTimePoint) {
recordStatistics(parDictionaryValue2MdkAdd2FileTime, partitionID,
dictionaryValue2MdkAdd2FileTimePoint);
}
//Record the node blocks information map
public void recordHostBlockMap(String host, Integer numBlocks) {
hostBlockMap.put(host, numBlocks);
}
//Record the partition blocks information map
public void recordPartitionBlockMap(String partitionID, Integer numBlocks) {
partitionBlockMap.put(partitionID, numBlocks);
}
public void recordTotalRecords(long totalRecords) {
this.totalRecords = totalRecords;
}
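//Getters that convert the recorded millisecond values to seconds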
private double getLoadCsvFilesToDfTime() {
return loadCsvFilesToDfCostTime / 1000.0;
}
private double getDictionaryValuesTotalTime(String partitionID) {
return parDictionaryValuesTotalTimeMap.get(partitionID)[1] / 1000.0;
}
private double getCsvInputStepTime(String partitionID) {
return parCsvInputStepTimeMap.get(partitionID)[1] / 1000.0;
}
private double getGeneratingDictionaryValuesTime(String partitionID) {
return parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] / 1000.0;
}
private double getSortRowsStepTotalTime(String partitionID) {
return parSortRowsStepTotalTimeMap.get(partitionID)[1] / 1000.0;
}
private double getDictionaryValue2MdkAdd2FileTime(String partitionID) {
return parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] / 1000.0;
}
//Get the hostBlockMap
private ConcurrentHashMap<String, Integer> getHostBlockMap() {
return hostBlockMap;
}
//Get the partitionBlockMap
private ConcurrentHashMap<String, Integer> getPartitionBlockMap() {
return partitionBlockMap;
}
//Speed calculation
private long getTotalRecords() {
return this.totalRecords;
}
private int getLoadSpeed() {
return (int)(totalRecords / totalTime);
}
private int getReadCSVSpeed(String partitionID) {
return (int)(totalRecords / getCsvInputStepTime(partitionID));
}
private int getGenSurKeySpeed(String partitionID) {
return (int)(totalRecords / getGeneratingDictionaryValuesTime(partitionID));
}
private int getSortKeySpeed(String partitionID) {
return (int)(totalRecords / getSortRowsStepTotalTime(partitionID));
}
private int getMDKSpeed(String partitionID) {
return (int)(totalRecords / getDictionaryValue2MdkAdd2FileTime(partitionID));
}
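//Note: also refreshes the totalTime field that getLoadSpeed() relies on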
private double getTotalTime(String partitionID) {
this.totalTime = getLoadCsvFilesToDfTime() +
getLruCacheLoadTime() + getDictionaryValuesTotalTime(partitionID) +
getDictionaryValue2MdkAdd2FileTime(partitionID);
return totalTime;
}
//Print the statistics information
private void printDicGenStatisticsInfo() {
double loadCsvFilesToDfTime = getLoadCsvFilesToDfTime();
LOGGER.info("STAGE 1 ->Load csv to DataFrame and generate" +
" block distinct values: " + loadCsvFilesToDfTime + "(s)");
}
private void printLruCacheLoadTimeInfo() {
LOGGER.info("STAGE 2 ->LRU cache load: " + getLruCacheLoadTime() + "(s)");
}
private void printDictionaryValuesGenStatisticsInfo(String partitionID) {
double dictionaryValuesTotalTime = getDictionaryValuesTotalTime(partitionID);
LOGGER.info("STAGE 3 ->Total cost of gen dictionary values, sort and write to temp files: "
+ dictionaryValuesTotalTime + "(s)");
double csvInputStepTime = getCsvInputStepTime(partitionID);
double generatingDictionaryValuesTime = getGeneratingDictionaryValuesTime(partitionID);
LOGGER.info("STAGE 3.1 -> |_read csv file: " + csvInputStepTime + "(s)");
LOGGER.info("STAGE 3.2 -> |_transform to surrogate key: "
+ generatingDictionaryValuesTime + "(s)");
}
private void printSortRowsStepStatisticsInfo(String partitionID) {
double sortRowsStepTotalTime = getSortRowsStepTotalTime(partitionID);
LOGGER.info("STAGE 3.3 -> |_sort rows and write to temp file: "
+ sortRowsStepTotalTime + "(s)");
}
private void printGenMdkStatisticsInfo(String partitionID) {
double dictionaryValue2MdkAdd2FileTime = getDictionaryValue2MdkAdd2FileTime(partitionID);
LOGGER.info("STAGE 4 ->Transform to MDK, compress and write fact files: "
+ dictionaryValue2MdkAdd2FileTime + "(s)");
}
//Print the node (or partition) block distribution information
private void printHostBlockMapInfo() {
LOGGER.info("========== BLOCK_INFO ==========");
if (getHostBlockMap().size() > 0) {
for (Map.Entry<String, Integer> entry : getHostBlockMap().entrySet()) {
LOGGER.info("BLOCK_INFO ->Node host: " + entry.getKey());
LOGGER.info("BLOCK_INFO ->The block count in this node: " + entry.getValue());
}
} else if (getPartitionBlockMap().size() > 0) {
for (Map.Entry<String, Integer> entry : getPartitionBlockMap().entrySet()) {
LOGGER.info("BLOCK_INFO ->Partition ID: " + entry.getKey());
LOGGER.info("BLOCK_INFO ->The block count in this partition: " + entry.getValue());
}
}
}
//Print the speed information
private void printLoadSpeedInfo(String partitionID) {
LOGGER.info("===============Load_Speed_Info===============");
LOGGER.info("Total Num of Records Processed: " + getTotalRecords());
LOGGER.info("Total Time Cost: " + getTotalTime(partitionID) + "(s)");
LOGGER.info("Total Load Speed: " + getLoadSpeed() + "records/s");
LOGGER.info("Read CSV Speed: " + getReadCSVSpeed(partitionID) + " records/s");
LOGGER.info("Generate Surrogate Key Speed: " + getGenSurKeySpeed(partitionID) + " records/s");
LOGGER.info("Sort Key/Write Temp Files Speed: " + getSortKeySpeed(partitionID) + " records/s");
LOGGER.info("MDK Step Speed: " + getMDKSpeed(partitionID) + " records/s");
LOGGER.info("=============================================");
}
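//Print all collected statistics for the given partition, then reset them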
public void printStatisticsInfo(String partitionID) {
try {
LOGGER.info("========== TIME_STATISTICS PartitionID: " + partitionID + "==========");
printDicGenStatisticsInfo();
printLruCacheLoadTimeInfo();
printDictionaryValuesGenStatisticsInfo(partitionID);
printSortRowsStepStatisticsInfo(partitionID);
printGenMdkStatisticsInfo(partitionID);
printHostBlockMapInfo();
printLoadSpeedInfo(partitionID);
} catch (Exception e) {
LOGGER.error("Can't print statistics information", e);
} finally {
resetLoadStatistics();
}
}
//Reset the load statistics values
private void resetLoadStatistics() {
loadCsvFilesToDfStartTime = 0;
loadCsvFilesToDfCostTime = 0;
dicShuffleAndWriteFileTotalStartTime = 0;
lruCacheLoadTime = 0;
totalRecords = 0;
totalTime = 0;
parDictionaryValuesTotalTimeMap.clear();
parCsvInputStepTimeMap.clear();
parSortRowsStepTotalTimeMap.clear();
parGeneratingDictionaryValuesTimeMap.clear();
parMdkGenerateTotalTimeMap.clear();
parDictionaryValue2MdkAdd2FileTime.clear();
}
}