blob: e0753dfbe4f97ffd696df0ba4cca30ea4766a1b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.azbatch.util.batch;
import com.microsoft.azure.batch.BatchClient;
import com.microsoft.azure.batch.protocol.models.*;
import org.apache.reef.runtime.azbatch.client.AzureBatchJobSubmissionHandler;
import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
import org.apache.reef.runtime.azbatch.util.storage.SharedAccessSignatureCloudBlobClientProvider;
import org.apache.reef.tang.annotations.Parameter;
import javax.inject.Inject;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A helper class for Azure Batch.
*/
public final class AzureBatchHelper {
private static final Logger LOG = Logger.getLogger(AzureBatchJobSubmissionHandler.class.getName());
/*
* Environment variable that contains the Azure Batch jobId.
*/
private static final String AZ_BATCH_JOB_ID_ENV = "AZ_BATCH_JOB_ID";
private final AzureBatchFileNames azureBatchFileNames;
private final BatchClient client;
private final PoolInformation poolInfo;
@Inject
public AzureBatchHelper(
final AzureBatchFileNames azureBatchFileNames,
final IAzureBatchCredentialProvider credentialProvider,
@Parameter(AzureBatchPoolId.class) final String azureBatchPoolId) {
this.azureBatchFileNames = azureBatchFileNames;
this.client = BatchClient.open(credentialProvider.getCredentials());
this.poolInfo = new PoolInformation().withPoolId(azureBatchPoolId);
}
/**
* Create a job on Azure Batch.
*
* @param applicationId the ID of the application.
* @param storageContainerSAS the publicly accessible uri to the job container.
* @param jobJarUri the publicly accessible uri to the job jar directory.
* @param command the commandline argument to execute the job.
* @throws IOException
*/
public void submitJob(final String applicationId, final String storageContainerSAS, final URI jobJarUri,
final String command) throws IOException {
ResourceFile jarResourceFile = new ResourceFile()
.withBlobSource(jobJarUri.toString())
.withFilePath(AzureBatchFileNames.getTaskJarFileName());
// This setting will signal Batch to generate an access token and pass it to the Job Manager Task (aka the Driver)
// as an environment variable.
// See https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.batch.cloudtask.authenticationtokensettings
// for more info.
AuthenticationTokenSettings authenticationTokenSettings = new AuthenticationTokenSettings();
authenticationTokenSettings.withAccess(Collections.singletonList(AccessScope.JOB));
EnvironmentSetting environmentSetting = new EnvironmentSetting()
.withName(SharedAccessSignatureCloudBlobClientProvider.AZURE_STORAGE_CONTAINER_SAS_TOKEN_ENV)
.withValue(storageContainerSAS);
JobManagerTask jobManagerTask = new JobManagerTask()
.withRunExclusive(false)
.withId(applicationId)
.withResourceFiles(Collections.singletonList(jarResourceFile))
.withEnvironmentSettings(Collections.singletonList(environmentSetting))
.withAuthenticationTokenSettings(authenticationTokenSettings)
.withCommandLine(command);
LOG.log(Level.INFO, "Job Manager (aka driver) task command: " + command);
JobAddParameter jobAddParameter = new JobAddParameter()
.withId(applicationId)
.withJobManagerTask(jobManagerTask)
.withPoolInfo(poolInfo);
client.jobOperations().createJob(jobAddParameter);
}
/**
* Adds a single task to a job on Azure Batch.
*
* @param jobId the ID of the job.
* @param taskId the ID of the task.
* @param jobJarUri the publicly accessible uri list to the job jar directory.
* @param confUri the publicly accessible uri list to the job configuration directory.
* @param command the commandline argument to execute the job.
* @throws IOException
*/
public void submitTask(final String jobId, final String taskId, final URI jobJarUri,
final URI confUri, final String command)
throws IOException {
final List<ResourceFile> resources = new ArrayList<>();
final ResourceFile jarSourceFile = new ResourceFile()
.withBlobSource(jobJarUri.toString())
.withFilePath(AzureBatchFileNames.getTaskJarFileName());
resources.add(jarSourceFile);
final ResourceFile confSourceFile = new ResourceFile()
.withBlobSource(confUri.toString())
.withFilePath(this.azureBatchFileNames.getEvaluatorShimConfigurationPath());
resources.add(confSourceFile);
LOG.log(Level.INFO, "Evaluator task command: " + command);
final TaskAddParameter taskAddParameter = new TaskAddParameter()
.withId(taskId)
.withResourceFiles(resources)
.withCommandLine(command);
this.client.taskOperations().createTask(jobId, taskAddParameter);
}
/**
* List the tasks of the specified job.
*
* @param jobId the ID of the job.
* @return A list of CloudTask objects.
*/
public List<CloudTask> getTaskStatusForJob(final String jobId) {
List<CloudTask> tasks = null;
try {
tasks = client.taskOperations().listTasks(jobId);
LOG.log(Level.INFO, "Task status for job: {0} returned {1} tasks", new Object[]{jobId, tasks.size()});
} catch (IOException | BatchErrorException ex) {
LOG.log(Level.SEVERE, "Exception when fetching Task status for job: {0}. Exception [{1}]:[2]",
new Object[]{jobId, ex.getMessage(), ex.getStackTrace()});
}
return tasks;
}
/**
* @return the job ID specified in the current system environment.
*/
public String getAzureBatchJobId() {
return System.getenv(AZ_BATCH_JOB_ID_ENV);
}
}