blob: 21c53e75f5b924f3f670853e3a73bd3120c7ae24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.euphoria.core.client.operator;
import static java.util.Objects.requireNonNull;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.audience.Audience;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.Derived;
import org.apache.beam.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryPredicate;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Builders;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.PCollectionLists;
import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
 * Operator performing a filter operation.
 *
 * <p>Outputs only those input elements for which the given {@link UnaryPredicate} returns {@code
 * true}; all other elements are dropped. The output has the same element type as the input.
 *
 * <h3>Builders:</h3>
 *
 * <ol>
 *   <li>{@code [named] ..................} give name to the operator [optional]
 *   <li>{@code of .......................} input dataset
 *   <li>{@code by .......................} apply {@link UnaryPredicate} to input elements
 *   <li>{@code output ...................} build output dataset
 * </ol>
 *
 * @param <InputT> the type of both the input and the output elements
 */
@Audience(Audience.Type.CLIENT)
@Derived(state = StateComplexity.ZERO, repartitions = 0)
public class Filter<InputT> extends Operator<InputT> implements CompositeOperator<InputT, InputT> {

  /**
   * Starts building a nameless {@link Filter} operator to process the given input dataset.
   *
   * @param <InputT> the type of elements of the input dataset
   * @param input the input data set to be processed
   * @return a builder to complete the setup of the new operator
   * @throws NullPointerException if {@code input} is {@code null}
   * @see #named(String)
   * @see OfBuilder#of(PCollection)
   */
  public static <InputT> ByBuilder<InputT> of(PCollection<InputT> input) {
    return named(null).of(input);
  }

  /**
   * Starts building a named {@link Filter} operator.
   *
   * @param name a user provided name of the new operator to build, or {@code null} for none
   * @return a builder to complete the setup of the new operator
   */
  public static OfBuilder named(@Nullable String name) {
    // Diamond instead of a raw type; the element type is rebound in Builder#of.
    return new Builder<>(name);
  }

  /** Builder for the 'of' step. */
  public interface OfBuilder extends Builders.Of {

    @Override
    <InputT> ByBuilder<InputT> of(PCollection<InputT> input);
  }

  /** Builder for the 'by' step. */
  public interface ByBuilder<InputT> {

    /**
     * Specifies the function that is capable of input elements filtering.
     *
     * @param predicate the function that filters out elements if the return value for the element
     *     is false
     * @return the next builder to complete the setup of the operator
     */
    Builders.Output<InputT> by(UnaryPredicate<InputT> predicate);
  }

  /**
   * Single mutable builder implementing every step of the fluent API; each step returns {@code
   * this} typed as the next step's interface.
   *
   * @param <InputT> the type of elements of the input dataset
   */
  private static class Builder<InputT>
      implements OfBuilder, ByBuilder<InputT>, Builders.Output<InputT> {

    private final @Nullable String name;
    private PCollection<InputT> input;
    private UnaryPredicate<InputT> predicate;

    private Builder(@Nullable String name) {
      this.name = name;
    }

    @Override
    public <T> ByBuilder<T> of(PCollection<T> input) {
      // Rebinding the type parameter is safe here: 'of' is the first step after construction in
      // named(), so no field typed by the previous InputT has been assigned yet.
      @SuppressWarnings("unchecked")
      final Builder<T> cast = (Builder<T>) this;
      cast.input = requireNonNull(input, "input");
      return cast;
    }

    @Override
    public Builders.Output<InputT> by(UnaryPredicate<InputT> predicate) {
      this.predicate = requireNonNull(predicate, "predicate");
      return this;
    }

    /**
     * Creates the {@link Filter} operator from the collected settings and applies it to the input.
     *
     * @return the filtered output dataset
     */
    @Override
    public PCollection<InputT> output() {
      final Filter<InputT> filter = new Filter<>(name, predicate, input.getTypeDescriptor());
      return OperatorTransform.apply(filter, PCollectionList.of(input));
    }
  }

  private final UnaryPredicate<InputT> predicate;

  private Filter(
      @Nullable String name,
      UnaryPredicate<InputT> predicate,
      @Nullable TypeDescriptor<InputT> outputType) {
    super(name, outputType);
    this.predicate = predicate;
  }

  /**
   * Returns the predicate deciding which elements are kept.
   *
   * @return the predicate; an element is emitted only when it returns {@code true}
   */
  public UnaryPredicate<InputT> getPredicate() {
    return predicate;
  }

  /**
   * Expands this operator into a {@link FlatMap} that emits each input element for which the
   * predicate holds.
   *
   * @param inputs the list holding exactly one input collection
   * @return the filtered collection
   */
  @Override
  public PCollection<InputT> expand(PCollectionList<InputT> inputs) {
    // Capture only the predicate. Calling getPredicate() inside the lambda would close over
    // `this`, dragging the whole operator into the functor (which is presumably serialized when
    // the pipeline is distributed — confirm against FlatMap/UnaryFunctor).
    final UnaryPredicate<InputT> filterPredicate = getPredicate();
    return FlatMap.named(getName().orElse(null))
        .of(PCollectionLists.getOnlyElement(inputs))
        .using(
            (InputT element, Collector<InputT> collector) -> {
              if (filterPredicate.apply(element)) {
                collector.collect(element);
              }
            },
            getOutputType().orElse(null))
        .output();
  }
}