blob: a25ff97470839685ab598f746386b7409b743897 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.tensorflow.core.longrunning;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterGroupEmptyException;
import org.apache.ignite.tensorflow.core.ProcessManager;
import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessClearTask;
import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessPingTask;
import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessStartTask;
import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessStopTask;
import org.apache.ignite.tensorflow.core.longrunning.task.LongRunningProcessTask;
import org.apache.ignite.tensorflow.core.longrunning.task.util.LongRunningProcessStatus;
/**
 * Manager of long running processes that allows starting, pinging, stopping and clearing them on cluster nodes.
 */
public class LongRunningProcessManager implements ProcessManager<LongRunningProcess> {
    /** Ignite instance. */
    private final Ignite ignite;

    /**
     * Constructs a new instance of long running process manager.
     *
     * @param ignite Ignite instance, must not be {@code null}.
     */
    public LongRunningProcessManager(Ignite ignite) {
        assert ignite != null : "Ignite instance should not be null";

        this.ignite = ignite;
    }

    /**
     * {@inheritDoc}
     *
     * Specifications are grouped by node identifier and one start task is sent per node. A missing node is treated
     * as a failure, and on any failure the processes already started on other nodes are stopped again.
     */
    @Override public Map<UUID, List<UUID>> start(List<LongRunningProcess> specifications) {
        return call(groupByNodeId(specifications), LongRunningProcessStartTask::new, this::rollbackStartTask, false);
    }

    /**
     * {@inheritDoc}
     *
     * A missing node is treated as a failure. Nothing is rolled back.
     */
    @Override public Map<UUID, List<LongRunningProcessStatus>> ping(Map<UUID, List<UUID>> procIds) {
        return call(procIds, LongRunningProcessPingTask::new, this::rollbackNothing, false);
    }

    /**
     * {@inheritDoc}
     *
     * Nodes that no longer exist are silently skipped. Nothing is rolled back.
     */
    @Override public Map<UUID, List<LongRunningProcessStatus>> stop(Map<UUID, List<UUID>> procIds, boolean clear) {
        return call(procIds, params -> new LongRunningProcessStopTask(params, clear), this::rollbackNothing, true);
    }

    /**
     * {@inheritDoc}
     *
     * Nodes that no longer exist are silently skipped. Nothing is rolled back.
     */
    @Override public Map<UUID, List<LongRunningProcessStatus>> clear(Map<UUID, List<UUID>> procIds) {
        return call(procIds, LongRunningProcessClearTask::new, this::rollbackNothing, true);
    }

    /**
     * Sends the specified tasks to the cluster to be executed, one task per node, sequentially.
     * <p>
     * If processing of any node fails, the rollback procedure is invoked with the results collected so far and the
     * original exception is rethrown. If the rollback procedure itself fails, its exception is attached to the
     * original one as a suppressed exception so that the root cause is never lost.
     *
     * @param params Parameters needed to create tasks (node identifier as a key).
     * @param taskSupplier Supplier that defines how to create tasks.
     * @param rollback Rollback procedure, receives results of the nodes processed successfully before the failure.
     * @param onlyIfNodeExists If node doesn't exist the correspondent task will be ignored.
     * @param <T> Type of params needed to create tasks.
     * @param <E> Type of returned by task value.
     * @return Map of node identifier as a key and list task results as a value. Nodes skipped because they don't
     *      exist have no entry in the map.
     */
    private <T, E> Map<UUID, List<E>> call(Map<UUID, List<T>> params,
        Function<List<T>, LongRunningProcessTask<List<E>>> taskSupplier, Consumer<Map<UUID, List<E>>> rollback,
        boolean onlyIfNodeExists) {
        Map<UUID, List<E>> res = new HashMap<>();

        try {
            for (Map.Entry<UUID, List<T>> entry : params.entrySet()) {
                UUID nodeId = entry.getKey();

                LongRunningProcessTask<List<E>> task = taskSupplier.apply(entry.getValue());

                ClusterGroup clusterGrp = ignite.cluster().forNodeId(nodeId);

                try {
                    res.put(nodeId, ignite.compute(clusterGrp).call(task));
                }
                catch (ClusterGroupEmptyException e) {
                    // The target node is not in the cluster: either skip it or treat it as a failure.
                    if (!onlyIfNodeExists)
                        throw e;
                }
            }
        }
        catch (Exception e) {
            // All-or-nothing strategy. In case of exception already processed tasks should be rolled back.
            try {
                rollback.accept(res);
            }
            catch (Exception rollbackE) {
                // Keep the original failure as the primary exception, don't let the rollback failure mask it.
                e.addSuppressed(rollbackE);
            }

            throw e;
        }

        return res;
    }

    /**
     * Groups the given process specifications by node identifier.
     *
     * @param specifications Process specifications.
     * @return Map of node identifier as a key and list of process specifications as a value.
     */
    private Map<UUID, List<LongRunningProcess>> groupByNodeId(List<LongRunningProcess> specifications) {
        Map<UUID, List<LongRunningProcess>> res = new HashMap<>();

        for (LongRunningProcess spec : specifications)
            res.computeIfAbsent(spec.getNodeId(), nodeId -> new ArrayList<>()).add(spec);

        return res;
    }

    /**
     * Rolls back start task successfully applied earlier by stopping the started processes with the clear flag set.
     *
     * @param procIds Process identifiers (node identifier as a key).
     */
    private void rollbackStartTask(Map<UUID, List<UUID>> procIds) {
        stop(procIds, true);
    }

    /**
     * Rolls back nothing. Ping, stop and clear tasks cannot be rolled back, so it's the only one available strategy
     * for these tasks.
     *
     * @param processes Statuses collected before the failure (ignored).
     */
    private void rollbackNothing(Map<UUID, List<LongRunningProcessStatus>> processes) {
        // Do nothing.
    }
}