/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.eagle.log.entity.filter; | |
import org.apache.eagle.log.entity.EntityQualifierUtils; | |
import org.apache.eagle.log.entity.meta.EntityDefinition; | |
import org.apache.eagle.log.entity.meta.EntitySerDeser; | |
import org.apache.eagle.log.entity.meta.Qualifier; | |
import org.apache.eagle.log.expression.ExpressionParser; | |
import org.apache.eagle.log.expression.ParsiiInvalidException; | |
import org.apache.eagle.query.parser.ComparisonOperator; | |
import org.apache.eagle.query.parser.TokenType; | |
import org.apache.commons.lang.StringUtils; | |
import org.apache.hadoop.hbase.KeyValue; | |
import org.apache.hadoop.io.WritableComparable; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import parsii.tokenizer.ParseException; | |
import java.io.DataInput; | |
import java.io.DataOutput; | |
import java.io.IOException; | |
import java.util.*; | |
/** | |
* BooleanExpressionComparator Currently support double expression only. TODO: 1) thread-safe? 2) Rewrite | |
* filter expression to evaluate once | |
*/ | |
public class BooleanExpressionComparator implements WritableComparable<List<KeyValue>> { | |
private static final Logger LOG = LoggerFactory.getLogger(BooleanExpressionComparator.class); | |
// Should be Writable | |
private QualifierFilterEntity filterEntity; | |
private EntityDefinition ed; | |
// Should not be writable | |
private double leftValue; | |
private double rightValue; | |
private BooleanExprFunc func = null; | |
public Set<String> getRequiredFields() { | |
return requiredFields; | |
} | |
private Set<String> requiredFields = new HashSet<String>(); | |
public BooleanExpressionComparator() { | |
} | |
public BooleanExpressionComparator(QualifierFilterEntity entity, EntityDefinition ed) { | |
this.filterEntity = entity; | |
this.ed = ed; | |
try { | |
this.init(); | |
} catch (Exception ex) { | |
// Client side expression validation to fast fail if having error | |
LOG.error("Got exception: " + ex.getMessage(), ex); | |
throw new ExpressionEvaluationException(ex.getMessage(), ex); | |
} | |
} | |
private void init() throws ParsiiInvalidException, ParseException { | |
LOG.info("Filter expression: " + filterEntity.toString()); | |
if (filterEntity.getKey() != null) { | |
if (filterEntity.getKeyType() == TokenType.NUMBER) { | |
leftValue = Double.parseDouble(filterEntity.getKey()); | |
} else { | |
ExpressionParser parser = ExpressionParser.parse(filterEntity.getKey()); | |
requiredFields.addAll(parser.getDependentFields()); | |
} | |
} else { | |
throw new IllegalStateException("QualifierFilterEntity key is null"); | |
} | |
if (filterEntity.getValue() != null) { | |
if (filterEntity.getValueType() == TokenType.NUMBER) { | |
rightValue = Double.parseDouble(filterEntity.getValue()); | |
} else { | |
ExpressionParser parser = ExpressionParser.parse(filterEntity.getValue()); | |
requiredFields.addAll(parser.getDependentFields()); | |
} | |
} else { | |
throw new IllegalStateException("QualifierFilterEntity value is null"); | |
} | |
if (this.filterEntity.getOp() == null) { | |
throw new IllegalStateException("QualifierFilterEntity op is null"); | |
} | |
this.func = _opExprFuncMap.get(this.filterEntity.getOp()); | |
if (this.func == null) { | |
throw new IllegalStateException("No boolean evaluation function found for operation: " | |
+ this.filterEntity.getOp()); | |
} | |
} | |
/** | |
* if(Double.isInfinite(leftValue) || Double.isInfinite(rightValue)) return false; | |
* | |
* @param context Map[String,Double] | |
* @return evaluation result as true (1) or false (0) | |
* @throws Exception | |
*/ | |
private boolean eval(Map<String, Double> context) throws Exception { | |
if (filterEntity.getKeyType() != TokenType.NUMBER) { | |
leftValue = eval(filterEntity.getKey(), context); | |
} | |
if (filterEntity.getValueType() != TokenType.NUMBER) { | |
rightValue = eval(filterEntity.getValue(), context); | |
} | |
if (Double.isInfinite(leftValue) || Double.isInfinite(rightValue)) { | |
// if(LOG.isDebugEnabled()) { | |
if (Double.isInfinite(leftValue)) { | |
LOG.warn("Evaluation result of key: " + this.filterEntity.getKey() + " is " + leftValue | |
+ " (Infinite), ignore"); | |
} else { | |
LOG.warn("Evaluation result of value: " + this.filterEntity.getValue() + " is " + rightValue | |
+ " (Infinite), ignore"); | |
} | |
// } | |
return false; | |
} | |
return func.eval(leftValue, rightValue); | |
} | |
/** | |
* if(Double.isInfinite(leftValue) || Double.isInfinite(rightValue)) return false; | |
* | |
* @param expr | |
* @param context | |
* @return | |
* @throws Exception | |
*/ | |
private double eval(String expr, Map<String, Double> context) throws Exception { | |
return ExpressionParser.parse(expr).eval(context); | |
} | |
/** | |
* @param row List[KeyValue] All key values in a row | |
* @return 0 to filter out row [false], otherwise to include row into scanner [true] | |
*/ | |
@Override | |
public int compareTo(List<KeyValue> row) { | |
Map<String, Double> context = new HashMap<String, Double>(); | |
for (KeyValue kv : row) { | |
String qualifierName = new String(kv.getQualifier()); | |
// Because assume just handle about double value | |
// so ignore tag whose value is String | |
if (!this.ed.isTag(qualifierName)) { | |
Qualifier qualifier = this.ed.getQualifierNameMap().get(qualifierName); | |
String displayName = qualifier.getDisplayName(); | |
if (displayName == null) { | |
displayName = qualifierName; | |
} | |
try { | |
if (this.requiredFields.contains(displayName)) { | |
EntitySerDeser serDeser = qualifier.getSerDeser(); | |
double value = EntityQualifierUtils | |
.convertObjToDouble(serDeser.deserialize(kv.getValue())); | |
if (Double.isNaN(value)) { | |
context.put(displayName, value); | |
} | |
} | |
} catch (Exception ex) { | |
LOG.warn("Failed to parse value of field " + displayName + " as double, ignore: " | |
+ ex.getMessage(), ex); | |
} | |
} | |
} | |
return compareTo(context); | |
} | |
/** | |
* @param context Map[String,Double] | |
* @return context.keySet().containsAll(this.requiredFields) && eval(context) ? 1:0; | |
*/ | |
int compareTo(Map<String, Double> context) { | |
try { | |
if (context.keySet().containsAll(this.requiredFields)) { | |
return eval(context) ? 1 : 0; | |
} else { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Require variables: [" + StringUtils.join(this.requiredFields, ",") | |
+ "], but just given: [" + StringUtils.join(context.keySet(), ",") + "]"); | |
} | |
return 0; | |
} | |
} catch (Exception e) { | |
LOG.error(e.getMessage(), e); | |
throw new ExpressionEvaluationException(e.getMessage(), e); | |
} | |
} | |
@Override | |
public void write(DataOutput out) throws IOException { | |
this.filterEntity.write(out); | |
this.ed.write(out); | |
} | |
@Override | |
public void readFields(DataInput in) throws IOException { | |
this.filterEntity = new QualifierFilterEntity(); | |
this.filterEntity.readFields(in); | |
this.ed = new EntityDefinition(); | |
this.ed.readFields(in); | |
try { | |
this.init(); | |
} catch (Exception ex) { | |
LOG.error("Got exception: " + ex.getMessage(), ex); | |
throw new IOException(ex.getMessage(), ex); | |
} | |
} | |
private static Map<ComparisonOperator, BooleanExprFunc> _opExprFuncMap = new HashMap<ComparisonOperator, BooleanExprFunc>(); | |
static { | |
_opExprFuncMap.put(ComparisonOperator.EQUAL, new EqualExprFunc()); | |
_opExprFuncMap.put(ComparisonOperator.IS, new EqualExprFunc()); | |
_opExprFuncMap.put(ComparisonOperator.NOT_EQUAL, new NotEqualExprFunc()); | |
_opExprFuncMap.put(ComparisonOperator.IS_NOT, new NotEqualExprFunc()); | |
_opExprFuncMap.put(ComparisonOperator.LESS, new LessExprFunc()); | |
_opExprFuncMap.put(ComparisonOperator.LESS_OR_EQUAL, new LessOrEqualExprFunc()); | |
_opExprFuncMap.put(ComparisonOperator.GREATER, new GreaterExprFunc()); | |
_opExprFuncMap.put(ComparisonOperator.GREATER_OR_EQUAL, new GreaterOrEqualExprFunc()); | |
// "Life should be much better with functional programming language" - Hao Chen Nov 18th, 2014 | |
} | |
private static interface BooleanExprFunc { | |
boolean eval(double val1, double val2); | |
} | |
private static class EqualExprFunc implements BooleanExprFunc { | |
@Override | |
public boolean eval(double val1, double val2) { | |
return val1 == val2; | |
} | |
} | |
private static class NotEqualExprFunc implements BooleanExprFunc { | |
@Override | |
public boolean eval(double val1, double val2) { | |
return val1 != val2; | |
} | |
} | |
private static class LessExprFunc implements BooleanExprFunc { | |
@Override | |
public boolean eval(double val1, double val2) { | |
return val1 < val2; | |
} | |
} | |
private static class LessOrEqualExprFunc implements BooleanExprFunc { | |
@Override | |
public boolean eval(double val1, double val2) { | |
return val1 <= val2; | |
} | |
} | |
private static class GreaterExprFunc implements BooleanExprFunc { | |
@Override | |
public boolean eval(double val1, double val2) { | |
return val1 > val2; | |
} | |
} | |
private static class GreaterOrEqualExprFunc implements BooleanExprFunc { | |
@Override | |
public boolean eval(double val1, double val2) { | |
return val1 >= val2; | |
} | |
} | |
public static class ExpressionEvaluationException extends RuntimeException { | |
public ExpressionEvaluationException(String message, Throwable cause) { | |
super(message, cause); | |
} | |
public ExpressionEvaluationException(String message) { | |
super(message); | |
} | |
public ExpressionEvaluationException(Throwable cause) { | |
super(cause); | |
} | |
} | |
@Override | |
public String toString() { | |
return this.getClass().getSimpleName() + " (" + this.filterEntity.toString() + ")"; | |
} | |
} |