blob: 2255a0ebcc86dc4971d3de0f3815701106859aac [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.beam.sdk.extensions.sql.impl.nfa;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPCall;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPFieldRef;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPKind;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPLiteral;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPOperation;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPOperator;
import org.apache.beam.sdk.extensions.sql.impl.cep.CEPPattern;
import org.apache.beam.sdk.extensions.sql.impl.cep.Quantifier;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
// TODO: add support for more quantifiers: `?`, `{19, }` ... for now, support `+` and singletons
// TODO: sort conditions based on "the last identifier" during compilation
// TODO: add optimization for the NFA
// for now, assume the conditions are sorted.
* {@code NFA} is an implementation of non-deterministic finite automata. It is used for the
* non-deterministic pattern match in the MATCH_RECOGNIZE component.
* <p>The implementation is strongly based on the UMASS paper on NFA with shared buffer:
* @see <a
* href=""></a>
"nullness" // TODO(
public class NFA implements Serializable {
private final State startState;
private ArrayList<StateLocator> currentRuns;
private final Schema upstreamSchema;
private NFA(List<CEPPattern> patterns, Schema upstreamSchema) {
this.startState = loadStates(patterns);
this.currentRuns = new ArrayList<>();
this.upstreamSchema = upstreamSchema;
public static NFA compile(List<CEPPattern> patterns, Schema outSchema) {
return new NFA(patterns, outSchema);
// process a new row,
// return a mapping of pattern variable to row to output in the DoFn if there is a match
// return null if none of locators reached the final state
public Map<String, ArrayList<Row>> processNewRow(Row inputRow) {
// wrap the input row as an event
Event inputEvent = new Event(inputRow, upstreamSchema);
ArrayList<StateLocator> nextStateLocators = new ArrayList<>();
// add a start state locator to the array
EventPointer nullPtr = new EventPointer(new ArrayList<>(), "");
currentRuns.add(new StateLocator(nullPtr, startState, startState, 0, null));
// scan for kleene plus locator, if exits, add the next next state
ArrayList<StateLocator> kleenePlusLocators = new ArrayList<>();
for (StateLocator locator : currentRuns) {
if (locator.isKleenePlusSecondary()) {
if (locator.getCurState().getNextState().isFinal) {
return processOutput(locator);
StateLocator proceedLocator = locator.proceedIgnore();
// decide next possible states
for (StateLocator currentLocator : currentRuns) {
boolean split = false;
StateLocator proceedLocator = currentLocator.proceed(inputEvent);
if (proceedLocator != null) {
// TODO: add support for after match strategy
// if the locator is at the final state
// clear current runs
// then return the output
if (proceedLocator.atFinal()) {
// reset after a match
State iterState = startState;
while (!iterState.isFinal) {
iterState = iterState.getNextState();
currentRuns.add(new StateLocator(nullPtr, startState, startState, 0, null));
return processOutput(proceedLocator);
} else {
// add the new locator to the array
split = true;
StateLocator takeLocator = currentLocator.take(inputEvent, split);
if (takeLocator != null) {
this.currentRuns = nextStateLocators;
return null;
// returns a row with the output schema once a locator reaches the final state
private Map<String, ArrayList<Row>> processOutput(StateLocator locator) {
HashMap<String, ArrayList<Row>> rows = new HashMap<>();
EventPointer iterPointer = locator.getPointer().copy();
Event curEvent = locator.getCurrentEvent();
while (curEvent != null) {
String patternVar = iterPointer.getPatternVar();
if (rows.containsKey(patternVar)) {
ArrayList<Row> patternArray = rows.get(patternVar);
} else {
ArrayList<Row> newPatternArray = new ArrayList<>();
rows.put(patternVar, newPatternArray);
curEvent = curEvent.findEvent(iterPointer);
if (curEvent != null) {
iterPointer = curEvent.getPrevPointer(iterPointer);
// restore the order
for (ArrayList<Row> i : rows.values()) {
return rows;
private static class Event implements Serializable {
private HashMap<EventPointer, Event> prevEvents =
new HashMap<>(); // a mapping from the runIndex to the previous events
private Row row;
private Schema upstreamSchema;
Event(Row inputRow, Schema upstreamSchema) {
this.row = inputRow;
this.upstreamSchema = upstreamSchema;
// store an input row as a literal
public CEPLiteral toCEPLiteral(CEPFieldRef fieldRef) {
int fieldIndex = fieldRef.getIndex();
Schema.Field field = upstreamSchema.getField(fieldIndex);
Schema.FieldType type = field.getType();
switch (type.getTypeName()) {
case BYTE:
return CEPLiteral.of(row.getByte(fieldIndex));
case INT16:
return CEPLiteral.of(row.getInt16(fieldIndex));
case INT32:
return CEPLiteral.of(row.getInt32(fieldIndex));
case INT64:
return CEPLiteral.of(row.getInt64(fieldIndex));
return CEPLiteral.of(row.getDecimal(fieldIndex));
case FLOAT:
return CEPLiteral.of(row.getFloat(fieldIndex));
case DOUBLE:
return CEPLiteral.of(row.getDouble(fieldIndex));
return CEPLiteral.of(row.getDateTime(fieldIndex));
return CEPLiteral.of(row.getBoolean(fieldIndex));
case STRING:
return CEPLiteral.of(row.getString(fieldIndex));
throw new UnsupportedOperationException(
"The type is not supported: " + type.getTypeName().toString());
private Event findEvent(EventPointer eventPointer) {
return prevEvents.getOrDefault(eventPointer, null);
private EventPointer findEventPointer(EventPointer pointer) {
for (EventPointer i : prevEvents.keySet()) {
if (i.equals(pointer)) {
return i.copy();
return null;
public EventPointer getPrevPointer(EventPointer curPointer) {
if (curPointer.isNull()) {
return null;
if (curPointer.isProceedPointer()) {
while (findEvent(curPointer) == null && curPointer.canTrim()) {
return findEventPointer(curPointer);
} else {
while (findEvent(curPointer) == null) {
if (curPointer.canDecrement()) {
} else {
return null;
return curPointer;
public Row getRow() {
return row;
public void addPrevEvent(EventPointer ptr, @Nullable Event prevEvent) {
prevEvents.put(ptr, prevEvent);
// shared buffer versioned pointer: see the UMASS paper for description
private static class EventPointer implements Serializable {
private List<Integer> ptrValues;
// labels the event that was pointed to
private final String patternVar;
EventPointer(List<Integer> ptrValues, String patternVar) {
this.ptrValues = ptrValues;
this.patternVar = patternVar;
public boolean canTrim() {
return ptrValues.size() > 1;
// for following a proceed edge
// trim the last digit
public void trim() {
if (isProceedPointer()) {
ptrValues = ptrValues.subList(0, ptrValues.size() - 1);
} else {
throw new IllegalStateException("the null event pointer cannot be trimmed.");
public boolean canDecrement() {
if (isNull()) {
return false;
int lastValue = ptrValues.get(ptrValues.size() - 1);
return lastValue > 0;
// for following the take edge
// decrement the last digit
public void decrement() {
if (!canDecrement()) {
throw new IllegalStateException("the event pointer cannot be decremented.");
if (!isProceedPointer()) {
ArrayList<Integer> newPtrValue =
new ArrayList<>(ptrValues.subList(0, ptrValues.size() - 1));
int lastValue = ptrValues.get(ptrValues.size() - 1) - 1;
ptrValues = newPtrValue;
} else {
throw new IllegalStateException("the event pointer cannot be decremented.");
public boolean equals(Object other) {
if (!(other instanceof EventPointer)) {
return false;
EventPointer otherPointer = (EventPointer) other;
return ptrValues.equals(otherPointer.ptrValues);
public boolean isProceedPointer() {
// if the last digit of the pointer is 0,
// then it is a proceed pointer
if (ptrValues.isEmpty()) {
return false;
int lastPtrValue = ptrValues.get(ptrValues.size() - 1);
if (lastPtrValue == 0) {
return true;
} else {
return false;
public int hashCode() {
return ptrValues.hashCode();
public boolean isNull() {
return ptrValues.isEmpty() && patternVar.equals("");
public String getPatternVar() {
return patternVar;
public EventPointer copy() {
ArrayList<Integer> newPtrValue = new ArrayList<>(ptrValues);
return new EventPointer(newPtrValue, patternVar);
public EventPointer getNewProceedPointer(int value, String patternVar) {
ArrayList<Integer> newPtrValue = new ArrayList<>(ptrValues);
return new EventPointer(newPtrValue, patternVar);
// for taking the take edge with splitting, set last pointer digit to the desired value
public EventPointer getNewTakePointer(int value) {
ArrayList<Integer> newPtrValue = new ArrayList<>(ptrValues);
newPtrValue.set(newPtrValue.size() - 1, value);
return new EventPointer(newPtrValue, patternVar);
public String toString() {
ArrayList<String> ptrValuesCopy = new ArrayList<>();
for (Integer i : ptrValues) {
return String.join(".", ptrValuesCopy);
// StateLocator should be thought as immutable
private static class StateLocator implements Serializable {
private EventPointer ptr;
private State startState;
private State curState;
private int takeCount = 0; // counts the number of events taken
private Event curEvent = null;
EventPointer ptr, State startState, State curState, int takeCount, Event curEvent) {
this.ptr = ptr;
this.startState = startState;
this.curState = curState;
this.takeCount = takeCount;
this.curEvent = curEvent;
public State getCurState() {
return curState;
public EventPointer getPointer() {
return ptr;
public Event getCurrentEvent() {
return curEvent;
// check if the current state is the final state
public boolean atFinal() {
return curState.isFinal;
public boolean isKleenePlusSecondary() {
return curState.isKleenePlusSecondary();
// a proceed action for secondary kleenePlus state
public StateLocator proceedIgnore() {
if (isKleenePlusSecondary()) {
EventPointer newPtr = ptr.getNewProceedPointer(0, curState.getPatternVar());
return new StateLocator(newPtr, startState, curState.getNextState(), 0, curEvent);
} else {
return null;
// represents the "proceed"/"begin" edge: transfer to a new state,
// writes the new event in the new state's buffer.
// returns a "new" state locator
// returns null if not a match
public StateLocator proceed(Event inputEvent) {
if (curState.hasProceed()) {
// try to proceed
CEPCall condition = (CEPCall) curState.getProceedCondition();
String patternVar = curState.getPatternVar();
if (evalCondition(inputEvent, condition)) {
// special case: for an event that starts a match,
// assign a new index as the pointer value
if (curState == startState && curEvent == null) {
int ptrValue = curState.assignIndex();
ArrayList<Integer> ptrArray = new ArrayList<>();
EventPointer eventPointer = new EventPointer(ptrArray, patternVar);
inputEvent.addPrevEvent(eventPointer, null);
return new StateLocator(
eventPointer, startState, curState.getNextState(), 0, inputEvent);
// for the other cases, add a zero in the event pointer
EventPointer newPtr = ptr.getNewProceedPointer(0, patternVar);
inputEvent.addPrevEvent(newPtr, curEvent);
return new StateLocator(newPtr, startState, curState.getNextState(), 0, inputEvent);
} else {
return null;
} else {
return null;
// represents the "take" edge: the state stays at current state,
// write the new event in the current state's buffer.
// returns null if not a match
public StateLocator take(Event inputEvent, boolean split) {
if (curState.hasTake()) {
if (evalCondition(inputEvent, curState.getTakeCondition())) {
if (split) {
// get another unique pointer value
int ptrValue = curState.assignIndex();
EventPointer newPtr = ptr.getNewTakePointer(ptrValue);
inputEvent.addPrevEvent(newPtr, curEvent);
return new StateLocator(newPtr, startState, curState, takeCount + 1, inputEvent);
} else {
EventPointer newPtr = ptr.copy();
inputEvent.addPrevEvent(newPtr, curEvent);
return new StateLocator(newPtr, startState, curState, takeCount + 1, inputEvent);
} else {
return null;
} else {
return null;
// evaluates the condition
// returns true if the new event satisfies the condition at a givens state
private boolean evalCondition(Event inputEvent, CEPOperation operation) {
if (operation == null) {
return true;
CEPCall condition = (CEPCall) operation;
CEPKind comparator = condition.getOperator().getCepKind();
CEPOperation leftSide =
condition.getOperands().get(0); // left side must contain field reference
CEPOperation rightSide = condition.getOperands().get(1);
CEPLiteral leftSideValue = evalLeftSideCondition(leftSide, inputEvent);
CEPLiteral rightSideValue = evalRightSideCondition(rightSide, inputEvent);
if (leftSideValue == null) {
return false;
if (rightSideValue == null) {
return true;
switch (comparator) {
case EQUALS:
return leftSideValue.compareTo(rightSideValue) == 0;
return leftSideValue.compareTo(rightSideValue) != 0;
return leftSideValue.compareTo(rightSideValue) > 0;
return leftSideValue.compareTo(rightSideValue) >= 0;
return leftSideValue.compareTo(rightSideValue) < 0;
return leftSideValue.compareTo(rightSideValue) <= 0;
throw new IllegalStateException(
"the comparator is not supported: " + comparator.toString());
// evaluates the condition value (left) given an input event
private CEPLiteral evalLeftSideCondition(CEPOperation inputOperation, Event inputEvent) {
if (inputOperation instanceof CEPLiteral) {
return (CEPLiteral) inputOperation;
} else if (inputOperation.getClass() == CEPFieldRef.class) {
return inputEvent.toCEPLiteral((CEPFieldRef) inputOperation);
} else if (inputOperation.getClass() == CEPCall.class) {
CEPCall call = (CEPCall) inputOperation;
CEPOperator operator = call.getOperator();
List<CEPOperation> operands = call.getOperands();
switch (operator.getCepKind()) {
case LAST:
return last(
operands.get(0), evalLeftSideCondition(operands.get(1), inputEvent), inputEvent);
case PLUS:
return plus(
evalLeftSideCondition(operands.get(0), inputEvent),
evalLeftSideCondition(operands.get(1), inputEvent));
throw new UnsupportedOperationException(
"the function is not supported for now: " + operator.getCepKind().toString());
} else {
throw new IllegalStateException(
"the left side CEP operation is not legal: " + inputOperation.getClass().toString());
// evaluates the condition value (right) given an input event
private CEPLiteral evalRightSideCondition(CEPOperation inputOperation, Event inputEvent) {
if (inputOperation instanceof CEPLiteral) {
return (CEPLiteral) inputOperation;
} else if (inputOperation.getClass() == CEPCall.class) {
CEPCall call = (CEPCall) inputOperation;
CEPOperator operator = call.getOperator();
List<CEPOperation> operands = call.getOperands();
switch (operator.getCepKind()) {
case PLUS:
return plus(
evalRightSideCondition(operands.get(0), inputEvent),
evalRightSideCondition(operands.get(1), inputEvent));
case PREV:
return prev(operands.get(0), (CEPLiteral) operands.get(1), ptr, curEvent, inputEvent);
throw new UnsupportedOperationException(
"the function is not supported for now: " + operator.getCepKind().toString());
} else {
throw new IllegalStateException(
"the right side CEP operation is not legal: " + inputOperation.getClass().toString());
/* below are function implementations */
// represents the PREV operation
private CEPLiteral prev(
CEPOperation opr1,
CEPLiteral opr2,
EventPointer curPointer,
Event curEvent,
Event inputEvent) {
if (opr1.getClass() != CEPFieldRef.class) {
throw new IllegalStateException(
"the first argument of the PREV operation should be a field reference. Provided: "
+ opr1.getClass().toString());
if (opr2.getTypeName() != Schema.TypeName.DECIMAL) {
throw new IllegalStateException(
"the second argument of the prev operation should be a decimal. Provided: "
+ opr2.getClass().toString());
// if the second argument is 0, return the current event
if (opr2.getDecimal().intValue() == 0) {
return inputEvent.toCEPLiteral((CEPFieldRef) opr1);
// for the starting event, return null
if (curEvent == null) {
return null;
CEPFieldRef fieldRef = (CEPFieldRef) opr1;
String alpha = fieldRef.getAlpha(); // the patternVar
int lastNumber = opr2.getDecimal().intValue();
EventPointer iterPointer = curPointer.copy();
while (curEvent != null && iterPointer.getPatternVar().equals(alpha) && lastNumber > 0) {
iterPointer = curEvent.getPrevPointer(iterPointer);
curEvent = curEvent.findEvent(iterPointer);
if (curEvent != null) {
return curEvent.toCEPLiteral(fieldRef);
} else {
return null;
// represents the LAST operation
// TODO: add implementation. for now, return the current row
private CEPLiteral last(CEPOperation opr1, CEPLiteral opr2, Event inputEvent) {
if (opr1.getClass() != CEPFieldRef.class) {
throw new IllegalStateException(
"the first argument of the PREV operation should be a field reference. Provided: "
+ opr1.getClass().toString());
if (opr2.getTypeName() != Schema.TypeName.DECIMAL) {
throw new IllegalStateException(
"the second argument of the prev operation should be a decimal. Provided: "
+ opr2.getClass().toString());
return inputEvent.toCEPLiteral((CEPFieldRef) opr1);
// represents the PLUS operation
private CEPLiteral plus(CEPLiteral opr1, CEPLiteral opr2) {
Schema.TypeName type1 = opr1.getTypeName();
Schema.TypeName type2 = opr2.getTypeName();
if (type1.isNumericType() && type1 == type2) {
switch (type1) {
case BYTE:
return CEPLiteral.of(opr1.getByte() + opr2.getByte());
case INT16:
return CEPLiteral.of(opr1.getInt16() + opr2.getInt16());
case INT32:
return CEPLiteral.of(opr1.getInt32() + opr2.getInt32());
case INT64:
return CEPLiteral.of(opr1.getInt64() + opr2.getInt64());
return CEPLiteral.of(opr1.getDecimal().add(opr2.getDecimal()));
case FLOAT:
return CEPLiteral.of(opr1.getFloat() + opr2.getFloat());
case DOUBLE:
return CEPLiteral.of(opr1.getDouble() + opr2.getDouble());
throw new UnsupportedOperationException("Type is not supported: " + type1.toString());
} else {
throw new IllegalStateException(
"Types do not match: " + type1.toString() + ", " + type2.toString());
// State are determined directly by the PATTERN clause
private static class State implements Serializable {
private int index = 0; // number ith state (pointer value) in the NFA
private final String patternVar;
private final Quantifier quant;
private final CEPOperation
condition; // condition to evaluate when taking the "begin" action and "proceed" action
private State nextState = null;
public final boolean isStart;
public final boolean isFinal;
private final boolean
isKleenePlusSecondary; // whether is the second state for a Kleene Plus pattern variable
String patternVar,
Quantifier quant,
CEPOperation condition,
boolean isStart,
boolean isFinal,
boolean isKleenePlusSecondary) {
this.patternVar = patternVar;
this.quant = quant;
this.condition = condition;
this.isStart = isStart;
this.isFinal = isFinal;
this.isKleenePlusSecondary = isKleenePlusSecondary;
public String getPatternVar() {
return patternVar;
// assigns a new index
public int assignIndex() {
return ++index;
// reset the index
public void reset() {
index = 0;
// checks if a state has a take edge
public boolean hasTake() {
return isKleenePlusSecondary;
// checks if a state has a proceed/begin edge
public boolean hasProceed() {
// except for the final state
// return true
return !isFinal;
public boolean isKleenePlusSecondary() {
return isKleenePlusSecondary;
// get the condition for the `take` condition
// returns null if the take edge takes no condition
// throws exception if there is no take edge for the state
public CEPOperation getTakeCondition() {
if (!hasTake()) {
throw new IllegalStateException("The state does not have a take edge.");
return condition;
// get the condition for the `proceed` or the `begin` edge
// returns null if the proceed (begin) edge is always evaluated to true
// throws exception if there is no proceed (begin) edge for the state
public CEPOperation getProceedCondition() {
if (!hasProceed()) {
throw new IllegalStateException("The state does not have a proceed edge.");
// for singleton state or the first state of a paired state
// return the current condition
if (!isKleenePlusSecondary) {
return condition;
} else {
return null;
public Quantifier getQuantifier() {
return quant;
public void setNextState(State theNextState) {
if (this == theNextState) {
this.nextState = theNextState;
public State getNextState() {
return nextState;
public boolean isKleenePlus() {
return quant.toString().equals("+") || quant.toString().equals("+?");
private static State setNextStatesAndAssignIndices(List<State> states) {
for (int i = 0; i < (states.size() - 1); ++i) {
State currentState = states.get(i);
State nextState = states.get(i + 1);
if (currentState.isKleenePlus()) {
State secondaryState = currentState.getNextState();
} else {
return states.get(0);
// constructs states for the NFA and returns the start state
private State loadStates(List<CEPPattern> patterns) {
boolean startState;
ArrayList<State> states = new ArrayList<>();
for (int i = 0; i < patterns.size(); ++i) {
if (i == 0) {
startState = true;
} else {
startState = false;
CEPPattern currentPattern = patterns.get(i);
CEPOperation condition = currentPattern.getPatternCondition();
Quantifier quantifier = currentPattern.getQuantifier();
if (quantifier.toString().equals("+")) {
// for Kleene plus, we need a pair of states
State primaryState =
new State(
State secondaryState =
new State(
} else {
// for non-Kleene-Plus pattern var, construct a single state
State newState =
new State(
currentPattern.getPatternVar(), quantifier, condition, startState, false, false);
// add final state
State theFinalState = new State("", Quantifier.NONE, null, false, true, false);
State beginState = setNextStatesAndAssignIndices(states);
return beginState;