Merge branch 'dev' into STREAMPIPES-527
diff --git a/.idea/runConfigurations/backend.xml b/.idea/runConfigurations/backend.xml index 4e1f928..344ff6e 100644 --- a/.idea/runConfigurations/backend.xml +++ b/.idea/runConfigurations/backend.xml
@@ -10,7 +10,7 @@ <env name="SP_JMS_HOST" value="localhost" /> <env name="SP_KAFKA_HOST" value="localhost" /> <env name="SP_KAFKA_PORT" value="9094" /> - <env name="SP_PRIORITIZED_PROTOCOL" value="kafka" /> + <env name="SP_PRIORITIZED_PROTOCOL" value="mqtt" /> </envs> <module name="streampipes-backend" /> <option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.backend.StreamPipesBackendApplication" />
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml index 7871118..c67c3a7 100644 --- a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml +++ b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
@@ -18,3 +18,5 @@ backend: ports: - "8030:8030" + environment: + - SP_HOST=host.docker.internal
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.yml b/installer/cli/deploy/standalone/backend/docker-compose.yml index 4d014eb..c67ba45 100644 --- a/installer/cli/deploy/standalone/backend/docker-compose.yml +++ b/installer/cli/deploy/standalone/backend/docker-compose.yml
@@ -24,7 +24,7 @@ - backend:/root/.streampipes - files:/spImages environment: - - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-kafka} + - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-mqtt} - SP_MQTT_HOST=${SP_MQTT_HOST:-activemq} logging: driver: "json-file"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml index ae347c1..711570b 100644 --- a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml +++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
@@ -17,4 +17,4 @@ services: mosquitto: ports: - - "1884:1883" + - "1883:1883"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml index 5e5b07a..c936edf 100644 --- a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml +++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
@@ -16,7 +16,8 @@ version: "3.4" services: mosquitto: - image: eclipse-mosquitto:1.5.4 + image: eclipse-mosquitto:2.0.14 + command: mosquitto -c /mosquitto-no-auth.conf logging: driver: "json-file" options:
diff --git a/installer/compose/docker-compose.full.yml b/installer/compose/docker-compose.full.yml index 92a4ca5..4c5a079 100644 --- a/installer/compose/docker-compose.full.yml +++ b/installer/compose/docker-compose.full.yml
@@ -113,33 +113,9 @@ networks: spnet: - kafka: - image: fogsyio/kafka:2.2.0 - hostname: kafka - depends_on: - - zookeeper - environment: - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092 - KAFKA_LISTENERS: PLAINTEXT://:9092 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_MESSAGE_MAX_BYTES: 5000012 - KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012 - KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 - volumes: - - kafka:/kafka - - /var/run/docker.sock:/var/run/docker.sock - logging: *default-logging - restart: unless-stopped - networks: - spnet: - - zookeeper: - image: fogsyio/zookeeper:3.4.13 - volumes: - - zookeeper:/opt/zookeeper-3.4.13 + mosquitto: + image: eclipse-mosquitto:2.0.14 + command: mosquitto -c /mosquitto-no-auth.conf logging: *default-logging restart: unless-stopped networks: @@ -232,8 +208,6 @@ consul: connect: couchdb: - kafka: - zookeeper: influxdb: files:
diff --git a/installer/compose/docker-compose.yml b/installer/compose/docker-compose.yml index 358493c..c1b04c0 100644 --- a/installer/compose/docker-compose.yml +++ b/installer/compose/docker-compose.yml
@@ -85,33 +85,9 @@ networks: spnet: - kafka: - image: fogsyio/kafka:2.2.0 - hostname: kafka - depends_on: - - zookeeper - environment: - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092 - KAFKA_LISTENERS: PLAINTEXT://:9092 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_MESSAGE_MAX_BYTES: 5000012 - KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012 - KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 - volumes: - - kafka:/kafka - - /var/run/docker.sock:/var/run/docker.sock - logging: *default-logging - restart: unless-stopped - networks: - spnet: - - zookeeper: - image: fogsyio/zookeeper:3.4.13 - volumes: - - zookeeper:/opt/zookeeper-3.4.13 + mosquitto: + image: eclipse-mosquitto:2.0.14 + command: mosquitto -c /mosquitto-no-auth.conf logging: *default-logging restart: unless-stopped networks: @@ -158,8 +134,6 @@ consul: connect: couchdb: - kafka: - zookeeper: influxdb: files:
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java index 514c2e5..e336b1d 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
@@ -32,6 +32,6 @@ } public static Boolean getEnvAsBoolean(String envVariable) { - return Boolean.parseBoolean(getEnv(envVariable)); + return exists(envVariable) && Boolean.parseBoolean(getEnv(envVariable)); } }
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java index 403ac4d..eb713c8 100644 --- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java +++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -47,7 +47,7 @@ config.register(BackendConfigKeys.JMS_HOST, "activemq", "Hostname for backend service for active mq"); config.register(BackendConfigKeys.JMS_PORT, 61616, "Port for backend service for active mq"); - config.register(BackendConfigKeys.MQTT_HOST, "activemq", "Hostname of mqtt service"); + config.register(BackendConfigKeys.MQTT_HOST, "mosquitto", "Hostname of mqtt service"); config.register(BackendConfigKeys.MQTT_PORT, 1883, "Port of mqtt service"); config.register(BackendConfigKeys.KAFKA_HOST, "kafka", "Hostname for backend service for kafka"); config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java index 6b49a0e..2c37e6f 100644 --- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java +++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
@@ -34,9 +34,6 @@ List<SpProtocol> protocolList; if (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL) != null) { switch (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL).toLowerCase()) { - case "mqtt": - protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS); - break; case "kafka": protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS); break; @@ -44,10 +41,10 @@ protocolList = Arrays.asList(SpProtocol.JMS, SpProtocol.KAFKA, SpProtocol.MQTT); break; default: - protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS); + protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS); } } else { - protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS); + protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS); } return new MessagingSettings(
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java index 86fcf9b..d5d495a 100644 --- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java +++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -18,6 +18,9 @@ package org.apache.streampipes.connect.adapter; +import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.config.backend.BackendConfig; +import org.apache.streampipes.config.backend.SpProtocol; import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline; import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToJmsAdapterSink; import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink; @@ -60,7 +63,7 @@ if (transportProtocol instanceof JmsTransportProtocol) { SendToJmsAdapterSink sink = (SendToJmsAdapterSink) this.adapterPipeline.getPipelineSink(); - if ("true".equals(System.getenv("SP_DEBUG"))) { + if (Envs.SP_DEBUG.getValueAsBoolean()) { transportProtocol.setBrokerHostname("localhost"); //((JmsTransportProtocol) transportProtocol).setPort(61616); } @@ -68,7 +71,7 @@ } else if (transportProtocol instanceof KafkaTransportProtocol) { SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink(); - if ("true".equals(System.getenv("SP_DEBUG"))) { + if (Envs.SP_DEBUG.getValueAsBoolean()) { transportProtocol.setBrokerHostname("localhost"); ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094); } @@ -76,7 +79,7 @@ } else if (transportProtocol instanceof MqttTransportProtocol) { SendToMqttAdapterSink sink = (SendToMqttAdapterSink) this.adapterPipeline.getPipelineSink(); - if ("true".equals(System.getenv("SP_DEBUG"))) { + if (Envs.SP_DEBUG.getValueAsBoolean()) { transportProtocol.setBrokerHostname("localhost"); //((MqttTransportProtocol) transportProtocol).setPort(1883); }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java index 98613c8..b63f108 100644 --- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java +++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -42,12 +42,6 @@ @Override public void process(Map<String, Object> event) { - // TODO remove, just for performance tests - if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) { - event.put("internal_t1", System.currentTimeMillis()); - } - - for (IAdapterPipelineElement pipelineElement : pipelineElements) { event = pipelineElement.process(event); }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java index 9c3b41c..5e4cb89 100644 --- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java +++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
@@ -17,6 +17,7 @@ */ package org.apache.streampipes.connect.adapter.preprocessing.elements; +import org.apache.streampipes.commons.constants.Envs; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.connect.api.IAdapterPipelineElement; import org.apache.streampipes.connect.adapter.util.TransportFormatSelector; @@ -46,7 +47,7 @@ .getEventGrounding() .getTransportProtocol()); - if ("true".equals(System.getenv("SP_DEBUG"))) { + if (Envs.SP_DEBUG.getValueAsBoolean()) { modifyProtocolForDebugging(); }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java index 9067b45..f9918fc 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -332,15 +332,17 @@ String q = "SHOW TAG VALUES ON \"" + BackendConfig.INSTANCE.getInfluxDatabaseName() + "\" FROM \"" + measurementId + "\" WITH KEY = \"" + f + "\""; Query query = new Query(q); QueryResult queryResult = influxDB.query(query); - queryResult.getResults().forEach(res -> { - res.getSeries().forEach(series -> { - if (series.getValues().size() > 0) { - String field = series.getValues().get(0).get(0).toString(); - List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList()); - tags.put(field, values); - } + if (queryResult.getResults() != null) { + queryResult.getResults().forEach(res -> { + res.getSeries().forEach(series -> { + if (series.getValues().size() > 0) { + String field = series.getValues().get(0).get(0).toString(); + List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList()); + tags.put(field, values); + } + }); }); - }); + } }); }
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java index 6ba637a..dcdfbe7 100644 --- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java +++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -29,6 +29,8 @@ protected void createBrokerConnection(MqttTransportProtocol protocolSettings) throws Exception { this.mqtt = new MQTT(); this.mqtt.setHost(makeBrokerUrl(protocolSettings)); + this.mqtt.setConnectAttemptsMax(3); + this.mqtt.setReconnectDelay(1000); this.connection = mqtt.blockingConnection(); this.connection.connect(); this.connected = true;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java index e7fac0b..d1247a1 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,6 +17,7 @@ */ package org.apache.streampipes.manager.runtime; +import org.apache.streampipes.commons.constants.Envs; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.messaging.InternalEventProcessor; import org.apache.streampipes.messaging.jms.ActiveMQConsumer; @@ -27,8 +28,6 @@ import org.apache.streampipes.model.grounding.KafkaTransportProtocol; import org.apache.streampipes.model.grounding.MqttTransportProtocol; import org.apache.streampipes.model.grounding.TransportFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -36,8 +35,6 @@ public enum PipelineElementRuntimeInfoFetcher { INSTANCE; - Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class); - private final int FETCH_INTERVAL_MS = 300; private final Map<String, SpDataFormatConverter> converterMap; @@ -49,8 +46,7 @@ if (spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) { return getLatestEventFromKafka(spDataStream); - } - else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol){ + } else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol) { return getLatestEventFromJms(spDataStream); } else { return getLatestEventFromMqtt(spDataStream); @@ -75,7 +71,7 @@ JmsTransportProtocol protocol = (JmsTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol(); // Change jms config when running in development mode - if ("true".equals(System.getenv("SP_DEBUG"))) { + if (Envs.SP_DEBUG.getValueAsBoolean()) { protocol.setBrokerHostname("localhost"); } if (!converterMap.containsKey(jmsTopic)) { @@ -113,7 +109,7 @@ MqttTransportProtocol protocol = (MqttTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol(); // Change mqtt config when running in development mode - if ("true".equals(System.getenv("SP_DEBUG"))) { + if (Envs.SP_DEBUG.getValueAsBoolean()) { protocol.setBrokerHostname("localhost"); } @@ -122,15 +118,12 @@ new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter()); } MqttConsumer mqttConsumer = new MqttConsumer(); - mqttConsumer.connect(protocol, new InternalEventProcessor<byte[]>() { - @Override - public void onEvent(byte[] event) { - try { - result[0] = converterMap.get(mqttTopic).convert(event); - mqttConsumer.disconnect(); - } catch (SpRuntimeException e) { - e.printStackTrace(); - } + mqttConsumer.connect(protocol, event -> { + try { + result[0] = converterMap.get(mqttTopic).convert(event); + mqttConsumer.disconnect(); + } catch (SpRuntimeException e) { + e.printStackTrace(); } }); @@ -151,7 +144,7 @@ KafkaTransportProtocol protocol = (KafkaTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol(); // Change kafka config when running in development mode - if ("true".equals(System.getenv("SP_DEBUG"))) { + if (Envs.SP_DEBUG.getValueAsBoolean()) { protocol.setBrokerHostname("localhost"); protocol.setKafkaPort(9094); } @@ -161,14 +154,11 @@ new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter()); } - SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, new InternalEventProcessor<byte[]>() { - @Override - public void onEvent(byte[] event) { - try { - result[0] = converterMap.get(kafkaTopic).convert(event); - } catch (SpRuntimeException e) { - e.printStackTrace(); - } + SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, event -> { + try { + result[0] = converterMap.get(kafkaTopic).convert(event); + } catch (SpRuntimeException e) { + e.printStackTrace(); } });