blob: 3fa574cc292a237dbd1e942abc475c559a33940f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.apps.logstream;
import java.util.*;
import java.util.HashMap;
import java.util.Map;
import javax.validation.ValidationException;
import org.codehaus.janino.ExpressionEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.*;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.apps.logstream.PropertyRegistry.LogstreamPropertyRegistry;
import com.datatorrent.apps.logstream.PropertyRegistry.PropertyRegistry;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.netlet.util.DTThrowable;
/**
*
* Filters an input tuple and emits a filter stamped tuple for each satisfied filter
*
* @since 0.9.4
*/
public class FilterOperator extends BaseOperator
{
public final transient DefaultOutputPort<HashMap<String, Object>> outputMap = new DefaultOutputPort<HashMap<String, Object>>();
public final transient DefaultInputPort<Map<String, Object>> input = new DefaultInputPort<Map<String, Object>>()
{
@Override
public void process(Map<String, Object> map)
{
HashMap<String, Object> filterTuple;
int typeId = (Integer)map.get(LogstreamUtil.LOG_TYPE);
Map<String, String[]> conditions = conditionList.get(typeId);
if (conditions != null) {
for (String condition : conditions.keySet()) {
if (evaluate(condition, map, conditions.get(condition))) {
int index = registry.getIndex(LogstreamUtil.FILTER, condition);
filterTuple = new HashMap<String, Object>(map);
filterTuple.put(LogstreamUtil.FILTER, index);
outputMap.emit(filterTuple);
}
}
}
// emit the same tuple for default condition
int defaultFilterIndex = registry.getIndex(LogstreamUtil.FILTER, registry.lookupValue(typeId) + "_" + "DEFAULT");
if (defaultFilterIndex >= 0) {
map.put(LogstreamUtil.FILTER, defaultFilterIndex);
outputMap.emit((HashMap<String, Object>)map);
}
}
private boolean evaluate(String condition, Map<String, Object> map, String[] keys)
{
boolean ret = false;
Object[] values = new Object[keys.length];
Class[] classTypes = new Class[keys.length];
ExpressionEvaluator ee = evaluators.get(condition);
try {
for (int i = 0; i < keys.length; i++) {
Object val = map.get(keys[i]);
if (val == null) {
logger.debug("filter key {} missing in input record {}", keys[i], map);
return ret;
}
else {
values[i] = val;
classTypes[i] = val.getClass();
}
}
if (ee == null) {
ee = new ExpressionEvaluator(condition, Boolean.class, keys, classTypes);
evaluators.put(condition, ee);
}
ret = (Boolean)ee.evaluate(values);
logger.debug("expression evaluated to {} for expression: {} with key class types: {} keys: {} values: {}", ret, condition, Arrays.toString(classTypes), Arrays.toString(keys), Arrays.toString(values));
}
catch (Throwable t) {
DTThrowable.rethrow(t);
}
return ret;
}
};
private static final Logger logger = LoggerFactory.getLogger(FilterOperator.class);
/**
* key: type
* value --> map of
* key: condition expression
* value: list of keys on which the condition is
*/
private HashMap<Integer, Map<String, String[]>> conditionList = new HashMap<Integer, Map<String, String[]>>();
private transient HashMap<String, ExpressionEvaluator> evaluators = new HashMap<String, ExpressionEvaluator>();
private PropertyRegistry<String> registry;
/**
* supply the registry object which is used to store and retrieve meta information about each tuple
*
* @param registry
*/
public void setRegistry(PropertyRegistry<String> registry)
{
this.registry = registry;
}
@Override
public void setup(OperatorContext context)
{
super.setup(context);
LogstreamPropertyRegistry.setInstance(registry);
}
/**
* Supply the properties to the operator.
* Input includes following properties:
* type=logtype // input logtype for which the properties are to be set
* followed by list of keys followed by the filter expression
* eg: type=apache, response, response.equals("404")
*
* if default filter is needed i.e. emit a tuple with no filter applied on it
* Input includes following properties:
* type=logtype // input logtype for which the properties are to be set
* followed by the following default condition
* default=true
* eg: type=apache, default=true
*
* @param properties
*/
public void addFilterCondition(String[] properties)
{
try {
// TODO: validations
if (properties.length == 2) {
logger.debug(Arrays.toString(properties));
String[] split = properties[0].split("=");
String type = split[1];
String[] split1 = properties[1].split("=");
if (split1[1].toLowerCase().equals("true")) {
registry.bind(LogstreamUtil.FILTER, type + "_" + "DEFAULT");
}
}
else if (properties.length > 2) {
String[] split = properties[0].split("=");
String type = split[1];
int typeId = registry.getIndex(LogstreamUtil.LOG_TYPE, type);
String[] keys = new String[properties.length - 2];
System.arraycopy(properties, 1, keys, 0, keys.length);
String expression = properties[properties.length - 1];
Map<String, String[]> conditions = conditionList.get(typeId);
if (conditions == null) {
conditions = new HashMap<String, String[]>();
conditionList.put(typeId, conditions);
}
conditions.put(expression, keys);
if (registry != null) {
registry.bind(LogstreamUtil.FILTER, expression);
}
}
else {
throw new ValidationException("Invalid input property string " + Arrays.toString(properties));
}
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}