blob: 96206486305b524627d9aad4999cea36340f35a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.controlprogram.federated;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.text.DecimalFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;
import org.apache.sysds.runtime.controlprogram.federated.FederatedStatistics.FedStatsCollection.CacheStatsCollection;
import org.apache.sysds.runtime.controlprogram.federated.FederatedStatistics.FedStatsCollection.GCStatsCollection;
import org.apache.sysds.runtime.controlprogram.federated.FederatedStatistics.FedStatsCollection.LineageCacheStatsCollection;
import org.apache.sysds.runtime.controlprogram.federated.FederatedStatistics.FedStatsCollection.MultiTenantStatsCollection;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.ListObject;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.lineage.LineageCacheStatistics;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.utils.Statistics;
public class FederatedStatistics {
// stats of the federated worker on the coordinator site
private static Set<Pair<String, Integer>> _fedWorkerAddresses = new HashSet<>();
private static final LongAdder readCount = new LongAdder();
private static final LongAdder putCount = new LongAdder();
private static final LongAdder getCount = new LongAdder();
private static final LongAdder executeInstructionCount = new LongAdder();
private static final LongAdder executeUDFCount = new LongAdder();
private static final LongAdder transferredScalarCount = new LongAdder();
private static final LongAdder transferredListCount = new LongAdder();
private static final LongAdder transferredMatrixCount = new LongAdder();
private static final LongAdder transferredFrameCount = new LongAdder();
private static final LongAdder transferredMatCharCount = new LongAdder();
private static final LongAdder transferredMatrixBytes = new LongAdder();
private static final LongAdder transferredFrameBytes = new LongAdder();
private static final LongAdder asyncPrefetchCount = new LongAdder();
private static final LongAdder bytesSent = new LongAdder();
private static final LongAdder bytesReceived = new LongAdder();
// stats on the federated worker itself
private static final LongAdder fedLookupTableGetCount = new LongAdder();
private static final LongAdder fedLookupTableGetTime = new LongAdder(); // in milli sec
private static final LongAdder fedLookupTableEntryCount = new LongAdder();
private static final LongAdder fedReuseReadHitCount = new LongAdder();
private static final LongAdder fedReuseReadBytesCount = new LongAdder();
private static final LongAdder fedBytesSent = new LongAdder();
private static final LongAdder fedBytesReceived = new LongAdder();
private static final LongAdder fedPutLineageCount = new LongAdder();
private static final LongAdder fedPutLineageItems = new LongAdder();
private static final LongAdder fedSerializationReuseCount = new LongAdder();
private static final LongAdder fedSerializationReuseBytes = new LongAdder();
// Traffic between federated worker and a coordinator site
// in the form of [{ datetime, coordinatorAddress, transferredBytes }, { ... }] }
private static CopyOnWriteArrayList<Triple<LocalDateTime, String, Long>> coordinatorsTrafficBytes = new CopyOnWriteArrayList<>();
public static void logServerTraffic(long read, long written) {
bytesReceived.add(read);
bytesSent.add(written);
}
public static void logWorkerTraffic(long read, long written) {
fedBytesReceived.add(read);
fedBytesSent.add(written);
}
public static synchronized void incFederated(RequestType rqt, List<Object> data){
switch (rqt) {
case READ_VAR:
readCount.increment();
break;
case PUT_VAR:
putCount.increment();
incFedTransfer(data.get(0));
break;
case GET_VAR:
getCount.increment();
break;
case EXEC_INST:
executeInstructionCount.increment();
break;
case EXEC_UDF:
executeUDFCount.increment();
incFedTransfer(data);
break;
default:
break;
}
}
private static void incFedTransfer(List<Object> data) {
for(Object dataObj : data)
incFedTransfer(dataObj);
}
private static void incFedTransfer(Object dataObj) {
incFedTransfer(dataObj, null);
}
public static void incFedTransfer(Object dataObj, String host) {
long byteAmount = 0;
if(dataObj instanceof MatrixBlock) {
transferredMatrixCount.increment();
byteAmount = ((MatrixBlock)dataObj).getInMemorySize();
transferredMatrixBytes.add(byteAmount);
}
else if(dataObj instanceof FrameBlock) {
transferredFrameCount.increment();
byteAmount = ((FrameBlock)dataObj).getInMemorySize();
transferredFrameBytes.add(byteAmount);
}
else if(dataObj instanceof ScalarObject)
transferredScalarCount.increment();
else if(dataObj instanceof ListObject)
transferredListCount.increment();
else if(dataObj instanceof MatrixCharacteristics)
transferredMatCharCount.increment();
if (host != null && byteAmount > 0) {
coordinatorsTrafficBytes.add(new ImmutableTriple<>(LocalDateTime.now(), host, byteAmount));
}
}
public static void incAsyncPrefetchCount(long c) {
asyncPrefetchCount.add(c);
}
public static long getTotalFedTransferCount() {
return transferredScalarCount.longValue() + transferredListCount.longValue()
+ transferredMatrixCount.longValue() + transferredFrameCount.longValue()
+ transferredMatCharCount.longValue();
}
public static void reset() {
readCount.reset();
putCount.reset();
getCount.reset();
executeInstructionCount.reset();
executeUDFCount.reset();
transferredScalarCount.reset();
transferredListCount.reset();
transferredMatrixCount.reset();
transferredFrameCount.reset();
transferredMatCharCount.reset();
transferredMatrixBytes.reset();
transferredFrameBytes.reset();
asyncPrefetchCount.reset();
fedLookupTableGetCount.reset();
fedLookupTableGetTime.reset();
fedLookupTableEntryCount.reset();
fedReuseReadHitCount.reset();
fedReuseReadBytesCount.reset();
fedPutLineageCount.reset();
fedPutLineageItems.reset();
fedSerializationReuseCount.reset();
fedSerializationReuseBytes.reset();
bytesSent.reset();
bytesReceived.reset();
fedBytesSent.reset();
fedBytesReceived.reset();
//TODO merge with existing
coordinatorsTrafficBytes.clear();
}
public static String displayFedIOExecStatistics() {
if( readCount.longValue() > 0){ // only if there happened something on the federated worker
StringBuilder sb = new StringBuilder();
sb.append("Federated I/O (Read, Put, Get):\t" +
readCount.longValue() + "/" +
putCount.longValue() + "/" +
getCount.longValue() + ".\n");
sb.append("Federated Execute (Inst, UDF):\t" +
executeInstructionCount.longValue() + "/" +
executeUDFCount.longValue() + ".\n");
if(getTotalFedTransferCount() > 0)
sb.append("Fed Put Count (Sc/Li/Ma/Fr/MC):\t" +
transferredScalarCount.longValue() + "/" +
transferredListCount.longValue() + "/" +
transferredMatrixCount.longValue() + "/" +
transferredFrameCount.longValue() + "/" +
transferredMatCharCount.longValue() + ".\n");
if(transferredMatrixBytes.longValue() > 0 || transferredFrameBytes.longValue() > 0)
sb.append("Fed Put Bytes (Mat/Frame):\t" +
transferredMatrixBytes.longValue() + "/" +
transferredFrameBytes.longValue() + " Bytes.\n");
sb.append("Federated prefetch count:\t" +
asyncPrefetchCount.longValue() + ".\n");
return sb.toString();
}
return "";
}
public static String displayNetworkTrafficStatistics() {
return "Server I/O bytes (read/written):\t" +
bytesReceived.longValue() +
"/" +
bytesSent.longValue() +
"\n" +
"Worker I/O bytes (read/written):\t" +
fedBytesReceived.longValue() +
"/" +
fedBytesSent.longValue() +
"\n";
}
public static void registerFedWorker(String host, int port) {
_fedWorkerAddresses.add(new ImmutablePair<>(host, Integer.valueOf(port)));
}
public static String displayFedWorkers() {
StringBuilder sb = new StringBuilder();
sb.append("Federated Worker Addresses:\n");
for(Pair<String, Integer> fedAddr : _fedWorkerAddresses) {
sb.append(String.format(" %s:%d", fedAddr.getLeft(), fedAddr.getRight().intValue()));
sb.append("\n");
}
return sb.toString();
}
public static String displayFedWorkerStats() {
StringBuilder sb = new StringBuilder();
sb.append(displayFedLookupTableStats());
sb.append(displayFedReuseReadStats());
sb.append(displayFedPutLineageStats());
sb.append(displayFedSerializationReuseStats());
//FIXME: the following statistics need guards to only show
// results if federated operations where executed, also the CPU
// and mem usage only probe once at the time of stats printing
//sb.append(displayFedTransfer());
//sb.append(displayCPUUsage());
//sb.append(displayMemoryUsage());
return sb.toString();
}
public static String displayStatistics(int numHeavyHitters) {
FedStatsCollection fedStats = collectFedStats();
return displayStatistics(fedStats, numHeavyHitters);
}
public static String displayStatistics(FedStatsCollection fedStats, int numHeavyHitters) {
StringBuilder sb = new StringBuilder();
sb.append("SystemDS Federated Statistics:\n");
sb.append(displayCacheStats(fedStats.cacheStats));
sb.append(String.format("Total JIT compile time:\t\t%.3f sec.\n", fedStats.jitCompileTime));
sb.append(displayGCStats(fedStats.gcStats));
sb.append(displayLinCacheStats(fedStats.linCacheStats));
sb.append(displayMultiTenantStats(fedStats.mtStats));
sb.append(displayCPUUsage());
sb.append(displayMemoryUsage());
sb.append(displayFedTransfer());
sb.append(displayHeavyHitters(fedStats.heavyHitters, numHeavyHitters));
sb.append(displayNetworkTrafficStatistics());
return sb.toString();
}
private static String displayCacheStats(CacheStatsCollection csc) {
StringBuilder sb = new StringBuilder();
sb.append(String.format("Cache hits (Mem/Li/WB/FS/HDFS):\t%d/%d/%d/%d/%d.\n",
csc.memHits, csc.linHits, csc.fsBuffHits, csc.fsHits, csc.hdfsHits));
sb.append(String.format("Cache writes (Li/WB/FS/HDFS):\t%d/%d/%d/%d.\n",
csc.linWrites, csc.fsBuffWrites, csc.fsWrites, csc.hdfsWrites));
sb.append(String.format("Cache times (ACQr/m, RLS, EXP):\t%.3f/%.3f/%.3f/%.3f sec.\n",
csc.acqRTime, csc.acqMTime, csc.rlsTime, csc.expTime));
return sb.toString();
}
private static String displayGCStats(GCStatsCollection gcsc) {
StringBuilder sb = new StringBuilder();
sb.append(String.format("Total JVM GC count:\t\t%d.\n", gcsc.gcCount));
sb.append(String.format("Total JVM GC time:\t\t%.3f sec.\n", gcsc.gcTime));
return sb.toString();
}
private static String displayLinCacheStats(LineageCacheStatsCollection lcsc) {
StringBuilder sb = new StringBuilder();
sb.append(String.format("LinCache hits (Mem/FS/Del):\t%d/%d/%d.\n",
lcsc.numHitsMem, lcsc.numHitsFS, lcsc.numHitsDel));
sb.append(String.format("LinCache MultiLvl (Ins/SB/Fn):\t%d/%d/%d.\n",
lcsc.numHitsInst, lcsc.numHitsSB, lcsc.numHitsFunc));
sb.append(String.format("LinCache writes (Mem/FS/Del):\t%d/%d/%d.\n",
lcsc.numWritesMem, lcsc.numWritesFS, lcsc.numMemDel));
return sb.toString();
}
private static String displayMultiTenantStats(MultiTenantStatsCollection mtsc) {
StringBuilder sb = new StringBuilder();
sb.append(displayFedLookupTableStats(mtsc.fLTGetCount, mtsc.fLTEntryCount, mtsc.fLTGetTime));
sb.append(displayFedReuseReadStats(mtsc.reuseReadHits, mtsc.reuseReadBytes));
sb.append(displayFedPutLineageStats(mtsc.putLineageCount, mtsc.putLineageItems));
sb.append(displayFedSerializationReuseStats(mtsc.serializationReuseCount, mtsc.serializationReuseBytes));
return sb.toString();
}
@SuppressWarnings("unused")
private static String displayHeavyHitters(HashMap<String, Pair<Long, Double>> heavyHitters) {
return displayHeavyHitters(heavyHitters, 10);
}
private static String displayFedTransfer() {
StringBuilder sb = new StringBuilder();
sb.append("Transferred bytes (Host/Datetime/ByteAmount):\n");
for (var entry: coordinatorsTrafficBytes) {
sb.append(String.format("%s/%s/%d.\n",
entry.getLeft().format(DateTimeFormatter.ISO_DATE_TIME), entry.getMiddle(), entry.getRight()));
}
return sb.toString();
}
private static String displayCPUUsage() {
StringBuilder sb = new StringBuilder();
double cpuUsage = getCPUUsage();
sb.append(String.format("CPU usage %%: %.2f\n", cpuUsage));
return sb.toString();
}
private static String displayMemoryUsage() {
StringBuilder sb = new StringBuilder();
double memoryUsage = getMemoryUsage();
sb.append(String.format("Memory usage %%: %.2f\n", memoryUsage));
return sb.toString();
}
private static String displayHeavyHitters(HashMap<String, Pair<Long, Double>> heavyHitters, int num) {
StringBuilder sb = new StringBuilder();
@SuppressWarnings("unchecked")
Entry<String, Pair<Long, Double>>[] hhArr = heavyHitters.entrySet().toArray(new Entry[0]);
Arrays.sort(hhArr, new Comparator<Entry<String, Pair<Long, Double>>>() {
public int compare(Entry<String, Pair<Long, Double>> e1, Entry<String, Pair<Long, Double>> e2) {
return e1.getValue().getRight().compareTo(e2.getValue().getRight());
}
});
sb.append("Heavy hitter instructions:\n");
final String numCol = "#";
final String instCol = "Instruction";
final String timeSCol = "Time(s)";
final String countCol = "Count";
int numHittersToDisplay = Math.min(num, hhArr.length);
int maxNumLen = String.valueOf(numHittersToDisplay).length();
int maxInstLen = instCol.length();
int maxTimeSLen = timeSCol.length();
int maxCountLen = countCol.length();
DecimalFormat sFormat = new DecimalFormat("#,##0.000");
for (int counter = 0; counter < numHittersToDisplay; counter++) {
Entry<String, Pair<Long, Double>> hh = hhArr[hhArr.length - 1 - counter];
String instruction = hh.getKey();
maxInstLen = Math.max(maxInstLen, instruction.length());
String timeString = sFormat.format(hh.getValue().getRight());
maxTimeSLen = Math.max(maxTimeSLen, timeString.length());
maxCountLen = Math.max(maxCountLen, String.valueOf(hh.getValue().getLeft()).length());
}
maxInstLen = Math.min(maxInstLen, DMLScript.STATISTICS_MAX_WRAP_LEN);
sb.append(String.format( " %" + maxNumLen + "s %-" + maxInstLen + "s %"
+ maxTimeSLen + "s %" + maxCountLen + "s", numCol, instCol, timeSCol, countCol));
sb.append("\n");
for (int counter = 0; counter < numHittersToDisplay; counter++) {
String instruction = hhArr[hhArr.length - 1 - counter].getKey();
String [] wrappedInstruction = Statistics.wrap(instruction, maxInstLen);
String timeSString = sFormat.format(hhArr[hhArr.length - 1 - counter].getValue().getRight());
long count = hhArr[hhArr.length - 1 - counter].getValue().getLeft();
int numLines = wrappedInstruction.length;
for(int wrapIter = 0; wrapIter < numLines; wrapIter++) {
String instStr = (wrapIter < wrappedInstruction.length) ? wrappedInstruction[wrapIter] : "";
if(wrapIter == 0) {
sb.append(String.format(
" %" + maxNumLen + "d %-" + maxInstLen + "s %" + maxTimeSLen + "s %"
+ maxCountLen + "d", (counter + 1), instStr, timeSString, count));
}
else {
sb.append(String.format(
" %" + maxNumLen + "s %-" + maxInstLen + "s %" + maxTimeSLen + "s %"
+ maxCountLen + "s", "", instStr, "", ""));
}
sb.append("\n");
}
}
return sb.toString();
}
private static FedStatsCollection collectFedStats() {
Future<FederatedResponse>[] responses = getFederatedResponses();
FedStatsCollection aggFedStats = new FedStatsCollection();
for(Future<FederatedResponse> res : responses) {
try {
Object[] tmp = res.get().getData();
if(tmp[0] instanceof FedStatsCollection)
aggFedStats.aggregate((FedStatsCollection)tmp[0]);
} catch(Exception e) {
throw new DMLRuntimeException("Exception of type " + e.getClass().toString()
+ " thrown while " + "getting the federated stats of the federated response: ", e);
}
}
return aggFedStats;
}
private static Future<FederatedResponse>[] getFederatedResponses() {
List<Future<FederatedResponse>> ret = new ArrayList<>();
for(Pair<String, Integer> fedAddr : _fedWorkerAddresses) {
InetSocketAddress isa = new InetSocketAddress(fedAddr.getLeft(), fedAddr.getRight());
FederatedRequest frUDF = new FederatedRequest(RequestType.EXEC_UDF, -1,
new FedStatsCollectFunction());
try {
ret.add(FederatedData.executeFederatedOperation(isa, frUDF));
} catch(DMLRuntimeException dre) {
// silently ignore this exception --> caused by offline federated workers
} catch (Exception e) {
System.out.println("Exeption of type " + e.getClass().getName()
+ " thrown while getting stats from federated worker: " + e.getMessage());
}
}
@SuppressWarnings("unchecked")
Future<FederatedResponse>[] retArr = ret.toArray(new Future[0]);
return retArr;
}
public static long getFedLookupTableGetCount() {
return fedLookupTableGetCount.longValue();
}
public static List<Triple<LocalDateTime, String, Long>> getCoordinatorsTrafficBytes() {
return coordinatorsTrafficBytes;
}
public static double getCPUUsage() {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
double cpuUsage = 0.0f;
for(Long threadID : threadMXBean.getAllThreadIds()) {
cpuUsage += threadMXBean.getThreadCpuTime(threadID);
}
cpuUsage /= 1000000000; // nanoseconds to seconds
return cpuUsage;
}
public static double getMemoryUsage() {
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
double maxMemory = (double)memoryMXBean.getHeapMemoryUsage().getMax() / 1073741824;
double usedMemory = (double)memoryMXBean.getHeapMemoryUsage().getUsed() / 1073741824;
return (usedMemory / maxMemory) * 100;
}
public static long getFedLookupTableGetTime() {
return fedLookupTableGetTime.longValue();
}
public static long getFedLookupTableEntryCount() {
return fedLookupTableEntryCount.longValue();
}
public static long getFedReuseReadHitCount() {
return fedReuseReadHitCount.longValue();
}
public static long getFedReuseReadBytesCount() {
return fedReuseReadBytesCount.longValue();
}
public static long getFedPutLineageCount() {
return fedPutLineageCount.longValue();
}
public static long getFedPutLineageItems() {
return fedPutLineageItems.longValue();
}
public static long getFedSerializationReuseCount() {
return fedSerializationReuseCount.longValue();
}
public static long getFedSerializationReuseBytes() {
return fedSerializationReuseBytes.longValue();
}
public static void incFedLookupTableGetCount() {
fedLookupTableGetCount.increment();
}
public static void incFedLookupTableGetTime(long time) {
fedLookupTableGetTime.add(time);
}
public static void incFedLookupTableEntryCount() {
fedLookupTableEntryCount.increment();
}
public static void incFedReuseReadHitCount() {
fedReuseReadHitCount.increment();
}
public static void incFedReuseReadBytesCount(CacheableData<?> data) {
fedReuseReadBytesCount.add(data.getDataSize());
}
public static void incFedReuseReadBytesCount(CacheBlock cb) {
fedReuseReadBytesCount.add(cb.getInMemorySize());
}
public static void aggFedPutLineage(String serializedLineage) {
fedPutLineageCount.increment();
fedPutLineageItems.add(serializedLineage.lines().count());
}
public static void aggFedSerializationReuse(long bytes) {
fedSerializationReuseCount.increment();
fedSerializationReuseBytes.add(bytes);
}
public static String displayFedLookupTableStats() {
return displayFedLookupTableStats(fedLookupTableGetCount.longValue(),
fedLookupTableEntryCount.longValue(), fedLookupTableGetTime.doubleValue() / 1000000000);
}
public static String displayFedLookupTableStats(long fltGetCount, long fltEntryCount, double fltGetTime) {
if(fltGetCount > 0) {
return InstructionUtils.concatStrings(
"Fed LookupTable (Get, Entries):\t",
String.valueOf(fltGetCount), "/", String.valueOf(fltEntryCount),".\n");
}
return "";
}
public static String displayFedReuseReadStats() {
return displayFedReuseReadStats(
fedReuseReadHitCount.longValue(),
fedReuseReadBytesCount.longValue());
}
public static String displayFedReuseReadStats(long rrHits, long rrBytes) {
if(rrHits > 0) {
return InstructionUtils.concatStrings(
"Fed ReuseRead (Hits, Bytes):\t",
String.valueOf(rrHits), "/", String.valueOf(rrBytes), ".\n");
}
return "";
}
public static String displayFedPutLineageStats() {
return displayFedPutLineageStats(fedPutLineageCount.longValue(),
fedPutLineageItems.longValue());
}
public static String displayFedPutLineageStats(long plCount, long plItems) {
if(plCount > 0) {
return InstructionUtils.concatStrings(
"Fed PutLineage (Count, Items):\t",
String.valueOf(plCount), "/", String.valueOf(plItems), ".\n");
}
return "";
}
public static String displayFedSerializationReuseStats() {
return displayFedSerializationReuseStats(fedSerializationReuseCount.longValue(),
fedSerializationReuseBytes.longValue());
}
public static String displayFedSerializationReuseStats(long srCount, long srBytes) {
if(srCount > 0) {
return InstructionUtils.concatStrings(
"Fed SerialReuse (Count, Bytes):\t",
String.valueOf(srCount), "/", String.valueOf(srBytes), ".\n");
}
return "";
}
public static class FedStatsCollectFunction extends FederatedUDF {
private static final long serialVersionUID = 1L;
public FedStatsCollectFunction() {
super(new long[] { });
}
@Override
public FederatedResponse execute(ExecutionContext ec, Data... data) {
FedStatsCollection fedStats = new FedStatsCollection();
fedStats.collectStats();
return new FederatedResponse(FederatedResponse.ResponseType.SUCCESS, fedStats);
}
@Override
public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
return null;
}
}
public static class FedStatsCollection implements Serializable {
private static final long serialVersionUID = 1L;
private void collectStats() {
cacheStats.collectStats();
jitCompileTime = ((double)Statistics.getJITCompileTime()) / 1000; // in sec
cpuUsage = getCPUUsage();
memoryUsage = getMemoryUsage();
gcStats.collectStats();
linCacheStats.collectStats();
mtStats.collectStats();
heavyHitters = Statistics.getHeavyHittersHashMap();
coordinatorsTrafficBytes = getCoordinatorsTrafficBytes();
}
public void aggregate(FedStatsCollection that) {
cacheStats.aggregate(that.cacheStats);
jitCompileTime += that.jitCompileTime;
cpuUsage += that.cpuUsage;
memoryUsage += that.memoryUsage;
gcStats.aggregate(that.gcStats);
linCacheStats.aggregate(that.linCacheStats);
mtStats.aggregate(that.mtStats);
that.heavyHitters.forEach(
(key, value) -> heavyHitters.merge(key, value, (v1, v2) ->
new ImmutablePair<>(v1.getLeft() + v2.getLeft(), v1.getRight() + v2.getRight()))
);
that.coordinatorsTrafficBytes.addAll(coordinatorsTrafficBytes);
}
protected static class CacheStatsCollection implements Serializable {
private static final long serialVersionUID = 1L;
private void collectStats() {
memHits = CacheStatistics.getMemHits();
linHits = CacheStatistics.getLinHits();
fsBuffHits = CacheStatistics.getFSBuffHits();
fsHits = CacheStatistics.getFSHits();
hdfsHits = CacheStatistics.getHDFSHits();
linWrites = CacheStatistics.getLinWrites();
fsBuffWrites = CacheStatistics.getFSBuffWrites();
fsWrites = CacheStatistics.getFSWrites();
hdfsWrites = CacheStatistics.getHDFSWrites();
acqRTime = ((double)CacheStatistics.getAcquireRTime()) / 1000000000; // in sec
acqMTime = ((double)CacheStatistics.getAcquireMTime()) / 1000000000; // in sec
rlsTime = ((double)CacheStatistics.getReleaseTime()) / 1000000000; // in sec
expTime = ((double)CacheStatistics.getExportTime()) / 1000000000; // in sec
}
private void aggregate(CacheStatsCollection that) {
memHits += that.memHits;
linHits += that.linHits;
fsBuffHits += that.fsBuffHits;
fsHits += that.fsHits;
hdfsHits += that.hdfsHits;
linWrites += that.linWrites;
fsBuffWrites += that.fsBuffWrites;
fsWrites += that.fsWrites;
hdfsWrites += that.hdfsWrites;
acqRTime += that.acqRTime;
acqMTime += that.acqMTime;
rlsTime += that.rlsTime;
expTime += that.expTime;
}
private long memHits = 0;
private long linHits = 0;
private long fsBuffHits = 0;
private long fsHits = 0;
private long hdfsHits = 0;
private long linWrites = 0;
private long fsBuffWrites = 0;
private long fsWrites = 0;
private long hdfsWrites = 0;
private double acqRTime = 0;
private double acqMTime = 0;
private double rlsTime = 0;
private double expTime = 0;
}
protected static class GCStatsCollection implements Serializable {
private static final long serialVersionUID = 1L;
private void collectStats() {
gcCount = Statistics.getJVMgcCount();
gcTime = ((double)Statistics.getJVMgcTime()) / 1000; // in sec
}
private void aggregate(GCStatsCollection that) {
gcCount += that.gcCount;
gcTime += that.gcTime;
}
private long gcCount = 0;
private double gcTime = 0;
}
protected static class LineageCacheStatsCollection implements Serializable {
private static final long serialVersionUID = 1L;
private void collectStats() {
numHitsMem = LineageCacheStatistics.getMemHits();
numHitsFS = LineageCacheStatistics.getFSHits();
numHitsDel = LineageCacheStatistics.getDelHits();
numHitsInst = LineageCacheStatistics.getInstHits();
numHitsSB = LineageCacheStatistics.getSBHits();
numHitsFunc = LineageCacheStatistics.getFuncHits();
numWritesMem = LineageCacheStatistics.getMemWrites();
numWritesFS = LineageCacheStatistics.getFSWrites();
numMemDel = LineageCacheStatistics.getMemDeletes();
}
private void aggregate(LineageCacheStatsCollection that) {
numHitsMem += that.numHitsMem;
numHitsFS += that.numHitsFS;
numHitsDel += that.numHitsDel;
numHitsInst += that.numHitsInst;
numHitsSB += that.numHitsSB;
numHitsFunc += that.numHitsFunc;
numWritesMem += that.numWritesMem;
numWritesFS += that.numWritesFS;
numMemDel += that.numMemDel;
}
private long numHitsMem = 0;
private long numHitsFS = 0;
private long numHitsDel = 0;
private long numHitsInst = 0;
private long numHitsSB = 0;
private long numHitsFunc = 0;
private long numWritesMem = 0;
private long numWritesFS = 0;
private long numMemDel = 0;
}
protected static class MultiTenantStatsCollection implements Serializable {
private static final long serialVersionUID = 1L;
private void collectStats() {
fLTGetCount = getFedLookupTableGetCount();
fLTGetTime = ((double)getFedLookupTableGetTime()) / 1000000000; // in sec
fLTEntryCount = getFedLookupTableEntryCount();
reuseReadHits = getFedReuseReadHitCount();
reuseReadBytes = getFedReuseReadBytesCount();
putLineageCount = getFedPutLineageCount();
putLineageItems = getFedPutLineageItems();
serializationReuseCount = getFedSerializationReuseCount();
serializationReuseBytes = getFedSerializationReuseBytes();
}
private void aggregate(MultiTenantStatsCollection that) {
fLTGetCount += that.fLTGetCount;
fLTGetTime += that.fLTGetTime;
fLTEntryCount += that.fLTEntryCount;
reuseReadHits += that.reuseReadHits;
reuseReadBytes += that.reuseReadBytes;
putLineageCount += that.putLineageCount;
putLineageItems += that.putLineageItems;
serializationReuseCount += that.serializationReuseCount;
serializationReuseBytes += that.serializationReuseBytes;
}
private long fLTGetCount = 0;
private double fLTGetTime = 0;
private long fLTEntryCount = 0;
private long reuseReadHits = 0;
private long reuseReadBytes = 0;
private long putLineageCount = 0;
private long putLineageItems = 0;
private long serializationReuseCount = 0;
private long serializationReuseBytes = 0;
}
private CacheStatsCollection cacheStats = new CacheStatsCollection();
public double jitCompileTime = 0;
public double cpuUsage = 0;
public double memoryUsage = 0;
private GCStatsCollection gcStats = new GCStatsCollection();
private LineageCacheStatsCollection linCacheStats = new LineageCacheStatsCollection();
private MultiTenantStatsCollection mtStats = new MultiTenantStatsCollection();
public HashMap<String, Pair<Long, Double>> heavyHitters = new HashMap<>();
public List<Triple<LocalDateTime, String, Long>> coordinatorsTrafficBytes = new ArrayList<>();
}
}