blob: 9eeef0970ec62d5b9cde3bb193cf5b185c29ae8f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.common.client;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.client.FailedRuntime;
import org.apache.reef.client.parameters.ResourceManagerErrorHandler;
import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.RemoteMessage;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@ClientSide
@Private
final class RunningJobsImpl implements RunningJobs {
private static final Logger LOG = Logger.getLogger(RunningJobsImpl.class.getName());
private final Map<String, RunningJobImpl> jobs = new HashMap<>();
private final Injector injector;
private final InjectionFuture<EventHandler<FailedRuntime>> failedRuntimeEventHandler;
@Inject
RunningJobsImpl(final Injector injector,
@Parameter(ResourceManagerErrorHandler.class)
final InjectionFuture<EventHandler<FailedRuntime>> failedRuntimeEventHandler) {
this.injector = injector;
this.failedRuntimeEventHandler = failedRuntimeEventHandler;
LOG.log(Level.FINE, "Instantiated 'RunningJobImpl'");
}
@Override
public synchronized void closeAllJobs() {
for (final RunningJobImpl runningJob : this.jobs.values()) {
LOG.log(Level.WARNING, "Force close job {0}", runningJob.getId());
runningJob.close();
}
}
@Override
public synchronized void onJobStatusMessage(final RemoteMessage<ReefServiceProtos.JobStatusProto> message) {
final ReefServiceProtos.JobStatusProto status = message.getMessage();
final String jobIdentifier = status.getIdentifier();
LOG.log(Level.FINE, "Processing message from Job: " + jobIdentifier);
if (status.getState() == ReefServiceProtos.State.INIT) {
try {
final RunningJobImpl runningJob =
this.newRunningJob(status.getIdentifier(), message.getIdentifier().toString());
this.put(runningJob);
} catch (final BindException | InjectionException configError) {
throw new RuntimeException("Configuration error for: " + status, configError);
}
}
this.get(jobIdentifier).onNext(status);
if (status.getState() != ReefServiceProtos.State.RUNNING &&
status.getState() != ReefServiceProtos.State.INIT) {
this.remove(status.getIdentifier());
}
LOG.log(Level.FINE, "Done processing message from Job " + jobIdentifier);
}
@Override
public synchronized void onRuntimeErrorMessage(
final RemoteMessage<ReefServiceProtos.RuntimeErrorProto> runtimeFailure) {
try {
this.remove(runtimeFailure.getMessage().getIdentifier());
} finally {
this.failedRuntimeEventHandler.get().onNext(new FailedRuntime(runtimeFailure.getMessage()));
}
}
/**
* A guarded get() that throws an exception if the RunningJob isn't known.
*
* @param jobIdentifier
* @return
*/
private synchronized RunningJobImpl get(final String jobIdentifier) {
final RunningJobImpl result = this.jobs.get(jobIdentifier);
if (null == result) {
throw new RuntimeException("Trying to get a RunningJob that is unknown: " + jobIdentifier);
}
return result;
}
/**
* A guarded remove() that throws an exception if no RunningJob is known for this id.
*
* @param jobIdentifier
*/
private synchronized void remove(final String jobIdentifier) {
final RunningJobImpl result = this.jobs.remove(jobIdentifier);
if (null == result) {
throw new RuntimeException("Trying to remove a RunningJob that is unknown: " + jobIdentifier);
}
}
private synchronized void put(final RunningJobImpl runningJob) {
final String jobIdentifier = runningJob.getId();
if (this.jobs.containsKey(jobIdentifier)) {
throw new IllegalStateException("Trying to re-add a job that is already known: " + jobIdentifier);
}
LOG.log(Level.FINE, "Adding Job with ID: " + jobIdentifier);
this.jobs.put(jobIdentifier, runningJob);
}
/**
* @param jobIdentifier
* @param remoteIdentifier
* @return
* @throws BindException
* @throws InjectionException
*/
private synchronized RunningJobImpl newRunningJob(final String jobIdentifier, final String remoteIdentifier)
throws BindException, InjectionException {
final Injector child = this.injector.forkInjector();
child.bindVolatileParameter(REEFImplementation.DriverRemoteIdentifier.class, remoteIdentifier);
child.bindVolatileParameter(DriverIdentifier.class, jobIdentifier);
return child.getInstance(RunningJobImpl.class);
}
}