blob: 441925033e90693b0719507d7165e7fd05c5068e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.airavata.helix.impl.task.staging;
import org.apache.airavata.agents.api.AgentAdaptor;
import org.apache.airavata.agents.api.AgentException;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
import org.apache.airavata.helix.impl.task.TaskContext;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.task.DataStagingTaskModel;
import org.apache.airavata.patform.monitoring.CountMonitor;
import org.apache.helix.task.TaskResult;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Helix task that stages a process output file to the storage resource and records the
 * resulting location(s) as experiment output.
 *
 * <p>Two modes are supported, selected by the file name part of the source URI:
 * <ul>
 *   <li>plain file name: the single file is transferred and saved as the output value;</li>
 *   <li>file name containing {@code *}: the matching file names are resolved through the compute
 *       resource adaptor and transferred one by one; for {@link DataType#URI} outputs only the
 *       first match is handled, for {@link DataType#URI_COLLECTION} all transferred files are
 *       saved as a collection.</li>
 * </ul>
 */
@TaskDef(name = "Output Data Staging Task")
public class OutputDataStagingTask extends DataStagingTask {

    private static final Logger logger = LoggerFactory.getLogger(OutputDataStagingTask.class);
    private static final CountMonitor outputDSTaskCounter = new CountMonitor("output_ds_task_counter");

    /**
     * Runs the output staging for the data staging task model attached to this task.
     *
     * @param taskHelper  provides the adaptor support used to obtain the storage and compute adaptors
     * @param taskContext not read directly here; the context is accessed through {@code getTaskContext()}
     * @return the result of {@code onSuccess} when staging completed (a file that did not transfer is
     *         only logged as a warning), or the result of {@code onFail} when validation or
     *         staging raised an error
     */
    @Override
    public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {

        logger.info("Starting output data staging task {} in experiment {}", getTaskId(), getExperimentId());
        outputDSTaskCounter.inc();
        saveAndPublishProcessStatus(ProcessState.OUTPUT_DATA_STAGING);

        try {
            // Get and validate data staging task model
            DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();

            // Fetch and validate the process output from the data staging task model.
            // This is an explicit check rather than an `assert`: assertions are disabled by default,
            // and when enabled an AssertionError would bypass both catch clauses below.
            OutputDataObjectType processOutput = dataStagingTaskModel.getProcessOutput();
            if (processOutput == null) {
                throw new TaskOnFailException("expId: " + getExperimentId() + ", processId: " + getProcessId()
                        + ", taskId: " + getTaskId()
                        + ":- Process output is not set in the data staging task model", true, null);
            }

            if (processOutput.getValue() == null) {
                // Not logged here: the TaskOnFailException handler below logs the complete reason once.
                String message = "expId: " + getExperimentId() + ", processId: " + getProcessId() + ", taskId: " + getTaskId() +
                        ":- Couldn't stage file " + processOutput.getName() + " , file name shouldn't be null. ";
                if (processOutput.isIsRequired()) {
                    message += "File name is null, but this output is marked as required";
                } else {
                    message += "File name is null";
                }
                throw new TaskOnFailException(message, true, null);
            }

            // Fetch and validate storage resource
            StorageResourceDescription storageResource = getStorageResource();

            // Fetch and validate source and destination URLS
            URI sourceURI;
            URI destinationURI;
            String sourceFileName;
            try {
                sourceURI = new URI(dataStagingTaskModel.getSource());
                String sourcePath = sourceURI.getPath();
                // NOTE(review): URI paths are '/'-separated; File.separator only matches that on
                // Unix-like hosts. Kept as-is to preserve existing behaviour.
                sourceFileName = sourcePath.substring(sourcePath.lastIndexOf(File.separator) + 1);

                if (dataStagingTaskModel.getDestination().startsWith("dummy")) {
                    // Placeholder destination: build the real one under the storage root location
                    String inputPath = getTaskContext().getStorageFileSystemRootLocation();
                    String destFilePath = buildDestinationFilePath(inputPath, sourceFileName);

                    destinationURI = new URI("file", getTaskContext().getStorageResourceLoginUserName(),
                            storageResource.getHostName(), 22, destFilePath, null, null);
                } else {
                    destinationURI = new URI(dataStagingTaskModel.getDestination());
                }

                logger.debug("Source file {}, destination uri {} for task {}",
                        sourceURI.getPath(), destinationURI.getPath(), getTaskId());

            } catch (URISyntaxException e) {
                throw new TaskOnFailException("Failed to obtain source URI for output data staging task " + getTaskId(), true, e);
            }

            // Fetch and validate storage adaptor
            StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());

            // Fetch and validate compute resource adaptor
            AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());

            if (sourceFileName.contains("*")) {
                // The file is declared as a wildcard; wildcard support is only enabled for output data staging
                logger.info("Handling output files with {} extension for task {}", sourceFileName, getTaskId());

                String destParentPath = (new File(destinationURI.getPath())).getParentFile().getPath();
                String sourceParentPath = (new File(sourceURI.getPath())).getParentFile().getPath();

                logger.debug("Destination parent path {}, source parent path {}", destParentPath, sourceParentPath);

                List<String> filePaths;
                try {
                    filePaths = adaptor.getFileNameFromExtension(sourceFileName, sourceParentPath);

                    if (logger.isTraceEnabled()) {
                        filePaths.forEach(fileName -> logger.trace("File found : {}", fileName));
                    }

                } catch (AgentException e) {
                    throw new TaskOnFailException("Failed to fetch the file list from extension " + sourceFileName, false, e);
                }

                List<URI> destinationURIs = new ArrayList<>();

                for (String matchedFileName : filePaths) {

                    if (matchedFileName == null || matchedFileName.isEmpty()) {
                        logger.warn("Ignoring file transfer as filename is empty or null");
                        continue;
                    }

                    // A URISyntaxException raised here is reported by the generic handler below
                    URI matchedDestinationURI = new URI(appendFileName(destParentPath, matchedFileName));
                    URI matchedSourceURI = new URI(appendFileName(sourceParentPath, matchedFileName));

                    logger.info("Transferring file {}", matchedFileName);
                    boolean transferred = transferFileToStorage(matchedSourceURI.getPath(),
                            matchedDestinationURI.getPath(), matchedFileName, adaptor, storageResourceAdaptor);
                    if (transferred) {
                        destinationURIs.add(matchedDestinationURI);
                    } else {
                        logger.warn("File {} did not transfer", matchedFileName);
                    }

                    // A URI typed output can hold a single value, so stop after the first handled match
                    if (processOutput.getType() == DataType.URI) {
                        if (filePaths.size() > 1) {
                            logger.warn("More than one file matched wildcard, but output type is URI. Skipping remaining matches: {}",
                                    filePaths.subList(1, filePaths.size()));
                        }
                        break;
                    }
                }

                if (!destinationURIs.isEmpty()) {
                    if (processOutput.getType() == DataType.URI) {
                        saveExperimentOutput(processOutput.getName(), destinationURIs.get(0).toString());
                    } else if (processOutput.getType() == DataType.URI_COLLECTION) {
                        saveExperimentOutputCollection(processOutput.getName(),
                                destinationURIs.stream().map(URI::toString).collect(Collectors.toList()));
                    }
                }

            } else {
                // Uploading output file to the storage resource
                boolean transferred = transferFileToStorage(sourceURI.getPath(), destinationURI.getPath(),
                        sourceFileName, adaptor, storageResourceAdaptor);
                if (transferred) {
                    saveExperimentOutput(processOutput.getName(), destinationURI.toString());
                } else {
                    logger.warn("File {} did not transfer", sourceFileName);
                }
            }

            return onSuccess("Output data staging task " + getTaskId() + " successfully completed");

        } catch (TaskOnFailException e) {
            if (e.getError() != null) {
                logger.error(e.getReason(), e.getError());
            } else {
                logger.error(e.getReason());
            }
            return onFail(e.getReason(), e.isCritical(), e.getError());

        } catch (Exception e) {
            String message = "Unknown error while executing output data staging task " + getTaskId();
            logger.error(message, e);
            return onFail(message, false, e);
        }
    }

    /**
     * Appends a file name to a parent directory path, inserting {@link File#separator} only when
     * the parent path does not already end with it.
     *
     * @param parentPath directory path the file lives in
     * @param fileName   name of the file inside {@code parentPath}
     * @return the concatenated path
     */
    private static String appendFileName(String parentPath, String fileName) {
        return parentPath.endsWith(File.separator)
                ? parentPath + fileName
                : parentPath + File.separator + fileName;
    }

    /**
     * No cancellation handling is performed for this task.
     *
     * @param taskContext context of the task being cancelled (unused)
     */
    @Override
    public void onCancel(TaskContext taskContext) {

    }
}