Merge pull request #55 from Hrushi20/iss449
[Streampipes-449] Update Processing Element API in module streampipes
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
index 24c545a..df52d22 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
@@ -29,12 +29,12 @@
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.processors.filters.jvm.processor.booleanfilter.BooleanFilterProcessor;
-import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeController;
-import org.apache.streampipes.processors.filters.jvm.processor.enrich.MergeByEnrichController;
+import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeProcessor;
+import org.apache.streampipes.processors.filters.jvm.processor.enrich.MergeByEnrichProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitController;
-import org.apache.streampipes.processors.filters.jvm.processor.merge.MergeByTimeController;
+import org.apache.streampipes.processors.filters.jvm.processor.merge.MergeByTimeProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.NumericalFilterProcessor;
-import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterController;
+import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.projection.ProjectionProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.schema.MergeBySchemaProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.textfilter.TextFilterProcessor;
@@ -58,11 +58,11 @@
new NumericalFilterProcessor(),
new ThresholdDetectionProcessor(),
new ProjectionProcessor(),
- new MergeByEnrichController(),
- new MergeByTimeController(),
+ new MergeByEnrichProcessor(),
+ new MergeByTimeProcessor(),
new MergeBySchemaProcessor(),
- new ComposeController(),
- new NumericalTextFilterController(),
+ new ComposeProcessor(),
+ new NumericalTextFilterProcessor(),
new RateLimitController())
.registerMessagingFormats(
new JsonDataFormatFactory(),
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/Compose.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/Compose.java
deleted file mode 100644
index 5cb9620..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/Compose.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.compose;
-
-import org.apache.streampipes.model.constants.PropertySelectorConstants;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.EventFactory;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class Compose implements EventProcessor<ComposeParameters> {
-
- private Map<String, Event> lastEvents;
- private EventSchema outputSchema;
- private List<String> outputKeySelectors;
-
-
- @Override
- public void onInvocation(ComposeParameters composeParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
- this.outputSchema = composeParameters.getGraph().getOutputStream().getEventSchema();
- this.outputKeySelectors = composeParameters.getOutputKeySelectors();
- this.lastEvents = new HashMap<>();
- }
-
- @Override
- public void onDetach() {
- this.lastEvents.clear();
- }
-
- @Override
- public void onEvent(Event event, SpOutputCollector spOutputCollector) {
- this.lastEvents.put(event.getSourceInfo().getSelectorPrefix(), event);
- if (lastEvents.size() == 2) {
- spOutputCollector.collect(buildOutEvent(event.getSourceInfo().getSelectorPrefix()));
- }
- }
-
- private Event buildOutEvent(String currentSelectorPrefix) {
- return EventFactory.fromEvents(lastEvents.get(currentSelectorPrefix), lastEvents.get
- (getOtherSelectorPrefix(currentSelectorPrefix)), outputSchema).getSubset(outputKeySelectors);
- }
-
- private String getOtherSelectorPrefix(String currentSelectorPrefix) {
- return currentSelectorPrefix.equals(PropertySelectorConstants.FIRST_STREAM_ID_PREFIX) ?
- PropertySelectorConstants.SECOND_STREAM_ID_PREFIX : PropertySelectorConstants
- .FIRST_STREAM_ID_PREFIX;
- }
-
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeController.java
deleted file mode 100644
index aa22d9f..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeController.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.compose;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class ComposeController extends StandaloneEventProcessingDeclarer<ComposeParameters> {
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.compose")
- .category(DataProcessorType.TRANSFORM)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .outputStrategy(OutputStrategies.custom(true))
- .build();
- }
-
- @Override
- public ConfiguredEventProcessor<ComposeParameters> onInvocation(DataProcessorInvocation graph,
- ProcessingElementParameterExtractor extractor) {
-
- List<String> outputKeySelectors = extractor.outputKeySelectors();
-
- ComposeParameters staticParam = new ComposeParameters(
- graph, outputKeySelectors);
-
- return new ConfiguredEventProcessor<>(staticParam, Compose::new);
- }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeParameters.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeParameters.java
deleted file mode 100644
index 2c0b429..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeParameters.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.compose;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public class ComposeParameters extends EventProcessorBindingParams {
-
- private List<String> outputKeySelectors;
-
- public ComposeParameters(DataProcessorInvocation graph, List<String> outputKeySelectors) {
- super(graph);
- this.outputKeySelectors = outputKeySelectors;
- }
-
- public List<String> getOutputKeySelectors() {
- return outputKeySelectors;
- }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeProcessor.java
new file mode 100644
index 0000000..341f26c
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/compose/ComposeProcessor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.processors.filters.jvm.processor.compose;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.constants.PropertySelectorConstants;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Merges the most recent events of the two input streams into a single output event.
+ */
+public class ComposeProcessor extends StreamPipesDataProcessor {
+
+  private List<String> outputKeySelectors;
+  private Map<String, Event> lastEvents;
+  private EventSchema outputSchema;
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.compose")
+            .category(DataProcessorType.TRANSFORM)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build())
+            .requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build())
+            .outputStrategy(OutputStrategies.custom(true))
+            .build();
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+    this.outputKeySelectors = processorParams.extractor().outputKeySelectors();
+    this.outputSchema = processorParams.getGraph().getOutputStream().getEventSchema();
+    this.lastEvents = new HashMap<>();
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+    String prefix = event.getSourceInfo().getSelectorPrefix();
+    this.lastEvents.put(prefix, event);
+    // Nothing is emitted until events from two different selector prefixes have been buffered.
+    if (lastEvents.size() == 2) {
+      spOutputCollector.collect(buildOutEvent(prefix));
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    this.lastEvents.clear();
+  }
+
+  // Combines the buffered event of the current stream with the one buffered for the other stream.
+  private Event buildOutEvent(String currentSelectorPrefix) {
+    Event current = lastEvents.get(currentSelectorPrefix);
+    Event other = lastEvents.get(getOtherSelectorPrefix(currentSelectorPrefix));
+    return EventFactory.fromEvents(current, other, outputSchema).getSubset(outputKeySelectors);
+  }
+
+  private String getOtherSelectorPrefix(String currentSelectorPrefix) {
+    if (currentSelectorPrefix.equals(PropertySelectorConstants.FIRST_STREAM_ID_PREFIX)) {
+      return PropertySelectorConstants.SECOND_STREAM_ID_PREFIX;
+    }
+    return PropertySelectorConstants.FIRST_STREAM_ID_PREFIX;
+  }
+}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrich.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrich.java
deleted file mode 100644
index 5f24c3e..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrich.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.enrich;
-
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.EventFactory;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.List;
-
-public class MergeByEnrich implements EventProcessor<MergeByEnrichParameters> {
-
- private EventSchema outputSchema;
- private List<String> outputKeySelectors;
- private String selectedStream;
-
- private Event eventBuffer;
-
- @Override
- public void onInvocation(MergeByEnrichParameters composeParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
- this.outputSchema = composeParameters.getGraph().getOutputStream().getEventSchema();
- this.outputKeySelectors = composeParameters.getOutputKeySelectors();
-
- if (composeParameters.getSelectedStream().equals("Stream 1")) {
- this.selectedStream = "s0";
- } else {
- this.selectedStream = "s1";
- }
-
- this.eventBuffer = null;
- }
-
-
- @Override
- public void onEvent(Event event, SpOutputCollector spOutputCollector) {
- String streamId = event.getSourceInfo().getSelectorPrefix();
-
- // Enrich the selected stream and store last event of other stream
- if (this.selectedStream.equals(streamId)) {
- if (this.eventBuffer != null) {
- Event result = mergeEvents(event, this.eventBuffer);
- spOutputCollector.collect(result);
- }
- } else {
- this.eventBuffer = event;
- }
-
- }
-
- @Override
- public void onDetach() {
- this.eventBuffer = null;
- }
-
- private Event mergeEvents(Event e1, Event e2) {
- return EventFactory.fromEvents(e1, e2, outputSchema).getSubset(outputKeySelectors);
- }
-
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichController.java
deleted file mode 100644
index 62dee80..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichController.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.enrich;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.processors.filters.jvm.config.FiltersJvmConfig;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class MergeByEnrichController extends StandaloneEventProcessingDeclarer<MergeByEnrichParameters> {
-
- private static final String SELECT_STREAM = "select-stream";
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.enrich")
- .category(DataProcessorType.TRANSFORM)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .requiredSingleValueSelection(Labels.withId(SELECT_STREAM),
- Options.from("Stream 1", "Stream 2"))
- .outputStrategy(OutputStrategies.custom(true))
- .build();
- }
-
- @Override
- public ConfiguredEventProcessor<MergeByEnrichParameters>
- onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
- List<String> outputKeySelectors = extractor.outputKeySelectors();
-
- String selectedStream = extractor.selectedSingleValue(SELECT_STREAM, String.class);
-
- MergeByEnrichParameters staticParam = new MergeByEnrichParameters(
- graph, outputKeySelectors, selectedStream);
-
- return new ConfiguredEventProcessor<>(staticParam, MergeByEnrich::new);
- }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichParameters.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichParameters.java
deleted file mode 100644
index 2a5881f..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichParameters.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.enrich;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public class MergeByEnrichParameters extends EventProcessorBindingParams {
-
- private List<String> outputKeySelectors;
- private String selectedStream;
-
- public MergeByEnrichParameters(DataProcessorInvocation graph, List<String> outputKeySelectors, String selectedStream) {
- super(graph);
- this.outputKeySelectors = outputKeySelectors;
- this.selectedStream = selectedStream;
- }
-
- public List<String> getOutputKeySelectors() {
- return outputKeySelectors;
- }
-
- public String getSelectedStream() {
- return selectedStream;
- }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessor.java
new file mode 100644
index 0000000..d7a4e78
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/enrich/MergeByEnrichProcessor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.processors.filters.jvm.processor.enrich;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.List;
+
+/**
+ * Enriches each event of the selected input stream with the latest event of the other stream.
+ */
+public class MergeByEnrichProcessor extends StreamPipesDataProcessor {
+
+  private static final String SELECT_STREAM = "select-stream";
+
+  private List<String> outputKeySelectors;
+  // Selector prefix ("s0" or "s1") of the stream whose events are enriched and forwarded.
+  private String selectedStream;
+  private EventSchema outputSchema;
+  // Latest event seen on the non-selected stream; null until the first such event arrives.
+  private Event eventBuffer;
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.enrich")
+            .category(DataProcessorType.TRANSFORM)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .withLocales(Locales.EN)
+            .requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build())
+            .requiredStream(StreamRequirementsBuilder.create().requiredProperty(EpRequirements.anyProperty()).build())
+            .requiredSingleValueSelection(Labels.withId(SELECT_STREAM), Options.from("Stream 1", "Stream 2"))
+            .outputStrategy(OutputStrategies.custom(true))
+            .build();
+  }
+
+  /**
+   * Reads the output key selectors, the stream selection and the output schema from the
+   * invocation parameters, translates the selection into a selector prefix and clears
+   * the buffer.
+   */
+  @Override
+  public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+    this.outputKeySelectors = processorParams.extractor().outputKeySelectors();
+    String selectedLabel = processorParams.extractor().selectedSingleValue(SELECT_STREAM, String.class);
+    this.outputSchema = processorParams.getGraph().getOutputStream().getEventSchema();
+
+    // The selection offers the labels "Stream 1"/"Stream 2"; events are matched by selector prefix.
+    this.selectedStream = selectedLabel.equals("Stream 1") ? "s0" : "s1";
+
+    this.eventBuffer = null;
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+    String streamId = event.getSourceInfo().getSelectorPrefix();
+
+    // Enrich the selected stream and store last event of other stream
+    if (this.selectedStream.equals(streamId)) {
+      if (this.eventBuffer != null) {
+        Event result = mergeEvents(event, this.eventBuffer);
+        spOutputCollector.collect(result);
+      }
+    } else {
+      this.eventBuffer = event;
+    }
+  }
+
+  /** Releases the buffered event so that no stale event is retained after the processor is detached. */
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    this.eventBuffer = null;
+  }
+
+  // Merges both events and keeps only the properties selected for the output.
+  private Event mergeEvents(Event e1, Event e2) {
+    return EventFactory.fromEvents(e1, e2, outputSchema).getSubset(outputKeySelectors);
+  }
+}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTime.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTime.java
deleted file mode 100644
index c53cb1c..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTime.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.merge;
-
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.EventFactory;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.List;
-
-public class MergeByTime implements EventProcessor<MergeByTimeParameters> {
-
- private EventSchema outputSchema;
- private List<String> outputKeySelectors;
-
- private String timestampFieldStream0;
- private String timestampFieldStream1;
- private Integer timeInterval;
-
- private StreamBuffer streamBufferS0;
- private StreamBuffer streamBufferS1;
-
- @Override
- public void onInvocation(MergeByTimeParameters composeParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
- this.outputSchema = composeParameters.getGraph().getOutputStream().getEventSchema();
- this.outputKeySelectors = composeParameters.getOutputKeySelectors();
- this.timestampFieldStream0 = composeParameters.getTimestampFieldStream1();
- this.timestampFieldStream1 = composeParameters.getTimestampFieldStream2();
- this.timeInterval = composeParameters.getTimeInterval();
-
- this.streamBufferS0 = new StreamBuffer(this.timestampFieldStream0);
- this.streamBufferS1 = new StreamBuffer(this.timestampFieldStream1);
- }
-
-
- @Override
- public void onEvent(Event event, SpOutputCollector spOutputCollector) {
- String streamId = event.getSourceInfo().getSelectorPrefix();
-
- // Decide to which buffer the event should be added
- if ("s0".equals(streamId)) {
- this.streamBufferS0.add(event);
- } else {
- this.streamBufferS1.add(event);
- }
-
- // Calculate matching events between data streams
- for (Event e0 : this.streamBufferS0.getList()) {
- long time0 = e0.getFieldBySelector(timestampFieldStream0).getAsPrimitive().getAsLong();
- for (Event e1 : this.streamBufferS1.getList()) {
- long time1 = e1.getFieldBySelector(timestampFieldStream1).getAsPrimitive().getAsLong();
-
- if (time0 + timeInterval > time1 && time1 > time0 - timeInterval) {
- Event resultingEvent = mergeEvents(e0, e1);
- spOutputCollector.collect(resultingEvent);
- this.streamBufferS0.removeOldEvents(time0);
- this.streamBufferS1.removeOldEvents(time1);
- }
- }
- }
-
- // Clean up buffer if events do not match to avoid buffer overflow
- if (this.streamBufferS0.getLength() > 0 && this.streamBufferS1.getLength() > 0) {
- Event e0 = this.streamBufferS0.get(0);
- Event e1 = this.streamBufferS1.get(0);
-
- long time0 = e0.getFieldBySelector(this.timestampFieldStream0).getAsPrimitive().getAsLong();
- long time1 = e1.getFieldBySelector(this.timestampFieldStream1).getAsPrimitive().getAsLong();
-
- if (time0 > time1) {
- this.streamBufferS0.removeOldEvents(time0);
- this.streamBufferS1.removeOldEvents(time0);
- } else {
- this.streamBufferS0.removeOldEvents(time1);
- this.streamBufferS1.removeOldEvents(time1);
- }
- }
-
- }
-
- @Override
- public void onDetach() {
- this.streamBufferS0.reset();
- this.streamBufferS1.reset();
- }
-
- private Event mergeEvents(Event e1, Event e2) {
- return EventFactory.fromEvents(e1, e2, outputSchema).getSubset(outputKeySelectors);
- }
-
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeController.java
deleted file mode 100644
index e6446f6..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeController.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.merge;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.processors.filters.jvm.config.FiltersJvmConfig;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class MergeByTimeController extends StandaloneEventProcessingDeclarer<MergeByTimeParameters> {
-
- private static final String TIMESTAMP_MAPPING_STREAM_1_KEY = "timestamp_mapping_stream_1";
- private static final String TIMESTAMP_MAPPING_STREAM_2_KEY = "timestamp_mapping_stream_2";
- private static final String NUMBER_MAPPING = "number_mapping";
- private static final String TIME_INTERVAL = "time-interval";
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.merge")
- .category(DataProcessorType.TRANSFORM)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON, "merge_description.png")
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
- EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_MAPPING_STREAM_1_KEY),
- PropertyScope.NONE).build())
- .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
- EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_MAPPING_STREAM_2_KEY),
- PropertyScope.NONE).build())
- .requiredIntegerParameter(Labels.withId(TIME_INTERVAL), NUMBER_MAPPING)
- .outputStrategy(OutputStrategies.custom(true))
- .build();
- }
-
- @Override
- public ConfiguredEventProcessor<MergeByTimeParameters>
- onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
- List<String> outputKeySelectors = extractor.outputKeySelectors();
-
- String timestampFieldStream1 = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_STREAM_1_KEY);
- String timestampFieldStream2 = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_STREAM_2_KEY);
- Integer timeInterval = extractor.singleValueParameter(TIME_INTERVAL, Integer.class);
-
- MergeByTimeParameters staticParam = new MergeByTimeParameters(
- graph, outputKeySelectors, timestampFieldStream1, timestampFieldStream2, timeInterval);
-
- return new ConfiguredEventProcessor<>(staticParam, MergeByTime::new);
- }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeParameters.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeParameters.java
deleted file mode 100644
index f39405c..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeParameters.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.merge;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public class MergeByTimeParameters extends EventProcessorBindingParams {
-
- private List<String> outputKeySelectors;
- private String timestampFieldStream1;
- private String timestampFieldStream2;
- private Integer timeInterval;
-
- public MergeByTimeParameters(DataProcessorInvocation graph, List<String> outputKeySelectors,
- String timestampFieldStream1, String timestampFieldStream2, Integer timeInterval) {
- super(graph);
- this.outputKeySelectors = outputKeySelectors;
- this.timestampFieldStream1 = timestampFieldStream1;
- this.timestampFieldStream2 = timestampFieldStream2;
- this.timeInterval = timeInterval;
- }
-
- public List<String> getOutputKeySelectors() {
- return outputKeySelectors;
- }
-
- public String getTimestampFieldStream1() {
- return timestampFieldStream1;
- }
-
- public String getTimestampFieldStream2() {
- return timestampFieldStream2;
- }
-
- public Integer getTimeInterval() {
- return timeInterval;
- }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java
new file mode 100644
index 0000000..8d41aa7
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/merge/MergeByTimeProcessor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.processors.filters.jvm.processor.merge;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.List;
+
+/** Merges events of two input streams whose timestamps are less than the time interval apart. */
+public class MergeByTimeProcessor extends StreamPipesDataProcessor {
+
+  private static final String TIMESTAMP_MAPPING_STREAM_1_KEY = "timestamp_mapping_stream_1";
+  private static final String TIMESTAMP_MAPPING_STREAM_2_KEY = "timestamp_mapping_stream_2";
+  private static final String NUMBER_MAPPING = "number_mapping";
+  private static final String TIME_INTERVAL = "time-interval";
+
+  private List<String> outputKeySelectors;
+  private String timestampFieldStream0;
+  private String timestampFieldStream1;
+  private Integer timeInterval;
+  private EventSchema outputSchema;
+
+  private StreamBuffer streamBufferS0;
+  private StreamBuffer streamBufferS1;
+
+  /** Declares two input streams with one timestamp mapping each, plus the time interval parameter. */
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.merge")
+        .category(DataProcessorType.TRANSFORM)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON, "merge_description.png")
+        .withLocales(Locales.EN)
+        .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+            EpRequirements.timestampReq(),
+            Labels.withId(TIMESTAMP_MAPPING_STREAM_1_KEY),
+            PropertyScope.NONE).build())
+        .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+            EpRequirements.timestampReq(),
+            Labels.withId(TIMESTAMP_MAPPING_STREAM_2_KEY),
+            PropertyScope.NONE).build())
+        .requiredIntegerParameter(Labels.withId(TIME_INTERVAL), NUMBER_MAPPING)
+        .outputStrategy(OutputStrategies.custom(true))
+        .build();
+  }
+
+  /** Reads the mapped timestamp fields, the interval and the output definition; creates fresh buffers. */
+  @Override
+  public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+    this.outputSchema = processorParams.getGraph().getOutputStream().getEventSchema();
+    this.outputKeySelectors = processorParams.extractor().outputKeySelectors();
+    this.timestampFieldStream0 = processorParams.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_STREAM_1_KEY);
+    this.timestampFieldStream1 = processorParams.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_STREAM_2_KEY);
+    this.timeInterval = processorParams.extractor().singleValueParameter(TIME_INTERVAL, Integer.class);
+    this.streamBufferS0 = new StreamBuffer(this.timestampFieldStream0);
+    this.streamBufferS1 = new StreamBuffer(this.timestampFieldStream1);
+  }
+
+  /**
+   * Adds the event to the buffer of its stream, emits a merged event for every buffered pair
+   * whose timestamps are less than the time interval apart, then trims both buffers.
+   */
+  @Override
+  public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+    // Decide to which buffer the event should be added
+    if ("s0".equals(event.getSourceInfo().getSelectorPrefix())) {
+      this.streamBufferS0.add(event);
+    } else {
+      this.streamBufferS1.add(event);
+    }
+
+    // Calculate matching events between data streams
+    // NOTE(review): removeOldEvents runs while getList() is iterated - confirm StreamBuffer tolerates this.
+    for (Event e0 : this.streamBufferS0.getList()) {
+      long time0 = timestampOf(e0, timestampFieldStream0);
+      for (Event e1 : this.streamBufferS1.getList()) {
+        long time1 = timestampOf(e1, timestampFieldStream1);
+        if (time0 + timeInterval > time1 && time1 > time0 - timeInterval) {
+          spOutputCollector.collect(mergeEvents(e0, e1));
+          this.streamBufferS0.removeOldEvents(time0);
+          this.streamBufferS1.removeOldEvents(time1);
+        }
+      }
+    }
+
+    // Clean up buffer if events do not match to avoid buffer overflow
+    if (this.streamBufferS0.getLength() > 0 && this.streamBufferS1.getLength() > 0) {
+      long time0 = timestampOf(this.streamBufferS0.get(0), this.timestampFieldStream0);
+      long time1 = timestampOf(this.streamBufferS1.get(0), this.timestampFieldStream1);
+      long newerHead = Math.max(time0, time1); // both buffers are trimmed with the larger first timestamp
+      this.streamBufferS0.removeOldEvents(newerHead);
+      this.streamBufferS1.removeOldEvents(newerHead);
+    }
+  }
+
+  /** Resets both stream buffers when the processor is detached. */
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    this.streamBufferS0.reset();
+    this.streamBufferS1.reset();
+  }
+
+  /** Reads the value stored in the event under the given field selector as a long timestamp. */
+  private static long timestampOf(Event event, String selector) {
+    return event.getFieldBySelector(selector).getAsPrimitive().getAsLong();
+  }
+
+  /** Builds one event from both inputs under the output schema and takes the outputKeySelectors subset. */
+  private Event mergeEvents(Event e1, Event e2) {
+    return EventFactory.fromEvents(e1, e2, outputSchema).getSubset(outputKeySelectors);
+  }
+}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilter.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilter.java
deleted file mode 100644
index 27c3950..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
-
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-public class NumericalTextFilter implements EventProcessor<NumericalTextFilterParameters> {
-
- private NumericalTextFilterParameters params;
-
- @Override
- public void onInvocation(NumericalTextFilterParameters numericalTextFilterParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext
- runtimeContext) {
- this.params = numericalTextFilterParameters;
- }
-
- @Override
- public void onEvent(Event event, SpOutputCollector out) {
- boolean satisfiesNumberFilter = false;
- boolean satisfiesTextFilter = false;
-
- Double numbervalue = event.getFieldBySelector(params.getNumberProperty())
- .getAsPrimitive()
- .getAsDouble();
-
- String value = event.getFieldBySelector(params.getTextProperty())
- .getAsPrimitive()
- .getAsString();
-
- //Double value = Double.parseDouble(String.valueOf(in.get(params.getFilterProperty())));
- Double threshold = params.getNumberThreshold();
-
- if (params.getNumericalOperator() == NumericalOperator.EQ) {
- satisfiesNumberFilter = (Math.abs(numbervalue - threshold) < 0.000001);
- } else if (params.getNumericalOperator() == NumericalOperator.GE) {
- satisfiesNumberFilter = (numbervalue >= threshold);
- } else if (params.getNumericalOperator() == NumericalOperator.GT) {
- satisfiesNumberFilter = numbervalue > threshold;
- } else if (params.getNumericalOperator() == NumericalOperator.LE) {
- satisfiesNumberFilter = (numbervalue <= threshold);
- } else if (params.getNumericalOperator() == NumericalOperator.LT) {
- satisfiesNumberFilter = (numbervalue < threshold);
- } else if (params.getNumericalOperator() == NumericalOperator.IE) {
- satisfiesNumberFilter = (Math.abs(numbervalue - threshold) > 0.000001);
- }
-
- if (params.getTextOperator() == StringOperator.MATCHES) {
- satisfiesTextFilter = (value.equals(params.getTextKeyword()));
- } else if (params.getTextOperator() == StringOperator.CONTAINS) {
- satisfiesTextFilter = (value.contains(params.getTextKeyword()));
- }
-
- if (satisfiesNumberFilter && satisfiesTextFilter) {
- out.collect(event);
- }
- }
-
- @Override
- public void onDetach() {
-
- }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterController.java
deleted file mode 100644
index 32882d5..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterController.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
-
-import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class NumericalTextFilterController extends StandaloneEventProcessingDeclarer<NumericalTextFilterParameters> {
-
- // number
- private static final String NUMBER_MAPPING = "number-mapping";
- private static final String NUMBER_OPERATION = "number-operation";
- private static final String NUMBER_VALUE = "number-value";
- // text
- private static final String TEXT_MAPPING = "text-mapping";
- private static final String TEXT_OPERATION = "text-operation";
- private static final String TEXT_KEYWORD = "text-keyword";
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.numericaltextfilter")
- .category(DataProcessorType.FILTER)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
- Labels.withId(NUMBER_MAPPING),
- PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(),
- Labels.withId(TEXT_MAPPING), PropertyScope.NONE)
- .build())
- .requiredSingleValueSelection(Labels.withId(NUMBER_OPERATION), Options.from("<", "<=", ">",
- ">=", "==", "!="))
- .requiredFloatParameter(Labels.withId(NUMBER_VALUE), NUMBER_MAPPING)
- .requiredSingleValueSelection(Labels.withId(TEXT_OPERATION), Options.from("MATCHES",
- "CONTAINS"))
- .requiredTextParameter(Labels.withId(TEXT_KEYWORD), "text")
- .outputStrategy(OutputStrategies.keep())
- .build();
-
- }
-
- @Override
- public ConfiguredEventProcessor<NumericalTextFilterParameters> onInvocation
- (DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
- // number
- String numberProperty = extractor.mappingPropertyValue(NUMBER_MAPPING);
- Double numberThreshold = extractor.singleValueParameter(NUMBER_VALUE, Double.class);
- String numberOperation = extractor.selectedSingleValue(NUMBER_OPERATION, String.class);
-
- // text
- String textProperty = extractor.mappingPropertyValue(TEXT_MAPPING);
- String textKeyword = extractor.singleValueParameter(TEXT_KEYWORD, String.class);
- String textOperation = extractor.selectedSingleValue(TEXT_OPERATION, String.class);
-
- String numOperation = "GT";
-
- if (numberOperation.equals("<=")) {
- numOperation = "LE";
- } else if (numberOperation.equals("<")) {
- numOperation = "LT";
- } else if (numberOperation.equals(">=")) {
- numOperation = "GE";
- } else if (numberOperation.equals("==")) {
- numOperation = "EQ";
- } else if (numberOperation.equals("!=")) {
- numOperation = "IE";
- }
-
-
- NumericalTextFilterParameters staticParam = new NumericalTextFilterParameters(
- graph,
- numberThreshold,
- NumericalOperator.valueOf(numOperation),
- numberProperty,
- textKeyword,
- StringOperator.valueOf(textOperation),
- textProperty);
-
- return new ConfiguredEventProcessor<>(staticParam, NumericalTextFilter::new);
- }
-
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterParameters.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterParameters.java
deleted file mode 100644
index c669430..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterParameters.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class NumericalTextFilterParameters extends EventProcessorBindingParams {
-
- private double numberThreshold;
- private NumericalOperator numericalOperator;
- private String numberProperty;
- private String textKeyword;
- private StringOperator textOperator;
- private String textProperty;
-
- public NumericalTextFilterParameters(DataProcessorInvocation graph, Double numberThreshold, NumericalOperator
- NumericalOperator, String numberProperty, String textKeyword, StringOperator textOperator, String textProperty) {
- super(graph);
- this.numberThreshold = numberThreshold;
- this.numericalOperator = NumericalOperator;
- this.numberProperty = numberProperty;
- this.textKeyword = textKeyword;
- this.textOperator = textOperator;
- this.textProperty = textProperty;
- }
-
- public double getNumberThreshold() {
- return numberThreshold;
- }
-
- public NumericalOperator getNumericalOperator() {
- return numericalOperator;
- }
-
- public String getNumberProperty() {
- return numberProperty;
- }
-
- public String getTextKeyword() {
- return textKeyword;
- }
-
- public StringOperator getTextOperator() {
- return textOperator;
- }
-
- public String getTextProperty() {
- return textProperty;
- }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessor.java
new file mode 100644
index 0000000..3fec57d
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessor.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+
+public class NumericalTextFilterProcessor extends StreamPipesDataProcessor {
+
+ // number
+ private static final String NUMBER_MAPPING = "number-mapping";
+ private static final String NUMBER_OPERATION = "number-operation";
+ private static final String NUMBER_VALUE = "number-value";
+ // text
+ private static final String TEXT_MAPPING = "text-mapping";
+ private static final String TEXT_OPERATION = "text-operation";
+ private static final String TEXT_KEYWORD = "text-keyword";
+
+ private double numberThreshold;
+ private NumericalOperator numericalOperator;
+ private String numberProperty;
+ private String textKeyword;
+ private StringOperator textOperator;
+ private String textProperty;
+
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.numericaltextfilter")
+ .category(DataProcessorType.FILTER)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+ Labels.withId(NUMBER_MAPPING),
+ PropertyScope.MEASUREMENT_PROPERTY)
+ .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(),
+ Labels.withId(TEXT_MAPPING), PropertyScope.NONE)
+ .build())
+ .requiredSingleValueSelection(Labels.withId(NUMBER_OPERATION), Options.from("<", "<=", ">",
+ ">=", "==", "!="))
+ .requiredFloatParameter(Labels.withId(NUMBER_VALUE), NUMBER_MAPPING)
+ .requiredSingleValueSelection(Labels.withId(TEXT_OPERATION), Options.from("MATCHES",
+ "CONTAINS"))
+ .requiredTextParameter(Labels.withId(TEXT_KEYWORD), "text")
+ .outputStrategy(OutputStrategies.keep())
+ .build();
+
+ }
+
+ @Override
+ public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+
+ // number
+ this.numberProperty = processorParams.extractor().mappingPropertyValue(NUMBER_MAPPING);
+ this.numberThreshold = processorParams.extractor().singleValueParameter(NUMBER_VALUE, Double.class);
+ String numberOperation = processorParams.extractor().selectedSingleValue(NUMBER_OPERATION, String.class);
+
+ // text
+ this.textProperty = processorParams.extractor().mappingPropertyValue(TEXT_MAPPING);
+ this.textKeyword = processorParams.extractor().singleValueParameter(TEXT_KEYWORD, String.class);
+ this.textOperator = StringOperator.valueOf(processorParams.extractor().selectedSingleValue(TEXT_OPERATION, String.class));
+
+ String numOperation = "GT";
+
+ if (numberOperation.equals("<=")) {
+ numOperation = "LE";
+ } else if (numberOperation.equals("<")) {
+ numOperation = "LT";
+ } else if (numberOperation.equals(">=")) {
+ numOperation = "GE";
+ } else if (numberOperation.equals("==")) {
+ numOperation = "EQ";
+ } else if (numberOperation.equals("!=")) {
+ numOperation = "IE";
+ }
+
+ this.numericalOperator = NumericalOperator.valueOf(numOperation);
+
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+ boolean satisfiesNumberFilter = false;
+ boolean satisfiesTextFilter = false;
+
+ Double numbervalue = event.getFieldBySelector(this.numberProperty)
+ .getAsPrimitive()
+ .getAsDouble();
+
+ String value = event.getFieldBySelector(this.textProperty)
+ .getAsPrimitive()
+ .getAsString();
+
+ Double threshold = this.numberThreshold;
+
+ if (this.numericalOperator == NumericalOperator.EQ) {
+ satisfiesNumberFilter = (Math.abs(numbervalue - threshold) < 0.000001);
+ } else if (this.numericalOperator == NumericalOperator.GE) {
+ satisfiesNumberFilter = (numbervalue >= threshold);
+ } else if (this.numericalOperator == NumericalOperator.GT) {
+ satisfiesNumberFilter = numbervalue > threshold;
+ } else if (this.numericalOperator == NumericalOperator.LE) {
+ satisfiesNumberFilter = (numbervalue <= threshold);
+ } else if (this.numericalOperator == NumericalOperator.LT) {
+ satisfiesNumberFilter = (numbervalue < threshold);
+ } else if (this.numericalOperator == NumericalOperator.IE) {
+ satisfiesNumberFilter = (Math.abs(numbervalue - threshold) > 0.000001);
+ }
+
+ if (this.textOperator == StringOperator.MATCHES) {
+ satisfiesTextFilter = (value.equals(this.textKeyword));
+ } else if (this.textOperator == StringOperator.CONTAINS) {
+ satisfiesTextFilter = (value.contains(this.textKeyword));
+ }
+
+ if (satisfiesNumberFilter && satisfiesTextFilter) {
+ spOutputCollector.collect(event);
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+
+ }
+}