blob: 09cfb5742780cf3e85e0edd21c2b5f9d88cd71db [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.airavata.helix.impl.task.parsing;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.CreateContainerResponse;
import com.github.dockerjava.api.command.LogContainerCmd;
import com.github.dockerjava.api.model.Bind;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.command.LogContainerResultCallback;
import com.github.dockerjava.core.command.PullImageResultCallback;
import com.github.dockerjava.core.command.WaitContainerResultCallback;
import org.apache.airavata.agents.api.AgentException;
import org.apache.airavata.agents.api.StorageResourceAdaptor;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.client.CredentialStoreClientFactory;
import org.apache.airavata.credential.store.cpi.CredentialStoreService;
import org.apache.airavata.credential.store.exception.CredentialStoreException;
import org.apache.airavata.helix.core.AbstractTask;
import org.apache.airavata.helix.impl.task.TaskOnFailException;
import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskInput;
import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskInputs;
import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskOutput;
import org.apache.airavata.helix.impl.task.parsing.models.ParsingTaskOutputs;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.airavata.helix.task.api.annotation.TaskParam;
import org.apache.airavata.helix.task.api.support.AdaptorSupport;
import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
import org.apache.airavata.model.appcatalog.parser.Parser;
import org.apache.airavata.model.appcatalog.parser.ParserInput;
import org.apache.airavata.model.appcatalog.parser.ParserOutput;
import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
import org.apache.airavata.model.data.movement.DataMovementProtocol;
import org.apache.airavata.model.data.replica.DataProductModel;
import org.apache.airavata.model.data.replica.DataProductType;
import org.apache.airavata.model.data.replica.DataReplicaLocationModel;
import org.apache.airavata.model.data.replica.ReplicaLocationCategory;
import org.apache.airavata.model.data.replica.ReplicaPersistentType;
import org.apache.airavata.patform.monitoring.CountMonitor;
import org.apache.airavata.registry.api.RegistryService;
import org.apache.airavata.registry.api.client.RegistryServiceClientFactory;
import org.apache.airavata.registry.api.exception.RegistryServiceException;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskResult;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Implementation of the data parsing task.
*
* @since 1.0.0-SNAPSHOT
*/
@TaskDef(name = "Data Parsing Task")
public class DataParsingTask extends AbstractTask {
/** Class-wide SLF4J logger. */
private final static Logger logger = LoggerFactory.getLogger(DataParsingTask.class);
/** Counter named "parsing_task_counter"; incremented once at the start of every {@code onRun} call. */
private final static CountMonitor parsingTaskCounter = new CountMonitor("parsing_task_counter");
/** Id of the parser to run; used together with the gateway id to fetch the parser from the registry. */
@TaskParam(name = "Parser Id")
private String parserId;
/** Inputs supplied to this task; matched by id against the input files declared by the parser. */
@TaskParam(name = "Parser inputs")
private ParsingTaskInputs parsingTaskInputs;
/** Output descriptors supplied to this task; matched by id against the output files declared by the parser. */
@TaskParam(name = "Parser outputs")
private ParsingTaskOutputs parsingTaskOutputs;
/** Gateway id used for registry lookups and recorded on the data products registered for outputs. */
@TaskParam(name = "Gateway ID")
private String gatewayId;
/** Group resource profile id; only exposed through its accessor pair within this class. */
@TaskParam(name = "Group Resource Profile Id")
private String groupResourceProfileId;
/** Local root directory under which the per-container {@code parsers/<container>/data/input|output} directories are created. */
@TaskParam(name = "Local data dir")
private String localDataDir;
/**
 * Runs the parser end to end: stages the declared inputs into a task-local directory, executes
 * the parser container against them and uploads the produced outputs to their storage resources.
 *
 * <p>A single registry client is used for all registry calls made directly by this method and
 * is closed before returning.</p>
 *
 * @param helper task helper providing the adaptor support used for storage transfers
 * @return the result of {@code onSuccess} when all required inputs and outputs were handled,
 *         otherwise the result of {@code onFail} carrying the failure reason
 */
@Override
public TaskResult onRun(TaskHelper helper) {
    logger.info("Starting data parsing task " + getTaskId());
    parsingTaskCounter.inc();
    RegistryService.Client registryClient = null;
    try {
        registryClient = getRegistryServiceClient();
        Parser parser = registryClient.getParser(parserId, gatewayId);
        // Container names are derived from the task id; spaces are replaced because they are not valid there.
        String containerId = (getTaskId() + "_PARSER_" + parser.getId()).replace(" ", "-");
        String localInputDir = createLocalInputDir(containerId);
        String localOutDir = createLocalOutputDir(containerId);
        logger.info("Created local input and output directories : " + localInputDir + ", " + localOutDir);

        logger.info("Downloading input files to local input directory");
        Map<String, String> properties = stageParserInputs(parser, registryClient, localInputDir, helper);

        logger.info("Running container with id " + containerId + " local input dir " + localInputDir
                + " local output dir " + localOutDir);
        runContainer(parser, containerId, localInputDir, localOutDir, properties);

        publishParserOutputs(parser, localOutDir, helper);
        return onSuccess("Successfully completed data parsing task " + getTaskId());
    } catch (TaskOnFailException e) {
        // The failure reason is logged exactly once, here, for every TaskOnFailException raised below.
        if (e.getError() != null) {
            logger.error(e.getReason(), e.getError());
        } else {
            logger.error(e.getReason());
        }
        return onFail(e.getReason(), e.isCritical());
    } catch (Exception e) {
        logger.error("Unknown error occurred in " + getTaskId(), e);
        return onFail("Unknown error occurred in " + getTaskId(), true);
    } finally {
        closeRegistryClient(registryClient);
    }
}

/**
 * Resolves every input declared by the parser: PROPERTY inputs are collected into the returned
 * map, FILE inputs are downloaded into {@code localInputDir}.
 *
 * @return property name to value map that is handed to the container as environment variables
 * @throws Exception a {@link TaskOnFailException} when a required input is missing or cannot be
 *         downloaded; any other failure of the registry or context lookups is propagated as is
 */
private Map<String, String> stageParserInputs(Parser parser, RegistryService.Client registryClient,
                                              String localInputDir, TaskHelper helper) throws Exception {
    Map<String, String> properties = new HashMap<>();
    for (ParserInput parserInput : parser.getInputFiles()) {
        Optional<ParsingTaskInput> filteredInputOptional = parsingTaskInputs.getInputs().stream()
                .filter(inp -> parserInput.getId().equals(inp.getId())).findFirst();

        if (!filteredInputOptional.isPresent()) {
            if (parserInput.isRequiredInput()) {
                throw new TaskOnFailException("Parser input with id " + parserInput.getId() + " and name "
                        + parserInput.getName() + " is not available", true, null);
            }
            logger.warn("Parser input with id " + parserInput.getId() + " and name " + parserInput.getName()
                    + " is not available. But it is not required");
            continue;
        }

        ParsingTaskInput parsingTaskInput = filteredInputOptional.get();
        // An explicit value wins; otherwise the value is read from the named context variable.
        String inputVal = parsingTaskInput.getValue() != null
                ? parsingTaskInput.getValue()
                : getContextVariable(parsingTaskInput.getContextVariableName());

        if ("PROPERTY".equals(parsingTaskInput.getType())) {
            properties.put(parsingTaskInput.getName(), inputVal);
        } else if ("FILE".equals(parsingTaskInput.getType())) {
            downloadInputFile(parserInput, parsingTaskInput, inputVal, registryClient, localInputDir, helper);
        } else {
            // Previously such inputs were dropped without any trace.
            logger.warn("Ignoring parser input with id " + parsingTaskInput.getId() + " because its type "
                    + parsingTaskInput.getType() + " is neither PROPERTY nor FILE");
        }
    }
    return properties;
}

/**
 * Downloads one FILE input into {@code localInputDir}, trying the replica locations of the data
 * product in order until one download succeeds.
 *
 * @throws Exception a {@link TaskOnFailException} when the data product uri is empty or no replica
 *         could be downloaded; registry and URI parsing failures are propagated as is
 */
private void downloadInputFile(ParserInput parserInput, ParsingTaskInput parsingTaskInput, String inputDataProductUri,
                               RegistryService.Client registryClient, String localInputDir,
                               TaskHelper helper) throws Exception {
    if (inputDataProductUri == null || inputDataProductUri.isEmpty()) {
        throw new TaskOnFailException("Data product uri could not be null or empty for input "
                + parsingTaskInput.getId() + " with name " + parserInput.getName(), true, null);
    }

    DataProductModel inputDataProduct = registryClient.getDataProduct(inputDataProductUri);
    List<DataReplicaLocationModel> replicaLocations = inputDataProduct.getReplicaLocations();
    String localFilePath = localInputDir + (localInputDir.endsWith(File.separator) ? "" : File.separator)
            + parserInput.getName();

    boolean downloadPassed = false;
    // A data product without replicas is reported as a download failure instead of a NullPointerException.
    if (replicaLocations != null) {
        for (DataReplicaLocationModel replicaLocationModel : replicaLocations) {
            String storageResourceId = replicaLocationModel.getStorageResourceId();
            String remoteFilePath = new URI(replicaLocationModel.getFilePath()).getPath();
            downloadPassed = downloadFileFromStorageResource(storageResourceId, remoteFilePath, localFilePath,
                    helper.getAdaptorSupport());
            if (downloadPassed) {
                break;
            }
        }
    }

    if (!downloadPassed) {
        throw new TaskOnFailException("Failed to download input file with id " + parserInput.getId()
                + " from data product uri " + inputDataProductUri, true, null);
    }
}

/**
 * Uploads every output file declared by the parser for which upload information was supplied.
 *
 * @throws TaskOnFailException when a required output file or its upload information is missing,
 *         or when an upload fails
 */
private void publishParserOutputs(Parser parser, String localOutDir, TaskHelper helper) throws TaskOnFailException {
    for (ParserOutput parserOutput : parser.getOutputFiles()) {
        Optional<ParsingTaskOutput> filteredOutputOptional = parsingTaskOutputs.getOutputs().stream()
                .filter(out -> parserOutput.getId().equals(out.getId())).findFirst();

        if (!filteredOutputOptional.isPresent()) {
            if (parserOutput.isRequiredOutput()) {
                throw new TaskOnFailException("File upload info with id " + parserOutput.getId() + " and name "
                        + parserOutput.getName() + " is not available", true, null);
            }
            logger.warn("File upload info with id " + parserOutput.getId() + " and name " + parserOutput.getName()
                    + " is not available. But it is not required");
            continue;
        }

        String localFilePath = localOutDir + (localOutDir.endsWith(File.separator) ? "" : File.separator)
                + parserOutput.getName();
        String remoteFilePath = "parsers" + File.separator + getTaskId() + File.separator + "outputs"
                + File.separator + parserOutput.getName();

        if (new File(localFilePath).exists()) {
            uploadFileToStorageResource(filteredOutputOptional.get(), remoteFilePath, localFilePath,
                    helper.getAdaptorSupport());
        } else if (parserOutput.isRequiredOutput()) {
            throw new TaskOnFailException("Expected output file " + localFilePath + " can not be found", false, null);
        } else {
            // Not an error: the output is optional, so the task carries on.
            logger.warn("Expected output file " + localFilePath + " can not be found but skipping as it is not mandatory");
        }
    }
}

/**
 * Closes the transport(s) behind a registry client, logging instead of propagating any failure.
 *
 * @param client the client to close; may be {@code null}
 */
private static void closeRegistryClient(RegistryService.Client client) {
    if (client == null) {
        return;
    }
    try {
        if (client.getInputProtocol().getTransport().isOpen()) {
            client.getInputProtocol().getTransport().close();
        }
        if (client.getOutputProtocol().getTransport().isOpen()) {
            client.getOutputProtocol().getTransport().close();
        }
    } catch (RuntimeException e) {
        logger.warn("Failed to close registry client", e);
    }
}
/**
 * Cancellation hook; the body is empty, so this task does nothing when cancelled.
 */
@Override
public void onCancel() {
// No cancellation handling is implemented here.
}
/**
 * Pulls the parser image, runs it as a container with the local input/output directories bind
 * mounted, waits for it to exit and logs its output.
 *
 * <p>The container is removed afterwards when the {@code data.parser.delete.container} setting
 * is defined and true; removal is attempted even when starting or waiting fails, and a failed
 * removal is only logged.</p>
 *
 * @param parser          parser description providing image name, command and container paths
 * @param containerId     name given to the created container
 * @param localInputDir   host directory bound to the parser's input directory
 * @param localOutputDir  host directory bound to the parser's output directory
 * @param properties      entries passed to the container as {@code KEY=VALUE} environment variables
 * @throws ApplicationSettingsException if the delete-container setting cannot be read
 */
private void runContainer(Parser parser, String containerId, String localInputDir, String localOutputDir, Map<String, String> properties) throws ApplicationSettingsException {
    // Read the setting up front so a configuration problem surfaces before any container exists.
    final String deleteSetting = "data.parser.delete.container";
    boolean deleteContainer = ServerSettings.isSettingDefined(deleteSetting)
            && Boolean.parseBoolean(ServerSettings.getSetting(deleteSetting));

    // Split "repository[:tag]". Only a colon after the last '/' separates a tag, so a registry
    // port ("host:5000/image") is not mistaken for one; a missing tag falls back to "latest".
    String imageName = parser.getImageName();
    int tagSeparator = imageName.lastIndexOf(':');
    boolean hasTag = tagSeparator > imageName.lastIndexOf('/');
    String repository = hasTag ? imageName.substring(0, tagSeparator) : imageName;
    String tag = hasTag ? imageName.substring(tagSeparator + 1) : "latest";

    DefaultDockerClientConfig.Builder config = DefaultDockerClientConfig.createDefaultConfigBuilder();
    try (DockerClient dockerClient = DockerClientBuilder.getInstance(config).build()) {
        logger.info("Pulling image " + imageName);
        dockerClient.pullImageCmd(repository)
                .withTag(tag)
                .exec(new PullImageResultCallback()).awaitSuccess();
        logger.info("Successfully pulled image " + imageName);

        // Split on runs of whitespace so repeated spaces do not produce empty arguments.
        String[] commands = parser.getExecutionCommand().trim().split("\\s+");
        List<String> environment = properties.entrySet()
                .stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.toList());

        CreateContainerResponse containerResponse = dockerClient.createContainerCmd(imageName)
                .withCmd(commands)
                .withName(containerId)
                .withBinds(Bind.parse(localInputDir + ":" + parser.getInputDirPath()),
                        Bind.parse(localOutputDir + ":" + parser.getOutputDirPath()))
                .withTty(true)
                .withAttachStdin(true)
                .withAttachStdout(true)
                .withEnv(environment)
                .exec();
        final String createdId = containerResponse.getId();
        logger.info("Created the container with id " + createdId);

        try {
            // Creation warnings are informational; the container is still started.
            String[] warnings = containerResponse.getWarnings();
            if (warnings != null && warnings.length > 0) {
                logger.warn("Container " + createdId + " warnings : " + String.join(",", warnings));
            }

            logger.info("Starting container with id " + createdId);
            dockerClient.startContainerCmd(createdId).exec();

            final StringBuilder dockerLogs = new StringBuilder();
            LogContainerCmd logContainerCmd = dockerClient.logContainerCmd(createdId).withStdOut(true).withStdErr(true);
            try {
                logContainerCmd.exec(new LogContainerResultCallback() {
                    @Override
                    public void onNext(Frame item) {
                        dockerLogs.append(item.toString());
                        dockerLogs.append("\n");
                    }
                }).awaitCompletion();
            } catch (InterruptedException e) {
                // Restore the flag so the interruption stays visible to later blocking calls and callers.
                Thread.currentThread().interrupt();
                logger.error("Interrupted while reading container log", e);
            }

            logger.info("Waiting for the container to stop");
            Integer statusCode = dockerClient.waitContainerCmd(createdId)
                    .exec(new WaitContainerResultCallback()).awaitStatusCode();
            if (statusCode != null && statusCode != 0) {
                logger.warn("Container " + createdId + " exited with status code " + statusCode);
            } else {
                logger.info("Container " + createdId + " exited with status code " + statusCode);
            }
            logger.info("Container logs " + dockerLogs.toString());
        } finally {
            if (deleteContainer) {
                try {
                    dockerClient.removeContainerCmd(createdId).exec();
                    logger.info("Successfully removed container with id " + createdId);
                } catch (RuntimeException e) {
                    // Best-effort cleanup: never mask the outcome of the run itself.
                    logger.warn("Failed to remove container with id " + createdId, e);
                }
            }
        }
    } catch (IOException e) {
        // Only DockerClient.close() can raise this; the container run itself already finished.
        logger.warn("Failed to close docker client used for container " + containerId, e);
    }
}
/**
 * Builds an SCP storage adaptor for the given storage resource using the gateway's storage
 * preference. The resource specific credential token is preferred; when it is null or empty the
 * token of the gateway resource profile is used instead.
 *
 * @param storageResourceId id of the storage resource to connect to
 * @param adaptorSupport    factory used to fetch the storage adaptor
 * @return adaptor for the storage resource
 * @throws TaskOnFailException if the registry client cannot be created or no storage preference exists
 * @throws TException          if a registry call fails
 * @throws AgentException      if the adaptor cannot be fetched
 */
private StorageResourceAdaptor getStorageResourceAdaptor(String storageResourceId, AdaptorSupport adaptorSupport) throws TaskOnFailException, TException, AgentException {
    String token;
    String loginUserName;

    // One client serves both lookups and is closed as soon as they are done.
    RegistryService.Client registryClient = getRegistryServiceClient();
    try {
        StoragePreference gatewayStoragePreference = registryClient.getGatewayStoragePreference(gatewayId, storageResourceId);
        // Validate before the first dereference.
        if (gatewayStoragePreference == null) {
            logger.error("Could not find a gateway storage preference for storage " + storageResourceId + " gateway id " + gatewayId);
            throw new TaskOnFailException("Could not find a gateway storage preference for storage " + storageResourceId + " gateway id " + gatewayId, false, null);
        }

        token = gatewayStoragePreference.getResourceSpecificCredentialStoreToken();
        if (token == null || token.isEmpty()) {
            // The gateway profile is only needed as a fallback, so it is fetched on demand.
            GatewayResourceProfile gatewayResourceProfile = registryClient.getGatewayResourceProfile(gatewayId);
            token = gatewayResourceProfile.getCredentialStoreToken();
        }
        loginUserName = gatewayStoragePreference.getLoginUserName();
    } finally {
        registryClient.getInputProtocol().getTransport().close();
    }

    // The credential token is deliberately kept out of the log.
    logger.info("Fetching adaptor for storage resource " + storageResourceId);
    return adaptorSupport.fetchStorageAdaptor(gatewayId, storageResourceId, DataMovementProtocol.SCP, token,
            loginUserName);
}
/**
 * Copies a remote file from a storage resource to a local path.
 *
 * @param storageResourceId id of the storage resource holding the file
 * @param remoteFilePath    path of the file on the storage resource
 * @param localFilePath     destination path on the local file system
 * @param adaptorSupport    factory used to obtain the storage adaptor
 * @return {@code true} when the download finished without an exception; {@code false} when any
 *         exception was raised (the exception is logged, not propagated)
 */
private boolean downloadFileFromStorageResource(String storageResourceId, String remoteFilePath, String localFilePath, AdaptorSupport adaptorSupport) {
    logger.info("Downloading from storage resource " + storageResourceId + " from path " + remoteFilePath
            + " to local path " + localFilePath);
    boolean downloaded;
    try {
        getStorageResourceAdaptor(storageResourceId, adaptorSupport).downloadFile(remoteFilePath, localFilePath);
        downloaded = true;
    } catch (Exception failure) {
        String message = "Failed to download file from storage " + storageResourceId + " in path " + remoteFilePath
                + " to local path " + localFilePath;
        logger.error(message, failure);
        downloaded = false;
    }
    return downloaded;
}
/**
 * Uploads a local output file to the storage resource named by {@code parsingTaskOutput},
 * registers the uploaded file as a data product and stores the resulting product uri in the
 * context variable named by the output.
 *
 * @param parsingTaskOutput output descriptor providing storage resource id, output id and context variable name
 * @param remoteFilePath    path relative to the storage preference's file system root location
 * @param localFilePath     local file to upload
 * @param adaptorSupport    factory used to obtain the storage adaptor
 * @throws TaskOnFailException (non critical) wrapping any failure during lookup, upload or registration
 */
private void uploadFileToStorageResource(ParsingTaskOutput parsingTaskOutput, String remoteFilePath, String localFilePath, AdaptorSupport adaptorSupport) throws TaskOnFailException {
    logger.info("Uploading from local path " + localFilePath + " to remote path " + remoteFilePath + " of storage resource " + parsingTaskOutput.getStorageResourceId());
    // One client for all registry calls of this upload; closed in the finally block below.
    RegistryService.Client registryClient = null;
    try {
        registryClient = getRegistryServiceClient();
        StoragePreference gatewayStoragePreference = registryClient.getGatewayStoragePreference(gatewayId, parsingTaskOutput.getStorageResourceId());
        StorageResourceDescription storageResource = registryClient.getStorageResource(parsingTaskOutput.getStorageResourceId());

        // Turn the relative path into a path below the storage preference's root location.
        String remoteFileRoot = gatewayStoragePreference.getFileSystemRootLocation();
        remoteFilePath = remoteFileRoot + (remoteFileRoot.endsWith(File.separator) ? "" : File.separator) + remoteFilePath;

        StorageResourceAdaptor storageResourceAdaptor = getStorageResourceAdaptor(parsingTaskOutput.getStorageResourceId(), adaptorSupport);
        storageResourceAdaptor.createDirectory(new File(remoteFilePath).getParent(), true);
        storageResourceAdaptor.uploadFile(localFilePath, remoteFilePath);
        logger.info("Uploading completed. Registering data product for path " + remoteFilePath);

        DataProductModel dataProductModel = new DataProductModel();
        dataProductModel.setGatewayId(getGatewayId());
        dataProductModel.setOwnerName("ParserTask");
        dataProductModel.setProductName(parsingTaskOutput.getId());
        dataProductModel.setDataProductType(DataProductType.FILE);

        DataReplicaLocationModel replicaLocationModel = new DataReplicaLocationModel();
        replicaLocationModel.setStorageResourceId(parsingTaskOutput.getStorageResourceId());
        replicaLocationModel.setReplicaName("Parsing task output " + parsingTaskOutput.getId());
        replicaLocationModel.setReplicaLocationCategory(ReplicaLocationCategory.GATEWAY_DATA_STORE);
        replicaLocationModel.setReplicaPersistentType(ReplicaPersistentType.TRANSIENT);
        URI destinationURI = new URI("file", gatewayStoragePreference.getLoginUserName(),
                storageResource.getHostName(), 22, remoteFilePath, null, null);
        replicaLocationModel.setFilePath(destinationURI.toString());
        dataProductModel.addToReplicaLocations(replicaLocationModel);

        String productUri = registryClient.registerDataProduct(dataProductModel);
        logger.info("Data product is " + productUri + " for path " + remoteFilePath);
        // Publish the product uri under the output's context variable name.
        setContextVariable(parsingTaskOutput.getContextVariableName(), productUri);
    } catch (Exception e) {
        logger.error("Failed to upload from local path " + localFilePath + " to remote path " + remoteFilePath +
                " of storage resource " + parsingTaskOutput.getStorageResourceId(), e);
        throw new TaskOnFailException("Failed to upload from local path " + localFilePath + " to remote path " + remoteFilePath +
                " of storage resource " + parsingTaskOutput.getStorageResourceId(), false, e);
    } finally {
        if (registryClient != null) {
            registryClient.getInputProtocol().getTransport().close();
        }
    }
}
/**
 * Creates {@code <localDataDir>/parsers/<containerName>/data/input/} including missing parents.
 *
 * @param containerName name of the container the directory belongs to
 * @return the created directory path, ending with the file separator
 * @throws TaskOnFailException (critical) if the directory cannot be created
 */
private String createLocalInputDir(String containerName) throws TaskOnFailException {
    String root = localDataDir;
    if (!root.endsWith(File.separator)) {
        root = root + File.separator;
    }
    final String inputDirPath = root
            + String.join(File.separator, "parsers", containerName, "data", "input")
            + File.separator;
    try {
        FileUtils.forceMkdir(new File(inputDirPath));
    } catch (IOException ioe) {
        throw new TaskOnFailException("Failed to build input directories " + inputDirPath, true, ioe);
    }
    return inputDirPath;
}
/**
 * Creates {@code <localDataDir>/parsers/<containerName>/data/output/} including missing parents.
 *
 * @param containerName name of the container the directory belongs to
 * @return the created directory path, ending with the file separator
 * @throws TaskOnFailException (critical) if the directory cannot be created
 */
private String createLocalOutputDir(String containerName) throws TaskOnFailException {
    String root = localDataDir;
    if (!root.endsWith(File.separator)) {
        root = root + File.separator;
    }
    final String outputDirPath = root
            + String.join(File.separator, "parsers", containerName, "data", "output")
            + File.separator;
    try {
        FileUtils.forceMkdir(new File(outputDirPath));
    } catch (IOException ioe) {
        throw new TaskOnFailException("Failed to build output directories " + outputDirPath, true, ioe);
    }
    return outputDirPath;
}
/**
 * Creates a registry client from the registry server host and port found in the server settings.
 * Every call asks the factory for a client; nothing is cached here.
 *
 * @return registry client created by the factory
 * @throws TaskOnFailException (non critical) if the settings cannot be read or the client cannot be created
 */
private static RegistryService.Client getRegistryServiceClient() throws TaskOnFailException {
    try {
        // Port is resolved before host, as the settings are read in that order.
        final int registryPort = Integer.parseInt(ServerSettings.getRegistryServerPort());
        final String registryHost = ServerSettings.getRegistryServerHost();
        return RegistryServiceClientFactory.createRegistryClient(registryHost, registryPort);
    } catch (RegistryServiceException clientFailure) {
        throw new TaskOnFailException("Unable to create registry client...", false, clientFailure);
    } catch (ApplicationSettingsException settingsFailure) {
        throw new TaskOnFailException("Unable to create registry client...", false, settingsFailure);
    }
}
/**
 * Creates a credential store client from the credential store server host and port found in the
 * server settings.
 *
 * @return credential store client created by the factory
 * @throws TaskOnFailException (non critical) if the settings cannot be read or the client cannot be created
 */
private static CredentialStoreService.Client getCredentialServiceClient() throws TaskOnFailException {
    try {
        // Use the credential store endpoint; the registry endpoint serves a different service.
        final int serverPort = Integer.parseInt(ServerSettings.getCredentialStoreServerPort());
        final String serverHost = ServerSettings.getCredentialStoreServerHost();
        return CredentialStoreClientFactory.createAiravataCSClient(serverHost, serverPort);
    } catch (CredentialStoreException | ApplicationSettingsException e) {
        throw new TaskOnFailException("Unable to create credential client...", false, e);
    }
}
/** @return the id of the parser this task runs */
public String getParserId() {
return parserId;
}
/** @param parserId the id of the parser this task runs */
public void setParserId(String parserId) {
this.parserId = parserId;
}
/** @return the inputs supplied to this task */
public ParsingTaskInputs getParsingTaskInputs() {
return parsingTaskInputs;
}
/** @param parsingTaskInputs the inputs supplied to this task */
public void setParsingTaskInputs(ParsingTaskInputs parsingTaskInputs) {
this.parsingTaskInputs = parsingTaskInputs;
}
/** @return the output descriptors supplied to this task */
public ParsingTaskOutputs getParsingTaskOutputs() {
return parsingTaskOutputs;
}
/** @param parsingTaskOutputs the output descriptors supplied to this task */
public void setParsingTaskOutputs(ParsingTaskOutputs parsingTaskOutputs) {
this.parsingTaskOutputs = parsingTaskOutputs;
}
/** @return the gateway id used for registry lookups */
public String getGatewayId() {
return gatewayId;
}
/** @param gatewayId the gateway id used for registry lookups */
public void setGatewayId(String gatewayId) {
this.gatewayId = gatewayId;
}
/** @return the group resource profile id */
public String getGroupResourceProfileId() {
return groupResourceProfileId;
}
/** @param groupResourceProfileId the group resource profile id */
public void setGroupResourceProfileId(String groupResourceProfileId) {
this.groupResourceProfileId = groupResourceProfileId;
}
/** @return the local root directory under which the parser input and output directories are created */
public String getLocalDataDir() {
return localDataDir;
}
/** @param localDataDir the local root directory under which the parser input and output directories are created */
public void setLocalDataDir(String localDataDir) {
this.localDataDir = localDataDir;
}
}