blob: 779c9b78383620dd674918ecf0ed51a099338754 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.test.analytics.math3;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.edgent.analytics.math3.Aggregations;
import org.apache.edgent.analytics.math3.MvResultMap;
import org.apache.edgent.analytics.math3.ResultMap;
import org.apache.edgent.analytics.math3.UnivariateAggregate;
import org.apache.edgent.analytics.math3.stat.Regression2;
import org.apache.edgent.analytics.math3.stat.Statistic2;
import org.apache.edgent.function.Functions;
import org.apache.edgent.test.providers.direct.DirectTopologyTestBase;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.TWindow;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.tester.Condition;
import org.junit.Test;
import com.google.gson.JsonObject;
/** test Aggregations, Statistics2 and Regression2 */
public class Statistics2Test extends DirectTopologyTestBase {
// Expected results for **continuous last-2** aggregations of (1, 4, 102, 0) for the various stats.
protected static Map<UnivariateAggregate,Double[]> STAT_RESULTS = new HashMap<>();
static {
STAT_RESULTS.put(
Statistic2.COUNT,
new Double[] {
1.0, // 1
2.0, // 1,4
2.0, // 4,102
2.0 // 102,0
});
STAT_RESULTS.put(
Statistic2.MIN,
new Double[] {
1.0, // 1
1.0, // 1,4
4.0, // 4,102
0.0 // 102,0
});
STAT_RESULTS.put(
Statistic2.MAX,
new Double[] {
1.0, // 1
4.0, // 1,4
102.0, // 4,102
102.0 // 102,0
});
STAT_RESULTS.put(
Statistic2.SUM,
new Double[] {
1.0, // 1
5.0, // 1,4
106.0, // 4,102
102.0 // 102,0
});
STAT_RESULTS.put(
Statistic2.MEAN,
new Double[] {
1.0, // 1
2.5, // 1,4
53.0, // 4,102
51.0 // 102,0
});
STAT_RESULTS.put(
Statistic2.STDDEV,
new Double[] {
0.0, // 1
2.12, // 1,4
69.29, // 4,102
72.12 // 102,0
});
STAT_RESULTS.put(
Regression2.SLOPE,
new Double[] {
null, // 1
3.0, // 1,4
98.0, // 4,102
-102.0 // 102,0
});
}
/* test Aggregations.sum() */
@Test
public void testSum() throws Exception {
Double act = Aggregations.sum(Arrays.asList(1, 2, 3.8)); // check with mixed Number
Double exp = 1 + 2 + 3.8;
assertEquals(exp, act, 0.01);
act = Aggregations.sum(Arrays.asList());
exp = 0.0;
assertEquals(exp, act, 0.01);
}
/* test Aggregations.sumInts() */
@Test
public void testSumInts() throws Exception {
long act = Aggregations.sumInts(Arrays.asList(1, 2, 3));
long exp = 1 + 2 + 3;
assertEquals(exp, act);
act = Aggregations.sumInts(Arrays.asList());
exp = 0;
assertEquals(exp, act);
}
/* util to test Aggregations.aggregate(List<Number>, stat) */
protected void aggregate(UnivariateAggregate stat, Double[] expResults) throws Exception {
// for "continuous last-2" style inputs of (1, 4, 102, 0)
int i = 0;
Double result;
result = Aggregations.aggregate(Arrays.asList(1), stat);
assertResult(i, stat, expResults[i++], result);
result = Aggregations.aggregate(Arrays.asList(1,4), stat);
assertResult(i, stat, expResults[i++], result);
result = Aggregations.aggregate(Arrays.asList(4,102), stat);
assertResult(i, stat, expResults[i++], result);
result = Aggregations.aggregate(Arrays.asList(102,0), stat);
assertResult(i, stat, expResults[i++], result);
}
/* util to test Aggregations.aggregate(List<T>, getter, stat) */
protected void aggregateT(UnivariateAggregate stat, Double[] expResults) throws Exception {
// for "continuous last-2" style inputs of (1, 4, 102, 0)
int i = 0;
Double result;
List<Object> values;
values = Arrays.asList(1);
result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), stat);
assertResult(i, stat, expResults[i++], result);
values = Arrays.asList(1,4);
result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), stat);
assertResult(i, stat, expResults[i++], result);
values = Arrays.asList(4,102);
result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), stat);
assertResult(i, stat, expResults[i++], result);
values = Arrays.asList(102,0);
result = Aggregations.aggregate(values, o -> Double.valueOf(o.toString()), stat);
assertResult(i, stat, expResults[i++], result);
}
/* util to test Aggregations.aggregateN(List<Number>, stats) */
protected void aggregateN(UnivariateAggregate[] stats) throws Exception {
// for "continuous last-2" style inputs of (1, 4, 102, 0)
int i = 0;
ResultMap result;
result = Aggregations.aggregateN(Arrays.asList(1), stats);
assertResult(i++, stats, STAT_RESULTS, result);
result = Aggregations.aggregateN(Arrays.asList(1,4), stats);
assertResult(i++, stats, STAT_RESULTS, result);
result = Aggregations.aggregateN(Arrays.asList(4,102), stats);
assertResult(i++, stats, STAT_RESULTS, result);
result = Aggregations.aggregateN(Arrays.asList(102,0), stats);
assertResult(i++, stats, STAT_RESULTS, result);
}
/* util to test Aggregations.aggregateN(List<T>, getter, stats) */
protected void aggregateNT(UnivariateAggregate[] stats) throws Exception {
// for "continuous last-2" style inputs of (1, 4, 102, 0)
int i = 0;
ResultMap result;
List<Object> values;
values = Arrays.asList(1);
result = Aggregations.aggregateN(values, o -> Double.valueOf(o.toString()), stats);
assertResult(i++, stats, STAT_RESULTS, result);
values = Arrays.asList(1,4);
result = Aggregations.aggregateN(values, o -> Double.valueOf(o.toString()), stats);
assertResult(i++, stats, STAT_RESULTS, result);
values = Arrays.asList(4,102);
result = Aggregations.aggregateN(values, o -> Double.valueOf(o.toString()), stats);
assertResult(i++, stats, STAT_RESULTS, result);
values = Arrays.asList(102,0);
result = Aggregations.aggregateN(values, o -> Double.valueOf(o.toString()), stats);
assertResult(i++, stats, STAT_RESULTS, result);
}
protected static void assertResult(int index, UnivariateAggregate stat, Double exp, Double act) {
if (exp == null && act == null)
return; // match
assertFalse("index="+index+" "+stat.name()+" exp="+exp+" act="+act, exp == null || act == null);
assertEquals("index="+index+" "+stat.name(), exp, act, 0.01);
}
protected static void assertResult(int i, UnivariateAggregate[] stats, Map<UnivariateAggregate,Double[]> expResults, ResultMap actResults) {
for (UnivariateAggregate stat : stats) {
Double act = actResults.get(stat);
Double exp = expResults.get(stat)[i];
assertResult(i, stat, exp, act);
}
}
/* util to test Aggregations.aggregate(List<Number>, stat) */
protected void testAggregate(UnivariateAggregate stat) throws Exception {
aggregate(stat, STAT_RESULTS.get(stat));
}
@Test
public void testCOUNT() throws Exception {
testAggregate(Statistic2.COUNT);
}
@Test
public void testMIN() throws Exception {
testAggregate(Statistic2.MIN);
}
@Test
public void testMAX() throws Exception {
testAggregate(Statistic2.MAX);
}
@Test
public void testSUM() throws Exception {
testAggregate(Statistic2.SUM);
}
@Test
public void testMEAN() throws Exception {
testAggregate(Statistic2.MEAN);
}
@Test
public void testSTDDEV() throws Exception {
testAggregate(Statistic2.STDDEV);
}
@Test
public void testSLOPE() throws Exception {
testAggregate(Regression2.SLOPE);
}
@Test
public void testEmptyAggregate() throws Exception {
Double result;
result = Aggregations.aggregate(Collections.emptyList(), Statistic2.COUNT);
assertResult(0, Statistic2.COUNT, null, result);
result = Aggregations.aggregate(Collections.emptyList(), Statistic2.MIN);
assertResult(0, Statistic2.MIN, null, result);
// ...
ResultMap resultMap = Aggregations.aggregateN(Collections.emptyList(), Statistic2.MIN);
assertTrue(resultMap.isEmpty());
resultMap = Aggregations.aggregateN(Arrays.asList(1));
assertTrue(resultMap.isEmpty());
resultMap = Aggregations.aggregateN(Arrays.asList(1), new UnivariateAggregate[0]);
assertTrue(resultMap.isEmpty());
}
@Test
public void testAggregateT() throws Exception {
aggregateT(Statistic2.MIN, STAT_RESULTS.get(Statistic2.MIN));
}
/* test Aggregations.aggregateN(List<Number>, stats) */
@Test
public void testAggregateN() throws Exception {
aggregateN(STAT_RESULTS.keySet().toArray(new UnivariateAggregate[0]));
}
/* test Aggregations.aggregateN(List<T>, getter, stats) */
@Test
public void testAggregateNT() throws Exception {
aggregateNT(STAT_RESULTS.keySet().toArray(new UnivariateAggregate[0]));
}
/* test Aggregations.newResultToJson() */
@Test
public void testNewResultsToJson() throws Exception {
ResultMap result = new ResultMap();
result.put(Statistic2.MIN, 2.5);
result.put(Statistic2.MAX, 4.5);
JsonObject jo = ResultMap.toJsonObject().apply(result);
assertTrue(jo.get(Statistic2.MIN.name()) != null);
assertEquals(jo.get(Statistic2.MIN.name()).getAsDouble(), 2.5, 0.01);
assertTrue(jo.get(Statistic2.MAX.name()) != null);
assertEquals(jo.get(Statistic2.MAX.name()).getAsDouble(), 4.5, 0.01);
}
/* test Aggregations.newMvResultToJson() */
@Test
public void testNewMvResultsToJson() throws Exception {
ResultMap var1result = new ResultMap();
var1result.put(Statistic2.MIN, 2.5);
var1result.put(Statistic2.MAX, 4.5);
ResultMap var2result = new ResultMap();
var2result.put(Statistic2.SUM, 27.1);
MvResultMap result = new MvResultMap();
result.put("var1", var1result);
result.put("var2", var2result);
JsonObject jo = MvResultMap.toJsonObject().apply(result);
assertTrue(jo.get("var1") != null);
JsonObject joVar1 = jo.get("var1").getAsJsonObject();
assertTrue(joVar1.get(Statistic2.MIN.name()) != null);
assertEquals(joVar1.get(Statistic2.MIN.name()).getAsDouble(), 2.5, 0.01);
assertTrue(joVar1.get(Statistic2.MAX.name()) != null);
assertEquals(joVar1.get(Statistic2.MAX.name()).getAsDouble(), 4.5, 0.01);
assertTrue(jo.get("var2") != null);
JsonObject joVar2 = jo.get("var2").getAsJsonObject();
assertTrue(joVar2.get(Statistic2.SUM.name()) != null);
assertEquals(joVar2.get(Statistic2.SUM.name()).getAsDouble(), 27.1, 0.01);
}
/* test Aggregations.aggregateN(list, stats) in a Stream/Window context */
@Test
public void testAggregateNStream() throws Exception {
// Aggregations.aggregate are ignorant of Stream/Window but
// to be safe verify a full Stream/Window based use case works
UnivariateAggregate[] stats = STAT_RESULTS.keySet().toArray(new UnivariateAggregate[0]);
Topology topology = newTopology("testAggregateNStream");
// (1, 4, 102, 0)
TStream<Integer> sourceData = sourceData(topology);
TWindow<Integer, Integer> window = sourceData.last(2, Functions.unpartitioned());
TStream<ResultMap> aggregate = window.aggregate( (list,partition) -> {
return Aggregations.aggregateN(list, stats);
});
Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 4);
Condition<List<ResultMap>> contents = topology.getTester().streamContents(aggregate);
complete(topology, count);
assertTrue(count.valid());
List<ResultMap> tuples = contents.getResult();
assertEquals(4, tuples.size());
for (int i = 0; i < tuples.size(); i++) {
assertResult(i, stats, STAT_RESULTS, tuples.get(i));
}
}
/* test Aggregations.aggregateN(list, stats) in a multivariable Stream/Window context */
@Test
public void testMvAggregateNStream() throws Exception {
// Aggregations.aggregate are ignorant of Stream/Window but
// to be safe verify a full Stream/Window based use case works (TStream<MvResultMap>)
UnivariateAggregate[] stats = { Statistic2.MIN, Statistic2.MAX };
Map<UnivariateAggregate,Double[]> var2_STAT_RESULTS = new HashMap<>();
var2_STAT_RESULTS.put(Statistic2.MIN, new Double[] {1001.0, 1001.0, 1004.0, 1000.0});
var2_STAT_RESULTS.put(Statistic2.MAX, new Double[] {1001.0, 1004.0, 1102.0, 1102.0});
Topology topology = newTopology("testAggregateNStream");
// (1, 4, 102, 0)
TStream<SensorReadings> sourceData = sourceData(topology)
.map(i -> new SensorReadings(i, i+1000));
TWindow<SensorReadings, Integer> window = sourceData.last(2, Functions.unpartitioned());
TStream<MvResultMap> aggregate = window.aggregate( (list,partition) -> {
ResultMap var1result = Aggregations.aggregateN(list, sr -> sr.var1, stats);
ResultMap var2result = Aggregations.aggregateN(list, sr -> sr.var2, stats);
MvResultMap result = new MvResultMap();
result.put("var1", var1result);
result.put("var2", var2result);
return result;
});
Condition<Long> count = topology.getTester().atLeastTupleCount(aggregate, 4);
Condition<List<MvResultMap>> contents = topology.getTester().streamContents(aggregate);
complete(topology, count);
assertTrue(count.valid());
List<MvResultMap> tuples = contents.getResult();
assertEquals(4, tuples.size());
for (int i = 0; i < tuples.size(); i++) {
MvResultMap results = tuples.get(i);
assertResult(i, stats, STAT_RESULTS, results.get("var1"));
assertResult(i, stats, var2_STAT_RESULTS, results.get("var2"));
}
}
protected static class SensorReadings {
int var1;
int var2;
SensorReadings(int var1, int var2) {
this.var1 = var1;
this.var2 = var2;
}
}
protected static TStream<Integer> sourceData(Topology topology)
{
return topology.of(1, 4, 102, 0);
}
}