blob: abdd0611f1a463e1a97bd4f825d5f29c46b6c78b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.cep;
import org.apache.flink.annotation.Internal;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.adaptors.PatternFlatSelectAdapter;
import org.apache.flink.cep.functions.adaptors.PatternSelectAdapter;
import org.apache.flink.cep.functions.adaptors.PatternTimeoutFlatSelectAdapter;
import org.apache.flink.cep.functions.adaptors.PatternTimeoutSelectAdapter;
import org.apache.flink.util.OutputTag;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Builder for adapting pre-1.8 functions like {@link PatternFlatSelectFunction}, {@link PatternFlatTimeoutFunction}
* into {@link PatternProcessFunction}.
*/
@Internal
class PatternProcessFunctionBuilder {
/**
 * Entry point for adapting a {@link PatternFlatSelectFunction}, i.e. a function that emits its results
 * through a {@link org.apache.flink.util.Collector}, into a {@link PatternProcessFunction}.
 *
 * <p>The function is only wrapped here; the adapter itself is created once {@code build()} is called on
 * the returned builder (or on the builder obtained after registering a timeout handler).
 *
 * @param function the flat select function to wrap; {@code null} is rejected by the builder's constructor
 * @param <IN> first type parameter of the wrapped function and of the resulting process function
 * @param <OUT> second type parameter of the wrapped function and of the resulting process function
 * @return a new {@link FlatSelectBuilder} holding {@code function}
 */
static <IN, OUT> FlatSelectBuilder<IN, OUT> fromFlatSelect(final PatternFlatSelectFunction<IN, OUT> function) {
    return new FlatSelectBuilder<IN, OUT>(function);
}
/**
 * Entry point for adapting a {@link PatternSelectFunction}, i.e. a function that hands back its result
 * as a return value, into a {@link PatternProcessFunction}.
 *
 * <p>The function is only wrapped here; the adapter itself is created once {@code build()} is called on
 * the returned builder (or on the builder obtained after registering a timeout handler).
 *
 * @param function the select function to wrap; {@code null} is rejected by the builder's constructor
 * @param <IN> first type parameter of the wrapped function and of the resulting process function
 * @param <OUT> second type parameter of the wrapped function and of the resulting process function
 * @return a new {@link SelectBuilder} holding {@code function}
 */
static <IN, OUT> SelectBuilder<IN, OUT> fromSelect(final PatternSelectFunction<IN, OUT> function) {
    return new SelectBuilder<IN, OUT>(function);
}
/**
 * Builder stage that holds a {@link PatternFlatSelectFunction}. From here the caller can either create a
 * {@link PatternProcessFunction} adapter directly via {@link #build()}, or first attach a timeout handler
 * via {@link #withTimeoutHandler(OutputTag, PatternFlatTimeoutFunction)}.
 */
static class FlatSelectBuilder<IN, OUT> {

    /** The wrapped function; never {@code null} after construction. */
    private final PatternFlatSelectFunction<IN, OUT> wrapped;

    /**
     * @param function the flat select function to adapt
     * @throws NullPointerException if {@code function} is {@code null}
     */
    FlatSelectBuilder(PatternFlatSelectFunction<IN, OUT> function) {
        this.wrapped = checkNotNull(function);
    }

    /**
     * Moves on to a builder that additionally carries a handler for timed out partial matches.
     *
     * @param outputTag tag that is passed on to the timeout-aware adapter together with the handler
     * @param timeoutHandler function to apply to timed out partial matches
     * @param <TIMED_OUT> result type of the timeout handler and element type of {@code outputTag}
     * @return a {@link FlatTimeoutSelectBuilder} holding the wrapped function, the handler and the tag
     * @throws NullPointerException if {@code outputTag} or {@code timeoutHandler} is {@code null}
     */
    <TIMED_OUT> FlatTimeoutSelectBuilder<IN, OUT, TIMED_OUT> withTimeoutHandler(
            final OutputTag<TIMED_OUT> outputTag,
            final PatternFlatTimeoutFunction<IN, TIMED_OUT> timeoutHandler) {
        return new FlatTimeoutSelectBuilder<IN, OUT, TIMED_OUT>(this.wrapped, timeoutHandler, outputTag);
    }

    /**
     * @return a {@link PatternFlatSelectAdapter} around the wrapped function (no timeout handling)
     */
    PatternProcessFunction<IN, OUT> build() {
        return new PatternFlatSelectAdapter<>(this.wrapped);
    }
}
/**
 * Builder stage that holds a {@link PatternFlatSelectFunction} together with a
 * {@link PatternFlatTimeoutFunction} and the {@link OutputTag} belonging to it. {@link #build()} produces a
 * {@link PatternProcessFunction} adapter that also handles timed out partial matches.
 */
static class FlatTimeoutSelectBuilder<IN, OUT, TIMED_OUT> {

    /** Function applied to matches. */
    private final PatternFlatSelectFunction<IN, OUT> matchFunction;

