blob: 2a8667436b5aab3f856e0ada22470b9879727396 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.counters;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterMean;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
/**
* A Counter enables the aggregation of a stream of values over time. The cumulative aggregate value
* is updated as new values are added. Multiple kinds of aggregation are supported depending on the
* type of the counter.
*
* <p>After all possible mutations have completed, the reader should check {@link #isDirty} for
* every counter, otherwise updates may be lost.
*
* <p>A counter may become dirty without a corresponding update to the value. This generally will
* occur when the calls to {@code addValue()}, {@code committing()}, and {@code committed()} are
* interleaved such that the value is updated between the calls to committing and the read of the
* value.
*
* @param <InputT> the type of values aggregated by this counter.
* @param <AccumT> the type of aggregator stored by this counter.
*/
public class Counter<InputT, AccumT> {
  /**
   * An instance of {@link AtomicCounterValue} specifies how a counter is aggregated and stored
   * internally. It should ensure that all operations are atomic when used in multiple threads.
   *
   * @param <InputT> the type of values added to the aggregate.
   * @param <AccumT> the type of the aggregate.
   */
  public interface AtomicCounterValue<InputT, AccumT> {
    /** Adds {@code value} to the aggregate. */
    void addValue(InputT value);

    /** Returns the current aggregate. */
    AccumT getAggregate();

    /** Atomically resets the aggregate and returns the value it held before the reset. */
    AccumT getAndReset();

    /**
     * Converts the aggregate into an update using {@code updateExtractor}, which acts as a visitor
     * over the supported counter value kinds.
     *
     * @param name the name of the counter owning this value.
     * @param delta forwarded unchanged to {@code updateExtractor}; presumably selects a delta
     *     rather than a cumulative update (NOTE(review): confirm against extractor
     *     implementations).
     * @param updateExtractor the visitor that builds the update.
     * @return whatever {@code updateExtractor} produces.
     */
    <UpdateT> UpdateT extractUpdate(
        CounterName name, boolean delta, CounterUpdateExtractor<UpdateT> updateExtractor);
  }

  /**
   * An instance of {@link CounterUpdateExtractor} specifies how a counter can be turned into an
   * appropriate update proto.
   *
   * <p>This class is essentially a visitor for counter values: each method handles one combination
   * of value type ({@code long}, {@code int}, {@code double}, {@code boolean}, distribution) and
   * aggregation kind. Every method may return {@code null}.
   */
  public interface CounterUpdateExtractor<UpdateT> {
    @Nullable
    UpdateT longSum(CounterName name, boolean delta, Long value);

    @Nullable
    UpdateT longMin(CounterName name, boolean delta, Long value);

    @Nullable
    UpdateT longMax(CounterName name, boolean delta, Long value);

    @Nullable
    UpdateT longMean(CounterName name, boolean delta, CounterMean<Long> value);

    @Nullable
    UpdateT intSum(CounterName name, boolean delta, Integer value);

    @Nullable
    UpdateT intMin(CounterName name, boolean delta, Integer value);

    @Nullable
    UpdateT intMax(CounterName name, boolean delta, Integer value);

    @Nullable
    UpdateT intMean(CounterName name, boolean delta, CounterMean<Integer> value);

    @Nullable
    UpdateT doubleSum(CounterName name, boolean delta, Double value);

    @Nullable
    UpdateT doubleMin(CounterName name, boolean delta, Double value);

    @Nullable
    UpdateT doubleMax(CounterName name, boolean delta, Double value);

    @Nullable
    UpdateT doubleMean(CounterName name, boolean delta, CounterMean<Double> value);

    @Nullable
    UpdateT boolOr(CounterName name, boolean delta, Boolean value);

    @Nullable
    UpdateT boolAnd(CounterName name, boolean delta, Boolean value);

    @Nullable
    UpdateT distribution(CounterName name, boolean delta, CounterDistribution value);
  }

  //////////////////////////////////////////////////////////////////////////////

  /** The name and metadata of this counter. */
  private final CounterName name;

  /** The commit state of this counter. */
  @VisibleForTesting final AtomicReference<CommitState> commitState;

  /** The implementation of the counter aggregation. */
  private final AtomicCounterValue<InputT, AccumT> internal;

  /**
   * Creates a counter backed by {@code internal}.
   *
   * <p>The counter starts in {@code CommitState.COMMITTED}, and {@code internal} is reset so that
   * any value it held before construction is discarded.
   *
   * @param name the name and metadata of this counter.
   * @param internal the aggregation implementation; it must not be null, because it is used
   *     immediately.
   */
  public Counter(CounterName name, AtomicCounterValue<InputT, AccumT> internal) {
    this.name = name;
    this.commitState = new AtomicReference<>(CommitState.COMMITTED);
    this.internal = internal;
    // Make sure the counter is properly initialized.
    internal.getAndReset();
  }

