Merge pull request #627 from siyuanh/cdr

#626 Operators to filter invalid CDRs based on some validation rules
diff --git a/apps/src/main/java/com/datatorrent/apps/telecom/operator/CDRCleanOperator.java b/apps/src/main/java/com/datatorrent/apps/telecom/operator/CDRCleanOperator.java
new file mode 100644
index 0000000..18c41d3
--- /dev/null
+++ b/apps/src/main/java/com/datatorrent/apps/telecom/operator/CDRCleanOperator.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.apps.telecom.operator;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.FilterOperator;
+
+/**
+ * Filters call detail records (CDRs), given as field-name to value maps, through a list of
+ * pluggable {@link CDRValidator} rules. A record accepted by every rule is emitted on {@code out};
+ * a record rejected by any rule is emitted on {@link #invalidOutput}.
+ */
+public class CDRCleanOperator extends FilterOperator<Map<String, Object>>
+{
+  /**
+   * Validation rules, evaluated in registration order. Deliberately NOT transient: transient fields
+   * are skipped when the operator is serialized, so rules registered before that point (for example
+   * while the application is assembled) would be lost and every record would pass unchecked.
+   */
+  private List<CDRValidator> vList = new LinkedList<CDRCleanOperator.CDRValidator>();
+
+  /** Receives every record rejected by at least one registered rule. */
+  @OutputPortFieldAnnotation(name="invalid cdr output", optional=true)
+  public transient DefaultOutputPort<Map<String, Object>> invalidOutput = new DefaultOutputPort<Map<String,Object>>();
+
+  /**
+   * Accepts a record only when every registered rule accepts it; evaluation stops at the first
+   * rejection. With no rules registered, every record is accepted.
+   */
+  @Override
+  public boolean satisfiesFilter(Map<String, Object> tuple)
+  {
+    for (CDRValidator v : vList) {
+      if (!v.validateTuple(tuple)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Emits a rejected record on {@link #invalidOutput}. */
+  @Override
+  public void handleInvalidTuple(Map<String, Object> tuple)
+  {
+    invalidOutput.emit(tuple);
+  }
+
+  /**
+   * Appends a validation rule; rules are evaluated in registration order.
+   *
+   * @param validator the rule to add
+   * @throws IllegalArgumentException if {@code validator} is null
+   */
+  public void registerValidator(CDRValidator validator)
+  {
+    if (validator == null) {
+      throw new IllegalArgumentException("validator must not be null");
+    }
+    vList.add(validator);
+  }
+
+  /**
+   * Same as {@link #registerValidator(CDRValidator)}; this original (misspelled) name is kept for
+   * existing same-package callers.
+   */
+  void registValidator(CDRValidator validator)
+  {
+    registerValidator(validator);
+  }
+
+  /**
+   * A single validation rule for a CDR record. Rules are held in a non-transient field of the
+   * operator, so implementations must be serializable together with it.
+   */
+  public interface CDRValidator
+  {
+    /**
+     * @param tuple the record to check
+     * @return true if the record is valid under this rule
+     */
+    boolean validateTuple(Map<String, Object> tuple);
+  }
+}
diff --git a/apps/src/test/java/com/datatorrent/apps/telecom/operator/CDRCleanOperatorTest.java b/apps/src/test/java/com/datatorrent/apps/telecom/operator/CDRCleanOperatorTest.java
new file mode 100644
index 0000000..6c9da9a
--- /dev/null
+++ b/apps/src/test/java/com/datatorrent/apps/telecom/operator/CDRCleanOperatorTest.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.apps.telecom.operator;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import com.datatorrent.apps.telecom.operator.CDRCleanOperator.CDRValidator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.google.common.collect.Sets;
+
+/**
+ * Tests for {@link CDRCleanOperator}: every record must end up on {@code out} or on
+ * {@code invalidOutput} according to the registered validators.
+ */
+public class CDRCleanOperatorTest
+{
+  private static final String[] keys = {"Call Type", "Sell Price", "Phone Number"};
+
+  // Row 0 is valid; row 1 has an unknown call type; row 2 has a negative sell price.
+  private static final Object[][] values = {{"V", 132.4, "2039827836"},{"Z", 34.2, "6378948847"}, {"I", -10.0, "3787784738"}};
+
+  @Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testOperator()
+  {
+    CDRCleanOperator oper = new CDRCleanOperator();
+
+    CollectorTestSink successSink = new CollectorTestSink();
+    oper.out.setSink(successSink);
+
+    CollectorTestSink failureSink = new CollectorTestSink();
+    oper.invalidOutput.setSink(failureSink);
+
+    oper.registValidator(new CDRValidator() {
+      @Override
+      public boolean validateTuple(Map<String, Object> tuple)
+      {
+        return Sets.newHashSet("V", "VOIP", "D", "C", "N", "I", "U", "B", "X", "M", "G").contains(tuple.get("Call Type"));
+      }
+    });
+
+    oper.registValidator(new CDRValidator() {
+      @Override
+      public boolean validateTuple(Map<String, Object> tuple)
+      {
+        return (Double)tuple.get("Sell Price") > 0 && (Double)tuple.get("Sell Price") < 1000;
+      }
+    });
+
+    for (Map<String, Object> t : getTestMaps()) {
+      oper.in.process(t);
+    }
+
+    assertEquals("only one valid test tuple", 1, successSink.collectedTuples.size());
+    assertEquals("2 invalid test tuples", 2, failureSink.collectedTuples.size());
+    // Check which records went where, not just how many; emission follows input order.
+    assertEquals("V", ((Map) successSink.collectedTuples.get(0)).get("Call Type"));
+    assertEquals("Z", ((Map) failureSink.collectedTuples.get(0)).get("Call Type"));
+    assertEquals("I", ((Map) failureSink.collectedTuples.get(1)).get("Call Type"));
+  }
+
+  @Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testNoValidatorsAcceptsEveryRecord()
+  {
+    CDRCleanOperator oper = new CDRCleanOperator();
+    CollectorTestSink successSink = new CollectorTestSink();
+    oper.out.setSink(successSink);
+    CollectorTestSink failureSink = new CollectorTestSink();
+    oper.invalidOutput.setSink(failureSink);
+
+    for (Map<String, Object> t : getTestMaps()) {
+      oper.in.process(t);
+    }
+
+    assertEquals(values.length, successSink.collectedTuples.size());
+    assertEquals(0, failureSink.collectedTuples.size());
+  }
+
+  /** Builds one record per row of {@link #values}, keyed by the corresponding entry of {@link #keys}. */
+  List<Map<String, Object>> getTestMaps()
+  {
+    List<Map<String, Object>> testMaps = new LinkedList<Map<String,Object>>();
+    for (Object[] tupleValue : values) {
+      Map<String, Object> testRecord = new HashMap<String, Object>();
+      for (int j = 0; j < tupleValue.length; j++) {
+        testRecord.put(keys[j], tupleValue[j]);
+      }
+      testMaps.add(testRecord);
+    }
+    return testMaps;
+  }
+}
diff --git a/library/src/main/java/com/datatorrent/lib/util/FilterOperator.java b/library/src/main/java/com/datatorrent/lib/util/FilterOperator.java
index 73df400..7e13a41 100644
--- a/library/src/main/java/com/datatorrent/lib/util/FilterOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/util/FilterOperator.java
@@ -26,22 +26,41 @@
  *
  * @since 0.3.4
  */
-public abstract class FilterOperator extends BaseOperator
+public abstract class FilterOperator<T> extends BaseOperator
 {
   @InputPortFieldAnnotation(name = "in", optional = false)
-  public final transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
+  public final transient DefaultInputPort<T> in = new DefaultInputPort<T>()
   {
     @Override
-    public void process(Object tuple)
+    public void process(T tuple)
     {
       if (satisfiesFilter(tuple)) {
         out.emit(tuple);
+      } else {
+        handleInvalidTuple(tuple);
       }
     }
 
   };
   @OutputPortFieldAnnotation(name = "out", optional = false)
-  public final transient DefaultOutputPort<Object> out = new DefaultOutputPort<Object>();
+  public final transient DefaultOutputPort<T> out = new DefaultOutputPort<T>();
 
-  public abstract boolean satisfiesFilter(Object tuple);
+  /**
+   * Decides whether a tuple is forwarded on {@link #out}.
+   *
+   * @param tuple the incoming tuple
+   * @return true to emit the tuple on {@link #out}; false to hand it to {@link #handleInvalidTuple}
+   */
+  public abstract boolean satisfiesFilter(T tuple);
+
+  /**
+   * Called for every tuple rejected by {@link #satisfiesFilter}. The default implementation drops
+   * the tuple; subclasses override it to route rejected tuples elsewhere. Deliberately not abstract,
+   * so that subclasses written before this hook existed still compile without a stub.
+   *
+   * @param tuple the rejected tuple
+   */
+  public void handleInvalidTuple(T tuple)
+  {
+  }
 }
diff --git a/library/src/main/java/com/datatorrent/lib/util/JavaScriptFilterOperator.java b/library/src/main/java/com/datatorrent/lib/util/JavaScriptFilterOperator.java
index 195eae8..6f771ea 100644
--- a/library/src/main/java/com/datatorrent/lib/util/JavaScriptFilterOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/util/JavaScriptFilterOperator.java
@@ -16,8 +16,11 @@
 package com.datatorrent.lib.util;
 
 import com.datatorrent.api.Context.OperatorContext;
+
 import java.util.Map;
+
 import javax.script.*;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -26,7 +29,7 @@
  *
  * @since 0.3.4
  */
-public class JavaScriptFilterOperator extends FilterOperator
+public class JavaScriptFilterOperator extends FilterOperator<Map<String, Object>>
 {
   protected transient ScriptEngineManager sem = new ScriptEngineManager();
   protected transient ScriptEngine engine = sem.getEngineByName("JavaScript");
@@ -73,36 +76,47 @@
   }
 
+  /**
+   * Binds each tuple entry as a script variable, invokes {@code functionName} and converts its
+   * result to a boolean: a Boolean as-is, a Number by JavaScript truthiness (0 and NaN are false),
+   * a String via {@link Boolean#parseBoolean}; any other result, including null, yields false.
+   * Bindings set by earlier tuples are not cleared here, so a key absent from this tuple keeps the
+   * value bound by a previous one. NOTE(review): confirm that carry-over is intended.
+   *
+   * @throws RuntimeException wrapping any exception raised while invoking the script
+   */
   @Override
-  public boolean satisfiesFilter(Object tuple)
+  public boolean satisfiesFilter(Map<String, Object> tuple)
   {
-    if (tuple instanceof Map) {
-      Map<String, Object> map = (Map<String, Object>)tuple;
-      for (Map.Entry<String, Object> entry : map.entrySet()) {
-        LOG.debug("Putting {} = {}", entry.getKey(), entry.getValue());
-        engine.put(entry.getKey(), entry.getValue());
-      }
+    for (Map.Entry<String, Object> entry : tuple.entrySet()) {
+      LOG.debug("Putting {} = {}", entry.getKey(), entry.getValue());
+      engine.put(entry.getKey(), entry.getValue());
     }
     try {
-      Object result = ((Invocable)engine).invokeFunction(functionName);
+      Object result = ((Invocable) engine).invokeFunction(functionName);
       if (result instanceof Boolean) {
-        return (Boolean)result;
-      }
-      else if (result instanceof Integer) {
-        return ((Integer)result) != 0;
-      }
-      else if (result instanceof Long) {
-        return ((Long)result) != 0;
-      }
-      else if (result instanceof String) {
-        return Boolean.getBoolean((String)result);
-      }
-      else {
+        return (Boolean) result;
+      } else if (result instanceof Number) {
+        // Script engines may return Double (not only Integer/Long) for JS numbers; 0 and NaN are falsy.
+        double value = ((Number) result).doubleValue();
+        return value != 0 && !Double.isNaN(value);
+      } else if (result instanceof String) {
+        // parseBoolean, not getBoolean: Boolean.getBoolean reads the system property of that name.
+        return Boolean.parseBoolean((String) result);
+      } else {
         LOG.warn("The script result (type: {}) cannot be converted to boolean. Returning false.", result == null ? "null" : result.getClass().getName());
         return false;
       }
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }
+
+  /**
+   * Drops a rejected tuple: nothing is emitted for it.
+   */
+  @Override
+  public void handleInvalidTuple(Map<String, Object> tuple)
+  {
+    // Intentionally empty.
+  }
 }