/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.oplet.plumbing;
import java.util.LinkedList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.edgent.function.BiConsumer;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.Functions;
import org.apache.edgent.oplet.OpletContext;
import org.apache.edgent.oplet.core.Pipe;
import org.apache.edgent.window.Partition;
import org.apache.edgent.window.PartitionedState;
import org.apache.edgent.window.Policies;
import org.apache.edgent.window.Window;
import org.apache.edgent.window.Windows;
/**
* Relieve pressure on upstream oplets by discarding tuples.
* This oplet ensures that upstream processing is not
* constrained by any delay in downstream processing,
* for example by a sink oplet not being able to connect
* to its external system.
 * When downstream processing cannot keep up with the input rate,
 * this oplet maintains a bounded window of the most recent
 * tuples per key and discards older tuples in arrival order (oldest first).
* <P>
* A window partition is maintained for each key seen
* on the input stream. Any tuple arriving on the input
* stream is inserted into the window. Asynchronously
* tuples are taken from the window using FIFO and
* submitted downstream. The submission of tuples maintains
* order within a partition but not across partitions.
* </P>
* <P>
 * Tuples are <B>discarded and not</B> submitted to the
 * output port if the downstream processing cannot keep up
 * with the incoming tuple rate.
* </P>
* <UL>
 * <LI>For a {@link #PressureReliever(int, Function) count}
 * {@code PressureReliever} up to the last (most recent) {@code N} tuples
 * are maintained in each window partition.
 * <BR> Asynchronous tuple submission removes the first (oldest) tuple in the partition
 * before submitting it downstream.
 * <BR> If, when an input tuple is processed, the window partition already contains {@code N} tuples,
 * the first (oldest) tuple in the partition is discarded before the input tuple is inserted into the window.</LI>
 * </UL>
* <P>
* Insertion of the oplet into a stream disconnects the
* upstream processing from the downstream processing,
 * so that downstream processing is executed on a different
 * thread from the one that processed the input tuple.
* </P>
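 * <P>
 * A minimal usage sketch (illustrative only; assumes a {@code TStream<String>}
 * named {@code readings}). The constant key function places all tuples in a
 * single partition, so at most the 100 most recent tuples are buffered:
 * <pre>{@code
 * TStream<String> relieved = readings.pipe(
 *     new PressureReliever<String, Integer>(100, tuple -> 0));
 * }</pre>
 * </P>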
*
* @param <T> Tuple type.
* @param <K> Key type.
*/
public class PressureReliever<T, K> extends Pipe<T, T> {
private static final long serialVersionUID = 1L;
private ScheduledExecutorService executor;
private final Window<T, K, LinkedList<T>> window;
/**
     * Pressure reliever that maintains up to the {@code count} most recent tuples per key.
     *
     * @param count Maximum number of tuples to retain per key when downstream processing cannot keep up.
     * @param keyFunction Key function for tuples.
*/
public PressureReliever(int count, Function<T, K> keyFunction) {
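        // Window composition (argument order follows Windows.window):
        // always insert the incoming tuple, cap each partition's contents at
        // 'count' tuples, evict the oldest tuple when the cap is exceeded,
        // and use FirstSubmitter as the trigger policy to drain tuples
        // asynchronously in FIFO order.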
window = Windows.window(
Policies.alwaysInsert(),
Policies.countContentsPolicy(count),
Policies.evictOldest(),
new FirstSubmitter(),
keyFunction,
() -> new LinkedList<T>());
        // No batch processing of the window takes place; tuples are drained
        // individually by the FirstSubmitter trigger policy.
        window.registerPartitionProcessor((tuples, k) -> { });
}
@Override
public void initialize(OpletContext<T, T> context) {
super.initialize(context);
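        // The ScheduledExecutorService is a shared service supplied by the
        // runtime; it is used to submit tuples downstream asynchronously.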
executor = context.getService(ScheduledExecutorService.class);
}
@Override
public void accept(T tuple) {
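        // Inserting into the window invokes the trigger policy (FirstSubmitter),
        // which schedules asynchronous submission of the oldest queued tuple.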
window.insert(tuple);
}
@Override
public void close() throws Exception {
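        // Nothing to close; the executor is a runtime service not owned by this oplet.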
}
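    /**
     * Trigger policy that drains each window partition asynchronously in FIFO
     * order, keeping at most one tuple per partition in flight at a time.
     */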
private class FirstSubmitter extends PartitionedState<K, AtomicBoolean>
implements BiConsumer<Partition<T, K, LinkedList<T>>, T> {
        private static final long serialVersionUID = 1L;
FirstSubmitter() {
super(() -> new AtomicBoolean());
}
/**
         * Process the window (consuming the oldest tuple in the partition)
         * only if a tuple from this partition is not already being consumed.
         *
         * @param partition window partition the tuple was inserted into
         * @param tuple the inserted tuple (not used directly; submission is FIFO)
         */
@Override
public void accept(Partition<T, K, LinkedList<T>> partition, T tuple) {
submitNextTuple(partition);
}
private void submitNextTuple(Partition<T, K, LinkedList<T>> partition) {
final K key = partition.getKey();
final AtomicBoolean latch = getState(key);
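            // At most one tuple per partition may be in flight at a time; if a
            // submission is already active it will drain this tuple when it completes.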
if (!latch.compareAndSet(false, true))
return;
final T firstTuple;
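            // Take the oldest tuple while holding the partition lock; if the
            // partition is empty, release the latch and stop.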
synchronized (partition) {
final LinkedList<T> contents = partition.getContents();
if (contents.isEmpty()) {
latch.set(false);
return;
}
firstTuple = contents.removeFirst();
}
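            // Submit the tuple downstream on the executor thread; when that
            // completes, release the latch and attempt to drain the next tuple.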
Runnable submit = Functions.delayedConsume(getDestination(), firstTuple);
submit = Functions.runWithFinal(submit, () -> {
latch.set(false);
submitNextTuple(partition);
});
executor.execute(submit);
}
}
}