fix(#3325): compact adapters do not add a default property scope (#3328)
* fix(#3325): Add better logging if count of events in measurement fails
* fix(#3325): Add a test to validate file stream adapter via compact API
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
index db11222..0a58b8b 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
+++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
@@ -22,16 +22,23 @@
import org.apache.streampipes.model.datalake.AggregationFunction;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public class DataLakeMeasurementCounterInflux extends DataLakeMeasurementCounter {
+ private static final Logger LOG = LoggerFactory.getLogger(DataLakeMeasurementCounterInflux.class);
+
private static final String COUNT_FIELD = "count";
- public DataLakeMeasurementCounterInflux(List<DataLakeMeasure> allMeasurements,
- List<String> measurementNames) {
+ public DataLakeMeasurementCounterInflux(
+ List<DataLakeMeasure> allMeasurements,
+ List<String> measurementNames
+ ) {
super(allMeasurements, measurementNames);
}
@@ -39,15 +46,21 @@
protected CompletableFuture<Integer> createQueryAsAsyncFuture(DataLakeMeasure measure) {
return CompletableFuture.supplyAsync(() -> {
var firstColumn = getFirstMeasurementProperty(measure);
- var builder = DataLakeInfluxQueryBuilder
- .create(measure.getMeasureName()).withEndTime(System.currentTimeMillis())
- .withAggregatedColumn(firstColumn, AggregationFunction.COUNT);
- var queryResult = new DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), Optional.empty(), true);
- if (queryResult.getTotal() > 0) {
- return extractResult(queryResult, COUNT_FIELD);
- } else {
+ if (firstColumn == null) {
+ LOG.error(
+ "Could not count events in measurement: {}, because no measurement property was found in event schema",
+ measure.getMeasureName()
+ );
return 0;
}
+
+ var builder = DataLakeInfluxQueryBuilder
+ .create(measure.getMeasureName())
+ .withEndTime(System.currentTimeMillis())
+ .withAggregatedColumn(firstColumn, AggregationFunction.COUNT);
+ var queryResult = new DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), Optional.empty(), true);
+
+ return queryResult.getTotal() > 0 ? extractResult(queryResult, COUNT_FIELD) : 0;
});
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
index 844ce5b..bc3a551 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
@@ -42,8 +42,10 @@
protected final List<DataLakeMeasure> allMeasurements;
protected final List<String> measurementNames;
- public DataLakeMeasurementCounter(List<DataLakeMeasure> allMeasurements,
- List<String> measurementNames) {
+ public DataLakeMeasurementCounter(
+ List<DataLakeMeasure> allMeasurements,
+ List<String> measurementNames
+ ) {
this.allMeasurements = allMeasurements;
this.measurementNames = measurementNames;
}
@@ -52,12 +54,14 @@
public Map<String, Integer> countMeasurementSizes() {
// create async futures so that count queries can be executed parallel
- Map<String, CompletableFuture<Integer>> countQueriesFutures = measurementNames.stream()
+ Map<String, CompletableFuture<Integer>> countQueriesFutures = measurementNames
+ .stream()
.map(this::getMeasure)
.filter(Objects::nonNull)
.collect(Collectors.toMap(
- DataLakeMeasure::getMeasureName,
- this::createQueryAsAsyncFuture)
+ DataLakeMeasure::getMeasureName,
+ this::createQueryAsAsyncFuture
+ )
);
return getQueryResults(countQueriesFutures);
@@ -72,7 +76,8 @@
private DataLakeMeasure getMeasure(String measureName) {
return allMeasurements
.stream()
- .filter(m -> m.getMeasureName().equals(measureName))
+ .filter(m -> m.getMeasureName()
+ .equals(measureName))
.findFirst()
.orElse(null);
}
@@ -83,7 +88,7 @@
* @param queryFutures A Map containing the futures of
* asynchronous count queries mapped by their respective keys.
* @return A Map representing the results of the queries, where each key corresponds to
- * a measure name and the value is the count result.
+ * a measure name and the value is the count result.
*/
private Map<String, Integer> getQueryResults(Map<String, CompletableFuture<Integer>> queryFutures) {
Map<String, Integer> resultPerMeasure = new HashMap<>();
@@ -106,18 +111,34 @@
* @return The runtime name of the first measurement property, or null if no such property is found.
*/
protected String getFirstMeasurementProperty(DataLakeMeasure measure) {
- return measure.getEventSchema().getEventProperties()
+ var propertyRuntimeName = measure
+ .getEventSchema()
+ .getEventProperties()
.stream()
.filter(ep -> ep.getPropertyScope() != null
- && ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
+ && ep.getPropertyScope()
+ .equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
.map(EventProperty::getRuntimeName)
.findFirst()
.orElse(null);
+
+ if (propertyRuntimeName == null) {
+ LOG.error("No measurement property was found in the event schema found for measure {}", measure.getMeasureName());
+ }
+
+ return propertyRuntimeName;
}
protected Integer extractResult(SpQueryResult queryResult, String fieldName) {
- return ((Double) (
- queryResult.getAllDataSeries().get(0).getRows().get(0).get(queryResult.getHeaders().indexOf(fieldName)))
+ return (
+ (Double) (
+ queryResult.getAllDataSeries()
+ .get(0)
+ .getRows()
+ .get(0)
+ .get(queryResult.getHeaders()
+ .indexOf(fieldName))
+ )
).intValue();
}
diff --git a/ui/cypress/fixtures/connect/compact/compactTest.csv b/ui/cypress/fixtures/connect/compact/compactTest.csv
new file mode 100644
index 0000000..6cf117f
--- /dev/null
+++ b/ui/cypress/fixtures/connect/compact/compactTest.csv
@@ -0,0 +1,2 @@
+timestamp;value;temperature
+1000;1.0;0.1
diff --git a/ui/cypress/fixtures/connect/compact/fileReplay.yml b/ui/cypress/fixtures/connect/compact/fileReplay.yml
new file mode 100644
index 0000000..3c57795
--- /dev/null
+++ b/ui/cypress/fixtures/connect/compact/fileReplay.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+id: sp:adapterdescription:IfmzfQ
+name: File Stream Adapter Test
+description: ''
+appId: org.apache.streampipes.connect.iiot.protocol.stream.file
+configuration:
+ - filePath: compactTest.csv
+ - replaceTimestamp:
+ - ''
+ - replayOnce: 'no'
+ - speed: keepOriginalTime
+ - delimiter: ;
+ format: CSV
+ header:
+ - Header
+schema:
+ timestamp:
+ description: ''
+ propertyScope: HEADER_PROPERTY
+ semanticType: http://schema.org/DateTime
+transform:
+ rename: {}
+ measurementUnit: {}
+createOptions:
+ persist: true
+ start: true
diff --git a/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts b/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
index c19e345..57d2466 100644
--- a/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
+++ b/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
@@ -19,6 +19,7 @@
import { ConnectUtils } from '../../../support/utils/connect/ConnectUtils';
import { CompactAdapterUtils } from '../../../support/utils/connect/CompactAdapterUtils';
import { PipelineUtils } from '../../../support/utils/pipeline/PipelineUtils';
+import { FileManagementUtils } from '../../../support/utils/FileManagementUtils';
describe('Add Compact Adapters', () => {
beforeEach('Setup Test', () => {
@@ -96,4 +97,19 @@
},
);
});
+
+ it('Add file stream adapter via the compact API. Start Adapter with Pipeline', () => {
+ FileManagementUtils.addFile('connect/compact/compactTest.csv');
+
+ cy.readFile('cypress/fixtures/connect/compact/fileReplay.yml').then(
+ ymlDescription => {
+ CompactAdapterUtils.storeCompactYmlAdapter(ymlDescription).then(
+ () => {
+ ConnectUtils.validateAdapterIsRunning();
+ PipelineUtils.checkAmountOfPipelinesPipeline(1);
+ },
+ );
+ },
+ );
+ });
});