refactor(#2779): Fix the test for TestStringToStateProcessor (#3031)
* refactor(#2779): Fix the test for TestStringToStateProcessor
* refactor(#2779): Add a utils class to deal with the stream prefix selector
* refactor(#2778): Fix junit test TestStringCounterProcessor (#3036)
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml b/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml
index 26d9b8f..4f05f11 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/pom.xml
@@ -69,6 +69,12 @@
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-test-utils-executors</artifactId>
+ <version>0.97.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
index 50ecbab..08bcff5 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
@@ -38,8 +38,7 @@
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-import com.google.common.collect.Lists;
-
+import java.util.ArrayList;
import java.util.List;
public class StringToStateProcessor extends StreamPipesDataProcessor {
@@ -77,13 +76,13 @@
+ /**
+ * Reads the string value of every selector in {@code stateFields} from the incoming event,
+ * stores the resulting list in the event under {@code CURRENT_STATE} and forwards the event
+ * to the collector.
+ */
@Override
public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
- List<String> states = Lists.newArrayList();
+ List<String> states = new ArrayList<>();
for (String stateField : stateFields) {
states.add(event.getFieldBySelector(stateField).getAsPrimitive().getAsString());
}
- event.addField(CURRENT_STATE, states.toArray());
+ event.addField(CURRENT_STATE, states);
collector.collect(event);
}
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
index 5887dda..7db3fcb 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
@@ -18,133 +18,61 @@
package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
-//@RunWith(Parameterized.class)
+import org.apache.streampipes.test.executors.PrefixStrategy;
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.StreamPrefix;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
public class TestStringCounterProcessor {
-// private static final Logger LOG = LoggerFactory.getLogger(TestStringCounterProcessor.class);
-//
-// @org.junit.runners.Parameterized.Parameters
-// public static Iterable<Object[]> data() {
-// return Arrays.asList(new Object[][] {
-// {"Test", List.of("t1"), new Tuple3<>("", "", 0)},
-// {"Test", Arrays.asList("t1", "t2"), new Tuple3<>("t1", "t2", 1)},
-// {"Test", Arrays.asList("t1", "t2", "t1", "t2"), new Tuple3<>("t1", "t2", 2)},
-// {"Test", Arrays.asList("t1", "t2", "t1", "t3"), new Tuple3<>("t1", "t3", 1)}
-// });
-// }
-//
-// @org.junit.runners.Parameterized.Parameter
-// public String selectedFieldName;
-//
-// @org.junit.runners.Parameterized.Parameter(1)
-// public List<String> eventStrings;
-//
-// @org.junit.runners.Parameterized.Parameter(2)
-// public Tuple3<String, String, Integer> expectedValue;
-//
-// @Test
-// public void testStringCounter() {
-// StringCounterProcessor stringCounter = new StringCounterProcessor();
-// DataProcessorDescription originalGraph = stringCounter.declareModel();
-// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
-//
-// DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
-// graph.setInputStreams(Collections
-// .singletonList(EventStreamGenerator
-// .makeStreamWithProperties(Collections.singletonList(selectedFieldName))));
-// graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(
-// Collections.singletonList(selectedFieldName)));
-// graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
-// .setActualTopicName("output-topic");
-//
-// MappingPropertyUnary mappingPropertyUnary = graph.getStaticProperties().stream()
-// .filter(p -> p instanceof MappingPropertyUnary)
-// .map((p -> (MappingPropertyUnary) p))
-// .filter(p -> p.getInternalName().equals(StringCounterProcessor.FIELD_ID))
-// .findFirst().orElse(null);
-// assert mappingPropertyUnary != null;
-// mappingPropertyUnary.setSelectedProperty("s0::" + selectedFieldName);
-// ProcessorParams params = new ProcessorParams(graph);
-//
-// SpOutputCollector spOut = new SpOutputCollector() {
-// @Override
-// public void registerConsumer(String routeId, InternalEventProcessor<Map<String, Object>> consumer) {
-// }
-//
-// @Override
-// public void unregisterConsumer(String routeId) {
-// }
-//
-// @Override
-// public void connect() throws SpRuntimeException {
-// }
-//
-// @Override
-// public void disconnect() throws SpRuntimeException {
-// }
-//
-// @Override
-// public void collect(Event event) {
-// }
-// };
-//
-// stringCounter.onInvocation(params, spOut, null);
-// Tuple3<String, String, Integer> tuple = sendEvents(stringCounter, spOut);
-// LOG.info("Expected match count is {}.", expectedValue.x);
-// LOG.info("Actual match count is {}.", tuple.x);
-// assertEquals(expectedValue.x, tuple.x);
-// LOG.info("Expected change from is {}.", expectedValue.k);
-// LOG.info("Actual change from is {}.", tuple.k);
-// assertEquals(expectedValue.k, tuple.k);
-// LOG.info("Expected change to is {}.", expectedValue.k);
-// LOG.info("Actual change to is {}.", tuple.k);
-// assertEquals(expectedValue.v, tuple.v);
-// }
-//
-// private Tuple3<String, String, Integer> sendEvents(StringCounterProcessor stringCounter, SpOutputCollector spOut) {
-// int counter = 0;
-// String changeFrom = "", changeTo = "";
-// List<Event> events = makeEvents();
-// for (Event event : events) {
-// LOG.info("Sending event with value "
-// + event.getFieldBySelector("s0::" + selectedFieldName).getAsPrimitive().getAsString());
-// stringCounter.onEvent(event, spOut);
-// try {
-// TimeUnit.MILLISECONDS.sleep(100);
-// } catch (InterruptedException e) {
-// throw new RuntimeException(e);
-// }
-// try {
-// counter = event.getFieldBySelector(StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME)
-// .getAsPrimitive()
-// .getAsInt();
-// changeFrom = event.getFieldBySelector(StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME)
-// .getAsPrimitive()
-// .getAsString();
-// changeTo = event.getFieldBySelector(StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME)
-// .getAsPrimitive()
-// .getAsString();
-// LOG.info(changeFrom + " change to " + changeTo + ", value = " + counter);
-// } catch (IllegalArgumentException e) {
-//
-// }
-// }
-// return new Tuple3<>(changeFrom, changeTo, counter);
-// }
-//
-//
-// private List<Event> makeEvents() {
-// List<Event> events = Lists.newArrayList();
-// for (String eventSetting : eventStrings) {
-// events.add(makeEvent(eventSetting));
-// }
-// return events;
-// }
-//
-// private Event makeEvent(String value) {
-// Map<String, Object> map = Maps.newHashMap();
-// map.put(selectedFieldName, value);
-// return EventFactory.fromMap(map,
-// new SourceInfo("test" + "-topic", "s0"),
-// new SchemaInfo(null, Lists.newArrayList()));
-// }
-}
+ private static final String KEY_1 = "key1";
+
+ private StringCounterProcessor processor;
+
+  /** Gives every test its own, freshly created processor instance. */
+  @BeforeEach
+  public void setup() {
+    this.processor = new StringCounterProcessor();
+  }
+
+
+  /**
+   * Supplies the test cases: each one is a list of input events together with the list of
+   * events expected on the output.
+   */
+  static Stream<Arguments> arguments() {
+    // the value of KEY_1 changes from "v1" to "v2"
+    List<Map<String, Object>> inputEvents = List.of(
+        Map.of(KEY_1, "v1"),
+        Map.of(KEY_1, "v2")
+    );
+    // a single output event carrying the change and its count
+    List<Map<String, Object>> expectedEvents = List.of(
+        Map.of(
+            KEY_1, "v2",
+            StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME, "v1",
+            StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME, "v2",
+            StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME, 1
+        )
+    );
+
+    return Stream.of(Arguments.of(inputEvents, expectedEvents));
+  }
+
+  /**
+   * Configures the processor to read {@code KEY_1} from stream {@code s0} and hands the input
+   * events together with the expected output events to the {@code ProcessingElementTestExecutor}.
+   *
+   * @param inputEvents  events that are passed to the processor
+   * @param outputEvents events the processor is expected to emit
+   */
+  // NOTE(review): the method name looks copied from TestStringToStateProcessor; consider renaming it.
+  @ParameterizedTest
+  @MethodSource("arguments")
+  public void testStringToState(
+      List<Map<String, Object>> inputEvents,
+      List<Map<String, Object>> outputEvents
+  ) {
+
+    var configuration = TestConfiguration
+        .builder()
+        .config(StringCounterProcessor.FIELD_ID, StreamPrefix.s0(KEY_1))
+        .prefixStrategy(PrefixStrategy.SAME_PREFIX)
+        .build();
+
+    var testExecutor = new ProcessingElementTestExecutor(processor, configuration);
+
+    testExecutor.run(inputEvents, outputEvents);
+  }
+
+}
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
index 332b07d..5278e44 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
@@ -18,159 +18,90 @@
package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
-//@RunWith(Parameterized.class)
+import org.apache.streampipes.test.executors.PrefixStrategy;
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.StreamPrefix;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
public class TestStringToStateProcessor {
-//
-// private static final Logger LOG = LoggerFactory.getLogger(TestStringToStateProcessor.class);
-//
-// @org.junit.runners.Parameterized.Parameters
-// public static Iterable<Object[]> data() {
-// return Arrays.asList(new Object[][] {
-// {
-// List.of(),
-// List.of("c1", "c2", "c3"),
-// List.of(Arrays.asList("t1", "t2", "t3")),
-// List.of()
-// },
-// {
-// List.of("c1"),
-// List.of("c1", "c2", "c3"),
-// List.of(Arrays.asList("t1", "t2", "t3")),
-// List.of("t1")
-// },
-// {
-// List.of("c1", "c2"),
-// List.of("c1", "c2", "c3"),
-// List.of(Arrays.asList("t1", "t2", "t3")),
-// Arrays.asList("t1", "t2")
-// },
-// {
-// List.of("c1", "c2"),
-// List.of("c1", "c2", "c3"),
-// Arrays.asList(
-// Arrays.asList("t1-1", "t2-1", "t3-1"),
-// Arrays.asList("t1-2", "t2-2", "t3-2")
-// ),
-// Arrays.asList("t1-2", "t2-2")
-// },
-// {
-// List.of("c1", "c2", "c3"),
-// List.of("c1", "c2", "c3"),
-// Arrays.asList(
-// Arrays.asList("t1-1", "t2-1", "t3-1"),
-// Arrays.asList("t1-2", "t2-2", "t3-2"),
-// Arrays.asList("t1-3", "t2-3", "t3-3")
-// ),
-// Arrays.asList("t1-3", "t2-3", "t3-3")
-// }
-// });
-// }
-//
-// @org.junit.runners.Parameterized.Parameter
-// public List<String> selectedFieldNames;
-//
-// @org.junit.runners.Parameterized.Parameter(1)
-// public List<String> fieldNames;
-//
-// @org.junit.runners.Parameterized.Parameter(2)
-// public List<List<String>> eventStrings;
-//
-// @org.junit.runners.Parameterized.Parameter(3)
-// public List<String> expectedValue;
-//
-// private static final String DEFAULT_STREAM_NAME = "stream1";
-//
-// @Test
-// public void testStringToState() {
-// StringToStateProcessor stringToStateProcessor = new StringToStateProcessor();
-// DataProcessorDescription originalGraph = stringToStateProcessor.declareModel();
-// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
-//
-// DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
-// graph.setInputStreams(Collections
-// .singletonList(EventStreamGenerator
-// .makeStreamWithProperties(Collections.singletonList("stream-in"))));
-// graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("stream-out")));
-// graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
-// .setActualTopicName("output-topic");
-//
-// MappingPropertyNary mappingPropertyNary = graph.getStaticProperties().stream()
-// .filter(p -> p instanceof MappingPropertyNary)
-// .map(p -> (MappingPropertyNary) p)
-// .filter(p -> p.getInternalName().equals(StringToStateProcessor.STRING_STATE_FIELD))
-// .findFirst().orElse(null);
-//
-// assert mappingPropertyNary != null;
-// mappingPropertyNary.setSelectedProperties(
-// selectedFieldNames.stream().map(field -> DEFAULT_STREAM_NAME + "::" + field).toList());
-//
-// ProcessorParams params = new ProcessorParams(graph);
-//
-// SpOutputCollector spOutputCollector = new SpOutputCollector() {
-// @Override
-// public void registerConsumer(String routeId, InternalEventProcessor<Map<String, Object>> consumer) {
-// }
-//
-// @Override
-// public void unregisterConsumer(String routeId) {
-// }
-//
-// @Override
-// public void connect() throws SpRuntimeException {
-// }
-//
-// @Override
-// public void disconnect() throws SpRuntimeException {
-// }
-//
-// @Override
-// public void collect(Event event) {
-// }
-// };
-//
-// stringToStateProcessor.onInvocation(params, spOutputCollector, null);
-// Object[] states = sendEvents(stringToStateProcessor, spOutputCollector);
-// LOG.info("Expected states is {}.", expectedValue);
-// LOG.info("Actual states is {}.", Arrays.toString(states));
-// assertArrayEquals(expectedValue.toArray(), states);
-// }
-//
-// private Object[] sendEvents(StringToStateProcessor stateProcessor, SpOutputCollector spOut) {
-// List<Event> events = makeEvents();
-// Object[] states = null;
-// for (Event event : events) {
-// stateProcessor.onEvent(event, spOut);
-// try {
-// TimeUnit.MILLISECONDS.sleep(100);
-// } catch (InterruptedException e) {
-// throw new RuntimeException(e);
-// }
-// try {
-// states = (Object[]) event.getFieldBySelector(StringToStateProcessor.CURRENT_STATE)
-// .getAsPrimitive().getRawValue();
-// LOG.info("Current states: " + Arrays.toString(states));
-// } catch (IllegalArgumentException e) {
-//
-// }
-// }
-// return states;
-// }
-//
-// private List<Event> makeEvents() {
-// List<Event> events = Lists.newArrayList();
-// for (List<String> eventSetting : eventStrings) {
-// events.add(makeEvent(eventSetting));
-// }
-// return events;
-// }
-//
-// private Event makeEvent(List<String> value) {
-// Map<String, Object> map = Maps.newHashMap();
-// for (int i = 0; i < selectedFieldNames.size(); i++) {
-// map.put(selectedFieldNames.get(i), value.get(i));
-// }
-// return EventFactory.fromMap(map,
-// new SourceInfo("test-topic", DEFAULT_STREAM_NAME),
-// new SchemaInfo(null, Lists.newArrayList()));
-// }
+
+ private static final String KEY_1 = "key1";
+ private static final String KEY_2 = "key2";
+ private static final String VALUE_1 = "value 1";
+ private static final String VALUE_2 = "value 2";
+
+ private static final String PREFIX_KEY_1 = StreamPrefix.s0(KEY_1);
+ private static final String PREFIX_KEY_2 = StreamPrefix.s0(KEY_2);
+
+ private StringToStateProcessor processor;
+
+  /** Gives every test its own, freshly created processor instance. */
+  @BeforeEach
+  public void setup() {
+    this.processor = new StringToStateProcessor();
+  }
+
+  /**
+   * Supplies the test cases. Each case consists of the selected field names, the input events
+   * and the events expected on the output.
+   */
+  static Stream<Arguments> arguments() {
+    // no field selected: the expected state list is empty
+    var noFieldSelected = Arguments.of(
+        Collections.emptyList(),
+        List.of(Map.of(KEY_1, VALUE_1)),
+        List.of(Map.of(
+            KEY_1, VALUE_1,
+            StringToStateProcessor.CURRENT_STATE, Collections.emptyList()
+        ))
+    );
+
+    // one field selected: its value is the only expected state
+    var oneFieldSelected = Arguments.of(
+        List.of(PREFIX_KEY_1),
+        List.of(Map.of(
+            KEY_1, VALUE_1
+        )),
+        List.of(Map.of(
+            KEY_1, VALUE_1,
+            StringToStateProcessor.CURRENT_STATE, List.of(VALUE_1)
+        ))
+    );
+
+    // two fields selected: both values are expected, in selection order
+    var twoFieldsSelected = Arguments.of(
+        List.of(PREFIX_KEY_1, PREFIX_KEY_2),
+        List.of(Map.of(
+            KEY_1, VALUE_1,
+            KEY_2, VALUE_2
+        )),
+        List.of(Map.of(
+            KEY_1, VALUE_1,
+            KEY_2, VALUE_2,
+            StringToStateProcessor.CURRENT_STATE, List.of(VALUE_1, VALUE_2)
+        ))
+    );
+
+    return Stream.of(noFieldSelected, oneFieldSelected, twoFieldsSelected);
+  }
+
+  /**
+   * Configures the processor with the selected state fields and hands the input events together
+   * with the expected output events to the {@code ProcessingElementTestExecutor}.
+   *
+   * @param selectedFieldNames prefixed selectors of the fields used as state
+   * @param inputEvents        events that are passed to the processor
+   * @param outputEvents       events the processor is expected to emit
+   */
+  @ParameterizedTest
+  @MethodSource("arguments")
+  public void testStringToState(
+      List<String> selectedFieldNames,
+      List<Map<String, Object>> inputEvents,
+      List<Map<String, Object>> outputEvents
+  ) {
+
+    var configuration = TestConfiguration
+        .builder()
+        .config(StringToStateProcessor.STRING_STATE_FIELD, selectedFieldNames)
+        .prefixStrategy(PrefixStrategy.SAME_PREFIX)
+        .build();
+
+    var testExecutor = new ProcessingElementTestExecutor(processor, configuration);
+
+    testExecutor.run(inputEvents, outputEvents);
+  }
+
}
diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
index f2d8719..b27f0e8 100644
--- a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
+++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/ProcessingElementTestExecutor.java
@@ -44,6 +44,9 @@
import java.util.function.Consumer;
import java.util.stream.IntStream;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class ProcessingElementTestExecutor {
@@ -93,17 +96,17 @@
invocationConfig.accept(dataProcessorInvocation);
}
- var e = getProcessingElementParameterExtractor(dataProcessorInvocation);
- var mockParams = Mockito.mock(IDataProcessorParameters.class);
+ var extractor = getProcessingElementParameterExtractor(dataProcessorInvocation);
+ var mockParams = mock(IDataProcessorParameters.class);
- Mockito.when(mockParams.getModel()).thenReturn(dataProcessorInvocation);
- Mockito.when(mockParams.extractor()).thenReturn(e);
+ when(mockParams.getModel()).thenReturn(dataProcessorInvocation);
+ when(mockParams.extractor()).thenReturn(extractor);
// calls the onPipelineStarted method of the processor to initialize it
processor.onPipelineStarted(mockParams, null, null);
// mock the output collector to capture the output events and validate the results later
- var mockCollector = Mockito.mock(SpOutputCollector.class);
+ var mockCollector = mock(SpOutputCollector.class);
var spOutputCollectorCaptor = ArgumentCaptor.forClass(Event.class);
diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/StreamPrefix.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/StreamPrefix.java
new file mode 100644
index 0000000..3e69a0a
--- /dev/null
+++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/StreamPrefix.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.test.executors;
+
+/**
+ * Provides utility methods that prepend a stream prefix to a property value,
+ * producing a selector of the form {@code <prefix>::<propertyValue>} (e.g. {@code s0::key1}).
+ * This class is intended to be used in unit tests.
+ * Consider integrating prefix configuration into the application's logic, possibly within the TestConfiguration class.
+ */
+public final class StreamPrefix {
+  public static final String S0 = "s0";
+  public static final String S1 = "s1";
+
+  /** Delimiter placed between the stream prefix and the property value. */
+  private static final String PREFIX_DELIMITER = "::";
+
+  private StreamPrefix() {
+    // static utility class, not meant to be instantiated
+  }
+
+  /**
+   * Prepends the S0 prefix to a given property value.
+   *
+   * @param propertyValue The value to which the S0 prefix will be prepended.
+   * @return A string with the S0 prefix followed by {@code ::} and the property value.
+   */
+  public static String s0(String propertyValue) {
+    return addPrefix(S0, propertyValue);
+  }
+
+  /**
+   * Prepends the S1 prefix to a given property value.
+   *
+   * @param propertyValue The value to which the S1 prefix will be prepended.
+   * @return A string with the S1 prefix followed by {@code ::} and the property value.
+   */
+  public static String s1(String propertyValue) {
+    return addPrefix(S1, propertyValue);
+  }
+
+  /** Joins prefix and property value with the selector delimiter. */
+  private static String addPrefix(String prefix, String propertyValue) {
+    return prefix + PREFIX_DELIMITER + propertyValue;
+  }
+}
diff --git a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java
index e2260c5..ec20e24 100644
--- a/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java
+++ b/streampipes-test-utils-executors/src/main/java/org/apache/streampipes/test/executors/TestConfigurationBuilder.java
@@ -45,8 +45,8 @@
+ /**
+ * Selects the stream prefixes stored in {@code eventPrefixes} for the given strategy:
+ * {@code SAME_PREFIX} uses only {@code StreamPrefix.S0}, {@code ALTERNATE} uses
+ * {@code StreamPrefix.S0} and {@code StreamPrefix.S1}.
+ *
+ * @param strategy the prefix strategy to apply
+ * @return this builder
+ */
public TestConfigurationBuilder prefixStrategy(PrefixStrategy strategy){
this.eventPrefixes = switch (strategy){
- case SAME_PREFIX -> List.of("s0");
- case ALTERNATE -> List.of("s0", "s1");
+ case SAME_PREFIX -> List.of(StreamPrefix.S0);
+ case ALTERNATE -> List.of(StreamPrefix.S0, StreamPrefix.S1);
};
return this;
}
diff --git a/streampipes-test-utils-executors/src/test/java/org/apache/streampipes/test/executors/StreamPrefixTest.java b/streampipes-test-utils-executors/src/test/java/org/apache/streampipes/test/executors/StreamPrefixTest.java
new file mode 100644
index 0000000..c89fa55
--- /dev/null
+++ b/streampipes-test-utils-executors/src/test/java/org/apache/streampipes/test/executors/StreamPrefixTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.test.executors;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Checks that {@code StreamPrefix} builds selectors of the form {@code prefix::value}. */
+class StreamPrefixTest {
+
+  // regular values
+
+  @Test
+  void s0_AppendsPropertyValueCorrectly() {
+    assertEquals("s0::testValue", StreamPrefix.s0("testValue"));
+  }
+
+  @Test
+  void s1_AppendsPropertyValueCorrectly() {
+    assertEquals("s1::anotherTestValue", StreamPrefix.s1("anotherTestValue"));
+  }
+
+  // empty value: only prefix and delimiter remain
+
+  @Test
+  void s0_HandlesEmptyPropertyValue() {
+    assertEquals("s0::", StreamPrefix.s0(""));
+  }
+
+  @Test
+  void s1_HandlesEmptyPropertyValue() {
+    assertEquals("s1::", StreamPrefix.s1(""));
+  }
+
+  // special characters are passed through unchanged
+
+  @Test
+  void s0_HandlesSpecialCharactersInPropertyValue() {
+    assertEquals("s0::value$&*", StreamPrefix.s0("value$&*"));
+  }
+
+  @Test
+  void s1_HandlesSpecialCharactersInPropertyValue() {
+    assertEquals("s1::value@#%", StreamPrefix.s1("value@#%"));
+  }
+}
\ No newline at end of file