blob: 789c0da749f9ae809b13dd4c59e19069857c682e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.local.driver;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.evaluator.EvaluatorProcess;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
import org.apache.reef.runtime.common.files.FileResource;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.LoggingScope;
import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
/**
* A resource manager that uses threads to execute containers.
*/
@Private
@DriverSide
public final class ResourceManager {
private static final Logger LOG = Logger.getLogger(ResourceManager.class.getName());
private final ResourceRequestQueue requestQueue = new ResourceRequestQueue();
private final EventHandler<ResourceAllocationEvent> allocationHandler;
private final ContainerManager theContainers;
private final EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler;
private final ConfigurationSerializer configurationSerializer;
private final RemoteManager remoteManager;
private final REEFFileNames fileNames;
private final double jvmHeapFactor;
private final LoggingScopeFactory loggingScopeFactory;
@Inject
ResourceManager(
final ContainerManager containerManager,
@Parameter(RuntimeParameters.ResourceAllocationHandler.class)
final EventHandler<ResourceAllocationEvent> allocationHandler,
@Parameter(RuntimeParameters.RuntimeStatusHandler.class)
final EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler,
@Parameter(JVMHeapSlack.class) final double jvmHeapSlack,
final ConfigurationSerializer configurationSerializer,
final RemoteManager remoteManager,
final REEFFileNames fileNames,
final LoggingScopeFactory loggingScopeFactory) {
this.theContainers = containerManager;
this.allocationHandler = allocationHandler;
this.runtimeStatusHandlerEventHandler = runtimeStatusHandlerEventHandler;
this.configurationSerializer = configurationSerializer;
this.remoteManager = remoteManager;
this.fileNames = fileNames;
this.jvmHeapFactor = 1.0 - jvmHeapSlack;
this.loggingScopeFactory = loggingScopeFactory;
LOG.log(Level.FINE, "Instantiated 'ResourceManager'");
}
/**
* Extracts the files out of the launchRequest.
*
* @param launchRequest the ResourceLaunchProto to parse
* @return a list of files set in the given ResourceLaunchProto
*/
private static List<File> getLocalFiles(final ResourceLaunchEvent launchRequest) {
final List<File> files = new ArrayList<>(); // Libraries local to this evaluator
for (final FileResource frp : launchRequest.getFileSet()) {
files.add(new File(frp.getPath()).getAbsoluteFile());
}
return files;
}
/**
* Receives a resource request.
* <p>
* If the request can be met, it will also be satisfied immediately.
*
* @param resourceRequest the resource request to be handled.
*/
void onResourceRequest(final ResourceRequestEvent resourceRequest) {
synchronized (this.theContainers) {
this.requestQueue.add(new ResourceRequest(resourceRequest));
this.checkRequestQueue();
}
}
/**
* Receives and processes a resource release request.
*
* @param releaseRequest the release request to be processed
*/
void onResourceReleaseRequest(final ResourceReleaseEvent releaseRequest) {
synchronized (this.theContainers) {
LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier());
this.theContainers.release(releaseRequest.getIdentifier());
this.checkRequestQueue();
}
}
/**
* Called when the ReefRunnableProcessObserver detects that the Evaluator process has exited.
*
* @param evaluatorId the ID of the Evaluator that exited.
*/
public void onEvaluatorExit(final String evaluatorId) {
synchronized (this.theContainers) {
this.theContainers.release(evaluatorId);
this.checkRequestQueue();
}
}
/**
* Processes a resource launch request.
*
* @param launchRequest the launch request to be processed.
*/
void onResourceLaunchRequest(
final ResourceLaunchEvent launchRequest) {
synchronized (this.theContainers) {
final Container c = this.theContainers.get(launchRequest.getIdentifier());
try (LoggingScope lb = this.loggingScopeFactory
.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:evaluatorConfigurationFile")) {
// Add the global files and libraries.
c.addGlobalFiles(this.fileNames.getGlobalFolder());
c.addLocalFiles(getLocalFiles(launchRequest));
// Make the configuration file of the evaluator.
final File evaluatorConfigurationFile = new File(c.getFolder(), fileNames.getEvaluatorConfigurationPath());
try {
this.configurationSerializer.toFile(launchRequest.getEvaluatorConf(), evaluatorConfigurationFile);
} catch (final IOException | BindException e) {
throw new RuntimeException("Unable to write configuration.", e);
}
}
try (LoggingScope lc = this.loggingScopeFactory
.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:runCommand")) {
final List<String> command = getLaunchCommand(launchRequest, c.getMemory());
LOG.log(Level.FINEST, "Launching container: {0}", c);
c.run(command);
}
}
}
private List<String> getLaunchCommand(final ResourceLaunchEvent launchRequest,
final int containerMemory) {
final EvaluatorProcess process = launchRequest.getProcess()
.setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath());
if (process.isOptionSet()) {
return process.getCommandLine();
} else {
return process
.setMemory((int) (this.jvmHeapFactor * containerMemory))
.getCommandLine();
}
}
/**
/**
* Checks the allocation queue for new allocations and if there are any
* satisfies them.
*/
private void checkRequestQueue() {
if (requestQueue.hasOutStandingRequests()) {
final ResourceRequest resourceRequest = requestQueue.head();
final ResourceRequestEvent requestEvent = resourceRequest.getRequestProto();
final Optional<Container> cont = theContainers.allocateContainer(requestEvent);
if (cont.isPresent()) {
// Container has been allocated
requestQueue.satisfyOne();
final Container container = cont.get();
// Tell the receivers about it
final ResourceAllocationEvent alloc = ResourceEventImpl.newAllocationBuilder()
.setIdentifier(container.getContainerID()).setNodeId(container.getNodeID())
.setResourceMemory(container.getMemory()).setVirtualCores(container.getNumberOfCores())
.setRackName(container.getRackName()).setRuntimeName(RuntimeIdentifier.RUNTIME_NAME).build();
LOG.log(Level.FINEST, "Allocating container: {0}", container);
this.allocationHandler.onNext(alloc);
// update REEF
this.sendRuntimeStatus();
// Check whether we can satisfy another one.
this.checkRequestQueue();
} else {
// could not allocate, update REEF
this.sendRuntimeStatus();
}
} else {
// done
this.sendRuntimeStatus();
}
}
private void sendRuntimeStatus() {
final RuntimeStatusEventImpl.Builder builder =
RuntimeStatusEventImpl.newBuilder()
.setName("LOCAL")
.setState(State.RUNNING)
.setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests());
for (final String containerAllocation : this.theContainers.getAllocatedContainerIDs()) {
builder.addContainerAllocation(containerAllocation);
}
final RuntimeStatusEvent msg = builder.build();
LOG.log(Level.INFO, "Allocated: {0}, Outstanding requests: {1}",
new Object[]{msg.getContainerAllocationList().size(), msg.getOutstandingContainerRequests()});
this.runtimeStatusHandlerEventHandler.onNext(msg);
}
}