// blob: fb1f641915c1db0887fdb3e547accc12d3a06597 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util;
import static org.apache.beam.runners.dataflow.worker.NameContextsForTests.nameContextForTest;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterKindMatcher.hasKind;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterNameMatcher.hasName;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterStructuredNameMatcher.hasStructuredName;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterUpdateBooleanValueMatcher.hasBooleanValue;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterUpdateDistributionMatcher.hasDistribution;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterUpdateDoubleCountMatcher.hasDoubleCount;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterUpdateDoubleSumMatcher.hasDoubleSum;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterUpdateDoubleValueMatcher.hasDoubleValue;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterUpdateIntegerCountMatcher.hasIntegerCount;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterUpdateIntegerSumMatcher.hasIntegerSum;
import static org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher.hasIntegerValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.SplitInt64;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleReadCounter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Tests for {@link DataflowCounterUpdateExtractor}.
 *
 * <p>Each test creates counters through a {@link CounterFactory}, extracts a {@link CounterUpdate}
 * via {@link DataflowCounterUpdateExtractor#INSTANCE}, and checks the result with the Hamcrest
 * matchers from {@code CounterHamcrestMatchers}.
 */
@RunWith(JUnit4.class)
public class DataflowCounterUpdateExtractorTest {

  private static final String COUNTER_NAME = "CounterName";

  private final CounterFactory counterFactory = new CounterFactory();

  /**
   * Checks the name, kind and value representation of updates extracted from each kind of counter
   * created in this test (sum, max, min, mean, and, or, distribution), both before and after
   * values are added.
   */
  @Test
  public void testNameKindAndCloudCounterRepresentation() {
    Counter<Long, ?> c1 = counterFactory.longSum(CounterName.named("c1"));
    Counter<Double, ?> c2 = counterFactory.doubleMax(CounterName.named("c2"));
    Counter<Double, ?> c3 = counterFactory.doubleMin(CounterName.named("c3"));
    Counter<Double, ?> c4 = counterFactory.doubleMean(CounterName.named("c4"));
    Counter<Integer, ?> c5 = counterFactory.intMin(CounterName.named("c5"));
    Counter<Boolean, ?> c6 = counterFactory.booleanAnd(CounterName.named("c6"));
    Counter<Boolean, ?> c7 = counterFactory.booleanOr(CounterName.named("c7"));
    Counter<Integer, ?> c8 = counterFactory.intMean(CounterName.named("c8"));
    Counter<Long, ?> c9 = counterFactory.distribution(CounterName.named("c9"));

    // Long sum: untouched counter reports 0, then the sum of the added values.
    assertThat(
        c1.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(hasName("c1"), hasKind("SUM"), hasIntegerValue(0)));
    c1.addValue(123L).addValue(-13L);
    assertThat(
        c1.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE), hasIntegerValue(110));

    // Double max: untouched counter reports negative infinity.
    assertThat(
        c2.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(hasName("c2"), hasKind("MAX"), hasDoubleValue(Double.NEGATIVE_INFINITY)));
    c2.getAndReset();
    c2.addValue(Math.PI).addValue(Math.E);
    assertThat(
        c2.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE), hasDoubleValue(Math.PI));

    // Double min.
    c3.addValue(Math.PI).addValue(-Math.PI).addValue(-Math.sqrt(2));
    assertThat(
        c3.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(hasName("c3"), hasKind("MIN"), hasDoubleValue(-Math.PI)));

    // Double mean: zero-count means are not sent to the service.
    assertThat(c4.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE), is(nullValue()));
    c4.addValue(Math.PI).addValue(Math.E).addValue(Math.sqrt(2));
    assertThat(
        c4.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(
            hasName("c4"), hasKind("MEAN"),
            hasDoubleSum(Math.PI + Math.E + Math.sqrt(2)), hasDoubleCount(3)));
    // After a delta extraction only the newly added values are expected in the next update.
    c4.addValue(2.0).addValue(5.0);
    assertThat(
        c4.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(hasDoubleSum(7.0), hasDoubleCount(2)));

    // Integer min: untouched counter reports Integer.MAX_VALUE.
    assertThat(
        c5.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(hasName("c5"), hasKind("MIN"), hasIntegerValue(Integer.MAX_VALUE)));
    c5.addValue(123).addValue(-13);
    assertThat(
        c5.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE), hasIntegerValue(-13));

    // Boolean and: untouched counter reports true.
    assertThat(
        c6.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(hasName("c6"), hasKind("AND"), hasBooleanValue(true)));
    c6.addValue(false);
    assertThat(
        c6.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE), hasBooleanValue(false));

    // Boolean or: untouched counter reports false.
    assertThat(
        c7.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(hasName("c7"), hasKind("OR"), hasBooleanValue(false)));
    c7.addValue(true);
    assertThat(
        c7.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE), hasBooleanValue(true));

    // Integer mean.
    c8.addValue(1).addValue(2).addValue(3).addValue(4).addValue(5);
    assertThat(
        c8.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(hasName("c8"), hasKind("MEAN"), hasIntegerSum(15), hasIntegerCount(5)));

    // Distribution.
    c9.addValue(1L).addValue(0L).addValue(1L).addValue(9L).addValue(19L);
    assertThat(
        c9.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(
            hasName("c9"),
            hasKind("DISTRIBUTION"),
            hasDistribution(
                CounterFactory.CounterDistribution.builder()
                    .minMax(0L, 19L)
                    .count(5L)
                    .sum(30L)
                    .sumOfSquares(444f)
                    .firstBucketOffset(0)
                    .buckets(Lists.newArrayList(1L, 2L, 0L, 1L, 1L))
                    .build())));
  }

  /**
   * Builds a {@link SplitInt64} from its two halves.
   *
   * @param highBits value stored as the high bits
   * @param lowBits value stored as the low bits
   * @return a new {@link SplitInt64} carrying the given halves
   */
  private static SplitInt64 createSplitInt(int highBits, long lowBits) {
    SplitInt64 splitInt = new SplitInt64();
    splitInt.setHighBits(highBits);
    splitInt.setLowBits(lowBits);
    return splitInt;
  }

  /**
   * Checks {@link DataflowCounterUpdateExtractor#splitIntToLong} for minimum, zero and maximum
   * high bits, each combined with low bits of zero and of {@code 0xffffffff}.
   */
  @Test
  public void testSplitIntToLong() {
    // high bits negative max, low bits 0
    assertEquals(
        0x80000000_00000000L,
        DataflowCounterUpdateExtractor.splitIntToLong(createSplitInt(Integer.MIN_VALUE, 0L)));
    // high bits 0, low bits 0
    assertEquals(
        0x00000000_00000000L, DataflowCounterUpdateExtractor.splitIntToLong(createSplitInt(0, 0L)));
    // high bits positive max, low bits 0
    assertEquals(
        0x7fffffff_00000000L,
        DataflowCounterUpdateExtractor.splitIntToLong(createSplitInt(Integer.MAX_VALUE, 0L)));
    // high bits negative, low bits max
    assertEquals(
        0x80000000_ffffffffL,
        DataflowCounterUpdateExtractor.splitIntToLong(
            createSplitInt(Integer.MIN_VALUE, 0xffffffffL)));
    // high bits 0, low bits max
    assertEquals(
        0x00000000_ffffffffL,
        DataflowCounterUpdateExtractor.splitIntToLong(createSplitInt(0, 0xffffffffL)));
    // high bits positive, low bits max
    assertEquals(
        0x7fffffff_ffffffffL,
        DataflowCounterUpdateExtractor.splitIntToLong(
            createSplitInt(Integer.MAX_VALUE, 0xffffffffL)));
  }

  /**
   * Checks that negative values are represented correctly for sum, mean, min and max counters of
   * both double and integer type.
   */
  @Test
  public void testCloudCounterRepresentationCaseNegative() {
    Counter<Double, ?> c1 = counterFactory.doubleSum(CounterName.named("c1"));
    Counter<Integer, ?> c2 = counterFactory.intSum(CounterName.named("c2"));
    Counter<Double, ?> c3 = counterFactory.doubleMean(CounterName.named("c3"));
    Counter<Integer, ?> c4 = counterFactory.intMean(CounterName.named("c4"));
    Counter<Double, ?> c5 = counterFactory.doubleMin(CounterName.named("c5"));
    Counter<Integer, ?> c6 = counterFactory.intMin(CounterName.named("c6"));
    Counter<Double, ?> c7 = counterFactory.doubleMax(CounterName.named("c7"));
    Counter<Integer, ?> c8 = counterFactory.intMax(CounterName.named("c8"));

    // Sums.
    c1.addValue(-1D).addValue(-1D);
    assertThat(
        ((Counter<?, ?>) c1).extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasDoubleValue(-2D));
    c2.addValue(-1).addValue(-1);
    assertThat(
        ((Counter<?, ?>) c2).extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasIntegerValue(-2));

    // Means.
    c3.addValue(-1.5D).addValue(-2.5D).addValue(-3D);
    assertThat(
        ((Counter<?, ?>) c3).extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(hasDoubleSum(-7D), hasDoubleCount(3)));
    c4.addValue(-1).addValue(-2).addValue(-3);
    assertThat(
        ((Counter<?, ?>) c4).extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        allOf(hasIntegerSum(-6), hasIntegerCount(3)));

    // Minimums.
    c5.addValue(-1D).addValue(-2D);
    assertThat(
        ((Counter<?, ?>) c5).extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasDoubleValue(-2D));
    c6.addValue(-1).addValue(-2);
    assertThat(
        ((Counter<?, ?>) c6).extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasIntegerValue(-2));

    // Maximums.
    c7.addValue(-1D).addValue(-2D);
    assertThat(
        ((Counter<?, ?>) c7).extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasDoubleValue(-1D));
    c8.addValue(-1).addValue(-2);
    assertThat(
        ((Counter<?, ?>) c8).extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasIntegerValue(-1));
  }

  /**
   * Checks that extracting with {@code delta == false} reports the accumulated value repeatedly,
   * while extracting with {@code delta == true} reports the accumulated value once and 0 on the
   * following extraction.
   */
  @Test
  public void testCloudCounterRepresentationNoDelta() {
    Counter<Integer, ?> c1 = counterFactory.intSum(CounterName.named("c1"));
    c1.addValue(1);
    // Two non-delta extractions in a row see the same value.
    assertThat(
        ((Counter<?, ?>) c1).extractUpdate(false, DataflowCounterUpdateExtractor.INSTANCE),
        hasIntegerValue(1));
    assertThat(
        ((Counter<?, ?>) c1).extractUpdate(false, DataflowCounterUpdateExtractor.INSTANCE),
        hasIntegerValue(1));
    c1.addValue(2);
    assertThat(
        ((Counter<?, ?>) c1).extractUpdate(false, DataflowCounterUpdateExtractor.INSTANCE),
        hasIntegerValue(3));
    // The first delta extraction still sees the full value; the next one sees 0.
    assertThat(
        ((Counter<?, ?>) c1).extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasIntegerValue(3));
    assertThat(
        ((Counter<?, ?>) c1).extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasIntegerValue(0));
  }

  /** Checks that a counter with a plain name yields an update without a structured name. */
  @Test
  public void testExtractUnstructuredNameCorrectly() {
    CounterName unstructuredName = CounterName.named(COUNTER_NAME);
    Counter<?, ?> unstructured = counterFactory.intSum(unstructuredName);
    // unstructured counter should not have a structured name
    CounterUpdate counterUpdate =
        unstructured.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE);
    assertThat(counterUpdate, not(hasStructuredName()));
    assertThat(counterUpdate, hasName(COUNTER_NAME));
  }

  /**
   * Checks that counters named with an original-name context and with a system-name context each
   * yield an update whose structured name matches their own {@link CounterName}.
   */
  @Test
  public void testExtractStructuredNameCorrectly() {
    CounterName unstructuredName = CounterName.named(COUNTER_NAME);
    CounterName structuredOriginalName = unstructuredName.withOriginalName(nameContextForTest());
    CounterName structuredSystemName = unstructuredName.withSystemName(nameContextForTest());
    Counter<?, ?> structuredOriginal = counterFactory.intSum(structuredOriginalName);
    Counter<?, ?> structuredSystem = counterFactory.intSum(structuredSystemName);
    // Both counters should be structured and should not conflict
    assertThat(
        structuredOriginal.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasStructuredName(structuredOriginalName, "SUM"));
    assertThat(
        structuredSystem.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasStructuredName(structuredSystemName, "SUM"));
  }

  /**
   * Checks that a counter name carrying an original requesting step name and an input index
   * yields an update whose structured name matches that {@link CounterName}.
   */
  @Test
  public void testExtractStructuredNameWithIoInfo() {
    CounterName counterName =
        CounterName.named(COUNTER_NAME).withOriginalRequestingStepName("stepReq").withInputIndex(1);
    Counter<?, ?> structuredOriginal = counterFactory.intSum(counterName);
    assertThat(
        structuredOriginal.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE),
        hasStructuredName(counterName, "SUM"));
  }

  /**
   * Checks that a counter named via {@link ShuffleReadCounter#generateCounterName} yields an
   * update whose structured name matches that {@link CounterName}.
   */
  @Test
  public void testExtractShuffleReadCounter() {
    // NOTE(review): counterSet is never read by this test; it is kept only so the file-level
    // CounterSet import stays in use. Consider removing both together.
    CounterSet counterSet = new CounterSet();
    CounterName counterName =
        ShuffleReadCounter.generateCounterName(
            "originalShuffleStepName", "originalExecutingStepName");
    Counter<?, ?> counter = counterFactory.longSum(counterName);
    CounterUpdate counterUpdate =
        counter.extractUpdate(true, DataflowCounterUpdateExtractor.INSTANCE);
    // The matcher must be passed to assertThat; building it on its own verifies nothing.
    assertThat(counterUpdate, hasStructuredName(counterName, "SUM"));
  }
}