| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.reef.runtime.common.driver.evaluator; |
| |
| import org.apache.reef.annotations.audience.DriverSide; |
| import org.apache.reef.annotations.audience.Private; |
| import org.apache.reef.driver.parameters.DriverIdentifier; |
| import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; |
| import org.apache.reef.tang.annotations.Parameter; |
| import org.apache.reef.util.Optional; |
| import org.apache.reef.tang.util.MonotonicSet; |
| import org.apache.reef.util.SingletonAsserter; |
| |
| import javax.inject.Inject; |
| import java.util.*; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| /** |
| * Manages all Evaluators. |
| * See EvaluatorManager for the Driver side representation of a single Evaluator. |
| */ |
| @DriverSide |
| @Private |
| public final class Evaluators implements AutoCloseable { |
| |
| private static final Logger LOG = Logger.getLogger(Evaluators.class.getName()); |
| |
| /** |
| * A map between evaluatorId and the EvaluatorManager that handles this evaluator. |
| */ |
| private final Map<String, EvaluatorManager> evaluators = new HashMap<>(); |
| |
| /** |
| * A set of evaluatorIds for "closed" (failed and returned) evaluators. |
| */ |
| private final MonotonicSet<String> closedEvaluatorIds = new MonotonicSet<>(); |
| |
| @Inject |
| private Evaluators(@Parameter(DriverIdentifier.class) final String driverId) { |
| LOG.log(Level.FINE, "Instantiated 'Evaluators' for driver {0}", driverId); |
| // There can be several instances of the class for multiple REEFEnvironments. |
| // It is still a singleton when REEF Driver owns the entire JVM. |
| assert SingletonAsserter.assertSingleton(driverId, Evaluators.class); |
| } |
| |
| /** |
| * Closes all EvaluatorManager instances managed. |
| */ |
| @Override |
| public void close() { |
| |
| LOG.log(Level.FINER, "Closing the evaluators - begin"); |
| |
| final List<EvaluatorManager> evaluatorsCopy; |
| synchronized (this) { |
| evaluatorsCopy = new ArrayList<>(this.evaluators.values()); |
| } |
| |
| for (final EvaluatorManager evaluatorManager : evaluatorsCopy) { |
| if (!evaluatorManager.isClosedOrClosing()) { |
| LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId()); |
| evaluatorManager.close(); |
| } |
| } |
| |
| LOG.log(Level.FINER, "Closing the evaluators - end"); |
| } |
| |
| /** |
| * Return true if <em>all</em> evaluators are in closed state |
| * (and their processing queues are empty). |
| */ |
| public synchronized boolean allEvaluatorsAreClosed() { |
| synchronized (this.evaluators) { |
| for (final EvaluatorManager eval : this.evaluators.values()) { |
| if (!eval.isClosed()) { |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * @param evaluatorId |
| * @return the EvaluatorManager for the given id, if one exists. |
| */ |
| public synchronized Optional<EvaluatorManager> get(final String evaluatorId) { |
| return Optional.ofNullable(this.evaluators.get(evaluatorId)); |
| } |
| |
| /** |
| * @param evaluatorId |
| * @return true if evaluator with this id has already been closed. |
| */ |
| public synchronized boolean wasClosed(final String evaluatorId) { |
| return this.closedEvaluatorIds.contains(evaluatorId); |
| } |
| |
| /** |
| * Create new EvaluatorManager and add it to the collection. |
| * <p> |
| * FIXME: This method is a temporary fix for the race condition |
| * described in issues #828 and #839. |
| * |
| * @param evaluatorManagerFactory Factory that builds new EvaluatorManager objects. |
| * @param evaluatorMsg Resource allocation message that contains data on the new evaluator. |
| * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known. |
| */ |
| public synchronized void put( |
| final EvaluatorManagerFactory evaluatorManagerFactory, |
| final ResourceAllocationEvent evaluatorMsg) { |
| this.put(evaluatorManagerFactory.getNewEvaluatorManagerForNewEvaluator(evaluatorMsg)); |
| } |
| |
| /** |
| * Adds an EvaluatorManager. |
| * |
| * @param evaluatorManager |
| * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known. |
| */ |
| public synchronized void put(final EvaluatorManager evaluatorManager) { |
| final String evaluatorId = evaluatorManager.getId(); |
| if (this.wasClosed(evaluatorId)) { |
| throw new IllegalArgumentException( |
| "Trying to re-add an Evaluator that has already been closed: " + evaluatorId); |
| } |
| final EvaluatorManager prev = this.evaluators.put(evaluatorId, evaluatorManager); |
| LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev}); |
| if (prev != null) { |
| throw new IllegalArgumentException( |
| "Trying to re-add an Evaluator that is already known: " + evaluatorId); |
| } |
| } |
| |
| /** |
| * Moves evaluator from map of active evaluators to set of closed evaluators. |
| */ |
| public synchronized void removeClosedEvaluator(final EvaluatorManager evaluatorManager) { |
| |
| final String evaluatorId = evaluatorManager.getId(); |
| LOG.log(Level.FINE, "Removing closed evaluator: {0}", evaluatorId); |
| |
| if (!evaluatorManager.isClosed()) { |
| throw new IllegalArgumentException("Removing evaluator that has not been closed yet: " + evaluatorId); |
| } |
| |
| if (!this.evaluators.containsKey(evaluatorId) && !this.closedEvaluatorIds.contains(evaluatorId)) { |
| throw new IllegalArgumentException("Removing unknown evaluator: " + evaluatorId); |
| } |
| |
| if (!this.evaluators.containsKey(evaluatorId) && this.closedEvaluatorIds.contains(evaluatorId)) { |
| LOG.log(Level.FINE, "Removing closed evaluator which has already been removed: {0}", evaluatorId); |
| return; |
| } |
| |
| evaluatorManager.shutdown(); |
| this.evaluators.remove(evaluatorId); |
| this.closedEvaluatorIds.add(evaluatorId); |
| |
| LOG.log(Level.FINEST, "Closed evaluator removed: {0}", evaluatorId); |
| } |
| } |