blob: 793560d60cb7d3f97b0226e4a499e80d7e0130ff [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.beam.runners.dataflow.worker.counters;
import static;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.dataflow.worker.counters.Counter.AtomicCounterValue;
import org.apache.beam.runners.dataflow.worker.counters.Counter.CounterUpdateExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* A CounterSet is a {@link CounterFactory} that can be used for creating counters. It also retains
* all of the created counters and allows retrieving them later..
* <p>Thread-safe.
public class CounterSet extends CounterFactory {
private static final Logger LOG = LoggerFactory.getLogger(CounterSet.class);
* Registered counters.
* <p>Uses a ConcurrentHashMap explicitly because we rely on the stronger iteration guarantees.
private final ConcurrentHashMap<CounterName, Counter<?, ?>> counters = new ConcurrentHashMap<>();
protected <InputT, AccumT> Counter<InputT, AccumT> createCounter(
CounterName name, AtomicCounterValue<InputT, AccumT> counterValue) {
Counter<InputT, AccumT> counter = super.createCounter(name, counterValue);
Counter<?, ?> oldCounter = counters.putIfAbsent(name, counter);
if (oldCounter != null) {
"Counter %s duplicates incompatible counter %s in %s",
Counter<InputT, AccumT> compatibleCounter = (Counter<InputT, AccumT>) oldCounter;
return compatibleCounter;
return counter;
public Counter<?, ?> getExistingCounter(CounterName name) {
return counters.get(name);
* Returns the Counter with the given name in this CounterSet; returns null if no such Counter
* exists.
public Counter<?, ?> getExistingCounter(String name) {
CounterName counterName = CounterName.named(name);
return getExistingCounter(counterName);
public long size() {
return counters.size();
* Extract an update. If there is a exception from the extractor, return null and logs the error.
private static <UpdateT> UpdateT extractUpdate(
Counter<?, ?> counter, boolean delta, CounterUpdateExtractor<UpdateT> extractor) {
try {
return counter.extractUpdate(delta, extractor);
} catch (IllegalArgumentException e) {
LOG.warn("Error extracting counter update from counter {}: ", counter, e);
return null;
* Exracts counter updates. When both dirtyOnly and delta flags are set, only modified counter
* updates are returned and the counters are marked 'committed'.
private <UpdateT> List<UpdateT> extractUpdatesImpl(
boolean delta, boolean dirtyOnly, CounterUpdateExtractor<UpdateT> extractors) {
// We don't allow removing counters, only adding counters. If a counter is added, it may or may
// not show up in this iteration (counters.values()). But, ConcurrentHashMap guarantees that the
// elements in the iterator will reflect the state "at or after the creation of the iterator".
List<UpdateT> updates = new ArrayList<>(counters.size());
for (Counter<?, ?> counter : counters.values()) {
// commit the counter for detla updates when 'dirtyOnly' is set. This could
// apply to cumulative counters too in future, not just for deltas.
boolean shouldCommit = delta && dirtyOnly && counter.isDirty();
if (shouldCommit) {
if (!dirtyOnly || shouldCommit) {
UpdateT update = extractUpdate(counter, delta, extractors);
if (update != null) {
if (shouldCommit) {
return Collections.unmodifiableList(updates);
* Return a list of all the updates. This synchronizes with the addition/retrieval of counters so
* it should be thread safe.
public <UpdateT> List<UpdateT> extractUpdates(
boolean delta, CounterUpdateExtractor<UpdateT> extractors) {
return extractUpdatesImpl(delta, false, extractors);
* Returns a list of delta updates of dirty counters. This does not includes counter updates that
* haven't been modified since last extraction.
public <UpdateT> List<UpdateT> extractModifiedDeltaUpdates(
CounterUpdateExtractor<UpdateT> extractors) {
return extractUpdatesImpl(true, true, extractors);
public String toString() {
return MoreObjects.toStringHelper(this).add("counters", counters.values()).toString();