feat(#3266): Add API to programmatically create pipelines (#3267)

* feat(#3266): Add API to programmatically create pipelines

* Fix checkstyle

* Fix validation
diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index 9aae628..b64b661 100644
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -84,7 +84,7 @@
       try {
         var modificationMessage = new PipelineVerificationHandlerV2(pipeline).verifyPipeline();
         var updateInfo = makeUpdateInfo(modificationMessage, pipeline);
-        var modifiedPipeline = new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
+        var modifiedPipeline = new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline().pipeline();
         var canAutoMigrate = canAutoMigrate(modificationMessage);
         if (!canAutoMigrate) {
           modifiedPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/ExtendedPipelineElementValidationInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/ExtendedPipelineElementValidationInfo.java
new file mode 100644
index 0000000..9289b3d
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/ExtendedPipelineElementValidationInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.pipeline;
+
+public class ExtendedPipelineElementValidationInfo extends PipelineElementValidationInfo {
+
+  private final String pipelineElementName;
+  private final String pipelineElementId;
+
+  public ExtendedPipelineElementValidationInfo(String pipelineElementName,
+                                               String pipelineElementId,
+                                               PipelineElementValidationInfo info) {
+    super(info.getLevel(), info.getMessage());
+    this.pipelineElementName = pipelineElementName;
+    this.pipelineElementId = pipelineElementId;
+  }
+
+  public String getPipelineElementName() {
+    return pipelineElementName;
+  }
+
+  public String getPipelineElementId() {
+    return pipelineElementId;
+  }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
index fc2ce9c..5855988 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModification.java
@@ -37,19 +37,8 @@
   private List<SpDataStream> inputStreams;
   private SpDataStream outputStream;
 
-  public PipelineModification(String domId, String elementId,
-                              List<StaticProperty> staticProperties) {
-    super();
-    this.domId = domId;
-    this.elementId = elementId;
-    this.staticProperties = staticProperties;
-    this.inputStreams = new ArrayList<>();
-    this.outputStrategies = new ArrayList<>();
-    this.validationInfos = new ArrayList<>();
-  }
-
   public PipelineModification() {
-
+    validationInfos = new ArrayList<>();
   }
 
   public String getDomId() {
@@ -92,10 +81,6 @@
     this.inputStreams = inputStreams;
   }
 
-  public void addInputStream(SpDataStream inputStream) {
-    this.inputStreams.add(inputStream);
-  }
-
   public boolean isPipelineElementValid() {
     return pipelineElementValid;
   }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModificationResult.java
similarity index 67%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModificationResult.java
index 9285ae1..9e6a1d8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineModificationResult.java
@@ -16,8 +16,14 @@
  *
  */
 
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline;
 
-import java.util.Map;
+import java.util.List;
 
-public record CompactConfiguration(Map<String, Object> values) {}
+public record PipelineModificationResult(Pipeline pipeline,
+                                         List<ExtendedPipelineElementValidationInfo> validationInfos) {
+
+  public boolean allPipelineElementsValid() {
+    return validationInfos.stream().noneMatch(v -> v.getLevel() == ValidationInfoLevel.ERROR);
+  }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineVerificationResult.java
similarity index 70%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineVerificationResult.java
index 9285ae1..7e730b2 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineVerificationResult.java
@@ -16,8 +16,12 @@
  *
  */
 
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline;
 
-import java.util.Map;
+import org.apache.streampipes.model.base.NamedStreamPipesEntity;
 
-public record CompactConfiguration(Map<String, Object> values) {}
+import java.util.List;
+
+public record PipelineVerificationResult(List<ExtendedPipelineElementValidationInfo> validationInfos,
+                                         List<NamedStreamPipesEntity> modifiedPipelineElements) {
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java
similarity index 70%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java
index 9285ae1..59948ae 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipeline.java
@@ -16,8 +16,16 @@
  *
  */
 
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline.compact;
 
-import java.util.Map;
+import org.apache.streampipes.model.connect.adapter.compact.CreateOptions;
 
-public record CompactConfiguration(Map<String, Object> values) {}
+import java.util.List;
+
+public record CompactPipeline(
+    String id,
+    String name,
+    String description,
+    List<CompactPipelineElement> pipelineElements,
+    CreateOptions createOptions
+) {}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
similarity index 68%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
index 9285ae1..0f976c8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CompactPipelineElement.java
@@ -16,8 +16,14 @@
  *
  */
 
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline.compact;
 
+import java.util.List;
 import java.util.Map;
 
-public record CompactConfiguration(Map<String, Object> values) {}
+public record CompactPipelineElement(String type,
+                                     String ref,
+                                     String id,
+                                     List<String> connectedTo,
+                                     List<Map<String, Object>> configuration) {
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CreateOptions.java
similarity index 83%
rename from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
rename to streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CreateOptions.java
index 9285ae1..bcf4952 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/compact/CreateOptions.java
@@ -16,8 +16,7 @@
  *
  */
 
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.model.pipeline.compact;
 
-import java.util.Map;
-
-public record CompactConfiguration(Map<String, Object> values) {}
+public record CreateOptions(boolean start) {
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java b/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java
index 5e898c3..7a9400b 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/staticproperty/DefaultStaticPropertyVisitor.java
@@ -20,6 +20,16 @@
 
 public abstract class DefaultStaticPropertyVisitor implements StaticPropertyVisitor {
 
+  protected boolean ignoreValidation;
+
+  public DefaultStaticPropertyVisitor(boolean ignoreValidation) {
+    this.ignoreValidation = ignoreValidation;
+  }
+
+  public DefaultStaticPropertyVisitor() {
+    this(false);
+  }
+
   @Override
   public void visit(CollectionStaticProperty collectionStaticProperty) {
     collectionStaticProperty.getStaticPropertyTemplate().accept(this);
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
index 362833f..823d7c7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineModificationGenerator.java
@@ -34,6 +34,7 @@
 import org.apache.streampipes.model.message.PipelineModificationMessage;
 import org.apache.streampipes.model.pipeline.PipelineElementValidationInfo;
 import org.apache.streampipes.model.pipeline.PipelineModification;
+import org.apache.streampipes.model.pipeline.ValidationInfoLevel;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -99,7 +100,7 @@
       modification.setElementId(t.getElementId());
       try {
         pipelineValidator.apply(source, t, targets, validationInfos);
-        buildModification(modification, t, t.getInputStreams(), true);
+        buildModification(modification, t, t.getInputStreams(), !hasValidationError(modification));
         edgeValidations.put(makeKey(source, t), PipelineEdgeValidation.complete(source.getDom(), t.getDom()));
       } catch (SpValidationException e) {
         e.getErrorLog().forEach(log -> validationInfos.add(PipelineElementValidationInfo.error(log.toString())));
@@ -114,6 +115,10 @@
     });
   }
 
+  private boolean hasValidationError(PipelineModification modification) {
+    return modification.getValidationInfos().stream().anyMatch(v -> v.getLevel() == ValidationInfoLevel.ERROR);
+  }
+
   private String makeKey(NamedStreamPipesEntity source,
                          InvocableStreamPipesEntity t) {
     return source.getDom() + "-" + t.getDom();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
index a00a5c4..3275f95 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/PipelineVerificationHandlerV2.java
@@ -27,8 +27,11 @@
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.grounding.EventGrounding;
 import org.apache.streampipes.model.message.PipelineModificationMessage;
+import org.apache.streampipes.model.pipeline.ExtendedPipelineElementValidationInfo;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineModification;
+import org.apache.streampipes.model.pipeline.PipelineModificationResult;
+import org.apache.streampipes.model.pipeline.PipelineVerificationResult;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -49,11 +52,12 @@
     return new PipelineModificationGenerator(graph, steps).buildPipelineModificationMessage();
   }
 
-  public Pipeline makeModifiedPipeline() {
-    var allElements = verifyAndBuildGraphs(false);
+  public PipelineModificationResult makeModifiedPipeline() {
+    var result = verifyAndBuildGraphs(false);
+    var allElements = result.modifiedPipelineElements();
     pipeline.setSepas(filterAndConvert(allElements, DataProcessorInvocation.class));
     pipeline.setActions(filterAndConvert(allElements, DataSinkInvocation.class));
-    return pipeline;
+    return new PipelineModificationResult(pipeline, result.validationInfos());
   }
 
   private <T extends InvocableStreamPipesEntity> List<T> filterAndConvert(List<NamedStreamPipesEntity> elements,
@@ -65,10 +69,11 @@
         .toList();
   }
 
-  public List<NamedStreamPipesEntity> verifyAndBuildGraphs(boolean ignoreUnconfigured) {
+  public PipelineVerificationResult verifyAndBuildGraphs(boolean ignoreUnconfigured) {
     var pipelineModifications = verifyPipeline().getPipelineModifications();
     var allElements = new AllElementsProvider(pipeline).getAllElements();
-    var result = new ArrayList<NamedStreamPipesEntity>();
+    var validationInfos = new ArrayList<ExtendedPipelineElementValidationInfo>();
+    var modifiedPipelineElements = new ArrayList<NamedStreamPipesEntity>();
     allElements.forEach(pipelineElement -> {
       var modificationOpt = getModification(pipelineElement.getDom(), pipelineModifications);
       if (modificationOpt.isPresent()) {
@@ -79,15 +84,26 @@
             applyModificationsForDataProcessor((DataProcessorInvocation) pipelineElement, modification);
           }
         }
+        validationInfos.addAll(
+            modification.getValidationInfos()
+                .stream()
+                .map(v -> new ExtendedPipelineElementValidationInfo(
+                        pipelineElement.getName(),
+                        pipelineElement.getDom(),
+                        v
+                    )
+                )
+                .toList()
+        );
         if (!ignoreUnconfigured || modification.isPipelineElementValid()) {
-          result.add(pipelineElement);
+          modifiedPipelineElements.add(pipelineElement);
         }
       } else {
-        result.add(pipelineElement);
+        modifiedPipelineElements.add(pipelineElement);
       }
     });
 
-    return result;
+    return new PipelineVerificationResult(validationInfos, modifiedPipelineElements);
   }
 
   private void applyModificationsForDataProcessor(DataProcessorInvocation pipelineElement,
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
index 1083396..fadbd19 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/matching/v2/pipeline/CheckCompletedVisitor.java
@@ -29,23 +29,33 @@
 import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
 import org.apache.streampipes.model.staticproperty.MatchingStaticProperty;
 import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
 import org.apache.streampipes.model.staticproperty.RuntimeResolvableGroupStaticProperty;
 import org.apache.streampipes.model.staticproperty.RuntimeResolvableTreeInputStaticProperty;
 import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
 import org.apache.streampipes.model.staticproperty.SlideToggleStaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 public class CheckCompletedVisitor extends DefaultStaticPropertyVisitor {
 
-  private List<PipelineElementValidationInfo> validationInfos;
+  private final List<PipelineElementValidationInfo> validationInfos;
 
   public CheckCompletedVisitor() {
     this.validationInfos = new ArrayList<>();
   }
 
+  public CheckCompletedVisitor(boolean ignoreValidation) {
+    this();
+    this.ignoreValidation = ignoreValidation;
+  }
+
   @Override
   public void visit(AnyStaticProperty property) {
 
@@ -53,21 +63,22 @@
 
   @Override
   public void visit(CodeInputStaticProperty codeInputStaticProperty) {
+    validateNull(codeInputStaticProperty, codeInputStaticProperty.getValue());
   }
 
   @Override
   public void visit(ColorPickerStaticProperty colorPickerStaticProperty) {
-
+    validateNull(colorPickerStaticProperty, colorPickerStaticProperty.getSelectedColor());
   }
 
   @Override
   public void visit(FileStaticProperty fileStaticProperty) {
-
+    validateNull(fileStaticProperty, fileStaticProperty.getLocationPath());
   }
 
   @Override
   public void visit(FreeTextStaticProperty freeTextStaticProperty) {
-
+    validateNull(freeTextStaticProperty, freeTextStaticProperty.getValue());
   }
 
   @Override
@@ -110,7 +121,7 @@
         }
       }
     } else {
-      if (!mappingPropertyUnary.getMapsFromOptions().isEmpty()){
+      if (!mappingPropertyUnary.getMapsFromOptions().isEmpty()) {
         String firstSelector = mappingPropertyUnary.getMapsFromOptions().get(0);
         mappingPropertyUnary.setSelectedProperty(firstSelector);
       }
@@ -124,12 +135,19 @@
 
   @Override
   public void visit(OneOfStaticProperty oneOfStaticProperty) {
-
+    if (!ignoreValidation && oneOfStaticProperty.getOptions().stream().noneMatch(Option::isSelected)) {
+      validationInfos.add(PipelineElementValidationInfo.error(
+          String.format(
+              "Configuration \"%s\" must have one selected option, but no option was selected.",
+              oneOfStaticProperty.getInternalName()
+          )
+      ));
+    }
   }
 
   @Override
   public void visit(SecretStaticProperty secretStaticProperty) {
-
+    validateNull(secretStaticProperty, secretStaticProperty.getValue());
   }
 
   @Override
@@ -139,18 +157,43 @@
 
   @Override
   public void visit(RuntimeResolvableTreeInputStaticProperty treeInputStaticProperty) {
-
+    if (!ignoreValidation
+        && !treeInputStaticProperty.isOptional()
+        && treeInputStaticProperty.getSelectedNodesInternalNames().isEmpty()) {
+      addMissingConfiguration(treeInputStaticProperty);
+    }
   }
 
   @Override
   public void visit(RuntimeResolvableGroupStaticProperty groupStaticProperty) {
+  }
 
+  @Override
+  public void visit(StaticPropertyAlternatives staticPropertyAlternatives) {
+    if (!ignoreValidation && !staticPropertyAlternatives.isOptional()
+        && staticPropertyAlternatives.getAlternatives().stream().noneMatch(StaticPropertyAlternative::getSelected)) {
+      validationInfos.add(PipelineElementValidationInfo.error(
+          String.format(
+              "No alternative of configuration \"%s\" was selected, but at least one alternative must be chosen",
+              staticPropertyAlternatives.getInternalName()
+          )
+      ));
+    }
+    var visitor = new CheckCompletedVisitor(true);
+    staticPropertyAlternatives.getAlternatives().forEach(alternative -> alternative.accept(visitor));
+    validationInfos.addAll(visitor.getValidationInfos());
   }
 
   public List<PipelineElementValidationInfo> getValidationInfos() {
     return this.validationInfos;
   }
 
+  private void validateNull(StaticProperty sp, Object value) {
+    if (!ignoreValidation && !sp.isOptional() && Objects.isNull(value)) {
+      addMissingConfiguration(sp);
+    }
+  }
+
   private boolean existsSelection(MappingPropertyUnary mappingProperty) {
     return !(mappingProperty.getSelectedProperty() == null || mappingProperty.getSelectedProperty().isEmpty());
   }
@@ -158,4 +201,15 @@
   private boolean existsSelection(MappingPropertyNary mappingProperty) {
     return !(mappingProperty.getSelectedProperties() == null || mappingProperty.getSelectedProperties().isEmpty());
   }
+
+  private void addMissingConfiguration(StaticProperty sp) {
+    validationInfos.add(
+        PipelineElementValidationInfo.error(
+            String.format(
+                "Configuration option \"%s\" as no value although it is marked as required",
+                sp.getInternalName()
+            )
+        )
+    );
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/CompactPipelineManagement.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/CompactPipelineManagement.java
new file mode 100644
index 0000000..1a0a1cd
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/CompactPipelineManagement.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact;
+
+import org.apache.streampipes.manager.matching.PipelineVerificationHandlerV2;
+import org.apache.streampipes.manager.pipeline.compact.generation.PipelineElementConfigurationStep;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineModificationResult;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
+import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
+
+public class CompactPipelineManagement {
+
+  private final IPipelineElementDescriptionStorage storage;
+
+  public CompactPipelineManagement(IPipelineElementDescriptionStorage storage) {
+    this.storage = storage;
+  }
+
+  public PipelineModificationResult makePipeline(CompactPipeline compactPipeline) throws Exception {
+    var pipeline = new Pipeline();
+    applyPipelineBasics(compactPipeline, pipeline);
+
+    new PipelineElementConfigurationStep(storage).apply(pipeline, compactPipeline);
+
+    return new PipelineVerificationHandlerV2(pipeline).makeModifiedPipeline();
+  }
+
+  private void applyPipelineBasics(CompactPipeline compactPipeline,
+                                   Pipeline pipeline) {
+    pipeline.setElementId(compactPipeline.id());
+    pipeline.setName(compactPipeline.name());
+    pipeline.setDescription(compactPipeline.description());
+  }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineGenerator.java
similarity index 70%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineGenerator.java
index 9285ae1..18b8efe 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/CompactPipelineGenerator.java
@@ -16,8 +16,13 @@
  *
  */
 
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.manager.pipeline.compact.generation;
 
-import java.util.Map;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
 
-public record CompactConfiguration(Map<String, Object> values) {}
+public interface CompactPipelineGenerator {
+
+  void apply(Pipeline pipeline,
+             CompactPipeline compactPipeline) throws Exception;
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
new file mode 100644
index 0000000..c736293
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataProcessorPipelineElementGenerator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.manager.template.DataProcessorTemplateHandler;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+
+public class DataProcessorPipelineElementGenerator {
+
+  private final InvocablePipelineElementGenerator<DataProcessorInvocation> basicGenerator;
+
+  public DataProcessorPipelineElementGenerator(
+      InvocablePipelineElementGenerator<DataProcessorInvocation> basicGenerator) {
+    this.basicGenerator = basicGenerator;
+  }
+
+  public DataProcessorInvocation generate(DataProcessorInvocation processor,
+                                          CompactPipelineElement pipelineElement) {
+    basicGenerator.apply(processor, pipelineElement);
+    var template = basicGenerator.makeTemplate(processor, pipelineElement);
+    return new DataProcessorTemplateHandler(template, processor, false)
+        .applyTemplateOnPipelineElement();
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataSinkPipelineElementGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataSinkPipelineElementGenerator.java
new file mode 100644
index 0000000..2c1f940
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataSinkPipelineElementGenerator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.manager.template.DataSinkTemplateHandler;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+
+public class DataSinkPipelineElementGenerator {
+
+  private final InvocablePipelineElementGenerator<DataSinkInvocation> basicGenerator;
+
+  public DataSinkPipelineElementGenerator(InvocablePipelineElementGenerator<DataSinkInvocation> basicGenerator) {
+    this.basicGenerator = basicGenerator;
+  }
+
+  public DataSinkInvocation generate(DataSinkInvocation sink,
+                                          CompactPipelineElement pipelineElement) {
+    basicGenerator.apply(sink, pipelineElement);
+    var template = basicGenerator.makeTemplate(sink, pipelineElement);
+    return new DataSinkTemplateHandler(template, sink, false)
+        .applyTemplateOnPipelineElement();
+  }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataStreamPipelineElementGenerator.java
similarity index 63%
copy from streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
copy to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataStreamPipelineElementGenerator.java
index 9285ae1..9d46606 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/compact/CompactConfiguration.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/DataStreamPipelineElementGenerator.java
@@ -16,8 +16,16 @@
  *
  */
 
-package org.apache.streampipes.model.connect.adapter.compact;
+package org.apache.streampipes.manager.pipeline.compact.generation;
 
-import java.util.Map;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
 
-public record CompactConfiguration(Map<String, Object> values) {}
+public class DataStreamPipelineElementGenerator {
+
+  public SpDataStream generate(SpDataStream stream,
+                               CompactPipelineElement pipelineElement) {
+    stream.setDom("jsplumb_" + pipelineElement.ref());
+    return stream;
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java
new file mode 100644
index 0000000..0f8d135
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/InvocablePipelineElementGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+import org.apache.streampipes.model.template.PipelineElementTemplate;
+import org.apache.streampipes.model.util.Cloner;
+
+import java.util.ArrayList;
+
+public class InvocablePipelineElementGenerator<T extends InvocableStreamPipesEntity> {
+
+  private static final String ID_PREFIX = "jsplumb_";
+
+  public void apply(T element,
+                    CompactPipelineElement compatPipelineElement) {
+    element.setDom(ID_PREFIX + compatPipelineElement.ref());
+    element.setConnectedTo(compatPipelineElement.connectedTo().stream().map(c -> ID_PREFIX + c).toList());
+    element.setStreamRequirements(new Cloner().streams(element.getStreamRequirements()));
+  }
+
+  protected PipelineElementTemplate makeTemplate(T element,
+                                                 CompactPipelineElement compactPipelineElement) {
+    var configs = compactPipelineElement.configuration();
+    if (compactPipelineElement.configuration() == null) {
+      configs = new ArrayList<>();
+    }
+    var template = new PipelineElementTemplate();
+    template.setTemplateConfigs(configs);
+    template.setBasePipelineElementAppId(element.getAppId());
+
+    return template;
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/PipelineElementConfigurationStep.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/PipelineElementConfigurationStep.java
new file mode 100644
index 0000000..ec55394
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/pipeline/compact/generation/PipelineElementConfigurationStep.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.pipeline.compact.generation;
+
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
+import org.apache.streampipes.model.pipeline.compact.CompactPipelineElement;
+import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
+
+public class PipelineElementConfigurationStep implements CompactPipelineGenerator {
+
+  private static final String StreamType = "stream";
+  private static final String ProcessorType = "processor";
+  private static final String SinkType = "sink";
+
+  private final IPipelineElementDescriptionStorage storage;
+
+  public PipelineElementConfigurationStep(IPipelineElementDescriptionStorage storage) {
+    this.storage = storage;
+  }
+
+  @Override
+  public void apply(Pipeline pipeline,
+                    CompactPipeline compactPipeline) throws Exception {
+    compactPipeline.pipelineElements().forEach(pe -> {
+      if (pe.type().equalsIgnoreCase(StreamType)) {
+        pipeline.getStreams().add(makeStream(pe));
+      } else if (pe.type().equalsIgnoreCase(ProcessorType)) {
+        pipeline.getSepas().add(makeProcessor(pe));
+      } else if (pe.type().equalsIgnoreCase(SinkType)) {
+        pipeline.getActions().add(makeSink(pe));
+      }
+    });
+  }
+
+  public SpDataStream makeStream(CompactPipelineElement pipelineElement) {
+    var element = storage.getDataStreamById(pipelineElement.id());
+    return new DataStreamPipelineElementGenerator().generate(element, pipelineElement);
+  }
+
+  public DataProcessorInvocation makeProcessor(CompactPipelineElement pipelineElement) {
+    var element = storage.getDataProcessorByAppId(pipelineElement.id());
+    var invocation = new DataProcessorInvocation(element);
+    return new DataProcessorPipelineElementGenerator(new InvocablePipelineElementGenerator<>())
+        .generate(invocation, pipelineElement);
+  }
+
+  public DataSinkInvocation makeSink(CompactPipelineElement pipelineElement) {
+    var element = storage.getDataSinkByAppId(pipelineElement.id());
+    var invocation = new DataSinkInvocation(element);
+    return new DataSinkPipelineElementGenerator(new InvocablePipelineElementGenerator<>())
+        .generate(invocation, pipelineElement);
+  }
+
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
index 0e6b22b..3ec7651 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java
@@ -53,6 +53,7 @@
     List<NamedStreamPipesEntity> pipelineElements = new ArrayList<>(
         new PipelineVerificationHandlerV2(pipeline)
             .verifyAndBuildGraphs(true)
+            .modifiedPipelineElements()
     );
 
     rewriteElementIds(pipelineElements, elementIdMappings);
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/CompactPipelineResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/CompactPipelineResource.java
new file mode 100644
index 0000000..2147dde
--- /dev/null
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/CompactPipelineResource.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.impl;
+
+import org.apache.streampipes.manager.pipeline.PipelineManager;
+import org.apache.streampipes.manager.pipeline.compact.CompactPipelineManagement;
+import org.apache.streampipes.model.message.Notification;
+import org.apache.streampipes.model.message.NotificationType;
+import org.apache.streampipes.model.message.Notifications;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+import org.apache.streampipes.model.pipeline.compact.CompactPipeline;
+import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
+import org.apache.streampipes.rest.security.AuthConstants;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/api/v2/compact-pipelines")
+public class CompactPipelineResource extends AbstractAuthGuardedRestResource {
+
+  private final CompactPipelineManagement compactPipelineManagement;
+
+  public CompactPipelineResource() {
+    this.compactPipelineManagement = new CompactPipelineManagement(
+        getPipelineElementStorage()
+    );
+  }
+
+  @PostMapping(
+      consumes = {
+          MediaType.APPLICATION_JSON_VALUE,
+          "application/yaml",
+          "application/yml"
+      }
+  )
+  @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
+  public ResponseEntity<?> addPipelineCompact(
+      @RequestBody CompactPipeline compactPipeline
+  ) throws Exception {
+
+    var pipelineGenerationResult = compactPipelineManagement.makePipeline(compactPipeline);
+    if (pipelineGenerationResult.allPipelineElementsValid()) {
+      String pipelineId = PipelineManager.addPipeline(getAuthenticatedUserSid(), pipelineGenerationResult.pipeline());
+      if (compactPipeline.createOptions().start()) {
+        try {
+          PipelineOperationStatus status = PipelineManager.startPipeline(pipelineId);
+          return ok(status);
+        } catch (Exception e) {
+          return statusMessage(Notifications.error(NotificationType.UNKNOWN_ERROR));
+        }
+      }
+      var message = Notifications.success("Pipeline stored");
+      message.addNotification(new Notification("id", pipelineId));
+      return ok(message);
+    } else {
+      return ResponseEntity.status(400).body(pipelineGenerationResult.validationInfos());
+    }
+  }
+
+  @PutMapping(
+      path = "{id}",
+      consumes = {
+          MediaType.APPLICATION_JSON_VALUE,
+          "application/yaml",
+          "application/yml"
+      }
+  )
+  @PreAuthorize(AuthConstants.HAS_WRITE_PIPELINE_PRIVILEGE)
+  public ResponseEntity<?> updatePipelineCompact(
+      @PathVariable("id") String elementId,
+      @RequestBody CompactPipeline compactPipeline
+  ) throws Exception {
+
+    return null;
+  }
+}