blob: aa481cf82f335c92985335b259d6de2849928561 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.util;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
/**
* This class maintains task level metrics info for all spawned child threads and parent task thread
*/
public class TaskMetricsMap {
private static final Logger LOGGER =
LogServiceFactory.getLogService(TaskMetricsMap.class.getName());
private static final InheritableThreadLocal<Long> threadLocal = new InheritableThreadLocal<>();
/**
* In this map we are maintaining all spawned child threads callback info for each parent thread
* here key = parent thread id & values = list of spawned child threads callbacks
*/
public static final Map<Long, List<CarbonFSBytesReadOnThreadCallback>> metricMap =
new ConcurrentHashMap<>();
public static final TaskMetricsMap taskMetricsMap = new TaskMetricsMap();
public static TaskMetricsMap getInstance() {
return taskMetricsMap;
}
public static InheritableThreadLocal<Long> getThreadLocal() {
return threadLocal;
}
/**
* initializes thread local to current thread id
*
* @return
*/
public static void initializeThreadLocal() {
// In case of multi level RDD (say insert into scenario, where DataFrameRDD calling ScanRDD)
// parent thread id should not be overwritten by child thread id.
// so don't set if it is already set.
if (threadLocal.get() == null) {
threadLocal.set(Thread.currentThread().getId());
}
}
/**
* registers current thread callback using parent thread id
*
* @return
*/
public void registerThreadCallback() {
// parent thread id should not be null as we are setting the same for all RDDs
if (null != threadLocal.get()) {
long parentThreadId = threadLocal.get();
new CarbonFSBytesReadOnThreadCallback(parentThreadId);
}
}
/**
* removes parent thread entry from map.
*
* @param threadId
*/
public void removeEntry(long threadId) {
metricMap.remove(threadId);
}
/**
* returns all spawned child threads callback list of given parent thread
*
* @param threadId
* @return
*/
public List<CarbonFSBytesReadOnThreadCallback> getCallbackList(long threadId) {
return metricMap.get(threadId);
}
public boolean isCallbackEmpty(long threadId) {
List<CarbonFSBytesReadOnThreadCallback> callbackList = getCallbackList(threadId);
if (null == callbackList) {
return true;
}
return callbackList.isEmpty();
}
/**
* This function updates read bytes of given thread
* After completing the task, each spawned child thread should update current read bytes,
* by calling this function.
*
* @param callbackThreadId
*/
public void updateReadBytes(long callbackThreadId) {
// parent thread id should not be null as we are setting the same for all RDDs
if (null != threadLocal.get()) {
long parentThreadId = threadLocal.get();
List<CarbonFSBytesReadOnThreadCallback> callbackList = getCallbackList(parentThreadId);
if (null != callbackList) {
for (CarbonFSBytesReadOnThreadCallback callback : callbackList) {
if (callback.threadId == callbackThreadId) {
callback.updatedReadBytes += callback.readBytes();
break;
}
}
}
}
}
/**
* returns total task read bytes, by summing all parent & spawned threads read bytes
*
* @param threadName
* @return
*/
public long getReadBytesSum(long threadName) {
List<CarbonFSBytesReadOnThreadCallback> callbacks = getCallbackList(threadName);
long sum = 0;
if (null != callbacks) {
for (CarbonFSBytesReadOnThreadCallback callback : callbacks) {
sum += callback.getReadBytes();
}
}
return sum;
}
/**
* adds spawned thread callback entry in metric map using parentThreadId
*
* @param parentThreadId
* @param callback
*/
private void addEntry(long parentThreadId, CarbonFSBytesReadOnThreadCallback callback) {
List<CarbonFSBytesReadOnThreadCallback> callbackList = getCallbackList(parentThreadId);
if (null == callbackList) {
//create new list
List<CarbonFSBytesReadOnThreadCallback> list = new CopyOnWriteArrayList<>();
list.add(callback);
metricMap.put(parentThreadId, list);
} else {
// add to existing list
callbackList.add(callback);
}
}
/**
* This class maintains getReadBytes info of each thread
*/
class CarbonFSBytesReadOnThreadCallback {
long baseline = 0;
long updatedReadBytes = 0;
long threadId = Thread.currentThread().getId();
CarbonFSBytesReadOnThreadCallback(long parentThread) {
// reads current thread readBytes
this.baseline = readBytes();
addEntry(parentThread, this);
}
/**
* returns current thread read bytes from FileSystem Statistics
*
* @return
*/
public long readBytes() {
List<FileSystem.Statistics> statisticsList = FileSystem.getAllStatistics();
long sum = 0;
try {
for (FileSystem.Statistics statistics : statisticsList) {
Class statisticsClass = Class.forName(statistics.getClass().getName());
Method getThreadStatisticsMethod =
statisticsClass.getDeclaredMethod("getThreadStatistics");
Class statisticsDataClass =
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData");
Method getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead");
sum += (Long) getBytesReadMethod
.invoke(statisticsDataClass.cast(getThreadStatisticsMethod.invoke(statistics, null)),
null);
}
} catch (Exception ex) {
LOGGER.debug(ex.getLocalizedMessage());
}
return sum;
}
/**
* After completing task, each child thread should update corresponding
* read bytes using updatedReadBytes method.
* if updatedReadBytes > 0 then return updatedReadBytes (i.e thread read bytes).
*
* @return
*/
public long getReadBytes() {
return updatedReadBytes - baseline;
}
}
}