blob: 141bb2ac40a4541d9171dec0c842f19efc4517dd [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.airavata.helix.impl.task.staging;
import org.apache.airavata.agents.api.*;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.task.DataStagingTaskModel;
import org.apache.airavata.patform.monitoring.CountMonitor;
import org.apache.helix.task.TaskResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
@TaskDef(name = "Archival Task")
public class ArchiveTask extends DataStagingTask {
private final static Logger logger = LoggerFactory.getLogger(ArchiveTask.class);
private final static long MAX_ARCHIVE_SIZE = 1024L * 1024L * 1024L * 20L; // 20GB
private final static CountMonitor archiveTaskCounter = new CountMonitor("archive_task_counter");
@Override
public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
logger.info("Starting archival task " + getTaskId() + " in experiment " + getExperimentId());
archiveTaskCounter.inc();
saveAndPublishProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
try {
// Get and validate data staging task model
DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
// Fetch and validate source and destination URLS
URI sourceURI;
String tarDirPath;
String tarCreationAbsPath;
final String archiveFileName = "archive.tar";
String destFilePath;
try {
sourceURI = new URI(dataStagingTaskModel.getSource());
if (sourceURI.getPath().endsWith("/")) {
tarDirPath = sourceURI.getPath().substring(0, sourceURI.getPath().length() - 1);
} else {
tarDirPath = sourceURI.getPath();
}
String inputPath = getTaskContext().getStorageFileSystemRootLocation();
destFilePath = buildDestinationFilePath(inputPath, archiveFileName);
tarCreationAbsPath = tarDirPath + File.separator + archiveFileName;
} catch (URISyntaxException e) {
throw new TaskOnFailException("Failed to obtain source URI for archival staging task " + getTaskId(), true, e);
}
// Fetch and validate storage adaptor
StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
// Fetch and validate compute resource adaptor
AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
// Creating the tar file in the output path of the compute resource
String tarringCommand = "cd " + tarDirPath + " && tar -cvf " + tarCreationAbsPath + " ./* ";
logger.info("Running tar creation command " + tarringCommand);
try {
CommandOutput tarCommandOutput = adaptor.executeCommand(tarringCommand, null);
if (tarCommandOutput.getExitCode() != 0) {
throw new TaskOnFailException("Failed while running the tar command " + tarringCommand + ". Sout : " +
tarCommandOutput.getStdOut() + ". Serr " + tarCommandOutput.getStdError(), false, null);
}
} catch (AgentException e) {
throw new TaskOnFailException("Failed while running the tar command " + tarringCommand, true, null);
}
try {
FileMetadata fileMetadata = adaptor.getFileMetadata(tarCreationAbsPath);
long maxArchiveSize = Long.parseLong(ServerSettings.getSetting("max.archive.size", MAX_ARCHIVE_SIZE + ""));
if (fileMetadata.getSize() < maxArchiveSize) {
boolean fileTransferred = transferFileToStorage(tarCreationAbsPath, destFilePath, archiveFileName, adaptor, storageResourceAdaptor);
if (!fileTransferred) {
logger.error("Failed to transfer created archive file " + tarCreationAbsPath);
throw new TaskOnFailException("Failed to transfer created archive file " + tarCreationAbsPath, false, null);
}
String destParent = destFilePath.substring(0, destFilePath.lastIndexOf("/"));
final String storageArchiveDir = "ARCHIVE";
String unArchiveTarCommand = "mkdir -p " + storageArchiveDir + " && tar -xvf " + archiveFileName + " -C "
+ storageArchiveDir + " && rm " + archiveFileName + " && chmod 755 -f -R " + storageArchiveDir + "/*";
logger.info("Running Un archiving command on storage resource " + unArchiveTarCommand);
try {
CommandOutput unTarCommandOutput = storageResourceAdaptor.executeCommand(unArchiveTarCommand, destParent);
if (unTarCommandOutput.getExitCode() != 0) {
throw new TaskOnFailException("Failed while running the untar command " + unTarCommandOutput + ". Sout : " +
unTarCommandOutput.getStdOut() + ". Serr " + unTarCommandOutput.getStdError(), false, null);
}
} catch (AgentException e) {
throw new TaskOnFailException("Failed while running the untar command " + tarringCommand, false, null);
}
return onSuccess("Archival task successfully completed");
} else {
logger.error("Archive size {} MB is larger than the maximum allowed size {} MB. So skipping the transfer.",
fileMetadata.getSize() / (1024L * 1024L), maxArchiveSize / (1024L * 1024L));
// This is not a recoverable issue. So mark it as critical
throw new TaskOnFailException("Archive task was skipped as size is " + fileMetadata.getSize() / (1024L * 1024L) + " MB", true, null);
}
} finally {
String deleteTarCommand = "rm " + tarCreationAbsPath;
logger.info("Running delete temporary tar command " + deleteTarCommand);
try {
CommandOutput rmCommandOutput = adaptor.executeCommand(deleteTarCommand, null);
if (rmCommandOutput.getExitCode() != 0) {
logger.error("Failed while running the rm command " + deleteTarCommand + ". Sout : " +
rmCommandOutput.getStdOut() + ". Serr " + rmCommandOutput.getStdError());
}
} catch (AgentException e) {
logger.error("Failed while running the rm command " + tarringCommand, e);
}
}
} catch (TaskOnFailException e) {
if (e.getError() != null) {
logger.error(e.getReason(), e.getError());
} else {
logger.error(e.getReason());
}
return onFail(e.getReason(), e.isCritical(), e.getError());
} catch (Exception e) {
logger.error("Unknown error while executing archiving staging task " + getTaskId(), e);
return onFail("Unknown error while executing archiving staging task " + getTaskId(), false, e);
}
}
@Override
public void onCancel(TaskContext taskContext) {
}
}