blob: 0530fa315349ec53509ffb8e23c3d115d9341e6a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.utils.stats;
import java.util.concurrent.atomic.LongAdder;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
public class SparkStatistics {
private static long ctxCreateTime = 0;
private static final LongAdder parallelizeTime = new LongAdder();
private static final LongAdder parallelizeCount = new LongAdder();
private static final LongAdder collectTime = new LongAdder();
private static final LongAdder collectCount = new LongAdder();
private static final LongAdder broadcastTime = new LongAdder();
private static final LongAdder broadcastCount = new LongAdder();
private static final LongAdder asyncPrefetchCount = new LongAdder();
private static final LongAdder asyncBroadcastCount = new LongAdder();
private static final LongAdder asyncTriggerCheckpointCount = new LongAdder();
private static final LongAdder asyncSparkOpCount = new LongAdder();
public static boolean createdSparkContext() {
return ctxCreateTime > 0;
}
public static void setCtxCreateTime(long ns) {
ctxCreateTime = ns;
}
public static void accParallelizeTime(long t) {
parallelizeTime.add(t);
}
public static void incParallelizeCount(long c) {
parallelizeCount.add(c);
}
public static void accCollectTime(long t) {
collectTime.add(t);
incCollectCount(1);
}
private static void incCollectCount(long c) {
collectCount.add(c);
}
public static void accBroadCastTime(long t) {
broadcastTime.add(t);
}
public static void incBroadcastCount(long c) {
broadcastCount.add(c);
}
public static void incAsyncPrefetchCount(long c) {
asyncPrefetchCount.add(c);
}
public static void incAsyncBroadcastCount(long c) {
asyncBroadcastCount.add(c);
}
public static void incAsyncTriggerCheckpointCount(long c) {
asyncTriggerCheckpointCount.add(c);
}
public static void incAsyncSparkOpCount(long c) {
asyncSparkOpCount.add(c);
}
public static long getSparkCollectCount() {
return collectCount.longValue();
}
public static long getAsyncPrefetchCount() {
return asyncPrefetchCount.longValue();
}
public static long getAsyncSparkOpCount() {
return asyncSparkOpCount.longValue();
}
public static long getAsyncBroadcastCount() {
return asyncBroadcastCount.longValue();
}
public static long getasyncTriggerCheckpointCount() {
return asyncTriggerCheckpointCount.longValue();
}
public static void reset() {
ctxCreateTime = 0;
parallelizeTime.reset();
parallelizeCount.reset();
broadcastTime.reset();
broadcastCount.reset();
collectTime.reset();
collectCount.reset();
asyncPrefetchCount.reset();
asyncBroadcastCount.reset();
asyncTriggerCheckpointCount.reset();
}
public static boolean anyAsyncOp() {
return (getAsyncPrefetchCount() > 0) || (getAsyncBroadcastCount() > 0) || (getAsyncSparkOpCount() > 0);
}
public static String displayStatistics() {
StringBuilder sb = new StringBuilder();
String lazy = SparkExecutionContext.isLazySparkContextCreation() ? "(lazy)" : "(eager)";
sb.append("Spark ctx create time "+lazy+":\t"+
String.format("%.3f", ctxCreateTime*1e-9) + " sec.\n" ); // nanoSec --> sec
sb.append("Spark trans counts (par,bc,col):" +
String.format("%d/%d/%d.\n", parallelizeCount.longValue(),
broadcastCount.longValue(), collectCount.longValue()));
sb.append("Spark trans times (par,bc,col):\t" +
String.format("%.3f/%.3f/%.3f secs.\n",
parallelizeTime.longValue()*1e-9,
broadcastTime.longValue()*1e-9,
collectTime.longValue()*1e-9));
return sb.toString();
}
public static String displayAsyncStats() {
StringBuilder sb = new StringBuilder();
sb.append("Async. OP count (pf,bc,op): \t" +
String.format("%d/%d/%d.\n", getAsyncPrefetchCount(), getAsyncBroadcastCount(), getAsyncSparkOpCount()));
return sb.toString();
}
}