Add example for runtime-resolvable options and compact processor definition
diff --git a/README.md b/README.md
index 1ee028e..3afdc96 100644
--- a/README.md
+++ b/README.md
@@ -18,10 +18,10 @@
# StreamPipes
-Apache StreamPipes (incubating) A self-service (Industrial) IoT toolbox to enable non-technical users to connect
+Apache StreamPipes (incubating) A self-service (Industrial) IoT toolbox to enable non-technical userAccounts to connect
, analyze and explore IoT data streams.
-It leverages non-technical users to quickly define and execute processing pipelines based on an easily extensible
+It leverages non-technical userAccounts to quickly define and execute processing pipelines based on an easily extensible
toolbox of data sources, data processors and data sinks.
Learn more about StreamPipes at [https://streampipes.apache.org/](https://streampipes.apache.org/)
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/Example.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/Example.java
index dccd087..46e1bcc 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/Example.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/Example.java
@@ -17,35 +17,36 @@
*/
package org.apache.streampipes.client.example;
-import org.apache.commons.collections.MapUtils;
import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.client.StreamPipesCredentials;
-import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.client.credentials.CredentialsProvider;
import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import java.util.List;
public class Example {
public static void main(String[] args) {
- StreamPipesCredentials credentials = StreamPipesCredentials
- .from(System.getenv("user"), System.getenv("apiKey"));
+// CredentialsProvider credentials = StreamPipesCredentials
+// .withApiKey(System.getenv("user"), System.getenv("apiKey"));
+
+ CredentialsProvider credentials = StreamPipesCredentials.withServiceToken("sp-service-client", "my-apache-streampipes-secret-key-change-me");
// Create an instance of the StreamPipes client
StreamPipesClient client = StreamPipesClient
- .create("localhost", 80, credentials, true);
+ .create("localhost", 8082, credentials, true);
// Get all pipelines
List<Pipeline> pipelines = client.pipelines().all();
+ System.out.println(pipelines.size());
- // Start a pipeline
- PipelineOperationStatus message = client.pipelines().start(pipelines.get(0));
-
- // Get all data streams
- List<SpDataStream> dataStreams = client.streams().all();
-
- // Subscribe to a data stream
- client.streams().subscribe(dataStreams.get(0), event -> MapUtils.debugPrint(System.out, "event", event.getRaw()));
+// // Start a pipeline
+// PipelineOperationStatus message = client.pipelines().start(pipelines.get(0));
+//
+// // Get all data streams
+// List<SpDataStream> dataStreams = client.streams().all();
+//
+// // Subscribe to a data stream
+// client.streams().subscribe(dataStreams.get(0), event -> MapUtils.debugPrint(System.out, "event", event.getRaw()));
}
}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/StreamPipesClientExample.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/StreamPipesClientExample.java
index 2e4d9a1..6fad4d8 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/StreamPipesClientExample.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/client/example/StreamPipesClientExample.java
@@ -20,6 +20,7 @@
import org.apache.commons.collections.MapUtils;
import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.client.credentials.CredentialsProvider;
import org.apache.streampipes.client.live.KafkaConfig;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.graph.DataSinkInvocation;
@@ -37,8 +38,8 @@
// First, go to the StreamPipes UI and create an API key (user -> profile in the upper right corner)
// Create credentials by providing a user (the email) and the API key
- StreamPipesCredentials credentials = StreamPipesCredentials
- .from(System.getenv("user"), System.getenv("apiKey"));
+ CredentialsProvider credentials = StreamPipesCredentials
+ .withApiKey(System.getenv("user"), System.getenv("apiKey"));
// Create an instance of the StreamPipes client
StreamPipesClient client = StreamPipesClient
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/ExamplesInit.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/ExamplesInit.java
index 634de79..f3d8d75 100644
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/ExamplesInit.java
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/ExamplesInit.java
@@ -59,6 +59,7 @@
.registerPipelineElement(new CollectionMappingExample())
.registerPipelineElement(new NestedListRequirementsController())
.registerPipelineElement(new TwoStreamsMappingExample())
+ .registerPipelineElement(new CompactRuntimeResolvableSingleValueProcessor())
.registerPipelineElement(new AppendOutputController())
.registerPipelineElement(new CustomOutputController())
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CompactRuntimeResolvableSingleValueProcessor.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CompactRuntimeResolvableSingleValueProcessor.java
new file mode 100644
index 0000000..783a5d6
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/staticproperty/CompactRuntimeResolvableSingleValueProcessor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.pe.examples.jvm.staticproperty;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class CompactRuntimeResolvableSingleValueProcessor extends StreamPipesDataProcessor implements ResolvesContainerProvidedOptions {
+
+ private static final String KafkaHost = "kafka-host";
+ private static final String KafkaPort = "kafka-port";
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactRuntimeResolvableSingleValueProcessor.class);
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder.create("org.apache.streampipes.examples.staticproperty" +
+ ".compactruntimeresolvable", "Compact Runtime-resolvable single value example", "")
+ .requiredStream(StreamRequirementsBuilder.
+ create()
+ .requiredProperty(EpRequirements.anyProperty())
+ .build())
+ .outputStrategy(OutputStrategies.keep())
+ .supportedProtocols(SupportedProtocols.kafka())
+ .supportedFormats(SupportedFormats.jsonFormat())
+ .requiredTextParameter(Labels.from(KafkaHost, "Kafka Host", ""))
+ .requiredIntegerParameter(Labels.from(KafkaPort, "Kafka Port", ""))
+
+ // create a single value selection parameter that is resolved at runtime
+ .requiredSingleValueSelectionFromContainer(Labels.from("id", "Example Name", "Example " +
+ "Description"), Arrays.asList(KafkaHost, KafkaPort))
+
+ .build();
+ }
+
+ @Override
+ public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+ String selectedSingleValue = parameters.extractor().selectedSingleValue("id", String.class);
+ LOG.info(selectedSingleValue);
+ }
+
+ @Override
+ public void onEvent(Event event,
+ SpOutputCollector collector) throws SpRuntimeException {
+ MapUtils.debugPrint(System.out, "event", event.getRaw());
+ collector.collect(event);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+
+ }
+
+ @Override
+ public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
+ return Options.from("A", "B");
+ }
+}