Merge branch 'dev' into STREAMPIPES-527
diff --git a/.idea/runConfigurations/backend.xml b/.idea/runConfigurations/backend.xml
index 4e1f928..344ff6e 100644
--- a/.idea/runConfigurations/backend.xml
+++ b/.idea/runConfigurations/backend.xml
@@ -10,7 +10,7 @@
<env name="SP_JMS_HOST" value="localhost" />
<env name="SP_KAFKA_HOST" value="localhost" />
<env name="SP_KAFKA_PORT" value="9094" />
- <env name="SP_PRIORITIZED_PROTOCOL" value="kafka" />
+ <env name="SP_PRIORITIZED_PROTOCOL" value="mqtt" />
</envs>
<module name="streampipes-backend" />
<option name="SPRING_BOOT_MAIN_CLASS" value="org.apache.streampipes.backend.StreamPipesBackendApplication" />
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
index 7871118..c67c3a7 100644
--- a/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/backend/docker-compose.dev.yml
@@ -18,3 +18,5 @@
backend:
ports:
- "8030:8030"
+ environment:
+ - SP_HOST=host.docker.internal
diff --git a/installer/cli/deploy/standalone/backend/docker-compose.yml b/installer/cli/deploy/standalone/backend/docker-compose.yml
index 4d014eb..c67ba45 100644
--- a/installer/cli/deploy/standalone/backend/docker-compose.yml
+++ b/installer/cli/deploy/standalone/backend/docker-compose.yml
@@ -24,7 +24,7 @@
- backend:/root/.streampipes
- files:/spImages
environment:
- - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-kafka}
+ - SP_PRIORITIZED_PROTOCOL=${SP_PRIORITIZED_PROTOCOL:-mqtt}
- SP_MQTT_HOST=${SP_MQTT_HOST:-activemq}
logging:
driver: "json-file"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
index ae347c1..711570b 100644
--- a/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
+++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.dev.yml
@@ -17,4 +17,4 @@
services:
mosquitto:
ports:
- - "1884:1883"
+ - "1883:1883"
diff --git a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
index 5e5b07a..c936edf 100644
--- a/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
+++ b/installer/cli/deploy/standalone/mosquitto/docker-compose.yml
@@ -16,7 +16,8 @@
version: "3.4"
services:
mosquitto:
- image: eclipse-mosquitto:1.5.4
+ image: eclipse-mosquitto:2.0.14
+ command: mosquitto -c /mosquitto-no-auth.conf
logging:
driver: "json-file"
options:
diff --git a/installer/compose/docker-compose.full.yml b/installer/compose/docker-compose.full.yml
index 92a4ca5..4c5a079 100644
--- a/installer/compose/docker-compose.full.yml
+++ b/installer/compose/docker-compose.full.yml
@@ -113,33 +113,9 @@
networks:
spnet:
- kafka:
- image: fogsyio/kafka:2.2.0
- hostname: kafka
- depends_on:
- - zookeeper
- environment:
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
- KAFKA_LISTENERS: PLAINTEXT://:9092
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES: 5000012
- KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
- KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
- volumes:
- - kafka:/kafka
- - /var/run/docker.sock:/var/run/docker.sock
- logging: *default-logging
- restart: unless-stopped
- networks:
- spnet:
-
- zookeeper:
- image: fogsyio/zookeeper:3.4.13
- volumes:
- - zookeeper:/opt/zookeeper-3.4.13
+ mosquitto:
+ image: eclipse-mosquitto:2.0.14
+ command: mosquitto -c /mosquitto-no-auth.conf
logging: *default-logging
restart: unless-stopped
networks:
@@ -232,8 +208,6 @@
consul:
connect:
couchdb:
- kafka:
- zookeeper:
influxdb:
files:
diff --git a/installer/compose/docker-compose.yml b/installer/compose/docker-compose.yml
index 358493c..c1b04c0 100644
--- a/installer/compose/docker-compose.yml
+++ b/installer/compose/docker-compose.yml
@@ -85,33 +85,9 @@
networks:
spnet:
- kafka:
- image: fogsyio/kafka:2.2.0
- hostname: kafka
- depends_on:
- - zookeeper
- environment:
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://:9092
- KAFKA_LISTENERS: PLAINTEXT://:9092
- KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
- KAFKA_ADVERTISED_HOST_NAME: kafka
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_MESSAGE_MAX_BYTES: 5000012
- KAFKA_FETCH_MESSAGE_MAX_BYTES: 5000012
- KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
- volumes:
- - kafka:/kafka
- - /var/run/docker.sock:/var/run/docker.sock
- logging: *default-logging
- restart: unless-stopped
- networks:
- spnet:
-
- zookeeper:
- image: fogsyio/zookeeper:3.4.13
- volumes:
- - zookeeper:/opt/zookeeper-3.4.13
+ mosquitto:
+ image: eclipse-mosquitto:2.0.14
+ command: mosquitto -c /mosquitto-no-auth.conf
logging: *default-logging
restart: unless-stopped
networks:
@@ -158,8 +134,6 @@
consul:
connect:
couchdb:
- kafka:
- zookeeper:
influxdb:
files:
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
index 514c2e5..e336b1d 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java
@@ -32,6 +32,6 @@
}
public static Boolean getEnvAsBoolean(String envVariable) {
- return Boolean.parseBoolean(getEnv(envVariable));
+ return exists(envVariable) && Boolean.parseBoolean(getEnv(envVariable));
}
}
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index 403ac4d..eb713c8 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -47,7 +47,7 @@
config.register(BackendConfigKeys.JMS_HOST, "activemq", "Hostname for backend service for active mq");
config.register(BackendConfigKeys.JMS_PORT, 61616, "Port for backend service for active mq");
- config.register(BackendConfigKeys.MQTT_HOST, "activemq", "Hostname of mqtt service");
+ config.register(BackendConfigKeys.MQTT_HOST, "mosquitto", "Hostname of mqtt service");
config.register(BackendConfigKeys.MQTT_PORT, 1883, "Port of mqtt service");
config.register(BackendConfigKeys.KAFKA_HOST, "kafka", "Hostname for backend service for kafka");
config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka");
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
index 6b49a0e..2c37e6f 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/MessagingSettings.java
@@ -34,9 +34,6 @@
List<SpProtocol> protocolList;
if (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL) != null) {
switch (System.getenv(BackendConfigKeys.PRIORITIZED_PROTOCOL).toLowerCase()) {
- case "mqtt":
- protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
- break;
case "kafka":
protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
break;
@@ -44,10 +41,10 @@
protocolList = Arrays.asList(SpProtocol.JMS, SpProtocol.KAFKA, SpProtocol.MQTT);
break;
default:
- protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+ protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
}
} else {
- protocolList = Arrays.asList(SpProtocol.KAFKA, SpProtocol.MQTT, SpProtocol.JMS);
+ protocolList = Arrays.asList(SpProtocol.MQTT, SpProtocol.KAFKA, SpProtocol.JMS);
}
return new MessagingSettings(
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index 86fcf9b..d5d495a 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -18,6 +18,9 @@
package org.apache.streampipes.connect.adapter;
+import org.apache.streampipes.commons.constants.Envs;
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.config.backend.SpProtocol;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToJmsAdapterSink;
import org.apache.streampipes.connect.adapter.preprocessing.elements.SendToKafkaAdapterSink;
@@ -60,7 +63,7 @@
if (transportProtocol instanceof JmsTransportProtocol) {
SendToJmsAdapterSink sink = (SendToJmsAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (Envs.SP_DEBUG.getValueAsBoolean()) {
transportProtocol.setBrokerHostname("localhost");
//((JmsTransportProtocol) transportProtocol).setPort(61616);
}
@@ -68,7 +71,7 @@
}
else if (transportProtocol instanceof KafkaTransportProtocol) {
SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (Envs.SP_DEBUG.getValueAsBoolean()) {
transportProtocol.setBrokerHostname("localhost");
((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094);
}
@@ -76,7 +79,7 @@
}
else if (transportProtocol instanceof MqttTransportProtocol) {
SendToMqttAdapterSink sink = (SendToMqttAdapterSink) this.adapterPipeline.getPipelineSink();
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (Envs.SP_DEBUG.getValueAsBoolean()) {
transportProtocol.setBrokerHostname("localhost");
//((MqttTransportProtocol) transportProtocol).setPort(1883);
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
index 98613c8..b63f108 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -42,12 +42,6 @@
@Override
public void process(Map<String, Object> event) {
- // TODO remove, just for performance tests
- if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
- event.put("internal_t1", System.currentTimeMillis());
- }
-
-
for (IAdapterPipelineElement pipelineElement : pipelineElements) {
event = pipelineElement.process(event);
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
index 9c3b41c..5e4cb89 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java
@@ -17,6 +17,7 @@
*/
package org.apache.streampipes.connect.adapter.preprocessing.elements;
+import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.connect.api.IAdapterPipelineElement;
import org.apache.streampipes.connect.adapter.util.TransportFormatSelector;
@@ -46,7 +47,7 @@
.getEventGrounding()
.getTransportProtocol());
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (Envs.SP_DEBUG.getValueAsBoolean()) {
modifyProtocolForDebugging();
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 9067b45..f9918fc 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -332,15 +332,17 @@
String q = "SHOW TAG VALUES ON \"" + BackendConfig.INSTANCE.getInfluxDatabaseName() + "\" FROM \"" + measurementId + "\" WITH KEY = \"" + f + "\"";
Query query = new Query(q);
QueryResult queryResult = influxDB.query(query);
- queryResult.getResults().forEach(res -> {
- res.getSeries().forEach(series -> {
- if (series.getValues().size() > 0) {
- String field = series.getValues().get(0).get(0).toString();
- List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
- tags.put(field, values);
- }
+ if (queryResult.getResults() != null) {
+ queryResult.getResults().forEach(res -> {
+ res.getSeries().forEach(series -> {
+ if (series.getValues().size() > 0) {
+ String field = series.getValues().get(0).get(0).toString();
+ List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+ tags.put(field, values);
+ }
+ });
});
- });
+ }
});
}
diff --git a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
index 6ba637a..dcdfbe7 100644
--- a/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
+++ b/streampipes-messaging-mqtt/src/main/java/org/apache/streampipes/messaging/mqtt/AbstractMqttConnector.java
@@ -29,6 +29,8 @@
protected void createBrokerConnection(MqttTransportProtocol protocolSettings) throws Exception {
this.mqtt = new MQTT();
this.mqtt.setHost(makeBrokerUrl(protocolSettings));
+ this.mqtt.setConnectAttemptsMax(3);
+ this.mqtt.setReconnectDelay(1000);
this.connection = mqtt.blockingConnection();
this.connection.connect();
this.connected = true;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
index e7fac0b..d1247a1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java
@@ -17,6 +17,7 @@
*/
package org.apache.streampipes.manager.runtime;
+import org.apache.streampipes.commons.constants.Envs;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.jms.ActiveMQConsumer;
@@ -27,8 +28,6 @@
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.TransportFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -36,8 +35,6 @@
public enum PipelineElementRuntimeInfoFetcher {
INSTANCE;
- Logger logger = LoggerFactory.getLogger(PipelineElementRuntimeInfoFetcher.class);
-
private final int FETCH_INTERVAL_MS = 300;
private final Map<String, SpDataFormatConverter> converterMap;
@@ -49,8 +46,7 @@
if (spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol) {
return getLatestEventFromKafka(spDataStream);
- }
- else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol){
+ } else if (spDataStream.getEventGrounding().getTransportProtocol() instanceof JmsTransportProtocol) {
return getLatestEventFromJms(spDataStream);
} else {
return getLatestEventFromMqtt(spDataStream);
@@ -75,7 +71,7 @@
JmsTransportProtocol protocol = (JmsTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
// Change jms config when running in development mode
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (Envs.SP_DEBUG.getValueAsBoolean()) {
protocol.setBrokerHostname("localhost");
}
if (!converterMap.containsKey(jmsTopic)) {
@@ -113,7 +109,7 @@
MqttTransportProtocol protocol = (MqttTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
// Change mqtt config when running in development mode
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (Envs.SP_DEBUG.getValueAsBoolean()) {
protocol.setBrokerHostname("localhost");
}
@@ -122,15 +118,12 @@
new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
}
MqttConsumer mqttConsumer = new MqttConsumer();
- mqttConsumer.connect(protocol, new InternalEventProcessor<byte[]>() {
- @Override
- public void onEvent(byte[] event) {
- try {
- result[0] = converterMap.get(mqttTopic).convert(event);
- mqttConsumer.disconnect();
- } catch (SpRuntimeException e) {
- e.printStackTrace();
- }
+ mqttConsumer.connect(protocol, event -> {
+ try {
+ result[0] = converterMap.get(mqttTopic).convert(event);
+ mqttConsumer.disconnect();
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
}
});
@@ -151,7 +144,7 @@
KafkaTransportProtocol protocol = (KafkaTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
// Change kafka config when running in development mode
- if ("true".equals(System.getenv("SP_DEBUG"))) {
+ if (Envs.SP_DEBUG.getValueAsBoolean()) {
protocol.setBrokerHostname("localhost");
protocol.setKafkaPort(9094);
}
@@ -161,14 +154,11 @@
new SpDataFormatConverterGenerator(getTransportFormat(spDataStream)).makeConverter());
}
- SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, new InternalEventProcessor<byte[]>() {
- @Override
- public void onEvent(byte[] event) {
- try {
- result[0] = converterMap.get(kafkaTopic).convert(event);
- } catch (SpRuntimeException e) {
- e.printStackTrace();
- }
+ SpKafkaConsumer kafkaConsumer = new SpKafkaConsumer(protocol, kafkaTopic, event -> {
+ try {
+ result[0] = converterMap.get(kafkaTopic).convert(event);
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
}
});