refactor(#2779): Fix the test for TestStringToStateProcessor (#3031)

* refactor(#2779): Fix the test for TestStringToStateProcessor

* refactor(#2779): Add a utils class to deal with the stream prefix selector

* refactor(#2778): Fix junit test TestStringCounterProcessor (#3036)
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml b/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml
index 26d9b8f..4f05f11 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml
@@ -69,6 +69,12 @@
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-params</artifactId>
         </dependency>
+      <dependency>
+        <groupId>org.apache.streampipes</groupId>
+        <artifactId>streampipes-test-utils-executors</artifactId>
+        <version>0.97.0-SNAPSHOT</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
 
     <build>
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
index 50ecbab..08bcff5 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
@@ -38,8 +38,7 @@
 import org.apache.streampipes.wrapper.params.compat.ProcessorParams;

 import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

 

-import com.google.common.collect.Lists;

-

+import java.util.ArrayList;

 import java.util.List;

 

 public class StringToStateProcessor extends StreamPipesDataProcessor {

@@ -77,13 +76,13 @@
 

   @Override

   public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {

-    List<String> states = Lists.newArrayList();

+    List<String> states = new ArrayList<>();

 

     for (String stateField : stateFields) {

       states.add(event.getFieldBySelector(stateField).getAsPrimitive().getAsString());

     }

 

-    event.addField(CURRENT_STATE, states.toArray());

+    event.addField(CURRENT_STATE, states);

     collector.collect(event);

   }

 

diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
index 5887dda..7db3fcb 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
@@ -18,133 +18,61 @@
 
 package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
 
-//@RunWith(Parameterized.class)
+import org.apache.streampipes.test.executors.PrefixStrategy;
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.StreamPrefix;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
 public class TestStringCounterProcessor {
-//  private static final Logger LOG = LoggerFactory.getLogger(TestStringCounterProcessor.class);
-//
-//  @org.junit.runners.Parameterized.Parameters
-//  public static Iterable<Object[]> data() {
-//    return Arrays.asList(new Object[][] {
-//        {"Test", List.of("t1"), new Tuple3<>("", "", 0)},
-//        {"Test", Arrays.asList("t1", "t2"), new Tuple3<>("t1", "t2", 1)},
-//        {"Test", Arrays.asList("t1", "t2", "t1", "t2"), new Tuple3<>("t1", "t2", 2)},
-//        {"Test", Arrays.asList("t1", "t2", "t1", "t3"), new Tuple3<>("t1", "t3", 1)}
-//    });
-//  }
-//
-//  @org.junit.runners.Parameterized.Parameter
-//  public String selectedFieldName;
-//
-//  @org.junit.runners.Parameterized.Parameter(1)
-//  public List<String> eventStrings;
-//
-//  @org.junit.runners.Parameterized.Parameter(2)
-//  public Tuple3<String, String, Integer> expectedValue;
-//
-//  @Test
-//  public void testStringCounter() {
-//    StringCounterProcessor stringCounter = new StringCounterProcessor();
-//    DataProcessorDescription originalGraph = stringCounter.declareModel();
-//    originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
-//
-//    DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
-//    graph.setInputStreams(Collections
-//        .singletonList(EventStreamGenerator
-//            .makeStreamWithProperties(Collections.singletonList(selectedFieldName))));
-//    graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(
-//    Collections.singletonList(selectedFieldName)));
-//    graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
-//        .setActualTopicName("output-topic");
-//
-//    MappingPropertyUnary mappingPropertyUnary = graph.getStaticProperties().stream()
-//        .filter(p -> p instanceof MappingPropertyUnary)
-//        .map((p -> (MappingPropertyUnary) p))
-//        .filter(p -> p.getInternalName().equals(StringCounterProcessor.FIELD_ID))
-//        .findFirst().orElse(null);
-//    assert mappingPropertyUnary != null;
-//    mappingPropertyUnary.setSelectedProperty("s0::" + selectedFieldName);
-//    ProcessorParams params = new ProcessorParams(graph);
-//
-//    SpOutputCollector spOut = new SpOutputCollector() {
-//      @Override
-//      public void registerConsumer(String routeId, InternalEventProcessor<Map<String, Object>> consumer) {
-//      }
-//
-//      @Override
-//      public void unregisterConsumer(String routeId) {
-//      }
-//
-//      @Override
-//      public void connect() throws SpRuntimeException {
-//      }
-//
-//      @Override
-//      public void disconnect() throws SpRuntimeException {
-//      }
-//
-//      @Override
-//      public void collect(Event event) {
-//      }
-//    };
-//
-//    stringCounter.onInvocation(params, spOut, null);
-//    Tuple3<String, String, Integer> tuple = sendEvents(stringCounter, spOut);
-//    LOG.info("Expected match count is {}.", expectedValue.x);
-//    LOG.info("Actual match count is {}.", tuple.x);
-//    assertEquals(expectedValue.x, tuple.x);
-//    LOG.info("Expected change from is {}.", expectedValue.k);
-//    LOG.info("Actual change from is {}.", tuple.k);
-//    assertEquals(expectedValue.k, tuple.k);
-//    LOG.info("Expected change to is {}.", expectedValue.k);
-//    LOG.info("Actual change to is {}.", tuple.k);
-//    assertEquals(expectedValue.v, tuple.v);
-//  }
-//
-//  private Tuple3<String, String, Integer> sendEvents(StringCounterProcessor stringCounter, SpOutputCollector spOut) {
-//    int counter = 0;
-//    String changeFrom = "", changeTo = "";
-//    List<Event> events = makeEvents();
-//    for (Event event : events) {
-//      LOG.info("Sending event with value "
-//          + event.getFieldBySelector("s0::" + selectedFieldName).getAsPrimitive().getAsString());
-//      stringCounter.onEvent(event, spOut);
-//      try {
-//        TimeUnit.MILLISECONDS.sleep(100);
-//      } catch (InterruptedException e) {
-//        throw new RuntimeException(e);
-//      }
-//      try {
-//        counter = event.getFieldBySelector(StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME)
-//            .getAsPrimitive()
-//            .getAsInt();
-//        changeFrom = event.getFieldBySelector(StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME)
-//            .getAsPrimitive()
-//            .getAsString();
-//        changeTo = event.getFieldBySelector(StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME)
-//            .getAsPrimitive()
-//            .getAsString();
-//        LOG.info(changeFrom + " change to " + changeTo + ", value = " + counter);
-//      } catch (IllegalArgumentException e) {
-//
-//      }
-//    }
-//    return new Tuple3<>(changeFrom, changeTo, counter);
-//  }
-//
-//
-//  private List<Event> makeEvents() {
-//    List<Event> events = Lists.newArrayList();
-//    for (String eventSetting : eventStrings) {
-//      events.add(makeEvent(eventSetting));
-//    }
-//    return events;
-//  }
-//
-//  private Event makeEvent(String value) {
-//    Map<String, Object> map = Maps.newHashMap();
-//    map.put(selectedFieldName, value);
-//    return EventFactory.fromMap(map,
-//        new SourceInfo("test" + "-topic", "s0"),
-//        new SchemaInfo(null, Lists.newArrayList()));
-//  }
-}
+  private static final String KEY_1 = "key1";
+
+  private StringCounterProcessor processor;
+
+  @BeforeEach
+  public void setup() {
+    processor = new StringCounterProcessor();
+  }
+
+
+  static Stream<Arguments> arguments() {
+    return Stream.of(
+        Arguments.of(
+            List.of(Map.of(KEY_1, "v1"), Map.of(KEY_1, "v2")),
+            List.of(Map.of(
+                KEY_1, "v2",
+                StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME, "v1",
+                StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME, "v2",
+                StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME, 1
+            ))
+        )
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("arguments")
+  public void testStringToState(
+      List<Map<String, Object>> intpuEvents,
+      List<Map<String, Object>> outputEvents
+  ) {
+
+    var configuration = TestConfiguration
+        .builder()
+        .config(StringCounterProcessor.FIELD_ID, StreamPrefix.s0(KEY_1))
+        .prefixStrategy(PrefixStrategy.SAME_PREFIX)
+        .build();
+
+    var testExecutor = new ProcessingElementTestExecutor(processor, configuration);
+
+    testExecutor.run(intpuEvents, outputEvents);
+  }
+
+}
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
index 332b07d..5278e44 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
@@ -18,159 +18,90 @@
 

 package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;

 

-//@RunWith(Parameterized.class)

+import org.apache.streampipes.test.executors.PrefixStrategy;

+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;

+import org.apache.streampipes.test.executors.StreamPrefix;

+import org.apache.streampipes.test.executors.TestConfiguration;

+

+import org.junit.jupiter.api.BeforeEach;

+import org.junit.jupiter.params.ParameterizedTest;

+import org.junit.jupiter.params.provider.Arguments;

+import org.junit.jupiter.params.provider.MethodSource;

+

+import java.util.Collections;

+import java.util.List;

+import java.util.Map;

+import java.util.stream.Stream;

+

 public class TestStringToStateProcessor {

-//

-//  private static final Logger LOG = LoggerFactory.getLogger(TestStringToStateProcessor.class);

-//

-//  @org.junit.runners.Parameterized.Parameters

-//  public static Iterable<Object[]> data() {

-//    return Arrays.asList(new Object[][] {

-//        {

-//            List.of(),

-//            List.of("c1", "c2", "c3"),

-//            List.of(Arrays.asList("t1", "t2", "t3")),

-//            List.of()

-//        },

-//        {

-//            List.of("c1"),

-//            List.of("c1", "c2", "c3"),

-//            List.of(Arrays.asList("t1", "t2", "t3")),

-//            List.of("t1")

-//        },

-//        {

-//            List.of("c1", "c2"),

-//            List.of("c1", "c2", "c3"),

-//            List.of(Arrays.asList("t1", "t2", "t3")),

-//            Arrays.asList("t1", "t2")

-//        },

-//        {

-//            List.of("c1", "c2"),

-//            List.of("c1", "c2", "c3"),

-//            Arrays.asList(

-//                Arrays.asList("t1-1", "t2-1", "t3-1"),

-//                Arrays.asList("t1-2", "t2-2", "t3-2")

-//            ),

-//            Arrays.asList("t1-2", "t2-2")

-//        },

-//        {

-//            List.of("c1", "c2", "c3"),

-//            List.of("c1", "c2", "c3"),

-//            Arrays.asList(

-//                Arrays.asList("t1-1", "t2-1", "t3-1"),

-//                Arrays.asList("t1-2", "t2-2", "t3-2"),

-//                Arrays.asList("t1-3", "t2-3", "t3-3")

-//            ),

-//            Arrays.asList("t1-3", "t2-3", "t3-3")

-//        }

-//    });

-//  }

-//

-//  @org.junit.runners.Parameterized.Parameter

-//  public List<String> selectedFieldNames;

-//

-//  @org.junit.runners.Parameterized.Parameter(1)

-//  public List<String> fieldNames;

-//

-//  @org.junit.runners.Parameterized.Parameter(2)

-//  public List<List<String>> eventStrings;

-//

-//  @org.junit.runners.Parameterized.Parameter(3)

-//  public List<String> expectedValue;

-//

-//  private static final String DEFAULT_STREAM_NAME = "stream1";

-//

-//  @Test

-//  public void testStringToState() {

-//    StringToStateProcessor stringToStateProcessor = new StringToStateProcessor();

-//    DataProcessorDescription originalGraph = stringToStateProcessor.declareModel();

-//    originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());

-//

-//    DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph);

-//    graph.setInputStreams(Collections

-//        .singletonList(EventStreamGenerator

-//            .makeStreamWithProperties(Collections.singletonList("stream-in"))));

-//    graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("stream-out")));

-//    graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()

-//        .setActualTopicName("output-topic");

-//

-//    MappingPropertyNary mappingPropertyNary = graph.getStaticProperties().stream()

-//        .filter(p -> p instanceof MappingPropertyNary)

-//        .map(p -> (MappingPropertyNary) p)

-//        .filter(p -> p.getInternalName().equals(StringToStateProcessor.STRING_STATE_FIELD))

-//        .findFirst().orElse(null);

-//

-//    assert mappingPropertyNary != null;

-//    mappingPropertyNary.setSelectedProperties(

-//        selectedFieldNames.stream().map(field -> DEFAULT_STREAM_NAME + "::" + field).toList());

-//

-//    ProcessorParams params = new ProcessorParams(graph);

-//

-//    SpOutputCollector spOutputCollector = new SpOutputCollector() {

-//      @Override

-//      public void registerConsumer(String routeId, InternalEventProcessor<Map<String, Object>> consumer) {

-//      }

-//

-//      @Override

-//      public void unregisterConsumer(String routeId) {

-//      }

-//

-//      @Override

-//      public void connect() throws SpRuntimeException {

-//      }

-//

-//      @Override

-//      public void disconnect() throws SpRuntimeException {

-//      }

-//

-//      @Override

-//      public void collect(Event event) {

-//      }

-//    };

-//

-//    stringToStateProcessor.onInvocation(params, spOutputCollector, null);

-//    Object[] states = sendEvents(stringToStateProcessor, spOutputCollector);

-//    LOG.info("Expected states is {}.", expectedValue);

-//    LOG.info("Actual states is {}.", Arrays.toString(states));

-//    assertArrayEquals(expectedValue.toArray(), states);

-//  }

-//

-//  private Object[] sendEvents(StringToStateProcessor stateProcessor, SpOutputCollector spOut) {

-//    List<Event> events = makeEvents();

-//    Object[] states = null;

-//    for (Event event : events) {

-//      stateProcessor.onEvent(event, spOut);

-//      try {

-//        TimeUnit.MILLISECONDS.sleep(100);

-//      } catch (InterruptedException e) {

-//        throw new RuntimeException(e);

-//      }

-//      try {

-//        states = (Object[]) event.getFieldBySelector(StringToStateProcessor.CURRENT_STATE)

-//            .getAsPrimitive().getRawValue();

-//        LOG.info("Current states: " + Arrays.toString(states));

-//      } catch (IllegalArgumentException e) {

-//

-//      }

-//    }

-//    return states;

-//  }

-//

-//  private List<Event> makeEvents() {

-//    List<Event> events = Lists.newArrayList();

-//    for (List<String> eventSetting : eventStrings) {

-//      events.add(makeEvent(eventSetting));

-//    }

-//    return events;

-//  }

-//

-//  private Event makeEvent(List<String> value) {

-//    Map<String, Object> map = Maps.newHashMap();

-//    for (int i = 0; i < selectedFieldNames.size(); i++) {

-//      map.put(selectedFieldNames.get(i), value.get(i));

-//    }

-//    return EventFactory.fromMap(map,

-//        new SourceInfo("test-topic", DEFAULT_STREAM_NAME),

-//        new SchemaInfo(null, Lists.newArrayList()));

-//  }

+

+  private static final String KEY_1 = "key1";

+  private static final String KEY_2 = "key2";

+  private static final String VALUE_1 = "value 1";

+  private static final String VALUE_2 = "value 2";

+

+  private static final String PREFIX_KEY_1 = StreamPrefix.s0(KEY_1);

+  private static final String PREFIX_KEY_2 = StreamPrefix.s0(KEY_2);

+

+  private StringToStateProcessor processor;

+

+  @BeforeEach

+  public void setup() {

+    processor = new StringToStateProcessor();

+  }

+

+  static Stream<Arguments> arguments() {

+    return Stream.of(

+        Arguments.of(

+            Collections.emptyList(),

+            List.of(Map.of(KEY_1, VALUE_1)),

+            List.of(Map.of(

+                KEY_1, VALUE_1,

+                StringToStateProcessor.CURRENT_STATE, Collections.emptyList()

+            ))

+        ),

+        Arguments.of(

+            List.of(PREFIX_KEY_1),

+            List.of(Map.of(

+                KEY_1, VALUE_1

+            )),

+            List.of(Map.of(

+                KEY_1, VALUE_1,

+                StringToStateProcessor.CURRENT_STATE, List.of(VALUE_1)

+            ))

+        ),

+        Arguments.of(

+            List.of(PREFIX_KEY_1, PREFIX_KEY_2),

+            List.of(Map.of(

+                KEY_1, VALUE_1,

+                KEY_2, VALUE_2

+            )),

+            List.of(Map.of(

+                KEY_1, VALUE_1,

+                KEY_2, VALUE_2,

+                StringToStateProcessor.CURRENT_STATE, List.of(VALUE_1, VALUE_2)

+            ))

+        )

+    );

+  }

+

+  @ParameterizedTest

+  @MethodSource("arguments")

+  public void testStringToState(

+      List<String> selectedFieldNames,

+      List<Map<String, Object>> intpuEvents,

+      List<Map<String, Object>> outputEvents

+  ) {

+

+    var configuration = TestConfiguration

+        .builder()

+        .config(StringToStateProcessor.STRING_STATE_FIELD, selectedFieldNames)

+        .prefixStrategy(PrefixStrategy.SAME_PREFIX)

+        .build();

+

+    var testExecutor = new ProcessingElementTestExecutor(processor, configuration);

+

+    testExecutor.run(intpuEvents, outputEvents);

+  }

+

 }

diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
index f2d8719..b27f0e8 100644
--- a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
+++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
@@ -44,6 +44,9 @@
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 
 public class ProcessingElementTestExecutor {
 
@@ -93,17 +96,17 @@
       invocationConfig.accept(dataProcessorInvocation);
     }
 
-    var e = getProcessingElementParameterExtractor(dataProcessorInvocation);
-    var mockParams = Mockito.mock(IDataProcessorParameters.class);
+    var extractor = getProcessingElementParameterExtractor(dataProcessorInvocation);
+    var mockParams = mock(IDataProcessorParameters.class);
 
-    Mockito.when(mockParams.getModel()).thenReturn(dataProcessorInvocation);
-    Mockito.when(mockParams.extractor()).thenReturn(e);
+    when(mockParams.getModel()).thenReturn(dataProcessorInvocation);
+    when(mockParams.extractor()).thenReturn(extractor);
 
     // calls the onPipelineStarted method of the processor to initialize it
     processor.onPipelineStarted(mockParams, null, null);
 
     // mock the output collector to capture the output events and validate the results later
-    var mockCollector = Mockito.mock(SpOutputCollector.class);
+    var mockCollector = mock(SpOutputCollector.class);
     var spOutputCollectorCaptor = ArgumentCaptor.forClass(Event.class);
 
 
diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/StreamPrefix.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/StreamPrefix.java
new file mode 100644
index 0000000..3e69a0a
--- /dev/null
+++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/StreamPrefix.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.test.executors;
+
+/**
+ * Provides utility methods to append stream prefixes to property key values.
+ * This class is used to be used in unit tests.
+ * Consider integrating prefix configuration into the application's logic, possibly within the TestConfiguration class
+ */
+public class StreamPrefix {
+  public static final String S0 = "s0";
+  public static final String S1 = "s1";
+
+  /**
+   * Appends the S0 prefix to a given property value.
+   *
+   * @param propertyValue The value to which the S0 prefix will be appended.
+   * @return A string with the S0 prefix followed by the property value.
+   */
+  public static String s0(String propertyValue) {
+    return addPrefix(S0, propertyValue);
+  }
+
+  /**
+   * Appends the S1 prefix to a given property value.
+   *
+   * @param propertyValue The value to which the S1 prefix will be appended.
+   * @return A string with the S1 prefix followed by the property value.
+   */
+  public static String s1(String propertyValue) {
+    return addPrefix(S1, propertyValue);
+  }
+
+  private static String addPrefix(String prefix, String propertyValue) {
+    return prefix + "::" + propertyValue;
+  }
+}
diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java
index e2260c5..ec20e24 100644
--- a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java
+++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java
@@ -45,8 +45,8 @@
 
   public TestConfigurationBuilder prefixStrategy(PrefixStrategy strategy){
     this.eventPrefixes = switch (strategy){
-      case SAME_PREFIX -> List.of("s0");
-      case ALTERNATE -> List.of("s0", "s1");
+      case SAME_PREFIX -> List.of(StreamPrefix.S0);
+      case ALTERNATE -> List.of(StreamPrefix.S0, StreamPrefix.S1);
     };
     return this;
   }
diff --git a/streampipes-test-utils-executors/src/test/java/org/apache/streampipes/test/executors/StreamPrefixTest.java b/streampipes-test-utils-executors/src/test/java/org/apache/streampipes/test/executors/StreamPrefixTest.java
new file mode 100644
index 0000000..c89fa55
--- /dev/null
+++ b/streampipes-test-utils-executors/src/test/java/org/apache/streampipes/test/executors/StreamPrefixTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.test.executors;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class StreamPrefixTest {
+
+  @Test
+  void s0_AppendsPropertyValueCorrectly() {
+    var result = StreamPrefix.s0("testValue");
+    assertEquals("s0::testValue", result);
+  }
+
+  @Test
+  void s1_AppendsPropertyValueCorrectly() {
+    var result = StreamPrefix.s1("anotherTestValue");
+    assertEquals("s1::anotherTestValue", result);
+  }
+
+  @Test
+  void s0_HandlesEmptyPropertyValue() {
+    var result = StreamPrefix.s0("");
+    assertEquals("s0::", result);
+  }
+
+  @Test
+  void s1_HandlesEmptyPropertyValue() {
+    var result = StreamPrefix.s1("");
+    assertEquals("s1::", result);
+  }
+
+  @Test
+  void s0_HandlesSpecialCharactersInPropertyValue() {
+    var result = StreamPrefix.s0("value$&*");
+    assertEquals("s0::value$&*", result);
+  }
+
+  @Test
+  void s1_HandlesSpecialCharactersInPropertyValue() {
+    var result = StreamPrefix.s1("value@#%");
+    assertEquals("s1::value@#%", result);
+  }
+}
\ No newline at end of file