blob: 2a8667436b5aab3f856e0ada22470b9879727396 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.beam.runners.dataflow.worker.counters;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterMean;
* A Counter enables the aggregation of a stream of values over time. The cumulative aggregate value
* is updated as new values are added. Multiple kinds of aggregation are supported depending on the
* type of the counter.
* <p>After all possible mutations have completed, the reader should check {@link #isDirty} for
* every counter, otherwise updates may be lost.
* <p>A counter may become dirty without a corresponding update to the value. This generally will
* occur when the calls to {@code addValue()}, {@code committing()}, and {@code committed()} are
* interleaved such that the value is updated between the calls to committing and the read of the
* value.
* @param <InputT> the type of values aggregated by this counter.
* @param <AccumT> the type of aggregator stored by this counter.
public class Counter<InputT, AccumT> {
* An instance of {@link AtomicCounterValue} specifies how a counter is aggregated and stored
* internally. It should ensure that all operations are atomic when used in multiple threads.
public interface AtomicCounterValue<InputT, AccumT> {
void addValue(InputT value);
AccumT getAggregate();
AccumT getAndReset();
<UpdateT> UpdateT extractUpdate(
CounterName name, boolean delta, CounterUpdateExtractor<UpdateT> updateExtractor);
* An instance of {@link CounterUpdateExtractor} specifies how a counter can be turned into an
* appropriate update proto.
* <p>This class is essentially a visitor for counter values.
public interface CounterUpdateExtractor<UpdateT> {
UpdateT longSum(CounterName name, boolean delta, Long value);
UpdateT longMin(CounterName name, boolean delta, Long value);
UpdateT longMax(CounterName name, boolean delta, Long value);
UpdateT longMean(CounterName name, boolean delta, CounterMean<Long> value);
UpdateT intSum(CounterName name, boolean delta, Integer value);
UpdateT intMin(CounterName name, boolean delta, Integer value);
UpdateT intMax(CounterName name, boolean delta, Integer value);
UpdateT intMean(CounterName name, boolean delta, CounterMean<Integer> value);
UpdateT doubleSum(CounterName name, boolean delta, Double value);
UpdateT doubleMin(CounterName name, boolean delta, Double value);
UpdateT doubleMax(CounterName name, boolean delta, Double value);
UpdateT doubleMean(CounterName name, boolean delta, CounterMean<Double> value);
UpdateT boolOr(CounterName name, boolean delta, Boolean value);
UpdateT boolAnd(CounterName name, boolean delta, Boolean value);
UpdateT distribution(CounterName name, boolean delta, CounterDistribution value);
/** The name and metadata of this counter. */
private final CounterName name;
/** The commit state of this counter. */
@VisibleForTesting final AtomicReference<CommitState> commitState;
/** The implementation of the counter aggregation. */
private final AtomicCounterValue<InputT, AccumT> internal;
public Counter(CounterName name, AtomicCounterValue<InputT, AccumT> internal) { = name;
this.commitState = new AtomicReference<>(CommitState.COMMITTED);
this.internal = internal;
// Make sure the counter is properly initialized.
/** Adds a new value to the aggregation stream. Returns this (to allow method chaining). */
public Counter<InputT, AccumT> addValue(InputT value) {
try {
} finally {
return this;
/** Atomically resets the counter's value and returns the previous aggregate. */
public AccumT getAndReset() {
return internal.getAndReset();
/** Returns the counter's name. */
public CounterName getName() {
return name;
public <UpdateT> UpdateT extractUpdate(
boolean delta, CounterUpdateExtractor<UpdateT> updateExtractor) {
return internal.extractUpdate(name, delta, updateExtractor);
/** Returns the counter's flat name. This is only suitable for hashing purposes */
public String getFlatName() {
return name.getFlatName();
/** Returns the aggregated value. */
public AccumT getAggregate() {
return internal.getAggregate();
* Represents whether counters' values have been committed to the backend.
* <p>Runners can use this information to optimize counters updates. For example, if counters are
* committed, runners may choose to skip the updates.
* <p>Counter state transition table: {@code Action\Current State COMMITTED DIRTY COMMITTING
* addValue() DIRTY DIRTY DIRTY committing() None COMMITTING None committed() None None COMMITTED
* }
enum CommitState {
/** There are no local updates that haven't been committed to the backend. */
/** There are local updates that haven't been committed to the backend. */
/** Local updates are committing to the backend, but are still pending. */
/** Returns if the counter contains non-committed aggregate. */
public boolean isDirty() {
return commitState.get() != CommitState.COMMITTED;
* Changes the counter from {@code CommitState.DIRTY} to {@code CommitState.COMMITTING}.
* @return true if successful. False return indicates that the commit state was not in {@code
* CommitState.DIRTY}.
public boolean committing() {
return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING);
* Changes the counter from {@code CommitState.COMMITTING} to {@code CommitState.COMMITTED}.
* @return true if successful.
* <p>False return indicates that the counter was updated while the committing is pending.
* That counter update might or might not has been committed. The {@code commitState} has to
* stay in {@code CommitState.DIRTY}.
public boolean committed() {
return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED);
* Sets the counter to {@code CommitState.DIRTY}.
* <p>Must be called at the end of any methods that add values to the counter (specifically,
* {@link #addValue}. This does not need to be called by {@link #getAndReset} because it doesn't
* add any new values to the counter (just returns the current values).
protected void setDirty() {
* Returns a string representation of the Counter. Useful for debugging logs. Example return
* value: "ElementCount:SUM(15)".
public String toString() {
StringBuilder sb = new StringBuilder();
return sb.toString();
public String toPrettyString() {
return name.getPrettyName();
public boolean equals(Object o) {
if (this == o) {
return true;
} else if (o instanceof Counter) {
Counter<?, ?> that = (Counter<?, ?>) o;
return Objects.equals(,
&& Objects.equals(internal.getClass(), that.internal.getClass());
return false;
public int hashCode() {
return Objects.hash(internal.getClass(), name);