Merge branch 'dev' into STREAMPIPES-527
diff --git a/.idea/runConfigurations/backend.xml b/.idea/runConfigurations/backend.xml
index 4e1f928..344ff6e 100644
--- a/.idea/runConfigurations/backend.xml
+++ b/.idea/runConfigurations/backend.xml
@@ -10,7 +10,7 @@
       <env name="SP_JMS_HOST" value="localhost" />
       <env name="SP_KAFKA_HOST" value="localhost" />
       <env name="SP_KAFKA_PORT" value="9094" />
-      <env name="SP_PRIORITIZED_PROTOCOL" value="kafka" />
+      <env name="SP_PRIORITIZED_PROTOCOL" value="mqtt" />
     </envs>
     <module name="streampipes-backend" />
     <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.backend.StreamPipesBackendApplication" />
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
index 7871118..c67c3a7 100644
--- a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
@@ -18,3 +18,5 @@
   backend:
     ports:
       - "8030:8030"
+    environment:
+      - SP_HOST=host.docker.internal
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.yml b/installer/cli/deploy/standalone/backend/docker-compose.yml
index 4d014eb..c67ba45 100644
--- a/installer/cli/deploy/standalone/backend/docker-compose.yml
+++ b/installer/cli/deploy/standalone/backend/docker-compose.yml
@@ -24,7 +24,7 @@
       - backend:/root/.streampipes
       - files:/spImages
     environment:
-      - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-kafka}
+      - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-mqtt}
       - SP_MQTT_HOST=${SP_MQTT_HOST:-activemq}
     logging:
       driver: "json-file"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
index ae347c1..711570b 100644
--- a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
@@ -17,4 +17,4 @@
 services:
   mosquitto:
     ports:
-      - "1884:1883"
+      - "1883:1883"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
index 5e5b07a..c936edf 100644
--- a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
+++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
@@ -16,7 +16,8 @@
 version: "3.4"
 services:
   mosquitto:
-    image: eclipse-mosquitto:1.5.4
+    image: eclipse-mosquitto:2.0.14
+    command: mosquitto -c /mosquitto-no-auth.conf
     logging:
       driver: "json-file"
       options:
diff --git a/installer/compose/docker-compose.full.yml b/installer/compose/docker-compose.full.yml
index 92a4ca5..4c5a079 100644
--- a/installer/compose/docker-compose.full.yml
+++ b/installer/compose/docker-compose.full.yml
@@ -113,33 +113,9 @@
     networks:
       spnet:
 
-  kafka:
-    image: fogsyio/kafka:2.2.0
-    hostname: kafka
-    depends_on:
-      - zookeeper
-    environment:
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
-      KAFKA_LISTENERS: PLAINTEXT://:9092
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_ADVERTISED_HOST_NAME: kafka
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
-    volumes:
-      - kafka:/kafka
-      - /var/run/docker.sock:/var/run/docker.sock
-    logging: *default-logging
-    restart: unless-stopped
-    networks:
-      spnet:
-
-  zookeeper:
-    image: fogsyio/zookeeper:3.4.13
-    volumes:
-      - zookeeper:/opt/zookeeper-3.4.13
+  mosquitto:
+    image: eclipse-mosquitto:2.0.14
+    command: mosquitto -c /mosquitto-no-auth.conf
     logging: *default-logging
     restart: unless-stopped
     networks:
@@ -232,8 +208,6 @@
   consul:
   connect:
   couchdb:
-  kafka:
-  zookeeper:
   influxdb:
   files:
 
diff --git a/installer/compose/docker-compose.yml b/installer/compose/docker-compose.yml
index 358493c..c1b04c0 100644
--- a/installer/compose/docker-compose.yml
+++ b/installer/compose/docker-compose.yml
@@ -85,33 +85,9 @@
     networks:
       spnet:
 
-  kafka:
-    image: fogsyio/kafka:2.2.0
-    hostname: kafka
-    depends_on:
-      - zookeeper
-    environment:
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
-      KAFKA_LISTENERS: PLAINTEXT://:9092
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_ADVERTISED_HOST_NAME: kafka
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
-      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
-    volumes:
-      - kafka:/kafka
-      - /var/run/docker.sock:/var/run/docker.sock
-    logging: *default-logging
-    restart: unless-stopped
-    networks:
-      spnet:
-
-  zookeeper:
-    image: fogsyio/zookeeper:3.4.13
-    volumes:
-      - zookeeper:/opt/zookeeper-3.4.13
+  mosquitto:
+    image: eclipse-mosquitto:2.0.14
+    command: mosquitto -c /mosquitto-no-auth.conf
     logging: *default-logging
     restart: unless-stopped
     networks:
