blob: fcd44f834eef6b5b67306286fa8d6423c33a2691 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.counters;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import org.apache.beam.runners.dataflow.worker.NameContextsForTests;
import org.apache.beam.runners.dataflow.worker.counters.Counter.CommitState;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterMean;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for the {@link Counter} API. */
@RunWith(JUnit4.class)
public class CounterTest {
private final CounterName name = CounterName.named("undertest");
private final CounterName name2 = CounterName.named("othername");
private final CounterFactory counters = new CounterFactory();
@Rule public ExpectedException thrown = ExpectedException.none();
private static final double EPSILON = 0.00000000001;
@Test
public void testCompatibility() {
// Equal counters are compatible, of all kinds.
assertEquals(counters.longSum(name), counters.longSum(name));
assertEquals(counters.intSum(name), counters.intSum(name));
assertEquals(counters.doubleSum(name), counters.doubleSum(name));
assertEquals(counters.booleanOr(name), counters.booleanOr(name));
// The name, kind, and type of the counter must match.
assertFalse(counters.longSum(name).equals(counters.longSum(name2)));
assertFalse(counters.longSum(name).equals(counters.longMax(name)));
assertFalse(counters.longSum(name).equals(counters.intSum(name)));
// The value of the counters are ignored.
assertEquals(counters.longSum(name).addValue(666L), counters.longSum(name).addValue(42L));
}
// Tests for SUM.
@Test
public void testSumLong() {
Counter<Long, Long> c = counters.longSum(name);
assertEquals(0L, (long) c.getAggregate());
c.addValue(13L).addValue(42L).addValue(0L);
assertEquals(13L + 42L, (long) c.getAggregate());
c.getAndReset();
c.addValue(120L).addValue(17L).addValue(37L);
assertEquals(120L + 17L + 37L, (long) c.getAggregate());
c.addValue(15L).addValue(42L);
assertEquals(120L + 17L + 37L + 15L + 42L, (long) c.getAggregate());
c.getAndReset();
c.addValue(100L).addValue(17L).addValue(49L);
assertEquals(100L + 17L + 49L, (long) c.getAggregate());
assertEquals(
"getAndReset should return previous value", 100L + 17L + 49L, (long) c.getAndReset());
assertEquals("getAndReset should have reset value", 0, (long) c.getAggregate());
}
@Test
public void testSumDouble() {
Counter<Double, Double> c = counters.doubleSum(name);
c.addValue(Math.E).addValue(Math.PI).addValue(0.0);
assertEquals(Math.E + Math.PI, c.getAggregate(), EPSILON);
c.getAndReset();
c.addValue(Math.sqrt(2)).addValue(2 * Math.PI).addValue(3 * Math.E);
assertEquals(Math.sqrt(2) + 2 * Math.PI + 3 * Math.E, c.getAggregate(), EPSILON);
assertEquals(
"getAndReset should return previous value",
Math.sqrt(2) + 2 * Math.PI + 3 * Math.E,
c.getAndReset(),
EPSILON);
assertEquals("getAndReset should have reset value", 0.0, c.getAggregate(), EPSILON);
}
// Tests for MAX.
@Test
public void testMaxLong() {
Counter<Long, Long> c = counters.longMax(name);
assertEquals(Long.MIN_VALUE, (long) c.getAggregate());
c.addValue(13L).addValue(42L).addValue(0L);
assertEquals(42L, (long) c.getAggregate());
c.getAndReset();
c.addValue(120L).addValue(17L).addValue(37L);
assertEquals(120L, (long) c.getAggregate());
c.addValue(15L).addValue(42L);
assertEquals(120L, (long) c.getAggregate());
c.addValue(137L);
assertEquals(137L, (long) c.getAggregate());
c.getAndReset();
c.addValue(100L).addValue(17L).addValue(49L);
assertEquals(100L, (long) c.getAggregate());
assertEquals("getAndReset should return previous value", 100L, (long) c.getAndReset());
assertEquals("getAndReset should have reset value", Long.MIN_VALUE, (long) c.getAggregate());
}
@Test
public void testMaxDouble() {
Counter<Double, Double> c = counters.doubleMax(name);
assertEquals(Double.NEGATIVE_INFINITY, c.getAggregate(), EPSILON);
c.addValue(Math.E).addValue(Math.PI).addValue(Double.NEGATIVE_INFINITY);
assertEquals(Math.PI, c.getAggregate(), EPSILON);
c.getAndReset();
c.addValue(Math.sqrt(12345)).addValue(2 * Math.PI).addValue(3 * Math.E);
assertEquals(Math.sqrt(12345), c.getAggregate(), EPSILON);
assertEquals(
"getAndReset should return previous value", Math.sqrt(12345), c.getAndReset(), EPSILON);
assertEquals(Double.NEGATIVE_INFINITY, c.getAggregate(), EPSILON);
c.addValue(7 * Math.PI).addValue(5 * Math.E);
assertEquals(7 * Math.PI, c.getAggregate(), EPSILON);
c.getAndReset();
c.addValue(Math.sqrt(17)).addValue(171.0).addValue(49.0);
assertEquals(171.0, c.getAggregate(), EPSILON);
}
// Tests for MIN.
@Test
public void testMinLong() {
Counter<Long, Long> c = counters.longMin(name);
assertEquals(Long.MAX_VALUE, (long) c.getAggregate());
c.addValue(13L).addValue(42L).addValue(Long.MAX_VALUE);
assertEquals(13L, (long) c.getAggregate());
c.getAndReset();
c.addValue(120L).addValue(17L).addValue(37L);
assertEquals(17L, (long) c.getAggregate());
assertEquals("getAndReset should return previous value", 17L, (long) c.getAndReset());
assertEquals(
"getAndReset should have reset the value", Long.MAX_VALUE, (long) c.getAggregate());
c.addValue(42L).addValue(18L);
assertEquals(18L, (long) c.getAggregate());
}
@Test
public void testMinDouble() {
Counter<Double, Double> c = counters.doubleMin(name);
assertEquals(Double.POSITIVE_INFINITY, c.getAggregate(), EPSILON);
c.addValue(Math.E).addValue(Math.PI).addValue(Double.POSITIVE_INFINITY);
assertEquals(Math.E, c.getAggregate(), EPSILON);
c.getAndReset();
c.addValue(Math.sqrt(12345)).addValue(2 * Math.PI).addValue(3 * Math.E);
assertEquals(2 * Math.PI, c.getAggregate(), EPSILON);
assertEquals("getAndReset should return previous value", 2 * Math.PI, c.getAndReset(), EPSILON);
assertEquals(
"getAndReset should have reset the value",
Double.POSITIVE_INFINITY,
c.getAggregate(),
EPSILON);
c.getAndReset();
c.addValue(Math.sqrt(17)).addValue(171.0).addValue(0.0);
assertEquals(0.0, c.getAggregate(), EPSILON);
}
// Tests for MEAN.
private void assertMean(long s, long c, Counter<Long, CounterMean<Long>> cn) {
CounterMean<Long> mean = cn.getAggregate();
assertEquals(s, mean.getAggregate().longValue());
assertEquals(c, mean.getCount());
}
private void assertMean(double s, long c, Counter<Double, CounterMean<Double>> cn) {
CounterMean<Double> mean = cn.getAggregate();
assertEquals(s, mean.getAggregate().doubleValue(), EPSILON);
assertEquals(c, mean.getCount());
}
@Test
public void testMeanLong() {
Counter<Long, CounterMean<Long>> c = counters.longMean(name);
assertMean(0, 0, c);
c.addValue(13L).addValue(42L).addValue(0L);
assertMean(13 + 42 + 0, 3, c);
c.getAndReset();
c.addValue(120L).addValue(17L).addValue(37L);
assertMean(120 + 17 + 37, 3, c);
CounterMean<Long> mean = c.getAndReset();
assertEquals(
"getAndReset should return previous value", 120 + 17 + 37, (long) mean.getAggregate());
assertEquals("getAndReset should return previous count", 3, mean.getCount());
// getAndReset should reset the value
assertMean(0, 0, c);
c.getAndReset();
c.addValue(33L).addValue(33L).addValue(34L).addValue(17L).addValue(49L);
assertMean(166, 5, c);
}
@Test
public void testMeanDouble() {
Counter<Double, CounterMean<Double>> c = counters.doubleMean(name);
double expTotal = 0.0;
long expCountTotal = 0;
assertMean(expTotal, expCountTotal, c);
c.addValue(Math.E).addValue(Math.PI).addValue(0.0);
expTotal += Math.E + Math.PI;
expCountTotal += 3;
assertMean(expTotal, expCountTotal, c);
c.getAndReset();
c.addValue(Math.sqrt(2)).addValue(2 * Math.PI).addValue(3 * Math.E);
expTotal = Math.sqrt(2) + 2 * Math.PI + 3 * Math.E;
assertMean(expTotal, expCountTotal, c);
CounterMean<Double> mean = c.getAndReset();
assertEquals(
"getAndReset should return previous value",
expTotal,
(double) mean.getAggregate(),
EPSILON);
assertEquals("getAndReset should return previous count", expCountTotal, mean.getCount());
assertMean(0, 0, c);
c.getAndReset();
c.addValue(Math.sqrt(17)).addValue(0.0).addValue(0.0).addValue(17.0).addValue(49.0);
expTotal = Math.sqrt(17.0) + 17.0 + 49.0;
expCountTotal = 5;
assertMean(expTotal, expCountTotal, c);
}
@Test
public void testDistribution() {
Counter<Long, CounterDistribution> c = counters.distribution(name);
CounterDistribution expected =
CounterDistribution.builder()
.minMax(Long.MAX_VALUE, 0L)
.count(0L)
.sum(0L)
.sumOfSquares(0f)
.buckets(0, new ArrayList<>())
.build();
assertEquals(expected, c.getAggregate());
c.addValue(2L).addValue(10L).addValue(4L);
expected =
CounterDistribution.builder()
.minMax(2L, 10L)
.count(3)
.sum(2L + 10L + 4L)
.sumOfSquares(4L + 100L + 16L)
.buckets(2, Lists.newArrayList(2L, 0L, 1L))
.build();
assertEquals(expected, c.getAggregate());
c.getAndReset();
c.addValue(0L).addValue(0L);
expected =
CounterDistribution.builder()
.minMax(0L, 0L)
.count(2L)
.sum(0L)
.sumOfSquares(0f)
.buckets(0, Lists.newArrayList(2L))
.build();
assertEquals(expected, c.getAggregate());
CounterDistribution distribution = c.getAndReset();
assertEquals("getAndReset should return previous value", expected, distribution);
expected =
CounterDistribution.builder()
.minMax(Long.MAX_VALUE, 0L)
.count(0L)
.sum(0L)
.sumOfSquares(0f)
.buckets(0, new ArrayList<>())
.build();
assertEquals(expected, c.getAggregate());
}
@Test
public void testBoolAnd() {
Counter<Boolean, Boolean> c = counters.booleanAnd(name);
assertTrue(c.getAggregate());
c.addValue(true);
assertTrue(c.getAggregate());
c.addValue(false);
assertFalse(c.getAggregate());
c.getAndReset();
c.addValue(true).addValue(true);
assertTrue(c.getAggregate());
c.addValue(false);
assertFalse(c.getAggregate());
assertFalse(c.getAndReset());
assertTrue(c.getAggregate());
c.addValue(false);
assertFalse(c.getAggregate());
}
@Test
public void testBoolOr() {
Counter<Boolean, Boolean> c = counters.booleanOr(name);
assertFalse(c.getAggregate());
c.addValue(false);
assertFalse(c.getAggregate());
c.addValue(true);
assertTrue(c.getAggregate());
c.getAndReset();
c.addValue(false).addValue(false);
assertFalse(c.getAggregate());
c.addValue(true);
assertTrue(c.getAggregate());
assertTrue(c.getAndReset());
assertFalse(c.getAggregate());
c.addValue(true);
assertTrue(c.getAggregate());
}
@Test
public void testDirtyBit() {
verifyDirtyBit(counters.longSum(CounterName.named("long-sum")), 1L);
verifyDirtyBit(counters.longMean(CounterName.named("long-mean")), 1L);
verifyDirtyBit(counters.doubleSum(CounterName.named("double-sum")), 1.0);
verifyDirtyBit(counters.doubleMean(CounterName.named("double-mean")), 1.0);
verifyDirtyBit(counters.intSum(CounterName.named("int-sum")), 1);
verifyDirtyBit(counters.intMean(CounterName.named("int-mean")), 1);
verifyDirtyBit(counters.booleanAnd(CounterName.named("and")), true);
}
/** Verify dirty bit is set correctly through various Counter state transitions */
private <InputT> void verifyDirtyBit(Counter<InputT, ?> counter, InputT sampleValue) {
String name = String.format("counter '%s'", counter.getName().name());
// Test counters are not dirty and are COMMITTED initially.
assertFalse(
String.format("%s should not be dirty on initialization.", name), counter.isDirty());
assertEquals(
String.format("%s should not be COMMITTED on initialization.", name),
CommitState.COMMITTED,
counter.commitState.get());
// Test counters are dirty after mutating.
counter.addValue(sampleValue);
assertTrue(String.format("%s should be dirty after mutating.", name), counter.isDirty());
assertEquals(
String.format("%s should have DIRTY state after mutating.", name),
CommitState.DIRTY,
counter.commitState.get());
// Test counters are dirty and are COMMITTING.
assertTrue(
String.format("Committing %s should succeed when in DIRTY state.", name),
counter.committing());
assertTrue(String.format("%s should be dirty after committing.", name), counter.isDirty());
assertEquals(
String.format("%s should have COMMITTING state after mutating.", name),
CommitState.COMMITTING,
counter.commitState.get());
// Test counters are dirty again after mutating.
counter.addValue(sampleValue);
assertFalse(
String.format("Marking %s committed should succeed after mutating.", name),
counter.committed());
assertTrue(
String.format("%s should be dirty after marking committed.", name), counter.isDirty());
assertEquals(
String.format("%s state should be DIRTY after marking committed.", name),
CommitState.DIRTY,
counter.commitState.get());
// Test counters are not dirty and are COMMITTED.
assertTrue(
String.format("Committing %s should succeed when in DIRTY state.", name),
counter.committing());
assertTrue(
String.format("Marking %s committed should succeed after committing.", name),
counter.committed());
assertFalse(
String.format("%s should be dirty after being marked committed.", name), counter.isDirty());
assertEquals(
String.format("%s should have COMMITTED state after marking committed.", name),
CommitState.COMMITTED,
counter.commitState.get());
}
@Test
public void testStructuredNames() {
Counter<?, ?> unstructured = counters.intSum(name);
Counter<?, ?> structuredOriginal =
counters.intSum(name.withOriginalName(NameContextsForTests.nameContextForTest()));
Counter<?, ?> structuredSystem =
counters.intSum(name.withSystemName(NameContextsForTests.nameContextForTest()));
Counter<?, ?> structuredCompatible =
counters.intSum(name.withOriginalName(NameContextsForTests.nameContextForTest()));
// unstructured is equal to nothing
assertFalse(unstructured.equals(structuredOriginal));
assertFalse(unstructured.equals(structuredSystem));
assertFalse(unstructured.equals(structuredCompatible));
// structuredOriginal is only equal to structuredCompatible
assertEquals(structuredOriginal, structuredCompatible);
assertFalse(structuredOriginal.equals(structuredSystem));
// structuredSystem is equal to nothing
assertFalse(structuredSystem.equals(structuredCompatible));
}
}