[BEAM-342] Implement Filter#greaterThan, etc. with Filter#byPredicate
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index d725e0a..3e4440c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -235,7 +235,7 @@
.of(larger.get(1).apply(ParDo.of(new FlattenTops())))
// ...together with those (previously excluded) candidates of length
// exactly minPrefix...
- .and(input.apply(Filter.byPredicate(
+ .and(input.apply(Filter.by(
new SerializableFunction<CompletionCandidate, Boolean>() {
@Override
public Boolean apply(CompletionCandidate c) {
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index 93dd0be..b2ed9a2 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -85,13 +85,13 @@
PCollection<KV<String, List<CompletionCandidate>>> output =
input.apply(new ComputeTopCompletions(2, recursive))
- .apply(Filter.byPredicate(
- new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() {
- @Override
- public Boolean apply(KV<String, List<CompletionCandidate>> element) {
- return element.getKey().length() <= 2;
- }
- }));
+ .apply(Filter.by(
+ new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() {
+ @Override
+ public Boolean apply(KV<String, List<CompletionCandidate>> element) {
+ return element.getKey().length() <= 2;
+ }
+ }));
PAssert.that(output).containsInAnyOrder(
KV.of("a", parseList("apple:2", "apricot:1")),
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index d491741..0ad1a04 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -55,7 +55,7 @@
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(TypeDescriptors.strings()))
- .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+ .apply(Filter.by((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 845c56f..ba3983d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -44,7 +44,7 @@
/**
* This class is the second in a series of four pipelines that tell a story in a 'gaming'
* domain, following {@link UserScore}. In addition to the concepts introduced in {@link UserScore},
- * new concepts include: windowing and element timestamps; use of {@code Filter.byPredicate()}.
+ * new concepts include: windowing and element timestamps; use of {@code Filter.by()}.
*
* <p> This pipeline processes data collected from gaming events in batch, building on {@link
* UserScore} but using fixed windows. It calculates the sum of scores per team, for each window,
@@ -164,10 +164,10 @@
// (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
// that fall after the time period we want to analyze.
// [START DocInclude_HTSFilters]
- .apply("FilterStartTime", Filter.byPredicate(
+ .apply("FilterStartTime", Filter.by(
(GameActionInfo gInfo)
-> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
- .apply("FilterEndTime", Filter.byPredicate(
+ .apply("FilterEndTime", Filter.by(
(GameActionInfo gInfo)
-> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
// [END DocInclude_HTSFilters]
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index f73250f..4dfa474 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -66,7 +66,7 @@
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(TypeDescriptors.strings()))
- .apply(Filter.byPredicate((String word) -> !word.isEmpty()))
+ .apply(Filter.by((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
index 5ff615a..4254902 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
@@ -96,7 +96,7 @@
PCollection<KV<String, Integer>> output = input
.apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
- .apply("FilterStartTime", Filter.byPredicate(
+ .apply("FilterStartTime", Filter.by(
(GameActionInfo gInfo)
-> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
// run a map to access the fields in the result.
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index 9d1168b..d83e662 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -215,13 +215,13 @@
// ...together with those (previously excluded) candidates of length
// exactly minPrefix...
.and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
- private static final long serialVersionUID = 0;
+ private static final long serialVersionUID = 0;
- @Override
- public Boolean apply(CompletionCandidate c) {
- return c.getValue().length() == minPrefix;
- }
- })))
+ @Override
+ public Boolean apply(CompletionCandidate c) {
+ return c.getValue().length() == minPrefix;
+ }
+ })))
.apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
// ...set the key to be the minPrefix-length prefix...
.apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 57796b8..a31799e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -41,7 +41,7 @@
* <pre> {@code
* PCollection<String> wordList = ...;
* PCollection<String> longWords =
- * wordList.apply(Filter.byPredicate(new MatchIfWordLengthGT(6)));
+ * wordList.apply(Filter.by(new MatchIfWordLengthGT(6)));
* } </pre>
*
* <p>See also {@link #lessThan}, {@link #lessThanEq},
@@ -50,25 +50,8 @@
* the elements' natural ordering.
*/
public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T>
- byPredicate(PredicateT predicate) {
- return new Filter<T>("Filter", predicate);
- }
-
- /**
- * @deprecated use {@link #byPredicate}, which returns a {@link Filter} transform instead of
- * a {@link ParDo.Bound}.
- */
- @Deprecated
- public static <T, PredicateT extends SerializableFunction<T, Boolean>> ParDo.Bound<T, T>
- by(final PredicateT filterPred) {
- return ParDo.named("Filter").of(new DoFn<T, T>() {
- @Override
- public void processElement(ProcessContext c) {
- if (filterPred.apply(c.element()) == true) {
- c.output(c.element());
- }
- }
- });
+ by(PredicateT predicate) {
+ return new Filter<>(predicate);
}
/**
@@ -89,24 +72,16 @@
* inequalities with the specified value based on the elements'
* natural ordering.
*
- * <p>See also {@link #byPredicate}, which returns elements
+ * <p>See also {@link #by}, which returns elements
* that satisfy the given predicate.
*/
- public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThan(final T value) {
- return ParDo.named("Filter.lessThan").of(new DoFn<T, T>() {
+ public static <T extends Comparable<T>> Filter<T> lessThan(final T value) {
+ return by(new SerializableFunction<T, Boolean>() {
@Override
- public void processElement(ProcessContext c) {
- if (c.element().compareTo(value) < 0) {
- c.output(c.element());
- }
+ public Boolean apply(T input) {
+ return input.compareTo(value) < 0;
}
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- Filter.populateDisplayData(builder, String.format("x < %s", value));
- }
- });
+ }).described(String.format("x < %s", value));
}
@@ -128,24 +103,16 @@
* inequalities with the specified value based on the elements'
* natural ordering.
*
- * <p>See also {@link #byPredicate}, which returns elements
+ * <p>See also {@link #by}, which returns elements
* that satisfy the given predicate.
*/
- public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThan(final T value) {
- return ParDo.named("Filter.greaterThan").of(new DoFn<T, T>() {
+ public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) {
+ return by(new SerializableFunction<T, Boolean>() {
@Override
- public void processElement(ProcessContext c) {
- if (c.element().compareTo(value) > 0) {
- c.output(c.element());
- }
+ public Boolean apply(T input) {
+ return input.compareTo(value) > 0;
}
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- Filter.populateDisplayData(builder, String.format("x > %s", value));
- }
- });
+ }).described(String.format("x > %s", value));
}
/**
@@ -166,24 +133,16 @@
* inequalities with the specified value based on the elements'
* natural ordering.
*
- * <p>See also {@link #byPredicate}, which returns elements
+ * <p>See also {@link #by}, which returns elements
* that satisfy the given predicate.
*/
- public static <T extends Comparable<T>> ParDo.Bound<T, T> lessThanEq(final T value) {
- return ParDo.named("Filter.lessThanEq").of(new DoFn<T, T>() {
+ public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) {
+ return by(new SerializableFunction<T, Boolean>() {
@Override
- public void processElement(ProcessContext c) {
- if (c.element().compareTo(value) <= 0) {
- c.output(c.element());
- }
+ public Boolean apply(T input) {
+ return input.compareTo(value) <= 0;
}
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- Filter.populateDisplayData(builder, String.format("x ≤ %s", value));
- }
- });
+ }).described(String.format("x ≤ %s", value));
}
/**
@@ -204,46 +163,46 @@
* inequalities with the specified value based on the elements'
* natural ordering.
*
- * <p>See also {@link #byPredicate}, which returns elements
+ * <p>See also {@link #by}, which returns elements
* that satisfy the given predicate.
*/
- public static <T extends Comparable<T>> ParDo.Bound<T, T> greaterThanEq(final T value) {
- return ParDo.named("Filter.greaterThanEq").of(new DoFn<T, T>() {
+ public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) {
+ return by(new SerializableFunction<T, Boolean>() {
@Override
- public void processElement(ProcessContext c) {
- if (c.element().compareTo(value) >= 0) {
- c.output(c.element());
- }
+ public Boolean apply(T input) {
+ return input.compareTo(value) >= 0;
}
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- Filter.populateDisplayData(builder, String.format("x ≥ %s", value));
- }
- });
+ }).described(String.format("x ≥ %s", value));
}
///////////////////////////////////////////////////////////////////////////////
private SerializableFunction<T, Boolean> predicate;
+ private String predicateDescription;
private Filter(SerializableFunction<T, Boolean> predicate) {
- this.predicate = predicate;
+ this(predicate, "Filter.predicate");
}
- private Filter(String name, SerializableFunction<T, Boolean> predicate) {
- super(name);
+ private Filter(SerializableFunction<T, Boolean> predicate,
+ String predicateDescription) {
this.predicate = predicate;
+ this.predicateDescription = predicateDescription;
}
- public Filter<T> named(String name) {
- return new Filter<>(name, predicate);
+ /**
+ * Returns a new {@link Filter} {@link PTransform} that's like this
+ * {@link PTransform} but with the specified description for {@link DisplayData}. Does not
+ * modify this {@link PTransform}.
+ */
+ Filter<T> described(String description) {
+ return new Filter<>(predicate, description);
+
}
@Override
public PCollection<T> apply(PCollection<T> input) {
- PCollection<T> output = input.apply(ParDo.named("Filter").of(new DoFn<T, T>() {
+ PCollection<T> output = input.apply(ParDo.of(new DoFn<T, T>() {
@Override
public void processElement(ProcessContext c) {
if (predicate.apply(c.element()) == true) {
@@ -259,8 +218,9 @@
return input.getCoder();
}
- private static void populateDisplayData(
- DisplayData.Builder builder, String predicateDescription) {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
builder.add(DisplayData.item("predicate", predicateDescription)
.withLabel("Filter Predicate"));
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
index 367bbc0..2edab05 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
@@ -21,7 +21,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
-import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -61,48 +60,6 @@
}
}
- @Deprecated
- @Test
- @Category(RunnableOnService.class)
- public void testIdentityFilterBy() {
- TestPipeline p = TestPipeline.create();
-
- PCollection<Integer> output = p
- .apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
- .apply(Filter.by(new TrivialFn(true)));
-
- PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
- p.run();
- }
-
- @Deprecated
- @Test
- @Category(NeedsRunner.class)
- public void testNoFilter() {
- TestPipeline p = TestPipeline.create();
-
- PCollection<Integer> output = p
- .apply(Create.of(1, 2, 4, 5))
- .apply(Filter.by(new TrivialFn(false)));
-
- PAssert.that(output).empty();
- p.run();
- }
-
- @Deprecated
- @Test
- @Category(RunnableOnService.class)
- public void testFilterBy() {
- TestPipeline p = TestPipeline.create();
-
- PCollection<Integer> output = p
- .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
- .apply(Filter.by(new EvenFn()));
-
- PAssert.that(output).containsInAnyOrder(2, 4, 6);
- p.run();
- }
-
@Test
@Category(RunnableOnService.class)
public void testIdentityFilterByPredicate() {
@@ -110,7 +67,7 @@
PCollection<Integer> output = p
.apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
- .apply(Filter.byPredicate(new TrivialFn(true)));
+ .apply(Filter.by(new TrivialFn(true)));
PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
p.run();
@@ -123,7 +80,7 @@
PCollection<Integer> output = p
.apply(Create.of(1, 2, 4, 5))
- .apply(Filter.byPredicate(new TrivialFn(false)));
+ .apply(Filter.by(new TrivialFn(false)));
PAssert.that(output).empty();
p.run();
@@ -136,7 +93,7 @@
PCollection<Integer> output = p
.apply(Create.of(1, 2, 3, 4, 5, 6, 7))
- .apply(Filter.byPredicate(new EvenFn()));
+ .apply(Filter.by(new EvenFn()));
PAssert.that(output).containsInAnyOrder(2, 4, 6);
p.run();
@@ -169,17 +126,39 @@
}
@Test
+ @Category(RunnableOnService.class)
+ public void testFilterLessThanEq() {
+ TestPipeline p = TestPipeline.create();
+
+ PCollection<Integer> output = p
+ .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
+ .apply(Filter.lessThanEq(4));
+
+ PAssert.that(output).containsInAnyOrder(1, 2, 3, 4);
+ p.run();
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testFilterGreaterThanEq() {
+ TestPipeline p = TestPipeline.create();
+
+ PCollection<Integer> output = p
+ .apply(Create.of(1, 2, 3, 4, 5, 6, 7))
+ .apply(Filter.greaterThanEq(4));
+
+ PAssert.that(output).containsInAnyOrder(4, 5, 6, 7);
+ p.run();
+ }
+
+ @Test
public void testDisplayData() {
- ParDo.Bound<Integer, Integer> lessThan = Filter.lessThan(123);
- assertThat(DisplayData.from(lessThan), hasDisplayItem("predicate", "x < 123"));
+ assertThat(DisplayData.from(Filter.lessThan(123)), hasDisplayItem("predicate", "x < 123"));
- ParDo.Bound<Integer, Integer> lessThanOrEqual = Filter.lessThanEq(234);
- assertThat(DisplayData.from(lessThanOrEqual), hasDisplayItem("predicate", "x ≤ 234"));
+ assertThat(DisplayData.from(Filter.lessThanEq(234)), hasDisplayItem("predicate", "x ≤ 234"));
- ParDo.Bound<Integer, Integer> greaterThan = Filter.greaterThan(345);
- assertThat(DisplayData.from(greaterThan), hasDisplayItem("predicate", "x > 345"));
+ assertThat(DisplayData.from(Filter.greaterThan(345)), hasDisplayItem("predicate", "x > 345"));
- ParDo.Bound<Integer, Integer> greaterThanOrEqual = Filter.greaterThanEq(456);
- assertThat(DisplayData.from(greaterThanOrEqual), hasDisplayItem("predicate", "x ≥ 456"));
+ assertThat(DisplayData.from(Filter.greaterThanEq(456)), hasDisplayItem("predicate", "x ≥ 456"));
}
}
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
index 170071b..3c83be2 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FilterJava8Test.java
@@ -50,7 +50,7 @@
PCollection<Integer> output = pipeline
.apply(Create.of(591, 11789, 1257, 24578, 24799, 307))
- .apply(Filter.byPredicate(i -> true));
+ .apply(Filter.by(i -> true));
PAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307);
pipeline.run();
@@ -62,7 +62,7 @@
PCollection<Integer> output = pipeline
.apply(Create.of(1, 2, 4, 5))
- .apply(Filter.byPredicate(i -> false));
+ .apply(Filter.by(i -> false));
PAssert.that(output).empty();
pipeline.run();
@@ -75,7 +75,7 @@
PCollection<Integer> output = pipeline
.apply(Create.of(1, 2, 3, 4, 5, 6, 7))
- .apply(Filter.byPredicate(i -> i % 2 == 0));
+ .apply(Filter.by(i -> i % 2 == 0));
PAssert.that(output).containsInAnyOrder(2, 4, 6);
pipeline.run();
@@ -105,7 +105,7 @@
PCollection<Integer> output = pipeline
.apply(Create.of(1, 2, 3, 4, 5, 6, 7))
- .apply(Filter.byPredicate(new EvenFilter()::isEven));
+ .apply(Filter.by(new EvenFilter()::isEven));
PAssert.that(output).containsInAnyOrder(2, 4, 6);
pipeline.run();