// blob: 6824978893df82dcd672fcf5ab26f031fd371412 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.ops;
import java.util.Iterator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.MetricValue;
import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
import org.apache.drill.exec.proto.UserBitShared.OperatorProfile.Builder;
import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
import com.carrotsearch.hppc.IntDoubleHashMap;
import com.carrotsearch.hppc.IntLongHashMap;
import com.carrotsearch.hppc.cursors.IntDoubleCursor;
import com.carrotsearch.hppc.cursors.IntLongCursor;
import com.carrotsearch.hppc.procedures.IntDoubleProcedure;
import com.carrotsearch.hppc.procedures.IntLongProcedure;
import org.apache.drill.exec.server.rest.profile.CoreOperatorType;
import com.google.common.annotations.VisibleForTesting;
public class OperatorStats {
protected final int operatorId;
protected final String operatorType;
private final BufferAllocator allocator;
private final IntLongHashMap longMetrics = new IntLongHashMap();
private final IntDoubleHashMap doubleMetrics = new IntDoubleHashMap();
public long[] recordsReceivedByInput;
public long[] batchesReceivedByInput;
private final long[] schemaCountByInput;
private boolean inProcessing;
private boolean inSetup;
private boolean inWait;
protected long processingNanos;
protected long setupNanos;
protected long waitNanos;
private long processingMark;
private long setupMark;
private long waitMark;
private final int inputCount;
public OperatorStats(OpProfileDef def, BufferAllocator allocator){
this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator);
}
/**
* Copy constructor to be able to create a copy of existing stats object shell and use it independently
* this is useful if stats have to be updated in different threads, since it is not really
* possible to update such stats as waitNanos, setupNanos and processingNanos across threads
* @param original - OperatorStats object to create a copy from
* @param isClean - flag to indicate whether to start with clean state indicators or inherit those from original object
*/
public OperatorStats(OperatorStats original, boolean isClean) {
this(original.operatorId, original.operatorType, original.inputCount, original.allocator);
if ( !isClean ) {
inProcessing = original.inProcessing;
inSetup = original.inSetup;
inWait = original.inWait;
processingMark = original.processingMark;
setupMark = original.setupMark;
waitMark = original.waitMark;
}
}
@VisibleForTesting
public OperatorStats(int operatorId, String operatorType, int inputCount, BufferAllocator allocator) {
super();
this.allocator = allocator;
this.operatorId = operatorId;
this.operatorType = operatorType;
this.inputCount = inputCount;
this.recordsReceivedByInput = new long[inputCount];
this.batchesReceivedByInput = new long[inputCount];
this.schemaCountByInput = new long[inputCount];
}
private String assertionError(String msg){
return String.format("Failure while %s for operator id %d. Currently have states of processing:%s, setup:%s, waiting:%s.", msg, operatorId, inProcessing, inSetup, inWait);
}
/**
* OperatorStats merger - to merge stats from other OperatorStats
* this is needed in case some processing is multithreaded that needs to have
* separate OperatorStats to deal with
* WARN - this will only work for metrics that can be added
* @param from - OperatorStats from where to merge to "this"
* @return OperatorStats - for convenience so one can merge multiple stats in one go
*/
public OperatorStats mergeMetrics(OperatorStats from) {
final IntLongHashMap fromMetrics = from.longMetrics;
final Iterator<IntLongCursor> iter = fromMetrics.iterator();
while (iter.hasNext()) {
final IntLongCursor next = iter.next();
longMetrics.putOrAdd(next.key, next.value, next.value);
}
final IntDoubleHashMap fromDMetrics = from.doubleMetrics;
final Iterator<IntDoubleCursor> iterD = fromDMetrics.iterator();
while (iterD.hasNext()) {
final IntDoubleCursor next = iterD.next();
doubleMetrics.putOrAdd(next.key, next.value, next.value);
}
return this;
}
/**
* Clear stats
*/
public synchronized void clear() {
processingNanos = 0l;
setupNanos = 0l;
waitNanos = 0l;
longMetrics.clear();
doubleMetrics.clear();
}
public synchronized void startSetup() {
assert !inSetup : assertionError("starting setup");
stopProcessing();
inSetup = true;
setupMark = System.nanoTime();
}
public synchronized void stopSetup() {
assert inSetup : assertionError("stopping setup");
startProcessing();
setupNanos += System.nanoTime() - setupMark;
inSetup = false;
}
public synchronized void startProcessing() {
assert !inProcessing : assertionError("starting processing");
processingMark = System.nanoTime();
inProcessing = true;
}
public synchronized void stopProcessing() {
assert inProcessing : assertionError("stopping processing");
processingNanos += System.nanoTime() - processingMark;
inProcessing = false;
}
public synchronized void startWait() {
assert !inWait : assertionError("starting waiting");
stopProcessing();
inWait = true;
waitMark = System.nanoTime();
}
public synchronized void stopWait() {
assert inWait : assertionError("stopping waiting");
startProcessing();
waitNanos += System.nanoTime() - waitMark;
inWait = false;
}
public synchronized void batchReceived(int inputIndex, long records, boolean newSchema) {
recordsReceivedByInput[inputIndex] += records;
batchesReceivedByInput[inputIndex]++;
if (newSchema) {
schemaCountByInput[inputIndex]++;
}
}
public String getId() {
return new StringBuilder()
.append(this.operatorId)
.append(":")
.append("[")
.append(operatorType)
.append("]")
.toString();
}
@SuppressWarnings("deprecation")
public OperatorProfile getProfile() {
final OperatorProfile.Builder b = OperatorProfile
.newBuilder()
.setOperatorTypeName(operatorType)
.setOperatorId(operatorId)
.setSetupNanos(setupNanos)
.setProcessNanos(processingNanos)
.setWaitNanos(waitNanos);
CoreOperatorType coreOperatorType = CoreOperatorType.forName(operatorType);
if (coreOperatorType != null) {
b.setOperatorType(coreOperatorType.getId());
}
if (allocator != null) {
b.setPeakLocalMemoryAllocated(allocator.getPeakMemoryAllocation());
}
addAllMetrics(b);
return b.build();
}
public void addAllMetrics(OperatorProfile.Builder builder) {
addStreamProfile(builder);
addLongMetrics(builder);
addDoubleMetrics(builder);
}
public void addStreamProfile(OperatorProfile.Builder builder) {
for(int i = 0; i < recordsReceivedByInput.length; i++){
builder.addInputProfile(StreamProfile.newBuilder().setBatches(batchesReceivedByInput[i]).setRecords(recordsReceivedByInput[i]).setSchemas(this.schemaCountByInput[i]));
}
}
private static class LongProc implements IntLongProcedure {
private final OperatorProfile.Builder builder;
public LongProc(Builder builder) {
super();
this.builder = builder;
}
@Override
public void apply(int key, long value) {
builder.addMetric(MetricValue.newBuilder().setMetricId(key).setLongValue(value));
}
}
public void addLongMetrics(OperatorProfile.Builder builder) {
if (longMetrics.size() > 0) {
longMetrics.forEach(new LongProc(builder));
}
}
private static class DoubleProc implements IntDoubleProcedure {
private final OperatorProfile.Builder builder;
public DoubleProc(Builder builder) {
super();
this.builder = builder;
}
@Override
public void apply(int key, double value) {
builder.addMetric(MetricValue.newBuilder().setMetricId(key).setDoubleValue(value));
}
}
public void addDoubleMetrics(OperatorProfile.Builder builder) {
if (doubleMetrics.size() > 0) {
doubleMetrics.forEach(new DoubleProc(builder));
}
}
/**
* Set a stat to the specified long value. Creates the stat
* if the stat does not yet exist.
*
* @param metric the metric to update
* @param value the value to set
*/
public void addLongStat(MetricDef metric, long value){
longMetrics.putOrAdd(metric.metricId(), value, value);
}
@VisibleForTesting
public long getLongStat(MetricDef metric) {
return longMetrics.get(metric.metricId());
}
/**
* Add a double value to the existing value. Creates the stat
* (with an initial value of zero) if the stat does not yet
* exist.
*
* @param metric the metric to update
* @param value the value to add to the existing value
*/
public void addDoubleStat(MetricDef metric, double value){
doubleMetrics.putOrAdd(metric.metricId(), value, value);
}
@VisibleForTesting
public double getDoubleStat(MetricDef metric) {
return doubleMetrics.get(metric.metricId());
}
/**
* Add a long value to the existing value. Creates the stat
* (with an initial value of zero) if the stat does not yet
* exist.
*
* @param metric the metric to update
* @param value the value to add to the existing value
*/
public void setLongStat(MetricDef metric, long value){
longMetrics.put(metric.metricId(), value);
}
/**
* Set a stat to the specified double value. Creates the stat
* if the stat does not yet exist.
*
* @param metric the metric to update
* @param value the value to set
*/
public void setDoubleStat(MetricDef metric, double value){
doubleMetrics.put(metric.metricId(), value);
}
public long getWaitNanos() {
return waitNanos;
}
/**
* Adjust waitNanos based on client calculations
* @param waitNanosOffset - could be negative as well as positive
*/
public void adjustWaitNanos(long waitNanosOffset) {
this.waitNanos += waitNanosOffset;
}
public long getProcessingNanos() {
return processingNanos;
}
}