| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.reef.runtime.azbatch.client; |
| |
| import org.apache.reef.annotations.audience.Private; |
| import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames; |
| import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper; |
| import org.apache.reef.runtime.azbatch.util.storage.AzureStorageClient; |
| import org.apache.reef.runtime.azbatch.util.command.CommandBuilder; |
| import org.apache.reef.runtime.common.client.DriverConfigurationProvider; |
| import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; |
| import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; |
| import org.apache.reef.runtime.common.files.JobJarMaker; |
| import org.apache.reef.tang.Configuration; |
| |
| import javax.inject.Inject; |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.*; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| /** |
| * A {@link JobSubmissionHandler} implementation for Azure Batch runtime. |
| */ |
| @Private |
| public final class AzureBatchJobSubmissionHandler implements JobSubmissionHandler { |
| |
| private static final Logger LOG = Logger.getLogger(AzureBatchJobSubmissionHandler.class.getName()); |
| |
| /** |
| * Maximum number of characters allowed in Azure Batch job name. This limit is imposed by Azure Batch. |
| */ |
| private static final int MAX_CHARS_JOB_NAME = 64; |
| |
| private String applicationId; |
| |
| private final AzureStorageClient azureStorageClient; |
| private final DriverConfigurationProvider driverConfigurationProvider; |
| private final JobJarMaker jobJarMaker; |
| private final CommandBuilder launchCommandBuilder; |
| private final AzureBatchFileNames azureBatchFileNames; |
| private final AzureBatchHelper azureBatchHelper; |
| |
| @Inject |
| AzureBatchJobSubmissionHandler( |
| final AzureStorageClient azureStorageClient, |
| final DriverConfigurationProvider driverConfigurationProvider, |
| final JobJarMaker jobJarMaker, |
| final CommandBuilder launchCommandBuilder, |
| final AzureBatchFileNames azureBatchFileNames, |
| final AzureBatchHelper azureBatchHelper) { |
| this.azureStorageClient = azureStorageClient; |
| this.driverConfigurationProvider = driverConfigurationProvider; |
| this.jobJarMaker = jobJarMaker; |
| this.launchCommandBuilder = launchCommandBuilder; |
| this.azureBatchHelper = azureBatchHelper; |
| this.azureBatchFileNames = azureBatchFileNames; |
| } |
| |
| /** |
| * Returns REEF application id (which corresponds to Azure Batch job id) or null if the application hasn't been |
| * submitted yet. |
| * |
| * @return REEF application id. |
| */ |
| @Override |
| public String getApplicationId() { |
| return this.applicationId; |
| } |
| |
| /** |
| * Closes the resources. |
| * |
| * @throws Exception |
| */ |
| @Override |
| public void close() throws Exception { |
| LOG.log(Level.INFO, "Closing {0}", AzureBatchJobSubmissionHandler.class.getName()); |
| } |
| |
| /** |
| * Invoked when JobSubmissionEvent is triggered. |
| * |
| * @param jobSubmissionEvent triggered job submission event. |
| */ |
| @Override |
| public void onNext(final JobSubmissionEvent jobSubmissionEvent) { |
| LOG.log(Level.FINEST, "Submitting job: {0}", jobSubmissionEvent); |
| |
| try { |
| this.applicationId = createApplicationId(jobSubmissionEvent); |
| final String folderName = this.azureBatchFileNames.getStorageJobFolder(this.applicationId); |
| |
| LOG.log(Level.FINE, "Creating a job folder on Azure at: {0}.", folderName); |
| final URI jobFolderURL = this.azureStorageClient.getJobSubmissionFolderUri(folderName); |
| |
| LOG.log(Level.FINE, "Getting a shared access signature for {0}.", folderName); |
| final String storageContainerSAS = this.azureStorageClient.createContainerSharedAccessSignature(); |
| |
| LOG.log(Level.FINE, "Assembling Configuration for the Driver."); |
| final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionEvent, this.applicationId, |
| jobFolderURL); |
| |
| LOG.log(Level.FINE, "Making Job JAR."); |
| final File jobSubmissionJarFile = |
| this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration); |
| |
| LOG.log(Level.FINE, "Uploading Job JAR to Azure."); |
| final URI jobJarSasUri = this.azureStorageClient.uploadFile(folderName, jobSubmissionJarFile); |
| |
| LOG.log(Level.FINE, "Assembling application submission."); |
| final String command = this.launchCommandBuilder.buildDriverCommand(jobSubmissionEvent); |
| |
| this.azureBatchHelper.submitJob(getApplicationId(), storageContainerSAS, jobJarSasUri, command); |
| |
| } catch (final IOException e) { |
| LOG.log(Level.SEVERE, "Error submitting Azure Batch request: {0}", e); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| private Configuration makeDriverConfiguration( |
| final JobSubmissionEvent jobSubmissionEvent, |
| final String appId, |
| final URI jobFolderURL) { |
| return this.driverConfigurationProvider.getDriverConfiguration( |
| jobFolderURL, jobSubmissionEvent.getRemoteId(), appId, jobSubmissionEvent.getConfiguration()); |
| } |
| |
| private String createApplicationId(final JobSubmissionEvent jobSubmissionEvent) { |
| String uuid = UUID.randomUUID().toString(); |
| String jobIdentifier = jobSubmissionEvent.getIdentifier(); |
| String jobNameShort = jobIdentifier.length() + 1 + uuid.length() < MAX_CHARS_JOB_NAME ? |
| jobIdentifier : jobIdentifier.substring(0, MAX_CHARS_JOB_NAME - uuid.length() - 1); |
| return jobNameShort + "-" + uuid; |
| } |
| } |