Merge branch 'dev' of https://github.com/apache/incubator-streampipes-examples into dev
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java
deleted file mode 100644
index 5c29939..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GraphParameterExtractor.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import com.google.gson.JsonObject;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public abstract class GraphParameterExtractor {
-
- private static String getInputTopic(EventProcessorBindingParams parameters) {
- return parameters
- .getGraph()
- .getInputStreams()
- .get(0)
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName();
- }
-
- private static String getOutputTopic(EventProcessorBindingParams parameters) {
- return parameters
- .getGraph()
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocol()
- .getTopicDefinition()
- .getActualTopicName();
- }
-
- private static String getKafkaUrl(EventProcessorBindingParams parameters) {
- String brokerHostname = parameters
- .getGraph()
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocols()
- .get(0)
- .getBrokerHostname();
-
- Integer kafkaPort = ((KafkaTransportProtocol) parameters
- .getGraph()
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocols()
- .get(0))
- .getKafkaPort();
-
- return brokerHostname + ":" + kafkaPort.toString();
- }
-
- public static JsonObject toJson(String processorID, String invocationID, EventProcessorBindingParams parameters) {
- JsonObject json = new JsonObject();
- json.addProperty("invocation_id", invocationID);
- json.addProperty("processor_id", processorID);
- json.addProperty("input_topics", GraphParameterExtractor.getInputTopic(parameters));
- json.addProperty("output_topics", GraphParameterExtractor.getOutputTopic(parameters));
- json.addProperty("bootstrap_servers", GraphParameterExtractor.getKafkaUrl(parameters));
-
- return json;
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java
deleted file mode 100644
index ab1dfac..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterParameters.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class GreeterParameters extends EventProcessorBindingParams {
- private final String greeting;
-
- public GreeterParameters(DataProcessorInvocation graph, String greeting) {
- super(graph);
- this.greeting = greeting;
- }
-
- public String getGreeting() {
- return greeting;
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java
deleted file mode 100644
index 53adfa5..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPython.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import com.google.gson.JsonObject;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.ExternalEventProcessor;
-
-import java.util.UUID;
-
-import static org.apache.streampipes.pe.examples.jvm.python.Route.post;
-
-public class GreeterPython implements ExternalEventProcessor<GreeterParameters> {
- private String invocationId;
-
- @Override
- public void onInvocation(GreeterParameters parameters, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-
- // use invocationId to keep track of started instances of
- this.invocationId = UUID.randomUUID().toString();
-
- // construct JSON request from Java -> Python
- JsonObject json = GraphParameterExtractor.toJson(
- GreeterPythonController.PROCESSOR_ID,
- this.invocationId,
- parameters);
-
- JsonObject staticProperties = new JsonObject();
- staticProperties.addProperty("greeting", parameters.getGreeting());
-
- json.add("static_properties", staticProperties);
-
- // send invocation request to python
- post(Route.INVOCATION, json.toString());
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- JsonObject json = new JsonObject();
- json.addProperty("invocation_id", this.invocationId);
-
- // send detach request to python to stop processor with invocationId
- post(Route.DETACH, this.invocationId);
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java
deleted file mode 100644
index b6138f7..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/GreeterPythonController.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
-import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.standalone.ConfiguredExternalEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneExternalEventProcessingDeclarer;
-
-public class GreeterPythonController extends StandaloneExternalEventProcessingDeclarer<GreeterParameters> {
-
- private static final String GREETER_KEY = "greeter-key";
- public static final String PROCESSOR_ID = "org.apache.streampipes.examples.python.processor.greeter";
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create(PROCESSOR_ID, "Python Greeter", "")
- .requiredStream(StreamRequirementsBuilder.
- create()
- .requiredProperty(EpRequirements.anyProperty())
- .build())
-
- // create a simple text parameter
- .requiredTextParameter(Labels.withId(GREETER_KEY), "greeting")
-
- // Append greeting to event stream
- .outputStrategy(OutputStrategies.append(
- EpProperties.stringEp(Labels.empty(),"greeting", SO.Text)))
-
- // NOTE: currently one Kafka transport protocol is supported
- .supportedProtocols(SupportedProtocols.kafka())
- .supportedFormats(SupportedFormats.jsonFormat())
- .build();
- }
-
- @Override
- public ConfiguredExternalEventProcessor<GreeterParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
-
- // Extract the greeting value
- String greeting = extractor.singleValueParameter(GREETER_KEY, String.class);
-
- // now the text parameter would be added to a parameter class (omitted for this example)
- return new ConfiguredExternalEventProcessor<>(new GreeterParameters(graph, greeting), GreeterPython::new);
- }
-}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
new file mode 100644
index 0000000..23c3be5
--- /dev/null
+++ b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/PythonGreeterProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.pe.examples.jvm.python;
+
+import com.google.gson.JsonObject;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesExternalDataProcessor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PythonGreeterProcessor extends StreamPipesExternalDataProcessor {
+
+ private static final String GREETER_KEY = "greeter-key";
+ public static final String PROCESSOR_ID = "org.apache.streampipes.examples.python.processor.greeter";
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder.create(PROCESSOR_ID, "Python Greeter", "")
+ .requiredStream(StreamRequirementsBuilder.any())
+ // create a simple text parameter
+ .requiredTextParameter(Labels.withId(GREETER_KEY), "greeting")
+ // append greeting to event stream
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.stringEp(Labels.empty(),"greeting", SO.Text)))
+ // NOTE: currently only the Kafka transport protocol is supported
+ .supportedProtocols(SupportedProtocols.kafka())
+ .supportedFormats(SupportedFormats.jsonFormat())
+ .build();
+ }
+
+ @Override
+ public void onInvocation(ProcessorParams parameters, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+
+ // extract the static properties into a map that is used to build the minimal invocation graph
+ Map<String, String> staticPropertyMap = new HashMap<>();
+ staticPropertyMap.put("greeting", parameters.extractor().singleValueParameter(GREETER_KEY, String.class));
+
+ JsonObject minimalInvocationGraph = createMinimalInvocationGraph(staticPropertyMap);
+
+ // send the invocation request to Python
+ invoke(minimalInvocationGraph);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ // send a detach request to Python to stop the processor with this invocationId
+ detach();
+ }
+}
diff --git a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java b/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java
deleted file mode 100644
index 3613c62..0000000
--- a/streampipes-pipeline-elements-examples-processors-jvm/src/main/java/org/apache/streampipes/pe/examples/jvm/python/Route.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.streampipes.pe.examples.jvm.python;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-import java.io.IOException;
-
-public abstract class Route {
-
- public static final String INVOCATION = "invoke";
- public static final String DETACH = "detach";
-
- // endpoint of Python processor runs here
- public static final String PYTHON_ENDPOINT = "localhost:5000";
-
- public static String post(String endpoint, String payload) {
- String responseString = null;
-
- try {
- responseString = Request.Post(PYTHON_ENDPOINT + "/" + endpoint)
- .bodyString(payload, ContentType.APPLICATION_JSON)
- .connectTimeout(1000)
- .socketTimeout(100000)
- .execute().returnContent().asString();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return responseString;
- }
-}