Update examples
diff --git a/pom.xml b/pom.xml
index bef7842..78a9dc1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,18 +25,18 @@
<parent>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-parent</artifactId>
- <version>0.91.0-SNAPSHOT</version>
+ <version>0.93.0-SNAPSHOT</version>
</parent>
<artifactId>streampipes-pipeline-elements-examples</artifactId>
<packaging>pom</packaging>
- <version>0.71.0-SNAPSHOT</version>
+ <version>0.93.0-SNAPSHOT</version>
<modules>
<module>streampipes-pipeline-elements-examples-processors-jvm</module>
</modules>
<properties>
- <streampipes.version>0.91.0-SNAPSHOT</streampipes.version>
+ <streampipes.version>0.93.0-SNAPSHOT</streampipes.version>
<lightcouch.version>0.1.8</lightcouch.version>
<maven-shade-plugin.version>3.0.0</maven-shade-plugin.version>
@@ -61,11 +61,6 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-container-standalone</artifactId>
- <version>${streampipes.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-wrapper-standalone</artifactId>
<version>${streampipes.version}</version>
</dependency>
@@ -81,22 +76,7 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-config</artifactId>
- <version>${streampipes.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-dataformat-json</artifactId>
- <version>${streampipes.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-messaging-kafka</artifactId>
- <version>${streampipes.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-messaging-jms</artifactId>
+ <artifactId>streampipes-sdk-bundle</artifactId>
<version>${streampipes.version}</version>
</dependency>
<dependency>
@@ -106,11 +86,6 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-measurement-units</artifactId>
- <version>${streampipes.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-pipeline-elements-shared</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/pom.xml b/streampipes-pipeline-elements-examples-processors-jvm/pom.xml
index 0041fd6..b2a69f0 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/pom.xml
+++ b/streampipes-pipeline-elements-examples-processors-jvm/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>streampipes-pipeline-elements-examples</artifactId>
<groupId>org.apache.streampipes</groupId>
- <version>0.71.0-SNAPSHOT</version>
+ <version>0.93.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -33,7 +33,6 @@
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-sdk-bundle</artifactId>
- <version>0.91.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/Example.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/Example.java
index 46e1bcc..acd1a22 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/Example.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/Example.java
@@ -19,7 +19,7 @@
import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.client.StreamPipesCredentials;
-import org.apache.streampipes.client.credentials.CredentialsProvider;
+import org.apache.streampipes.client.api.credentials.CredentialsProvider;
import org.apache.streampipes.model.pipeline.Pipeline;
import java.util.List;
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/StreamPipesClientExample.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/StreamPipesClientExample.java
index 6fad4d8..fd0c7c5 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/StreamPipesClientExample.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/StreamPipesClientExample.java
@@ -17,16 +17,10 @@
*/
package org.apache.streampipes.client.example;
-import org.apache.commons.collections.MapUtils;
import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.client.StreamPipesCredentials;
-import org.apache.streampipes.client.credentials.CredentialsProvider;
-import org.apache.streampipes.client.live.KafkaConfig;
+import org.apache.streampipes.client.api.credentials.CredentialsProvider;
import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.template.PipelineElementTemplate;
import java.util.List;
@@ -39,33 +33,34 @@
// Create credentials by providing a user (the email) and the API key
CredentialsProvider credentials = StreamPipesCredentials
- .withApiKey(System.getenv("user"), System.getenv("apiKey"));
+ .withApiKey("admin@streampipes.apache.org", "oKv1uyNjAgXfuPcMBX5Sityy");
// Create an instance of the StreamPipes client
StreamPipesClient client = StreamPipesClient
- .create("localhost", 80, credentials, true);
+ .create("localhost", 8082, credentials, true);
// Get all pipelines
- List<Pipeline> pipelines = client.pipelines().all();
-
- // Start a pipeline
- PipelineOperationStatus message = client.pipelines().start(pipelines.get(0));
-
- // Get all pipeline element templates
- List<PipelineElementTemplate> templates = client.pipelineElementTemplates().all();
-
- // Get all data sinks
- List<DataSinkInvocation> dataSinks = client.sinks().all();
+// List<Pipeline> pipelines = client.pipelines().all();
+//
+// // Start a pipeline
+// PipelineOperationStatus message = client.pipelines().start(pipelines.get(0));
+//
+// // Get all pipeline element templates
+// List<PipelineElementTemplate> templates = client.pipelineElementTemplates().all();
+//
+// // Get all data sinks
+// List<DataSinkInvocation> dataSinks = client.sinks().all();
// Get all data streams
List<SpDataStream> dataStreams = client.streams().all();
+ System.out.println(dataStreams.size());
// Subscribe to a data stream
- client.streams().subscribe(dataStreams.get(0), event -> MapUtils.debugPrint(System.out, "event", event.getRaw()));
-
- // Subscribe to a data stream and provide an additional Kafka config (e.g., for access from outside the StreamPipes network)
- client.streams().subscribe(dataStreams.get(0), KafkaConfig.create("localhost", 9094), event -> {
- MapUtils.debugPrint(System.out, "event", event.getRaw());
- });
+// client.streams().subscribe(dataStreams.get(0), event -> MapUtils.debugPrint(System.out, "event", event.getRaw()));
+//
+// // Subscribe to a data stream and provide an additional Kafka config (e.g., for access from outside the StreamPipes network)
+// client.streams().subscribe(dataStreams.get(0), KafkaConfig.create("localhost", 9094), event -> {
+// MapUtils.debugPrint(System.out, "event", event.getRaw());
+// });
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/function/example/FunctionPublishExample.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/function/example/FunctionPublishExample.java
index 1407154..bffbad2 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/function/example/FunctionPublishExample.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/function/example/FunctionPublishExample.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.function.example;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.function.FunctionId;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
@@ -27,7 +28,6 @@
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Protocols;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.standalone.function.FunctionConfig;
import org.apache.streampipes.wrapper.standalone.function.FunctionConfigBuilder;
import org.apache.streampipes.wrapper.standalone.function.FunctionContext;
@@ -49,7 +49,7 @@
@Override
public List<String> requiredStreamIds() {
- return List.of("urn:streampipes.apache.org:eventstream:EtMUkN");
+ return List.of("urn:streampipes.apache.org:eventstream:plSEjN");
}
@Override
@@ -85,7 +85,7 @@
SO.TEXT,
PropertyScope.MEASUREMENT_PROPERTY))
.format(Formats.jsonFormat())
- .protocol(Protocols.kafka("localhost", 9094, STREAM_APP_ID))
+ .protocol(Protocols.kafka("nats", 9094, STREAM_APP_ID))
.build())
.build();
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/ExamplesInit.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/ExamplesInit.java
index 4561773..071e96a 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/ExamplesInit.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/ExamplesInit.java
@@ -23,7 +23,6 @@
import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
-import org.apache.streampipes.pe.examples.jvm.engine.ExampleExternalEngineController;
import org.apache.streampipes.pe.examples.jvm.outputstrategy.AppendOutputController;
import org.apache.streampipes.pe.examples.jvm.outputstrategy.CustomOutputController;
import org.apache.streampipes.pe.examples.jvm.outputstrategy.CustomTransformOutputController;
@@ -96,8 +95,6 @@
.registerPipelineElement(new KeepOutputController())
.registerPipelineElement(new CollectionMappingGroupExample())
- .registerPipelineElement(new ExampleExternalEngineController())
-
.registerPipelineElement(new VehicleStream())
.build();
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/base/DummyController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/base/DummyController.java
deleted file mode 100644
index 9dda049..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/base/DummyController.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.pe.examples.jvm.base;
-
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-public class DummyController<B extends EventProcessorBindingParams> extends
- StandaloneEventProcessingDeclarer<B> {
-
- @Override
- public DataProcessorDescription declareModel() {
- return null;
- }
-
- @Override
- public ConfiguredEventProcessor<B> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
- return null;
- }
-
-
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/base/DummyEngine.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/base/DummyEngine.java
deleted file mode 100644
index cfe579f..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/base/DummyEngine.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.pe.examples.jvm.base;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-public class DummyEngine implements EventProcessor<DummyParameters> {
-
- @Override
- public void onInvocation(DummyParameters parameters, SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
- }
-
- @Override
- public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
-
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
-
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/base/DummyParameters.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/base/DummyParameters.java
deleted file mode 100644
index cbba25e..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/base/DummyParameters.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.pe.examples.jvm.base;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class DummyParameters extends EventProcessorBindingParams {
-
- public DummyParameters(DataProcessorInvocation graph) {
- super(graph);
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/engine/ExampleExternalEngine.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/engine/ExampleExternalEngine.java
deleted file mode 100644
index 09fc94f..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/engine/ExampleExternalEngine.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.pe.examples.jvm.engine;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
-
-public class ExampleExternalEngine
- implements ExternalEventProcessor<ExampleExternalEngineParameters> {
-
- private static final Logger LOG = LoggerFactory.getLogger(ExampleExternalEngine.class);
-
- @Override
- public void onInvocation(ExampleExternalEngineParameters parameters,
- EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
- LOG.info("I'm invoked!");
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- LOG.info("I'm detached!");
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/engine/ExampleExternalEngineController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/engine/ExampleExternalEngineController.java
deleted file mode 100644
index 8acbb8c..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/engine/ExampleExternalEngineController.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.pe.examples.jvm.engine;
-
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
-
-public class ExampleExternalEngineController
- extends StandaloneExternalEventProcessingDeclarer<ExampleExternalEngineParameters> {
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.engine.external", "Example " +
- "External Engine", "")
- .requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
- .outputStrategy(OutputStrategies.keep())
- .build();
- }
-
- @Override
- public ConfiguredExternalEventProcessor<ExampleExternalEngineParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
- return new ConfiguredExternalEventProcessor<>(new ExampleExternalEngineParameters(graph),
- ExampleExternalEngine::new);
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/engine/ExampleExternalEngineParameters.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/engine/ExampleExternalEngineParameters.java
deleted file mode 100644
index 6f15a65..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/engine/ExampleExternalEngineParameters.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.pe.examples.jvm.engine;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class ExampleExternalEngineParameters extends EventProcessorBindingParams {
-
- public ExampleExternalEngineParameters(DataProcessorInvocation graph) {
- super(graph);
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/eventmodel/EventModelExamples.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/eventmodel/EventModelExamples.java
index 6e50c02..0b46d40 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/eventmodel/EventModelExamples.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/eventmodel/EventModelExamples.java
@@ -18,15 +18,16 @@
package org.apache.streampipes.pe.examples.jvm.eventmodel;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
import java.util.List;
-public class EventModelExamples implements EventProcessor<DummyParameters> {
+public class EventModelExamples implements IStreamPipesDataProcessor {
/**
* Example event:
@@ -41,9 +42,13 @@
private String lastValueSelector;
@Override
- public void onInvocation(DummyParameters parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+ public IDataProcessorConfiguration declareConfig() {
+ return null;
+ }
- // usually, the fields such as temperatureSelector would be retrieved from the parameter class
+ @Override
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+
}
@Override
@@ -66,7 +71,9 @@
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
}
+
+
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/AppendOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/AppendOutputController.java
index 8b5f5bb..9209baf 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/AppendOutputController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/AppendOutputController.java
@@ -17,13 +17,15 @@
*/
package org.apache.streampipes.pe.examples.jvm.outputstrategy;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -31,32 +33,42 @@
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class AppendOutputController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class AppendOutputController implements IStreamPipesDataProcessor {
+
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
- ".append", "Append output example", "")
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ AppendOutputController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
+ ".append", "Append output example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
.outputStrategy(OutputStrategies.append(EpProperties.integerEp(Labels.from("avg",
- "The average value", ""), "avg", SO.NUMBER)))
-
- .build();
+ "The average value", ""), "avg", SO.NUMBER)))
+ .build());
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
-
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
}
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+
+
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/CustomOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/CustomOutputController.java
index fb0eff9..3b4f061 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/CustomOutputController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/CustomOutputController.java
@@ -17,45 +17,53 @@
*/
package org.apache.streampipes.pe.examples.jvm.outputstrategy;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
import java.util.List;
-public class CustomOutputController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class CustomOutputController implements IStreamPipesDataProcessor {
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
- ".custom", "Custom output example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ CustomOutputController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
+ ".custom", "Custom output example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
.outputStrategy(OutputStrategies.custom())
-
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+ List<String> outputSelectors = params.extractor().outputKeySelectors();
+ }
- List<String> outputSelectors = extractor.outputKeySelectors();
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/CustomTransformOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/CustomTransformOutputController.java
index c1fe753..7909d94 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/CustomTransformOutputController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/CustomTransformOutputController.java
@@ -18,15 +18,19 @@
package org.apache.streampipes.pe.examples.jvm.outputstrategy;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
@@ -35,42 +39,58 @@
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-import java.util.Arrays;
+import java.util.List;
-public class CustomTransformOutputController extends
- StandaloneEventProcessingDeclarer<DummyParameters> implements
- ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {
+public class CustomTransformOutputController implements
+ IStreamPipesDataProcessor, ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {
+
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
- ".customtransform", "Custom transform output example example", "")
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ CustomTransformOutputController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
+ ".customtransform", "Custom transform output example example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(), Labels.from
- ("str", "The date property as a string", ""), PropertyScope.NONE)
- .build())
+ create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(), Labels.from
+ ("str", "The date property as a string", ""), PropertyScope.NONE)
+ .build())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
.outputStrategy(OutputStrategies.customTransformation())
-
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement,
+ ProcessingElementParameterExtractor parameterExtractor) throws SpRuntimeException {
+ return new EventSchema(List.of(
+ EpProperties
+ .stringEp(Labels.from(
+ "runtime",
+ "I was added at runtime",
+ ""), "runtime", SO.TEXT))
+ );
}
@Override
- public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement, ProcessingElementParameterExtractor parameterExtractor) throws SpRuntimeException {
- return new EventSchema(Arrays
- .asList(EpProperties
- .stringEp(Labels.from("runtime", "I was added at runtime", ""), "runtime", SO.TEXT)));
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+
}
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+
+
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/FixedOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/FixedOutputController.java
index 8b3aa4b..df2ef26 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/FixedOutputController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/FixedOutputController.java
@@ -17,13 +17,15 @@
*/
package org.apache.streampipes.pe.examples.jvm.outputstrategy;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -31,31 +33,44 @@
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class FixedOutputController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class FixedOutputController implements IStreamPipesDataProcessor {
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
- ".fixed", "Fixed output example", "")
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ FixedOutputController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
+ ".fixed", "Fixed output example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
.outputStrategy(OutputStrategies.fixed(EpProperties.timestampProperty("timestamp"),
- EpProperties.doubleEp(Labels.from("avg", "Average value", ""), "avg", SO.NUMBER)))
-
- .build();
+ EpProperties.doubleEp(Labels.from("avg", "Average value", ""), "avg", SO.NUMBER)))
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public void onPipelineStarted(IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext) {
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
}
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+
+
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/KeepOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/KeepOutputController.java
index 9506ffc..4064a27 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/KeepOutputController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/KeepOutputController.java
@@ -17,42 +17,57 @@
*/
package org.apache.streampipes.pe.examples.jvm.outputstrategy;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class KeepOutputController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class KeepOutputController implements IStreamPipesDataProcessor {
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
- ".keep", "Keep output example example", "")
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ KeepOutputController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
+ ".keep", "Keep output example example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
.outputStrategy(OutputStrategies.keep())
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public void onPipelineStarted(IDataProcessorParameters params,
+ SpOutputCollector collector,
+ EventProcessorRuntimeContext runtimeContext) {
-
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
}
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+
+
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/TransformOutputController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/TransformOutputController.java
index 7aff086..75bac00 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/TransformOutputController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/outputstrategy/TransformOutputController.java
@@ -17,14 +17,16 @@
*/
package org.apache.streampipes.pe.examples.jvm.outputstrategy;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
@@ -32,36 +34,46 @@
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.sdk.helpers.TransformOperations;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
import java.util.List;
-public class TransformOutputController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class TransformOutputController implements IStreamPipesDataProcessor {
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
- ".transform", "Transform output example example", "")
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ TransformOutputController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.outputstrategy" +
+ ".transform", "Transform output example example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(), Labels.from
- ("str", "The date property as a string", ""), PropertyScope.NONE)
- .build())
+ create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(), Labels.from
+ ("str", "The date property as a string", ""), PropertyScope.NONE)
+ .build())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
.outputStrategy(OutputStrategies.transform(TransformOperations
- .staticDatatypeTransformation("str", Datatypes.Long)))
+ .staticDatatypeTransformation("str", Datatypes.Long)))
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
- List<String> outputSelectors = extractor.outputKeySelectors();
-
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+ List<String> outputSelectors = params.extractor().outputKeySelectors();
}
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+
+
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
deleted file mode 100644
index 8708066..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.pe.examples.jvm.python;
-
-import com.google.gson.JsonObject;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.standalone.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesExternalDataProcessor;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class PythonGreeterProcessor extends StreamPipesExternalDataProcessor {
-
- private static final String GREETER_KEY = "greeter-key";
- public static final String PROCESSOR_ID = "org.apache.streampipes.examples.python.processor.greeter";
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create(PROCESSOR_ID, "Python Greeter", "")
- .requiredStream(StreamRequirementsBuilder.any())
- // create a simple text parameter
- .requiredTextParameter(Labels.withId(GREETER_KEY), "greeting")
- // append greeting to event stream
- .outputStrategy(OutputStrategies.append(
- EpProperties.stringEp(Labels.empty(),"greeting", SO.TEXT)))
- // NOTE: currently one Kafka transport protocol is supported
- .supportedProtocols(SupportedProtocols.kafka())
- .supportedFormats(SupportedFormats.jsonFormat())
- .build();
- }
-
- @Override
- public void onInvocation(ProcessorParams parameters, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
- // extract static properties and add to map to build minimal invocation graph
- Map<String, String> staticPropertyMap = new HashMap<>();
- staticPropertyMap.put("greeting", parameters.extractor().singleValueParameter(GREETER_KEY, String.class));
-
- JsonObject minimalInvocationGraph = createMinimalInvocationGraph(staticPropertyMap);
-
- // send invocation request to python
- invoke(minimalInvocationGraph);
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- // send detach request to python to stop processor with invocationId
- detach();
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/greeter.py b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/greeter.py
deleted file mode 100644
index b76e0eb..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/greeter.py
+++ /dev/null
@@ -1,47 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from streampipes.core import StandaloneSubmitter, EventProcessor
-from streampipes.manager import Declarer
-
-
-class Greeter(EventProcessor):
- greeting = None
-
- def on_invocation(self):
- # extract greeting text from static property
- self.greeting = self.static_properties.get('greeting')
-
- def on_event(self, event):
- event['greeting'] = self.greeting
- return event
-
- def on_detach(self):
- pass
-
-
-def main():
- # dict with processor id and processor class
- processors = {
- 'org.apache.streampipes.examples.python.processor.greeter': Greeter,
- }
-
- Declarer.add(processors=processors)
- StandaloneSubmitter.init()
-
-
-if __name__ == '__main__':
- main()
\ No newline at end of file
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/ListRequirementsController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/ListRequirementsController.java
index 604ac82..9e60c2f 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/ListRequirementsController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/ListRequirementsController.java
@@ -17,44 +17,61 @@
*/
package org.apache.streampipes.pe.examples.jvm.requirements;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.sdk.utils.Datatypes;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class ListRequirementsController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class ListRequirementsController implements IStreamPipesDataProcessor {
+
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.requirements" +
- ".list", "List requirements specification examples", "")
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ ListRequirementsController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.requirements" +
+ ".list", "List requirements specification examples", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.listRequirement(Datatypes.Integer))
- .requiredProperty(EpRequirements.listRequirement(Datatypes.Double))
- .requiredProperty(EpRequirements.listRequirement(Datatypes.Boolean))
- .requiredProperty(EpRequirements.listRequirement(Datatypes.String))
- .build())
+ create()
+ .requiredProperty(EpRequirements.listRequirement(Datatypes.Integer))
+ .requiredProperty(EpRequirements.listRequirement(Datatypes.Double))
+ .requiredProperty(EpRequirements.listRequirement(Datatypes.Boolean))
+ .requiredProperty(EpRequirements.listRequirement(Datatypes.String))
+ .build())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
.outputStrategy(OutputStrategies.keep())
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
- return null;
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+
}
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+
+
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/NestedListRequirementsController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/NestedListRequirementsController.java
index 6a32659..5895d05 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/NestedListRequirementsController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/NestedListRequirementsController.java
@@ -17,40 +17,54 @@
*/
package org.apache.streampipes.pe.examples.jvm.requirements;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class NestedListRequirementsController extends StandaloneEventProcessingDeclarer<DummyParameters> {
-
+public class NestedListRequirementsController implements IStreamPipesDataProcessor {
+
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.requirements.nestedlist",
- "Nested list mapping example", "")
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ NestedListRequirementsController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.requirements.nestedlist",
+ "Nested list mapping example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredPropertyWithUnaryMapping(EpRequirements.nestedListRequirement(EpRequirements.stringReq()),
- Labels.from("key-value", "Key", ""),
- PropertyScope.NONE)
- .requiredPropertyWithUnaryMapping(EpRequirements.nestedListRequirement(EpRequirements.integerReq()),
- Labels.from("count-value", "Count", ""),
- PropertyScope.NONE)
- .build())
+ create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.nestedListRequirement(EpRequirements.stringReq()),
+ Labels.from("key-value", "Key", ""),
+ PropertyScope.NONE)
+ .requiredPropertyWithUnaryMapping(EpRequirements.nestedListRequirement(EpRequirements.integerReq()),
+ Labels.from("count-value", "Count", ""),
+ PropertyScope.NONE)
+ .build())
.outputStrategy(OutputStrategies.keep())
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
- return null;
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/NestedRequirementsController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/NestedRequirementsController.java
deleted file mode 100644
index 16f9a53..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/NestedRequirementsController.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.pe.examples.jvm.requirements;
-
-public class NestedRequirementsController {
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/SimpleStreamRequirementsController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/SimpleStreamRequirementsController.java
index ee62965..13fb0d9 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/SimpleStreamRequirementsController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/requirements/SimpleStreamRequirementsController.java
@@ -17,50 +17,64 @@
*/
package org.apache.streampipes.pe.examples.jvm.requirements;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class SimpleStreamRequirementsController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class SimpleStreamRequirementsController implements IStreamPipesDataProcessor {
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.requirements" +
- ".simple", "Simple requirements specification examples", "")
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ SimpleStreamRequirementsController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.requirements" +
+ ".simple", "Simple requirements specification examples", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.numberReq()) // any number
- .requiredProperty(EpRequirements.doubleReq()) // any field of type double
- .requiredProperty(EpRequirements.booleanReq()) // any field of type boolean
- .requiredProperty(EpRequirements.integerReq()) // any field of type integer
- .requiredProperty(EpRequirements.stringReq()) // any field of type string
- .requiredProperty(EpRequirements.anyProperty()) // any field allowed (no restriction)
- .requiredProperty(EpRequirements.timestampReq()) // any timestamp field
+ create()
+ .requiredProperty(EpRequirements.numberReq()) // any number
+ .requiredProperty(EpRequirements.doubleReq()) // any field of type double
+ .requiredProperty(EpRequirements.booleanReq()) // any field of type boolean
+ .requiredProperty(EpRequirements.integerReq()) // any field of type integer
+ .requiredProperty(EpRequirements.stringReq()) // any field of type string
+ .requiredProperty(EpRequirements.anyProperty()) // any field allowed (no restriction)
+ .requiredProperty(EpRequirements.timestampReq()) // any timestamp field
- .requiredProperty(EpRequirements.domainPropertyReq(SO.LATITUDE))
- .requiredProperty(EpRequirements.domainPropertyReq(SO.LONGITUDE))
- .build())
+ .requiredProperty(EpRequirements.domainPropertyReq(SO.LATITUDE))
+ .requiredProperty(EpRequirements.domainPropertyReq(SO.LONGITUDE))
+ .build())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
.outputStrategy(OutputStrategies.keep())
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
- return null;
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CodeInputExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CodeInputExampleController.java
index a1285ec..2155c11 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CodeInputExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CodeInputExampleController.java
@@ -1,30 +1,37 @@
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
+import org.apache.streampipes.sdk.helpers.CodeLanguage;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.SupportedFormats;
+import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-public class CodeInputExampleController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class CodeInputExampleController implements IStreamPipesDataProcessor {
private static final String CODE_KEY = "code-key";
private static final String CODE_PYTHON_KEY = "code-python-key";
private static final String CODE_EMPTY_KEY = "code-empty-key";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".codeinput", "Code Input Example", "")
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ CodeInputExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".codeinput", "Code Input Example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.userDefined())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
@@ -38,18 +45,26 @@
// no specific language
.requiredCodeblock(Labels.from(CODE_EMPTY_KEY, "Any Code", ""), CodeLanguage.None)
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the code parameter value
- String code = extractor.codeblockValue(CODE_KEY);
-
- // now the text parameter would be added to a parameter class (omitted for this example)
-
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ String code = params.extractor().codeblockValue(CODE_KEY);
}
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+
+
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionExampleController.java
index eacd294..a071c08 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionExampleController.java
@@ -17,53 +17,62 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
import java.util.List;
-public class CollectionExampleController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class CollectionExampleController implements IStreamPipesDataProcessor {
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".collection", "Collection Example", "")
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+ // Extract the text parameter value
+ List<String> textParameters = params.extractor().singleValueParameterFromCollection("collection", String.class);
+
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+
+ @Override
+ public IDataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ CollectionExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".collection", "Collection Example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
// create a collection parameter
.requiredParameterAsCollection(Labels.from("collection", "Example Name", "Example " +
- "Description"), StaticProperties.stringFreeTextProperty(Labels
- .from("text-property","Text","")))
- .build();
- }
-
- @Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
- // Extract the text parameter value
- List<String> textParameters = extractor.singleValueParameterFromCollection("collection", String.class);
-
- // now the text parameter would be added to a parameter class (omitted for this example)
-
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ "Description"), StaticProperties.stringFreeTextProperty(Labels
+ .from("text-property", "Text", "")))
+ .build()
+ );
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionMappingExample.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionMappingExample.java
index 8efbd6c..41bd4b7 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionMappingExample.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionMappingExample.java
@@ -17,49 +17,61 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.RequirementsSelector;
import java.util.List;
-public class CollectionMappingExample extends
- StandaloneEventProcessingDeclarer<DummyParameters> {
+public class CollectionMappingExample implements IStreamPipesDataProcessor {
private static final String MAPPING_PROPERTY_ID = "mapping-property";
private static final String FIELDS_KEY = "fields";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.collection.mapping",
- "Collection with mapping properties", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ CollectionMappingExample::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.collection.mapping",
+ "Collection with mapping properties", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.withMappingPropertyId(MAPPING_PROPERTY_ID, EpRequirements.numberReq()))
- .build())
+ create()
+ .requiredProperty(EpRequirements.withMappingPropertyId(MAPPING_PROPERTY_ID, EpRequirements.numberReq()))
+ .build())
.requiredCollection(Labels.from(FIELDS_KEY, "Field Mappings", ""),
- StaticProperties.mappingPropertyUnary(Labels.from(MAPPING_PROPERTY_ID, "Field", ""),
- RequirementsSelector.FIRST_INPUT_STREAM,
- PropertyScope.NONE))
+ StaticProperties.mappingPropertyUnary(Labels.from(MAPPING_PROPERTY_ID, "Field", ""),
+ RequirementsSelector.FIRST_INPUT_STREAM,
+ PropertyScope.NONE))
.outputStrategy(OutputStrategies.keep())
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the mapping property value
- List<String> selectedMappings = extractor.getUnaryMappingsFromCollection(FIELDS_KEY);
+ List<String> selectedMappings = params.extractor().getUnaryMappingsFromCollection(FIELDS_KEY);
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionMappingGroupExample.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionMappingGroupExample.java
index 670b035..1492c15 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionMappingGroupExample.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CollectionMappingGroupExample.java
@@ -17,82 +17,100 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.model.staticproperty.*;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
+import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.RequirementsSelector;
import java.util.List;
import java.util.stream.Collectors;
-public class CollectionMappingGroupExample extends
- StandaloneEventProcessingDeclarer<DummyParameters> {
+public class CollectionMappingGroupExample implements IStreamPipesDataProcessor {
private static final String MAPPING_PROPERTY_ID = "mapping-property";
private static final String FIELDS_KEY = "fields";
private static final String COMPARATOR_ID = "comparator";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.collection.mapping.group",
- "Collection with mapping properties and additional properties", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ CollectionMappingGroupExample::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.collection.mapping.group",
+ "Collection with mapping properties and additional properties", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.withMappingPropertyId(MAPPING_PROPERTY_ID, EpRequirements.numberReq()))
- .build())
+ create()
+ .requiredProperty(EpRequirements.withMappingPropertyId(MAPPING_PROPERTY_ID, EpRequirements.numberReq()))
+ .build())
.requiredCollection(Labels.from(FIELDS_KEY, "Field Mappings", ""),
- StaticProperties.group(Labels.from("group", "Group", ""), false,
- StaticProperties.singleValueSelection(Labels.from(COMPARATOR_ID, "Comparator", ""),
- Options.from("<", "<=", ">", ">=", "==", "*")),
- StaticProperties.mappingPropertyUnary(Labels.from(MAPPING_PROPERTY_ID, "Field", ""),
- RequirementsSelector.FIRST_INPUT_STREAM,
- PropertyScope.NONE),
- StaticProperties.doubleFreeTextProperty(Labels.from("weight", "Weight", ""))))
+ StaticProperties.group(Labels.from("group", "Group", ""), false,
+ StaticProperties.singleValueSelection(Labels.from(COMPARATOR_ID, "Comparator", ""),
+ Options.from("<", "<=", ">", ">=", "==", "*")),
+ StaticProperties.mappingPropertyUnary(Labels.from(MAPPING_PROPERTY_ID, "Field", ""),
+ RequirementsSelector.FIRST_INPUT_STREAM,
+ PropertyScope.NONE),
+ StaticProperties.doubleFreeTextProperty(Labels.from("weight", "Weight", ""))))
.outputStrategy(OutputStrategies.keep())
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+ var extractor = params.extractor();
List<StaticPropertyGroup> groupItems = extractor.collectionMembersAsGroup(FIELDS_KEY);
List<String> fields = groupItems
- .stream()
- .map(group -> (extractor
- .extractGroupMember(MAPPING_PROPERTY_ID, group)
- .as(MappingPropertyUnary.class))
- .getSelectedProperty())
- .collect(Collectors.toList());
+ .stream()
+ .map(group -> (extractor
+ .extractGroupMember(MAPPING_PROPERTY_ID, group)
+ .as(MappingPropertyUnary.class))
+ .getSelectedProperty())
+ .collect(Collectors.toList());
List<Double> weights = groupItems
- .stream()
- .map(group -> (extractor
- .extractGroupMember("weight", group)
- .as(FreeTextStaticProperty.class))
- .getValue())
- .map(Double::parseDouble)
- .collect(Collectors.toList());
+ .stream()
+ .map(group -> (extractor
+ .extractGroupMember("weight", group)
+ .as(FreeTextStaticProperty.class))
+ .getValue())
+ .map(Double::parseDouble)
+ .collect(Collectors.toList());
List<String> comparators = groupItems
+ .stream()
+ .map(group -> (extractor
+ .extractGroupMember(COMPARATOR_ID, group)
+ .as(OneOfStaticProperty.class))
+ .getOptions()
.stream()
- .map(group -> (extractor
- .extractGroupMember(COMPARATOR_ID, group)
- .as(OneOfStaticProperty.class))
- .getOptions()
- .stream()
- .filter(Option::isSelected).findFirst().get().getName())
- .collect(Collectors.toList());
+ .filter(Option::isSelected).findFirst().get().getName())
+ .collect(Collectors.toList());
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/ColorPickerExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/ColorPickerExampleController.java
index 68c150a..dbf1384 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/ColorPickerExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/ColorPickerExampleController.java
@@ -17,29 +17,34 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.SupportedFormats;
+import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-public class ColorPickerExampleController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class ColorPickerExampleController implements IStreamPipesDataProcessor {
private static final String COLOR_PICKER_KEY = "color-key";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".colorpicker", "Color Picker Example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ ColorPickerExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".colorpicker", "Color Picker Example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.userDefined())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
@@ -47,17 +52,24 @@
// create a required code block
.requiredColorParameter(Labels.from(COLOR_PICKER_KEY, "Color", "Select color"))
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the code parameter value
- String color = extractor.selectedColor(COLOR_PICKER_KEY);
+ String color = params.extractor().selectedColor(COLOR_PICKER_KEY);
- // now the text parameter would be added to a parameter class (omitted for this example)
+ }
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CompactRuntimeResolvableSingleValueProcessor.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CompactRuntimeResolvableSingleValueProcessor.java
index 49a0db0..dd43cf9 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CompactRuntimeResolvableSingleValueProcessor.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CompactRuntimeResolvableSingleValueProcessor.java
@@ -17,24 +17,24 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.commons.exceptions.SpConfigurationException;
+import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.standalone.ProcessorParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
@@ -43,7 +43,7 @@
import java.util.Arrays;
import java.util.List;
-public class CompactRuntimeResolvableSingleValueProcessor extends StreamPipesDataProcessor implements ResolvesContainerProvidedOptions {
+public class CompactRuntimeResolvableSingleValueProcessor implements IStreamPipesDataProcessor, ResolvesContainerProvidedOptions {
private static final String KafkaHost = "kafka-host";
private static final String KafkaPort = "kafka-port";
@@ -51,13 +51,15 @@
private static final Logger LOG = LoggerFactory.getLogger(CompactRuntimeResolvableSingleValueProcessor.class);
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".compactruntimeresolvable", "Compact Runtime-resolvable single value example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ CompactRuntimeResolvableSingleValueProcessor::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".compactruntimeresolvable", "Compact Runtime-resolvable single value example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
@@ -66,31 +68,33 @@
// create a single value selection parameter that is resolved at runtime
.requiredSingleValueSelectionFromContainer(Labels.from("id", "Example Name", "Example " +
- "Description"), Arrays.asList(KafkaHost, KafkaPort))
+ "Description"), Arrays.asList(KafkaHost, KafkaPort))
- .build();
+ .build()
+ );
}
@Override
- public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
- String selectedSingleValue = parameters.extractor().selectedSingleValue("id", String.class);
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+ String selectedSingleValue = params.extractor().selectedSingleValue("id", String.class);
LOG.info(selectedSingleValue);
+
}
@Override
- public void onEvent(Event event,
- SpOutputCollector collector) throws SpRuntimeException {
+ public void onEvent(Event event, SpOutputCollector collector) {
MapUtils.debugPrint(System.out, "event", event.getRaw());
collector.collect(event);
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
}
@Override
- public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
+ public List<Option> resolveOptions(String staticPropertyInternalName,
+ IStaticPropertyExtractor parameterExtractor) throws SpConfigurationException {
return Options.from("A", "B");
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/ExampleDataProcessor.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/ExampleDataProcessor.java
new file mode 100644
index 0000000..73e4b5a
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/ExampleDataProcessor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.pe.examples.jvm.staticproperty;
+
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+
+
+public class ExampleDataProcessor implements IStreamPipesDataProcessor {
+ @Override
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ ExampleDataProcessor::new,
+ ProcessingElementBuilder.create("my-example-processor", "Number Parameter With Range", "")
+ .requiredStream(StreamRequirementsBuilder.
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
+ .outputStrategy(OutputStrategies.keep())
+ .requiredIntegerParameter(Labels.from("key", "Integer Parameter", ""), 0, 100, 1)
+ .build()
+ );
+ }
+
+ @Override
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+ // called when pipeline is started
+
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/MultiValueSelectionExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/MultiValueSelectionExampleController.java
index dfbb362..46752fa 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/MultiValueSelectionExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/MultiValueSelectionExampleController.java
@@ -17,53 +17,61 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
import java.util.List;
-public class MultiValueSelectionExampleController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class MultiValueSelectionExampleController implements IStreamPipesDataProcessor {
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".multivalue", "Multi value selection example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ MultiValueSelectionExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".multivalue", "Multi value selection example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
// create a simple text parameter
.requiredMultiValueSelection(Labels.from("id", "Example Name", "Example " +
- "Description"), Options.from("Value A", "Value B", "Value C"))
+ "Description"), Options.from("Value A", "Value B", "Value C"))
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the text parameter value
- List<String> selectedSingleValue = extractor.selectedMultiValues("id", String.class);
+ List<String> selectedSingleValue = params.extractor().selectedMultiValues("id", String.class);
- // now the text parameter would be added to a parameter class (omitted for this example)
+ }
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NaryMappingPropertyExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NaryMappingPropertyExampleController.java
index 14354ee..0e768d4 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NaryMappingPropertyExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NaryMappingPropertyExampleController.java
@@ -17,50 +17,59 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
import java.util.List;
-public class NaryMappingPropertyExampleController extends
- StandaloneEventProcessingDeclarer<DummyParameters> {
+public class NaryMappingPropertyExampleController implements IStreamPipesDataProcessor {
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".mappingnary", "Nary Mapping Property Example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ NaryMappingPropertyExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".mappingnary", "Nary Mapping Property Example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredPropertyWithNaryMapping(EpRequirements.numberReq(),
- Labels.from("mp-key", "My Mapping", ""),
- PropertyScope.NONE)
- .build())
+ create()
+ .requiredPropertyWithNaryMapping(EpRequirements.numberReq(),
+ Labels.from("mp-key", "My Mapping", ""),
+ PropertyScope.NONE)
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the mapping property value
- List<String> mappingPropertySelectors = extractor.mappingPropertyValues("mp-key");
+ List<String> mappingPropertySelectors = params.extractor().mappingPropertyValues("mp-key");
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NumberParameterExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NumberParameterExampleController.java
index 595fdae..84db1e6 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NumberParameterExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NumberParameterExampleController.java
@@ -17,33 +17,34 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class NumberParameterExampleController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class NumberParameterExampleController implements IStreamPipesDataProcessor {
private static final String SP_KEY = "my-example-key";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".numberparameter", "Number Parameter Example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ NumberParameterExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".numberparameter", "Number Parameter Example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
@@ -55,20 +56,27 @@
.requiredFloatParameter(Labels.from("float-key", "Float Parameter", "Example Description"))
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the integer parameter value
- Integer integerParameter = extractor.singleValueParameter(SP_KEY, Integer.class);
+ Integer integerParameter = params.extractor().singleValueParameter(SP_KEY, Integer.class);
// Extract the float parameter value
- Float floatParameter = extractor.singleValueParameter("float-key", Float.class);
+ Float floatParameter = params.extractor().singleValueParameter("float-key", Float.class);
- // now the parameters would be added to a parameter class (omitted for this example)
+ }
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NumberParameterWithRangeExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NumberParameterWithRangeExampleController.java
index d1b57e2..7a2519d 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NumberParameterWithRangeExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/NumberParameterWithRangeExampleController.java
@@ -17,53 +17,60 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class NumberParameterWithRangeExampleController extends
- StandaloneEventProcessingDeclarer<DummyParameters> {
+public class NumberParameterWithRangeExampleController implements IStreamPipesDataProcessor {
private static final String SP_KEY = "my-example-key";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".numberparameterrange", "Number Parameter With Range", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ NumberParameterWithRangeExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".numberparameterrange", "Number Parameter With Range", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
// create an integer parameter
.requiredIntegerParameter(Labels.from(SP_KEY, "Integer Parameter", "Example " +
- "Description"), 0, 100, 1)
+ "Description"), 0, 100, 1)
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the integer parameter value
- Integer integerParameter = extractor.singleValueParameter(SP_KEY, Integer.class);
+ Integer integerParameter = params.extractor().singleValueParameter(SP_KEY, Integer.class);
- // now the parameter would be added to a parameter class (omitted for this example)
+ }
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableAnyStaticPropertyController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableAnyStaticPropertyController.java
index caadbfb..9bb5bdd 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableAnyStaticPropertyController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableAnyStaticPropertyController.java
@@ -17,23 +17,22 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
+import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -43,20 +42,21 @@
import java.util.Set;
import java.util.stream.Collectors;
-public class RuntimeResolvableAnyStaticPropertyController extends
- StandaloneEventProcessingDeclarer<DummyParameters> implements ResolvesContainerProvidedOptions {
+public class RuntimeResolvableAnyStaticPropertyController implements IStreamPipesDataProcessor, ResolvesContainerProvidedOptions {
private static final String KafkaHost = "kafka-host";
private static final String KafkaPort = "kafka-port";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".runtimeresolvablemulti", "Runtime-resolvable multi value example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ RuntimeResolvableAnyStaticPropertyController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".runtimeresolvablemulti", "Runtime-resolvable multi value example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
@@ -65,24 +65,33 @@
// create a single value selection parameter that is resolved at runtime
.requiredMultiValueSelectionFromContainer(Labels.from("id", "Example Name", "Example " +
- "Description"), Arrays.asList(KafkaHost, KafkaPort))
+ "Description"), Arrays.asList(KafkaHost, KafkaPort))
- .build();
+ .build()
+
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the text parameter value
- List<String> selectedSingleValue = extractor.selectedMultiValues("id", String.class);
+ List<String> selectedSingleValue = params.extractor().selectedMultiValues("id", String.class);
- // now the text parameter would be added to a parameter class (omitted for this example)
-
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
}
@Override
- public List<Option> resolveOptions(String requestId, StaticPropertyExtractor extractor) {
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+
+
+ @Override
+ public List<Option> resolveOptions(String requestId, IStaticPropertyExtractor extractor) {
String host = extractor.singleValueParameter(KafkaHost, String.class);
Integer port = extractor.singleValueParameter(KafkaPort, Integer.class);
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java
index 5d57c73..a1e4158 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/RuntimeResolvableSingleValue.java
@@ -17,23 +17,22 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
+import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOptions;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.staticproperty.Option;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -43,20 +42,21 @@
import java.util.Set;
import java.util.stream.Collectors;
-public class RuntimeResolvableSingleValue extends
- StandaloneEventProcessingDeclarer<DummyParameters> implements ResolvesContainerProvidedOptions {
+public class RuntimeResolvableSingleValue implements IStreamPipesDataProcessor, ResolvesContainerProvidedOptions {
private static final String KafkaHost = "kafka-host";
private static final String KafkaPort = "kafka-port";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".runtimeresolvablesingle", "Runtime-resolvable single value example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ RuntimeResolvableSingleValue::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".runtimeresolvablesingle", "Runtime-resolvable single value example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
@@ -65,24 +65,32 @@
// create a single value selection parameter that is resolved at runtime
.requiredSingleValueSelectionFromContainer(Labels.from("id", "Example Name", "Example " +
- "Description"), Arrays.asList(KafkaHost, KafkaPort))
+ "Description"), Arrays.asList(KafkaHost, KafkaPort))
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the text parameter value
- String selectedSingleValue = extractor.selectedSingleValue("id", String.class);
+ String selectedSingleValue = params.extractor().selectedSingleValue("id", String.class);
- // now the text parameter would be added to a parameter class (omitted for this example)
-
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
}
@Override
- public List<Option> resolveOptions(String requestId, StaticPropertyExtractor extractor) {
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
+ }
+
+ @Override
+ public List<Option> resolveOptions(String requestId, IStaticPropertyExtractor extractor) {
String host = extractor.singleValueParameter(KafkaHost, String.class);
Integer port = extractor.singleValueParameter(KafkaPort, Integer.class);
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/SecretStaticPropertyExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/SecretStaticPropertyExampleController.java
index 3435c6e..a66a553 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/SecretStaticPropertyExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/SecretStaticPropertyExampleController.java
@@ -17,33 +17,34 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class SecretStaticPropertyExampleController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class SecretStaticPropertyExampleController implements IStreamPipesDataProcessor {
private static final String SP_KEY = "my-secret-key";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".secret", "Secret Parameter Example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ SecretStaticPropertyExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".secret", "Secret Parameter Example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
@@ -51,18 +52,25 @@
// create a simple text parameter
.requiredSecret(Labels.from(SP_KEY, "Secret Password", "Secret Password Example"))
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the text parameter value
- String textParameter = extractor.secretValue(SP_KEY);
+ String textParameter = params.extractor().secretValue(SP_KEY);
- // now the text parameter would be added to a parameter class (omitted for this example)
+ }
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/SingleValueSelectionExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/SingleValueSelectionExampleController.java
index e461a79..beaae0f 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/SingleValueSelectionExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/SingleValueSelectionExampleController.java
@@ -17,52 +17,59 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class SingleValueSelectionExampleController extends
- StandaloneEventProcessingDeclarer<DummyParameters> {
+public class SingleValueSelectionExampleController implements IStreamPipesDataProcessor {
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".singlevalue", "Single value selection example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ SingleValueSelectionExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".singlevalue", "Single value selection example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
// create a single value selection parameter
.requiredSingleValueSelection(Labels.from("id", "Example Name", "Example " +
- "Description"), Options.from("Option A", "Option B", "Option C"))
+ "Description"), Options.from("Option A", "Option B", "Option C"))
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the text parameter value
- String selectedSingleValue = extractor.selectedSingleValue("id", String.class);
+ String selectedSingleValue = params.extractor().selectedSingleValue("id", String.class);
- // now the text parameter would be added to a parameter class (omitted for this example)
+ }
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/StaticPropertyAlternativesController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/StaticPropertyAlternativesController.java
index 9a51b6e..4cfdc47 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/StaticPropertyAlternativesController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/StaticPropertyAlternativesController.java
@@ -17,14 +17,15 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -32,51 +33,60 @@
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class StaticPropertyAlternativesController extends
- StandaloneEventProcessingDeclarer<DummyParameters> {
+public class StaticPropertyAlternativesController implements IStreamPipesDataProcessor {
private static final String KafkaHost = "kafka-host";
private static final String KafkaPort = "kafka-port";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".alternatives", "Static property alternatives example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ StaticPropertyAlternativesController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".alternatives", "Static property alternatives example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
.requiredTextParameter(Labels.from(KafkaHost, "Kafka Host", ""))
.requiredAlternatives(Labels.from("window", "Window", ""),
- Alternatives.from(Labels.from("count", "Count Window", ""),
- StaticProperties.integerFreeTextProperty(Labels.from("count-window-size",
- "Count Window Size", ""))),
- Alternatives.from(Labels.from("time", "Time Window", ""),
- StaticProperties.group(Labels.from("group", "", ""),
- StaticProperties.integerFreeTextProperty(Labels.from("time" +
- "-window-size", "Time Window Size", "")),
- StaticProperties.singleValueSelection(Labels.from("time" +
- "-window-unit", "Time Unit", ""),
- Options.from("Seconds", "Minutes", "Hours")))))
- .build();
+ Alternatives.from(Labels.from("count", "Count Window", ""),
+ StaticProperties.integerFreeTextProperty(Labels.from("count-window-size",
+ "Count Window Size", ""))),
+ Alternatives.from(Labels.from("time", "Time Window", ""),
+ StaticProperties.group(Labels.from("group", "", ""),
+ StaticProperties.integerFreeTextProperty(Labels.from("time" +
+ "-window-size", "Time Window Size", "")),
+ StaticProperties.singleValueSelection(Labels.from("time" +
+ "-window-unit", "Time Unit", ""),
+ Options.from("Seconds", "Minutes", "Hours")))))
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+ var extractor = params.extractor();
String selectedAlternative = extractor.selectedAlternativeInternalId("window");
if (selectedAlternative.equals("time")) {
Integer timeWindowSize = extractor.singleValueParameter("time-window-size", Integer.class);
}
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TextParameterExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TextParameterExampleController.java
index d610806..f8789ab 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TextParameterExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TextParameterExampleController.java
@@ -17,33 +17,34 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class TextParameterExampleController extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class TextParameterExampleController implements IStreamPipesDataProcessor {
private static final String SP_KEY = "my-example-key";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".textparameter", "Text Parameter Example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ TextParameterExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".textparameter", "Text Parameter Example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
@@ -51,17 +52,24 @@
// create a simple text parameter
.requiredTextParameter(Labels.from(SP_KEY, "Example Name", "Example Description"))
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the text parameter value
- String textParameter = extractor.singleValueParameter(SP_KEY, String.class);
+ String textParameter = params.extractor().singleValueParameter(SP_KEY, String.class);
- // now the text parameter would be added to a parameter class (omitted for this example)
+ }
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TreeInputSink.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TreeInputSink.java
index 409c5ff..300454c 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TreeInputSink.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TreeInputSink.java
@@ -1,73 +1,77 @@
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
+import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.extensions.api.runtime.SupportsRuntimeConfig;
-import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputStaticProperty;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.staticproperty.TreeInputNode;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.standalone.SinkParams;
-import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-public class TreeInputSink extends StreamPipesDataSink implements SupportsRuntimeConfig {
+public class TreeInputSink implements IStreamPipesDataSink, SupportsRuntimeConfig {
private static final String SP_KEY = "example-key";
private static final String TREE_KEY = "tree-key";
@Override
- public DataSinkDescription declareModel() {
- return DataSinkBuilder.create("org.apache.streampipes.examples.treeinput", "Tree Input Example", "")
+ public DataSinkConfiguration declareConfig() {
+ return DataSinkConfiguration.create(
+ TreeInputSink::new,
+ DataSinkBuilder.create("org.apache.streampipes.examples.treeinput", "Tree Input Example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
// create a simple text parameter
.requiredTextParameter(Labels.from(SP_KEY, "Example Key", "required by tree input"))
.requiredRuntimeResolvableTreeInput(
- Labels.from(TREE_KEY, "Tree", "The tree input"),
- Collections.singletonList(SP_KEY))
+ Labels.from(TREE_KEY, "Tree", "The tree input"),
+ Collections.singletonList(SP_KEY),
+ true)
- .build();
+ .build()
+ );
}
+
@Override
- public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+ public void onPipelineStarted(IDataSinkParameters params, EventSinkRuntimeContext runtimeContext) {
}
@Override
- public void onEvent(Event event) throws SpRuntimeException {
+ public void onEvent(Event event) {
}
@Override
- public void onDetach() throws SpRuntimeException {
+ public void onPipelineStopped() {
}
@Override
public StaticProperty resolveConfiguration(String staticPropertyInternalName,
- StaticPropertyExtractor extractor) {
+ IStaticPropertyExtractor extractor) {
RuntimeResolvableTreeInputStaticProperty treeInput = extractor
- .getStaticPropertyByName(
- staticPropertyInternalName,
- RuntimeResolvableTreeInputStaticProperty.class);
+ .getStaticPropertyByName(
+ staticPropertyInternalName,
+ RuntimeResolvableTreeInputStaticProperty.class);
List<TreeInputNode> nodes = buildSampleNodes();
treeInput.setNodes(nodes);
@@ -89,18 +93,18 @@
private List<TreeInputNode> buildFruitNodes() {
return Arrays.asList(
- buildNode("Apple"),
- buildNode("Banana"),
- buildNode("Orange"),
- buildExpandableNode("Sour", Arrays.asList(buildNode("Lemon"), buildNode("Guava"), buildNode("Cranberry")))
+ buildNode("Apple"),
+ buildNode("Banana"),
+ buildNode("Orange"),
+ buildExpandableNode("Sour", Arrays.asList(buildNode("Lemon"), buildNode("Guava"), buildNode("Cranberry")))
);
}
private List<TreeInputNode> buildVegetableNodes() {
return Arrays.asList(
- buildNode("Tomato"),
- buildNode("Pepper"),
- buildNode("Carrot")
+ buildNode("Tomato"),
+ buildNode("Pepper"),
+ buildNode("Carrot")
);
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TwoStreamsMappingExample.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TwoStreamsMappingExample.java
index ec56c57..d1f924d 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TwoStreamsMappingExample.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/TwoStreamsMappingExample.java
@@ -17,25 +17,24 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
import java.util.List;
-public class TwoStreamsMappingExample extends StandaloneEventProcessingDeclarer<DummyParameters> {
+public class TwoStreamsMappingExample implements IStreamPipesDataProcessor {
private static final String KEY_STREAM_1 = "stream-1-key";
private static final String PROPERTIES_STREAM_1 = "stream-1-properties";
@@ -44,43 +43,54 @@
private static final String ID = "org.apache.streampipes.examples.staticproperty.twostreamsmapping";
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create(ID, "Two Streams", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ TwoStreamsMappingExample::new,
+ ProcessingElementBuilder.create(ID, "Two Streams", "")
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.category(DataProcessorType.ALGORITHM)
.requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithNaryMapping(
- EpRequirements.numberReq(),
- Labels.from(PROPERTIES_STREAM_1, "S1 Properties", ""),
- PropertyScope.NONE)
- .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
- Labels.from(KEY_STREAM_1, "S1 Key", ""), PropertyScope.NONE)
- .build())
+ .create()
+ .requiredPropertyWithNaryMapping(
+ EpRequirements.numberReq(),
+ Labels.from(PROPERTIES_STREAM_1, "S1 Properties", ""),
+ PropertyScope.NONE)
+ .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+ Labels.from(KEY_STREAM_1, "S1 Key", ""), PropertyScope.NONE)
+ .build())
.requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithNaryMapping(EpRequirements.numberReq(),
- Labels.from(PROPERTIES_STREAM_2, "S2 Properties", ""),
- PropertyScope.NONE)
- .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
- Labels.from(KEY_STREAM_2, "S2 Key", ""),
- PropertyScope.NONE)
- .build())
+ .create()
+ .requiredPropertyWithNaryMapping(EpRequirements.numberReq(),
+ Labels.from(PROPERTIES_STREAM_2, "S2 Properties", ""),
+ PropertyScope.NONE)
+ .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+ Labels.from(KEY_STREAM_2, "S2 Key", ""),
+ PropertyScope.NONE)
+ .build())
.outputStrategy(OutputStrategies.keep())
- .build();
+ .build()
+ );
+ }
+
+ @Override
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
+ var extractor = params.extractor();
+ List<String> selProps1 = extractor.mappingPropertyValues(PROPERTIES_STREAM_1);
+ List<String> selProps = extractor.mappingPropertyValues(PROPERTIES_STREAM_2);
+
+ String selKey1 = extractor.mappingPropertyValue(KEY_STREAM_1);
+ String selKey2 = extractor.mappingPropertyValue(KEY_STREAM_2);
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public void onEvent(Event event, SpOutputCollector collector) {
- List<String> selProps1 = extractor.mappingPropertyValues(PROPERTIES_STREAM_1);
- List<String> selProps = extractor.mappingPropertyValues(PROPERTIES_STREAM_2);
+ }
- String selKey1 = extractor.mappingPropertyValue(KEY_STREAM_1);
- String selKey2 = extractor.mappingPropertyValue(KEY_STREAM_2);
+ @Override
+ public void onPipelineStopped() {
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/UnaryMappingPropertyExampleController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/UnaryMappingPropertyExampleController.java
index 0ddeec6..b5149c7 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/UnaryMappingPropertyExampleController.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/UnaryMappingPropertyExampleController.java
@@ -17,48 +17,57 @@
*/
package org.apache.streampipes.pe.examples.jvm.staticproperty;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
+import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.pe.examples.jvm.base.DummyEngine;
-import org.apache.streampipes.pe.examples.jvm.base.DummyParameters;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-public class UnaryMappingPropertyExampleController extends
- StandaloneEventProcessingDeclarer<DummyParameters> {
+public class UnaryMappingPropertyExampleController implements IStreamPipesDataProcessor {
@Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
- ".mappingunary", "Unary Mapping Property Example", "")
+ public DataProcessorConfiguration declareConfig() {
+ return DataProcessorConfiguration.create(
+ UnaryMappingPropertyExampleController::new,
+ ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".mappingunary", "Unary Mapping Property Example", "")
.requiredStream(StreamRequirementsBuilder.
- create()
- .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
- Labels.from("mp-key", "My Mapping", ""),
- PropertyScope.NONE)
- .build())
+ create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
+ Labels.from("mp-key", "My Mapping", ""),
+ PropertyScope.NONE)
+ .build())
.outputStrategy(OutputStrategies.keep())
.supportedProtocols(SupportedProtocols.kafka())
.supportedFormats(SupportedFormats.jsonFormat())
- .build();
+ .build()
+ );
}
@Override
- public ConfiguredEventProcessor<DummyParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
+ public void onPipelineStarted(IDataProcessorParameters params, SpOutputCollector collector, EventProcessorRuntimeContext runtimeContext) {
// Extract the mapping property value
- String mappingPropertySelector = extractor.mappingPropertyValue("mp-key");
+ String mappingPropertySelector = params.extractor().mappingPropertyValue("mp-key");
- return new ConfiguredEventProcessor<>(new DummyParameters(graph), DummyEngine::new);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+
+ }
+
+ @Override
+ public void onPipelineStopped() {
+
}
}