blob: 9a6143bcb63e16f91b88a7432210388f423ac66b [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.test.analytics.math3;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.math3.util.Pair;
import org.apache.edgent.analytics.math3.json.JsonAnalytics;
import org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate;
import org.apache.edgent.analytics.math3.stat.Regression;
import org.apache.edgent.analytics.math3.stat.Statistic;
import org.apache.edgent.test.providers.direct.DirectTopologyTestBase;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.TWindow;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.tester.Condition;
import org.junit.Test;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
/**
 * Tests for the univariate aggregations exposed by {@link JsonAnalytics}.
 *
 * <p>Every test feeds the same keyed data set (see {@link #sourceData(Topology)}) through a
 * window partitioned on {@code "id"} that keeps the last two tuples per key, and checks the
 * aggregate emitted for each input tuple.
 */
public class StatisticsTest extends DirectTopologyTestBase {

    /**
     * Input tuples: the first character is the partition key ({@code "id"}), the remainder is
     * the integer {@code "value"}.
     */
    private static final String[] SOURCE_TUPLES = {
        "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700"
    };

    /** The tests expect one aggregate tuple per input tuple. */
    private static final int EXPECTED_TUPLE_COUNT = SOURCE_TUPLES.length;

    /** Tolerance used when comparing aggregated doubles. */
    private static final double DELTA = 0.01;

    @Test
    public void testMin() throws Exception {
        Topology topology = newTopology("testMin");
        TStream<JsonObject> aggregate = aggregate(topology, Statistic.MIN);
        List<JsonObject> tuples = runAndCollect(topology, aggregate);
        assertOutputStructure(tuples, Statistic.MIN);
        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700"
        assertResult(tuples, Statistic.MIN, 0, "A", 1.0);
        assertResult(tuples, Statistic.MIN, 1, "B", 7.0);
        assertResult(tuples, Statistic.MIN, 2, "C", 4.0);
        assertResult(tuples, Statistic.MIN, 3, "A", 1.0);
        assertResult(tuples, Statistic.MIN, 4, "B", 3.0);
        assertResult(tuples, Statistic.MIN, 5, "C", 4.0);
        assertResult(tuples, Statistic.MIN, 6, "A", 4.0);
        assertResult(tuples, Statistic.MIN, 7, "B", 3.0);
        assertResult(tuples, Statistic.MIN, 8, "B", 13.0);
        assertResult(tuples, Statistic.MIN, 9, "A", 0.0);
        assertResult(tuples, Statistic.MIN, 10, "C", 99.0);
    }

    @Test
    public void testMaxMean() throws Exception {
        Topology topology = newTopology("testMaxMean");
        TStream<JsonObject> aggregate = aggregate(topology, Statistic.MAX, Statistic.MEAN);
        List<JsonObject> tuples = runAndCollect(topology, aggregate);
        assertOutputStructure(tuples, Statistic.MAX, Statistic.MEAN);
        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700"
        assertResult(tuples, Statistic.MAX, 0, "A", 1.0);
        assertResult(tuples, Statistic.MAX, 1, "B", 7.0);
        assertResult(tuples, Statistic.MAX, 2, "C", 4.0);
        assertResult(tuples, Statistic.MAX, 3, "A", 4.0);
        assertResult(tuples, Statistic.MAX, 4, "B", 7.0);
        assertResult(tuples, Statistic.MAX, 5, "C", 99.0);
        assertResult(tuples, Statistic.MAX, 6, "A", 102.0);
        assertResult(tuples, Statistic.MAX, 7, "B", 43.0);
        assertResult(tuples, Statistic.MAX, 8, "B", 43.0);
        assertResult(tuples, Statistic.MAX, 9, "A", 102.0);
        assertResult(tuples, Statistic.MAX, 10, "C", 700.0);
        assertResult(tuples, Statistic.MEAN, 0, "A", 1.0);
        assertResult(tuples, Statistic.MEAN, 1, "B", 7.0);
        assertResult(tuples, Statistic.MEAN, 2, "C", 4.0);
        assertResult(tuples, Statistic.MEAN, 3, "A", 2.5);
        assertResult(tuples, Statistic.MEAN, 4, "B", 5.0);
        assertResult(tuples, Statistic.MEAN, 5, "C", 51.5);
        assertResult(tuples, Statistic.MEAN, 6, "A", 53.0);
        assertResult(tuples, Statistic.MEAN, 7, "B", 23.0);
        assertResult(tuples, Statistic.MEAN, 8, "B", 28.0);
        assertResult(tuples, Statistic.MEAN, 9, "A", 51.0);
        assertResult(tuples, Statistic.MEAN, 10, "C", 399.5);
    }

    @Test
    public void testSlope() throws Exception {
        Topology topology = newTopology("testSlope");
        TStream<JsonObject> aggregate = aggregate(topology, Regression.SLOPE);
        List<JsonObject> tuples = runAndCollect(topology, aggregate);
        // No assertOutputStructure here: the first tuple seen for each key has no SLOPE
        // entry (asserted below as null), so not every tuple carries the statistic.
        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700"
        assertResult(tuples, Regression.SLOPE, 0, "A", null);
        assertResult(tuples, Regression.SLOPE, 1, "B", null);
        assertResult(tuples, Regression.SLOPE, 2, "C", null);
        assertResult(tuples, Regression.SLOPE, 3, "A", 3.0);
        assertResult(tuples, Regression.SLOPE, 4, "B", -4.0);
        assertResult(tuples, Regression.SLOPE, 5, "C", 95.0);
        assertResult(tuples, Regression.SLOPE, 6, "A", 98.0);
        assertResult(tuples, Regression.SLOPE, 7, "B", 40.0);
        assertResult(tuples, Regression.SLOPE, 8, "B", -30.0);
        assertResult(tuples, Regression.SLOPE, 9, "A", -102.0);
        assertResult(tuples, Regression.SLOPE, 10, "C", 601.0);
    }

    @Test
    public void testMvMaxMean() throws Exception {
        Topology topology = newTopology("testMvMaxMean");
        TStream<JsonObject> aggregate = mvAggregate(topology, Statistic.MAX, Statistic.MEAN);
        List<JsonObject> tuples = runAndCollect(topology, aggregate);
        assertMvOutputStructure(tuples, Statistic.MAX, Statistic.MEAN);
        // "A1", "B7", "C4", "A4", "B3", "C99", "A102", "B43", "B13", "A0", "C700"
        assertMvResult(tuples, Statistic.MAX, 0, "A", 1.0);
        assertMvResult(tuples, Statistic.MAX, 1, "B", 7.0);
        assertMvResult(tuples, Statistic.MAX, 2, "C", 4.0);
        assertMvResult(tuples, Statistic.MAX, 3, "A", 4.0);
        assertMvResult(tuples, Statistic.MAX, 4, "B", 7.0);
        assertMvResult(tuples, Statistic.MAX, 5, "C", 99.0);
        assertMvResult(tuples, Statistic.MAX, 6, "A", 102.0);
        assertMvResult(tuples, Statistic.MAX, 7, "B", 43.0);
        assertMvResult(tuples, Statistic.MAX, 8, "B", 43.0);
        assertMvResult(tuples, Statistic.MAX, 9, "A", 102.0);
        assertMvResult(tuples, Statistic.MAX, 10, "C", 700.0);
        assertMvResult(tuples, Statistic.MEAN, 0, "A", 1.0);
        assertMvResult(tuples, Statistic.MEAN, 1, "B", 7.0);
        assertMvResult(tuples, Statistic.MEAN, 2, "C", 4.0);
        assertMvResult(tuples, Statistic.MEAN, 3, "A", 2.5);
        assertMvResult(tuples, Statistic.MEAN, 4, "B", 5.0);
        assertMvResult(tuples, Statistic.MEAN, 5, "C", 51.5);
        assertMvResult(tuples, Statistic.MEAN, 6, "A", 53.0);
        assertMvResult(tuples, Statistic.MEAN, 7, "B", 23.0);
        assertMvResult(tuples, Statistic.MEAN, 8, "B", 28.0);
        assertMvResult(tuples, Statistic.MEAN, 9, "A", 51.0);
        assertMvResult(tuples, Statistic.MEAN, 10, "C", 399.5);
    }

    /**
     * Runs {@code topology} until {@code aggregate} has emitted at least
     * {@link #EXPECTED_TUPLE_COUNT} tuples, then returns everything it emitted.
     *
     * <p>Asserts that the count condition was satisfied and that exactly
     * {@link #EXPECTED_TUPLE_COUNT} tuples were captured.
     *
     * @param topology the topology under test
     * @param aggregate the stream whose output is collected
     * @return the tuples emitted by {@code aggregate}, in emission order
     * @throws Exception if running the topology fails
     */
    private List<JsonObject> runAndCollect(Topology topology, TStream<JsonObject> aggregate)
            throws Exception {
        // Both conditions must be registered before the topology is started by complete().
        Condition<Long> count =
                topology.getTester().atLeastTupleCount(aggregate, EXPECTED_TUPLE_COUNT);
        Condition<List<JsonObject>> contents = topology.getTester().streamContents(aggregate);
        complete(topology, count);
        assertTrue("tuple count condition not satisfied", count.valid());
        List<JsonObject> tuples = contents.getResult();
        assertEquals(EXPECTED_TUPLE_COUNT, tuples.size());
        return tuples;
    }

    /**
     * Checks the single-variable aggregate in {@code tuples.get(index)}.
     *
     * @param value the expected aggregate, or {@code null} to assert that the tuple's
     *     {@code "value"} object has no entry for {@code stat}
     */
    private static void assertResult(List<JsonObject> tuples, JsonUnivariateAggregate stat,
            int index, String key, Double value) {
        String msg = "index:" + index;
        JsonObject tuple = tuples.get(index);
        assertEquals(msg, key, tuple.get("id").getAsString());
        JsonObject agg = tuple.getAsJsonObject("value");
        if (value != null) {
            double result = agg.get(stat.name()).getAsDouble();
            assertEquals(msg, value, result, DELTA);
        } else {
            assertFalse(msg + " " + stat, agg.has(stat.name()));
        }
    }

    /**
     * Checks the multi-variable aggregate in {@code tuples.get(index)}.
     *
     * <p>{@code "value2"} is derived as {@code value + 1000} (see
     * {@link #sourceMvData(Topology)}), so its expected aggregate is {@code value + 1000}.
     *
     * @param value the expected aggregate of {@code "value"}, or {@code null} to assert that
     *     neither variable has an aggregate for {@code stat}
     */
    private static void assertMvResult(List<JsonObject> tuples, JsonUnivariateAggregate stat,
            int index, String key, Double value) {
        JsonObject tuple = tuples.get(index);
        assertEquals("index:" + index, key, tuple.get("id").getAsString());
        if (value != null) {
            double result =
                    JsonAnalytics.getMvAggregate(tuple, "aggResults", "value", stat).getAsDouble();
            assertEquals("index:" + index + " value " + stat, value, result, DELTA);
            double result2 =
                    JsonAnalytics.getMvAggregate(tuple, "aggResults", "value2", stat).getAsDouble();
            assertEquals("index:" + index + " value2 " + stat, value + 1000, result2, DELTA);
        } else {
            assertFalse("index:" + index + " value " + stat,
                    JsonAnalytics.hasMvAggregate(tuple, "aggResults", "value", stat));
            assertFalse("index:" + index + " value2 " + stat,
                    JsonAnalytics.hasMvAggregate(tuple, "aggResults", "value2", stat));
        }
    }

    /**
     * Asserts that every tuple has an {@code "id"} and a {@code "value"} object holding an
     * entry named after each of {@code stats}.
     *
     * @param tuples aggregate tuples produced by {@link #aggregate(Topology, JsonUnivariateAggregate...)}
     * @param stats the statistics every tuple must contain
     */
    public static void assertOutputStructure(List<JsonObject> tuples, JsonUnivariateAggregate ... stats) {
        for (JsonObject j : tuples) {
            assertTrue("missing id: " + j, j.has("id"));       // partition key
            assertTrue("missing value: " + j, j.has("value")); // object holding the aggregates
            JsonObject v = j.getAsJsonObject("value");
            for (JsonUnivariateAggregate stat : stats) {
                assertTrue("missing " + stat.name() + ": " + j, v.has(stat.name()));
            }
        }
    }

    /**
     * Asserts that every tuple has an {@code "id"}, an {@code "aggResults"} member, and an
     * aggregate for each of {@code stats} for both {@code "value"} and {@code "value2"}.
     *
     * @param tuples aggregate tuples produced by {@link #mvAggregate(Topology, JsonUnivariateAggregate...)}
     * @param stats the statistics every variable must have
     */
    public static void assertMvOutputStructure(List<JsonObject> tuples, JsonUnivariateAggregate ... stats) {
        String[] variables = {"value", "value2"};
        for (JsonObject j : tuples) {
            assertTrue("missing id: " + j, j.has("id"));             // partition key
            assertTrue("missing aggResults: " + j, j.has("aggResults")); // per-variable results
            for (String variable : variables) {
                for (JsonUnivariateAggregate stat : stats) {
                    assertTrue(variable + " " + stat,
                            JsonAnalytics.hasMvAggregate(j, "aggResults", variable, stat));
                }
            }
        }
    }

    /**
     * Builds a windowed single-variable aggregation over {@link #sourceData(Topology)}.
     *
     * <p>The window keeps the last two tuples for each {@code "id"}, and {@code stats} are
     * computed over the {@code "value"} field.
     *
     * @param topology the topology to add the streams to
     * @param stats the statistics to compute
     * @return the stream of aggregate tuples
     */
    public static TStream<JsonObject> aggregate(Topology topology, JsonUnivariateAggregate ... stats) {
        TStream<JsonObject> sourceData = sourceData(topology);
        TWindow<JsonObject, JsonElement> window = sourceData.last(2, j -> j.get("id"));
        return JsonAnalytics.aggregate(window, "id", "value",
                j -> j.get("value").getAsDouble(), stats);
    }

    /**
     * Creates the test input stream.
     *
     * <p>For each entry in {@link #SOURCE_TUPLES}, the stream emits a {@link JsonObject} with
     * {@code "id"} set to the first character and {@code "value"} set to the remaining
     * characters parsed as an integer.
     *
     * @param topology the topology to add the stream to
     * @return the source stream
     */
    public static TStream<JsonObject> sourceData(Topology topology)
    {
        TStream<String> seed = topology.strings(SOURCE_TUPLES);
        return seed.map(s -> {
            JsonObject j = new JsonObject();
            j.addProperty("id", s.substring(0, 1));
            j.addProperty("value", Integer.valueOf(s.substring(1)));
            return j;
        });
    }

    /**
     * Builds a windowed multi-variable aggregation over {@link #sourceMvData(Topology)}.
     *
     * <p>The window keeps the last two tuples for each {@code "id"}. {@code stats} are computed
     * for both {@code "value"} and {@code "value2"}, with results under {@code "aggResults"}.
     *
     * @param topology the topology to add the streams to
     * @param stats the statistics to compute for each variable
     * @return the stream of aggregate tuples
     */
    public static TStream<JsonObject> mvAggregate(Topology topology, JsonUnivariateAggregate ... stats) {
        TStream<JsonObject> sourceData = sourceMvData(topology);
        TWindow<JsonObject, JsonElement> window = sourceData.last(2, j -> j.get("id"));
        List<Pair<String, JsonUnivariateAggregate[]>> aggSpecs = new ArrayList<>();
        aggSpecs.add(JsonAnalytics.mkAggregationSpec("value", stats));
        aggSpecs.add(JsonAnalytics.mkAggregationSpec("value2", stats));
        return JsonAnalytics.mvAggregate(window, "id", "aggResults", aggSpecs);
    }

    /**
     * Creates the same tuples as {@link #sourceData(Topology)}, with an additional
     * {@code "value2"} property equal to the {@code "value"} property plus 1000.
     *
     * @param topology the topology to add the stream to
     * @return the multi-variable source stream
     */
    public static TStream<JsonObject> sourceMvData(Topology topology)
    {
        return sourceData(topology)
                .map(jo -> {
                    jo.addProperty("value2", jo.get("value").getAsLong() + 1000);
                    return jo;
                });
    }
}