KAFKA-3449: Rename filterOut() to filterNot() to achieve better terminology
…nology
Hi all,
This is my first contribution and I hope it will be good.
The PR is related to this issue:
https://issues.apache.org/jira/browse/KAFKA-3449
Thanks a lot,
Andrea
Author: Andrea Cosentino <ancosen@gmail.com>
Reviewers: Yasuhiro Matsuda, Guozhang Wang
Closes #1134 from oscerd/KAFKA-3449
(cherry picked from commit c1d8c38345e0a1e04ced143ed07e63fe02ceb8b0)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index c4188de..2313b8b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -43,7 +43,7 @@
*
* @param predicate the instance of {@link Predicate}
*/
- KStream<K, V> filterOut(Predicate<K, V> predicate);
+ KStream<K, V> filterNot(Predicate<K, V> predicate);
/**
* Create a new instance of {@link KStream} by transforming each element in this stream into a different element in the new stream.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 9a2a8a8..30ea882 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -42,7 +42,7 @@
*
* @param predicate the instance of {@link Predicate}
*/
- KTable<K, V> filterOut(Predicate<K, V> predicate);
+ KTable<K, V> filterNot(Predicate<K, V> predicate);
/**
* Create a new instance of {@link KTable} by transforming the value of each element in this stream into a new value in the new stream.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
index 0b1f1e0..f5c2fbc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java
@@ -25,11 +25,11 @@
class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {
private final Predicate<K, V> predicate;
- private final boolean filterOut;
+ private final boolean filterNot;
- public KStreamFilter(Predicate<K, V> predicate, boolean filterOut) {
+ public KStreamFilter(Predicate<K, V> predicate, boolean filterNot) {
this.predicate = predicate;
- this.filterOut = filterOut;
+ this.filterNot = filterNot;
}
@Override
@@ -40,7 +40,7 @@
private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
@Override
public void process(K key, V value) {
- if (filterOut ^ predicate.test(key, value)) {
+ if (filterNot ^ predicate.test(key, value)) {
context().forward(key, value);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 567b06c..5889e07 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -106,7 +106,7 @@
}
@Override
- public KStream<K, V> filterOut(final Predicate<K, V> predicate) {
+ public KStream<K, V> filterNot(final Predicate<K, V> predicate) {
String name = topology.newName(FILTER_NAME);
topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 72f1d88..080fd9d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -26,14 +26,14 @@
private final KTableImpl<K, ?, V> parent;
private final Predicate<K, V> predicate;
- private final boolean filterOut;
+ private final boolean filterNot;
private boolean sendOldValues = false;
- public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<K, V> predicate, boolean filterOut) {
+ public KTableFilter(KTableImpl<K, ?, V> parent, Predicate<K, V> predicate, boolean filterNot) {
this.parent = parent;
this.predicate = predicate;
- this.filterOut = filterOut;
+ this.filterNot = filterNot;
}
@Override
@@ -64,7 +64,7 @@
private V computeValue(K key, V value) {
V newValue = null;
- if (value != null && (filterOut ^ predicate.test(key, value)))
+ if (value != null && (filterNot ^ predicate.test(key, value)))
newValue = value;
return newValue;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ca1e659..fd464a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -111,7 +111,7 @@
}
@Override
- public KTable<K, V> filterOut(final Predicate<K, V> predicate) {
+ public KTable<K, V> filterNot(final Predicate<K, V> predicate) {
String name = topology.newName(FILTER_NAME);
KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index ecf1115..75465c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -60,7 +60,7 @@
}
@Test
- public void testFilterOut() {
+ public void testFilterNot() {
KStreamBuilder builder = new KStreamBuilder();
final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7};
@@ -69,7 +69,7 @@
processor = new MockProcessorSupplier<>();
stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
- stream.filterOut(isMultipleOfThree).process(processor);
+ stream.filterNot(isMultipleOfThree).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder);
for (int i = 0; i < expectedKeys.length; i++) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 38182bc..b5c3d47 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -52,7 +52,7 @@
public boolean test(String key, String value) {
return true;
}
- }).filterOut(new Predicate<String, String>() {
+ }).filterNot(new Predicate<String, String>() {
@Override
public boolean test(String key, String value) {
return false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 5491ea3..78d274e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -53,7 +53,7 @@
return (value % 2) == 0;
}
});
- KTable<String, Integer> table3 = table1.filterOut(new Predicate<String, Integer>() {
+ KTable<String, Integer> table3 = table1.filterNot(new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
@@ -95,7 +95,7 @@
return (value % 2) == 0;
}
});
- KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterOut(
+ KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot(
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {