Merge pull request #55 from Hrushi20/iss449

[Streampipes-449] Update Processing Element API in module streampipes
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
index 24c545a..df52d22 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
@@ -29,12 +29,12 @@
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.processors.filters.jvm.processor.booleanfilter.BooleanFilterProcessor;
-import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeController;
-import org.apache.streampipes.processors.filters.jvm.processor.enrich.MergeByEnrichController;
+import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeProcessor;
+import org.apache.streampipes.processors.filters.jvm.processor.enrich.MergeByEnrichProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitController;
-import org.apache.streampipes.processors.filters.jvm.processor.merge.MergeByTimeController;
+import org.apache.streampipes.processors.filters.jvm.processor.merge.MergeByTimeProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.NumericalFilterProcessor;
-import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterController;
+import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.projection.ProjectionProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.schema.MergeBySchemaProcessor;
 import org.apache.streampipes.processors.filters.jvm.processor.textfilter.TextFilterProcessor;
@@ -58,11 +58,11 @@
                     new NumericalFilterProcessor(),
                     new ThresholdDetectionProcessor(),
                     new ProjectionProcessor(),
-                    new MergeByEnrichController(),
-                    new MergeByTimeController(),
+                    new MergeByEnrichProcessor(),
+                    new MergeByTimeProcessor(),
                     new MergeBySchemaProcessor(),
-                    new ComposeController(),
-                    new NumericalTextFilterController(),
+                    new ComposeProcessor(),
+                    new NumericalTextFilterProcessor(),
                     new RateLimitController())
             .registerMessagingFormats(
                     new JsonDataFormatFactory(),
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/Compose.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/Compose.java
deleted file mode 100644
index 5cb9620..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/Compose.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.compose;
-
-import org.apache.streampipes.model.constants.PropertySelectorConstants;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.EventFactory;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class Compose implements EventProcessor<ComposeParameters> {
-
-  private Map<String, Event> lastEvents;
-  private EventSchema outputSchema;
-  private List<String> outputKeySelectors;
-
-
-  @Override
-  public void onInvocation(ComposeParameters composeParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
-    this.outputSchema = composeParameters.getGraph().getOutputStream().getEventSchema();
-    this.outputKeySelectors = composeParameters.getOutputKeySelectors();
-    this.lastEvents = new HashMap<>();
-  }
-
-  @Override
-  public void onDetach() {
-    this.lastEvents.clear();
-  }
-
-  @Override
-  public void onEvent(Event event, SpOutputCollector spOutputCollector) {
-    this.lastEvents.put(event.getSourceInfo().getSelectorPrefix(), event);
-    if (lastEvents.size() == 2) {
-      spOutputCollector.collect(buildOutEvent(event.getSourceInfo().getSelectorPrefix()));
-    }
-  }
-
-  private Event buildOutEvent(String currentSelectorPrefix) {
-    return EventFactory.fromEvents(lastEvents.get(currentSelectorPrefix), lastEvents.get
-            (getOtherSelectorPrefix(currentSelectorPrefix)), outputSchema).getSubset(outputKeySelectors);
-  }
-
-  private String getOtherSelectorPrefix(String currentSelectorPrefix) {
-    return currentSelectorPrefix.equals(PropertySelectorConstants.FIRST_STREAM_ID_PREFIX) ?
-            PropertySelectorConstants.SECOND_STREAM_ID_PREFIX : PropertySelectorConstants
-            .FIRST_STREAM_ID_PREFIX;
-  }
-
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeController.java
deleted file mode 100644
index aa22d9f..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeController.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.compose;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class ComposeController extends StandaloneEventProcessingDeclarer<ComposeParameters> {
-
-  @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.compose")
-            .category(DataProcessorType.TRANSFORM)
-            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-            .withLocales(Locales.EN)
-            .requiredStream(StreamRequirementsBuilder
-                    .create()
-                    .requiredProperty(EpRequirements.anyProperty())
-                    .build())
-            .requiredStream(StreamRequirementsBuilder
-                    .create()
-                    .requiredProperty(EpRequirements.anyProperty())
-                    .build())
-            .outputStrategy(OutputStrategies.custom(true))
-            .build();
-  }
-
-  @Override
-  public ConfiguredEventProcessor<ComposeParameters> onInvocation(DataProcessorInvocation graph,
-                                                                  ProcessingElementParameterExtractor extractor) {
-
-    List<String> outputKeySelectors = extractor.outputKeySelectors();
-
-    ComposeParameters staticParam = new ComposeParameters(
-            graph, outputKeySelectors);
-
-    return new ConfiguredEventProcessor<>(staticParam, Compose::new);
-  }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeParameters.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeParameters.java
deleted file mode 100644
index 2c0b429..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeParameters.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.compose;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public class ComposeParameters extends EventProcessorBindingParams {
-
-  private List<String> outputKeySelectors;
-
-  public ComposeParameters(DataProcessorInvocation graph, List<String> outputKeySelectors) {
-    super(graph);
-    this.outputKeySelectors = outputKeySelectors;
-  }
-
-  public List<String> getOutputKeySelectors() {
-    return outputKeySelectors;
-  }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeProcessor.java
new file mode 100644
index 0000000..341f26c
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeProcessor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.processors.filters.jvm.processor.compose;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.constants.PropertySelectorConstants;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ComposeProcessor extends StreamPipesDataProcessor {
+
+  private List<String> outputKeySelectors;
+  private Map<String, Event> lastEvents;
+  private EventSchema outputSchema;
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.compose")
+            .category(DataProcessorType.TRANSFORM)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                    .create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build())
+            .requiredStream(StreamRequirementsBuilder
+                    .create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build())
+            .outputStrategy(OutputStrategies.custom(true))
+            .build();
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+    this.outputKeySelectors = processorParams.extractor().outputKeySelectors();
+    this.outputSchema = processorParams.getGraph().getOutputStream().getEventSchema();
+    this.lastEvents = new HashMap<>();
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+    this.lastEvents.put(event.getSourceInfo().getSelectorPrefix(), event);
+    if (lastEvents.size() == 2) {
+      spOutputCollector.collect(buildOutEvent(event.getSourceInfo().getSelectorPrefix()));
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    this.lastEvents.clear();
+  }
+
+
+  private Event buildOutEvent(String currentSelectorPrefix) {
+    return EventFactory.fromEvents(lastEvents.get(currentSelectorPrefix), lastEvents.get
+            (getOtherSelectorPrefix(currentSelectorPrefix)), outputSchema).getSubset(outputKeySelectors);
+  }
+
+  private String getOtherSelectorPrefix(String currentSelectorPrefix) {
+    return currentSelectorPrefix.equals(PropertySelectorConstants.FIRST_STREAM_ID_PREFIX) ?
+            PropertySelectorConstants.SECOND_STREAM_ID_PREFIX : PropertySelectorConstants
+            .FIRST_STREAM_ID_PREFIX;
+  }
+
+}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrich.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrich.java
deleted file mode 100644
index 5f24c3e..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrich.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.enrich;
-
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.EventFactory;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.List;
-
-public class MergeByEnrich implements EventProcessor<MergeByEnrichParameters> {
-
-  private EventSchema outputSchema;
-  private List<String> outputKeySelectors;
-  private String selectedStream;
-
-  private Event eventBuffer;
-
-  @Override
-  public void onInvocation(MergeByEnrichParameters composeParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
-    this.outputSchema = composeParameters.getGraph().getOutputStream().getEventSchema();
-    this.outputKeySelectors = composeParameters.getOutputKeySelectors();
-
-    if (composeParameters.getSelectedStream().equals("Stream 1")) {
-      this.selectedStream = "s0";
-    } else {
-      this.selectedStream = "s1";
-    }
-
-    this.eventBuffer = null;
-  }
-
-
-  @Override
-  public void onEvent(Event event, SpOutputCollector spOutputCollector) {
-    String streamId = event.getSourceInfo().getSelectorPrefix();
-
-    // Enrich the selected stream and store last event of other stream
-    if (this.selectedStream.equals(streamId)) {
-      if (this.eventBuffer != null) {
-        Event result = mergeEvents(event, this.eventBuffer);
-        spOutputCollector.collect(result);
-      }
-    } else {
-      this.eventBuffer = event;
-    }
-
-  }
-
-  @Override
-  public void onDetach() {
-    this.eventBuffer = null;
-  }
-
-  private Event mergeEvents(Event e1, Event e2) {
-    return EventFactory.fromEvents(e1, e2, outputSchema).getSubset(outputKeySelectors);
-  }
-
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichController.java
deleted file mode 100644
index 62dee80..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichController.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.enrich;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.processors.filters.jvm.config.FiltersJvmConfig;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class MergeByEnrichController extends StandaloneEventProcessingDeclarer<MergeByEnrichParameters> {
-
-  private static final String SELECT_STREAM = "select-stream";
-
-  @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.enrich")
-            .category(DataProcessorType.TRANSFORM)
-            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-            .withLocales(Locales.EN)
-            .requiredStream(StreamRequirementsBuilder
-                    .create()
-                    .requiredProperty(EpRequirements.anyProperty())
-                    .build())
-            .requiredStream(StreamRequirementsBuilder
-                    .create()
-                    .requiredProperty(EpRequirements.anyProperty())
-                    .build())
-            .requiredSingleValueSelection(Labels.withId(SELECT_STREAM),
-                    Options.from("Stream 1", "Stream 2"))
-            .outputStrategy(OutputStrategies.custom(true))
-            .build();
-  }
-
-  @Override
-  public ConfiguredEventProcessor<MergeByEnrichParameters>
-  onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
-    List<String> outputKeySelectors = extractor.outputKeySelectors();
-
-    String selectedStream = extractor.selectedSingleValue(SELECT_STREAM, String.class);
-
-    MergeByEnrichParameters staticParam = new MergeByEnrichParameters(
-            graph, outputKeySelectors, selectedStream);
-
-    return new ConfiguredEventProcessor<>(staticParam, MergeByEnrich::new);
-  }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichParameters.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichParameters.java
deleted file mode 100644
index 2a5881f..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichParameters.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.enrich;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public class MergeByEnrichParameters extends EventProcessorBindingParams {
-
-  private List<String> outputKeySelectors;
-  private String selectedStream;
-
-  public MergeByEnrichParameters(DataProcessorInvocation graph, List<String> outputKeySelectors, String selectedStream) {
-    super(graph);
-    this.outputKeySelectors = outputKeySelectors;
-    this.selectedStream = selectedStream;
-  }
-
-  public List<String> getOutputKeySelectors() {
-    return outputKeySelectors;
-  }
-
-  public String getSelectedStream() {
-    return selectedStream;
-  }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessor.java
new file mode 100644
index 0000000..d7a4e78
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.processors.filters.jvm.processor.enrich;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.List;
+
+public class MergeByEnrichProcessor extends StreamPipesDataProcessor {
+
+  private static final String SELECT_STREAM = "select-stream";
+
+  private List<String> outputKeySelectors;
+  private String selectedStream;
+  private EventSchema outputSchema;
+  private Event eventBuffer;
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.enrich")
+            .category(DataProcessorType.TRANSFORM)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                    .create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build())
+            .requiredStream(StreamRequirementsBuilder
+                    .create()
+                    .requiredProperty(EpRequirements.anyProperty())
+                    .build())
+            .requiredSingleValueSelection(Labels.withId(SELECT_STREAM),
+                    Options.from("Stream 1", "Stream 2"))
+            .outputStrategy(OutputStrategies.custom(true))
+            .build();
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+    this.outputKeySelectors = processorParams.extractor().outputKeySelectors();
+
+    this.selectedStream = processorParams.extractor().selectedSingleValue(SELECT_STREAM, String.class);
+
+    this.outputSchema = processorParams.getGraph().getOutputStream().getEventSchema();
+
+    if (this.selectedStream.equals("Stream 1")) {
+      this.selectedStream = "s0";
+    } else {
+      this.selectedStream = "s1";
+    }
+
+    this.eventBuffer = null;
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+    String streamId = event.getSourceInfo().getSelectorPrefix();
+
+    // Enrich the selected stream and store last event of other stream
+    if (this.selectedStream.equals(streamId)) {
+      if (this.eventBuffer != null) {
+        Event result = mergeEvents(event, this.eventBuffer);
+        spOutputCollector.collect(result);
+      }
+    } else {
+      this.eventBuffer = event;
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
+
+  private Event mergeEvents(Event e1, Event e2) {
+    return EventFactory.fromEvents(e1, e2, outputSchema).getSubset(outputKeySelectors);
+  }
+}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTime.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTime.java
deleted file mode 100644
index c53cb1c..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTime.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.merge;
-
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.EventFactory;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.List;
-
-public class MergeByTime implements EventProcessor<MergeByTimeParameters> {
-
-  private EventSchema outputSchema;
-  private List<String> outputKeySelectors;
-
-  private String timestampFieldStream0;
-  private String timestampFieldStream1;
-  private Integer timeInterval;
-
-  private StreamBuffer streamBufferS0;
-  private StreamBuffer streamBufferS1;
-
-  @Override
-  public void onInvocation(MergeByTimeParameters composeParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
-    this.outputSchema = composeParameters.getGraph().getOutputStream().getEventSchema();
-    this.outputKeySelectors = composeParameters.getOutputKeySelectors();
-    this.timestampFieldStream0 = composeParameters.getTimestampFieldStream1();
-    this.timestampFieldStream1 = composeParameters.getTimestampFieldStream2();
-    this.timeInterval = composeParameters.getTimeInterval();
-
-    this.streamBufferS0 = new StreamBuffer(this.timestampFieldStream0);
-    this.streamBufferS1 = new StreamBuffer(this.timestampFieldStream1);
-  }
-
-
-  @Override
-  public void onEvent(Event event, SpOutputCollector spOutputCollector) {
-    String streamId = event.getSourceInfo().getSelectorPrefix();
-
-    // Decide to which buffer the event should be added
-    if ("s0".equals(streamId)) {
-      this.streamBufferS0.add(event);
-    } else {
-      this.streamBufferS1.add(event);
-    }
-
-    // Calculate matching events between data streams
-    for (Event e0 : this.streamBufferS0.getList()) {
-      long time0 = e0.getFieldBySelector(timestampFieldStream0).getAsPrimitive().getAsLong();
-      for (Event e1 : this.streamBufferS1.getList()) {
-        long time1 = e1.getFieldBySelector(timestampFieldStream1).getAsPrimitive().getAsLong();
-
-        if (time0 + timeInterval > time1 && time1 > time0 - timeInterval) {
-          Event resultingEvent = mergeEvents(e0, e1);
-          spOutputCollector.collect(resultingEvent);
-          this.streamBufferS0.removeOldEvents(time0);
-          this.streamBufferS1.removeOldEvents(time1);
-        }
-      }
-    }
-
-    // Clean up buffer if events do not match to avoid buffer overflow
-    if (this.streamBufferS0.getLength() > 0 && this.streamBufferS1.getLength() > 0) {
-      Event e0 = this.streamBufferS0.get(0);
-      Event e1 = this.streamBufferS1.get(0);
-
-      long time0 = e0.getFieldBySelector(this.timestampFieldStream0).getAsPrimitive().getAsLong();
-      long time1 = e1.getFieldBySelector(this.timestampFieldStream1).getAsPrimitive().getAsLong();
-
-      if (time0 > time1) {
-        this.streamBufferS0.removeOldEvents(time0);
-        this.streamBufferS1.removeOldEvents(time0);
-      } else {
-        this.streamBufferS0.removeOldEvents(time1);
-        this.streamBufferS1.removeOldEvents(time1);
-      }
-    }
-
-  }
-
-  @Override
-  public void onDetach() {
-    this.streamBufferS0.reset();
-    this.streamBufferS1.reset();
-  }
-
-  private Event mergeEvents(Event e1, Event e2) {
-    return EventFactory.fromEvents(e1, e2, outputSchema).getSubset(outputKeySelectors);
-  }
-
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeController.java
deleted file mode 100644
index e6446f6..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeController.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.merge;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.filters.jvm.config.FiltersJvmConfig;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class MergeByTimeController extends StandaloneEventProcessingDeclarer<MergeByTimeParameters> {
-
-  private static final String TIMESTAMP_MAPPING_STREAM_1_KEY = "timestamp_mapping_stream_1";
-  private static final String TIMESTAMP_MAPPING_STREAM_2_KEY = "timestamp_mapping_stream_2";
-  private static final String NUMBER_MAPPING = "number_mapping";
-  private static final String TIME_INTERVAL = "time-interval";
-
-  @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.merge")
-            .category(DataProcessorType.TRANSFORM)
-            .withAssets(Assets.DOCUMENTATION, Assets.ICON, "merge_description.png")
-            .withLocales(Locales.EN)
-            .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
-                    EpRequirements.timestampReq(),
-                    Labels.withId(TIMESTAMP_MAPPING_STREAM_1_KEY),
-                    PropertyScope.NONE).build())
-            .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
-                    EpRequirements.timestampReq(),
-                    Labels.withId(TIMESTAMP_MAPPING_STREAM_2_KEY),
-                    PropertyScope.NONE).build())
-            .requiredIntegerParameter(Labels.withId(TIME_INTERVAL), NUMBER_MAPPING)
-            .outputStrategy(OutputStrategies.custom(true))
-            .build();
-  }
-
-  @Override
-  public ConfiguredEventProcessor<MergeByTimeParameters>
-  onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
-    List<String> outputKeySelectors = extractor.outputKeySelectors();
-
-    String timestampFieldStream1 = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_STREAM_1_KEY);
-    String timestampFieldStream2 = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_STREAM_2_KEY);
-    Integer timeInterval = extractor.singleValueParameter(TIME_INTERVAL, Integer.class);
-
-    MergeByTimeParameters staticParam = new MergeByTimeParameters(
-            graph, outputKeySelectors, timestampFieldStream1, timestampFieldStream2, timeInterval);
-
-    return new ConfiguredEventProcessor<>(staticParam, MergeByTime::new);
-  }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeParameters.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeParameters.java
deleted file mode 100644
index f39405c..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeParameters.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.merge;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public class MergeByTimeParameters extends EventProcessorBindingParams {
-
-  private List<String> outputKeySelectors;
-  private String timestampFieldStream1;
-  private String timestampFieldStream2;
-  private Integer timeInterval;
-
-  public MergeByTimeParameters(DataProcessorInvocation graph, List<String> outputKeySelectors,
-                               String timestampFieldStream1, String timestampFieldStream2, Integer timeInterval) {
-    super(graph);
-    this.outputKeySelectors = outputKeySelectors;
-    this.timestampFieldStream1 = timestampFieldStream1;
-    this.timestampFieldStream2 = timestampFieldStream2;
-    this.timeInterval = timeInterval;
-  }
-
-  public List<String> getOutputKeySelectors() {
-    return outputKeySelectors;
-  }
-
-  public String getTimestampFieldStream1() {
-    return timestampFieldStream1;
-  }
-
-  public String getTimestampFieldStream2() {
-    return timestampFieldStream2;
-  }
-
-  public Integer getTimeInterval() {
-    return timeInterval;
-  }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java
new file mode 100644
index 0000000..8d41aa7
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.processors.filters.jvm.processor.merge;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.List;
+
+public class MergeByTimeProcessor extends StreamPipesDataProcessor {
+
+  private static final String TIMESTAMP_MAPPING_STREAM_1_KEY = "timestamp_mapping_stream_1";
+  private static final String TIMESTAMP_MAPPING_STREAM_2_KEY = "timestamp_mapping_stream_2";
+  private static final String NUMBER_MAPPING = "number_mapping";
+  private static final String TIME_INTERVAL = "time-interval";
+
+  private List<String> outputKeySelectors;
+  private String timestampFieldStream0;
+  private String timestampFieldStream1;
+  private Integer timeInterval;
+  private EventSchema outputSchema;
+
+  private StreamBuffer streamBufferS0;
+  private StreamBuffer streamBufferS1;
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.merge")
+            .category(DataProcessorType.TRANSFORM)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON, "merge_description.png")
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+                    EpRequirements.timestampReq(),
+                    Labels.withId(TIMESTAMP_MAPPING_STREAM_1_KEY),
+                    PropertyScope.NONE).build())
+            .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+                    EpRequirements.timestampReq(),
+                    Labels.withId(TIMESTAMP_MAPPING_STREAM_2_KEY),
+                    PropertyScope.NONE).build())
+            .requiredIntegerParameter(Labels.withId(TIME_INTERVAL), NUMBER_MAPPING)
+            .outputStrategy(OutputStrategies.custom(true))
+            .build();
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+    this.outputSchema = processorParams.getGraph().getOutputStream().getEventSchema();
+    this.outputKeySelectors = processorParams.extractor().outputKeySelectors();
+    this.timestampFieldStream0 = processorParams.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_STREAM_1_KEY);
+    this.timestampFieldStream1 = processorParams.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_STREAM_2_KEY);
+
+    this.timeInterval = processorParams.extractor().singleValueParameter(TIME_INTERVAL,Integer.class);
+
+    this.streamBufferS0 = new StreamBuffer(this.timestampFieldStream0);
+    this.streamBufferS1 = new StreamBuffer(this.timestampFieldStream1);
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+    String streamId = event.getSourceInfo().getSelectorPrefix();
+
+    // Decide to which buffer the event should be added
+    if ("s0".equals(streamId)) {
+      this.streamBufferS0.add(event);
+    } else {
+      this.streamBufferS1.add(event);
+    }
+
+    // Calculate matching events between data streams
+    for (Event e0 : this.streamBufferS0.getList()) {
+      long time0 = e0.getFieldBySelector(timestampFieldStream0).getAsPrimitive().getAsLong();
+      for (Event e1 : this.streamBufferS1.getList()) {
+        long time1 = e1.getFieldBySelector(timestampFieldStream1).getAsPrimitive().getAsLong();
+
+        if (time0 + timeInterval > time1 && time1 > time0 - timeInterval) {
+          Event resultingEvent = mergeEvents(e0, e1);
+          spOutputCollector.collect(resultingEvent);
+          this.streamBufferS0.removeOldEvents(time0);
+          this.streamBufferS1.removeOldEvents(time1);
+        }
+      }
+    }
+
+    // Clean up buffer if events do not match to avoid buffer overflow
+    if (this.streamBufferS0.getLength() > 0 && this.streamBufferS1.getLength() > 0) {
+      Event e0 = this.streamBufferS0.get(0);
+      Event e1 = this.streamBufferS1.get(0);
+
+      long time0 = e0.getFieldBySelector(this.timestampFieldStream0).getAsPrimitive().getAsLong();
+      long time1 = e1.getFieldBySelector(this.timestampFieldStream1).getAsPrimitive().getAsLong();
+
+      if (time0 > time1) {
+        this.streamBufferS0.removeOldEvents(time0);
+        this.streamBufferS1.removeOldEvents(time0);
+      } else {
+        this.streamBufferS0.removeOldEvents(time1);
+        this.streamBufferS1.removeOldEvents(time1);
+      }
+    }
+
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
+
+
+  private Event mergeEvents(Event e1, Event e2) {
+    return EventFactory.fromEvents(e1, e2, outputSchema).getSubset(outputKeySelectors);
+  }
+}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilter.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilter.java
deleted file mode 100644
index 27c3950..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
-
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-public class NumericalTextFilter implements EventProcessor<NumericalTextFilterParameters> {
-
-  private NumericalTextFilterParameters params;
-
-  @Override
-  public void onInvocation(NumericalTextFilterParameters numericalTextFilterParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext
-          runtimeContext) {
-    this.params = numericalTextFilterParameters;
-  }
-
-  @Override
-  public void onEvent(Event event, SpOutputCollector out) {
-    boolean satisfiesNumberFilter = false;
-    boolean satisfiesTextFilter = false;
-
-    Double numbervalue = event.getFieldBySelector(params.getNumberProperty())
-            .getAsPrimitive()
-            .getAsDouble();
-
-    String value = event.getFieldBySelector(params.getTextProperty())
-            .getAsPrimitive()
-            .getAsString();
-
-    //Double value = Double.parseDouble(String.valueOf(in.get(params.getFilterProperty())));
-    Double threshold = params.getNumberThreshold();
-
-    if (params.getNumericalOperator() == NumericalOperator.EQ) {
-      satisfiesNumberFilter = (Math.abs(numbervalue - threshold) < 0.000001);
-    } else if (params.getNumericalOperator() == NumericalOperator.GE) {
-      satisfiesNumberFilter = (numbervalue >= threshold);
-    } else if (params.getNumericalOperator() == NumericalOperator.GT) {
-      satisfiesNumberFilter = numbervalue > threshold;
-    } else if (params.getNumericalOperator() == NumericalOperator.LE) {
-      satisfiesNumberFilter = (numbervalue <= threshold);
-    } else if (params.getNumericalOperator() == NumericalOperator.LT) {
-      satisfiesNumberFilter = (numbervalue < threshold);
-    } else if (params.getNumericalOperator() == NumericalOperator.IE) {
-      satisfiesNumberFilter = (Math.abs(numbervalue - threshold) > 0.000001);
-    }
-
-    if (params.getTextOperator() == StringOperator.MATCHES) {
-      satisfiesTextFilter = (value.equals(params.getTextKeyword()));
-    } else if (params.getTextOperator() == StringOperator.CONTAINS) {
-      satisfiesTextFilter = (value.contains(params.getTextKeyword()));
-    }
-
-    if (satisfiesNumberFilter && satisfiesTextFilter) {
-      out.collect(event);
-    }
-  }
-
-  @Override
-  public void onDetach() {
-
-  }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterController.java
deleted file mode 100644
index 32882d5..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterController.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
-
-import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class NumericalTextFilterController extends StandaloneEventProcessingDeclarer<NumericalTextFilterParameters> {
-
-  // number
-  private static final String NUMBER_MAPPING = "number-mapping";
-  private static final String NUMBER_OPERATION = "number-operation";
-  private static final String NUMBER_VALUE = "number-value";
-  // text
-  private static final String TEXT_MAPPING = "text-mapping";
-  private static final String TEXT_OPERATION = "text-operation";
-  private static final String TEXT_KEYWORD = "text-keyword";
-
-  @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.numericaltextfilter")
-            .category(DataProcessorType.FILTER)
-            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-            .withLocales(Locales.EN)
-            .requiredStream(StreamRequirementsBuilder
-                    .create()
-                    .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
-                            Labels.withId(NUMBER_MAPPING),
-                            PropertyScope.MEASUREMENT_PROPERTY)
-                    .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(),
-                            Labels.withId(TEXT_MAPPING), PropertyScope.NONE)
-                    .build())
-            .requiredSingleValueSelection(Labels.withId(NUMBER_OPERATION), Options.from("<", "<=", ">",
-                    ">=", "==", "!="))
-            .requiredFloatParameter(Labels.withId(NUMBER_VALUE), NUMBER_MAPPING)
-            .requiredSingleValueSelection(Labels.withId(TEXT_OPERATION), Options.from("MATCHES",
-                    "CONTAINS"))
-            .requiredTextParameter(Labels.withId(TEXT_KEYWORD), "text")
-            .outputStrategy(OutputStrategies.keep())
-            .build();
-
-  }
-
-  @Override
-  public ConfiguredEventProcessor<NumericalTextFilterParameters> onInvocation
-          (DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
-    // number
-    String numberProperty = extractor.mappingPropertyValue(NUMBER_MAPPING);
-    Double numberThreshold = extractor.singleValueParameter(NUMBER_VALUE, Double.class);
-    String numberOperation = extractor.selectedSingleValue(NUMBER_OPERATION, String.class);
-
-    // text
-    String textProperty = extractor.mappingPropertyValue(TEXT_MAPPING);
-    String textKeyword = extractor.singleValueParameter(TEXT_KEYWORD, String.class);
-    String textOperation = extractor.selectedSingleValue(TEXT_OPERATION, String.class);
-
-    String numOperation = "GT";
-
-    if (numberOperation.equals("<=")) {
-      numOperation = "LE";
-    } else if (numberOperation.equals("<")) {
-      numOperation = "LT";
-    } else if (numberOperation.equals(">=")) {
-      numOperation = "GE";
-    } else if (numberOperation.equals("==")) {
-      numOperation = "EQ";
-    } else if (numberOperation.equals("!=")) {
-      numOperation = "IE";
-    }
-
-
-    NumericalTextFilterParameters staticParam = new NumericalTextFilterParameters(
-            graph,
-            numberThreshold,
-            NumericalOperator.valueOf(numOperation),
-            numberProperty,
-            textKeyword,
-            StringOperator.valueOf(textOperation),
-            textProperty);
-
-    return new ConfiguredEventProcessor<>(staticParam, NumericalTextFilter::new);
-  }
-
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterParameters.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterParameters.java
deleted file mode 100644
index c669430..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterParameters.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class NumericalTextFilterParameters extends EventProcessorBindingParams {
-
-  private double numberThreshold;
-  private NumericalOperator numericalOperator;
-  private String numberProperty;
-  private String textKeyword;
-  private StringOperator textOperator;
-  private String textProperty;
-
-  public NumericalTextFilterParameters(DataProcessorInvocation graph, Double numberThreshold, NumericalOperator
-          NumericalOperator, String numberProperty, String textKeyword, StringOperator textOperator, String textProperty) {
-    super(graph);
-    this.numberThreshold = numberThreshold;
-    this.numericalOperator = NumericalOperator;
-    this.numberProperty = numberProperty;
-    this.textKeyword = textKeyword;
-    this.textOperator = textOperator;
-    this.textProperty = textProperty;
-  }
-
-  public double getNumberThreshold() {
-    return numberThreshold;
-  }
-
-  public NumericalOperator getNumericalOperator() {
-    return numericalOperator;
-  }
-
-  public String getNumberProperty() {
-    return numberProperty;
-  }
-
-  public String getTextKeyword() {
-    return textKeyword;
-  }
-
-  public StringOperator getTextOperator() {
-    return textOperator;
-  }
-
-  public String getTextProperty() {
-    return textProperty;
-  }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessor.java
new file mode 100644
index 0000000..3fec57d
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessor.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+
+public class NumericalTextFilterProcessor extends StreamPipesDataProcessor {
+
+  // number
+  private static final String NUMBER_MAPPING = "number-mapping";
+  private static final String NUMBER_OPERATION = "number-operation";
+  private static final String NUMBER_VALUE = "number-value";
+  // text
+  private static final String TEXT_MAPPING = "text-mapping";
+  private static final String TEXT_OPERATION = "text-operation";
+  private static final String TEXT_KEYWORD = "text-keyword";
+
+  private double numberThreshold;
+  private NumericalOperator numericalOperator;
+  private String numberProperty;
+  private String textKeyword;
+  private StringOperator textOperator;
+  private String textProperty;
+
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.numericaltextfilter")
+            .category(DataProcessorType.FILTER)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder
+                    .create()
+                    .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                            Labels.withId(NUMBER_MAPPING),
+                            PropertyScope.MEASUREMENT_PROPERTY)
+                    .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(),
+                            Labels.withId(TEXT_MAPPING), PropertyScope.NONE)
+                    .build())
+            .requiredSingleValueSelection(Labels.withId(NUMBER_OPERATION), Options.from("<", "<=", ">",
+                    ">=", "==", "!="))
+            .requiredFloatParameter(Labels.withId(NUMBER_VALUE), NUMBER_MAPPING)
+            .requiredSingleValueSelection(Labels.withId(TEXT_OPERATION), Options.from("MATCHES",
+                    "CONTAINS"))
+            .requiredTextParameter(Labels.withId(TEXT_KEYWORD), "text")
+            .outputStrategy(OutputStrategies.keep())
+            .build();
+
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+
+    // number
+    this.numberProperty = processorParams.extractor().mappingPropertyValue(NUMBER_MAPPING);
+    this.numberThreshold = processorParams.extractor().singleValueParameter(NUMBER_VALUE, Double.class);
+    String numberOperation = processorParams.extractor().selectedSingleValue(NUMBER_OPERATION, String.class);
+
+    // text
+    this.textProperty = processorParams.extractor().mappingPropertyValue(TEXT_MAPPING);
+    this.textKeyword = processorParams.extractor().singleValueParameter(TEXT_KEYWORD, String.class);
+    this.textOperator = StringOperator.valueOf(processorParams.extractor().selectedSingleValue(TEXT_OPERATION, String.class));
+
+    String numOperation = "GT";
+
+    if (numberOperation.equals("<=")) {
+      numOperation = "LE";
+    } else if (numberOperation.equals("<")) {
+      numOperation = "LT";
+    } else if (numberOperation.equals(">=")) {
+      numOperation = "GE";
+    } else if (numberOperation.equals("==")) {
+      numOperation = "EQ";
+    } else if (numberOperation.equals("!=")) {
+      numOperation = "IE";
+    }
+
+    this.numericalOperator = NumericalOperator.valueOf(numOperation);
+
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+    boolean satisfiesNumberFilter = false;
+    boolean satisfiesTextFilter = false;
+
+    Double numbervalue = event.getFieldBySelector(this.numberProperty)
+            .getAsPrimitive()
+            .getAsDouble();
+
+    String value = event.getFieldBySelector(this.textProperty)
+            .getAsPrimitive()
+            .getAsString();
+
+    Double threshold = this.numberThreshold;
+
+    if (this.numericalOperator == NumericalOperator.EQ) {
+      satisfiesNumberFilter = (Math.abs(numbervalue - threshold) < 0.000001);
+    } else if (this.numericalOperator  == NumericalOperator.GE) {
+      satisfiesNumberFilter = (numbervalue >= threshold);
+    } else if (this.numericalOperator  == NumericalOperator.GT) {
+      satisfiesNumberFilter = numbervalue > threshold;
+    } else if (this.numericalOperator  == NumericalOperator.LE) {
+      satisfiesNumberFilter = (numbervalue <= threshold);
+    } else if (this.numericalOperator  == NumericalOperator.LT) {
+      satisfiesNumberFilter = (numbervalue < threshold);
+    } else if (this.numericalOperator  == NumericalOperator.IE) {
+      satisfiesNumberFilter = (Math.abs(numbervalue - threshold) > 0.000001);
+    }
+
+    if (this.textOperator == StringOperator.MATCHES) {
+      satisfiesTextFilter = (value.equals(this.textKeyword));
+    } else if (this.textOperator == StringOperator.CONTAINS) {
+      satisfiesTextFilter = (value.contains(this.textKeyword));
+    }
+
+    if (satisfiesNumberFilter && satisfiesTextFilter) {
+      spOutputCollector.collect(event);
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
+}