/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.util;

import java.util.concurrent.ConcurrentHashMap;

import org.apache.carbondata.common.logging.LogServiceFactory;

import org.apache.log4j.Logger;

/**
* A utility class that provides methods used to record time information during data loading.
*/
public class CarbonLoadStatisticsImpl implements LoadStatistics {
private CarbonLoadStatisticsImpl() {
}
private static CarbonLoadStatisticsImpl carbonLoadStatisticsImplInstance =
new CarbonLoadStatisticsImpl();
public static CarbonLoadStatisticsImpl getInstance() {
return carbonLoadStatisticsImplInstance;
}
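/*
* Illustrative usage (a sketch only; the exact call sites inside the data loading flow, and the
* partition id "0", are assumptions, not taken from this file):
*
*   CarbonLoadStatisticsImpl stat = CarbonLoadStatisticsImpl.getInstance();
*   stat.initPartitionInfo("0");
*   stat.recordLoadCsvfilesToDfTime();
*   stat.recordDictionaryValuesTotalTime("0", System.currentTimeMillis());
*   // ... further record* calls at the start and end of each stage ...
*   stat.printStatisticsInfo("0");  // logs the summary and resets all counters
*/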
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbonLoadStatisticsImpl.class.getName());
/*
* We only care about the earliest start time (EST) and the latest end time (LET) across the
* threads that perform the same task; LET - EST is the time cost of performing that task with
* multiple threads.
*/
private long loadCsvfilesToDfStartTime = 0;
private long loadCsvfilesToDfCostTime = 0;
private long dicShuffleAndWriteFileTotalStartTime = 0;
//LRU cache load time (recorded once)
private double lruCacheLoadTime = 0;
//Total time of the dictionary values step (read CSV, generate dictionary values, sort and write temp files) for each partition:
private ConcurrentHashMap<String, Long[]> parDictionaryValuesTotalTimeMap =
new ConcurrentHashMap<String, Long[]>();
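//CSV input step (read csv file) total time for each partition: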
private ConcurrentHashMap<String, Long[]> parCsvInputStepTimeMap =
new ConcurrentHashMap<String, Long[]>();
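//Generating dictionary values (transform to surrogate key) total time for each partition: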
private ConcurrentHashMap<String, Long[]> parGeneratingDictionaryValuesTimeMap =
new ConcurrentHashMap<String, Long[]>();
//Sort rows step total time for each partition:
private ConcurrentHashMap<String, Long[]> parSortRowsStepTotalTimeMap =
new ConcurrentHashMap<String, Long[]>();
//MDK generate total time for each partition:
private ConcurrentHashMap<String, Long[]> parMdkGenerateTotalTimeMap =
new ConcurrentHashMap<String, Long[]>();
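//Transform dictionary values to MDK and write fact files: total time for each partition: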
private ConcurrentHashMap<String, Long[]> parDictionaryValue2MdkAdd2FileTime =
new ConcurrentHashMap<String, Long[]>();
//Node block process information
private ConcurrentHashMap<String, Integer> hostBlockMap =
new ConcurrentHashMap<String, Integer>();
//Partition block process information
private ConcurrentHashMap<String, Integer> partitionBlockMap =
new ConcurrentHashMap<String, Integer>();
private long totalRecords = 0;
private double totalTime = 0;
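/**
* Initializes the per-partition maps with an empty Long[2] for the given partition. In each
* array, index 0 later holds the earliest recorded time point and index 1 the elapsed cost,
* as filled in by the record* methods below.
*/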
@Override
public void initPartitionInfo(String partitionId) {
parDictionaryValuesTotalTimeMap.put(partitionId, new Long[2]);
parCsvInputStepTimeMap.put(partitionId, new Long[2]);
parSortRowsStepTotalTimeMap.put(partitionId, new Long[2]);
parGeneratingDictionaryValuesTimeMap.put(partitionId, new Long[2]);
parMdkGenerateTotalTimeMap.put(partitionId, new Long[2]);
parDictionaryValue2MdkAdd2FileTime.put(partitionId, new Long[2]);
}
//Record the time points of each load phase
//Dictionary shuffle and write-file phase: only the first call sets the start time
public void recordDicShuffleAndWriteTime() {
long dicShuffleAndWriteTimePoint = System.currentTimeMillis();
if (0 == dicShuffleAndWriteFileTotalStartTime) {
dicShuffleAndWriteFileTotalStartTime = dicShuffleAndWriteTimePoint;
}
}
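//The first call sets the start time; each later call updates the cost to the largest elapsed
//span observed so far (latest end time - earliest start time across threads)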
public void recordLoadCsvfilesToDfTime() {
long loadCsvfilesToDfTimePoint = System.currentTimeMillis();
if (0 == loadCsvfilesToDfStartTime) {
loadCsvfilesToDfStartTime = loadCsvfilesToDfTimePoint;
}
if (loadCsvfilesToDfTimePoint - loadCsvfilesToDfStartTime > loadCsvfilesToDfCostTime) {
loadCsvfilesToDfCostTime = loadCsvfilesToDfTimePoint - loadCsvfilesToDfStartTime;
}
}
public double getLruCacheLoadTime() {
return lruCacheLoadTime;
}
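/**
* Records a time point for the dictionary values step of the given partition. The first call
* stores the time point as the start time (index 0); later calls keep the largest elapsed span
* since that start as the cost (index 1). The other per-partition record* methods below follow
* the same pattern for their own steps.
*/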
public void recordDictionaryValuesTotalTime(String partitionID,
Long dictionaryValuesTotalTimeTimePoint) {
if (null != parDictionaryValuesTotalTimeMap.get(partitionID)) {
if (null == parDictionaryValuesTotalTimeMap.get(partitionID)[0]) {
parDictionaryValuesTotalTimeMap.get(partitionID)[0] = dictionaryValuesTotalTimeTimePoint;
}
if (null == parDictionaryValuesTotalTimeMap.get(partitionID)[1] ||
dictionaryValuesTotalTimeTimePoint - parDictionaryValuesTotalTimeMap.get(partitionID)[0] >
parDictionaryValuesTotalTimeMap.get(partitionID)[1]) {
parDictionaryValuesTotalTimeMap.get(partitionID)[1] = dictionaryValuesTotalTimeTimePoint -
parDictionaryValuesTotalTimeMap.get(partitionID)[0];
}
}
}
public void recordCsvInputStepTime(String partitionID,
Long csvInputStepTimePoint) {
if (null != parCsvInputStepTimeMap.get(partitionID)) {
if (null == parCsvInputStepTimeMap.get(partitionID)[0]) {
parCsvInputStepTimeMap.get(partitionID)[0] = csvInputStepTimePoint;
}
if (null == parCsvInputStepTimeMap.get(partitionID)[1] ||
csvInputStepTimePoint - parCsvInputStepTimeMap.get(partitionID)[0] >
parCsvInputStepTimeMap.get(partitionID)[1]) {
parCsvInputStepTimeMap.get(partitionID)[1] = csvInputStepTimePoint -
parCsvInputStepTimeMap.get(partitionID)[0];
}
}
}
public void recordLruCacheLoadTime(double lruCacheLoadTime) {
this.lruCacheLoadTime = lruCacheLoadTime;
}
public void recordGeneratingDictionaryValuesTime(String partitionID,
Long generatingDictionaryValuesTimePoint) {
if (null != parGeneratingDictionaryValuesTimeMap.get(partitionID)) {
if (null == parGeneratingDictionaryValuesTimeMap.get(partitionID)[0]) {
parGeneratingDictionaryValuesTimeMap.get(partitionID)[0] =
generatingDictionaryValuesTimePoint;
}
if (null == parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] ||
generatingDictionaryValuesTimePoint - parGeneratingDictionaryValuesTimeMap
.get(partitionID)[0] > parGeneratingDictionaryValuesTimeMap
.get(partitionID)[1]) {
parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] =
generatingDictionaryValuesTimePoint - parGeneratingDictionaryValuesTimeMap
.get(partitionID)[0];
}
}
}
public void recordSortRowsStepTotalTime(String partitionID,
Long sortRowsStepTotalTimePoint) {
if (null != parSortRowsStepTotalTimeMap.get(partitionID)) {
if (null == parSortRowsStepTotalTimeMap.get(partitionID)[0]) {
parSortRowsStepTotalTimeMap.get(partitionID)[0] = sortRowsStepTotalTimePoint;
}
if (null == parSortRowsStepTotalTimeMap.get(partitionID)[1] ||
sortRowsStepTotalTimePoint - parSortRowsStepTotalTimeMap.get(partitionID)[0] >
parSortRowsStepTotalTimeMap.get(partitionID)[1]) {
parSortRowsStepTotalTimeMap.get(partitionID)[1] = sortRowsStepTotalTimePoint -
parSortRowsStepTotalTimeMap.get(partitionID)[0];
}
}
}
public void recordMdkGenerateTotalTime(String partitionID,
Long mdkGenerateTotalTimePoint) {
if (null != parMdkGenerateTotalTimeMap.get(partitionID)) {
if (null == parMdkGenerateTotalTimeMap.get(partitionID)[0]) {
parMdkGenerateTotalTimeMap.get(partitionID)[0] = mdkGenerateTotalTimePoint;
}
if (null == parMdkGenerateTotalTimeMap.get(partitionID)[1] ||
mdkGenerateTotalTimePoint - parMdkGenerateTotalTimeMap.get(partitionID)[0] >
parMdkGenerateTotalTimeMap.get(partitionID)[1]) {
parMdkGenerateTotalTimeMap.get(partitionID)[1] = mdkGenerateTotalTimePoint -
parMdkGenerateTotalTimeMap.get(partitionID)[0];
}
}
}
public void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
Long dictionaryValue2MdkAdd2FileTimePoint) {
if (null != parDictionaryValue2MdkAdd2FileTime.get(partitionID)) {
if (null == parDictionaryValue2MdkAdd2FileTime.get(partitionID)[0]) {
parDictionaryValue2MdkAdd2FileTime.get(partitionID)[0] =
dictionaryValue2MdkAdd2FileTimePoint;
}
if (null == parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] ||
dictionaryValue2MdkAdd2FileTimePoint - parDictionaryValue2MdkAdd2FileTime
.get(partitionID)[0] > parDictionaryValue2MdkAdd2FileTime
.get(partitionID)[1]) {
parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] =
dictionaryValue2MdkAdd2FileTimePoint - parDictionaryValue2MdkAdd2FileTime
.get(partitionID)[0];
}
}
}
//Record the node blocks information map
public void recordHostBlockMap(String host, Integer numBlocks) {
hostBlockMap.put(host, numBlocks);
}
//Record the partition blocks information map
public void recordPartitionBlockMap(String partitionID, Integer numBlocks) {
partitionBlockMap.put(partitionID, numBlocks);
}
public void recordTotalRecords(long totalRecords) {
this.totalRecords = totalRecords;
}
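//The getters below convert the recorded millisecond costs to seconds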
private double getLoadCsvfilesToDfTime() {
return loadCsvfilesToDfCostTime / 1000.0;
}
private double getDictionaryValuesTotalTime(String partitionID) {
return parDictionaryValuesTotalTimeMap.get(partitionID)[1] / 1000.0;
}
private double getCsvInputStepTime(String partitionID) {
return parCsvInputStepTimeMap.get(partitionID)[1] / 1000.0;
}
private double getGeneratingDictionaryValuesTime(String partitionID) {
return parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] / 1000.0;
}
private double getSortRowsStepTotalTime(String partitionID) {
return parSortRowsStepTotalTimeMap.get(partitionID)[1] / 1000.0;
}
private double getDictionaryValue2MdkAdd2FileTime(String partitionID) {
return parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] / 1000.0;
}
//Get the hostBlockMap
private ConcurrentHashMap<String, Integer> getHostBlockMap() {
return hostBlockMap;
}
//Get the partitionBlockMap
private ConcurrentHashMap<String, Integer> getPartitionBlockMap() {
return partitionBlockMap;
}
//Speed calculation
private long getTotalRecords() {
return this.totalRecords;
}
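//Note: the divisions below use double arithmetic; if a recorded time is zero the result is
//Infinity (or NaN for 0/0), which the int cast turns into Integer.MAX_VALUE (or 0)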
private int getLoadSpeed() {
return (int)(totalRecords / totalTime);
}
private int getReadCSVSpeed(String partitionID) {
return (int)(totalRecords / getCsvInputStepTime(partitionID));
}
private int getGenSurKeySpeed(String partitionID) {
return (int)(totalRecords / getGeneratingDictionaryValuesTime(partitionID));
}
private int getSortKeySpeed(String partitionID) {
return (int)(totalRecords / getSortRowsStepTotalTime(partitionID));
}
private int getMDKSpeed(String partitionID) {
return (int)(totalRecords / getDictionaryValue2MdkAdd2FileTime(partitionID));
}
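/**
* Sums the stage times (load CSV to DataFrame, LRU cache load, dictionary values step and
* MDK/write step) for the given partition and caches the result in totalTime, which
* getLoadSpeed() uses as the divisor.
*/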
private double getTotalTime(String partitionID) {
this.totalTime = getLoadCsvfilesToDfTime() +
getLruCacheLoadTime() + getDictionaryValuesTotalTime(partitionID) +
getDictionaryValue2MdkAdd2FileTime(partitionID);
return totalTime;
}
//Print the statistics information
private void printDicGenStatisticsInfo() {
double loadCsvfilesToDfTime = getLoadCsvfilesToDfTime();
LOGGER.info("STAGE 1 ->Load csv to DataFrame and generate" +
" block distinct values: " + loadCsvfilesToDfTime + "(s)");
}
private void printLruCacheLoadTimeInfo() {
LOGGER.info("STAGE 2 ->LRU cache load: " + getLruCacheLoadTime() + "(s)");
}
private void printDictionaryValuesGenStatisticsInfo(String partitionID) {
double dictionaryValuesTotalTime = getDictionaryValuesTotalTime(partitionID);
LOGGER.info("STAGE 3 ->Total cost of gen dictionary values, sort and write to temp files: "
+ dictionaryValuesTotalTime + "(s)");
double csvInputStepTime = getCsvInputStepTime(partitionID);
double generatingDictionaryValuesTime = getGeneratingDictionaryValuesTime(partitionID);
LOGGER.info("STAGE 3.1 -> |_read csv file: " + csvInputStepTime + "(s)");
LOGGER.info("STAGE 3.2 -> |_transform to surrogate key: "
+ generatingDictionaryValuesTime + "(s)");
}
private void printSortRowsStepStatisticsInfo(String partitionID) {
double sortRowsStepTotalTime = getSortRowsStepTotalTime(partitionID);
LOGGER.info("STAGE 3.3 -> |_sort rows and write to temp file: "
+ sortRowsStepTotalTime + "(s)");
}
private void printGenMdkStatisticsInfo(String partitionID) {
double dictionaryValue2MdkAdd2FileTime = getDictionaryValue2MdkAdd2FileTime(partitionID);
LOGGER.info("STAGE 4 ->Transform to MDK, compress and write fact files: "
+ dictionaryValue2MdkAdd2FileTime + "(s)");
}
//Print the node blocks information
private void printHostBlockMapInfo() {
LOGGER.info("========== BLOCK_INFO ==========");
if (getHostBlockMap().size() > 0) {
for (String host: getHostBlockMap().keySet()) {
LOGGER.info("BLOCK_INFO ->Node host: " + host);
LOGGER.info("BLOCK_INFO ->The block count in this node: " + getHostBlockMap().get(host));
}
} else if (getPartitionBlockMap().size() > 0) {
for (String parID: getPartitionBlockMap().keySet()) {
LOGGER.info("BLOCK_INFO ->Partition ID: " + parID);
LOGGER.info("BLOCK_INFO ->The block count in this partition: " +
getPartitionBlockMap().get(parID));
}
}
}
//Print the speed information
private void printLoadSpeedInfo(String partitionID) {
LOGGER.info("===============Load_Speed_Info===============");
LOGGER.info("Total Num of Records Processed: " + getTotalRecords());
LOGGER.info("Total Time Cost: " + getTotalTime(partitionID) + "(s)");
LOGGER.info("Total Load Speed: " + getLoadSpeed() + "records/s");
LOGGER.info("Read CSV Speed: " + getReadCSVSpeed(partitionID) + " records/s");
LOGGER.info("Generate Surrogate Key Speed: " + getGenSurKeySpeed(partitionID) + " records/s");
LOGGER.info("Sort Key/Write Temp Files Speed: " + getSortKeySpeed(partitionID) + " records/s");
LOGGER.info("MDK Step Speed: " + getMDKSpeed(partitionID) + " records/s");
LOGGER.info("=============================================");
}
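/**
* Logs all recorded statistics for the given partition and, in the finally block, resets every
* counter so the instance can be reused by the next load.
*/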
public void printStatisticsInfo(String partitionID) {
try {
LOGGER.info("========== TIME_STATISTICS PartitionID: " + partitionID + "==========");
printDicGenStatisticsInfo();
printLruCacheLoadTimeInfo();
printDictionaryValuesGenStatisticsInfo(partitionID);
printSortRowsStepStatisticsInfo(partitionID);
printGenMdkStatisticsInfo(partitionID);
printHostBlockMapInfo();
printLoadSpeedInfo(partitionID);
} catch (Exception e) {
LOGGER.error("Can't print Statistics Information");
} finally {
resetLoadStatistics();
}
}
//Reset the load statistics values
private void resetLoadStatistics() {
loadCsvfilesToDfStartTime = 0;
loadCsvfilesToDfCostTime = 0;
dicShuffleAndWriteFileTotalStartTime = 0;
lruCacheLoadTime = 0;
totalRecords = 0;
totalTime = 0;
parDictionaryValuesTotalTimeMap.clear();
parCsvInputStepTimeMap.clear();
parSortRowsStepTotalTimeMap.clear();
parGeneratingDictionaryValuesTimeMap.clear();
parMdkGenerateTotalTimeMap.clear();
parDictionaryValue2MdkAdd2FileTime.clear();
}
}