blob: 06486ac171832bd6f176c7225ab63a0ec49eacbf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.streams.processors;
import static org.apache.storm.streams.WindowNode.PUNCTUATION;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;
/**
* Base implementation of the {@link Processor} interface that provides convenience methods {@link #execute(Object)} and {@link #finish()}.
*/
abstract class BaseProcessor<T> implements Processor<T> {
private final Set<String> punctuationState = new HashSet<>();
protected ProcessorContext context;
/**
* {@inheritDoc}
*/
@Override
public void init(ProcessorContext context) {
this.context = context;
}
/**
* {@inheritDoc} Processors that do not care about the source stream should override {@link BaseProcessor#execute(Object)} instead.
*/
@Override
public void execute(T input, String streamId) {
execute(input);
}
/**
* Execute some operation on the input value. Sub classes can override this when then don't care about the source stream from where the
* input is received.
*
* @param input the input
*/
protected void execute(T input) {
// NOOP
}
/**
* {@inheritDoc}
*/
@Override
public void punctuate(String stream) {
if ((stream == null) || shouldPunctuate(stream)) {
finish();
context.forward(PUNCTUATION);
punctuationState.clear();
}
}
/**
* This is triggered to signal the end of the current batch of values. Sub classes can override this to emit the result of a batch of
* values, for e.g. to emit the result of an aggregate or join operation on a batch of values. If a processor does per-value operation
* like filter, map etc, they can choose to ignore this.
*/
protected void finish() {
// NOOP
}
/**
* Forwards the result update to downstream processors. Processors that operate on a batch of tuples, like aggregation, join etc can use
* this to emit the partial results on each input if they are operating in non-windowed mode.
*
* @param result the result function
* @param <R> the result type
*/
protected final <R> void mayBeForwardAggUpdate(Supplier<R> result) {
if (!context.isWindowed()) {
context.forward(result.get());
}
}
private boolean shouldPunctuate(String parentStream) {
punctuationState.add(parentStream);
return punctuationState.equals(context.getWindowedParentStreams());
}
}