// blob: 05ac38b1da84403ecdba8bc92f1b6d7e6a62917f
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.lib.algo;
import java.util.Iterator;
import java.util.List;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang3.mutable.MutableInt;
import com.google.common.collect.Lists;
import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
/**
 * <p>
 * This operator searches for a given pattern in the input stream.<br>
 * For example, if the pattern is defined as {@code "aa"} and the input events arrive in the order
 * {@code "a"}, {@code "a"}, {@code "a"}, then this operator reports 2 matches for the given pattern:
 * one matching events 1 and 2, and the other matching events 2 and 3. Overlapping matches are
 * therefore all reported.
 * </p>
 *
 * <br>
 * <b> Stateful : Yes, </b> Pattern is found over application window(s). <br>
 * <b> Partitionable : No, </b> would yield wrong results. <br>
 *
 * <br>
 * <b>Ports</b>:<br>
 * <b>inputPort</b>: the port to receive input<br>
 *
 * <br>
 * <b>Properties</b>:<br>
 * <b>pattern</b>: The pattern that needs to be searched<br>
 *
 * @param <T> event type
 *
 * @since 2.0.0
 */
@OperatorAnnotation(partitionable = false)
public abstract class AbstractStreamPatternMatcher<T> extends BaseOperator
{
  /**
   * The pattern to be searched in the input stream of events
   */
  @NotNull
  private Pattern<T> pattern;

  /**
   * One entry per partial match found so far. Each entry holds the index of the last pattern state
   * that the match has satisfied. The field is not transient, so it is part of the operator's
   * serialized state, and partial matches can span application windows.
   */
  private List<MutableInt> partialMatches = Lists.newLinkedList();

  /**
   * Index of the final state of {@link #pattern}. A partial match that reaches this index is complete.
   * The value is derived from the pattern, so it is transient and recomputed in {@link #setPattern}
   * and {@link #setup}.
   */
  private transient int lastStateIndex;

  /**
   * Set the pattern that needs to be searched in the input stream of events. Any partial matches
   * found against the previous pattern are discarded.
   *
   * @param pattern The pattern to be searched; must not be null
   * @throws NullPointerException if pattern is null (the operator state is left unchanged)
   */
  public void setPattern(Pattern<T> pattern)
  {
    if (pattern == null) {
      throw new NullPointerException("pattern");
    }
    this.pattern = pattern;
    partialMatches.clear();
    lastStateIndex = pattern.getStates().length - 1;
  }

  @Override
  public void setup(Context.OperatorContext context)
  {
    super.setup(context);
    // lastStateIndex is transient, so rebuild it from the pattern field, which may have been
    // restored without going through setPattern.
    lastStateIndex = pattern.getStates().length - 1;
  }

  /**
   * Get the pattern that is searched in the input stream of events
   *
   * @return Returns the pattern searched
   */
  public Pattern<T> getPattern()
  {
    return pattern;
  }

  /**
   * Receives the events to be matched. Every event advances all partial matches by one state.
   * Matches the event does not continue are dropped, and completed matches are reported through
   * {@link #processPatternFound()}.
   */
  public transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
  {
    @Override
    public void process(T t)
    {
      // An event equal to the first state may start a new match. It is registered at index -1 so the
      // loop below advances it to 0 and handles it exactly like every existing partial match.
      if (pattern.checkState(t, 0)) {
        partialMatches.add(new MutableInt(-1));
      }
      Iterator<MutableInt> itr = partialMatches.iterator();
      while (itr.hasNext()) {
        MutableInt matchedIndex = itr.next();
        matchedIndex.increment();
        if (!pattern.checkState(t, matchedIndex.intValue())) {
          // This event breaks the sequence: drop the partial match.
          itr.remove();
        }
        else if (matchedIndex.intValue() == lastStateIndex) {
          // Every state has been matched: stop tracking this match and report it.
          itr.remove();
          processPatternFound();
        }
      }
    }
  };

  /**
   * This function determines how to process the pattern found. It is invoked once for every
   * complete match, while the event that completed the match is being processed on
   * {@link #inputPort}.
   */
  public abstract void processPatternFound();

  /**
   * An ordered sequence of states. A match is a run of consecutive events in which the i-th event
   * equals the i-th state.
   *
   * @param <T> event type
   */
  public static class Pattern<T>
  {
    /**
     * The states of the pattern
     */
    @NotNull
    private final T[] states;

    // for kryo
    private Pattern()
    {
      states = null;
    }

    /**
     * Creates a pattern from the given states.
     *
     * @param states The states of the pattern, in match order; the array is copied
     * @throws NullPointerException if states is null
     * @throws IllegalArgumentException if states is empty, since an empty pattern can never be matched
     */
    public Pattern(@NotNull T[] states)
    {
      if (states == null) {
        throw new NullPointerException("states");
      }
      if (states.length == 0) {
        throw new IllegalArgumentException("pattern must have at least one state");
      }
      // Defensive copy so later changes to the caller's array do not alter this pattern.
      this.states = states.clone();
    }

    /**
     * Checks if the input state matches the state at index "index" of the pattern.
     * The comparison is null-safe: a null state matches only a null input.
     *
     * @param t The input state
     * @param index The index to match in the pattern
     * @return True if t equals the state at index "index", else false
     * @throws ArrayIndexOutOfBoundsException if index is not a valid state index
     */
    public boolean checkState(T t, int index)
    {
      T expected = states[index];
      return expected == null ? t == null : expected.equals(t);
    }

    /**
     * Get the states of the pattern
     *
     * @return The states of the pattern
     */
    public T[] getStates()
    {
      return states;
    }
  }
}