KAFKA-3449: Rename filterOut() to filterNot() to achieve better terminology

…nology

Hi all,

This is my first contribution and I hope it will be good.

The PR is related to this issue:
https://issues.apache.org/jira/browse/KAFKA-3449

Thanks a lot,

Andrea

Author: Andrea Cosentino <ancosen@gmail.com>

Reviewers: Yasuhiro Matsuda, Guozhang Wang

Closes #1134 from oscerd/KAFKA-3449

(cherry picked from commit c1d8c38345e0a1e04ced143ed07e63fe02ceb8b0)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index c4188de..2313b8b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -43,7 +43,7 @@
      *
      * @param predicate     the instance of {@link Predicate}
      */
-    KStream<K, V> filterOut(Predicate<K, V> predicate);
+    KStream<K, V> filterNot(Predicate<K, V> predicate);
 
     /**
      * Create a new instance of {@link KStream} by transforming each element in this stream into a different element in the new stream.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 9a2a8a8..30ea882 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -42,7 +42,7 @@
      *
      * @param predicate     the instance of {@link Predicate}
      */
-    KTable<K, V> filterOut(Predicate<K, V> predicate);
+    KTable<K, V> filterNot(Predicate<K, V> predicate);
 
     /**
      * Create a new instance of {@link KTable} by transforming the value of each element in this stream into a new value in the new stream.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
index 0b1f1e0..f5c2fbc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
@@ -25,11 +25,11 @@
 class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {
 
     private final Predicate<K, V> predicate;
-    private final boolean filterOut;
+    private final boolean filterNot;
 
-    public KStreamFilter(Predicate<K, V> predicate, boolean filterOut) {
+    public KStreamFilter(Predicate<K, V> predicate, boolean filterNot) {
         this.predicate = predicate;
-        this.filterOut = filterOut;
+        this.filterNot = filterNot;
     }
 
     @Override
@@ -40,7 +40,7 @@
     private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
         @Override
         public void process(K key, V value) {
-            if (filterOut ^ predicate.test(key, value)) {
+            if (filterNot ^ predicate.test(key, value)) {
                 context().forward(key, value);
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 567b06c..5889e07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -106,7 +106,7 @@
     }
 
     @Override
-    public KStream<K, V> filterOut(final Predicate<K, V> predicate) {
+    public KStream<K, V> filterNot(final Predicate<K, V> predicate) {
         String name = topology.newName(FILTER_NAME);
 
         topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 72f1d88..080fd9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -26,14 +26,14 @@
 
     private final KTableImpl<K, ?, V> parent;
     private final Predicate<K, V> predicate;
-    private final boolean filterOut;
+    private final boolean filterNot;
 
     private boolean sendOldValues = false;
 
-    public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<K, V> predicate, boolean filterOut) {
+    public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<K, V> predicate, boolean filterNot) {
         this.parent = parent;
         this.predicate = predicate;
-        this.filterOut = filterOut;
+        this.filterNot = filterNot;
     }
 
     @Override
@@ -64,7 +64,7 @@
     private V computeValue(K key, V value) {
         V newValue = null;
 
-        if (value != null && (filterOut ^ predicate.test(key, value)))
+        if (value != null && (filterNot ^ predicate.test(key, value)))
             newValue = value;
 
         return newValue;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ca1e659..fd464a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -111,7 +111,7 @@
     }
 
     @Override
-    public KTable<K, V> filterOut(final Predicate<K, V> predicate) {
+    public KTable<K, V> filterNot(final Predicate<K, V> predicate) {
         String name = topology.newName(FILTER_NAME);
         KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, true);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index ecf1115..75465c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -60,7 +60,7 @@
     }
 
     @Test
-    public void testFilterOut() {
+    public void testFilterNot() {
         KStreamBuilder builder = new KStreamBuilder();
         final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
 
@@ -69,7 +69,7 @@
 
         processor = new MockProcessorSupplier<>();
         stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
-        stream.filterOut(isMultipleOfThree).process(processor);
+        stream.filterNot(isMultipleOfThree).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);
         for (int i = 0; i < expectedKeys.length; i++) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 38182bc..b5c3d47 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -52,7 +52,7 @@
                 public boolean test(String key, String value) {
                     return true;
                 }
-            }).filterOut(new Predicate<String, String>() {
+            }).filterNot(new Predicate<String, String>() {
                 @Override
                 public boolean test(String key, String value) {
                     return false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 5491ea3..78d274e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -53,7 +53,7 @@
                 return (value % 2) == 0;
             }
         });
-        KTable<String, Integer> table3 = table1.filterOut(new Predicate<String, Integer>() {
+        KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() {
             @Override
             public boolean test(String key, Integer value) {
                 return (value % 2) == 0;
@@ -95,7 +95,7 @@
                             return (value % 2) == 0;
                         }
                     });
-            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterOut(
+            KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
                     new Predicate<String, Integer>() {
                         @Override
                         public boolean test(String key, Integer value) {