fix(#3325): compact adapters do not add a default property scope (#3328)

* fix(#3325): Add better logging if count of events in measurement fails

* fix(#3325): Add a test to validate file stream adapter via compact API
diff --git a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
index db11222..0a58b8b 100644
--- a/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
+++ b/streampipes-data-explorer-influx/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeMeasurementCounterInflux.java
@@ -22,16 +22,23 @@
 import org.apache.streampipes.model.datalake.AggregationFunction;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class DataLakeMeasurementCounterInflux extends DataLakeMeasurementCounter {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DataLakeMeasurementCounterInflux.class);
+
   private static final String COUNT_FIELD = "count";
 
-  public DataLakeMeasurementCounterInflux(List<DataLakeMeasure> allMeasurements,
-                                          List<String> measurementNames) {
+  public DataLakeMeasurementCounterInflux(
+      List<DataLakeMeasure> allMeasurements,
+      List<String> measurementNames
+  ) {
     super(allMeasurements, measurementNames);
   }
 
@@ -39,15 +46,21 @@
   protected CompletableFuture<Integer> createQueryAsAsyncFuture(DataLakeMeasure measure) {
     return CompletableFuture.supplyAsync(() -> {
       var firstColumn = getFirstMeasurementProperty(measure);
-      var builder = DataLakeInfluxQueryBuilder
-          .create(measure.getMeasureName()).withEndTime(System.currentTimeMillis())
-          .withAggregatedColumn(firstColumn, AggregationFunction.COUNT);
-      var queryResult = new DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), Optional.empty(), true);
-      if (queryResult.getTotal() > 0) {
-        return extractResult(queryResult, COUNT_FIELD);
-      } else {
+      if (firstColumn == null) {
+        LOG.error(
+            "Could not count events in measurement: {}, because no measurement property was found in event schema",
+            measure.getMeasureName()
+        );
         return 0;
       }
+
+      var builder = DataLakeInfluxQueryBuilder
+          .create(measure.getMeasureName())
+          .withEndTime(System.currentTimeMillis())
+          .withAggregatedColumn(firstColumn, AggregationFunction.COUNT);
+      var queryResult = new DataExplorerInfluxQueryExecutor().executeQuery(builder.build(), Optional.empty(), true);
+
+      return queryResult.getTotal() > 0 ? extractResult(queryResult, COUNT_FIELD) : 0;
     });
   }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
index 844ce5b..bc3a551 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataLakeMeasurementCounter.java
@@ -42,8 +42,10 @@
   protected final List<DataLakeMeasure> allMeasurements;
   protected final List<String> measurementNames;
 
-  public DataLakeMeasurementCounter(List<DataLakeMeasure> allMeasurements,
-                                         List<String> measurementNames) {
+  public DataLakeMeasurementCounter(
+      List<DataLakeMeasure> allMeasurements,
+      List<String> measurementNames
+  ) {
     this.allMeasurements = allMeasurements;
     this.measurementNames = measurementNames;
   }
@@ -52,12 +54,14 @@
   public Map<String, Integer> countMeasurementSizes() {
 
     // create async futures so that count queries can be executed parallel
-    Map<String, CompletableFuture<Integer>> countQueriesFutures = measurementNames.stream()
+    Map<String, CompletableFuture<Integer>> countQueriesFutures = measurementNames
+        .stream()
         .map(this::getMeasure)
         .filter(Objects::nonNull)
         .collect(Collectors.toMap(
-            DataLakeMeasure::getMeasureName,
-            this::createQueryAsAsyncFuture)
+                     DataLakeMeasure::getMeasureName,
+                     this::createQueryAsAsyncFuture
+                 )
         );
 
     return getQueryResults(countQueriesFutures);
@@ -72,7 +76,8 @@
   private DataLakeMeasure getMeasure(String measureName) {
     return allMeasurements
         .stream()
-        .filter(m -> m.getMeasureName().equals(measureName))
+        .filter(m -> m.getMeasureName()
+                      .equals(measureName))
         .findFirst()
         .orElse(null);
   }
@@ -83,7 +88,7 @@
    * @param queryFutures A Map containing the futures of
    *                     asynchronous count queries mapped by their respective keys.
    * @return A Map representing the results of the queries, where each key corresponds to
-   *         a measure name and the value is the count result.
+   * a measure name and the value is the count result.
    */
   private Map<String, Integer> getQueryResults(Map<String, CompletableFuture<Integer>> queryFutures) {
     Map<String, Integer> resultPerMeasure = new HashMap<>();
@@ -106,18 +111,34 @@
    * @return The runtime name of the first measurement property, or null if no such property is found.
    */
   protected String getFirstMeasurementProperty(DataLakeMeasure measure) {
-    return measure.getEventSchema().getEventProperties()
+    var propertyRuntimeName = measure
+        .getEventSchema()
+        .getEventProperties()
         .stream()
         .filter(ep -> ep.getPropertyScope() != null
-            && ep.getPropertyScope().equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
+            && ep.getPropertyScope()
+                 .equals(PropertyScope.MEASUREMENT_PROPERTY.name()))
         .map(EventProperty::getRuntimeName)
         .findFirst()
         .orElse(null);
+
+    if (propertyRuntimeName == null) {
+      LOG.error("No measurement property was found in the event schema found for measure {}", measure.getMeasureName());
+    }
+
+    return propertyRuntimeName;
   }
 
   protected Integer extractResult(SpQueryResult queryResult, String fieldName) {
-    return ((Double) (
-        queryResult.getAllDataSeries().get(0).getRows().get(0).get(queryResult.getHeaders().indexOf(fieldName)))
+    return (
+        (Double) (
+            queryResult.getAllDataSeries()
+                       .get(0)
+                       .getRows()
+                       .get(0)
+                       .get(queryResult.getHeaders()
+                                       .indexOf(fieldName))
+        )
     ).intValue();
   }
 
diff --git a/ui/cypress/fixtures/connect/compact/compactTest.csv b/ui/cypress/fixtures/connect/compact/compactTest.csv
new file mode 100644
index 0000000..6cf117f
--- /dev/null
+++ b/ui/cypress/fixtures/connect/compact/compactTest.csv
@@ -0,0 +1,2 @@
+timestamp;value;temperature
+1000;1.0;0.1
diff --git a/ui/cypress/fixtures/connect/compact/fileReplay.yml b/ui/cypress/fixtures/connect/compact/fileReplay.yml
new file mode 100644
index 0000000..3c57795
--- /dev/null
+++ b/ui/cypress/fixtures/connect/compact/fileReplay.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+id: sp:adapterdescription:IfmzfQ
+name: File Stream Adapter Test
+description: ''
+appId: org.apache.streampipes.connect.iiot.protocol.stream.file
+configuration:
+    - filePath: compactTest.csv
+    - replaceTimestamp:
+          - ''
+    - replayOnce: 'no'
+    - speed: keepOriginalTime
+    - delimiter: ;
+      format: CSV
+      header:
+          - Header
+schema:
+    timestamp:
+        description: ''
+        propertyScope: HEADER_PROPERTY
+        semanticType: http://schema.org/DateTime
+transform:
+    rename: {}
+    measurementUnit: {}
+createOptions:
+    persist: true
+    start: true
diff --git a/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts b/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
index c19e345..57d2466 100644
--- a/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
+++ b/ui/cypress/tests/connect/compact/addCompactAdapter.spec.ts
@@ -19,6 +19,7 @@
 import { ConnectUtils } from '../../../support/utils/connect/ConnectUtils';
 import { CompactAdapterUtils } from '../../../support/utils/connect/CompactAdapterUtils';
 import { PipelineUtils } from '../../../support/utils/pipeline/PipelineUtils';
+import { FileManagementUtils } from '../../../support/utils/FileManagementUtils';
 
 describe('Add Compact Adapters', () => {
     beforeEach('Setup Test', () => {
@@ -96,4 +97,19 @@
             },
         );
     });
+
+    it('Add file stream adapter via the compact API. Start Adapter with Pipeline', () => {
+        FileManagementUtils.addFile('connect/compact/compactTest.csv');
+
+        cy.readFile('cypress/fixtures/connect/compact/fileReplay.yml').then(
+            ymlDescription => {
+                CompactAdapterUtils.storeCompactYmlAdapter(ymlDescription).then(
+                    () => {
+                        ConnectUtils.validateAdapterIsRunning();
+                        PipelineUtils.checkAmountOfPipelinesPipeline(1);
+                    },
+                );
+            },
+        );
+    });
 });