feat(#3266): Add API to programmatically create pipelines (#3267)
* feat(#3266): Add API to programmatically create pipelines
* Fix checkstyle
* Fix validation
diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index 9aae628..b64b661 100644
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -84,7 +84,7 @@
try {
var modificationMessage = new PipelineVerificationHandlerV2(pipeline).verifyPipeline();
var updateInfo = makeUpdateInfo(modificationMessage, pipeline);
- var modifiedPipeline = new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
+ var modifiedPipeline = new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline().pipeline();
var canAutoMigrate = canAutoMigrate(modificationMessage);
if (!canAutoMigrate) {
modifiedPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/ExtendedPipelineElementValidationInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/ExtendedPipelineElementValidationInfo.java
new file mode 100644
index 0000000..9289b3d
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/ExtendedPipelineElementValidationInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.pipeline;
+
+public class ExtendedPipelineElementValidationInfo extends PipelineElementValidationInfo {
+
+ private final String pipelineElementName;
+ private final String pipelineElementId;
+
+ public ExtendedPipelineElementValidationInfo(String pipelineElementName,
+ String pipelineElementId,
+ PipelineElementValidationInfo info) {
+ super(info.getLevel(), info.getMessage());
+ this.pipelineElementName = pipelineElementName;
+ this.pipelineElementId = pipelineElementId;
+ }
+
+ public String getPipelineElementName() {
+ return pipelineElementName;
+ }
+
+ public String getPipelineElementId() {
+ return pipelineElementId;
+ }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
index fc2ce9c..5855988 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
@@ -37,19 +37,8 @@
private List<SpDataStream> inputStreams;
private SpDataStream outputStream;
- public PipelineModification(String domId, String elementId,
- List<StaticProperty> staticProperties) {
- super();
- this.domId = domId;
- this.elementId = elementId;
- this.staticProperties = staticProperties;
- this.inputStreams = new ArrayList<>();
- this.outputStrategies = new ArrayList<>();
- this.validationInfos = new ArrayList<>();
- }
-
public PipelineModification() {
-
+ validationInfos = new ArrayList<>();
}
public String getDomId() {
@@ -92,10 +81,6 @@
this.inputStreams = inputStreams;
}
- public void addInputStream(SpDataStream inputStream) {
- this.inputStreams.add(inputStream);
- }
-
public boolean isPipelineElementValid() {
return pipelineElementValid;
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModificationResult.java
similarity index 67%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModificationResult.java
index 9285ae1..9e6a1d8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModificationResult.java
@@ -16,8 +16,14 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline;
-import java.util.Map;
+import java.util.List;
-public record CompactConfiguration(Map<String, Object> values) {}
+public record PipelineModificationResult(Pipeline pipeline,
+ List<ExtendedPipelineElementValidationInfo> validationInfos) {
+
+ public boolean allPipelineElementsValid() {
+ return validationInfos.stream().noneMatch(v -> v.getLevel() == ValidationInfoLevel.ERROR);
+ }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineVerificationResult.java
similarity index 70%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineVerificationResult.java
index 9285ae1..7e730b2 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineVerificationResult.java
@@ -16,8 +16,12 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline;
-import java.util.Map;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-public record CompactConfiguration(Map<String, Object> values) {}
+import java.util.List;
+
+public record PipelineVerificationResult(List<ExtendedPipelineElementValidationInfo> validationInfos,
+ List<NamedStreamPipesEntity> modifiedPipelineElements) {
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java
similarity index 70%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java
index 9285ae1..59948ae 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java
@@ -16,8 +16,16 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline.compact;
-import java.util.Map;
+import org.apache.streampipes.model.connect.adapter.compact.CreateOptions;
-public record CompactConfiguration(Map<String, Object> values) {}
+import java.util.List;
+
+public record CompactPipeline(
+ String id,
+ String name,
+ String description,
+ List<CompactPipelineElement> pipelineElements,
+ CreateOptions createOptions
+) {}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
similarity index 68%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
index 9285ae1..0f976c8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
@@ -16,8 +16,14 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline.compact;
+import java.util.List;
import java.util.Map;
-public record CompactConfiguration(Map<String, Object> values) {}
+public record CompactPipelineElement(String type,
+ String ref,
+ String id,
+ List<String> connectedTo,
+ List<Map<String, Object>> configuration) {
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CreateOptions.java
similarity index 83%
rename from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CreateOptions.java
index 9285ae1..bcf4952 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CreateOptions.java
@@ -16,8 +16,7 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline.compact;
-import java.util.Map;
-
-public record CompactConfiguration(Map<String, Object> values) {}
+public record CreateOptions(boolean start) {
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java b/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java
index 5e898c3..7a9400b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java
@@ -20,6 +20,16 @@
public abstract class DefaultStaticPropertyVisitor implements StaticPropertyVisitor {
+ protected boolean ignoreValidation;
+
+ public DefaultStaticPropertyVisitor(boolean ignoreValidation) {
+ this.ignoreValidation = ignoreValidation;
+ }
+
+ public DefaultStaticPropertyVisitor() {
+ this(false);
+ }
+
@Override
public void visit(CollectionStaticProperty collectionStaticProperty) {
collectionStaticProperty.getStaticPropertyTemplate().accept(this);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
index 362833f..823d7c7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
@@ -34,6 +34,7 @@
import org.apache.streampipes.model.message.PipelineModificationMessage;
import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo;
import org.apache.streampipes.model.pipeline.PipelineModification;
+import org.apache.streampipes.model.pipeline.ValidationInfoLevel;
import java.util.ArrayList;
import java.util.Collections;
@@ -99,7 +100,7 @@
modification.setElementId(t.getElementId());
try {
pipelineValidator.apply(source, t, targets, validationInfos);
- buildModification(modification, t, t.getInputStreams(), true);
+ buildModification(modification, t, t.getInputStreams(), !hasValidationError(modification));
edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.complete(source.getDom(), t.getDom()));
} catch (SpValidationException e) {
e.getErrorLog().forEach(log -> validationInfos.add(PipelineElementValidationInfo.error(log.toString())));
@@ -114,6 +115,10 @@
});
}
+ private boolean hasValidationError(PipelineModification modification) {
+ return modification.getValidationInfos().stream().anyMatch(v -> v.getLevel() == ValidationInfoLevel.ERROR);
+ }
+
private String makeKey(NamedStreamPipesEntity source,
InvocableStreamPipesEntity t) {
return source.getDom() + "-" + t.getDom();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
index a00a5c4..3275f95 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
@@ -27,8 +27,11 @@
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.message.PipelineModificationMessage;
+import org.apache.streampipes.model.pipeline.ExtendedPipelineElementValidationInfo;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineModification;
+import org.apache.streampipes.model.pipeline.PipelineModificationResult;
+import org.apache.streampipes.model.pipeline.PipelineVerificationResult;
import java.util.ArrayList;
import java.util.Collections;
@@ -49,11 +52,12 @@
return new PipelineModificationGenerator(graph, steps).buildPipelineModificationMessage();
}
- public Pipeline makeModifiedPipeline() {
- var allElements = verifyAndBuildGraphs(false);
+ public PipelineModificationResult makeModifiedPipeline() {
+ var result = verifyAndBuildGraphs(false);
+ var allElements = result.modifiedPipelineElements();
pipeline.setSepas(filterAndConvert(allElements, DataProcessorInvocation.class));
pipeline.setActions(filterAndConvert(allElements, DataSinkInvocation.class));
- return pipeline;
+ return new PipelineModificationResult(pipeline, result.validationInfos());
}
private <T extends InvocableStreamPipesEntity> List<T> filterAndConvert(List<NamedStreamPipesEntity> elements,
@@ -65,10 +69,11 @@
.toList();
}
- public List<NamedStreamPipesEntity> verifyAndBuildGraphs(boolean ignoreUnconfigured) {
+ public PipelineVerificationResult verifyAndBuildGraphs(boolean ignoreUnconfigured) {
var pipelineModifications = verifyPipeline().getPipelineModifications();
var allElements = new AllElementsProvider(pipeline).getAllElements();
- var result = new ArrayList<NamedStreamPipesEntity>();
+ var validationInfos = new ArrayList<ExtendedPipelineElementValidationInfo>();
+ var modifiedPipelineElements = new ArrayList<NamedStreamPipesEntity>();
allElements.forEach(pipelineElement -> {
var modificationOpt = getModification(pipelineElement.getDom(), pipelineModifications);
if (modificationOpt.isPresent()) {
@@ -79,15 +84,26 @@
applyModificationsForDataProcessor((DataProcessorInvocation) pipelineElement, modification);
}
}
+ validationInfos.addAll(
+ modification.getValidationInfos()
+ .stream()
+ .map(v -> new ExtendedPipelineElementValidationInfo(
+ pipelineElement.getName(),
+ pipelineElement.getDom(),
+ v
+ )
+ )
+ .toList()
+ );
if (!ignoreUnconfigured || modification.isPipelineElementValid()) {
- result.add(pipelineElement);
+ modifiedPipelineElements.add(pipelineElement);
}
} else {
- result.add(pipelineElement);
+ modifiedPipelineElements.add(pipelineElement);
}
});
- return result;
+ return new PipelineVerificationResult(validationInfos, modifiedPipelineElements);
}
private void applyModificationsForDataProcessor(DataProcessorInvocation pipelineElement,
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
index 1083396..fadbd19 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
@@ -29,23 +29,33 @@
import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
import org.apache.streampipes.model.staticproperty.MatchingStaticProperty;
import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.model.staticproperty.RuntimeResolvableGroupStaticProperty;
import org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputStaticProperty;
import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
import org.apache.streampipes.model.staticproperty.SlideToggleStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
public class CheckCompletedVisitor extends DefaultStaticPropertyVisitor {
- private List<PipelineElementValidationInfo> validationInfos;
+ private final List<PipelineElementValidationInfo> validationInfos;
public CheckCompletedVisitor() {
this.validationInfos = new ArrayList<>();
}
+ public CheckCompletedVisitor(boolean ignoreValidation) {
+ this();
+ this.ignoreValidation = ignoreValidation;
+ }
+
@Override
public void visit(AnyStaticProperty property) {
@@ -53,21 +63,22 @@
@Override
public void visit(CodeInputStaticProperty codeInputStaticProperty) {
+ validateNull(codeInputStaticProperty, codeInputStaticProperty.getValue());
}
@Override
public void visit(ColorPickerStaticProperty colorPickerStaticProperty) {
-
+ validateNull(colorPickerStaticProperty, colorPickerStaticProperty.getSelectedColor());
}
@Override
public void visit(FileStaticProperty fileStaticProperty) {
-
+ validateNull(fileStaticProperty, fileStaticProperty.getLocationPath());
}
@Override
public void visit(FreeTextStaticProperty freeTextStaticProperty) {
-
+ validateNull(freeTextStaticProperty, freeTextStaticProperty.getValue());
}
@Override
@@ -110,7 +121,7 @@
}
}
} else {
- if (!mappingPropertyUnary.getMapsFromOptions().isEmpty()){
+ if (!mappingPropertyUnary.getMapsFromOptions().isEmpty()) {
String firstSelector = mappingPropertyUnary.getMapsFromOptions().get(0);
mappingPropertyUnary.setSelectedProperty(firstSelector);
}
@@ -124,12 +135,19 @@
@Override
public void visit(OneOfStaticProperty oneOfStaticProperty) {
-
+ if (!ignoreValidation && oneOfStaticProperty.getOptions().stream().noneMatch(Option::isSelected)) {
+ validationInfos.add(PipelineElementValidationInfo.error(
+ String.format(
+ "Configuration \"%s\" must have one selected option, but no option was selected.",
+ oneOfStaticProperty.getInternalName()
+ )
+ ));
+ }
}
@Override
public void visit(SecretStaticProperty secretStaticProperty) {
-
+ validateNull(secretStaticProperty, secretStaticProperty.getValue());
}
@Override
@@ -139,18 +157,43 @@
@Override
public void visit(RuntimeResolvableTreeInputStaticProperty treeInputStaticProperty) {
-
+ if (!ignoreValidation
+ && !treeInputStaticProperty.isOptional()
+ && treeInputStaticProperty.getSelectedNodesInternalNames().isEmpty()) {
+ addMissingConfiguration(treeInputStaticProperty);
+ }
}
@Override
public void visit(RuntimeResolvableGroupStaticProperty groupStaticProperty) {
+ }
+ @Override
+ public void visit(StaticPropertyAlternatives staticPropertyAlternatives) {
+ if (!ignoreValidation && !staticPropertyAlternatives.isOptional()
+ && staticPropertyAlternatives.getAlternatives().stream().noneMatch(StaticPropertyAlternative::getSelected)) {
+ validationInfos.add(PipelineElementValidationInfo.error(
+ String.format(
+ "No alternative of configuration \"%s\" was selected, but at least one alternative must be chosen",
+ staticPropertyAlternatives.getInternalName()
+ )
+ ));
+ }
+ var visitor = new CheckCompletedVisitor(true);
+ staticPropertyAlternatives.getAlternatives().forEach(alternative -> alternative.accept(visitor));
+ validationInfos.addAll(visitor.getValidationInfos());
}
public List<PipelineElementValidationInfo> getValidationInfos() {
return this.validationInfos;
}
+ private void validateNull(StaticProperty sp, Object value) {
+ if (!ignoreValidation && !sp.isOptional() && Objects.isNull(value)) {
+ addMissingConfiguration(sp);
+ }
+ }
+
private boolean existsSelection(MappingPropertyUnary mappingProperty) {
return !(mappingProperty.getSelectedProperty() == null || mappingProperty.getSelectedProperty().isEmpty());
}
@@ -158,4 +201,15 @@
private boolean existsSelection(MappingPropertyNary mappingProperty) {
return !(mappingProperty.getSelectedProperties() == null || mappingProperty.getSelectedProperties().isEmpty());
}
+
+ private void addMissingConfiguration(StaticProperty sp) {
+ validationInfos.add(
+ PipelineElementValidationInfo.error(
+ String.format(
+ "Configuration option \"%s\" as no value although it is marked as required",
+ sp.getInternalName()
+ )
+ )
+ );
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/CompactPipelineManagement.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/CompactPipelineManagement.java
new file mode 100644
index 0000000..1a0a1cd
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/CompactPipelineManagement.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact;
+
+import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
+import org.apache.streampipes.manager.pipeline.compact.generation.PipelineElementConfigurationStep;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineModificationResult;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
+import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
+
+public class CompactPipelineManagement {
+
+ private final IPipelineElementDescriptionStorage storage;
+
+ public CompactPipelineManagement(IPipelineElementDescriptionStorage storage) {
+ this.storage = storage;
+ }
+
+ public PipelineModificationResult makePipeline(CompactPipeline compactPipeline) throws Exception {
+ var pipeline = new Pipeline();
+ applyPipelineBasics(compactPipeline, pipeline);
+
+ new PipelineElementConfigurationStep(storage).apply(pipeline, compactPipeline);
+
+ return new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
+ }
+
+ private void applyPipelineBasics(CompactPipeline compactPipeline,
+ Pipeline pipeline) {
+ pipeline.setElementId(compactPipeline.id());
+ pipeline.setName(compactPipeline.name());
+ pipeline.setDescription(compactPipeline.description());
+ }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineGenerator.java
similarity index 70%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineGenerator.java
index 9285ae1..18b8efe 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineGenerator.java
@@ -16,8 +16,13 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.manager.pipeline.compact.generation;
-import java.util.Map;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
-public record CompactConfiguration(Map<String, Object> values) {}
+public interface CompactPipelineGenerator {
+
+ void apply(Pipeline pipeline,
+ CompactPipeline compactPipeline) throws Exception;
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
new file mode 100644
index 0000000..c736293
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.manager.template.DataProcessorTemplateHandler;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+
+public class DataProcessorPipelineElementGenerator {
+
+ private final InvocablePipelineElementGenerator<DataProcessorInvocation> basicGenerator;
+
+ public DataProcessorPipelineElementGenerator(
+ InvocablePipelineElementGenerator<DataProcessorInvocation> basicGenerator) {
+ this.basicGenerator = basicGenerator;
+ }
+
+ public DataProcessorInvocation generate(DataProcessorInvocation processor,
+ CompactPipelineElement pipelineElement) {
+ basicGenerator.apply(processor, pipelineElement);
+ var template = basicGenerator.makeTemplate(processor, pipelineElement);
+ return new DataProcessorTemplateHandler(template, processor, false)
+ .applyTemplateOnPipelineElement();
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataSinkPipelineElementGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataSinkPipelineElementGenerator.java
new file mode 100644
index 0000000..2c1f940
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataSinkPipelineElementGenerator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.manager.template.DataSinkTemplateHandler;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+
+public class DataSinkPipelineElementGenerator {
+
+ private final InvocablePipelineElementGenerator<DataSinkInvocation> basicGenerator;
+
+ public DataSinkPipelineElementGenerator(InvocablePipelineElementGenerator<DataSinkInvocation> basicGenerator) {
+ this.basicGenerator = basicGenerator;
+ }
+
+ public DataSinkInvocation generate(DataSinkInvocation sink,
+ CompactPipelineElement pipelineElement) {
+ basicGenerator.apply(sink, pipelineElement);
+ var template = basicGenerator.makeTemplate(sink, pipelineElement);
+ return new DataSinkTemplateHandler(template, sink, false)
+ .applyTemplateOnPipelineElement();
+ }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataStreamPipelineElementGenerator.java
similarity index 63%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataStreamPipelineElementGenerator.java
index 9285ae1..9d46606 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataStreamPipelineElementGenerator.java
@@ -16,8 +16,16 @@
*
*/
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.manager.pipeline.compact.generation;
-import java.util.Map;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
-public record CompactConfiguration(Map<String, Object> values) {}
+public class DataStreamPipelineElementGenerator {
+
+ public SpDataStream generate(SpDataStream stream,
+ CompactPipelineElement pipelineElement) {
+ stream.setDom("jsplumb_" + pipelineElement.ref());
+ return stream;
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java
new file mode 100644
index 0000000..0f8d135
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+import org.apache.streampipes.model.template.PipelineElementTemplate;
+import org.apache.streampipes.model.util.Cloner;
+
+import java.util.ArrayList;
+
+public class InvocablePipelineElementGenerator<T extends InvocableStreamPipesEntity> {
+
+ private static final String ID_PREFIX = "jsplumb_";
+
+ public void apply(T element,
+ CompactPipelineElement compatPipelineElement) {
+ element.setDom(ID_PREFIX + compatPipelineElement.ref());
+ element.setConnectedTo(compatPipelineElement.connectedTo().stream().map(c -> ID_PREFIX + c).toList());
+ element.setStreamRequirements(new Cloner().streams(element.getStreamRequirements()));
+ }
+
+ protected PipelineElementTemplate makeTemplate(T element,
+ CompactPipelineElement compactPipelineElement) {
+ var configs = compactPipelineElement.configuration();
+ if (compactPipelineElement.configuration() == null) {
+ configs = new ArrayList<>();
+ }
+ var template = new PipelineElementTemplate();
+ template.setTemplateConfigs(configs);
+ template.setBasePipelineElementAppId(element.getAppId());
+
+ return template;
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/PipelineElementConfigurationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/PipelineElementConfigurationStep.java
new file mode 100644
index 0000000..ec55394
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/PipelineElementConfigurationStep.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
+
+public class PipelineElementConfigurationStep implements CompactPipelineGenerator {
+
+ private static final String StreamType = "stream";
+ private static final String ProcessorType = "processor";
+ private static final String SinkType = "sink";
+
+ private final IPipelineElementDescriptionStorage storage;
+
+ public PipelineElementConfigurationStep(IPipelineElementDescriptionStorage storage) {
+ this.storage = storage;
+ }
+
+ @Override
+ public void apply(Pipeline pipeline,
+ CompactPipeline compactPipeline) throws Exception {
+ compactPipeline.pipelineElements().forEach(pe -> {
+ if (pe.type().equalsIgnoreCase(StreamType)) {
+ pipeline.getStreams().add(makeStream(pe));
+ } else if (pe.type().equalsIgnoreCase(ProcessorType)) {
+ pipeline.getSepas().add(makeProcessor(pe));
+ } else if (pe.type().equalsIgnoreCase(SinkType)) {
+ pipeline.getActions().add(makeSink(pe));
+ }
+ });
+ }
+
+ public SpDataStream makeStream(CompactPipelineElement pipelineElement) {
+ var element = storage.getDataStreamById(pipelineElement.id());
+ return new DataStreamPipelineElementGenerator().generate(element, pipelineElement);
+ }
+
+ public DataProcessorInvocation makeProcessor(CompactPipelineElement pipelineElement) {
+ var element = storage.getDataProcessorByAppId(pipelineElement.id());
+ var invocation = new DataProcessorInvocation(element);
+ return new DataProcessorPipelineElementGenerator(new InvocablePipelineElementGenerator<>())
+ .generate(invocation, pipelineElement);
+ }
+
+ public DataSinkInvocation makeSink(CompactPipelineElement pipelineElement) {
+ var element = storage.getDataSinkByAppId(pipelineElement.id());
+ var invocation = new DataSinkInvocation(element);
+ return new DataSinkPipelineElementGenerator(new InvocablePipelineElementGenerator<>())
+ .generate(invocation, pipelineElement);
+ }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 0e6b22b..3ec7651 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -53,6 +53,7 @@
List<NamedStreamPipesEntity> pipelineElements = new ArrayList<>(
new PipelineVerificationHandlerV2(pipeline)
.verifyAndBuildGraphs(true)
+ .modifiedPipelineElements()
);
rewriteElementIds(pipelineElements, elementIdMappings);
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/CompactPipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/CompactPipelineResource.java
new file mode 100644
index 0000000..2147dde
--- /dev/null
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/CompactPipelineResource.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl;
+
+import org.apache.streampipes.manager.pipeline.PipelineManager;
+import org.apache.streampipes.manager.pipeline.compact.CompactPipelineManagement;
+import org.apache.streampipes.model.message.Notification;
+import org.apache.streampipes.model.message.NotificationType;
+import org.apache.streampipes.model.message.Notifications;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
+import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
+import org.apache.streampipes.rest.security.AuthConstants;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/api/v2/compact-pipelines")
+public class CompactPipelineResource extends AbstractAuthGuardedRestResource {
+
+ private final CompactPipelineManagement compactPipelineManagement;
+
+ public CompactPipelineResource() {
+ this.compactPipelineManagement = new CompactPipelineManagement(
+ getPipelineElementStorage()
+ );
+ }
+
+ @PostMapping(
+ consumes = {
+ MediaType.APPLICATION_JSON_VALUE,
+ "application/yaml",
+ "application/yml"
+ }
+ )
+ @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
+ public ResponseEntity<?> addPipelineCompact(
+ @RequestBody CompactPipeline compactPipeline
+ ) throws Exception {
+
+ var pipelineGenerationResult = compactPipelineManagement.makePipeline(compactPipeline);
+ if (pipelineGenerationResult.allPipelineElementsValid()) {
+ String pipelineId = PipelineManager.addPipeline(getAuthenticatedUserSid(), pipelineGenerationResult.pipeline());
+ if (compactPipeline.createOptions().start()) {
+ try {
+ PipelineOperationStatus status = PipelineManager.startPipeline(pipelineId);
+ return ok(status);
+ } catch (Exception e) {
+ return statusMessage(Notifications.error(NotificationType.UNKNOWN_ERROR));
+ }
+ }
+ var message = Notifications.success("Pipeline stored");
+ message.addNotification(new Notification("id", pipelineId));
+ return ok(message);
+ } else {
+ return ResponseEntity.status(400).body(pipelineGenerationResult.validationInfos());
+ }
+ }
+
+ @PutMapping(
+ path = "{id}",
+ consumes = {
+ MediaType.APPLICATION_JSON_VALUE,
+ "application/yaml",
+ "application/yml"
+ }
+ )
+ @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
+ public ResponseEntity<?> updatePipelineCompact(
+ @PathVariable("id") String elementId,
+ @RequestBody CompactPipeline compactPipeline
+ ) throws Exception {
+
+ return null;
+ }
+}