@@ -158,8 +134,6 @@
   consul:
   connect:
   couchdb:
-  kafka:
-  zookeeper:
   influxdb:
   files:
 
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
index 514c2e5..e336b1d 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
@@ -32,6 +32,6 @@
   }
 
   public static Boolean getEnvAsBoolean(String envVariable) {
-    return Boolean.parseBoolean(getEnv(envVariable));
+    return exists(envVariable) && Boolean.parseBoolean(getEnv(envVariable));
   }
 }
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index 403ac4d..eb713c8 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -47,7 +47,7 @@
 
     config.register(BackendConfigKeys.JMS_HOST, "activemq", "Hostname for backend service for active mq");
     config.register(BackendConfigKeys.JMS_PORT, 61616, "Port for backend service for active mq");
-    config.register(BackendConfigKeys.MQTT_HOST, "activemq", "Hostname of mqtt service");
+    config.register(BackendConfigKeys.MQTT_HOST, "mosquitto", "Hostname of mqtt service");
     config.register(BackendConfigKeys.MQTT_PORT, 1883, "Port of mqtt service");
     config.register(BackendConfigKeys.KAFKA_HOST, "kafka", "Hostname for backend service for kafka");
     config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
index 6b49a0e..2c37e6f 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
@@ -34,9 +34,6 @@
     List<SpProtocol> protocolList;
     if (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL) != null) {
       switch (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL).toLowerCase()) {
-        case "mqtt":
-          protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
-          break;
         case "kafka":
           protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
           break;
@@ -44,10 +41,10 @@
           protocolList = Arrays.asList(SpProtocol.JMS, SpProtocol.KAFKA, SpProtocol.MQTT);
           break;
         default:
-          protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+          protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
       }
     } else {
-      protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+      protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
     }
 
     return new MessagingSettings(
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index 86fcf9b..d5d495a 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -18,6 +18,9 @@
 
 package org.apache.streampipes.connect.adapter;
 
+import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.config.backend.SpProtocol;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
 import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToJmsAdapterSink;
 import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
@@ -60,7 +63,7 @@
 
         if (transportProtocol instanceof JmsTransportProtocol) {
             SendToJmsAdapterSink sink = (SendToJmsAdapterSink) this.adapterPipeline.getPipelineSink();
-            if ("true".equals(System.getenv("SP_DEBUG"))) {
+            if (Envs.SP_DEBUG.getValueAsBoolean()) {
                 transportProtocol.setBrokerHostname("localhost");
                 //((JmsTransportProtocol) transportProtocol).setPort(61616);
             }
@@ -68,7 +71,7 @@
         }
         else if (transportProtocol instanceof KafkaTransportProtocol) {
             SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink();
-            if ("true".equals(System.getenv("SP_DEBUG"))) {
+            if (Envs.SP_DEBUG.getValueAsBoolean()) {
                 transportProtocol.setBrokerHostname("localhost");
                 ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094);
             }
@@ -76,7 +79,7 @@
         }
         else if (transportProtocol instanceof MqttTransportProtocol) {
             SendToMqttAdapterSink sink = (SendToMqttAdapterSink) this.adapterPipeline.getPipelineSink();
-            if ("true".equals(System.getenv("SP_DEBUG"))) {
+            if (Envs.SP_DEBUG.getValueAsBoolean()) {
                 transportProtocol.setBrokerHostname("localhost");
                 //((MqttTransportProtocol) transportProtocol).setPort(1883);
             }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
index 98613c8..b63f108 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -42,12 +42,6 @@
     @Override
     public void process(Map<String, Object> event) {
 
-        // TODO remove, just for performance tests
-        if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
-            event.put("internal_t1", System.currentTimeMillis());
-        }
-
-
         for (IAdapterPipelineElement pipelineElement : pipelineElements) {
             event = pipelineElement.process(event);
         }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
index 9c3b41c..5e4cb89 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.connect.adapter.preprocessing.elements;
 
+import org.apache.streampipes.commons.constants.Envs;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.connect.adapter.util.TransportFormatSelector;
@@ -46,7 +47,7 @@
             .getEventGrounding()
             .getTransportProtocol());
 
-    if ("true".equals(System.getenv("SP_DEBUG"))) {
+    if (Envs.SP_DEBUG.getValueAsBoolean()) {
       modifyProtocolForDebugging();
     }
 
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 9067b45..f9918fc 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -332,15 +332,17 @@
                 String q = "SHOW TAG VALUES ON \"" + BackendConfig.INSTANCE.getInfluxDatabaseName() + "\" FROM \"" + measurementId + "\" WITH KEY = \"" + f + "\"";
                 Query query = new Query(q);
                 QueryResult queryResult = influxDB.query(query);
-                queryResult.getResults().forEach(res -> {
-                    res.getSeries().forEach(series -> {
-                        if (series.getValues().size() > 0) {
-                            String field = series.getValues().get(0).get(0).toString();
-                            List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
-                            tags.put(field, values);
-                        }
+                if (queryResult.getResults() != null) {
+                    queryResult.getResults().forEach(res -> {
+                        res.getSeries().forEach(series -> {
+                            if (series.getValues().size() > 0) {
+                                String field = series.getValues().get(0).get(0).toString();
+                                List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+                                tags.put(field, values);
+                            }
+                        });
                     });
-                });
+                }
             });
         }
 
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
index 6ba637a..dcdfbe7 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -29,6 +29,8 @@
   protected void createBrokerConnection(MqttTransportProtocol protocolSettings) throws Exception {
     this.mqtt = new MQTT();
     this.mqtt.setHost(makeBrokerUrl(protocolSettings));
+    this.mqtt.setConnectAttemptsMax(3);
+    this.mqtt.setReconnectDelay(1000);
     this.connection = mqtt.blockingConnection();
     this.connection.connect();
     this.connected = true;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index e7fac0b..d1247a1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,6 +17,7 @@
  */
 package org.apache.streampipes.manager.runtime;
 
+import org.apache.streampipes.commons.constants.Envs;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.messaging.InternalEventProcessor;
 import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
@@ -27,8 +28,6 @@
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.apache.streampipes.model.grounding.MqttTransportProtocol;
 import org.apache.streampipes.model.grounding.TransportFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -36,8 +35,6 @@
 public enum PipelineElementRuntimeInfoFetcher {
   INSTANCE;
 
-  Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
-
   private final int FETCH_INTERVAL_MS = 300;
   private final Map<String, SpDataFormatConverter> converterMap;
 
@@ -49,8 +46,7 @@
 
     if (spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) {
       return getLatestEventFromKafka(spDataStream);
-    }
-    else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol){
+    } else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol) {
       return getLatestEventFromJms(spDataStream);
     } else {
       return getLatestEventFromMqtt(spDataStream);
@@ -75,7 +71,7 @@
     JmsTransportProtocol protocol = (JmsTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
     // Change jms config when running in development mode
-    if ("true".equals(System.getenv("SP_DEBUG"))) {
+    if (Envs.SP_DEBUG.getValueAsBoolean()) {
       protocol.setBrokerHostname("localhost");
     }
     if (!converterMap.containsKey(jmsTopic)) {
@@ -113,7 +109,7 @@
     MqttTransportProtocol protocol = (MqttTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
     // Change mqtt config when running in development mode
-    if ("true".equals(System.getenv("SP_DEBUG"))) {
+    if (Envs.SP_DEBUG.getValueAsBoolean()) {
       protocol.setBrokerHostname("localhost");
     }
 
@@ -122,15 +118,12 @@
               new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
     }
     MqttConsumer mqttConsumer = new MqttConsumer();
-    mqttConsumer.connect(protocol, new InternalEventProcessor<byte[]>() {
-      @Override
-      public void onEvent(byte[] event) {
-        try {
-          result[0] = converterMap.get(mqttTopic).convert(event);
-          mqttConsumer.disconnect();
-        } catch (SpRuntimeException e) {
-          e.printStackTrace();
-        }
+    mqttConsumer.connect(protocol, event -> {
+      try {
+        result[0] = converterMap.get(mqttTopic).convert(event);
+        mqttConsumer.disconnect();
+      } catch (SpRuntimeException e) {
+        e.printStackTrace();
       }
     });
 
@@ -151,7 +144,7 @@
     KafkaTransportProtocol protocol = (KafkaTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
 
     // Change kafka config when running in development mode
-    if ("true".equals(System.getenv("SP_DEBUG"))) {
+    if (Envs.SP_DEBUG.getValueAsBoolean()) {
       protocol.setBrokerHostname("localhost");
       protocol.setKafkaPort(9094);
     }
@@ -161,14 +154,11 @@
               new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
     }
 
-    SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, new InternalEventProcessor<byte[]>() {
-      @Override
-      public void onEvent(byte[] event) {
-        try {
-          result[0] = converterMap.get(kafkaTopic).convert(event);
-        } catch (SpRuntimeException e) {
-          e.printStackTrace();
-        }
+    SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, event -> {
+      try {
+        result[0] = converterMap.get(kafkaTopic).convert(event);
+      } catch (SpRuntimeException e) {
+        e.printStackTrace();
       }
     });