blob: 261259aeb75cea3de43676d7509579fb034c9730 [file] [log] [blame]
/*
* Copyright 2005-2009 WSO2, Inc. (http://wso2.com)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.stratos.cep.extension;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.SiddhiContext;
import org.wso2.siddhi.core.event.StreamEvent;
import org.wso2.siddhi.core.event.in.InEvent;
import org.wso2.siddhi.core.event.in.InListEvent;
import org.wso2.siddhi.core.event.remove.RemoveEvent;
import org.wso2.siddhi.core.event.remove.RemoveListEvent;
import org.wso2.siddhi.core.persistence.ThreadBarrier;
import org.wso2.siddhi.core.query.QueryPostProcessingElement;
import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
import org.wso2.siddhi.query.api.expression.Expression;
import org.wso2.siddhi.query.api.expression.Variable;
import org.wso2.siddhi.query.api.expression.constant.IntConstant;
import org.wso2.siddhi.query.api.expression.constant.LongConstant;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@SiddhiExtension(namespace = "stratos", function = "secondDerivative")
public class SecondDerivativeFinderWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
static final Logger log = Logger.getLogger(SecondDerivativeFinderWindowProcessor.class);
private ScheduledExecutorService eventRemoverScheduler;
private ScheduledFuture<?> lastSchedule;
private long timeToKeep;
private int subjectedAttrIndex;
private Type subjectedAttrType;
private List<InEvent> newEventList;
private List<RemoveEvent> oldEventList;
private ThreadBarrier threadBarrier;
private ISchedulerSiddhiQueue<StreamEvent> window;
@Override
protected void processEvent(InEvent event) {
acquireLock();
try {
newEventList.add(event);
} finally {
releaseLock();
}
}
@Override
protected void processEvent(InListEvent listEvent) {
acquireLock();
try {
System.out.println(listEvent);
for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
newEventList.add((InEvent) listEvent.getEvent(i));
}
} finally {
releaseLock();
}
}
@Override
public Iterator<StreamEvent> iterator() {
return window.iterator();
}
@Override
public Iterator<StreamEvent> iterator(String predicate) {
if (siddhiContext.isDistributedProcessingEnabled()) {
return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
} else {
return window.iterator();
}
}
@Override
public void run() {
acquireLock();
try {
long scheduledTime = System.currentTimeMillis();
try {
oldEventList.clear();
while (true) {
threadBarrier.pass();
RemoveEvent removeEvent = (RemoveEvent) window.poll();
if (removeEvent == null) {
if (oldEventList.size() > 0) {
nextProcessor.process(new RemoveListEvent(
oldEventList.toArray(new RemoveEvent[oldEventList.size()])));
oldEventList.clear();
}
if (newEventList.size() > 0) {
InEvent[] inEvents =
newEventList.toArray(new InEvent[newEventList.size()]);
for (InEvent inEvent : inEvents) {
window.put(new RemoveEvent(inEvent, -1));
}
// in order to find second derivative, we need at least 3 events.
if (newEventList.size() > 2) {
InEvent firstDerivative1 =
gradient(inEvents[0],
inEvents[(newEventList.size() / 2) - 1],
null)[0];
InEvent firstDerivative2 =
gradient(inEvents[newEventList.size() / 2],
inEvents[newEventList.size() - 1],
null)[0];
InEvent[] secondDerivative =
gradient(firstDerivative1,
firstDerivative2, Type.DOUBLE);
for (InEvent inEvent : secondDerivative) {
window.put(new RemoveEvent(inEvent, -1));
}
nextProcessor.process(new InListEvent(secondDerivative));
} else {
log.debug("Insufficient events to calculate second derivative. We need at least 3 events. Current event count: " +
newEventList.size());
}
newEventList.clear();
}
long diff = timeToKeep - (System.currentTimeMillis() - scheduledTime);
if (diff > 0) {
try {
if (lastSchedule != null) {
lastSchedule.cancel(false);
}
lastSchedule = eventRemoverScheduler.schedule(this, diff, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException ex) {
log.warn("scheduling cannot be accepted for execution: elementID " +
elementId);
}
break;
}
scheduledTime = System.currentTimeMillis();
} else {
oldEventList.add(new RemoveEvent(removeEvent, System.currentTimeMillis()));
}
}
} catch (Throwable t) {
log.error(t.getMessage(), t);
}
} finally {
releaseLock();
}
}
/**
* This function will calculate the linear gradient (per second) of the events received during
* a specified time period.
*/
private InEvent[] gradient(InEvent firstInEvent, InEvent lastInEvent, Type type) {
Type attrType = type == null ? subjectedAttrType : type;
double firstVal = 0.0, lastVal = 0.0;
// FIXME I'm not sure whether there's some other good way to do correct casting,
// based on the type.
if (Type.DOUBLE.equals(attrType)) {
firstVal = (Double) firstInEvent.getData()[subjectedAttrIndex];
lastVal = (Double) lastInEvent.getData()[subjectedAttrIndex];
} else if (Type.INT.equals(attrType)) {
firstVal = (Integer) firstInEvent.getData()[subjectedAttrIndex];
lastVal = (Integer) lastInEvent.getData()[subjectedAttrIndex];
} else if (Type.LONG.equals(attrType)) {
firstVal = (Long) firstInEvent.getData()[subjectedAttrIndex];
lastVal = (Long) lastInEvent.getData()[subjectedAttrIndex];
} else if (Type.FLOAT.equals(attrType)) {
firstVal = (Float) firstInEvent.getData()[subjectedAttrIndex];
lastVal = (Float) lastInEvent.getData()[subjectedAttrIndex];
}
long t1 = firstInEvent.getTimeStamp();
long t2 = lastInEvent.getTimeStamp();
long millisecondsForASecond = 1000;
long tGap = t2 - t1 > millisecondsForASecond ? t2 - t1 : millisecondsForASecond;
double gradient = 0.0;
if (tGap > 0) {
gradient = ((lastVal - firstVal) * millisecondsForASecond) / tGap;
}
if (log.isDebugEnabled()) {
log.debug("Gradient: " + gradient + " Last val: " + lastVal +
" First val: " + firstVal + " Time Gap: " + tGap + " t1: "+t1+ " t2: "+
t2+" hash: "+this.hashCode());
}
Object[] data = firstInEvent.getData().clone();
data[subjectedAttrIndex] = gradient;
InEvent gradientEvent =
new InEvent(firstInEvent.getStreamId(), t1+((t2-t1)/2),
data);
InEvent[] output = new InEvent[1];
output[0] = gradientEvent;
return output;
}
@Override
protected Object[] currentState() {
return new Object[]{window.currentState(), oldEventList, newEventList};
}
@Override
protected void restoreState(Object[] data) {
window.restoreState(data);
window.restoreState((Object[]) data[0]);
oldEventList = ((ArrayList<RemoveEvent>) data[1]);
newEventList = ((ArrayList<InEvent>) data[2]);
window.reSchedule();
}
@Override
protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
if (parameters[0] instanceof IntConstant) {
timeToKeep = ((IntConstant) parameters[0]).getValue();
} else {
timeToKeep = ((LongConstant) parameters[0]).getValue();
}
String subjectedAttr = ((Variable)parameters[1]).getAttributeName();
subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
subjectedAttrType = streamDefinition.getAttributeType(subjectedAttr);
oldEventList = new ArrayList<RemoveEvent>();
if (this.siddhiContext.isDistributedProcessingEnabled()) {
newEventList = this.siddhiContext.getHazelcastInstance().getList(elementId + "-newEventList");
} else {
newEventList = new ArrayList<InEvent>();
}
if (this.siddhiContext.isDistributedProcessingEnabled()) {
window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
} else {
window = new SchedulerSiddhiQueue<StreamEvent>(this);
}
//Ordinary scheduling
window.schedule();
}
@Override
public void schedule() {
if (lastSchedule != null) {
lastSchedule.cancel(false);
}
lastSchedule = eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
}
public void scheduleNow() {
if (lastSchedule != null) {
lastSchedule.cancel(false);
}
lastSchedule = eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
}
@Override
public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
this.eventRemoverScheduler = scheduledExecutorService;
}
public void setThreadBarrier(ThreadBarrier threadBarrier) {
this.threadBarrier = threadBarrier;
}
@Override
public void destroy(){
oldEventList = null;
newEventList = null;
window = null;
}
}