| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.ops; |
| |
| import java.util.Iterator; |
| |
| import org.apache.drill.exec.memory.BufferAllocator; |
| import org.apache.drill.exec.proto.UserBitShared.MetricValue; |
| import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; |
| import org.apache.drill.exec.proto.UserBitShared.OperatorProfile.Builder; |
| import org.apache.drill.exec.proto.UserBitShared.StreamProfile; |
| |
| import com.carrotsearch.hppc.IntDoubleHashMap; |
| import com.carrotsearch.hppc.IntLongHashMap; |
| import com.carrotsearch.hppc.cursors.IntDoubleCursor; |
| import com.carrotsearch.hppc.cursors.IntLongCursor; |
| import com.carrotsearch.hppc.procedures.IntDoubleProcedure; |
| import com.carrotsearch.hppc.procedures.IntLongProcedure; |
| import org.apache.drill.exec.server.rest.profile.CoreOperatorType; |
| import com.google.common.annotations.VisibleForTesting; |
| |
| public class OperatorStats { |
| protected final int operatorId; |
| protected final String operatorType; |
| private final BufferAllocator allocator; |
| |
| private final IntLongHashMap longMetrics = new IntLongHashMap(); |
| private final IntDoubleHashMap doubleMetrics = new IntDoubleHashMap(); |
| |
| public long[] recordsReceivedByInput; |
| public long[] batchesReceivedByInput; |
| private final long[] schemaCountByInput; |
| |
| |
| private boolean inProcessing; |
| private boolean inSetup; |
| private boolean inWait; |
| |
| protected long processingNanos; |
| protected long setupNanos; |
| protected long waitNanos; |
| |
| private long processingMark; |
| private long setupMark; |
| private long waitMark; |
| |
| private final int inputCount; |
| |
| public OperatorStats(OpProfileDef def, BufferAllocator allocator){ |
| this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator); |
| } |
| |
| /** |
| * Copy constructor to be able to create a copy of existing stats object shell and use it independently |
| * this is useful if stats have to be updated in different threads, since it is not really |
| * possible to update such stats as waitNanos, setupNanos and processingNanos across threads |
| * @param original - OperatorStats object to create a copy from |
| * @param isClean - flag to indicate whether to start with clean state indicators or inherit those from original object |
| */ |
| |
| public OperatorStats(OperatorStats original, boolean isClean) { |
| this(original.operatorId, original.operatorType, original.inputCount, original.allocator); |
| |
| if ( !isClean ) { |
| inProcessing = original.inProcessing; |
| inSetup = original.inSetup; |
| inWait = original.inWait; |
| |
| processingMark = original.processingMark; |
| setupMark = original.setupMark; |
| waitMark = original.waitMark; |
| } |
| } |
| |
| @VisibleForTesting |
| public OperatorStats(int operatorId, String operatorType, int inputCount, BufferAllocator allocator) { |
| super(); |
| this.allocator = allocator; |
| this.operatorId = operatorId; |
| this.operatorType = operatorType; |
| this.inputCount = inputCount; |
| this.recordsReceivedByInput = new long[inputCount]; |
| this.batchesReceivedByInput = new long[inputCount]; |
| this.schemaCountByInput = new long[inputCount]; |
| } |
| |
| private String assertionError(String msg){ |
| return String.format("Failure while %s for operator id %d. Currently have states of processing:%s, setup:%s, waiting:%s.", msg, operatorId, inProcessing, inSetup, inWait); |
| } |
| |
| /** |
| * OperatorStats merger - to merge stats from other OperatorStats |
| * this is needed in case some processing is multithreaded that needs to have |
| * separate OperatorStats to deal with |
| * WARN - this will only work for metrics that can be added |
| * @param from - OperatorStats from where to merge to "this" |
| * @return OperatorStats - for convenience so one can merge multiple stats in one go |
| */ |
| |
| public OperatorStats mergeMetrics(OperatorStats from) { |
| final IntLongHashMap fromMetrics = from.longMetrics; |
| |
| final Iterator<IntLongCursor> iter = fromMetrics.iterator(); |
| while (iter.hasNext()) { |
| final IntLongCursor next = iter.next(); |
| longMetrics.putOrAdd(next.key, next.value, next.value); |
| } |
| |
| final IntDoubleHashMap fromDMetrics = from.doubleMetrics; |
| final Iterator<IntDoubleCursor> iterD = fromDMetrics.iterator(); |
| |
| while (iterD.hasNext()) { |
| final IntDoubleCursor next = iterD.next(); |
| doubleMetrics.putOrAdd(next.key, next.value, next.value); |
| } |
| return this; |
| } |
| |
| /** |
| * Clear stats |
| */ |
| public synchronized void clear() { |
| processingNanos = 0l; |
| setupNanos = 0l; |
| waitNanos = 0l; |
| longMetrics.clear(); |
| doubleMetrics.clear(); |
| } |
| |
| public synchronized void startSetup() { |
| assert !inSetup : assertionError("starting setup"); |
| stopProcessing(); |
| inSetup = true; |
| setupMark = System.nanoTime(); |
| } |
| |
| public synchronized void stopSetup() { |
| assert inSetup : assertionError("stopping setup"); |
| startProcessing(); |
| setupNanos += System.nanoTime() - setupMark; |
| inSetup = false; |
| } |
| |
| public synchronized void startProcessing() { |
| assert !inProcessing : assertionError("starting processing"); |
| processingMark = System.nanoTime(); |
| inProcessing = true; |
| } |
| |
| public synchronized void stopProcessing() { |
| assert inProcessing : assertionError("stopping processing"); |
| processingNanos += System.nanoTime() - processingMark; |
| inProcessing = false; |
| } |
| |
| public synchronized void startWait() { |
| assert !inWait : assertionError("starting waiting"); |
| stopProcessing(); |
| inWait = true; |
| waitMark = System.nanoTime(); |
| } |
| |
| public synchronized void stopWait() { |
| assert inWait : assertionError("stopping waiting"); |
| startProcessing(); |
| waitNanos += System.nanoTime() - waitMark; |
| inWait = false; |
| } |
| |
| public synchronized void batchReceived(int inputIndex, long records, boolean newSchema) { |
| recordsReceivedByInput[inputIndex] += records; |
| batchesReceivedByInput[inputIndex]++; |
| if (newSchema) { |
| schemaCountByInput[inputIndex]++; |
| } |
| } |
| |
| public String getId() { |
| return new StringBuilder() |
| .append(this.operatorId) |
| .append(":") |
| .append("[") |
| .append(operatorType) |
| .append("]") |
| .toString(); |
| } |
| |
| @SuppressWarnings("deprecation") |
| public OperatorProfile getProfile() { |
| final OperatorProfile.Builder b = OperatorProfile |
| .newBuilder() |
| .setOperatorTypeName(operatorType) |
| .setOperatorId(operatorId) |
| .setSetupNanos(setupNanos) |
| .setProcessNanos(processingNanos) |
| .setWaitNanos(waitNanos); |
| |
| CoreOperatorType coreOperatorType = CoreOperatorType.forName(operatorType); |
| |
| if (coreOperatorType != null) { |
| b.setOperatorType(coreOperatorType.getId()); |
| } |
| |
| if (allocator != null) { |
| b.setPeakLocalMemoryAllocated(allocator.getPeakMemoryAllocation()); |
| } |
| |
| addAllMetrics(b); |
| return b.build(); |
| } |
| |
| public void addAllMetrics(OperatorProfile.Builder builder) { |
| addStreamProfile(builder); |
| addLongMetrics(builder); |
| addDoubleMetrics(builder); |
| } |
| |
| public void addStreamProfile(OperatorProfile.Builder builder) { |
| for(int i = 0; i < recordsReceivedByInput.length; i++){ |
| builder.addInputProfile(StreamProfile.newBuilder().setBatches(batchesReceivedByInput[i]).setRecords(recordsReceivedByInput[i]).setSchemas(this.schemaCountByInput[i])); |
| } |
| } |
| |
| private static class LongProc implements IntLongProcedure { |
| |
| private final OperatorProfile.Builder builder; |
| |
| public LongProc(Builder builder) { |
| super(); |
| this.builder = builder; |
| } |
| |
| @Override |
| public void apply(int key, long value) { |
| builder.addMetric(MetricValue.newBuilder().setMetricId(key).setLongValue(value)); |
| } |
| } |
| |
| public void addLongMetrics(OperatorProfile.Builder builder) { |
| if (longMetrics.size() > 0) { |
| longMetrics.forEach(new LongProc(builder)); |
| } |
| } |
| |
| private static class DoubleProc implements IntDoubleProcedure { |
| private final OperatorProfile.Builder builder; |
| |
| public DoubleProc(Builder builder) { |
| super(); |
| this.builder = builder; |
| } |
| |
| @Override |
| public void apply(int key, double value) { |
| builder.addMetric(MetricValue.newBuilder().setMetricId(key).setDoubleValue(value)); |
| } |
| } |
| |
| public void addDoubleMetrics(OperatorProfile.Builder builder) { |
| if (doubleMetrics.size() > 0) { |
| doubleMetrics.forEach(new DoubleProc(builder)); |
| } |
| } |
| |
| /** |
| * Set a stat to the specified long value. Creates the stat |
| * if the stat does not yet exist. |
| * |
| * @param metric the metric to update |
| * @param value the value to set |
| */ |
| |
| public void addLongStat(MetricDef metric, long value){ |
| longMetrics.putOrAdd(metric.metricId(), value, value); |
| } |
| |
| @VisibleForTesting |
| public long getLongStat(MetricDef metric) { |
| return longMetrics.get(metric.metricId()); |
| } |
| |
| /** |
| * Add a double value to the existing value. Creates the stat |
| * (with an initial value of zero) if the stat does not yet |
| * exist. |
| * |
| * @param metric the metric to update |
| * @param value the value to add to the existing value |
| */ |
| |
| public void addDoubleStat(MetricDef metric, double value){ |
| doubleMetrics.putOrAdd(metric.metricId(), value, value); |
| } |
| |
| @VisibleForTesting |
| public double getDoubleStat(MetricDef metric) { |
| return doubleMetrics.get(metric.metricId()); |
| } |
| |
| /** |
| * Add a long value to the existing value. Creates the stat |
| * (with an initial value of zero) if the stat does not yet |
| * exist. |
| * |
| * @param metric the metric to update |
| * @param value the value to add to the existing value |
| */ |
| |
| public void setLongStat(MetricDef metric, long value){ |
| longMetrics.put(metric.metricId(), value); |
| } |
| |
| /** |
| * Set a stat to the specified double value. Creates the stat |
| * if the stat does not yet exist. |
| * |
| * @param metric the metric to update |
| * @param value the value to set |
| */ |
| |
| public void setDoubleStat(MetricDef metric, double value){ |
| doubleMetrics.put(metric.metricId(), value); |
| } |
| |
| public long getWaitNanos() { |
| return waitNanos; |
| } |
| |
| /** |
| * Adjust waitNanos based on client calculations |
| * @param waitNanosOffset - could be negative as well as positive |
| */ |
| public void adjustWaitNanos(long waitNanosOffset) { |
| this.waitNanos += waitNanosOffset; |
| } |
| |
| public long getProcessingNanos() { |
| return processingNanos; |
| } |
| } |