blob: 0c231c849d56e933c3ec05549ee7c1e5168dbdee [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.samples.apps;
import static org.apache.edgent.analytics.math3.stat.Statistic.MAX;
import static org.apache.edgent.analytics.math3.stat.Statistic.MEAN;
import static org.apache.edgent.analytics.math3.stat.Statistic.MIN;
import static org.apache.edgent.analytics.math3.stat.Statistic.STDDEV;
import java.util.List;
import org.apache.commons.math3.util.Pair;
import org.apache.edgent.analytics.math3.json.JsonAnalytics;
import org.apache.edgent.analytics.math3.stat.Statistic;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Function;
import org.apache.edgent.topology.TStream;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
/**
* Utilties to ease working working with sensor "samples" by wrapping them
* in JsonObjects.
* <p>
* The Json Tuple sensor "samples" have a standard collection of properties.
*/
public class JsonTuples {
/*
* Common attributes in the JsonObject
*/
public static final String KEY_ID = "id";
public static final String KEY_TS = "msec";
public static final String KEY_READING = "reading";
public static final String KEY_AGG_BEGIN_TS = "agg.begin.msec";
public static final String KEY_AGG_COUNT = "agg.count";
/**
* Create a JsonObject wrapping a raw {@code Pair<Long msec,T reading>>} sample.
* @param <T> Tuple type
* @param sample the raw sample
* @param id the sensor's Id
* @return the wrapped sample
*/
public static <T> JsonObject wrap(Pair<Long,T> sample, String id) {
JsonObject jo = new JsonObject();
jo.addProperty(KEY_ID, id);
jo.addProperty(KEY_TS, sample.getFirst());
T value = sample.getSecond();
if (value instanceof Number)
jo.addProperty(KEY_READING, (Number)sample.getSecond());
else if (value instanceof String)
jo.addProperty(KEY_READING, (String)sample.getSecond());
else if (value instanceof Boolean)
jo.addProperty(KEY_READING, (Boolean)sample.getSecond());
// else if (value instanceof array) {
// // TODO cvt to JsonArray
// }
// else if (value instanceof Object) {
// // TODO cvt to JsonObject
// }
else {
Class<?> clazz = value != null ? value.getClass() : Object.class;
throw new IllegalArgumentException("Unhandled value type: "+ clazz);
}
return jo;
}
/**
* Create a stream of JsonObject wrapping a stream of
* raw {@code Pair<Long msec,T reading>>} samples.
*
* @param <T> Tuple type
* @param stream the raw input stream
* @param id the sensor's Id
* @return the wrapped stream
*/
public static <T> TStream<JsonObject> wrap(TStream<Pair<Long,T>> stream, String id) {
return stream.map(pair -> wrap(pair, id));
}
/**
* The partition key function for wrapped sensor samples.
* <p>
* The {@code KEY_ID} property is returned for the key.
* @return the function
*/
public static Function<JsonObject,String> keyFn() {
return sample -> sample.get(KEY_ID).getAsString();
}
/**
* Get a statistic value from a sample.
* <p>
* Same as {@code getStatistic(jo, JsonTuples.KEY_READING, stat)}.
*
* @param jo the sample
* @param stat the Statistic of interest
* @return the JsonElement for the Statistic
* @throws RuntimeException of the stat isn't present
*/
public static JsonElement getStatistic(JsonObject jo, Statistic stat) {
return getStatistic(jo, JsonTuples.KEY_READING, stat);
}
/**
* Get a statistic value from a sample.
* <p>
* Convenience for working with samples containing a property
* whose value is one or more {@link Statistic}
* as created by
* {@link JsonAnalytics#aggregate(org.apache.edgent.topology.TWindow, String, String, org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate...) JsonAnalytics.aggregate()}
*
* @param jo the sample
* @param valueKey the name of the property containing the JsonObject of Statistics
* @param stat the Statistic of interest
* @return the JsonElement for the Statistic
* @throws RuntimeException of the stat isn't present
*/
public static JsonElement getStatistic(JsonObject jo, String valueKey, Statistic stat) {
JsonObject statsjo = jo.get(valueKey).getAsJsonObject();
return statsjo.get(stat.name());
}
/**
* Create a function that computes the specified statistics on the list of
* samples and returns a new sample containing the result.
* <p>
* The single tuple contains the specified statistics computed over
* all of the {@code JsonTuple.KEY_READING}
* values from {@code List<JsonObject>}.
* <p>
* The resulting sample contains the properties:
* <ul>
* <li>JsonTuple.KEY_ID</li>
* <li>JsonTuple.KEY_MSEC - msecTimestamp of the last sample in the window</li>
* <li>JsonTuple.KEY_AGG_BEGIN_MSEC - msecTimestamp of the first sample in the window</li>
* <li>JsonTuple.KEY_AGG_COUNT - number of samples in the window ({@code value=factor})</li>
* <li>JsonTuple.KEY_READING - a JsonObject of the statistics
* as defined by
* {@link JsonAnalytics#aggregate(org.apache.edgent.topology.TWindow, String, String, org.apache.edgent.analytics.math3.json.JsonUnivariateAggregate...) JsonAnalytics.aggregate()}
* </ul>
* <p>
* Sample use:
* <pre>{@code
* TStream<JsonObject> s = ...
* // reduce s by a factor of 100 with stats MEAN and STDEV
* TStream<JsonObject> reduced = s.batch(100, statistics(Statistic.MEAN, Statistic.STDDEV));
* }</pre>
*
* @param statistics the statistics to calculate over the window
* @return {@code TStream<JsonObject>} for the reduced {@code stream}
*/
public static BiFunction<List<JsonObject>,String,JsonObject> statistics(Statistic... statistics) {
BiFunction<List<JsonObject>,JsonElement,JsonObject> statsFn =
JsonAnalytics.aggregateList(KEY_ID, KEY_READING,
j -> j.get(KEY_READING).getAsDouble(),
MIN, MAX, MEAN, STDDEV);
return (samples, key) -> {
JsonObject jo = statsFn.apply(samples, samples.get(0).get(KEY_ID));
JsonTuples.addAggStdInfo(jo, samples);
return jo;
};
}
private static void addAggStdInfo(JsonObject jo, List<JsonObject> samples) {
// beginMsec, endMsec, nSamples
long msec = samples.get(0).get(KEY_TS).getAsLong();
long msec2 = samples.get(samples.size()-1).get(KEY_TS).getAsLong();
int nSamples = samples.size();
jo.addProperty(KEY_TS, msec2);
jo.addProperty(KEY_AGG_BEGIN_TS, msec);
jo.addProperty(KEY_AGG_COUNT, nSamples);
}
}