Merge pull request #55 from Hrushi20/iss449

[Streampipes-449] Update Processing Element API in module streampipes
diff --git a/pom.xml b/pom.xml
index 4ab8062..5536514 100644
--- a/pom.xml
+++ b/pom.xml
@@ -415,6 +415,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.streampipes</groupId>
+                <artifactId>streampipes-model</artifactId>
+                <version>${streampipes.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.streampipes</groupId>
                 <artifactId>streampipes-sdk</artifactId>
                 <version>${streampipes.version}</version>
             </dependency>
diff --git a/streampipes-connect-adapters-iiot/pom.xml b/streampipes-connect-adapters-iiot/pom.xml
index c728abc..1d11f1f 100644
--- a/streampipes-connect-adapters-iiot/pom.xml
+++ b/streampipes-connect-adapters-iiot/pom.xml
@@ -42,6 +42,11 @@
             <artifactId>streampipes-pipeline-elements-shared</artifactId>
             <version>${streampipes.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-client</artifactId>
+            <version>${streampipes.version}</version>
+        </dependency>
 
         <!-- External dependencies -->
         <dependency>
@@ -249,4 +254,4 @@
         <finalName>streampipes-connect-adapters-iiot</finalName>
     </build>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/netio/NetioMQTTAdapter.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/netio/NetioMQTTAdapter.java
index de6f8fb..d692cd7 100644
--- a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/netio/NetioMQTTAdapter.java
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/netio/NetioMQTTAdapter.java
@@ -26,7 +26,7 @@
 import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
 import org.apache.streampipes.connect.iiot.adapters.netio.model.NetioAllPowerOutputs;
 import org.apache.streampipes.connect.iiot.adapters.netio.model.NetioPowerOutput;
-import org.apache.streampipes.pe.shared.config.Mqqt.*;
+import org.apache.streampipes.pe.shared.config.mqtt.*;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
index 3073a4a..a7788f2 100644
--- a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/set/FileProtocol.java
@@ -19,7 +19,6 @@
 package org.apache.streampipes.connect.iiot.protocol.set;
 
 
-import org.apache.http.client.fluent.Request;
 import org.apache.streampipes.connect.SendToPipeline;
 import org.apache.streampipes.connect.adapter.guess.SchemaGuesser;
 import org.apache.streampipes.connect.adapter.model.generic.Protocol;
@@ -27,6 +26,7 @@
 import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.api.IParser;
 import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
@@ -41,7 +41,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -98,7 +100,7 @@
         }
         SendToPipeline stk = new SendToPipeline(format, adapterPipeline);
         try {
-            InputStream dataInputStream = getFileInputStream();
+            InputStream dataInputStream = FileProtocolUtils.getFileInputStream(this.selectedFilename);
             if(dataInputStream != null) {
                 parser.parse(dataInputStream, stk);
             } else {
@@ -120,7 +122,7 @@
     public GuessSchema getGuessSchema() throws ParseException {
 
         try {
-            InputStream targetStream = getFileInputStream();
+            InputStream targetStream = FileProtocolUtils.getFileInputStream(this.selectedFilename);
             List<byte[]> dataByte = parser.parseNEvents(targetStream, 20);
 
             EventSchema eventSchema = parser.getEventSchema(dataByte);
@@ -133,45 +135,6 @@
         }
     }
 
-    private InputStream getFileInputStream() throws FileNotFoundException {
-        if (!isFilePresent()) {
-            try {
-                storeFileLocally();
-            } catch (IOException e) {
-                throw new ParseException("Could not receive file");
-            }
-        }
-
-        return new FileInputStream(makeFileLoc(this.selectedFilename));
-    }
-
-    private boolean isFilePresent() {
-        File file = new File(makeFileLoc(selectedFilename));
-        return file.exists();
-    }
-
-    private void storeFileLocally() throws IOException {
-        File storageDir = new File(makeServiceStorageDir());
-        if (!storageDir.exists()) {
-            storageDir.mkdirs();
-        }
-
-        File file = new File(makeFileLoc(selectedFilename));
-
-        Request.Get(fileFetchUrl).execute().saveContent(file);
-    }
-
-    private String makeServiceStorageDir() {
-        return System.getProperty("user.home")
-                + File.separator
-                + ".streampipes"
-                + File.separator
-                + "service";
-    }
-
-    private String makeFileLoc(String filename) {
-        return makeServiceStorageDir() + File.separator + filename;
-    }
 
     @Override
     public List<Map<String, Object>> getNElements(int n) throws ParseException {
@@ -179,7 +142,7 @@
 
         List<byte[]> dataByteArray = new ArrayList<>();
         try {
-            InputStream dataInputStream = getFileInputStream();
+            InputStream dataInputStream = FileProtocolUtils.getFileInputStream(this.selectedFilename);
             dataByteArray = parser.parseNEvents(dataInputStream, n);
         } catch (FileNotFoundException e) {
             e.printStackTrace();
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 6536c3d..4b9368b 100644
--- a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.connect.iiot.protocol.stream;
 
-import org.apache.http.client.fluent.Request;
 import org.apache.streampipes.connect.SendToPipeline;
 import org.apache.streampipes.connect.adapter.guess.SchemaGuesser;
 import org.apache.streampipes.connect.adapter.model.generic.Protocol;
@@ -30,6 +29,7 @@
 import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.api.IParser;
 import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
 import org.apache.streampipes.model.connect.guess.GuessSchema;
@@ -55,7 +55,7 @@
   public static final String ID = "org.apache.streampipes.connect.iiot.protocol.stream.file";
 
   //private String filePath;
-  private String fileFetchUrl;
+  private String selectedFileName;
  // private String timestampKey;
   private boolean replaceTimestamp;
   private float speedUp;
@@ -68,10 +68,10 @@
   public FileStreamProtocol() {
   }
 
-  public FileStreamProtocol(IParser parser, IFormat format, String fileFetchUrl,
+  public FileStreamProtocol(IParser parser, IFormat format, String selectedFileName,
                             boolean replaceTimestamp, float speedUp, int timeBetweenReplay) {
     super(parser, format);
-    this.fileFetchUrl = fileFetchUrl;
+    this.selectedFileName = selectedFileName;
     this.replaceTimestamp = replaceTimestamp;
     this.speedUp = speedUp;
     this.timeBetweenReplay = timeBetweenReplay;
@@ -141,10 +141,12 @@
   }
 
   private InputStream getDataFromEndpoint() throws ParseException {
-    try {
-      return Request.Get(fileFetchUrl).execute().returnContent().asStream();
+
+
+      try {
+          return FileProtocolUtils.getFileInputStream(this.selectedFileName);
     } catch (IOException e) {
-      throw new ParseException("Could not find file: " + fileFetchUrl);
+      throw new ParseException("Could not find file: " + selectedFileName);
     }
   }
 
@@ -160,8 +162,8 @@
 
     int timeBetweenReplay = 1;
 
-    String fileFetchUrl = extractor.selectedFileFetchUrl("filePath");
-    return new FileStreamProtocol(parser, format, fileFetchUrl, replaceTimestamp, speedUp, timeBetweenReplay);
+    String fileName = extractor.selectedFilename("filePath");
+    return new FileStreamProtocol(parser, format, fileName, replaceTimestamp, speedUp, timeBetweenReplay);
   }
 
   private String getTimestampKey(List<EventProperty> eventProperties, String prefixKey) {
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
index 480f91b..5c997ce 100644
--- a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/KafkaProtocol.java
@@ -33,7 +33,7 @@
 import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.api.IParser;
 import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.pe.shared.config.Kafka.*;
+import org.apache.streampipes.pe.shared.config.kafka.*;
 import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/MqttProtocol.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/MqttProtocol.java
index 571d684..1f1cd4a 100644
--- a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/MqttProtocol.java
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/MqttProtocol.java
@@ -24,7 +24,7 @@
 import org.apache.streampipes.connect.api.IFormat;
 import org.apache.streampipes.connect.api.IParser;
 import org.apache.streampipes.connect.api.exception.ParseException;
-import org.apache.streampipes.pe.shared.config.Mqqt.*;
+import org.apache.streampipes.pe.shared.config.mqtt.*;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
diff --git a/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
new file mode 100644
index 0000000..3662335
--- /dev/null
+++ b/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.iiot.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.connect.api.exception.ParseException;
+import org.apache.streampipes.service.extensions.base.client.StreamPipesClientResolver;
+
+import java.io.*;
+
+public class FileProtocolUtils {
+
+    public static InputStream getFileInputStream(String selectedFilename) throws FileNotFoundException {
+        if (!isFilePresent(selectedFilename)) {
+            try {
+                storeFileLocally(selectedFilename);
+            } catch (IOException e) {
+                throw new ParseException("Could not receive file");
+            }
+        }
+
+        return new FileInputStream(makeFileLoc(selectedFilename));
+    }
+
+    private static boolean isFilePresent(String selectedFilename) {
+        File file = new File(makeFileLoc(selectedFilename));
+        return file.exists();
+    }
+
+    private static void storeFileLocally(String selectedFilename) throws IOException {
+        File storageDir = new File(makeServiceStorageDir());
+        if (!storageDir.exists()) {
+            storageDir.mkdirs();
+        }
+
+        StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance();
+
+        byte[] res = client.fileApi().getFileContent(selectedFilename);
+
+        File file = new File(makeFileLoc(selectedFilename));
+        FileUtils.writeByteArrayToFile(file, res);
+    }
+
+    private static  String makeServiceStorageDir() {
+        return System.getProperty("user.home")
+                + File.separator
+                + ".streampipes"
+                + File.separator
+                + "service";
+    }
+
+    private static String makeFileLoc(String filename) {
+        return makeServiceStorageDir() + File.separator + filename;
+    }
+
+}
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
index 82747ff..efca723 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/flic/FlicMQTTAdapter.java
@@ -23,7 +23,7 @@
 import org.apache.streampipes.connect.api.exception.AdapterException;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
 import org.apache.streampipes.connect.adapter.model.specific.SpecificDataStreamAdapter;
-import org.apache.streampipes.pe.shared.config.Mqqt.*;
+import org.apache.streampipes.pe.shared.config.mqtt.*;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.model.AdapterType;
 import org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
diff --git a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java
index 6282bbc..7d3c20e 100644
--- a/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java
+++ b/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/adapters/ti/TISensorTag.java
@@ -18,7 +18,7 @@
 
 package org.apache.streampipes.connect.adapters.ti;
 
-import org.apache.streampipes.pe.shared.config.Mqqt.*;
+import org.apache.streampipes.pe.shared.config.mqtt.*;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.slf4j.Logger;
diff --git a/streampipes-pipeline-elements-shared/pom.xml b/streampipes-pipeline-elements-shared/pom.xml
index 72ffe5f..c4cdb0c 100644
--- a/streampipes-pipeline-elements-shared/pom.xml
+++ b/streampipes-pipeline-elements-shared/pom.xml
@@ -46,5 +46,10 @@
             <groupId>org.fusesource.mqtt-client</groupId>
             <artifactId>mqtt-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-model</artifactId>
+        </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
+
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/PlaceholderExtractor.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/PlaceholderExtractor.java
index 2f3491f..15a68b2 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/PlaceholderExtractor.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/PlaceholderExtractor.java
@@ -18,59 +18,16 @@
 
 package org.apache.streampipes.pe.shared;
 
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import org.apache.streampipes.model.runtime.Event;
 
 public class PlaceholderExtractor {
 
-  private static final Pattern pattern = Pattern.compile("#[^#]*#");
+  private static final String HASHTAG = "#";
 
-  public static String replacePlaceholders(String content, String json) {
-    List<String> placeholders = getPlaceholders(content);
-    JsonParser parser = new JsonParser();
-    JsonObject jsonObject = parser.parse(json).getAsJsonObject();
-
-    for(String placeholder : placeholders) {
-      String replacedValue = getPropertyValue(jsonObject, placeholder);
-      content = content.replaceAll(placeholder, replacedValue);
+  public static String replacePlaceholders(Event event, String content) {
+    for(String key: event.getRaw().keySet()) {
+      content = content.replaceAll(HASHTAG + key + HASHTAG, event.getRaw().get(key).toString());
     }
-
     return content;
   }
-
-  public static String replacePlaceholders(String content, Map<String, Object> event) {
-    List<String> placeholders = getPlaceholders(content);
-
-    for(String placeholder : placeholders) {
-      String replacedValue = getPropertyValue(event, placeholder);
-      content = content.replaceAll(placeholder, replacedValue);
-    }
-
-    return content;
-  }
-
-  private static String getPropertyValue(Map<String, Object> event, String placeholder) {
-    String key = placeholder.replaceAll("#", "");
-    return String.valueOf(event.get(key));
-  }
-
-  private static String getPropertyValue(JsonObject jsonObject, String placeholder) {
-    String jsonKey = placeholder.replaceAll("#", "");
-    return String.valueOf(jsonObject.get(jsonKey).getAsString());
-  }
-
-  private static List<String> getPlaceholders(String content) {
-    List<String> results = new ArrayList<>();
-    Matcher matcher = pattern.matcher(content);
-    while (matcher.find()) {
-      results.add(matcher.group());
-    }
-    return results;
-  }
 }
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConfig.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
similarity index 96%
rename from streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConfig.java
rename to streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
index 8365950..e0a9a27 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConfig.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConfig.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.pe.shared.config.Kafka;
+package org.apache.streampipes.pe.shared.config.kafka;
 
 public class KafkaConfig {
 
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConnectUtils.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
similarity index 96%
rename from streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConnectUtils.java
rename to streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
index e14a630..3c9dde1 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Kafka/KafkaConnectUtils.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/kafka/KafkaConnectUtils.java
@@ -16,9 +16,8 @@
  *
  */
 
-package org.apache.streampipes.pe.shared.config.Kafka;
+package org.apache.streampipes.pe.shared.config.kafka;
 
-import org.apache.streampipes.pe.shared.config.Kafka.KafkaConfig;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConfig.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConfig.java
similarity index 96%
rename from streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConfig.java
rename to streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConfig.java
index 21c8de3..71c049e 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConfig.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.pe.shared.config.Mqqt;
+package org.apache.streampipes.pe.shared.config.mqtt;
 
 public class MqttConfig {
 
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConnectUtils.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
similarity index 96%
rename from streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConnectUtils.java
rename to streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
index 599c725..61313a7 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConnectUtils.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConnectUtils.java
@@ -16,9 +16,8 @@
  *
  */
 
-package org.apache.streampipes.pe.shared.config.Mqqt;
+package org.apache.streampipes.pe.shared.config.mqtt;
 
-import org.apache.streampipes.pe.shared.config.Mqqt.MqttConfig;
 import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
 import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
diff --git a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConsumer.java b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConsumer.java
similarity index 97%
rename from streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConsumer.java
rename to streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConsumer.java
index 7bea5d6..ee265d4 100644
--- a/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/Mqqt/MqttConsumer.java
+++ b/streampipes-pipeline-elements-shared/src/main/java/org/apache/streampipes/pe/shared/config/mqtt/MqttConsumer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.pe.shared.config.Mqqt;
+package org.apache.streampipes.pe.shared.config.mqtt;
 
 import org.fusesource.mqtt.client.*;
 import org.apache.streampipes.messaging.InternalEventProcessor;
diff --git a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
index 3dec7cb..5f7b90c 100644
--- a/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
+++ b/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rabbitmq/RabbitMqConsumer.java
@@ -18,16 +18,14 @@
 
 package org.apache.streampipes.sinks.brokers.jvm.rabbitmq;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.pe.shared.PlaceholderExtractor;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
-
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RabbitMqConsumer implements EventSink<RabbitMqParameters> {
 
@@ -55,9 +53,8 @@
   @Override
   public void onEvent(Event inputEvent) {
     try {
-      Map<String, Object> event = inputEvent.getRaw();
-      publisher.fire(dataFormatDefinition.fromMap(event),
-              PlaceholderExtractor.replacePlaceholders(topic, event));
+      publisher.fire(dataFormatDefinition.fromMap(inputEvent.getRaw()),
+              PlaceholderExtractor.replacePlaceholders(inputEvent, topic));
     } catch (SpRuntimeException e) {
       LOG.error("Could not serialiaze event");
     }
diff --git a/streampipes-sinks-internal-jvm/pom.xml b/streampipes-sinks-internal-jvm/pom.xml
index 9bcc944..4294d85 100644
--- a/streampipes-sinks-internal-jvm/pom.xml
+++ b/streampipes-sinks-internal-jvm/pom.xml
@@ -33,6 +33,11 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-pipeline-elements-shared</artifactId>
+            <version>0.69.0-SNAPSHOT</version>
+        </dependency>
         <!-- StreamPipes dependencies -->
         <dependency>
             <groupId>org.apache.streampipes</groupId>
@@ -133,4 +138,4 @@
         </plugins>
         <finalName>streampipes-sinks-internal-jvm</finalName>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index 287e84f..61123b2 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -28,6 +28,7 @@
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
+import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
 import org.apache.streampipes.sinks.internal.jvm.dashboard.DashboardController;
 import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeController;
 import org.apache.streampipes.sinks.internal.jvm.notification.NotificationController;
@@ -57,6 +58,17 @@
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
+            .addConfig(ConfigKeys.JMS_HOST, "activemq", "")
+            .addConfig(ConfigKeys.JMS_PORT, 61616, "")
+            .addConfig(ConfigKeys.DATA_LAKE_HOST, "influxdb", "Hostname for the StreamPipes data lake database")
+            .addConfig(ConfigKeys.DATA_LAKE_PROTOCOL, "http", "Protocol for the StreamPipes data lake database")
+            .addConfig(ConfigKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database")
+            .addConfig(ConfigKeys.DATA_LAKE_USERNAME, "default", "Username for the StreamPipes data lake database")
+            .addConfig(ConfigKeys.DATA_LAKE_PASSWORD, "default", "Password for the StreamPipes data lake database")
+            .addConfig(ConfigKeys.DATA_LAKE_DATABASE_NAME, "sp", "Database name for the StreamPipes data lake database")
+            .addConfig(ConfigKeys.IMAGE_STORAGE_LOCATION, "/spImages/", "Storage location of the data lake images")
             .build();
+
+
   }
 }
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
index fba1b64..fd8bd65 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
@@ -19,23 +19,13 @@
 package org.apache.streampipes.sinks.internal.jvm.config;
 
 public class ConfigKeys {
-    final static String HOST = "SP_HOST";
-    final static String PORT = "SP_PORT";
-    final static String COUCHDB_HOST = "SP_COUCHDB_HOST";
-    final static String COUCHDB_PORT = "SP_COCHDB_PORT";
-    final static String JMS_HOST = "SP_JMS_HOST";
-    final static String JMS_PORT = "SP_JMS_PORT";
-    final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
-    final static String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
-    final static String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
-    final static String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
-    final static String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
-    final static String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
-    final static String BACKEND_HOST = "SP_BACKEND_HOST";
-    final static String BACKEND_PORT = "SP_BACKEND_PORT";
-    final static String BACKEND_PROTOCOL = "SP_BACKEND_PROTOCOL";
-    final static String IMAGE_STORAGE_LOCATION = "SP_IMAGE_STORAGE_LOCATION";
-
-
-    final static String SERVICE_NAME = "SP_SERVICE_NAME";
+    public final static String JMS_HOST = "SP_JMS_HOST";
+    public final static String JMS_PORT = "SP_JMS_PORT";
+    public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
+    public final static String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
+    public final static String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
+    public final static String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
+    public final static String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
+    public final static String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
+    public final static String IMAGE_STORAGE_LOCATION = "SP_IMAGE_STORAGE_LOCATION";
 }
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/SinksInternalJvmConfig.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/SinksInternalJvmConfig.java
deleted file mode 100644
index faf1b1d..0000000
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/SinksInternalJvmConfig.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.internal.jvm.config;
-
-
-import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.container.model.PeConfig;
-
-public enum SinksInternalJvmConfig implements PeConfig {
-	INSTANCE;
-
-	private SpConfig config;
-
-	public final static String serverUrl;
-
-	private final static String service_id = "pe/org.apache.streampipes.sinks.internal.jvm";
-	private final static String service_name = "Sinks Internal JVM";
-	private final static String service_container_name = "sinks-internal-jvm";
-
-
-	SinksInternalJvmConfig() {
-		config = SpServiceDiscovery.getSpConfig(service_id);
-		config.register(ConfigKeys.HOST, service_container_name, "Hostname for the pe esper");
-		config.register(ConfigKeys.PORT, 8090, "Port for the pe esper");
-
-		config.register(ConfigKeys.COUCHDB_HOST, "couchdb", "Host for couchdb of the pe sinks project");
-		config.register(ConfigKeys.COUCHDB_PORT, 5984, "Port for couchdb of the pe sinks project");
-		config.register(ConfigKeys.JMS_HOST, "activemq", "Hostname for pe actions service for active mq");
-		config.register(ConfigKeys.JMS_PORT, 61616, "Port for pe actions service for active mq");
-		config.register(ConfigKeys.DATA_LAKE_HOST, "influxdb", "Hostname for the StreamPipes data lake database");
-		config.register(ConfigKeys.DATA_LAKE_PROTOCOL, "http", "Protocol for the StreamPipes data lake database");
-		config.register(ConfigKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database");
-		config.register(ConfigKeys.DATA_LAKE_USERNAME, "default", "Username for the StreamPipes data lake database");
-		config.registerPassword(ConfigKeys.DATA_LAKE_PASSWORD, "default", "Password for the StreamPipes data lake database");
-		config.register(ConfigKeys.DATA_LAKE_DATABASE_NAME, "sp", "Database name for the StreamPipes data lake database");
-		config.register(ConfigKeys.BACKEND_HOST, "backend", "Hostname for the StreamPipes-Backend");
-		config.register(ConfigKeys.BACKEND_PORT, 8030, "Port for the StreamPipes-Backend");
-		config.register(ConfigKeys.BACKEND_PROTOCOL, "http", "Protocol for the StreamPipes-Backend");
-		config.register(ConfigKeys.IMAGE_STORAGE_LOCATION, "/spImages/", "Storage location of the data lake images");
-
-		config.register(ConfigKeys.SERVICE_NAME, service_name, "The name of the service");
-
-	}
-	
-	static {
-		serverUrl = SinksInternalJvmConfig.INSTANCE.getHost() + ":" + SinksInternalJvmConfig.INSTANCE.getPort();
-	}
-
-	@Override
-	public String getHost() {
-		return config.getString(ConfigKeys.HOST);
-	}
-
-	@Override
-	public int getPort() {
-		return config.getInteger(ConfigKeys.PORT);
-	}
-
-	public String getCouchDbHost() {
-		return config.getString(ConfigKeys.COUCHDB_HOST);
-	}
-
-	public int getCouchDbPort() {
-		return config.getInteger(ConfigKeys.COUCHDB_PORT);
-	}
-
-	public String getJmsHost() {
-		return config.getString(ConfigKeys.JMS_HOST);
-	}
-
-	public int getJmsPort() {
-		return config.getInteger(ConfigKeys.JMS_PORT);
-	}
-
-	public String getDataLakeHost() {
-		return config.getString(ConfigKeys.DATA_LAKE_HOST);
-	}
-
-	public String getDataLakeProtocol() {
-		return config.getString(ConfigKeys.DATA_LAKE_PROTOCOL);
-	}
-
-	public Integer getDataLakePort() {
-		return config.getInteger(ConfigKeys.DATA_LAKE_PORT);
-	}
-
-	public String getDataLakeUsername() {
-		return config.getString(ConfigKeys.DATA_LAKE_USERNAME);
-	}
-
-
-	public String getDataLakePassword() {
-		return config.getString(ConfigKeys.DATA_LAKE_PASSWORD);
-	}
-
-
-	public String getDataLakeDatabaseName() {
-		return config.getString(ConfigKeys.DATA_LAKE_DATABASE_NAME);
-	}
-
-
-	@Override
-	public String getId() {
-		return service_id;
-	}
-
-	@Override
-	public String getName() {
-		return config.getString(ConfigKeys.SERVICE_NAME);
-	}
-
-
-	public String getStreamPipesBackendHost() {
-		return config.getString(ConfigKeys.BACKEND_HOST);
-	}
-
-	public String getStreamPipesBackendProtocol() {
-		return config.getString(ConfigKeys.BACKEND_PROTOCOL);
-	}
-
-
-	public String getImageStorageLocation() {
-		return config.getString(ConfigKeys.IMAGE_STORAGE_LOCATION);
-	}
-
-	public Integer getStreamPipesBackendPort() {
-		return config.getInteger(ConfigKeys.BACKEND_PORT);
-	}
-
-	public String getStreamPipesBackendUrl() { return getStreamPipesBackendProtocol() + "://"
-			+ getStreamPipesBackendHost() + ":" + getStreamPipesBackendPort(); }
-
-
-
-}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
index 91564c7..e826b98 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
@@ -23,7 +23,7 @@
 import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
+import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
 
@@ -38,10 +38,10 @@
 
     @Override
     public void onInvocation(DashboardParameters parameters,
-                             EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+                             EventSinkRuntimeContext context) throws SpRuntimeException {
         this.publisher = new ActiveMQPublisher(
-                SinksInternalJvmConfig.INSTANCE.getJmsHost(),
-                SinksInternalJvmConfig.INSTANCE.getJmsPort(),
+                context.getConfigStore().getConfig().getString(ConfigKeys.JMS_HOST),
+                context.getConfigStore().getConfig().getInteger(ConfigKeys.JMS_PORT),
                 makeTopic(parameters.getGraph().getInputStreams().get(0), parameters.getVisualizationName()));
     }
 
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index d7134f1..07c146f 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -19,17 +19,14 @@
 package org.apache.streampipes.sinks.internal.jvm.datalake;
 
 import org.apache.commons.codec.binary.Base64;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
-import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
+import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.vocabulary.SPSensor;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
@@ -62,13 +59,21 @@
 
     this.timestampField = parameters.getTimestampField();
 
+    SpConfig configStore = runtimeContext.getConfigStore().getConfig();
+
+    String influxHost = configStore.getString(ConfigKeys.DATA_LAKE_PROTOCOL) + "://" + configStore.getString(ConfigKeys.DATA_LAKE_HOST);
+    Integer influxPort = configStore.getInteger(ConfigKeys.DATA_LAKE_PORT);
+    String databaseName = configStore.getString(ConfigKeys.DATA_LAKE_DATABASE_NAME);
+    String user = configStore.getString(ConfigKeys.DATA_LAKE_USERNAME);
+    String password = configStore.getString(ConfigKeys.DATA_LAKE_PASSWORD);
+
     this.influxDbClient = new DataLakeInfluxDbClient(
-            parameters.getInfluxDbHost(),
-            parameters.getInfluxDbPort(),
-            parameters.getDatabaseName(),
+            influxHost,
+            influxPort,
+            databaseName,
             parameters.getMeasurementName(),
-            parameters.getUsername(),
-            parameters.getPassword(),
+            user,
+            password,
             parameters.getTimestampField(),
             parameters.getBatchSize(),
             parameters.getFlushDuration(),
@@ -91,7 +96,7 @@
     schema.getEventProperties().stream().forEach(eventProperty -> {
       eventProperty.setRuntimeName(prepareString(eventProperty.getRuntimeName()));
     });
-    registerAtDataLake(parameters.getMeasurementName(), schema);
+    registerAtDataLake(parameters.getMeasurementName(), schema, runtimeContext.getStreamPipesClient());
 
     imageProperties = schema.getEventProperties().stream()
             .filter(eventProperty -> eventProperty.getDomainProperties() != null &&
@@ -99,7 +104,7 @@
                     eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
             .collect(Collectors.toList());
 
-    imageDirectory = SinksInternalJvmConfig.INSTANCE.getImageStorageLocation() + parameters.getMeasurementName() + "/";
+    imageDirectory = configStore.getString(ConfigKeys.IMAGE_STORAGE_LOCATION) + parameters.getMeasurementName() + "/";
 
   }
 
@@ -150,24 +155,12 @@
    * @param eventSchema
    * @throws SpRuntimeException
    */
-  private void registerAtDataLake(String measure, EventSchema eventSchema) throws SpRuntimeException {
-    String url = SinksInternalJvmConfig.INSTANCE.getStreamPipesBackendUrl();
-
-    try {
-      String json = JacksonSerializer.getObjectMapper().writeValueAsString(eventSchema);
-      StringEntity stringEntity = new StringEntity(json, ContentType.APPLICATION_JSON);
-      HttpResponse response = Request.Post(url + "/streampipes-backend/api/v3/noauth/datalake/" + measure)
-              .addHeader("Content-type", "application/json")
-              .body(stringEntity)
-              .execute()
-              .returnResponse();
-      if (response.getStatusLine().getStatusCode() == 409) {
-        throw new SpRuntimeException("The measurement '" + measure +"' is already registered as Data lake with different Event schema");
-      }
-    } catch (IOException e) {
-      LOG.error(e.toString());
-    }
-
+  private void registerAtDataLake(String measure,
+                                  EventSchema eventSchema,
+                                  StreamPipesClient client) throws SpRuntimeException {
+      client
+        .customRequest()
+        .sendPost("api/v3/datalake/measure/" + measure, eventSchema);
   }
 
   public static String prepareString(String s) {
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
index 814c437..ffb2670 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
@@ -29,12 +29,9 @@
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
 
-import java.util.List;
-
 public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakeParameters> {
 
   private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
@@ -64,21 +61,16 @@
     measureName = DataLake.prepareString(measureName);
     String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
 
-    String hostname = SinksInternalJvmConfig.INSTANCE.getDataLakeProtocol() + "://" + SinksInternalJvmConfig.INSTANCE.getDataLakeHost();
-    Integer port = SinksInternalJvmConfig.INSTANCE.getDataLakePort();
-    String dbName = SinksInternalJvmConfig.INSTANCE.getDataLakeDatabaseName();
-    String user = SinksInternalJvmConfig.INSTANCE.getDataLakeUsername();
-    String password = SinksInternalJvmConfig.INSTANCE.getDataLakePassword();
+//    String hostname = SinksInternalJvmConfig.INSTANCE.getDataLakeProtocol() + "://" + SinksInternalJvmConfig.INSTANCE.getDataLakeHost();
+//    Integer port = SinksInternalJvmConfig.INSTANCE.getDataLakePort();
+//    String dbName = SinksInternalJvmConfig.INSTANCE.getDataLakeDatabaseName();
+//    String user = SinksInternalJvmConfig.INSTANCE.getDataLakeUsername();
+//    String password = SinksInternalJvmConfig.INSTANCE.getDataLakePassword();
     Integer batch_size = 2000;
     Integer flush_duration = 500;
 
     DataLakeParameters params = new DataLakeParameters(graph,
-            hostname,
-            port,
-            dbName,
             measureName,
-            user,
-            password,
             timestampField,
             batch_size,
             flush_duration);
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
index 1dcc0fa..61f8f3a 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
@@ -26,63 +26,27 @@
  */
 public class DataLakeParameters extends EventSinkBindingParams {
 
-  private String influxDbHost;
-  private Integer influxDbPort;
-  private String databaseName;
   private String measureName;
-  private String user;
-  private String password;
   private String timestampField;
   private Integer batchSize;
   private Integer flushDuration;
 
   public DataLakeParameters(DataSinkInvocation graph,
-                            String influxDbHost,
-                            Integer influxDbPort,
-                            String databaseName,
                             String measureName,
-                            String user,
-                            String password,
                             String timestampField,
                             Integer batchSize,
                             Integer flushDuration) {
     super(graph);
-
-    this.influxDbHost = influxDbHost;
-    this.influxDbPort = influxDbPort;
-    this.databaseName = databaseName;
     this.measureName = measureName;
-    this.user = user;
-    this.password = password;
     this.timestampField = timestampField;
     this.batchSize = batchSize;
     this.flushDuration = flushDuration;
   }
 
-  public String getInfluxDbHost() {
-    return influxDbHost;
-  }
-
-  public Integer getInfluxDbPort() {
-    return influxDbPort;
-  }
-
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
   public String getMeasurementName() {
     return measureName;
   }
 
-  public String getUsername() {
-    return user;
-  }
-
-  public String getPassword() {
-    return password;
-  }
-
   public String getTimestampField() {
     return timestampField;
   }
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
index cdcf04b..a18692c 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
@@ -24,7 +24,9 @@
 import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
 import org.apache.streampipes.model.Notification;
 import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
+import org.apache.streampipes.pe.shared.PlaceholderExtractor;
+import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
 
@@ -45,16 +47,17 @@
 
 
   @Override
-  public void onInvocation(NotificationParameters parameters, EventSinkRuntimeContext runtimeContext) throws
+  public void onInvocation(NotificationParameters parameters, EventSinkRuntimeContext context) throws
           SpRuntimeException {
     this.gson = new Gson();
     this.title = parameters.getTitle();
     this.content = parameters.getContent();
     this.correspondingPipelineId = parameters.getGraph().getCorrespondingPipeline();
     this.correspondingUser = parameters.getGraph().getCorrespondingUser();
+    SpConfig configStore = context.getConfigStore().getConfig();
     this.publisher = new ActiveMQPublisher(
-            SinksInternalJvmConfig.INSTANCE.getJmsHost(),
-            SinksInternalJvmConfig.INSTANCE.getJmsPort(),
+            configStore.getString(ConfigKeys.JMS_HOST),
+            configStore.getInteger(ConfigKeys.JMS_PORT),
             "org.apache.streampipes.notifications." + this.correspondingUser);
   }
 
@@ -65,7 +68,7 @@
     notification.setId(UUID.randomUUID().toString());
     notification.setRead(false);
     notification.setTitle(title);
-    notification.setMessage(replacePlaceholders(inputEvent, content));
+    notification.setMessage(PlaceholderExtractor.replacePlaceholders(inputEvent, content));
     notification.setCreatedAt(currentDate);
     notification.setCreatedAtTimestamp(currentDate.getTime());
     notification.setCorrespondingPipelineId(correspondingPipelineId);
@@ -81,10 +84,4 @@
     this.publisher.disconnect();
   }
 
-  private String replacePlaceholders(Event event, String content) {
-    for(String key: event.getRaw().keySet()) {
-      content = content.replaceAll(HASHTAG + key + HASHTAG, event.getRaw().get(key).toString());
-    }
-    return content;
-  }
 }
diff --git a/streampipes-sinks-notifications-jvm/pom.xml b/streampipes-sinks-notifications-jvm/pom.xml
index 22d81c0..be38650 100644
--- a/streampipes-sinks-notifications-jvm/pom.xml
+++ b/streampipes-sinks-notifications-jvm/pom.xml
@@ -28,6 +28,11 @@
     <artifactId>streampipes-sinks-notifications-jvm</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.streampipes</groupId>
+            <artifactId>streampipes-pipeline-elements-shared</artifactId>
+            <version>0.69.0-SNAPSHOT</version>
+        </dependency>
         <!-- StreamPipes dependencies -->
         <dependency>
             <groupId>org.apache.streampipes</groupId>
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java
index 20c94ac..019cb4b 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/SinksNotificationsJvmInit.java
@@ -59,13 +59,6 @@
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
             .addConfig(ConfigKeys.WEBSOCKET_PROTOCOL, "ws", "")
-            .addConfig(ConfigKeys.EMAIL_FROM, "", "The sender address of the email")
-            .addConfig(ConfigKeys.EMAIL_USERNAME, "", "The username of the email account")
-            .addConfig(ConfigKeys.EMAIL_PASSWORD, "", "The password of the email account")
-            .addConfig(ConfigKeys.EMAIL_SMTP_HOST, "", "The SMTP Host")
-            .addConfig(ConfigKeys.EMAIL_SMTP_PORT, "", "The SMTP Port")
-            .addConfig(ConfigKeys.EMAIL_STARTTLS, false, "Use startls?")
-            .addConfig(ConfigKeys.EMAIL_SLL, false, "Use SLL?")
             .build();
   }
 }
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/config/ConfigKeys.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/config/ConfigKeys.java
index f69d0a7..5b3a478 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/config/ConfigKeys.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/config/ConfigKeys.java
@@ -19,13 +19,5 @@
 package org.apache.streampipes.sinks.notifications.jvm.config;
 
 public class ConfigKeys {
-    public final static String EMAIL_FROM = "SP_EMAIL_FROM";
-    public final static String EMAIL_USERNAME = "SP_EMAIL_USERNAME";
-    public final static String EMAIL_PASSWORD = "SP_EMAIL_PASSWORD";
-    public final static String EMAIL_SMTP_HOST = "SP_EMAIL_SMTP_HOST";
-    public final static String EMAIL_SMTP_PORT = "SP_EMAIL_SMTP_PORT";
-    public final static String EMAIL_STARTTLS = "SP_EMAIL_STARTTLS";
-    public final static String EMAIL_SLL = "SP_EMAIL_SLL";
     public final static String WEBSOCKET_PROTOCOL = "SP_WEBSOCKET_PROTOCOL";
-
 }
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailController.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailController.java
index 1f5483e..ab48dcd 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailController.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailController.java
@@ -36,6 +36,7 @@
   private static final String TO_EMAIL_ADRESS = "to_email";
   private static final String EMAIL_SUBJECT = "email_subject";
   private static final String EMAIL_CONTENT = "email_content";
+  private static final String SILENT_PERIOD = "silent-period";
 
 
   @Override
@@ -51,6 +52,7 @@
                     .requiredProperty(EpRequirements.anyProperty())
                     .build())
             .requiredHtmlInputParameter(Labels.withId(EMAIL_CONTENT))
+            .requiredIntegerParameter(Labels.withId(SILENT_PERIOD))
             .build();
   }
 
@@ -60,8 +62,9 @@
     String toEmail = extractor.singleValueParameter(TO_EMAIL_ADRESS, String.class);
     String subject = extractor.singleValueParameter(EMAIL_SUBJECT, String.class);
     String content = extractor.singleValueParameter(EMAIL_CONTENT, String.class);
+    Integer silentPeriod = extractor.singleValueParameter(SILENT_PERIOD, Integer.class);
 
-    EmailParameters params = new EmailParameters(graph, toEmail, subject, content);
+    EmailParameters params = new EmailParameters(graph, toEmail, subject, content, silentPeriod);
 
     return new ConfiguredEventSink<>(params, EmailPublisher::new);
   }
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailParameters.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailParameters.java
index e63ca31..82fbad2 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailParameters.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailParameters.java
@@ -26,12 +26,18 @@
     private String toEmailAddress;
     private String subject;
     private String content;
+    private Integer silentPeriod;
 
-    public EmailParameters(DataSinkInvocation graph, String toEmailAddress, String subject, String content) {
+    public EmailParameters(DataSinkInvocation graph,
+                           String toEmailAddress,
+                           String subject,
+                           String content,
+                           Integer silentPeriod) {
         super(graph);
         this.toEmailAddress = toEmailAddress;
         this.subject = subject;
         this.content = content;
+        this.silentPeriod = silentPeriod;
     }
 
     public String getToEmailAddress() {
@@ -45,4 +51,8 @@
     public String getContent() {
         return content;
     }
+
+    public Integer getSilentPeriod() {
+        return silentPeriod;
+    }
 }
diff --git a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailPublisher.java b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailPublisher.java
index 8ee229e..63ab537 100644
--- a/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailPublisher.java
+++ b/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/email/EmailPublisher.java
@@ -18,87 +18,53 @@
 
 package org.apache.streampipes.sinks.notifications.jvm.email;
 
+import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.mail.SpEmail;
 import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.EventConverter;
-import org.apache.streampipes.sinks.notifications.jvm.config.ConfigKeys;
+import org.apache.streampipes.pe.shared.PlaceholderExtractor;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
 
-import javax.mail.*;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeMessage;
-import java.util.Map;
-import java.util.Properties;
+import java.time.Instant;
+import java.util.Collections;
 
 public class EmailPublisher implements EventSink<EmailParameters> {
 
-    private static Logger LOG;
+    private SpEmail preparedEmail;
+    private long silentPeriodInSeconds;
+    private long lastMailEpochSecond = -1;
 
-    private MimeMessage message;
-    private String content;
+    private String originalContent;
+
+    private StreamPipesClient client;
 
     @Override
     public void onInvocation(EmailParameters parameters, EventSinkRuntimeContext runtimeContext) {
-        LOG = parameters.getGraph().getLogger(EmailPublisher.class);
-        SpConfig config = runtimeContext.getConfigStore().getConfig();
-        String from = config.getString(ConfigKeys.EMAIL_FROM);
-        String to = parameters.getToEmailAddress();
-        String subject = parameters.getSubject();
-        this.content = parameters.getContent();
-        String username = config.getString(ConfigKeys.EMAIL_USERNAME);
-        String password = config.getString(ConfigKeys.EMAIL_PASSWORD);
-        String host = config.getString(ConfigKeys.EMAIL_SMTP_HOST);
-        int port = config.getInteger(ConfigKeys.EMAIL_SMTP_PORT);
-        boolean starttls = config.getBoolean(ConfigKeys.EMAIL_STARTTLS);
-        boolean ssl = config.getBoolean(ConfigKeys.EMAIL_SLL);
+        this.preparedEmail = new SpEmail();
+        this.preparedEmail.setRecipients(Collections.singletonList(parameters.getToEmailAddress()));
+        this.preparedEmail.setSubject(parameters.getSubject());
 
-        Properties properties = new Properties();
-        properties.setProperty("mail.smtp.host", host);
-        properties.setProperty("mail.smtp.port", String.valueOf(port));
-
-        if (starttls) {
-            properties.put("mail.smtp.starttls.enable", "true");
-        }
-        if (ssl) {
-            properties.put("mail.smtp.ssl.enable", "true");
-        }
-        properties.put("mail.smtp.auth", "true");
-
-        Session session = Session.getDefaultInstance(properties, new Authenticator() {
-
-            @Override
-            protected PasswordAuthentication getPasswordAuthentication() {
-                return new PasswordAuthentication(username, password);
-            }
-        });
-
-        try {
-            this.message = new MimeMessage(session);
-            this.message.setFrom(new InternetAddress(from));
-            this.message.addRecipient(Message.RecipientType.TO, new InternetAddress(to));
-            this.message.setSubject(subject);
-        } catch (MessagingException e) {
-           LOG.error(e.toString());
-        }
+        this.silentPeriodInSeconds = parameters.getSilentPeriod() * 60;
+        this.client = runtimeContext.getStreamPipesClient();
+        this.originalContent = parameters.getContent();
     }
 
     @Override
     public void onEvent(Event inputEvent) {
-        String contentWithValues = this.content;
-        Map<String, Object> inputMap = new EventConverter(inputEvent).toMap();
-        try {
-            for (Map.Entry entry: inputMap.entrySet()) {
-                contentWithValues = contentWithValues.replaceAll("#" + entry.getKey() + "#",
-                        entry.getValue().toString());
-            }
-            this.message.setContent(contentWithValues, "text/html; charset=utf-8");
-            Transport.send(message);
-            LOG.info("Sent notifaction email");
-        } catch (MessagingException e) {
-            LOG.error(e.toString());
+        if (shouldSendMail()) {
+            String message = PlaceholderExtractor.replacePlaceholders(inputEvent, this.originalContent);
+            this.preparedEmail.setMessage(message);
+            this.client.deliverEmail(this.preparedEmail);
+            this.lastMailEpochSecond = Instant.now().getEpochSecond();
+        }
+    }
+
+    private boolean shouldSendMail() {
+        if (this.lastMailEpochSecond == -1) {
+            return true;
+        } else {
+            return Instant.now().getEpochSecond() >= (this.lastMailEpochSecond + this.silentPeriodInSeconds);
         }
     }
 
diff --git a/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.email/strings.en b/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.email/strings.en
index bca41f7..ab58dd2 100644
--- a/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.email/strings.en
+++ b/streampipes-sinks-notifications-jvm/src/main/resources/org.apache.streampipes.sinks.notifications.jvm.email/strings.en
@@ -8,4 +8,7 @@
 email_subject.description=The subject of the email
 
 email_content.title=Content
-email_content.description=Enter the email text. You can use place holders like #fieldName# to add the value of a field.
\ No newline at end of file
+email_content.description=Enter the email text. You can use place holders like #fieldName# to add the value of a field.
+
+silent-period.title=Silent Period [min]
+silent-period.description=The minimum number of minutes between two consecutive mails that are sent.