Merge branch 'apache:dev' into iss449
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
index 24c545a..b9ad393 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
@@ -34,7 +34,7 @@
import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitController;
import org.apache.streampipes.processors.filters.jvm.processor.merge.MergeByTimeController;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.NumericalFilterProcessor;
-import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterController;
+import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.projection.ProjectionProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.schema.MergeBySchemaProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.textfilter.TextFilterProcessor;
@@ -62,7 +62,7 @@
new MergeByTimeController(),
new MergeBySchemaProcessor(),
new ComposeController(),
- new NumericalTextFilterController(),
+ new NumericalTextFilterProcessor(),
new RateLimitController())
.registerMessagingFormats(
new JsonDataFormatFactory(),
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilter.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilter.java
deleted file mode 100644
index 27c3950..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
-
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-public class NumericalTextFilter implements EventProcessor<NumericalTextFilterParameters> {
-
- private NumericalTextFilterParameters params;
-
- @Override
- public void onInvocation(NumericalTextFilterParameters numericalTextFilterParameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext
- runtimeContext) {
- this.params = numericalTextFilterParameters;
- }
-
- @Override
- public void onEvent(Event event, SpOutputCollector out) {
- boolean satisfiesNumberFilter = false;
- boolean satisfiesTextFilter = false;
-
- Double numbervalue = event.getFieldBySelector(params.getNumberProperty())
- .getAsPrimitive()
- .getAsDouble();
-
- String value = event.getFieldBySelector(params.getTextProperty())
- .getAsPrimitive()
- .getAsString();
-
- //Double value = Double.parseDouble(String.valueOf(in.get(params.getFilterProperty())));
- Double threshold = params.getNumberThreshold();
-
- if (params.getNumericalOperator() == NumericalOperator.EQ) {
- satisfiesNumberFilter = (Math.abs(numbervalue - threshold) < 0.000001);
- } else if (params.getNumericalOperator() == NumericalOperator.GE) {
- satisfiesNumberFilter = (numbervalue >= threshold);
- } else if (params.getNumericalOperator() == NumericalOperator.GT) {
- satisfiesNumberFilter = numbervalue > threshold;
- } else if (params.getNumericalOperator() == NumericalOperator.LE) {
- satisfiesNumberFilter = (numbervalue <= threshold);
- } else if (params.getNumericalOperator() == NumericalOperator.LT) {
- satisfiesNumberFilter = (numbervalue < threshold);
- } else if (params.getNumericalOperator() == NumericalOperator.IE) {
- satisfiesNumberFilter = (Math.abs(numbervalue - threshold) > 0.000001);
- }
-
- if (params.getTextOperator() == StringOperator.MATCHES) {
- satisfiesTextFilter = (value.equals(params.getTextKeyword()));
- } else if (params.getTextOperator() == StringOperator.CONTAINS) {
- satisfiesTextFilter = (value.contains(params.getTextKeyword()));
- }
-
- if (satisfiesNumberFilter && satisfiesTextFilter) {
- out.collect(event);
- }
- }
-
- @Override
- public void onDetach() {
-
- }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterController.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterController.java
deleted file mode 100644
index 32882d5..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterController.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
-
-import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class NumericalTextFilterController extends StandaloneEventProcessingDeclarer<NumericalTextFilterParameters> {
-
- // number
- private static final String NUMBER_MAPPING = "number-mapping";
- private static final String NUMBER_OPERATION = "number-operation";
- private static final String NUMBER_VALUE = "number-value";
- // text
- private static final String TEXT_MAPPING = "text-mapping";
- private static final String TEXT_OPERATION = "text-operation";
- private static final String TEXT_KEYWORD = "text-keyword";
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.numericaltextfilter")
- .category(DataProcessorType.FILTER)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
- Labels.withId(NUMBER_MAPPING),
- PropertyScope.MEASUREMENT_PROPERTY)
- .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(),
- Labels.withId(TEXT_MAPPING), PropertyScope.NONE)
- .build())
- .requiredSingleValueSelection(Labels.withId(NUMBER_OPERATION), Options.from("<", "<=", ">",
- ">=", "==", "!="))
- .requiredFloatParameter(Labels.withId(NUMBER_VALUE), NUMBER_MAPPING)
- .requiredSingleValueSelection(Labels.withId(TEXT_OPERATION), Options.from("MATCHES",
- "CONTAINS"))
- .requiredTextParameter(Labels.withId(TEXT_KEYWORD), "text")
- .outputStrategy(OutputStrategies.keep())
- .build();
-
- }
-
- @Override
- public ConfiguredEventProcessor<NumericalTextFilterParameters> onInvocation
- (DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
- // number
- String numberProperty = extractor.mappingPropertyValue(NUMBER_MAPPING);
- Double numberThreshold = extractor.singleValueParameter(NUMBER_VALUE, Double.class);
- String numberOperation = extractor.selectedSingleValue(NUMBER_OPERATION, String.class);
-
- // text
- String textProperty = extractor.mappingPropertyValue(TEXT_MAPPING);
- String textKeyword = extractor.singleValueParameter(TEXT_KEYWORD, String.class);
- String textOperation = extractor.selectedSingleValue(TEXT_OPERATION, String.class);
-
- String numOperation = "GT";
-
- if (numberOperation.equals("<=")) {
- numOperation = "LE";
- } else if (numberOperation.equals("<")) {
- numOperation = "LT";
- } else if (numberOperation.equals(">=")) {
- numOperation = "GE";
- } else if (numberOperation.equals("==")) {
- numOperation = "EQ";
- } else if (numberOperation.equals("!=")) {
- numOperation = "IE";
- }
-
-
- NumericalTextFilterParameters staticParam = new NumericalTextFilterParameters(
- graph,
- numberThreshold,
- NumericalOperator.valueOf(numOperation),
- numberProperty,
- textKeyword,
- StringOperator.valueOf(textOperation),
- textProperty);
-
- return new ConfiguredEventProcessor<>(staticParam, NumericalTextFilter::new);
- }
-
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterParameters.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterParameters.java
deleted file mode 100644
index c669430..0000000
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterParameters.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class NumericalTextFilterParameters extends EventProcessorBindingParams {
-
- private double numberThreshold;
- private NumericalOperator numericalOperator;
- private String numberProperty;
- private String textKeyword;
- private StringOperator textOperator;
- private String textProperty;
-
- public NumericalTextFilterParameters(DataProcessorInvocation graph, Double numberThreshold, NumericalOperator
- NumericalOperator, String numberProperty, String textKeyword, StringOperator textOperator, String textProperty) {
- super(graph);
- this.numberThreshold = numberThreshold;
- this.numericalOperator = NumericalOperator;
- this.numberProperty = numberProperty;
- this.textKeyword = textKeyword;
- this.textOperator = textOperator;
- this.textProperty = textProperty;
- }
-
- public double getNumberThreshold() {
- return numberThreshold;
- }
-
- public NumericalOperator getNumericalOperator() {
- return numericalOperator;
- }
-
- public String getNumberProperty() {
- return numberProperty;
- }
-
- public String getTextKeyword() {
- return textKeyword;
- }
-
- public StringOperator getTextOperator() {
- return textOperator;
- }
-
- public String getTextProperty() {
- return textProperty;
- }
-}
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessor.java
new file mode 100644
index 0000000..3fec57d
--- /dev/null
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericaltextfilter/NumericalTextFilterProcessor.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+
+/**
+ * Data processor that forwards an event only when both of its filters are satisfied:
+ * a numerical comparison of one mapped field against a threshold, and a keyword check
+ * (MATCHES or CONTAINS) on a second, textual field. All other events are dropped.
+ */
+public class NumericalTextFilterProcessor extends StreamPipesDataProcessor {
+
+  // ids of the static properties that configure the numerical filter
+  private static final String NUMBER_MAPPING = "number-mapping";
+  private static final String NUMBER_OPERATION = "number-operation";
+  private static final String NUMBER_VALUE = "number-value";
+  // ids of the static properties that configure the text filter
+  private static final String TEXT_MAPPING = "text-mapping";
+  private static final String TEXT_OPERATION = "text-operation";
+  private static final String TEXT_KEYWORD = "text-keyword";
+
+  // tolerance shared by the "==" and "!=" comparisons
+  private static final double EPSILON = 0.000001;
+
+  private double numberThreshold;
+  private NumericalOperator numericalOperator;
+  private String numberProperty;
+  private String textKeyword;
+  private StringOperator textOperator;
+  private String textProperty;
+
+  /** Declares the two required input properties, the filter parameters and a keep output strategy. */
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.numericaltextfilter")
+        .category(DataProcessorType.FILTER)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .withLocales(Locales.EN)
+        .requiredStream(StreamRequirementsBuilder
+            .create()
+            .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+                Labels.withId(NUMBER_MAPPING),
+                PropertyScope.MEASUREMENT_PROPERTY)
+            .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(),
+                Labels.withId(TEXT_MAPPING), PropertyScope.NONE)
+            .build())
+        .requiredSingleValueSelection(Labels.withId(NUMBER_OPERATION), Options.from("<", "<=", ">",
+            ">=", "==", "!="))
+        .requiredFloatParameter(Labels.withId(NUMBER_VALUE), NUMBER_MAPPING)
+        .requiredSingleValueSelection(Labels.withId(TEXT_OPERATION), Options.from("MATCHES",
+            "CONTAINS"))
+        .requiredTextParameter(Labels.withId(TEXT_KEYWORD), "text")
+        .outputStrategy(OutputStrategies.keep())
+        .build();
+  }
+
+  /** Reads the filter configuration from the invocation parameters and stores it in the fields above. */
+  @Override
+  public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+    // number
+    this.numberProperty = processorParams.extractor().mappingPropertyValue(NUMBER_MAPPING);
+    this.numberThreshold = processorParams.extractor().singleValueParameter(NUMBER_VALUE, Double.class);
+    String numberOperation = processorParams.extractor().selectedSingleValue(NUMBER_OPERATION, String.class);
+
+    // text
+    this.textProperty = processorParams.extractor().mappingPropertyValue(TEXT_MAPPING);
+    this.textKeyword = processorParams.extractor().singleValueParameter(TEXT_KEYWORD, String.class);
+    this.textOperator = StringOperator.valueOf(processorParams.extractor().selectedSingleValue(TEXT_OPERATION, String.class));
+
+    this.numericalOperator = toNumericalOperator(numberOperation);
+  }
+
+  /** Maps a comparison symbol to its operator; any symbol not listed (in particular ">") maps to GT. */
+  private static NumericalOperator toNumericalOperator(String symbol) {
+    switch (symbol) {
+      case "<=": return NumericalOperator.LE;
+      case "<": return NumericalOperator.LT;
+      case ">=": return NumericalOperator.GE;
+      case "==": return NumericalOperator.EQ;
+      case "!=": return NumericalOperator.IE;
+      default: return NumericalOperator.GT;
+    }
+  }
+
+  /** Emits the unchanged event to the collector if it passes both the number and the text filter. */
+  @Override
+  public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+    double numberValue = event.getFieldBySelector(this.numberProperty).getAsPrimitive().getAsDouble();
+    String textValue = event.getFieldBySelector(this.textProperty).getAsPrimitive().getAsString();
+
+    // Both filters are evaluated before combining them, so the text check always runs.
+    boolean numberMatches = matchesNumberFilter(numberValue);
+    boolean textMatches = matchesTextFilter(textValue);
+    if (numberMatches && textMatches) {
+      spOutputCollector.collect(event);
+    }
+  }
+
+  /** Compares the value with the threshold using the configured operator; EQ and IE apply EPSILON. */
+  private boolean matchesNumberFilter(double value) {
+    switch (this.numericalOperator) {
+      case EQ: return Math.abs(value - this.numberThreshold) < EPSILON;
+      case GE: return value >= this.numberThreshold;
+      case GT: return value > this.numberThreshold;
+      case LE: return value <= this.numberThreshold;
+      case LT: return value < this.numberThreshold;
+      case IE: return Math.abs(value - this.numberThreshold) > EPSILON;
+      default: return false;
+    }
+  }
+
+  /** MATCHES requires the value to equal the keyword; CONTAINS requires it to contain the keyword. */
+  private boolean matchesTextFilter(String value) {
+    switch (this.textOperator) {
+      case MATCHES: return value.equals(this.textKeyword);
+      case CONTAINS: return value.contains(this.textKeyword);
+      default: return false;
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    // Intentionally empty: the fields of this class are plain configuration values.
+  }
+}