blob: c28e979eb2814476c354213fd55315154b9c5033 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.yarn.driver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.io.TempFileCreator;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.files.JobJarMaker;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.JARFileMaker;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Manages the global files on the driver and can produce the needed FileResources for container submissions.
 * <p>
 * For every evaluator launch, a staging folder is assembled locally, packed into a JAR,
 * uploaded to the job folder and registered as a {@link LocalResource} next to the global resources.
 */
@DriverSide
final class EvaluatorSetupHelper {

  private static final Logger LOG = Logger.getLogger(EvaluatorSetupHelper.class.getName());

  private final REEFFileNames fileNames;
  private final ConfigurationSerializer configurationSerializer;
  private final TempFileCreator tempFileCreator;
  private final UploaderToJobFolder uploader;
  private final GlobalJarUploader globalJarUploader;
  private final boolean deleteTempFiles;

  /**
   * @param fileNames               supplies the file and folder names used for the evaluator package.
   * @param configurationSerializer writes the evaluator configuration to a file.
   * @param tempFileCreator         creates the local staging folder and the temporary JAR file.
   * @param deleteTempFiles         whether the local staging artifacts are cleaned up after the upload.
   * @param uploader                uploads the evaluator JAR to the job folder.
   * @param globalJarUploader       provides the global resources shared by all evaluators.
   * @throws IOException never thrown by this constructor's body; kept so the signature stays unchanged.
   */
  @Inject
  EvaluatorSetupHelper(
      final REEFFileNames fileNames,
      final ConfigurationSerializer configurationSerializer,
      final TempFileCreator tempFileCreator,
      @Parameter(DeleteTempFiles.class) final boolean deleteTempFiles,
      final UploaderToJobFolder uploader,
      final GlobalJarUploader globalJarUploader) throws IOException {
    this.tempFileCreator = tempFileCreator;
    this.deleteTempFiles = deleteTempFiles;
    this.globalJarUploader = globalJarUploader;
    this.fileNames = fileNames;
    this.configurationSerializer = configurationSerializer;
    this.uploader = uploader;
  }

  /**
   * @return the map to be used in formulating the evaluator launch submission.
   * @throws RuntimeException wrapping the IOException if the global JAR uploader fails.
   */
  Map<String, LocalResource> getGlobalResources() {
    try {
      return this.globalJarUploader.call();
    } catch (final IOException e) {
      throw new RuntimeException("Unable to upload the global JAR file to the job folder.", e);
    }
  }

  /**
   * Sets up the LocalResources for a new Evaluator.
   * <p>
   * The result contains the global resources plus one entry, keyed by the local folder path,
   * for a freshly built and uploaded evaluator JAR. That JAR holds the serialized evaluator
   * configuration and the files of the launch event.
   *
   * @param resourceLaunchEvent the launch event providing the evaluator configuration and file set.
   * @return a new map with the global resources and the evaluator JAR resource.
   * @throws IOException if staging, packaging or uploading the evaluator files fails.
   */
  Map<String, LocalResource> getResources(
      final ResourceLaunchEvent resourceLaunchEvent)
      throws IOException {

    final Map<String, LocalResource> result = new HashMap<>(getGlobalResources());

    final File localStagingFolder =
        this.tempFileCreator.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix());

    // Write the configuration
    final File configurationFile =
        new File(localStagingFolder, this.fileNames.getEvaluatorConfigurationName());
    this.configurationSerializer.toFile(makeEvaluatorConfiguration(resourceLaunchEvent), configurationFile);

    // Copy files to the staging folder
    JobJarMaker.copy(resourceLaunchEvent.getFileSet(), localStagingFolder);

    // Make a JAR file out of it
    final File localFile = this.tempFileCreator.createTempFile(
        this.fileNames.getEvaluatorFolderPrefix(), this.fileNames.getJarFileSuffix());
    new JARFileMaker(localFile).addChildren(localStagingFolder).close();

    // Upload the JAR to the job folder
    final Path pathToEvaluatorJar = this.uploader.uploadToJobFolder(localFile);
    result.put(this.fileNames.getLocalFolderPath(), this.uploader.makeLocalResourceForJarFile(pathToEvaluatorJar));

    if (this.deleteTempFiles) {
      LOG.log(Level.FINE, "Marking [{0}] for deletion at the exit of this JVM and deleting [{1}]",
          new Object[]{localFile.getAbsolutePath(), localStagingFolder.getAbsolutePath()});
      localFile.deleteOnExit();
      // The staging folder is not empty at this point (it holds at least the configuration
      // file written above), so a plain File.delete() on it could never succeed.
      if (!deleteRecursively(localStagingFolder)) {
        LOG.log(Level.WARNING, "Failed to delete [{0}]", localStagingFolder.getAbsolutePath());
      }
    } else {
      // Argument order follows the message: {0} is the staging folder, {1} is the JAR.
      LOG.log(Level.FINE, "The evaluator staging folder will be kept at [{0}], the JAR at [{1}]",
          new Object[]{localStagingFolder.getAbsolutePath(), localFile.getAbsolutePath()});
    }
    return result;
  }

  /**
   * Deletes the given file or folder, including everything below a folder, on a best-effort basis.
   * Symbolic links are removed as links; their targets are never traversed.
   *
   * @param file the file or folder to delete.
   * @return true if the given file and all of its children were deleted, false otherwise.
   */
  private static boolean deleteRecursively(final File file) {
    boolean childrenDeleted = true;
    if (file.isDirectory() && !Files.isSymbolicLink(file.toPath())) {
      final File[] children = file.listFiles();
      if (children == null) {
        // The folder could not be listed (e.g. an I/O error); deleting it below will fail as well.
        childrenDeleted = false;
      } else {
        for (final File child : children) {
          childrenDeleted &= deleteRecursively(child);
        }
      }
    }
    // delete() is evaluated first so that the removal is always attempted.
    return file.delete() && childrenDeleted;
  }

  /**
   * Assembles the configuration for an Evaluator.
   *
   * @param resourceLaunchEvent the launch event whose evaluator configuration is used as the base.
   * @return a configuration built from the evaluator configuration of the launch event.
   * @throws IOException not thrown by the visible body; kept so the signature stays unchanged.
   */
  private Configuration makeEvaluatorConfiguration(final ResourceLaunchEvent resourceLaunchEvent)
      throws IOException {
    return Tang.Factory.getTang()
        .newConfigurationBuilder(resourceLaunchEvent.getEvaluatorConf())
        .build();
  }
}