    /** Function applied to timed out partial matches. */
    private final PatternFlatTimeoutFunction<IN, TIMED_OUT> timeoutFunction;

    /** Tag handed to the adapter alongside the timeout function. */
    private final OutputTag<TIMED_OUT> timeoutTag;

    /**
     * Stores the three collaborators after null-checking them in declaration order.
     *
     * @param flatSelectFunction function applied to matches
     * @param timeoutHandler function applied to timed out partial matches
     * @param outputTag tag passed to the adapter together with {@code timeoutHandler}
     * @throws NullPointerException if any argument is {@code null}
     */
    FlatTimeoutSelectBuilder(
            PatternFlatSelectFunction<IN, OUT> flatSelectFunction,
            PatternFlatTimeoutFunction<IN, TIMED_OUT> timeoutHandler,
            OutputTag<TIMED_OUT> outputTag) {
        this.matchFunction = checkNotNull(flatSelectFunction);
        this.timeoutFunction = checkNotNull(timeoutHandler);
        this.timeoutTag = checkNotNull(outputTag);
    }

    /**
     * @return a {@link PatternTimeoutFlatSelectAdapter} built from the stored function, handler and tag
     */
    PatternProcessFunction<IN, OUT> build() {
        return new PatternTimeoutFlatSelectAdapter<>(this.matchFunction, this.timeoutFunction, this.timeoutTag);
    }
}
/**
 * Builder stage that holds a {@link PatternSelectFunction}. From here the caller can either create a
 * {@link PatternProcessFunction} adapter directly via {@link #build()}, or first attach a timeout handler
 * via {@link #withTimeoutHandler(OutputTag, PatternTimeoutFunction)}.
 */
static class SelectBuilder<IN, OUT> {

    /** The wrapped function; never {@code null} after construction. */
    private final PatternSelectFunction<IN, OUT> wrapped;

    /**
     * @param function the select function to adapt
     * @throws NullPointerException if {@code function} is {@code null}
     */
    SelectBuilder(PatternSelectFunction<IN, OUT> function) {
        this.wrapped = checkNotNull(function);
    }

    /**
     * Moves on to a builder that additionally carries a handler for timed out partial matches.
     *
     * @param outputTag tag that is passed on to the timeout-aware adapter together with the handler
     * @param timeoutHandler function to apply to timed out partial matches
     * @param <TIMED_OUT> result type of the timeout handler and element type of {@code outputTag}
     * @return a {@link TimeoutSelectBuilder} holding the wrapped function, the handler and the tag
     * @throws NullPointerException if {@code outputTag} or {@code timeoutHandler} is {@code null}
     */
    <TIMED_OUT> TimeoutSelectBuilder<IN, OUT, TIMED_OUT> withTimeoutHandler(
            final OutputTag<TIMED_OUT> outputTag,
            final PatternTimeoutFunction<IN, TIMED_OUT> timeoutHandler) {
        return new TimeoutSelectBuilder<IN, OUT, TIMED_OUT>(this.wrapped, timeoutHandler, outputTag);
    }

    /**
     * @return a {@link PatternSelectAdapter} around the wrapped function (no timeout handling)
     */
    PatternProcessFunction<IN, OUT> build() {
        return new PatternSelectAdapter<>(this.wrapped);
    }
}
/**
 * Wraps {@link PatternSelectFunction} and {@link PatternTimeoutFunction} in a builder. The builder will create a
 * {@link PatternProcessFunction} adapter that handles timed out partial matches as well.
 */
static class TimeoutSelectBuilder<IN, OUT, TIMED_OUT> {

    /** Function applied to matches. */
    private final PatternSelectFunction<IN, OUT> selectFunction;

    /** Function applied to timed out partial matches. */
    private final PatternTimeoutFunction<IN, TIMED_OUT> timeoutHandler;

    /** Tag handed to the adapter alongside the timeout handler. */
    private final OutputTag<TIMED_OUT> outputTag;

    /**
     * Stores the three collaborators after null-checking them in declaration order.
     *
     * @param selectFunction function applied to matches (a plain, non-flat select function)
     * @param timeoutHandler function applied to timed out partial matches
     * @param outputTag tag passed to the adapter together with {@code timeoutHandler}
     * @throws NullPointerException if any argument is {@code null}
     */
    TimeoutSelectBuilder(
            final PatternSelectFunction<IN, OUT> selectFunction,
            final PatternTimeoutFunction<IN, TIMED_OUT> timeoutHandler,
            final OutputTag<TIMED_OUT> outputTag) {
        this.selectFunction = checkNotNull(selectFunction);
        this.timeoutHandler = checkNotNull(timeoutHandler);
        this.outputTag = checkNotNull(outputTag);
    }

    /**
     * @return a {@link PatternTimeoutSelectAdapter} built from the stored function, handler and tag
     */
    PatternProcessFunction<IN, OUT> build() {
        return new PatternTimeoutSelectAdapter<>(selectFunction, timeoutHandler, outputTag);
    }
}
}