Merge pull request #627 from siyuanh/cdr
#626 Operators to filter invalid CDRs based on some validation rules
diff --git a/apps/src/main/java/com/datatorrent/apps/telecom/operator/CDRCleanOperator.java b/apps/src/main/java/com/datatorrent/apps/telecom/operator/CDRCleanOperator.java
new file mode 100644
index 0000000..18c41d3
--- /dev/null
+++ b/apps/src/main/java/com/datatorrent/apps/telecom/operator/CDRCleanOperator.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.apps.telecom.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.FilterOperator;
+
+/**
+ * Splits a stream of call detail records (CDRs) into valid and invalid
+ * records using a chain of {@link CDRValidator}s.
+ * <p>
+ * A record is emitted on {@code out} only when every registered validator
+ * accepts it; otherwise it is emitted on {@link #invalidOutput}. When no
+ * validators are registered, every record is treated as valid.
+ */
+public class CDRCleanOperator extends FilterOperator<Map<String, Object>>
+{
+  // NOTE(review): this list is transient, so validators registered while the
+  // DAG is being assembled may not survive serialization of the operator for
+  // deployment -- confirm how they are restored on the deployed instance.
+  private transient List<CDRValidator> vList = new ArrayList<CDRValidator>();
+
+  /** Receives every record rejected by at least one validator. */
+  @OutputPortFieldAnnotation(name = "invalid cdr output", optional = true)
+  public final transient DefaultOutputPort<Map<String, Object>> invalidOutput = new DefaultOutputPort<Map<String, Object>>();
+
+  /**
+   * Applies the registered validators in registration order, stopping at the
+   * first one that rejects the record.
+   *
+   * @param tuple the CDR, keyed by field name
+   * @return {@code true} if every validator accepts the record
+   */
+  @Override
+  public boolean satisfiesFilter(Map<String, Object> tuple)
+  {
+    for (CDRValidator validator : vList) {
+      if (!validator.validateTuple(tuple)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Emits a record rejected by {@link #satisfiesFilter} on {@link #invalidOutput}.
+   *
+   * @param tuple the rejected CDR
+   */
+  @Override
+  public void handleInvalidTuple(Map<String, Object> tuple)
+  {
+    invalidOutput.emit(tuple);
+  }
+
+  /**
+   * Appends a validator to the chain consulted by {@link #satisfiesFilter}.
+   *
+   * @param validator the rule to add; must not be {@code null}
+   * @throws NullPointerException if {@code validator} is {@code null}
+   */
+  public void registerValidator(CDRValidator validator)
+  {
+    if (validator == null) {
+      // Fail fast here rather than with an NPE while processing tuples.
+      throw new NullPointerException("validator");
+    }
+    vList.add(validator);
+  }
+
+  /**
+   * Misspelled alias of {@link #registerValidator(CDRValidator)}, kept so
+   * existing callers continue to compile.
+   *
+   * @deprecated use {@link #registerValidator(CDRValidator)}
+   */
+  @Deprecated
+  void registValidator(CDRValidator validator)
+  {
+    registerValidator(validator);
+  }
+
+  /**
+   * A single validation rule applied to a CDR.
+   */
+  public interface CDRValidator
+  {
+    /**
+     * @param tuple the CDR, keyed by field name
+     * @return {@code true} if the record passes this rule
+     */
+    boolean validateTuple(Map<String, Object> tuple);
+  }
+}
diff --git a/apps/src/test/java/com/datatorrent/apps/telecom/operator/CDRCleanOperatorTest.java b/apps/src/test/java/com/datatorrent/apps/telecom/operator/CDRCleanOperatorTest.java
new file mode 100644
index 0000000..6c9da9a
--- /dev/null
+++ b/apps/src/test/java/com/datatorrent/apps/telecom/operator/CDRCleanOperatorTest.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.apps.telecom.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+
+import com.datatorrent.apps.telecom.operator.CDRCleanOperator.CDRValidator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.google.common.collect.Sets;
+
+/**
+ * Checks that {@link CDRCleanOperator} sends records accepted by every
+ * validator to {@code out} and all other records to {@code invalidOutput}.
+ */
+public class CDRCleanOperatorTest
+{
+  private static final String[] keys = {"Call Type", "Sell Price", "Phone Number"};
+
+  /** One row per test CDR; column i holds the value for {@code keys[i]}. */
+  private static final Object[][] values = {{"V", 132.4, "2039827836"}, {"Z", 34.2, "6378948847"}, {"I", -10.0, "3787784738"}};
+
+  /** Call type codes accepted by the call-type validator. */
+  private static final Set<String> VALID_CALL_TYPES = Sets.newHashSet("V", "VOIP", "D", "C", "N", "I", "U", "B", "X", "M", "G");
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testOperator()
+  {
+    CDRCleanOperator oper = new CDRCleanOperator();
+
+    CollectorTestSink successSink = new CollectorTestSink();
+    oper.out.setSink(successSink);
+
+    CollectorTestSink failureSink = new CollectorTestSink();
+    oper.invalidOutput.setSink(failureSink);
+
+    // Rule 1: the call type must be a known code ("Z" is rejected).
+    oper.registValidator(new CDRValidator() {
+      @Override
+      public boolean validateTuple(Map<String, Object> tuple)
+      {
+        return VALID_CALL_TYPES.contains(tuple.get("Call Type"));
+      }
+    });
+
+    // Rule 2: the sell price must be a number strictly between 0 and 1000
+    // (-10.0 is rejected).
+    oper.registValidator(new CDRValidator() {
+      @Override
+      public boolean validateTuple(Map<String, Object> tuple)
+      {
+        Object price = tuple.get("Sell Price");
+        if (!(price instanceof Number)) {
+          return false;
+        }
+        double value = ((Number) price).doubleValue();
+        return value > 0 && value < 1000;
+      }
+    });
+
+    for (Map<String, Object> t : getTestMaps()) {
+      oper.in.process(t);
+    }
+
+    assertEquals("only one valid test tuple", 1, successSink.collectedTuples.size());
+    assertEquals("2 invalid test tuples", 2, failureSink.collectedTuples.size());
+
+    // Check which records were routed where, not only how many.
+    assertEquals("V", ((Map) successSink.collectedTuples.get(0)).get("Call Type"));
+    assertEquals("Z", ((Map) failureSink.collectedTuples.get(0)).get("Call Type"));
+    assertEquals("I", ((Map) failureSink.collectedTuples.get(1)).get("Call Type"));
+  }
+
+  /** Builds one record per row of {@link #values}, keyed by {@link #keys}. */
+  List<Map<String, Object>> getTestMaps()
+  {
+    List<Map<String, Object>> testMaps = new ArrayList<Map<String, Object>>();
+    for (Object[] row : values) {
+      Map<String, Object> cdr = new HashMap<String, Object>();
+      for (int i = 0; i < row.length; i++) {
+        cdr.put(keys[i], row[i]);
+      }
+      testMaps.add(cdr);
+    }
+    return testMaps;
+  }
+}
diff --git a/library/src/main/java/com/datatorrent/lib/util/FilterOperator.java b/library/src/main/java/com/datatorrent/lib/util/FilterOperator.java
index 73df400..7e13a41 100644
--- a/library/src/main/java/com/datatorrent/lib/util/FilterOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/util/FilterOperator.java
@@ -26,22 +26,46 @@
*
* @since 0.3.4
*/
-public abstract class FilterOperator extends BaseOperator
+public abstract class FilterOperator<T> extends BaseOperator
{
@InputPortFieldAnnotation(name = "in", optional = false)
- public final transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
+ public final transient DefaultInputPort<T> in = new DefaultInputPort<T>()
{
@Override
- public void process(Object tuple)
+ public void process(T tuple)
{
if (satisfiesFilter(tuple)) {
out.emit(tuple);
+ } else {
+ handleInvalidTuple(tuple);
}
}
};
@OutputPortFieldAnnotation(name = "out", optional = false)
- public final transient DefaultOutputPort<Object> out = new DefaultOutputPort<Object>();
+ public final transient DefaultOutputPort<T> out = new DefaultOutputPort<T>();
- public abstract boolean satisfiesFilter(Object tuple);
+
+ /**
+  * Decides whether a tuple is forwarded on {@link #out}.
+  *
+  * @param tuple the incoming tuple
+  * @return {@code true} to emit the tuple on {@code out}; {@code false} to
+  *         hand it to {@link #handleInvalidTuple} instead
+  */
+ public abstract boolean satisfiesFilter(T tuple);
+
+ /**
+  * Called for every tuple that {@link #satisfiesFilter} rejects.
+  * <p>
+  * The default implementation does nothing, so rejected tuples are simply
+  * dropped. It is intentionally not abstract, so subclasses that only need
+  * filtering are not forced to implement it.
+  *
+  * @param tuple the rejected tuple
+  */
+ public void handleInvalidTuple(T tuple)
+ {
+ }
+
}
diff --git a/library/src/main/java/com/datatorrent/lib/util/JavaScriptFilterOperator.java b/library/src/main/java/com/datatorrent/lib/util/JavaScriptFilterOperator.java
index 195eae8..6f771ea 100644
--- a/library/src/main/java/com/datatorrent/lib/util/JavaScriptFilterOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/util/JavaScriptFilterOperator.java
@@ -16,8 +16,11 @@
package com.datatorrent.lib.util;
import com.datatorrent.api.Context.OperatorContext;
+
import java.util.Map;
+
import javax.script.*;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,7 +29,7 @@
*
* @since 0.3.4
*/
-public class JavaScriptFilterOperator extends FilterOperator
+public class JavaScriptFilterOperator extends FilterOperator<Map<String, Object>>
{
protected transient ScriptEngineManager sem = new ScriptEngineManager();
protected transient ScriptEngine engine = sem.getEngineByName("JavaScript");
@@ -73,36 +76,55 @@
}
+ /**
+  * Evaluates the configured script function against a tuple.
+  * <p>
+  * Each entry of the tuple is bound as a script variable, then the function
+  * named by {@code functionName} is invoked with no arguments. The result is
+  * converted to a boolean: a {@link Boolean} is used as-is, a non-zero
+  * {@link Integer} or {@link Long} is {@code true}, a {@link String} is
+  * parsed with {@link Boolean#parseBoolean(String)}, and any other result
+  * (including {@code null}) is logged and treated as {@code false}.
+  *
+  * @param tuple field values to expose to the script
+  * @return whether the tuple passes the filter
+  * @throws RuntimeException wrapping any exception thrown while invoking the script
+  */
@Override
- public boolean satisfiesFilter(Object tuple)
+ public boolean satisfiesFilter(Map<String, Object> tuple)
{
- if (tuple instanceof Map) {
- Map<String, Object> map = (Map<String, Object>)tuple;
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- LOG.debug("Putting {} = {}", entry.getKey(), entry.getValue());
- engine.put(entry.getKey(), entry.getValue());
- }
+ // NOTE(review): bindings are only ever added here, so a variable set for an
+ // earlier tuple stays visible to the script when a later tuple lacks that key
+ // unless it is cleared elsewhere -- confirm this is intended.
+ for (Map.Entry<String, Object> entry : tuple.entrySet()) {
+ LOG.debug("Putting {} = {}", entry.getKey(), entry.getValue());
+ engine.put(entry.getKey(), entry.getValue());
}
try {
- Object result = ((Invocable)engine).invokeFunction(functionName);
+ Object result = ((Invocable) engine).invokeFunction(functionName);
if (result instanceof Boolean) {
- return (Boolean)result;
- }
- else if (result instanceof Integer) {
- return ((Integer)result) != 0;
- }
- else if (result instanceof Long) {
- return ((Long)result) != 0;
- }
- else if (result instanceof String) {
- return Boolean.getBoolean((String)result);
- }
- else {
+ return (Boolean) result;
+ } else if (result instanceof Integer) {
+ return ((Integer) result) != 0;
+ } else if (result instanceof Long) {
+ return ((Long) result) != 0;
+ } else if (result instanceof String) {
+ // parseBoolean reads the text itself; Boolean.getBoolean would instead
+ // look up a system property with that name.
+ return Boolean.parseBoolean((String) result);
+ } else {
LOG.warn("The script result (type: {}) cannot be converted to boolean. Returning false.", result == null ? "null" : result.getClass().getName());
return false;
}
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
+
+ /**
+  * Rejected tuples are dropped: this override intentionally does nothing.
+  */
+ @Override
+ public void handleInvalidTuple(Map<String, Object> tuple)
+ {
+ }
}