blob: b8a0ceedfeef77e3c13f4245fcc6b571381e9a5d [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.samples.apps;
import java.io.File;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.edgent.analytics.sensors.Range;
import org.apache.edgent.analytics.sensors.Ranges;
import org.apache.edgent.connectors.file.FileStreams;
import org.apache.edgent.connectors.file.FileWriterCycleConfig;
import org.apache.edgent.connectors.file.FileWriterFlushConfig;
import org.apache.edgent.connectors.file.FileWriterPolicy;
import org.apache.edgent.connectors.file.FileWriterRetentionConfig;
import org.apache.edgent.function.Predicate;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.topology.TStream;
/**
* Some general purpose application configuration driven utilities.
* <p>
* Utilities include:
* <ul>
* <li>Get a property name for a sensor configuration item</li>
* <li>Get a Range value for a sensor range item</li>
* <li>Log a stream</li>
* <li>Conditionally trace a stream</li>
* </ul>
*/
public class ApplicationUtilities {
private final Properties props;
public ApplicationUtilities(Properties props) {
this.props = props;
}
private Properties config() {
return props;
}
/**
* Trace a stream to System.out if the sensor id's "label" has been configured
* to enable tracing.
* <p>
* If tracing has not been enabled in the config, the topology will not
* be augmented to trace the stream.
*
* @param <T> Tuple type
* @param stream the stream to trace
* @param sensorId the sensor id
* @param label some unique label
* @return the input stream
*/
public <T> TStream<T> traceStream(TStream<T> stream, String sensorId, Supplier<String> label) {
return traceStream(stream, () -> sensorId+"."+label.get());
}
/**
* Trace a stream to System.out if the "label" has been configured
* to enable tracing.
* <p>
* If tracing has not been enabled in the config, the topology will not
* be augmented to trace the stream.
*
* @param <T> Tuple type
* @param stream the stream to trace
* @param label some unique label
* @return the input stream
*/
public <T> TStream<T> traceStream(TStream<T> stream, Supplier<String> label) {
if (includeTraceStreamOps(label.get())) {
TStream<?> s = stream.filter(traceTuplesFn(label.get())).tag(label.get()+".trace");
s.peek(sample -> System.out.println(String.format("%s: %s", label.get(), sample.toString())));
}
return stream;
}
private boolean includeTraceStreamOps(String label) {
String includesCsv = config().getProperty("stream.tracing.includes.csv", "");
String includesRegex = config().getProperty("stream.tracing.includes.regex", "");
String excludesCsv = config().getProperty("stream.tracing.excludes.csv", "");
String excludesRegex = config().getProperty("stream.tracing.excludes.regex", "");
Set<String> includesSet = new HashSet<>();
for (String s : includesCsv.split(","))
includesSet.add(s.trim());
Set<String> excludesSet = new HashSet<>();
for (String s : excludesCsv.split(","))
excludesSet.add(s.trim());
boolean isIncluded = false;
if (includesSet.contains(label) || label.matches(includesRegex))
isIncluded = true;
if (excludesSet.contains(label) || label.matches(excludesRegex))
isIncluded = false;
return isIncluded;
}
private <T> Predicate<T> traceTuplesFn(String label) {
return tuple -> true; // TODO make dynamic config; affected by "label" value
// check label for match against csv or regex from props
}
/**
* Get the property name for a sensor's configuration item.
* @param sensorId the sensor's id
* @param label the label for an instance of "kind" (e.g., "tempThreshold")
* @param kind the kind of configuration item (e.g., "range")
* @return the configuration property name
*/
public String getSensorPropertyName(String sensorId, String label, String kind) {
String name = kind + "." + label; // kind.label
if (sensorId!=null && !sensorId.isEmpty())
name = sensorId + "." + name; // sensorId.kind.label
return name;
}
private String getSensorConfigValue(String sensorId, String label, String kind) {
String name = getSensorPropertyName(sensorId, label, kind);
String val = config().getProperty(name);
if (val==null)
throw new IllegalArgumentException("Missing configuration property "+name);
return val;
}
/**
* Get the Range for a sensor range configuration item.
* @param sensorId the sensor's id
* @param label the range's label
* @return the Range
*/
public Range<Integer> getRangeInteger(String sensorId, String label) {
String val = getSensorConfigValue(sensorId, label, "range");
return Ranges.valueOfInteger(val);
}
/**
* Get the Range for a sensor range configuration item.
* @param sensorId the sensor's id
* @param label the range's label
* @return the Range
*/
public Range<Byte> getRangeByte(String sensorId, String label) {
String val = getSensorConfigValue(sensorId, label, "range");
return Ranges.valueOfByte(val);
}
/**
* Get the Range for a sensor range configuration item.
* @param sensorId the sensor's id
* @param label the range's label
* @return the Range
*/
public Range<Short> getRangeShort(String sensorId, String label) {
String val = getSensorConfigValue(sensorId, label, "range");
return Ranges.valueOfShort(val);
}
/**
* Get the Range for a sensor range configuration item.
* @param sensorId the sensor's id
* @param label the range's label
* @return the Range
*/
public Range<Float> getRangeFloat(String sensorId, String label) {
String val = getSensorConfigValue(sensorId, label, "range");
return Ranges.valueOfFloat(val);
}
/**
* Get the Range for a sensor range configuration item.
* @param sensorId the sensor's id
* @param label the range's label
* @return the Range
*/
public Range<Double> getRangeDouble(String sensorId, String label) {
String val = getSensorConfigValue(sensorId, label, "range");
return Ranges.valueOfDouble(val);
}
/**
* Log every tuple on the stream using the {@code FileStreams} connector.
* <p>
* The logs are added to the directory as specified
* by the "application.log.dir" property.
* The directory will be created as needed.
* <p>
* The "active" (open / being written) log file name is {@code .<baseName>}.
* <br>
* Completed stable logs have a name of {@code <baseName>_YYYYMMDD_HHMMSS}.
* <p>
* The log entry format being used is:
* {@code [<date>] [<eventTag>] <tuple>.toString()}
* <p>
* See {@link FileStreams#textFileWriter(TStream, org.apache.edgent.function.Supplier, org.apache.edgent.function.Supplier)}
*
* @param <T> Tuple type
* @param stream the TStream
* @param baseName the base log name
* @param eventTag a tag that gets added to the log entry
* @return the input stream
*/
public <T> TStream<T> logStream(TStream<T> stream, String eventTag, String baseName) {
// Define the writer policy.
// TODO could make the policy configurable via config()
FileWriterPolicy<String> policy = new FileWriterPolicy<String>(
FileWriterFlushConfig.newTimeBasedConfig(2_000/*msec*/), // flush every 2sec
FileWriterCycleConfig.newFileSizeBasedConfig(10_000), // new file every 10KB
FileWriterRetentionConfig.newFileCountBasedConfig(1) // retain 1 file
);
// Compose the base file pathname
File dir = new File(config().getProperty("application.log.dir"));
String basePathname = new File(dir, baseName).toString();
// Transform the stream to a TStream<String> of string log entry values
TStream<String> stringEntries = stream.map(sample -> String.format("[%s] [%s] %s", new Date().toString(), eventTag, sample.toString()))
.tag(baseName+".log");
// Use the FileStreams connector to write the logs.
//
// A hack for getting the log directories created at runtime
// TODO add another policy thing... or simply make textFileWriter do it?
//
FileStreams.textFileWriter(stringEntries,
() -> { if (!dir.exists()) dir.mkdirs();
return basePathname;
},
() -> policy);
return stream;
}
}