Merge pull request #55 from Hrushi20/iss449
[Streampipes-449] Update Processing Element API in module streampipes
diff --git a/pom.xml b/pom.xml
index 4ab8062..5536514 100644
--- a/pom.xml
+++ b/pom.xml
@@ -415,6 +415,11 @@
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-model</artifactId>
+ <version>${streampipes.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-sdk</artifactId>
<version>${streampipes.version}</version>
</dependency>
diff --git a/streampipes-connect-adapters-iiot/pom.xml b/streampipes-connect-adapters-iiot/pom.xml
index c728abc..1d11f1f 100644
--- a/streampipes-connect-adapters-iiot/pom.xml
+++ b/streampipes-connect-adapters-iiot/pom.xml
@@ -42,6 +42,11 @@
<artifactId>streampipes-pipeline-elements-shared</artifactId>
<version>${streampipes.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-client</artifactId>
+ <version>${streampipes.version}</version>
+ </dependency>
<!-- External dependencies -->
<dependency>
@@ -249,4 +254,4 @@
<finalName>streampipes-connect-adapters-iiot</finalName>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/netio/NetioMQTTAdapter.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/netio/NetioMQTTAdapter.java
index de6f8fb..d692cd7 100644
--- a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/netio/NetioMQTTAdapter.java
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/netio/NetioMQTTAdapter.java
@@ -26,7 +26,7 @@
import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
import org.apache.streampipes.connect.iiot.adapters.netio.model.NetioAllPowerOutputs;
import org.apache.streampipes.connect.iiot.adapters.netio.model.NetioPowerOutput;
-import org.apache.streampipes.pe.shared.config.Mqqt.*;
+import org.apache.streampipes.pe.shared.config.mqtt.*;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
index 3073a4a..a7788f2 100644
--- a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.connect.iiot.protocol.set;
-import org.apache.http.client.fluent.Request;
import org.apache.streampipes.connect.SendToPipeline;
import org.apache.streampipes.connect.adapter.guess.SchemaGuesser;
import org.apache.streampipes.connect.adapter.model.generic.Protocol;
@@ -27,6 +26,7 @@
import org.apache.streampipes.connect.api.IFormat;
import org.apache.streampipes.connect.api.IParser;
import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
@@ -41,7 +41,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -98,7 +100,7 @@
}
SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
try {
- InputStream dataInputStream = getFileInputStream();
+ InputStream dataInputStream = FileProtocolUtils.getFileInputStream(this.selectedFilename);
if(dataInputStream != null) {
parser.parse(dataInputStream, stk);
} else {
@@ -120,7 +122,7 @@
public GuessSchema getGuessSchema() throws ParseException {
try {
- InputStream targetStream = getFileInputStream();
+ InputStream targetStream = FileProtocolUtils.getFileInputStream(this.selectedFilename);
List<byte[]> dataByte = parser.parseNEvents(targetStream, 20);
EventSchema eventSchema = parser.getEventSchema(dataByte);
@@ -133,45 +135,6 @@
}
}
- private InputStream getFileInputStream() throws FileNotFoundException {
- if (!isFilePresent()) {
- try {
- storeFileLocally();
- } catch (IOException e) {
- throw new ParseException("Could not receive file");
- }
- }
-
- return new FileInputStream(makeFileLoc(this.selectedFilename));
- }
-
- private boolean isFilePresent() {
- File file = new File(makeFileLoc(selectedFilename));
- return file.exists();
- }
-
- private void storeFileLocally() throws IOException {
- File storageDir = new File(makeServiceStorageDir());
- if (!storageDir.exists()) {
- storageDir.mkdirs();
- }
-
- File file = new File(makeFileLoc(selectedFilename));
-
- Request.Get(fileFetchUrl).execute().saveContent(file);
- }
-
- private String makeServiceStorageDir() {
- return System.getProperty("user.home")
- + File.separator
- + ".streampipes"
- + File.separator
- + "service";
- }
-
- private String makeFileLoc(String filename) {
- return makeServiceStorageDir() + File.separator + filename;
- }
@Override
public List<Map<String, Object>> getNElements(int n) throws ParseException {
@@ -179,7 +142,7 @@
List<byte[]> dataByteArray = new ArrayList<>();
try {
- InputStream dataInputStream = getFileInputStream();
+ InputStream dataInputStream = FileProtocolUtils.getFileInputStream(this.selectedFilename);
dataByteArray = parser.parseNEvents(dataInputStream, n);
} catch (FileNotFoundException e) {
e.printStackTrace();
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 6536c3d..4b9368b 100644
--- a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.connect.iiot.protocol.stream;
-import org.apache.http.client.fluent.Request;
import org.apache.streampipes.connect.SendToPipeline;
import org.apache.streampipes.connect.adapter.guess.SchemaGuesser;
import org.apache.streampipes.connect.adapter.model.generic.Protocol;
@@ -30,6 +29,7 @@
import org.apache.streampipes.connect.api.IFormat;
import org.apache.streampipes.connect.api.IParser;
import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
@@ -55,7 +55,7 @@
public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.file";
//private String filePath;
- private String fileFetchUrl;
+ private String selectedFileName;
// private String timestampKey;
private boolean replaceTimestamp;
private float speedUp;
@@ -68,10 +68,10 @@
public FileStreamProtocol() {
}
- public FileStreamProtocol(IParser parser, IFormat format, String fileFetchUrl,
+ public FileStreamProtocol(IParser parser, IFormat format, String selectedFileName,
boolean replaceTimestamp, float speedUp, int timeBetweenReplay) {
super(parser, format);
- this.fileFetchUrl = fileFetchUrl;
+ this.selectedFileName = selectedFileName;
this.replaceTimestamp = replaceTimestamp;
this.speedUp = speedUp;
this.timeBetweenReplay = timeBetweenReplay;
@@ -141,10 +141,12 @@
}
private InputStream getDataFromEndpoint() throws ParseException {
- try {
- return Request.Get(fileFetchUrl).execute().returnContent().asStream();
+
+
+ try {
+ return FileProtocolUtils.getFileInputStream(this.selectedFileName);
} catch (IOException e) {
- throw new ParseException("Could not find file: " + fileFetchUrl);
+ throw new ParseException("Could not find file: " + selectedFileName);
}
}
@@ -160,8 +162,8 @@
int timeBetweenReplay = 1;
- String fileFetchUrl = extractor.selectedFileFetchUrl("filePath");
- return new FileStreamProtocol(parser, format, fileFetchUrl, replaceTimestamp, speedUp, timeBetweenReplay);
+ String fileName = extractor.selectedFilename("filePath");
+ return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay);
}
private String getTimestampKey(List<EventProperty> eventProperties, String prefixKey) {
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
index 480f91b..5c997ce 100644
--- a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
@@ -33,7 +33,7 @@
import org.apache.streampipes.connect.api.IFormat;
import org.apache.streampipes.connect.api.IParser;
import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.pe.shared.config.Kafka.*;
+import org.apache.streampipes.pe.shared.config.kafka.*;
import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/MqttProtocol.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/MqttProtocol.java
index 571d684..1f1cd4a 100644
--- a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/MqttProtocol.java
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/MqttProtocol.java
@@ -24,7 +24,7 @@
import org.apache.streampipes.connect.api.IFormat;
import org.apache.streampipes.connect.api.IParser;
import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.pe.shared.config.Mqqt.*;
+import org.apache.streampipes.pe.shared.config.mqtt.*;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
new file mode 100644
index 0000000..3662335
--- /dev/null
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.iiot.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.service.extensions.base.client.StreamPipesClientResolver;
+
+import java.io.*;
+
+public class FileProtocolUtils {
+
+ public static InputStream getFileInputStream(String selectedFilename) throws FileNotFoundException {
+ if (!isFilePresent(selectedFilename)) {
+ try {
+ storeFileLocally(selectedFilename);
+ } catch (IOException e) {
+ throw new ParseException("Could not receive file");
+ }
+ }
+
+ return new FileInputStream(makeFileLoc(selectedFilename));
+ }
+
+ private static boolean isFilePresent(String selectedFilename) {
+ File file = new File(makeFileLoc(selectedFilename));
+ return file.exists();
+ }
+
+ private static void storeFileLocally(String selectedFilename) throws IOException {
+ File storageDir = new File(makeServiceStorageDir());
+ if (!storageDir.exists()) {
+ storageDir.mkdirs();
+ }
+
+ StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance();
+
+ byte[] res = client.fileApi().getFileContent(selectedFilename);
+
+ File file = new File(makeFileLoc(selectedFilename));
+ FileUtils.writeByteArrayToFile(file, res);
+ }
+
+ private static String makeServiceStorageDir() {
+ return System.getProperty("user.home")
+ + File.separator
+ + ".streampipes"
+ + File.separator
+ + "service";
+ }
+
+ private static String makeFileLoc(String filename) {
+ return makeServiceStorageDir() + File.separator + filename;
+ }
+
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
index 82747ff..efca723 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
@@ -23,7 +23,7 @@
import org.apache.streampipes.connect.api.exception.AdapterException;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.pe.shared.config.Mqqt.*;
+import org.apache.streampipes.pe.shared.config.mqtt.*;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java
index 6282bbc..7d3c20e 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java
@@ -18,7 +18,7 @@
package org.apache.streampipes.connect.adapters.ti;
-import org.apache.streampipes.pe.shared.config.Mqqt.*;
+import org.apache.streampipes.pe.shared.config.mqtt.*;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
import org.slf4j.Logger;
diff --git a/streampipes-pipeline-elements-shared/pom.xml b/streampipes-pipeline-elements-shared/pom.xml
index 72ffe5f..c4cdb0c 100644
--- a/streampipes-pipeline-elements-shared/pom.xml
+++ b/streampipes-pipeline-elements-shared/pom.xml
@@ -46,5 +46,10 @@
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-model</artifactId>
+ </dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
+
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/PlaceholderExtractor.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/PlaceholderExtractor.java
index 2f3491f..15a68b2 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/PlaceholderExtractor.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/PlaceholderExtractor.java
@@ -18,59 +18,16 @@
package org.apache.streampipes.pe.shared;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import org.apache.streampipes.model.runtime.Event;
public class PlaceholderExtractor {
- private static final Pattern pattern = Pattern.compile("#[^#]*#");
+ private static final String HASHTAG = "#";
- public static String replacePlaceholders(String content, String json) {
- List<String> placeholders = getPlaceholders(content);
- JsonParser parser = new JsonParser();
- JsonObject jsonObject = parser.parse(json).getAsJsonObject();
-
- for(String placeholder : placeholders) {
- String replacedValue = getPropertyValue(jsonObject, placeholder);
- content = content.replaceAll(placeholder, replacedValue);
+ public static String replacePlaceholders(Event event, String content) {
+ for(String key: event.getRaw().keySet()) {
+ content = content.replaceAll(HASHTAG + key + HASHTAG, event.getRaw().get(key).toString());
}
-
return content;
}
-
- public static String replacePlaceholders(String content, Map<String, Object> event) {
- List<String> placeholders = getPlaceholders(content);
-
- for(String placeholder : placeholders) {
- String replacedValue = getPropertyValue(event, placeholder);
- content = content.replaceAll(placeholder, replacedValue);
- }
-
- return content;
- }
-
- private static String getPropertyValue(Map<String, Object> event, String placeholder) {
- String key = placeholder.replaceAll("#", "");
- return String.valueOf(event.get(key));
- }
-
- private static String getPropertyValue(JsonObject jsonObject, String placeholder) {
- String jsonKey = placeholder.replaceAll("#", "");
- return String.valueOf(jsonObject.get(jsonKey).getAsString());
- }
-
- private static List<String> getPlaceholders(String content) {
- List<String> results = new ArrayList<>();
- Matcher matcher = pattern.matcher(content);
- while (matcher.find()) {
- results.add(matcher.group());
- }
- return results;
- }
}
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConfig.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
similarity index 96%
rename from streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConfig.java
rename to streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
index 8365950..e0a9a27 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConfig.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.pe.shared.config.Kafka;
+package org.apache.streampipes.pe.shared.config.kafka;
public class KafkaConfig {
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConnectUtils.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
similarity index 96%
rename from streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConnectUtils.java
rename to streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
index e14a630..3c9dde1 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConnectUtils.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
@@ -16,9 +16,8 @@
*
*/
-package org.apache.streampipes.pe.shared.config.Kafka;
+package org.apache.streampipes.pe.shared.config.kafka;
-import org.apache.streampipes.pe.shared.config.Kafka.KafkaConfig;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConfig.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConfig.java
similarity index 96%
rename from streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConfig.java
rename to streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConfig.java
index 21c8de3..71c049e 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConfig.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.pe.shared.config.Mqqt;
+package org.apache.streampipes.pe.shared.config.mqtt;
public class MqttConfig {
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConnectUtils.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
similarity index 96%
rename from streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConnectUtils.java
rename to streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
index 599c725..61313a7 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConnectUtils.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
@@ -16,9 +16,8 @@
*
*/
-package org.apache.streampipes.pe.shared.config.Mqqt;
+package org.apache.streampipes.pe.shared.config.mqtt;
-import org.apache.streampipes.pe.shared.config.Mqqt.MqttConfig;
import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConsumer.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConsumer.java
similarity index 97%
rename from streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConsumer.java
rename to streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConsumer.java
index 7bea5d6..ee265d4 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConsumer.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConsumer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.pe.shared.config.Mqqt;
+package org.apache.streampipes.pe.shared.config.mqtt;
import org.fusesource.mqtt.client.*;
import org.apache.streampipes.messaging.InternalEventProcessor;
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
index 3dec7cb..5f7b90c 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
@@ -18,16 +18,14 @@
package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.pe.shared.PlaceholderExtractor;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RabbitMqConsumer implements EventSink<RabbitMqParameters> {
@@ -55,9 +53,8 @@
@Override
public void onEvent(Event inputEvent) {
try {
- Map<String, Object> event = inputEvent.getRaw();
- publisher.fire(dataFormatDefinition.fromMap(event),
- PlaceholderExtractor.replacePlaceholders(topic, event));
+ publisher.fire(dataFormatDefinition.fromMap(inputEvent.getRaw()),
+ PlaceholderExtractor.replacePlaceholders(inputEvent, topic));
} catch (SpRuntimeException e) {
LOG.error("Could not serialiaze event");
}
diff --git a/streampipes-sinks-internal-jvm/pom.xml b/streampipes-sinks-internal-jvm/pom.xml
index 9bcc944..4294d85 100644
--- a/streampipes-sinks-internal-jvm/pom.xml
+++ b/streampipes-sinks-internal-jvm/pom.xml
@@ -33,6 +33,11 @@
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-pipeline-elements-shared</artifactId>
+ <version>0.69.0-SNAPSHOT</version>
+ </dependency>
<!-- StreamPipes dependencies -->
<dependency>
<groupId>org.apache.streampipes</groupId>
@@ -133,4 +138,4 @@
</plugins>
<finalName>streampipes-sinks-internal-jvm</finalName>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index 287e84f..61123b2 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -28,6 +28,7 @@
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
import org.apache.streampipes.sinks.internal.jvm.dashboard.DashboardController;
import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeController;
import org.apache.streampipes.sinks.internal.jvm.notification.NotificationController;
@@ -57,6 +58,17 @@
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
new SpMqttProtocolFactory())
+ .addConfig(ConfigKeys.JMS_HOST, "activemq", "")
+ .addConfig(ConfigKeys.JMS_PORT, 61616, "")
+ .addConfig(ConfigKeys.DATA_LAKE_HOST, "influxdb", "Hostname for the StreamPipes data lake database")
+ .addConfig(ConfigKeys.DATA_LAKE_PROTOCOL, "http", "Protocol for the StreamPipes data lake database")
+ .addConfig(ConfigKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database")
+ .addConfig(ConfigKeys.DATA_LAKE_USERNAME, "default", "Username for the StreamPipes data lake database")
+ .addConfig(ConfigKeys.DATA_LAKE_PASSWORD, "default", "Password for the StreamPipes data lake database")
+ .addConfig(ConfigKeys.DATA_LAKE_DATABASE_NAME, "sp", "Database name for the StreamPipes data lake database")
+ .addConfig(ConfigKeys.IMAGE_STORAGE_LOCATION, "/spImages/", "Storage location of the data lake images")
.build();
+
+
}
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
index fba1b64..fd8bd65 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
@@ -19,23 +19,13 @@
package org.apache.streampipes.sinks.internal.jvm.config;
public class ConfigKeys {
- final static String HOST = "SP_HOST";
- final static String PORT = "SP_PORT";
- final static String COUCHDB_HOST = "SP_COUCHDB_HOST";
- final static String COUCHDB_PORT = "SP_COCHDB_PORT";
- final static String JMS_HOST = "SP_JMS_HOST";
- final static String JMS_PORT = "SP_JMS_PORT";
- final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
- final static String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
- final static String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
- final static String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
- final static String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
- final static String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
- final static String BACKEND_HOST = "SP_BACKEND_HOST";
- final static String BACKEND_PORT = "SP_BACKEND_PORT";
- final static String BACKEND_PROTOCOL = "SP_BACKEND_PROTOCOL";
- final static String IMAGE_STORAGE_LOCATION = "SP_IMAGE_STORAGE_LOCATION";
-
-
- final static String SERVICE_NAME = "SP_SERVICE_NAME";
+ public final static String JMS_HOST = "SP_JMS_HOST";
+ public final static String JMS_PORT = "SP_JMS_PORT";
+ public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
+ public final static String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
+ public final static String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
+ public final static String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
+ public final static String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
+ public final static String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
+ public final static String IMAGE_STORAGE_LOCATION = "SP_IMAGE_STORAGE_LOCATION";
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/SinksInternalJvmConfig.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/SinksInternalJvmConfig.java
deleted file mode 100644
index faf1b1d..0000000
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/SinksInternalJvmConfig.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.internal.jvm.config;
-
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum SinksInternalJvmConfig implements PeConfig {
- INSTANCE;
-
- private SpConfig config;
-
- public final static String serverUrl;
-
- private final static String service_id = "pe/org.apache.streampipes.sinks.internal.jvm";
- private final static String service_name = "Sinks Internal JVM";
- private final static String service_container_name = "sinks-internal-jvm";
-
-
- SinksInternalJvmConfig() {
- config = SpServiceDiscovery.getSpConfig(service_id);
- config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe esper");
- config.register(ConfigKeys.PORT, 8090, "Port for the pe esper");
-
- config.register(ConfigKeys.COUCHDB_HOST, "couchdb", "Host for couchdb of the pe sinks project");
- config.register(ConfigKeys.COUCHDB_PORT, 5984, "Port for couchdb of the pe sinks project");
- config.register(ConfigKeys.JMS_HOST, "activemq", "Hostname for pe actions service for active mq");
- config.register(ConfigKeys.JMS_PORT, 61616, "Port for pe actions service for active mq");
- config.register(ConfigKeys.DATA_LAKE_HOST, "influxdb", "Hostname for the StreamPipes data lake database");
- config.register(ConfigKeys.DATA_LAKE_PROTOCOL, "http", "Protocol for the StreamPipes data lake database");
- config.register(ConfigKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database");
- config.register(ConfigKeys.DATA_LAKE_USERNAME, "default", "Username for the StreamPipes data lake database");
- config.registerPassword(ConfigKeys.DATA_LAKE_PASSWORD, "default", "Password for the StreamPipes data lake database");
- config.register(ConfigKeys.DATA_LAKE_DATABASE_NAME, "sp", "Database name for the StreamPipes data lake database");
- config.register(ConfigKeys.BACKEND_HOST, "backend", "Hostname for the StreamPipes-Backend");
- config.register(ConfigKeys.BACKEND_PORT, 8030, "Port for the StreamPipes-Backend");
- config.register(ConfigKeys.BACKEND_PROTOCOL, "http", "Protocol for the StreamPipes-Backend");
- config.register(ConfigKeys.IMAGE_STORAGE_LOCATION, "/spImages/", "Storage location of the data lake images");
-
- config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
-
- }
-
- static {
- serverUrl = SinksInternalJvmConfig.INSTANCE.getHost() + ":" + SinksInternalJvmConfig.INSTANCE.getPort();
- }
-
- @Override
- public String getHost() {
- return config.getString(ConfigKeys.HOST);
- }
-
- @Override
- public int getPort() {
- return config.getInteger(ConfigKeys.PORT);
- }
-
- public String getCouchDbHost() {
- return config.getString(ConfigKeys.COUCHDB_HOST);
- }
-
- public int getCouchDbPort() {
- return config.getInteger(ConfigKeys.COUCHDB_PORT);
- }
-
- public String getJmsHost() {
- return config.getString(ConfigKeys.JMS_HOST);
- }
-
- public int getJmsPort() {
- return config.getInteger(ConfigKeys.JMS_PORT);
- }
-
- public String getDataLakeHost() {
- return config.getString(ConfigKeys.DATA_LAKE_HOST);
- }
-
- public String getDataLakeProtocol() {
- return config.getString(ConfigKeys.DATA_LAKE_PROTOCOL);
- }
-
- public Integer getDataLakePort() {
- return config.getInteger(ConfigKeys.DATA_LAKE_PORT);
- }
-
- public String getDataLakeUsername() {
- return config.getString(ConfigKeys.DATA_LAKE_USERNAME);
- }
-
-
- public String getDataLakePassword() {
- return config.getString(ConfigKeys.DATA_LAKE_PASSWORD);
- }
-
-
- public String getDataLakeDatabaseName() {
- return config.getString(ConfigKeys.DATA_LAKE_DATABASE_NAME);
- }
-
-
- @Override
- public String getId() {
- return service_id;
- }
-
- @Override
- public String getName() {
- return config.getString(ConfigKeys.SERVICE_NAME);
- }
-
-
- public String getStreamPipesBackendHost() {
- return config.getString(ConfigKeys.BACKEND_HOST);
- }
-
- public String getStreamPipesBackendProtocol() {
- return config.getString(ConfigKeys.BACKEND_PROTOCOL);
- }
-
-
- public String getImageStorageLocation() {
- return config.getString(ConfigKeys.IMAGE_STORAGE_LOCATION);
- }
-
- public Integer getStreamPipesBackendPort() {
- return config.getInteger(ConfigKeys.BACKEND_PORT);
- }
-
- public String getStreamPipesBackendUrl() { return getStreamPipesBackendProtocol() + "://"
- + getStreamPipesBackendHost() + ":" + getStreamPipesBackendPort(); }
-
-
-
-}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
index 91564c7..e826b98 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
@@ -23,7 +23,7 @@
import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
+import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -38,10 +38,10 @@
@Override
public void onInvocation(DashboardParameters parameters,
- EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+ EventSinkRuntimeContext context) throws SpRuntimeException {
this.publisher = new ActiveMQPublisher(
- SinksInternalJvmConfig.INSTANCE.getJmsHost(),
- SinksInternalJvmConfig.INSTANCE.getJmsPort(),
+ context.getConfigStore().getConfig().getString(ConfigKeys.JMS_HOST),
+ context.getConfigStore().getConfig().getInteger(ConfigKeys.JMS_PORT),
makeTopic(parameters.getGraph().getInputStreams().get(0), parameters.getVisualizationName()));
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index d7134f1..07c146f 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -19,17 +19,14 @@
package org.apache.streampipes.sinks.internal.jvm.datalake;
import org.apache.commons.codec.binary.Base64;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
+import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.vocabulary.SPSensor;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -62,13 +59,21 @@
this.timestampField = parameters.getTimestampField();
+ SpConfig configStore = runtimeContext.getConfigStore().getConfig();
+
+ String influxHost = configStore.getString(ConfigKeys.DATA_LAKE_PROTOCOL) + "://" + configStore.getString(ConfigKeys.DATA_LAKE_HOST);
+ Integer influxPort = configStore.getInteger(ConfigKeys.DATA_LAKE_PORT);
+ String databaseName = configStore.getString(ConfigKeys.DATA_LAKE_DATABASE_NAME);
+ String user = configStore.getString(ConfigKeys.DATA_LAKE_USERNAME);
+ String password = configStore.getString(ConfigKeys.DATA_LAKE_PASSWORD);
+
this.influxDbClient = new DataLakeInfluxDbClient(
- parameters.getInfluxDbHost(),
- parameters.getInfluxDbPort(),
- parameters.getDatabaseName(),
+ influxHost,
+ influxPort,
+ databaseName,
parameters.getMeasurementName(),
- parameters.getUsername(),
- parameters.getPassword(),
+ user,
+ password,
parameters.getTimestampField(),
parameters.getBatchSize(),
parameters.getFlushDuration(),
@@ -91,7 +96,7 @@
schema.getEventProperties().stream().forEach(eventProperty -> {
eventProperty.setRuntimeName(prepareString(eventProperty.getRuntimeName()));
});
- registerAtDataLake(parameters.getMeasurementName(), schema);
+ registerAtDataLake(parameters.getMeasurementName(), schema, runtimeContext.getStreamPipesClient());
imageProperties = schema.getEventProperties().stream()
.filter(eventProperty -> eventProperty.getDomainProperties() != null &&
@@ -99,7 +104,7 @@
eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
.collect(Collectors.toList());
- imageDirectory = SinksInternalJvmConfig.INSTANCE.getImageStorageLocation() + parameters.getMeasurementName() + "/";
+ imageDirectory = configStore.getString(ConfigKeys.IMAGE_STORAGE_LOCATION) + parameters.getMeasurementName() + "/";
}
@@ -150,24 +155,12 @@
* @param eventSchema
* @throws SpRuntimeException
*/
- private void registerAtDataLake(String measure, EventSchema eventSchema) throws SpRuntimeException {
- String url = SinksInternalJvmConfig.INSTANCE.getStreamPipesBackendUrl();
-
- try {
- String json = JacksonSerializer.getObjectMapper().writeValueAsString(eventSchema);
- StringEntity stringEntity = new StringEntity(json, ContentType.APPLICATION_JSON);
- HttpResponse response = Request.Post(url + "/streampipes-backend/api/v3/noauth/datalake/" + measure)
- .addHeader("Content-type", "application/json")
- .body(stringEntity)
- .execute()
- .returnResponse();
- if (response.getStatusLine().getStatusCode() == 409) {
- throw new SpRuntimeException("The measurement '" + measure +"' is already registered as Data lake with different Event schema");
- }
- } catch (IOException e) {
- LOG.error(e.toString());
- }
-
+ private void registerAtDataLake(String measure,
+ EventSchema eventSchema,
+ StreamPipesClient client) throws SpRuntimeException {
+ client
+ .customRequest()
+ .sendPost("api/v3/datalake/measure/" + measure, eventSchema);
}
public static String prepareString(String s) {
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
index 814c437..ffb2670 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
@@ -29,12 +29,9 @@
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
-import java.util.List;
-
public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakeParameters> {
private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
@@ -64,21 +61,16 @@
measureName = DataLake.prepareString(measureName);
String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
- String hostname = SinksInternalJvmConfig.INSTANCE.getDataLakeProtocol() + "://" + SinksInternalJvmConfig.INSTANCE.getDataLakeHost();
- Integer port = SinksInternalJvmConfig.INSTANCE.getDataLakePort();
- String dbName = SinksInternalJvmConfig.INSTANCE.getDataLakeDatabaseName();
- String user = SinksInternalJvmConfig.INSTANCE.getDataLakeUsername();
- String password = SinksInternalJvmConfig.INSTANCE.getDataLakePassword();
+// String hostname = SinksInternalJvmConfig.INSTANCE.getDataLakeProtocol() + "://" + SinksInternalJvmConfig.INSTANCE.getDataLakeHost();
+// Integer port = SinksInternalJvmConfig.INSTANCE.getDataLakePort();
+// String dbName = SinksInternalJvmConfig.INSTANCE.getDataLakeDatabaseName();
+// String user = SinksInternalJvmConfig.INSTANCE.getDataLakeUsername();
+// String password = SinksInternalJvmConfig.INSTANCE.getDataLakePassword();
Integer batch_size = 2000;
Integer flush_duration = 500;
DataLakeParameters params = new DataLakeParameters(graph,
- hostname,
- port,
- dbName,
measureName,
- user,
- password,
timestampField,
batch_size,
flush_duration);
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
index 1dcc0fa..61f8f3a 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
@@ -26,63 +26,27 @@
*/
public class DataLakeParameters extends EventSinkBindingParams {
- private String influxDbHost;
- private Integer influxDbPort;
- private String databaseName;
private String measureName;
- private String user;
- private String password;
private String timestampField;
private Integer batchSize;
private Integer flushDuration;
public DataLakeParameters(DataSinkInvocation graph,
- String influxDbHost,
- Integer influxDbPort,
- String databaseName,
String measureName,
- String user,
- String password,
String timestampField,
Integer batchSize,
Integer flushDuration) {
super(graph);
-
- this.influxDbHost = influxDbHost;
- this.influxDbPort = influxDbPort;
- this.databaseName = databaseName;
this.measureName = measureName;
- this.user = user;
- this.password = password;
this.timestampField = timestampField;
this.batchSize = batchSize;
this.flushDuration = flushDuration;
}
- public String getInfluxDbHost() {
- return influxDbHost;
- }
-
- public Integer getInfluxDbPort() {
- return influxDbPort;
- }
-
- public String getDatabaseName() {
- return databaseName;
- }
-
public String getMeasurementName() {
return measureName;
}
- public String getUsername() {
- return user;
- }
-
- public String getPassword() {
- return password;
- }
-
public String getTimestampField() {
return timestampField;
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
index cdcf04b..a18692c 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
@@ -24,7 +24,9 @@
import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
import org.apache.streampipes.model.Notification;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
+import org.apache.streampipes.pe.shared.PlaceholderExtractor;
+import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -45,16 +47,17 @@
@Override
- public void onInvocation(NotificationParameters parameters, EventSinkRuntimeContext runtimeContext) throws
+ public void onInvocation(NotificationParameters parameters, EventSinkRuntimeContext context) throws
SpRuntimeException {
this.gson = new Gson();
this.title = parameters.getTitle();
this.content = parameters.getContent();
this.correspondingPipelineId = parameters.getGraph().getCorrespondingPipeline();
this.correspondingUser = parameters.getGraph().getCorrespondingUser();
+ SpConfig configStore = context.getConfigStore().getConfig();
this.publisher = new ActiveMQPublisher(
- SinksInternalJvmConfig.INSTANCE.getJmsHost(),
- SinksInternalJvmConfig.INSTANCE.getJmsPort(),
+ configStore.getString(ConfigKeys.JMS_HOST),
+ configStore.getInteger(ConfigKeys.JMS_PORT),
"org.apache.streampipes.notifications." + this.correspondingUser);
}
@@ -65,7 +68,7 @@
notification.setId(UUID.randomUUID().toString());
notification.setRead(false);
notification.setTitle(title);
- notification.setMessage(replacePlaceholders(inputEvent, content));
+ notification.setMessage(PlaceholderExtractor.replacePlaceholders(inputEvent, content));
notification.setCreatedAt(currentDate);
notification.setCreatedAtTimestamp(currentDate.getTime());
notification.setCorrespondingPipelineId(correspondingPipelineId);
@@ -81,10 +84,4 @@
this.publisher.disconnect();
}
- private String replacePlaceholders(Event event, String content) {
- for(String key: event.getRaw().keySet()) {
- content = content.replaceAll(HASHTAG + key + HASHTAG, event.getRaw().get(key).toString());
- }
- return content;
- }
}
diff --git a/streampipes-sinks-notifications-jvm/pom.xml b/streampipes-sinks-notifications-jvm/pom.xml
index 22d81c0..be38650 100644
--- a/streampipes-sinks-notifications-jvm/pom.xml
+++ b/streampipes-sinks-notifications-jvm/pom.xml
@@ -28,6 +28,11 @@
<artifactId>streampipes-sinks-notifications-jvm</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-pipeline-elements-shared</artifactId>
+ <version>0.69.0-SNAPSHOT</version>
+ </dependency>
<!-- StreamPipes dependencies -->
<dependency>
<groupId>org.apache.streampipes</groupId>
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java
index 20c94ac..019cb4b 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java
@@ -59,13 +59,6 @@
new SpJmsProtocolFactory(),
new SpMqttProtocolFactory())
.addConfig(ConfigKeys.WEBSOCKET_PROTOCOL, "ws", "")
- .addConfig(ConfigKeys.EMAIL_FROM, "", "The sender address of the email")
- .addConfig(ConfigKeys.EMAIL_USERNAME, "", "The username of the email account")
- .addConfig(ConfigKeys.EMAIL_PASSWORD, "", "The password of the email account")
- .addConfig(ConfigKeys.EMAIL_SMTP_HOST, "", "The SMTP Host")
- .addConfig(ConfigKeys.EMAIL_SMTP_PORT, "", "The SMTP Port")
- .addConfig(ConfigKeys.EMAIL_STARTTLS, false, "Use startls?")
- .addConfig(ConfigKeys.EMAIL_SLL, false, "Use SLL?")
.build();
}
}
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/config/ConfigKeys.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/config/ConfigKeys.java
index f69d0a7..5b3a478 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/config/ConfigKeys.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/config/ConfigKeys.java
@@ -19,13 +19,5 @@
package org.apache.streampipes.sinks.notifications.jvm.config;
public class ConfigKeys {
- public final static String EMAIL_FROM = "SP_EMAIL_FROM";
- public final static String EMAIL_USERNAME = "SP_EMAIL_USERNAME";
- public final static String EMAIL_PASSWORD = "SP_EMAIL_PASSWORD";
- public final static String EMAIL_SMTP_HOST = "SP_EMAIL_SMTP_HOST";
- public final static String EMAIL_SMTP_PORT = "SP_EMAIL_SMTP_PORT";
- public final static String EMAIL_STARTTLS = "SP_EMAIL_STARTTLS";
- public final static String EMAIL_SLL = "SP_EMAIL_SLL";
public final static String WEBSOCKET_PROTOCOL = "SP_WEBSOCKET_PROTOCOL";
-
}
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailController.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailController.java
index 1f5483e..ab48dcd 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailController.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailController.java
@@ -36,6 +36,7 @@
private static final String TO_EMAIL_ADRESS = "to_email";
private static final String EMAIL_SUBJECT = "email_subject";
private static final String EMAIL_CONTENT = "email_content";
+ private static final String SILENT_PERIOD = "silent-period";
@Override
@@ -51,6 +52,7 @@
.requiredProperty(EpRequirements.anyProperty())
.build())
.requiredHtmlInputParameter(Labels.withId(EMAIL_CONTENT))
+ .requiredIntegerParameter(Labels.withId(SILENT_PERIOD))
.build();
}
@@ -60,8 +62,9 @@
String toEmail = extractor.singleValueParameter(TO_EMAIL_ADRESS, String.class);
String subject = extractor.singleValueParameter(EMAIL_SUBJECT, String.class);
String content = extractor.singleValueParameter(EMAIL_CONTENT, String.class);
+ Integer silentPeriod = extractor.singleValueParameter(SILENT_PERIOD, Integer.class);
- EmailParameters params = new EmailParameters(graph, toEmail, subject, content);
+ EmailParameters params = new EmailParameters(graph, toEmail, subject, content, silentPeriod);
return new ConfiguredEventSink<>(params, EmailPublisher::new);
}
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailParameters.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailParameters.java
index e63ca31..82fbad2 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailParameters.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailParameters.java
@@ -26,12 +26,18 @@
private String toEmailAddress;
private String subject;
private String content;
+ private Integer silentPeriod;
- public EmailParameters(DataSinkInvocation graph, String toEmailAddress, String subject, String content) {
+ public EmailParameters(DataSinkInvocation graph,
+ String toEmailAddress,
+ String subject,
+ String content,
+ Integer silentPeriod) {
super(graph);
this.toEmailAddress = toEmailAddress;
this.subject = subject;
this.content = content;
+ this.silentPeriod = silentPeriod;
}
public String getToEmailAddress() {
@@ -45,4 +51,8 @@
public String getContent() {
return content;
}
+
+ public Integer getSilentPeriod() {
+ return silentPeriod;
+ }
}
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailPublisher.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailPublisher.java
index 8ee229e..63ab537 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailPublisher.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailPublisher.java
@@ -18,87 +18,53 @@
package org.apache.streampipes.sinks.notifications.jvm.email;
+import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.mail.SpEmail;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.EventConverter;
-import org.apache.streampipes.sinks.notifications.jvm.config.ConfigKeys;
+import org.apache.streampipes.pe.shared.PlaceholderExtractor;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventSink;
-import javax.mail.*;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeMessage;
-import java.util.Map;
-import java.util.Properties;
+import java.time.Instant;
+import java.util.Collections;
public class EmailPublisher implements EventSink<EmailParameters> {
- private static Logger LOG;
+ private SpEmail preparedEmail;
+ private long silentPeriodInSeconds;
+ private long lastMailEpochSecond = -1;
- private MimeMessage message;
- private String content;
+ private String originalContent;
+
+ private StreamPipesClient client;
@Override
public void onInvocation(EmailParameters parameters, EventSinkRuntimeContext runtimeContext) {
- LOG = parameters.getGraph().getLogger(EmailPublisher.class);
- SpConfig config = runtimeContext.getConfigStore().getConfig();
- String from = config.getString(ConfigKeys.EMAIL_FROM);
- String to = parameters.getToEmailAddress();
- String subject = parameters.getSubject();
- this.content = parameters.getContent();
- String username = config.getString(ConfigKeys.EMAIL_USERNAME);
- String password = config.getString(ConfigKeys.EMAIL_PASSWORD);
- String host = config.getString(ConfigKeys.EMAIL_SMTP_HOST);
- int port = config.getInteger(ConfigKeys.EMAIL_SMTP_PORT);
- boolean starttls = config.getBoolean(ConfigKeys.EMAIL_STARTTLS);
- boolean ssl = config.getBoolean(ConfigKeys.EMAIL_SLL);
+ this.preparedEmail = new SpEmail();
+ this.preparedEmail.setRecipients(Collections.singletonList(parameters.getToEmailAddress()));
+ this.preparedEmail.setSubject(parameters.getSubject());
- Properties properties = new Properties();
- properties.setProperty("mail.smtp.host", host);
- properties.setProperty("mail.smtp.port", String.valueOf(port));
-
- if (starttls) {
- properties.put("mail.smtp.starttls.enable", "true");
- }
- if (ssl) {
- properties.put("mail.smtp.ssl.enable", "true");
- }
- properties.put("mail.smtp.auth", "true");
-
- Session session = Session.getDefaultInstance(properties, new Authenticator() {
-
- @Override
- protected PasswordAuthentication getPasswordAuthentication() {
- return new PasswordAuthentication(username, password);
- }
- });
-
- try {
- this.message = new MimeMessage(session);
- this.message.setFrom(new InternetAddress(from));
- this.message.addRecipient(Message.RecipientType.TO, new InternetAddress(to));
- this.message.setSubject(subject);
- } catch (MessagingException e) {
- LOG.error(e.toString());
- }
+ this.silentPeriodInSeconds = parameters.getSilentPeriod() * 60;
+ this.client = runtimeContext.getStreamPipesClient();
+ this.originalContent = parameters.getContent();
}
@Override
public void onEvent(Event inputEvent) {
- String contentWithValues = this.content;
- Map<String, Object> inputMap = new EventConverter(inputEvent).toMap();
- try {
- for (Map.Entry entry: inputMap.entrySet()) {
- contentWithValues = contentWithValues.replaceAll("#" + entry.getKey() + "#",
- entry.getValue().toString());
- }
- this.message.setContent(contentWithValues, "text/html; charset=utf-8");
- Transport.send(message);
- LOG.info("Sent notifaction email");
- } catch (MessagingException e) {
- LOG.error(e.toString());
+ if (shouldSendMail()) {
+ String message = PlaceholderExtractor.replacePlaceholders(inputEvent, this.originalContent);
+ this.preparedEmail.setMessage(message);
+ this.client.deliverEmail(this.preparedEmail);
+ this.lastMailEpochSecond = Instant.now().getEpochSecond();
+ }
+ }
+
+ private boolean shouldSendMail() {
+ if (this.lastMailEpochSecond == -1) {
+ return true;
+ } else {
+ return Instant.now().getEpochSecond() >= (this.lastMailEpochSecond + this.silentPeriodInSeconds);
}
}
diff --git a/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.email/strings.en b/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.email/strings.en
index bca41f7..ab58dd2 100644
--- a/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.email/strings.en
+++ b/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.email/strings.en
@@ -8,4 +8,7 @@
email_subject.description=The subject of the email
email_content.title=Content
-email_content.description=Enter the email text. You can use place holders like #fieldName# to add the value of a field.
\ No newline at end of file
+email_content.description=Enter the email text. You can use place holders like #fieldName# to add the value of a field.
+
+silent-period.title=Silent Period [min]
+silent-period.description=The minimum number of minutes between two consecutive mails that are sent.