Merge branch 'dev' of https://github.com/apache/incubator-streampipes-examples into dev
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java
deleted file mode 100644
index 5c29939..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import com.google.gson.JsonObject;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public abstract class GraphParameterExtractor {
-
-    private static String getInputTopic(EventProcessorBindingParams parameters) {
-        return parameters
-                .getGraph()
-                .getInputStreams()
-                .get(0)
-                .getEventGrounding()
-                .getTransportProtocol()
-                .getTopicDefinition()
-                .getActualTopicName();
-    }
-
-    private static String getOutputTopic(EventProcessorBindingParams parameters) {
-        return parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocol()
-                .getTopicDefinition()
-                .getActualTopicName();
-    }
-
-    private static String getKafkaUrl(EventProcessorBindingParams parameters) {
-        String brokerHostname = parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocols()
-                .get(0)
-                .getBrokerHostname();
-
-        Integer kafkaPort = ((KafkaTransportProtocol) parameters
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocols()
-                .get(0))
-                .getKafkaPort();
-
-        return brokerHostname + ":" + kafkaPort.toString();
-    }
-
-    public static JsonObject toJson(String processorID, String invocationID, EventProcessorBindingParams parameters) {
-        JsonObject json = new JsonObject();
-        json.addProperty("invocation_id", invocationID);
-        json.addProperty("processor_id", processorID);
-        json.addProperty("input_topics", GraphParameterExtractor.getInputTopic(parameters));
-        json.addProperty("output_topics", GraphParameterExtractor.getOutputTopic(parameters));
-        json.addProperty("bootstrap_servers", GraphParameterExtractor.getKafkaUrl(parameters));
-
-        return json;
-    }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java
deleted file mode 100644
index ab1dfac..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class GreeterParameters extends EventProcessorBindingParams {
-    private final String greeting;
-
-    public GreeterParameters(DataProcessorInvocation graph, String greeting) {
-        super(graph);
-        this.greeting = greeting;
-    }
-
-    public String getGreeting() {
-        return greeting;
-    }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java
deleted file mode 100644
index 53adfa5..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import com.google.gson.JsonObject;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
-
-import java.util.UUID;
-
-import static org.apache.streampipes.pe.examples.jvm.python.Route.post;
-
-public class GreeterPython implements ExternalEventProcessor<GreeterParameters> {
-    private String invocationId;
-
-    @Override
-    public void onInvocation(GreeterParameters parameters, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
-        // use invocationId to keep track of started instances of
-        this.invocationId = UUID.randomUUID().toString();
-
-        // construct JSON request from Java -> Python
-        JsonObject json = GraphParameterExtractor.toJson(
-                GreeterPythonController.PROCESSOR_ID,
-                this.invocationId,
-                parameters);
-
-        JsonObject staticProperties = new JsonObject();
-        staticProperties.addProperty("greeting", parameters.getGreeting());
-
-        json.add("static_properties", staticProperties);
-
-        // send invocation request to python
-        post(Route.INVOCATION, json.toString());
-    }
-
-    @Override
-    public void onDetach() throws SpRuntimeException {
-        JsonObject json = new JsonObject();
-        json.addProperty("invocation_id", this.invocationId);
-
-        // send detach request to python to stop processor with invocationId
-        post(Route.DETACH, this.invocationId);
-    }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java
deleted file mode 100644
index b6138f7..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
-
-public class GreeterPythonController extends StandaloneExternalEventProcessingDeclarer<GreeterParameters> {
-
-    private static final String GREETER_KEY = "greeter-key";
-    public static final String PROCESSOR_ID = "org.apache.streampipes.examples.python.processor.greeter";
-
-    @Override
-    public DataProcessorDescription declareModel() {
-        return ProcessingElementBuilder.create(PROCESSOR_ID, "Python Greeter", "")
-                .requiredStream(StreamRequirementsBuilder.
-                        create()
-                        .requiredProperty(EpRequirements.anyProperty())
-                        .build())
-
-                // create a simple text parameter
-                .requiredTextParameter(Labels.withId(GREETER_KEY), "greeting")
-
-                // Append greeting to event stream
-                .outputStrategy(OutputStrategies.append(
-                        EpProperties.stringEp(Labels.empty(),"greeting", SO.Text)))
-
-                // NOTE: currently one Kafka transport protocol is supported
-                .supportedProtocols(SupportedProtocols.kafka())
-                .supportedFormats(SupportedFormats.jsonFormat())
-                .build();
-    }
-
-    @Override
-    public ConfiguredExternalEventProcessor<GreeterParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
-        // Extract the greeting value
-        String greeting = extractor.singleValueParameter(GREETER_KEY, String.class);
-
-        // now the text parameter would be added to a parameter class (omitted for this example)
-        return new ConfiguredExternalEventProcessor<>(new GreeterParameters(graph, greeting), GreeterPython::new);
-    }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
new file mode 100644
index 0000000..23c3be5
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.pe.examples.jvm.python;
+
+import com.google.gson.JsonObject;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesExternalDataProcessor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PythonGreeterProcessor extends StreamPipesExternalDataProcessor {
+
+    private static final String GREETER_KEY = "greeter-key";
+    public static final String PROCESSOR_ID = "org.apache.streampipes.examples.python.processor.greeter";
+
+    @Override
+    public DataProcessorDescription declareModel() {
+        return ProcessingElementBuilder.create(PROCESSOR_ID, "Python Greeter", "")
+                .requiredStream(StreamRequirementsBuilder.any())
+                // create a simple text parameter
+                .requiredTextParameter(Labels.withId(GREETER_KEY), "greeting")
+                // append greeting to event stream
+                .outputStrategy(OutputStrategies.append(
+                        EpProperties.stringEp(Labels.empty(),"greeting", SO.Text)))
+                // NOTE: currently one Kafka transport protocol is supported
+                .supportedProtocols(SupportedProtocols.kafka())
+                .supportedFormats(SupportedFormats.jsonFormat())
+                .build();
+    }
+
+    @Override
+    public void onInvocation(ProcessorParams parameters, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+
+        // extract static properties and add to map to build minimal invocation graph
+        Map<String, String> staticPropertyMap = new HashMap<>();
+        staticPropertyMap.put("greeting", parameters.extractor().singleValueParameter(GREETER_KEY, String.class));
+
+        JsonObject minimalInvocationGraph = createMinimalInvocationGraph(staticPropertyMap);
+
+        // send invocation request to python
+        invoke(minimalInvocationGraph);
+    }
+
+    @Override
+    public void onDetach() throws SpRuntimeException {
+        // send detach request to python to stop processor with invocationId
+        detach();
+    }
+}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java
deleted file mode 100644
index 3613c62..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-import java.io.IOException;
-
-public abstract class Route {
-
-    public static final String INVOCATION = "invoke";
-    public static final String DETACH = "detach";
-
-    // endpoint of Python processor runs here
-    public static final String PYTHON_ENDPOINT = "localhost:5000";
-
-    public static String post(String endpoint, String payload) {
-        String responseString = null;
-
-        try {
-            responseString = Request.Post(PYTHON_ENDPOINT + "/" + endpoint)
-                    .bodyString(payload, ContentType.APPLICATION_JSON)
-                    .connectTimeout(1000)
-                    .socketTimeout(100000)
-                    .execute().returnContent().asString();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return responseString;
-    }
-}