blob: 4e9e0ac80a23316e4a2139218a24e77c7a366539 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import org.apache.beam.runners.core.ElementByteSizeObservable;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
/**
* A Dataflow-specific version of {@link ElementCounter}, which specifies the object counter name
* differently as PhysicalElementCount. Additionally, it counts element windows as ElementCount.
*/
public class DataflowOutputCounter implements ElementCounter {
/** Number of physical element and multiple-window assignments that were serialized/processed. */
private static final String OBJECT_COUNTER_NAME = "-PhysicalElementCount";
/** Number of logical element and single window pairs that were processed. */
private static final String ELEMENT_COUNTER_NAME = "-ElementCount";
private static final String MEAN_BYTE_COUNTER_NAME = "-MeanByteCount";
private OutputObjectAndByteCounter objectAndByteCounter;
private Counter<Long, ?> elementCount;
public DataflowOutputCounter(
String outputName, CounterFactory counterFactory, NameContext nameContext) {
this(outputName, null, counterFactory, nameContext);
}
public DataflowOutputCounter(
String outputName,
ElementByteSizeObservable<?> elementByteSizeObservable,
CounterFactory counterFactory,
NameContext nameContext) {
objectAndByteCounter =
new OutputObjectAndByteCounter(elementByteSizeObservable, counterFactory, nameContext);
objectAndByteCounter.countObject(outputName + OBJECT_COUNTER_NAME);
objectAndByteCounter.countMeanByte(outputName + MEAN_BYTE_COUNTER_NAME);
createElementCounter(counterFactory, nameContext, outputName + ELEMENT_COUNTER_NAME);
}
@Override
public void update(Object elem) throws Exception {
objectAndByteCounter.update(elem);
long windowsSize = ((WindowedValue<?>) elem).getWindows().size();
if (windowsSize == 0) {
// GroupingShuffleReader produces ValueInEmptyWindows.
// For now, we count the element at least once to keep the current counter
// behavior.
elementCount.addValue(1L);
} else {
elementCount.addValue(windowsSize);
}
}
@Override
public void finishLazyUpdate(Object elem) {
objectAndByteCounter.finishLazyUpdate(elem);
}
@VisibleForTesting
static String getElementCounterName(String prefix) {
return prefix + ELEMENT_COUNTER_NAME;
}
@VisibleForTesting
static String getObjectCounterName(String prefix) {
return prefix + OBJECT_COUNTER_NAME;
}
@VisibleForTesting
static String getMeanByteCounterName(String prefix) {
return prefix + MEAN_BYTE_COUNTER_NAME;
}
private void createElementCounter(CounterFactory factory, NameContext nameContext, String name) {
// TODO: use the name context to name the counter
elementCount = factory.longSum(CounterName.named(name));
}
}