blob: da0a2175efb8b2f7c28943c6d6251f376daeb943 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.window;
import static org.apache.edgent.window.Policies.alwaysInsert;
import static org.apache.edgent.window.Policies.countContentsPolicy;
import static org.apache.edgent.window.Policies.evictOldest;
import static org.apache.edgent.window.Policies.processOnInsert;
import java.util.LinkedList;
import java.util.List;
import org.apache.edgent.function.BiConsumer;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.Supplier;
/**
 * Static factory methods that build {@code Window} implementations.
 */
public class Windows {

    /**
     * Build a window whose behaviour is defined by the supplied policies.
     *
     * @param <T> Tuple type.
     * @param <K> Key type.
     * @param <L> List type.
     *
     * @param insertionPolicy Policy deciding whether a tuple should be
     *            inserted into the window.
     * @param contentsPolicy Contents policy called before a tuple is inserted.
     * @param evictDeterminer Policy that determines the action to take when
     *            {@link Partition#evict()} is called.
     * @param triggerPolicy Trigger policy invoked after a tuple has been
     *            inserted into a partition.
     * @param keyFunction Function that obtains the partition key from a tuple.
     * @param listSupplier Supplier of the {@code List} that holds the tuples
     *            within a partition.
     * @return Window that uses the supplied policies.
     */
    public static <T, K, L extends List<T>> Window<T, K, L> window(
            BiFunction<Partition<T, K, L>, T, Boolean> insertionPolicy,
            BiConsumer<Partition<T, K, L>, T> contentsPolicy,
            Consumer<Partition<T, K, L>> evictDeterminer,
            BiConsumer<Partition<T, K, L>, T> triggerPolicy,
            Function<T, K> keyFunction,
            Supplier<L> listSupplier) {
        Window<T, K, L> configured = new WindowImpl<>(
                insertionPolicy,
                contentsPolicy,
                evictDeterminer,
                triggerPolicy,
                keyFunction,
                listSupplier);
        return configured;
    }

    /**
     * Return a window that keeps the last {@code count} tuples inserted and
     * triggers processing on every insert. This gives continuous processing:
     * processing is invoked each time the window changes. Because insertion
     * drives eviction there is no need to process on eviction. Once the
     * window holds {@code count} tuples, each insertion results in an
     * eviction followed by processing of {@code count} tuples, including the
     * tuple just inserted, which is the definition of the window.
     *
     * @param <T> Tuple type.
     * @param <K> Key type.
     *
     * @param count Number of tuples to maintain per partition.
     * @param keyFunction Tuple partitioning key function.
     * @return window that maintains the last {@code count} tuples on a stream
     */
    public static <T, K> Window<T, K, LinkedList<T>> lastNProcessOnInsert(final int count,
            Function<T, K> keyFunction) {
        // Each partition is backed by its own, initially empty, LinkedList.
        Supplier<LinkedList<T>> partitionListFactory = LinkedList::new;

        return Windows.window(
                alwaysInsert(),
                countContentsPolicy(count),
                evictOldest(),
                processOnInsert(),
                keyFunction,
                partitionListFactory);
    }
}