blob: 72f655e369ea6000ba587fdac28550a03da2b423 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.hdinsight.client;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.JobJarMaker;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
import org.apache.reef.runtime.hdinsight.client.yarnrest.*;
import org.apache.reef.tang.Configuration;
import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.net.URI;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Handles job submission to a HDInsight instance.
*/
@ClientSide
@Private
public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler {
private static final Logger LOG = Logger.getLogger(HDInsightJobSubmissionHandler.class.getName());
private final AzureUploader uploader;
private final JobJarMaker jobJarMaker;
private final HDInsightInstance hdInsightInstance;
private final REEFFileNames filenames;
private final ClasspathProvider classpath;
private final DriverConfigurationProvider driverConfigurationProvider;
private String applicationId;
@Inject
HDInsightJobSubmissionHandler(final AzureUploader uploader,
final JobJarMaker jobJarMaker,
final HDInsightInstance hdInsightInstance,
final REEFFileNames filenames,
final ClasspathProvider classpath,
final DriverConfigurationProvider driverConfigurationProvider) {
this.uploader = uploader;
this.jobJarMaker = jobJarMaker;
this.hdInsightInstance = hdInsightInstance;
this.filenames = filenames;
this.classpath = classpath;
this.driverConfigurationProvider = driverConfigurationProvider;
}
@Override
public void close() {
LOG.log(Level.WARNING, ".close() is inconsequential with the HDInsight runtime");
}
@Override
public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
try {
LOG.log(Level.FINE, "Requesting Application ID from HDInsight.");
final String appId = this.hdInsightInstance.getApplicationID().getApplicationId();
LOG.log(Level.INFO, "Submitting application {0} to YARN.", appId);
LOG.log(Level.FINE, "Creating a job folder on Azure.");
final URI jobFolderURL = this.uploader.createJobFolder(appId);
LOG.log(Level.FINE, "Assembling Configuration for the Driver.");
final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionEvent, appId, jobFolderURL);
LOG.log(Level.FINE, "Making Job JAR.");
final File jobSubmissionJarFile =
this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration);
LOG.log(Level.FINE, "Uploading Job JAR to Azure.");
final LocalResource uploadedFile = this.uploader.uploadFile(jobSubmissionJarFile);
LOG.log(Level.FINE, "Assembling application submission.");
final String command = getCommandString(jobSubmissionEvent);
final ApplicationSubmission applicationSubmission = new ApplicationSubmission()
.setApplicationId(appId)
.setApplicationName(jobSubmissionEvent.getIdentifier())
.setResource(getResource(jobSubmissionEvent))
.setAmContainerSpec(new AmContainerSpec()
.addLocalResource(this.filenames.getREEFFolderName(), uploadedFile)
.setCommand(command));
this.hdInsightInstance.submitApplication(applicationSubmission);
this.applicationId = appId;
LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", appId);
} catch (final IOException ex) {
LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex);
throw new RuntimeException(ex);
}
}
/**
* Get the RM application ID.
* Return null if the application has not been submitted yet, or was submitted unsuccessfully.
* @return string application ID or null if no app has been submitted yet.
*/
@Override
public String getApplicationId() {
return this.applicationId;
}
/**
* Extracts the resource demands from the jobSubmissionEvent.
*/
private Resource getResource(
final JobSubmissionEvent jobSubmissionEvent) {
return new Resource()
.setMemory(jobSubmissionEvent.getDriverMemory().get())
.setvCores(jobSubmissionEvent.getDriverCPUCores().get());
}
/**
* Assembles the command to execute the Driver.
*/
private String getCommandString(
final JobSubmissionEvent jobSubmissionEvent) {
return StringUtils.join(getCommandList(jobSubmissionEvent), ' ');
}
/**
* Assembles the command to execute the Driver in list form.
*/
private List<String> getCommandList(
final JobSubmissionEvent jobSubmissionEvent) {
return new JavaLaunchCommandBuilder()
.setJavaPath("%JAVA_HOME%/bin/java")
.setConfigurationFilePaths(Collections.singletonList(this.filenames.getDriverConfigurationPath()))
.setClassPath(this.classpath.getDriverClasspath())
.setMemory(jobSubmissionEvent.getDriverMemory().get())
.setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName())
.setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName())
.build();
}
private Configuration makeDriverConfiguration(
final JobSubmissionEvent jobSubmissionEvent,
final String appId,
final URI jobFolderURL) throws IOException {
return this.driverConfigurationProvider.getDriverConfiguration(
jobFolderURL, jobSubmissionEvent.getRemoteId(), appId, jobSubmissionEvent.getConfiguration());
}
}