blob: 00698281c4b490ad5a1baa7de5f3846e01a497bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql.impl.cep;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelCollation;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rel.RelFieldCollation;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexLiteral;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.sql.SqlKind;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.sql.SqlOperator;
import org.apache.beam.vendor.calcite.v1_26_0.org.apache.calcite.util.ImmutableBitSet;
/**
* Some utility methods for transforming Calcite's constructs into our own Beam constructs (for
* serialization purpose).
*/
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class CEPUtils {
private static Quantifier getQuantifier(int start, int end, boolean isReluctant) {
Quantifier quantToAdd;
if (!isReluctant) {
if (start == end) {
quantToAdd = new Quantifier("{ " + start + " }", start, end, false);
} else {
if (end == -1) {
if (start == 0) {
quantToAdd = Quantifier.ASTERISK;
} else if (start == 1) {
quantToAdd = Quantifier.PLUS;
} else {
quantToAdd = new Quantifier("{ " + start + " }", start, end, false);
}
} else {
if (start == 0 && end == 1) {
quantToAdd = Quantifier.QMARK;
} else if (start == -1) {
quantToAdd = new Quantifier("{ , " + end + " }", start, end, false);
} else {
quantToAdd = new Quantifier("{ " + start + " , }", start, end, false);
}
}
}
} else {
if (start == end) {
quantToAdd = new Quantifier("{ " + start + " }?", start, end, true);
} else {
if (end == -1) {
if (start == 0) {
quantToAdd = Quantifier.ASTERISK_RELUCTANT;
} else if (start == 1) {
quantToAdd = Quantifier.PLUS_RELUCTANT;
} else {
quantToAdd = new Quantifier("{ " + start + " }?", start, end, true);
}
} else {
if (start == 0 && end == 1) {
quantToAdd = Quantifier.QMARK_RELUCTANT;
} else if (start == -1) {
quantToAdd = new Quantifier("{ , " + end + " }?", start, end, true);
} else {
quantToAdd = new Quantifier("{ " + start + " , }?", start, end, true);
}
}
}
}
return quantToAdd;
}
/** Construct a list of {@code CEPPattern}s from a {@code RexNode}. */
public static ArrayList<CEPPattern> getCEPPatternFromPattern(
Schema upStreamSchema, RexNode call, Map<String, RexNode> patternDefs) {
ArrayList<CEPPattern> patternList = new ArrayList<>();
if (call.getClass() == RexLiteral.class) {
String p = ((RexLiteral) call).getValueAs(String.class);
RexNode pd = patternDefs.get(p);
patternList.add(CEPPattern.of(upStreamSchema, p, (RexCall) pd, Quantifier.NONE));
} else {
RexCall patCall = (RexCall) call;
SqlOperator operator = patCall.getOperator();
List<RexNode> operands = patCall.getOperands();
// check if the node has quantifier
if (operator.getKind() == SqlKind.PATTERN_QUANTIFIER) {
String p = ((RexLiteral) operands.get(0)).getValueAs(String.class);
RexNode pd = patternDefs.get(p);
int start = ((RexLiteral) operands.get(1)).getValueAs(Integer.class);
int end = ((RexLiteral) operands.get(2)).getValueAs(Integer.class);
boolean isReluctant = ((RexLiteral) operands.get(3)).getValueAs(Boolean.class);
patternList.add(
CEPPattern.of(upStreamSchema, p, (RexCall) pd, getQuantifier(start, end, isReluctant)));
} else {
for (RexNode i : operands) {
patternList.addAll(getCEPPatternFromPattern(upStreamSchema, i, patternDefs));
}
}
}
return patternList;
}
/** Recursively construct a regular expression from a {@code RexNode}. */
public static String getRegexFromPattern(RexNode call) {
if (call.getClass() == RexLiteral.class) {
return ((RexLiteral) call).getValueAs(String.class);
} else {
RexCall opr = (RexCall) call;
SqlOperator operator = opr.getOperator();
List<RexNode> operands = opr.getOperands();
if (operator.getKind() == SqlKind.PATTERN_QUANTIFIER) {
String p = ((RexLiteral) operands.get(0)).getValueAs(String.class);
int start = ((RexLiteral) operands.get(1)).getValueAs(Integer.class);
int end = ((RexLiteral) operands.get(2)).getValueAs(Integer.class);
boolean isReluctant = ((RexLiteral) operands.get(3)).getValueAs(Boolean.class);
Quantifier quantifier = getQuantifier(start, end, isReluctant);
return p + quantifier.toString();
}
return getRegexFromPattern(opr.getOperands().get(0))
+ getRegexFromPattern(opr.getOperands().get(1));
}
}
/** Transform a list of keys in Calcite to {@code ORDER BY} to {@code OrderKey}s. */
public static ArrayList<OrderKey> makeOrderKeysFromCollation(RelCollation orderKeys) {
List<RelFieldCollation> relOrderKeys = orderKeys.getFieldCollations();
ArrayList<OrderKey> orderKeysList = new ArrayList<>();
for (RelFieldCollation i : relOrderKeys) {
orderKeysList.add(OrderKey.of(i));
}
return orderKeysList;
}
/** Transform the partition columns into serializable CEPFieldRef. */
public static List<CEPFieldRef> getCEPFieldRefFromParKeys(ImmutableBitSet partitionKeys) {
ArrayList<CEPFieldRef> fieldList = new ArrayList<>();
for (int index : partitionKeys.asList()) {
fieldList.add(new CEPFieldRef("Partition Key " + index, index));
}
return fieldList;
}
/** a function that finds a pattern reference recursively. */
public static CEPFieldRef getFieldRef(CEPOperation opr) {
if (opr.getClass() == CEPFieldRef.class) {
CEPFieldRef field = (CEPFieldRef) opr;
return field;
} else if (opr.getClass() == CEPCall.class) {
CEPCall call = (CEPCall) opr;
CEPFieldRef field;
for (CEPOperation i : call.getOperands()) {
field = getFieldRef(i);
if (field != null) {
return field;
}
}
return null;
} else {
return null;
}
}
public static Schema.FieldType getFieldType(Schema streamSchema, CEPOperation measureOperation) {
if (measureOperation.getClass() == CEPFieldRef.class) {
CEPFieldRef field = (CEPFieldRef) measureOperation;
return streamSchema.getField(field.getIndex()).getType();
} else if (measureOperation.getClass() == CEPCall.class) {
CEPCall call = (CEPCall) measureOperation;
CEPKind oprKind = call.getOperator().getCepKind();
if (oprKind == CEPKind.SUM || oprKind == CEPKind.COUNT) {
return Schema.FieldType.INT32;
} else if (oprKind == CEPKind.AVG) {
return Schema.FieldType.DOUBLE;
}
CEPFieldRef refOpt;
for (CEPOperation i : call.getOperands()) {
refOpt = getFieldRef(i);
if (refOpt != null) {
return streamSchema.getField(refOpt.getIndex()).getType();
}
}
throw new UnsupportedOperationException("the function in Measures is not recognized.");
} else {
throw new UnsupportedOperationException("the function in Measures is not recognized.");
}
}
}