  /**
   * Adds a new value to the aggregation stream and marks the counter dirty. The counter is marked
   * dirty even if the underlying aggregation throws.
   *
   * @return this counter, to allow method chaining.
   */
  public Counter<InputT, AccumT> addValue(InputT value) {
    try {
      internal.addValue(value);
    } finally {
      // Always mark dirty so a concurrent commit cannot report this counter as fully committed.
      setDirty();
    }
    return this;
  }

  /**
   * Atomically resets the counter's value and returns the previous aggregate. This does not change
   * the commit state.
   */
  public AccumT getAndReset() {
    return internal.getAndReset();
  }

  /** Returns the counter's name. */
  public CounterName getName() {
    return name;
  }

  /**
   * Converts this counter's aggregate into an update using {@code updateExtractor}, passing along
   * this counter's name.
   *
   * @param delta forwarded unchanged to the underlying {@link AtomicCounterValue#extractUpdate}.
   * @param updateExtractor the visitor that builds the update.
   * @return the update produced by {@code updateExtractor}. Because the extractor's methods are
   *     {@code @Nullable}, this may presumably be {@code null}.
   */
  public <UpdateT> UpdateT extractUpdate(
      boolean delta, CounterUpdateExtractor<UpdateT> updateExtractor) {
    return internal.extractUpdate(name, delta, updateExtractor);
  }

  /** Returns the counter's flat name. This is only suitable for hashing purposes. */
  public String getFlatName() {
    return name.getFlatName();
  }

  /** Returns the aggregated value. */
  public AccumT getAggregate() {
    return internal.getAggregate();
  }

  /**
   * Represents whether counters' values have been committed to the backend.
   *
   * <p>Runners can use this information to optimize counter updates. For example, if counters are
   * committed, runners may choose to skip the updates.
   *
   * <p>Counter state transition table ("unchanged" means the call leaves the state as it is):
   *
   * <pre>
   * Action \ Current state   COMMITTED   DIRTY        COMMITTING
   * addValue()               DIRTY       DIRTY        DIRTY
   * committing()             unchanged   COMMITTING   unchanged
   * committed()              unchanged   unchanged    COMMITTED
   * </pre>
   */
  @VisibleForTesting
  enum CommitState {
    /** There are no local updates that haven't been committed to the backend. */
    COMMITTED,
    /** There are local updates that haven't been committed to the backend. */
    DIRTY,
    /** Local updates are committing to the backend, but are still pending. */
    COMMITTING,
  }

  /**
   * Returns whether the counter may contain a non-committed aggregate, that is, whether the state
   * is anything other than {@code CommitState.COMMITTED}.
   */
  public boolean isDirty() {
    return commitState.get() != CommitState.COMMITTED;
  }

  /**
   * Changes the counter from {@code CommitState.DIRTY} to {@code CommitState.COMMITTING}.
   *
   * @return true if successful. A false return indicates that the commit state was not {@code
   *     CommitState.DIRTY}, and the state is left unchanged.
   */
  public boolean committing() {
    return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING);
  }

  /**
   * Changes the counter from {@code CommitState.COMMITTING} to {@code CommitState.COMMITTED}.
   *
   * @return true if successful. A false return indicates that the state was not {@code
   *     CommitState.COMMITTING}, and the state is left unchanged. Typically this means the counter
   *     was updated while the commit was pending. That update may or may not have been included in
   *     the commit, so {@code commitState} has to stay in {@code CommitState.DIRTY}. It also
   *     happens when {@link #committing} did not succeed beforehand.
   */
  public boolean committed() {
    return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED);
  }

  /**
   * Sets the counter to {@code CommitState.DIRTY}.
   *
   * <p>Must be called at the end of any methods that add values to the counter (specifically,
   * {@link #addValue}). This does not need to be called by {@link #getAndReset} because it doesn't
   * add any new values to the counter (it just returns the current value).
   */
  protected void setDirty() {
    commitState.set(CommitState.DIRTY);
  }

  /**
   * Returns a string representation of the Counter, useful for debugging logs. The format is
   * {@code <flat name>:<simple class name of the AtomicCounterValue implementation>(<aggregate>)},
   * for example {@code "ElementCount:SomeSumCounterValue(15)"}.
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(getFlatName());
    sb.append(":");
    sb.append(internal.getClass().getSimpleName());
    sb.append("(").append(getAggregate()).append(")");
    return sb.toString();
  }

  /** Returns the counter name's pretty (human-readable) form. */
  public String toPrettyString() {
    return name.getPrettyName();
  }

  /**
   * Two counters are equal when they have equal names and their {@link AtomicCounterValue}
   * implementations are of the same class. Current aggregate values are not compared.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    } else if (o instanceof Counter) {
      Counter<?, ?> that = (Counter<?, ?>) o;
      return Objects.equals(this.name, that.name)
          && Objects.equals(internal.getClass(), that.internal.getClass());
    }
    return false;
  }

  /** Consistent with {@link #equals}: hashes the implementation class and the name. */
  @Override
  public int hashCode() {
    return Objects.hash(internal.getClass(), name);
  }
}