blob: b1e32bdcad9cb8312d61a5a6ba65bd715ab08b83 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.alert.engine.evaluator.nodata;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.eagle.alert.engine.Collector;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.utils.TimePeriodUtils;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
public class NoDataPolicyTimeBatchHandler implements PolicyStreamHandler {
private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyTimeBatchHandler.class);
private Map<String, StreamDefinition> sds;
private volatile List<Integer> wisbFieldIndices = new ArrayList<>();
// reuse PolicyDefinition.defintion.value field to store full set of values
// separated by comma
private volatile PolicyDefinition policyDef;
private volatile Collector<AlertStreamEvent> collector;
private volatile PolicyHandlerContext context;
private volatile NoDataWisbType wisbType;
private volatile DistinctValuesInTimeBatchWindow distinctWindow;
public NoDataPolicyTimeBatchHandler(Map<String, StreamDefinition> sds) {
this.sds = sds;
}
@Override
public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception {
this.collector = collector;
this.context = context;
this.policyDef = context.getPolicyDefinition();
List<String> inputStreams = policyDef.getInputStreams();
// validate inputStreams has to contain only one stream
if (inputStreams.size() != 1) {
throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert");
}
// validate outputStream has to contain only one stream
if (policyDef.getOutputStreams().size() != 1) {
throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert");
}
String policyValue = policyDef.getDefinition().getValue();
// assume that no data alert policy value consists of "windowPeriod,
// type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value,
// f2_value}
String[] segments = policyValue.split(",");
this.wisbType = NoDataWisbType.valueOf(segments[1]);
// for provided wisb values, need to parse, for dynamic wisb values, it
// is computed through a window
Set<String> wisbValues = new HashSet<String>();
if (wisbType == NoDataWisbType.provided) {
for (int i = 2; i < segments.length; i++) {
wisbValues.add(segments[i]);
}
}
long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
distinctWindow = new DistinctValuesInTimeBatchWindow(this, windowPeriod, wisbValues);
// populate wisb field names
String is = inputStreams.get(0);
StreamDefinition sd = sds.get(is);
String nodataColumnNameKey = "nodataColumnName";
if (!policyDef.getDefinition().getProperties().containsKey(nodataColumnNameKey)) {
throw new IllegalArgumentException("policy nodata column name has to be defined for no data alert");
}
wisbFieldIndices.add(sd.getColumnIndex((String) policyDef.getDefinition().getProperties().get(nodataColumnNameKey)));
}
@Override
public void send(StreamEvent event) throws Exception {
Object[] data = event.getData();
List<Object> columnValues = new ArrayList<>();
for (int i = 0; i < wisbFieldIndices.size(); i++) {
Object o = data[wisbFieldIndices.get(i)];
// convert value to string
columnValues.add(o.toString());
}
// use local timestamp rather than event timestamp
distinctWindow.send(event, columnValues, System.currentTimeMillis());
LOG.debug("event sent to window with wiri: {}", distinctWindow.distinctValues());
}
@SuppressWarnings("rawtypes")
public void compareAndEmit(Set wisb, Set wiri, StreamEvent event) {
// compare with wisbValues if wisbValues are already there for dynamic
// type
Collection noDataValues = CollectionUtils.subtract(wisb, wiri);
LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri);
if (noDataValues != null && noDataValues.size() > 0) {
LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisb);
String is = policyDef.getOutputStreams().get(0);
StreamDefinition sd = sds.get(is);
int timestampIndex = sd.getColumnIndex("timestamp");
int hostIndex = sd.getColumnIndex("host");
int originalStreamNameIndex = sd.getColumnIndex("originalStreamName");
for (Object one : noDataValues) {
Object[] triggerEvent = new Object[sd.getColumns().size()];
for (int i = 0; i < sd.getColumns().size(); i++) {
if (i == timestampIndex) {
triggerEvent[i] = System.currentTimeMillis();
} else if (i == hostIndex) {
triggerEvent[hostIndex] = ((List) one).get(0);
} else if (i == originalStreamNameIndex) {
triggerEvent[originalStreamNameIndex] = event.getStreamId();
} else if (sd.getColumns().size() < i) {
LOG.error("strema event data have different lenght compare to column definition!");
} else {
triggerEvent[i] = sd.getColumns().get(i).getDefaultValue();
}
}
AlertStreamEvent alertEvent = createAlertEvent(sd, event.getTimestamp(), triggerEvent);
LOG.info(String.format("Nodata alert %s generated and will be emitted", Joiner.on(",").join(triggerEvent)));
collector.emit(alertEvent);
}
}
}
private AlertStreamEvent createAlertEvent(StreamDefinition sd, long timestamp, Object[] triggerEvent) {
AlertStreamEvent event = new AlertStreamEvent();
event.setTimestamp(timestamp);
event.setData(triggerEvent);
event.setStreamId(policyDef.getOutputStreams().get(0));
event.setPolicyId(context.getPolicyDefinition().getName());
if (this.context.getPolicyEvaluator() != null) {
event.setCreatedBy(context.getPolicyEvaluator().getName());
}
event.setCreatedTime(System.currentTimeMillis());
event.setSchema(sd);
return event;
}
@Override
public void close() throws Exception {
}
}