[WIP] improve pipeline executor
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementStatus.java b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementStatus.java
index 7e7274b..54f34e8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementStatus.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/pipeline/PipelineElementStatus.java
@@ -25,6 +25,8 @@
private String optionalMessage;
private String operation;
private String elementNode;
+ //TODO: Assess if runningInstanceId is needed separately or if it can be combined with the elementId
+ private String runningInstanceId;
private boolean success;
@@ -85,4 +87,12 @@
public void setElementNode(String elementNode) {
this.elementNode = elementNode;
}
+
+ public String getRunningInstanceId() {
+ return runningInstanceId;
+ }
+
+ public void setRunningInstanceId(String runningInstanceId) {
+ this.runningInstanceId = runningInstanceId;
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
index a83c125..bbcadfb 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/data/PipelineGraphBuilder.java
@@ -28,9 +28,9 @@
public class PipelineGraphBuilder {
- private Pipeline pipeline;
- private List<NamedStreamPipesEntity> allPipelineElements;
- private List<InvocableStreamPipesEntity> invocableElements;
+ private final Pipeline pipeline;
+ private final List<NamedStreamPipesEntity> allPipelineElements;
+ private final List<InvocableStreamPipesEntity> invocableElements;
public PipelineGraphBuilder(Pipeline pipeline) {
this.pipeline = pipeline;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
index dc0fdd9..82272c8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/HttpRequestBuilder.java
@@ -99,10 +99,12 @@
PipelineElementStatus status = new PipelineElementStatus(endpointUrl, payload.getName(), response.isSuccess(),
response.getOptionalMessage());
if(payload instanceof InvocableStreamPipesEntity){
+ status.setRunningInstanceId(((InvocableStreamPipesEntity) payload).getDeploymentRunningInstanceId());
status.setElementNode(((InvocableStreamPipesEntity)payload).getDeploymentTargetNodeId());
status.setOperation(action);
}
else if(payload instanceof SpDataStreamRelayContainer){
+ status.setRunningInstanceId(((SpDataStreamRelayContainer) payload).getRunningStreamRelayInstanceId() + " relay");
status.setElementNode(((SpDataStreamRelayContainer)payload).getDeploymentTargetNodeId());
status.setOperation(action + " relay");
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
deleted file mode 100644
index 01c4f2c..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/AbstractPipelineExecutor.java
+++ /dev/null
@@ -1,628 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.manager.data.PipelineGraph;
-import org.apache.streampipes.manager.data.PipelineGraphHelpers;
-import org.apache.streampipes.manager.execution.http.GraphSubmitter;
-
-import org.apache.streampipes.manager.execution.http.StateSubmitter;
-import org.apache.streampipes.manager.util.TemporaryGraphStorage;
-import org.apache.streampipes.model.SpDataSet;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelay;
-import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.model.grounding.EventGrounding;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.TransportProtocol;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineElementStatus;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
-import org.apache.streampipes.storage.api.INodeDataStreamRelay;
-import org.apache.streampipes.storage.api.IPipelineStorage;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-import org.apache.streampipes.user.management.encryption.CredentialsManager;
-
-import java.security.GeneralSecurityException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-public abstract class AbstractPipelineExecutor {
-
- protected Pipeline pipeline;
- protected boolean visualize;
- protected boolean storeStatus;
- protected boolean monitor;
-
- public AbstractPipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor) {
- this.pipeline = pipeline;
- this.visualize = visualize;
- this.storeStatus = storeStatus;
- this.monitor = monitor;
- }
-
- // standard methods
- protected void setPipelineStarted(Pipeline pipeline) {
- pipeline.setRunning(true);
- pipeline.setStartedAt(new Date().getTime());
- getPipelineStorageApi().updatePipeline(pipeline);
- }
-
- protected void setPipelineStopped(Pipeline pipeline) {
- pipeline.setRunning(false);
- getPipelineStorageApi().updatePipeline(pipeline);
- }
-
- protected void deleteVisualization(String pipelineId) {
- StorageDispatcher.INSTANCE
- .getNoSqlStore()
- .getVisualizationStorageApi()
- .deleteVisualization(pipelineId);
- }
-
- protected void storeInvocationGraphs(String pipelineId, List<InvocableStreamPipesEntity> graphs,
- List<SpDataSet> dataSets) {
- TemporaryGraphStorage.graphStorage.put(pipelineId, graphs);
- TemporaryGraphStorage.datasetStorage.put(pipelineId, dataSets);
- }
-
- protected void storeDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
- //relays.forEach(StreamPipesClusterManager::persistDataStreamRelay);
- relays.forEach(relay -> getDataStreamRelayApi().addRelayContainer(relay));
- }
-
- protected void deleteDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
- //relays.forEach(StreamPipesClusterManager::deleteDataStreamRelay);
- relays.forEach(relay -> getDataStreamRelayApi().deleteRelayContainer(relay));
- }
-
- protected void updateDataStreamRelayContainer(List<SpDataStreamRelayContainer> relays) {
- //relays.forEach(StreamPipesClusterManager::updateDataStreamRelay);
- relays.forEach(relay -> getDataStreamRelayApi().updateRelayContainer(relay));
- }
-
-
- protected PipelineOperationStatus startPipelineElementsAndRelays(List<InvocableStreamPipesEntity> graphs,
- List<SpDataStreamRelayContainer> relays){
- if (graphs.isEmpty()) {
- return initPipelineOperationStatus();
- }
- return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
- graphs, new ArrayList<>(), relays).invokePipelineElementsAndRelays();
- }
-
- protected PipelineOperationStatus stopPipelineElementsAndRelays(List<InvocableStreamPipesEntity> graphs,
- List<SpDataStreamRelayContainer> relays){
- if (graphs.isEmpty()) {
- return initPipelineOperationStatus();
- }
- return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
- graphs, new ArrayList<>(),relays).detachPipelineElementsAndRelays();
- }
-
- protected PipelineOperationStatus startRelays(List<SpDataStreamRelayContainer> relays){
- return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(), new ArrayList<>(), new ArrayList<>(),
- relays).invokeRelaysOnMigration();
- }
-
- protected PipelineOperationStatus stopRelays(List<SpDataStreamRelayContainer> relays){
- return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),new ArrayList<>(), new ArrayList<>(),
- relays).detachRelaysOnMigration();
- }
-
- protected PipelineElementStatus getState(InvocableStreamPipesEntity graph){
- return new StateSubmitter(pipeline.getPipelineId(), pipeline.getName(), graph, null).getElementState();
- }
-
- protected PipelineElementStatus setState(InvocableStreamPipesEntity graph, String state){
- return new StateSubmitter(pipeline.getPipelineId(), pipeline.getName(), graph, state).setElementState();
- }
-
- protected List<SpDataStreamRelayContainer> findRelaysWhenStopping(List<NamedStreamPipesEntity> predecessors,
- InvocableStreamPipesEntity target){
-
- List<SpDataStreamRelayContainer> relays = new ArrayList<>();
-
- predecessors.forEach(pred -> {
- List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
- SpDataStreamRelayContainer relayContainer = new SpDataStreamRelayContainer();
-
- if (pred instanceof DataProcessorInvocation){
- //Data Processor
- DataProcessorInvocation graph = (DataProcessorInvocation) pred;
- if (differentDeploymentTargets(pred, target)) {
-
- // TODO only add if no other processor or sink depends on relay
- String predDOMId = pred.getDOM();
- String targetRunningInstanceId = target.getDeploymentRunningInstanceId();
- Optional<DataProcessorInvocation> foundProcessor = pipeline.getSepas().stream()
- .filter(processor -> processor.getConnectedTo().contains(predDOMId))
- .filter(processor -> !processor.getDeploymentRunningInstanceId().equals(targetRunningInstanceId))
- .findAny();
-
- Optional<DataSinkInvocation> foundSink = pipeline.getActions().stream()
- .filter(action -> action.getConnectedTo().contains(predDOMId))
- .findAny();
-
- boolean foundDependencyOnDifferentTarget = false;
- if (foundProcessor.isPresent()) {
- foundDependencyOnDifferentTarget = differentDeploymentTargets(foundProcessor.get(), target);
- }
-
- if (foundSink.isPresent()) {
- foundDependencyOnDifferentTarget = differentDeploymentTargets(foundSink.get(), target);
- }
-
- if (foundDependencyOnDifferentTarget) {
- dataStreamRelays.addAll(findRelaysWithMatchingTopic(graph, target));
-
- relayContainer = new SpDataStreamRelayContainer(graph);
- relayContainer.setOutputStreamRelays(dataStreamRelays);
-
- relays.add(relayContainer);
- }
-
- }
- } else if (pred instanceof SpDataStream){
- //DataStream
- SpDataStream stream = (SpDataStream) pred;
- if (differentDeploymentTargets(stream, target)){
-
- String id = extractUniqueAdpaterId(stream.getElementId());
- //There is a relay that needs to be removed
- dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
- .get(getIndex(pred.getDOM(), target))
- .getEventGrounding())));
- String relayStrategy = pipeline.getEventRelayStrategy();
- relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
- }
- }
- });
- return relays;
- }
-
-
- protected List<SpDataStreamRelayContainer> findRelays(List<NamedStreamPipesEntity> predecessors,
- InvocableStreamPipesEntity target){
-
- List<SpDataStreamRelayContainer> relays = new ArrayList<>();
-
- predecessors.forEach(pred -> {
- List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
- SpDataStreamRelayContainer relayContainer = new SpDataStreamRelayContainer();
-
- if (pred instanceof DataProcessorInvocation){
- //Data Processor
- DataProcessorInvocation graph = (DataProcessorInvocation) pred;
- if (differentDeploymentTargets(pred, target)) {
-
- String runningRelayId = ((DataProcessorInvocation) pred).getDeploymentRunningInstanceId();
- Optional<SpDataStreamRelayContainer> existingRelay = getRelayContainerById(runningRelayId);
-
- // only add relay if not existing - prevent from duplicate relays with same topic to same target
- Collection<? extends SpDataStreamRelay> foundRelays = findRelaysWithMatchingTopic(graph, target);
-
- if (!existingRelay.isPresent() || missingRelayToTarget(existingRelay.get(), foundRelays)) {
- dataStreamRelays.addAll(findRelaysWithMatchingTopic(graph, target));
-
- //dsRelayContainer.setRunningStreamRelayInstanceId(pipeline.getPipelineId());
- relayContainer = new SpDataStreamRelayContainer(graph);
- relayContainer.setOutputStreamRelays(dataStreamRelays);
-
- relays.add(relayContainer);
- }
-
- }
- } else if (pred instanceof SpDataStream){
- //DataStream
- SpDataStream stream = (SpDataStream) pred;
- if (differentDeploymentTargets(stream, target)){
-
- String id = extractUniqueAdpaterId(stream.getElementId());
- Optional<SpDataStreamRelayContainer> existingRelay = getRelayContainerById(id);
-
- // only add relay if not existing - prevent from duplicate relays with same topic
- if(!existingRelay.isPresent()) {
- //There is a relay that needs to be removed
- dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(target.getInputStreams()
- .get(getIndex(pred.getDOM(), target))
- .getEventGrounding())));
- String relayStrategy = pipeline.getEventRelayStrategy();
- relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
- } else {
- // generate relays for adapter streams to remote processors
- List<SpDataStreamRelayContainer> generatedRelays =
- generateDataStreamRelays(Collections.singletonList(target));
-
- relays.addAll(generatedRelays);
- }
- }
- }
- });
- return relays;
- }
-
- private boolean missingRelayToTarget(SpDataStreamRelayContainer existingRelayContainer,
- Collection<? extends SpDataStreamRelay> foundRelays) {
-
- List<TransportProtocol> set = foundRelays.stream()
- .map(SpDataStreamRelay::getEventGrounding)
- .map(EventGrounding::getTransportProtocol)
- .collect(Collectors.toList());
-
- List<TransportProtocol> relay = existingRelayContainer.getOutputStreamRelays().stream()
- .map(SpDataStreamRelay::getEventGrounding)
- .map(EventGrounding::getTransportProtocol)
- .collect(Collectors.toList());
-
- boolean missing = true;
- for (TransportProtocol tp: set) {
- for (TransportProtocol r: relay) {
- String targetTopic = tp.getTopicDefinition().getActualTopicName();
- String rTopic = r.getTopicDefinition().getActualTopicName();
- String targetHost = tp.getBrokerHostname();
- String rHost = r.getBrokerHostname();
-
- if (targetHost.equals(rHost) && targetTopic.equals(rTopic)) {
- missing = false;
- }
- }
- }
-
- return missing;
- }
-
- protected List<NamedStreamPipesEntity> getPredecessors(NamedStreamPipesEntity source,
- InvocableStreamPipesEntity target,
- PipelineGraph pipelineGraph,
- List<NamedStreamPipesEntity> foundPredecessors){
-
- Set<InvocableStreamPipesEntity> targets = getTargetsAsSet(source, pipelineGraph,
- InvocableStreamPipesEntity.class);
-
- //TODO: Check if this works for all graph topologies
- if (targets.contains(target)){
- foundPredecessors.add(source);
- } else {
- List<NamedStreamPipesEntity> successors = getTargetsAsList(source, pipelineGraph,
- NamedStreamPipesEntity.class);
-
- if (successors.isEmpty()) return foundPredecessors;
- successors.forEach(successor -> getPredecessors(successor, target, pipelineGraph, foundPredecessors));
- }
- return foundPredecessors;
- }
-
- protected NamedStreamPipesEntity findMatching(NamedStreamPipesEntity entity, PipelineGraph pipelineGraph){
- AtomicReference<NamedStreamPipesEntity> match = new AtomicReference<>();
- List<SpDataStream> dataStreams = PipelineGraphHelpers.findStreams(pipelineGraph);
-
- for (SpDataStream stream : dataStreams) {
- NamedStreamPipesEntity foundEntity = compareGraphs(stream, entity, pipelineGraph, new ArrayList<>());
- if (foundEntity != null) {
- match.set(foundEntity);
- }
- }
- return match.get();
- }
-
- private NamedStreamPipesEntity compareGraphs(NamedStreamPipesEntity source,
- NamedStreamPipesEntity searchedEntity,
- PipelineGraph pipelineGraph,
- List<NamedStreamPipesEntity> successors){
- if(matchingDOM(source, searchedEntity)) {
- return source;
- } else if (successors.isEmpty()) {
- successors = getTargetsAsList(source, pipelineGraph, NamedStreamPipesEntity.class);
- Optional<NamedStreamPipesEntity> successor = successors.stream().findFirst();
- if (successor.isPresent()) {
- successors.remove(successor.get());
- return compareGraphs(successor.get(), searchedEntity, pipelineGraph, successors);
- }
- }
- return null;
- }
-
- protected List<SpDataStreamRelayContainer> generateRelays(List<InvocableStreamPipesEntity> graphs) {
- return generateDataStreamRelays(graphs).stream()
- .filter(r -> r.getOutputStreamRelays().size() > 0)
- .collect(Collectors.toList());
- }
-
- // TODO: when using kafka as edge protocol it generates duplicate event relays -> check with mqtt as edge
- // protocol and fix
- private List<SpDataStreamRelayContainer> generateDataStreamRelays(List<InvocableStreamPipesEntity> graphs) {
- List<SpDataStreamRelayContainer> relays = new ArrayList<>();
-
- for (InvocableStreamPipesEntity graph : graphs) {
- for (SpDataStream stream: pipeline.getStreams()) {
- if (differentDeploymentTargets(stream, graph) && connected(stream, graph)) {
-
- List<SpDataStreamRelay> dataStreamRelays = new ArrayList<>();
- dataStreamRelays.add(new SpDataStreamRelay(new EventGrounding(graph.getInputStreams()
- .get(getIndex(stream.getDOM(), graph))
- .getEventGrounding())));
-
- String id = extractUniqueAdpaterId(stream.getElementId());
- String relayStrategy = pipeline.getEventRelayStrategy();
-
- if (!relayExists(relays, id)) {
- relays.add(new SpDataStreamRelayContainer(id, relayStrategy, stream, dataStreamRelays));
- }
- }
- }
- for (DataProcessorInvocation processor : pipeline.getSepas()) {
- if (differentDeploymentTargets(processor, graph) && connected(processor, graph)) {
- if (!relayExists(relays, processor.getDeploymentRunningInstanceId())) {
-// String previousId = processor.getDeploymentRunningInstanceId();
-// String modifiedId = previousId + "-" + processor.getDeploymentTargetNodeId();
-// processor.setDeploymentRunningInstanceId(modifiedId);
- SpDataStreamRelayContainer processorRelay = new SpDataStreamRelayContainer(processor);
- relays.add(processorRelay);
- }
- }
- }
- }
- return relays;
- }
-
-
- // Helpers
-
- /**
- *
- * @param id
- * @return
- */
- private Optional<SpDataStreamRelayContainer> getRelayContainerById(String id) {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeDataStreamRelayStorage().getRelayContainerById(id);
- }
-
- /**
- * Check if relay with deploymentRunningInstanceId of predecessor already exists
- *
- * @param relays List of existing relays
- * @param deploymentRunningInstanceId Id to check
- * @return boolean
- */
- private boolean relayExists(List<SpDataStreamRelayContainer> relays,
- String deploymentRunningInstanceId) {
- return relays.stream().anyMatch(r -> r.getRunningStreamRelayInstanceId().equals(deploymentRunningInstanceId));
- }
-
- /**
- * Updates group.id for data processor/sink. Note: KafkaTransportProtocol only!!
- *
- * @param entity data processor/sink
- */
- protected void updateKafkaGroupIds(InvocableStreamPipesEntity entity) {
- entity.getInputStreams()
- .stream()
- .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
- .map(is -> is.getEventGrounding().getTransportProtocol())
- .map(KafkaTransportProtocol.class::cast)
- .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
- }
-
- /**
- * Decrypt potential secrets contained in static properties, e.g., passwords
- *
- * @param graphs List of graphs
- * @return List of decrypted graphs
- */
- protected List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
- List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
- graphs.stream().map(g -> {
- if (g instanceof DataProcessorInvocation) {
- return new DataProcessorInvocation((DataProcessorInvocation) g);
- } else {
- return new DataSinkInvocation((DataSinkInvocation) g);
- }
- }).forEach(g -> {
- g.getStaticProperties()
- .stream()
- .filter(SecretStaticProperty.class::isInstance)
- .forEach(sp -> {
- try {
- String decrypted = CredentialsManager.decrypt(pipeline.getCreatedByUser(),
- ((SecretStaticProperty) sp).getValue());
- ((SecretStaticProperty) sp).setValue(decrypted);
- ((SecretStaticProperty) sp).setEncrypted(false);
- } catch (GeneralSecurityException e) {
- e.printStackTrace();
- }
- });
- decryptedGraphs.add(g);
- });
- return decryptedGraphs;
- }
-
- /**
- * Get pipeline storage dispatcher API
- *
- * @return IPipelineStorage NoSQL storage interface for pipelines
- */
- private IPipelineStorage getPipelineStorageApi() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI();
- }
-
- /**
- * Get data stream relay storage dispatcher API
- *
- * @return INodeDataStreamRelay NoSQL storage interface for data stream relays
- */
- private INodeDataStreamRelay getDataStreamRelayApi() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getNodeDataStreamRelayStorage();
- }
-
- /**
- * Extract topic name
- *
- * @param entity
- * @return
- */
- private String extractActualTopic(NamedStreamPipesEntity entity) {
- if (entity instanceof SpDataStream) {
- return ((SpDataStream) entity)
- .getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
- } else if (entity instanceof SpDataStreamRelay) {
- return ((SpDataStreamRelay) entity)
- .getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
- }
- throw new SpRuntimeException("Could not extract actual topic name from entity");
- }
-
- // Edge / Migration Helpers
-
- /**
- * Compare deployment targets of two pipeline elements, namely data stream/processor (source) and data
- * processor/sink (target)
- *
- * @param e1
- * @param e2
- * @return boolean value that returns true if source and target share the same deployment target, else false
- */
- private boolean differentDeploymentTargets(NamedStreamPipesEntity e1, InvocableStreamPipesEntity e2) {
- if (e1 instanceof SpDataStream) {
- return !((SpDataStream) e1).getDeploymentTargetNodeId().equals(e2.getDeploymentTargetNodeId());
- } else if (e1 instanceof DataProcessorInvocation) {
- return !((DataProcessorInvocation) e1).getDeploymentTargetNodeId().equals(e2.getDeploymentTargetNodeId());
- } else if (e1 instanceof DataSinkInvocation) {
- return !((DataSinkInvocation) e1).getDeploymentTargetNodeId().equals(e2.getDeploymentTargetNodeId());
- }
- throw new SpRuntimeException("Matching deployment targets check failed");
- }
-
- /**
- * Find relays with matching topics
- *
- * @param graph data processor
- * @param target data processor/sink
- * @return collection of data stream relays
- */
- private Collection<? extends SpDataStreamRelay> findRelaysWithMatchingTopic(DataProcessorInvocation graph,
- InvocableStreamPipesEntity target) {
- return graph.getOutputStreamRelays().stream().
- filter(relay ->
- target.getInputStreams().stream()
- .map(this::extractActualTopic)
- .collect(Collectors.toSet())
- .contains(extractActualTopic(relay)))
- .collect(Collectors.toList());
- }
-
-
- private <T> Set<T> getTargetsAsSet(NamedStreamPipesEntity source, PipelineGraph pipelineGraph,
- Class<T> clazz){
- return pipelineGraph.outgoingEdgesOf(source)
- .stream()
- .map(pipelineGraph::getEdgeTarget)
- .map(clazz::cast)
- .collect(Collectors.toSet());
- }
-
- private <T> List<T> getTargetsAsList(NamedStreamPipesEntity source, PipelineGraph pipelineGraph,
- Class<T> clazz){
- return new ArrayList<>(getTargetsAsSet(source, pipelineGraph, clazz));
- }
-
- /**
- * Compare connection of two pipeline elements, namely data stream/processor (source) and data processor/sink
- * (target) by DOM identifier.
- *
- * @param source data stream or data processor
- * @param target data processor/sink
- * @return boolean value that returns true if source and target are connected, else false
- */
- private boolean connected(NamedStreamPipesEntity source, InvocableStreamPipesEntity target) {
- int index = getIndex(source.getDOM(), target);
- if (index != -1) {
- return target.getConnectedTo().get(index).equals(source.getDOM());
- }
- return false;
- }
-
- /**
- * Get index of data processor/sink connection based on source DOM identifier
- *
- * @param sourceDomId source DOM identifier
- * @param target data processor/sink
- * @return Integer with index of connection, if invalid returns -1.
- */
- private Integer getIndex(String sourceDomId, InvocableStreamPipesEntity target) {
- return target.getConnectedTo().indexOf(sourceDomId);
- }
-
- /**
- * Checks if DOM are equal
- *
- * @param source pipeline element
- * @param target pipeline element
- * @return true if DOM is the same, else false
- */
- private boolean matchingDOM(NamedStreamPipesEntity source, NamedStreamPipesEntity target) {
- return source.getDOM().equals(target.getDOM());
- }
-
- /**
- * Get List of InvocableStreamPipes entities, i.e., data processors/sinks from list of NameStreamPipesEntity
- *
- * @param graphs List<NamedStreamPipesEntity> graphs
- * @return
- */
- private List<InvocableStreamPipesEntity> getListOfInvocableStreamPipesEntity(List<NamedStreamPipesEntity> graphs) {
- List<InvocableStreamPipesEntity> invocableEntities = new ArrayList<>();
- graphs.stream()
- .filter(i -> i instanceof InvocableStreamPipesEntity)
- .forEach(i -> invocableEntities.add((InvocableStreamPipesEntity) i));
- return invocableEntities;
- }
-
- /**
- * Create pipeline operation status with pipeline id and name and set success to true
- *
- * @return PipelineOperationStatus
- */
- protected PipelineOperationStatus initPipelineOperationStatus() {
- PipelineOperationStatus status = new PipelineOperationStatus();
- status.setPipelineId(pipeline.getPipelineId());
- status.setPipelineName(pipeline.getName());
- status.setSuccess(true);
- return status;
- }
-
- private <T> List<T> filter(List<InvocableStreamPipesEntity> graphs, Class<T> clazz) {
- return graphs
- .stream()
- .filter(clazz::isInstance)
- .map(clazz::cast)
- .collect(Collectors.toList());
- }
-
- private String extractUniqueAdpaterId(String s) {
- return s.substring(s.lastIndexOf("/") + 1);
- }
-
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
deleted file mode 100644
index 2258cb9..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationHelpers.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline;
-
-public class PipelineMigrationHelpers {
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
index 3e50d91..948ae4b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutor.java
@@ -17,10 +17,8 @@
*/
package org.apache.streampipes.manager.execution.pipeline.executor;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.LifecycleEntity;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.PipelineExecutionOperation;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.ReconfigurationOperation;
+import org.apache.streampipes.manager.execution.pipeline.executor.steps.EntitiesLifecycleObject;
+import org.apache.streampipes.manager.execution.pipeline.executor.steps.PipelineExecutionStep;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
@@ -48,16 +46,16 @@
private PipelineElementReconfigurationEntity reconfigurationEntity;
- private final LifecycleEntity<InvocableStreamPipesEntity> graphs;
+ private final EntitiesLifecycleObject<InvocableStreamPipesEntity> graphs;
- private final LifecycleEntity<SpDataStreamRelayContainer> relays;
+ private final EntitiesLifecycleObject<SpDataStreamRelayContainer> relays;
- private final LifecycleEntity<SpDataSet> dataSets;
+ private final EntitiesLifecycleObject<SpDataSet> dataSets;
private PipelineOperationStatus status;
- private final LinkedList<PipelineExecutionOperation> operations = new LinkedList<>();
+ private final LinkedList<PipelineExecutionStep> operations = new LinkedList<>();
public PipelineExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus, boolean monitor){
this.pipeline = pipeline;
@@ -66,19 +64,19 @@
this.monitor = monitor;
this.status = StatusUtils.initPipelineOperationStatus(pipeline);
- this.graphs = new LifecycleEntity<>();
- this.relays = new LifecycleEntity<>();
- this.dataSets = new LifecycleEntity<>();
+ this.graphs = new EntitiesLifecycleObject<>();
+ this.relays = new EntitiesLifecycleObject<>();
+ this.dataSets = new EntitiesLifecycleObject<>();
}
public PipelineOperationStatus execute(){
- for(PipelineExecutionOperation pipelineExecutionOperation: this.operations){
- PipelineOperationStatus operationStatus = pipelineExecutionOperation.executeOperation();
+ for(PipelineExecutionStep pipelineExecutionStep : this.operations){
+ PipelineOperationStatus operationStatus = pipelineExecutionStep.executeOperation();
StatusUtils.checkSuccess(operationStatus);
- pipelineExecutionOperation.setStatus(operationStatus);
+ pipelineExecutionStep.setStatus(operationStatus);
StatusUtils.updateStatus(operationStatus, this.status);
if(!operationStatus.isSuccess()){
- rollback(pipelineExecutionOperation);
+ rollback(pipelineExecutionStep);
break;
}
}
@@ -86,11 +84,11 @@
return this.status;
}
- private void rollback(PipelineExecutionOperation failedOperation){
+ private void rollback(PipelineExecutionStep failedOperation){
PipelineOperationStatus rollbackStatus = StatusUtils.initPipelineOperationStatus(pipeline);
for(int currentOperationIndex = this.operations.indexOf(failedOperation);
- currentOperationIndex<=0; currentOperationIndex--){
- PipelineExecutionOperation currentOperation = this.operations.get(currentOperationIndex);
+ currentOperationIndex>=0; currentOperationIndex--){
+ PipelineExecutionStep currentOperation = this.operations.get(currentOperationIndex);
PipelineOperationStatus rollbackOperationStatus = currentOperation.rollbackOperation();
StatusUtils.checkSuccess(rollbackOperationStatus);
StatusUtils.updateStatus(rollbackOperationStatus, rollbackStatus);
@@ -99,18 +97,10 @@
StatusUtils.updateStatus(rollbackStatus, this.status);
}
- public void addOperation(PipelineExecutionOperation operation){
+ public void addStep(PipelineExecutionStep operation){
this.operations.add(operation);
}
- public boolean containsReconfigurationOperation(){
- return this.operations.stream().anyMatch(operation -> operation instanceof ReconfigurationOperation);
- }
-
- public boolean containsMigrationOperation(){
- return this.operations.stream().anyMatch(operation -> operation instanceof MigrationOperation);
- }
-
//Getter and Setter
@@ -178,15 +168,15 @@
this.reconfigurationEntity = reconfigurationEntity;
}
- public LifecycleEntity<InvocableStreamPipesEntity> getGraphs() {
+ public EntitiesLifecycleObject<InvocableStreamPipesEntity> getGraphs() {
return graphs;
}
- public LifecycleEntity<SpDataStreamRelayContainer> getRelays() {
+ public EntitiesLifecycleObject<SpDataStreamRelayContainer> getRelays() {
return relays;
}
- public LifecycleEntity<SpDataSet> getDataSets() {
+ public EntitiesLifecycleObject<SpDataSet> getDataSets() {
return dataSets;
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
index d423cf1..250cde7 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorBuilder.java
@@ -17,8 +17,7 @@
*/
package org.apache.streampipes.manager.execution.pipeline.executor;
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.*;
+import org.apache.streampipes.manager.execution.pipeline.executor.steps.*;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineElementMigrationEntity;
import org.apache.streampipes.model.pipeline.PipelineElementReconfigurationEntity;
@@ -27,10 +26,6 @@
private PipelineExecutor pipelineExecutor;
- private boolean reconfigurationParametersSet = false;
-
- private boolean migrationParametersSet = false;
-
public static PipelineExecutorBuilder getBuilder(){
return new PipelineExecutorBuilder();
}
@@ -46,7 +41,6 @@
PipelineElementMigrationEntity migrationEntity){
pipelineExecutor.setSecondaryPipeline(pipelineBeforeMigration);
pipelineExecutor.setMigrationEntity(migrationEntity);
- this.migrationParametersSet = true;
return this;
}
@@ -54,76 +48,70 @@
PipelineElementReconfigurationEntity reconfigurationEntity){
pipelineExecutor.setSecondaryPipeline(reconfiguredPipeline);
pipelineExecutor.setReconfigurationEntity(reconfigurationEntity);
- this.reconfigurationParametersSet = true;
return this;
}
- public PipelineExecutorBuilder addPrepareMigrationOperation(){
- pipelineExecutor.addOperation(new PrepareMigrationOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addPrepareMigrationStep(){
+ pipelineExecutor.addStep(new PrepareMigrationStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addGetStateOperation(){
- pipelineExecutor.addOperation(new GetStateOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addGetStateStep(){
+ pipelineExecutor.addStep(new GetStateStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStartRelaysOperation(){
- pipelineExecutor.addOperation(new StartRelaysOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStartRelaysStep(){
+ pipelineExecutor.addStep(new StartRelaysStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStartGraphsAndAssociatedRelaysOperation(){
- pipelineExecutor.addOperation(new StartGraphsAndAssociatedRelaysOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStartGraphsAndAssociatedRelaysStep(){
+ pipelineExecutor.addStep(new StartGraphsAndAssociatedRelaysStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStopGraphsAndAssociatedRelaysOperation(){
- pipelineExecutor.addOperation(new StopGraphsAndAssociatedRelaysOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStopGraphsAndAssociatedRelaysStep(){
+ pipelineExecutor.addStep(new StopGraphsAndAssociatedRelaysStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStopRelaysOperation(){
- pipelineExecutor.addOperation(new StopRelaysOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStopRelaysStep(){
+ pipelineExecutor.addStep(new StopRelaysStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStoreMigratedPipelineOperation(){
- pipelineExecutor.addOperation(new StoreMigratedPipelineOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStoreMigratedPipelineStep(){
+ pipelineExecutor.addStep(new StoreMigratedPipelineStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addReconfigureElementOperation(){
- pipelineExecutor.addOperation(new ReconfigureElementOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addReconfigureElementStep(){
+ pipelineExecutor.addStep(new ReconfigureElementStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addPreparePipelineStartOperation(){
- pipelineExecutor.addOperation(new PrepareStartPipelineOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addPreparePipelineStartStep(){
+ pipelineExecutor.addStep(new PrepareStartPipelineStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStartPipelineOperation(){
- pipelineExecutor.addOperation(new StartPipelineOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStartPipelineStep(){
+ pipelineExecutor.addStep(new StartPipelineStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStorePipelineOperation(){
- pipelineExecutor.addOperation(new StorePipelineOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStorePipelineStep(){
+ pipelineExecutor.addStep(new StorePipelineStep(pipelineExecutor));
return this;
}
- public PipelineExecutorBuilder addStopPipelineOperation(){
- pipelineExecutor.addOperation(new StopPipelineOperation(pipelineExecutor));
+ public PipelineExecutorBuilder addStopPipelineStep(){
+ pipelineExecutor.addStep(new StopPipelineStep(pipelineExecutor));
return this;
}
public PipelineExecutor buildPipelineExecutor(){
- //Is this check needed? Only relevant for core development not for users but gives a little more clarity at the
- //cost of introducing some new boolean flags and marker interfaces
- if((this.pipelineExecutor.containsMigrationOperation() && !this.migrationParametersSet)
- || (this.pipelineExecutor.containsReconfigurationOperation() && !this.reconfigurationParametersSet))
- throw new SpRuntimeException("PipelineExecutor can't be build since the required parameters have not been set");
- return this.pipelineExecutor; }
-
+ return this.pipelineExecutor;
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
index 617ff01..e4b042b 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/PipelineExecutorFactory.java
@@ -29,14 +29,14 @@
PipelineExecutorBuilder builder = PipelineExecutorBuilder.getBuilder()
.initializePipelineExecutor(pipeline, visualize, storeStatus, monitor)
.setMigrationParameters(pipelineBeforeMigration, migrationEntity)
- .addPrepareMigrationOperation();
+ .addPrepareMigrationStep();
if(migrationEntity.getTargetElement().isStateful())
- builder.addGetStateOperation();
- builder.addStartGraphsAndAssociatedRelaysOperation()
- .addStopRelaysOperation()
- .addStartRelaysOperation()
- .addStopGraphsAndAssociatedRelaysOperation()
- .addStoreMigratedPipelineOperation();
+ builder.addGetStateStep();
+ builder.addStartGraphsAndAssociatedRelaysStep()
+ .addStopRelaysStep()
+ .addStartRelaysStep()
+ .addStopGraphsAndAssociatedRelaysStep()
+ .addStoreMigratedPipelineStep();
return builder.buildPipelineExecutor();
}
@@ -44,23 +44,23 @@
boolean monitor, Pipeline reconfiguredPipeline,
PipelineElementReconfigurationEntity reconfigurationEntity){
return PipelineExecutorBuilder.getBuilder().initializePipelineExecutor(pipeline, visualize, storeStatus, monitor)
- .setReconfigurationParameters(reconfiguredPipeline, reconfigurationEntity).addReconfigureElementOperation()
+ .setReconfigurationParameters(reconfiguredPipeline, reconfigurationEntity).addReconfigureElementStep()
.buildPipelineExecutor();
}
public static PipelineExecutor createInvocationExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
boolean monitor){
return PipelineExecutorBuilder.getBuilder().initializePipelineExecutor(pipeline, visualize, storeStatus, monitor)
- .addPreparePipelineStartOperation()
- .addStartPipelineOperation()
- .addStorePipelineOperation()
+ .addPreparePipelineStartStep()
+ .addStartPipelineStep()
+ .addStorePipelineStep()
.buildPipelineExecutor();
}
public static PipelineExecutor createDetachExecutor(Pipeline pipeline, boolean visualize, boolean storeStatus,
boolean monitor){
return PipelineExecutorBuilder.getBuilder().initializePipelineExecutor(pipeline, visualize, storeStatus, monitor)
- .addStopPipelineOperation().buildPipelineExecutor();
+ .addStopPipelineStep().buildPipelineExecutor();
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/MigrationOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/MigrationOperation.java
deleted file mode 100644
index 22c21d1..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/MigrationOperation.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations.types;
-
-public interface MigrationOperation {
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/ReconfigurationOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/ReconfigurationOperation.java
deleted file mode 100644
index 13c67a0..0000000
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/types/ReconfigurationOperation.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.manager.execution.pipeline.executor.operations.types;
-
-public interface ReconfigurationOperation {
-}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/EntitiesLifecycleObject.java
similarity index 94%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/EntitiesLifecycleObject.java
index d2c6d8c..5a2ca91 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/LifecycleEntity.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/EntitiesLifecycleObject.java
@@ -15,12 +15,12 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import java.util.ArrayList;
import java.util.List;
-public class LifecycleEntity<T> {
+public class EntitiesLifecycleObject<T> {
private final List<T> entitiesToStart;
@@ -30,7 +30,7 @@
private final List<T> entitiesToDelete;
- public LifecycleEntity(){
+ public EntitiesLifecycleObject(){
entitiesToStart = new ArrayList<>();
entitiesToStop = new ArrayList<>();
entitiesToStore = new ArrayList<>();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/GetStateStep.java
similarity index 92%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/GetStateStep.java
index e11609b..1fc19f8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/GetStateOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/GetStateStep.java
@@ -15,10 +15,9 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.logging.evaluation.EvaluationLogger;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
@@ -29,9 +28,9 @@
import java.io.IOException;
import java.io.ObjectOutputStream;
-public class GetStateOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class GetStateStep extends PipelineExecutionStep {
- public GetStateOperation(PipelineExecutor pipelineExecutor) {
+ public GetStateStep(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PipelineExecutionStep.java
similarity index 89%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PipelineExecutionStep.java
index e822ec4..098de93 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PipelineExecutionOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PipelineExecutionStep.java
@@ -15,25 +15,26 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-public abstract class PipelineExecutionOperation {
+public abstract class PipelineExecutionStep {
protected final PipelineExecutor pipelineExecutor;
private PipelineOperationStatus status;
- public PipelineExecutionOperation(PipelineExecutor pipelineExecutor){
+ public PipelineExecutionStep(PipelineExecutor pipelineExecutor){
this.pipelineExecutor = pipelineExecutor;
this.status = StatusUtils.initPipelineOperationStatus(this.pipelineExecutor.getPipeline());
}
public abstract PipelineOperationStatus executeOperation();
+ //TODO: Check if partial and full rollback can be unified to single method
public abstract PipelineOperationStatus rollbackOperationPartially();
public abstract PipelineOperationStatus rollbackOperationFully();
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareMigrationStep.java
similarity index 94%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareMigrationStep.java
index 4c9304d..4c45b15 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareMigrationOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareMigrationStep.java
@@ -15,13 +15,12 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.manager.data.PipelineGraph;
import org.apache.streampipes.manager.data.PipelineGraphBuilder;
import org.apache.streampipes.manager.data.PipelineGraphHelpers;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
@@ -37,13 +36,13 @@
import java.util.List;
import java.util.stream.Collectors;
-public class PrepareMigrationOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class PrepareMigrationStep extends PipelineExecutionStep {
private List<NamedStreamPipesEntity> predecessorsAfterMigration;
private final List<NamedStreamPipesEntity> predecessorsBeforeMigration = new ArrayList<>();
private Pipeline pipeline;
- public PrepareMigrationOperation(PipelineExecutor pipelineExecutor) {
+ public PrepareMigrationStep(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareStartPipelineStep.java
similarity index 95%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareStartPipelineStep.java
index 264ac19..ca9dc66 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/PrepareStartPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/PrepareStartPipelineStep.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
@@ -35,10 +35,10 @@
import java.util.UUID;
import java.util.stream.Collectors;
-public class PrepareStartPipelineOperation extends PipelineExecutionOperation {
+public class PrepareStartPipelineStep extends PipelineExecutionStep {
- public PrepareStartPipelineOperation(PipelineExecutor pipelineExecutor) {
+ public PrepareStartPipelineStep(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/ReconfigureElementStep.java
similarity index 92%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/ReconfigureElementStep.java
index 2e66272..2dddf83 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/ReconfigureElementOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/ReconfigureElementStep.java
@@ -15,11 +15,10 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.manager.execution.http.ReconfigurationSubmitter;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.ReconfigurationOperation;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.pipeline.Pipeline;
@@ -31,9 +30,9 @@
import java.util.Optional;
import java.util.stream.Collectors;
-public class ReconfigureElementOperation extends PipelineExecutionOperation implements ReconfigurationOperation {
+public class ReconfigureElementStep extends PipelineExecutionStep {
- public ReconfigureElementOperation(PipelineExecutor pipelineExecutor) {
+ public ReconfigureElementStep(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartGraphsAndAssociatedRelaysStep.java
similarity index 91%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartGraphsAndAssociatedRelaysStep.java
index 72d3802..ae976e3 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartGraphsAndAssociatedRelaysOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartGraphsAndAssociatedRelaysStep.java
@@ -15,11 +15,10 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.logging.evaluation.EvaluationLogger;
import org.apache.streampipes.manager.execution.pipeline.executor.*;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.*;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.eventrelay.SpDataStreamRelayContainer;
@@ -28,9 +27,9 @@
import java.util.List;
import java.util.Set;
-public class StartGraphsAndAssociatedRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StartGraphsAndAssociatedRelaysStep extends PipelineExecutionStep {
- public StartGraphsAndAssociatedRelaysOperation(PipelineExecutor pipelineExecutor){
+ public StartGraphsAndAssociatedRelaysStep(PipelineExecutor pipelineExecutor){
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartPipelineStep.java
similarity index 95%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartPipelineStep.java
index 720d8c5..19d465d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartPipelineStep.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.manager.execution.http.GraphSubmitter;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
@@ -33,9 +33,9 @@
import java.util.Set;
-public class StartPipelineOperation extends PipelineExecutionOperation{
+public class StartPipelineStep extends PipelineExecutionStep {
- public StartPipelineOperation(PipelineExecutor pipelineExecutor) {
+ public StartPipelineStep(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartRelaysStep.java
similarity index 90%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartRelaysStep.java
index 07c6480..07dc88f 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StartRelaysOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StartRelaysStep.java
@@ -15,10 +15,9 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.logging.evaluation.EvaluationLogger;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
@@ -29,10 +28,10 @@
import java.util.List;
import java.util.Set;
-public class StartRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StartRelaysStep extends PipelineExecutionStep {
- public StartRelaysOperation(PipelineExecutor pipelineExecutor) {
+ public StartRelaysStep(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopGraphsAndAssociatedRelaysStep.java
similarity index 91%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopGraphsAndAssociatedRelaysStep.java
index 2e1a0d0..810f374 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopGraphsAndAssociatedRelaysOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopGraphsAndAssociatedRelaysStep.java
@@ -15,10 +15,9 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.logging.evaluation.EvaluationLogger;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineElementUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
@@ -31,9 +30,9 @@
import java.util.List;
import java.util.Set;
-public class StopGraphsAndAssociatedRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StopGraphsAndAssociatedRelaysStep extends PipelineExecutionStep {
- public StopGraphsAndAssociatedRelaysOperation(PipelineExecutor pipelineExecutor) {
+ public StopGraphsAndAssociatedRelaysStep(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopPipelineStep.java
similarity index 84%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopPipelineStep.java
index 1f28247..473b039 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopPipelineStep.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.manager.execution.http.GraphSubmitter;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
@@ -33,9 +33,9 @@
import java.util.List;
import java.util.Set;
-public class StopPipelineOperation extends PipelineExecutionOperation {
+public class StopPipelineStep extends PipelineExecutionStep {
- public StopPipelineOperation(PipelineExecutor pipelineExecutor) {
+ public StopPipelineStep(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
@@ -70,18 +70,16 @@
Set<String> idsToRollback = StatusUtils.extractUniqueSuccessfulIds(this.getStatus());
+ List<InvocableStreamPipesEntity> graphs = TemporaryGraphStorage.graphStorage.get(pipeline.getPipelineId());
+ List<SpDataSet> dataSets = TemporaryGraphStorage.datasetStorage.get(pipeline.getPipelineId());
+ List<SpDataStreamRelayContainer> relays = PipelineElementUtils.generateRelays(graphs, pipeline);
+
List<InvocableStreamPipesEntity> graphsToRollBack =
- PipelineElementUtils.filterPipelineElementsById(
- pipelineExecutor.getGraphs().getEntitiesToStop(),
- idsToRollback);
+ PipelineElementUtils.filterPipelineElementsById(graphs, idsToRollback);
List<SpDataSet> dataSetsToRollBack =
- DataSetUtils.filterDataSetsById(
- pipelineExecutor.getDataSets().getEntitiesToStart(),
- idsToRollback);
+ DataSetUtils.filterDataSetsById(dataSets, idsToRollback);
List<SpDataStreamRelayContainer> relaysToRollBack =
- RelayUtils.filterRelaysById(
- pipelineExecutor.getRelays().getEntitiesToStart(),
- idsToRollback);
+ RelayUtils.filterRelaysById(relays, idsToRollback);
return new GraphSubmitter(pipeline.getPipelineId(), pipeline.getName(),
graphsToRollBack,
@@ -91,7 +89,6 @@
@Override
public PipelineOperationStatus rollbackOperationFully() {
- //TODO: Implement sth?
- return StatusUtils.initPipelineOperationStatus(pipelineExecutor.getPipeline());
+ return rollbackOperationPartially();
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopRelaysStep.java
similarity index 90%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopRelaysStep.java
index dee1933..cc34c52 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StopRelaysOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StopRelaysStep.java
@@ -15,11 +15,10 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.logging.evaluation.EvaluationLogger;
import org.apache.streampipes.manager.execution.pipeline.executor.*;
-import org.apache.streampipes.manager.execution.pipeline.executor.operations.types.MigrationOperation;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.CommunicationUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.RelayUtils;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
@@ -29,9 +28,9 @@
import java.util.List;
import java.util.Set;
-public class StopRelaysOperation extends PipelineExecutionOperation implements MigrationOperation {
+public class StopRelaysStep extends PipelineExecutionStep {
- public StopRelaysOperation(PipelineExecutor pipelineExecutor){
+ public StopRelaysStep(PipelineExecutor pipelineExecutor){
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StoreMigratedPipelineStep.java
similarity index 92%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StoreMigratedPipelineStep.java
index 895e38d..a300683 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StoreMigratedPipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StoreMigratedPipelineStep.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.PipelineUtils;
@@ -26,9 +26,9 @@
import java.util.List;
-public class StoreMigratedPipelineOperation extends PipelineExecutionOperation{
+public class StoreMigratedPipelineStep extends PipelineExecutionStep {
- public StoreMigratedPipelineOperation(PipelineExecutor pipelineExecutor) {
+ public StoreMigratedPipelineStep(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StorePipelineStep.java
similarity index 94%
rename from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java
rename to streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StorePipelineStep.java
index 960aec0..98def31 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/operations/StorePipelineOperation.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/steps/StorePipelineStep.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.manager.execution.pipeline.executor.operations;
+package org.apache.streampipes.manager.execution.pipeline.executor.steps;
import org.apache.streampipes.manager.execution.pipeline.executor.PipelineExecutor;
import org.apache.streampipes.manager.execution.pipeline.executor.utils.StatusUtils;
@@ -26,9 +26,9 @@
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-public class StorePipelineOperation extends PipelineExecutionOperation{
+public class StorePipelineStep extends PipelineExecutionStep {
- public StorePipelineOperation(PipelineExecutor pipelineExecutor) {
+ public StorePipelineStep(PipelineExecutor pipelineExecutor) {
super(pipelineExecutor);
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
index ac0b210..7d8fae8 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/RelayUtils.java
@@ -237,7 +237,7 @@
public static List<SpDataStreamRelayContainer> filterRelaysById(List<SpDataStreamRelayContainer> relays,
Set<String> relayIds) {
return relays.stream().
- filter(relay -> relayIds.contains(relay.getRunningStreamRelayInstanceId()))
+ filter(relay -> relayIds.contains(relay.getRunningStreamRelayInstanceId() + " relay"))
.collect(Collectors.toList());
}
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
index c4d8a4f..381bebf 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/executor/utils/StatusUtils.java
@@ -50,7 +50,7 @@
public static Set<String> extractUniqueSuccessfulIds(PipelineOperationStatus status) {
return status.getElementStatus().stream()
.filter(PipelineElementStatus::isSuccess)
- .map(PipelineElementStatus::getElementId)
+ .map(PipelineElementStatus::getRunningInstanceId)
.collect(Collectors.toSet());
}
diff --git a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
index eb34d39..46af116 100644
--- a/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
+++ b/ui/src/app/editor/dialog/migrate-pipeline-processors/migrate-pipeline-processors.component.ts
@@ -170,26 +170,29 @@
modifyPipelineElementsDeployments(pipelineElements) {
pipelineElements.forEach(p => {
- let selectedTargetNodeId = p.deploymentTargetNodeId
+ const selectedTargetNodeId = p.deploymentTargetNodeId
// Currently relay only for data processors
if (p instanceof DataProcessorInvocation) {
p.eventRelayStrategy = this.selectedRelayStrategyVal;
}
- if(selectedTargetNodeId != "default") {
- let selectedNode = this.edgeNodes
- .filter(node => node.nodeControllerId === selectedTargetNodeId)
+ if (selectedTargetNodeId !== 'default') {
+ const selectedNode = this.edgeNodes
+ .filter(node => node.nodeControllerId === selectedTargetNodeId);
p.deploymentTargetNodeHostname = selectedNode
- .map(node => node.hostname)[0]
+ .map(node => node.hostname)[0];
p.deploymentTargetNodePort = selectedNode
- .map(node => node.port)[0]
- }
- else {
- p.deploymentTargetNodeHostname = null
- p.deploymentTargetNodePort = null
+ .map(node => node.port)[0];
+
+ p.elementEndpointHostname = selectedNode
+ .map(node => node.hostname)[0];
+ } else {
+ p.deploymentTargetNodeHostname = null;
+ p.deploymentTargetNodePort = null;
+ p.elementEndpointHostname = null;
}
})
}