blob: 2ea61bb2719c3f37ff337401b00126fa11555d6b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.curation.service;
//OODT imports
import org.apache.oodt.cas.curation.structs.IngestionTask;
import org.apache.oodt.cas.curation.util.DateUtils;
import org.apache.oodt.cas.curation.util.ExtractorConfigReader;
import org.apache.oodt.cas.filemgr.ingest.Ingester;
import org.apache.oodt.cas.filemgr.ingest.StdIngester;
import org.apache.oodt.cas.filemgr.structs.exceptions.IngestException;
import org.apache.oodt.cas.metadata.Metadata;
//JDK imports
import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
//JAX-RS imports
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;
//JSON imports
import net.sf.json.JSONObject;
/**
 *
 * Leverages CAS {@link Ingester} interface to ingest Products into the CAS File
 * Manager via CAS Curator and a REST-ful interface.
 *
 * <p>
 * Ingestion is organised around {@link IngestionTask}s: a task is created
 * (<code>ingest/create</code>), listed (<code>ingest/list</code>), started
 * (<code>ingest/start</code>) and finally removed (<code>ingest/remove</code>).
 * Tasks are kept in an in-memory {@link IngestionTaskList} owned by this
 * resource instance.
 * </p>
 *
 * <p>
 * NOTE(review): whether a single instance (and therefore a single task list)
 * is shared between requests depends on how the container deploys this
 * resource -- confirm against the deployment descriptor.
 * </p>
 *
 * @author mattmann
 * @version $Revision$
 *
 */
@Path("ingest")
public class IngestionResource extends CurationService {

  private static final long serialVersionUID = -7514150767897700936L;

  private static final Logger LOG = Logger.getLogger(IngestionResource.class
      .getName());

  /** Data transfer factory class name handed to the {@link StdIngester}. */
  private static final String DATA_TRANSFER_SERVICE = "org.apache.oodt.cas.filemgr.datatransfer.LocalDataTransferFactory";

  /** Key of the boolean outcome flag in JSON-encoded ingest responses. */
  private static final String RESP_SUCCESS = "success";

  /** In-memory registry of the tasks created through this resource. */
  private IngestionTaskList taskList;

  /**
   * Creates the resource with an empty task list.
   */
  public IngestionResource() {
    super();
    this.taskList = new IngestionTaskList();
  }

  @Context
  UriInfo uriInfo;

  /**
   * Registers a new, not-yet-started ingestion task.
   *
   * @param fileList
   *          comma-separated list of file paths, each relative to the
   *          configured staging area.
   * @param numFiles
   *          accepted for interface compatibility; currently unused.
   * @param metExtractorConfigId
   *          identifier of the metadata extractor configuration to load from
   *          the met extractor config upload path.
   * @param policy
   *          policy name recorded on the task.
   * @param productType
   *          product type name recorded on the task.
   * @return the generated task id on success; otherwise a plain-text error
   *         message describing why the task could not be created.
   */
  @GET
  @Path("create")
  @Produces("text/plain")
  public String createTask(@QueryParam("files") String fileList,
      @QueryParam("numfiles") Integer numFiles,
      @QueryParam("metExtCfgId") String metExtractorConfigId,
      @QueryParam("policy") String policy,
      @QueryParam("ptype") String productType) {
    // A missing "files" parameter used to surface as a NullPointerException.
    if (fileList == null || fileList.trim().length() == 0) {
      String errorMsg = "Unable to create ingestion task: no files specified";
      LOG.log(Level.WARNING, errorMsg);
      return errorMsg;
    }

    IngestionTask newTask = new IngestionTask();
    newTask.setCreateDate(new Date());
    try {
      newTask.setExtConf(ExtractorConfigReader.readFromDirectory(new File(
          CurationService.config.getMetExtrConfUploadPath()),
          metExtractorConfigId));
    } catch (Exception e) {
      String errorMsg = "Unable to load extractor config from metExtCfgId: ["
          + metExtractorConfigId + "]";
      LOG.log(Level.WARNING, errorMsg, e);
      return errorMsg;
    }

    newTask.setFileList(deducePaths(Arrays.asList(fileList.split(","))));
    newTask.setPolicy(policy);
    newTask.setProductType(productType);
    newTask.setStatus(IngestionTask.NOT_STARTED);
    return this.taskList.addIngestionTask(newTask);
  }

  /**
   * Removes the task with the given id from the task list. Unknown ids are
   * ignored.
   *
   * @param ingestTaskId
   *          id of the task to remove.
   */
  @GET
  @Path("remove")
  @Produces("text/plain")
  public void removeTask(@QueryParam("taskId") String ingestTaskId) {
    this.taskList.removeIngestionTask(ingestTaskId);
  }

  /**
   * Lists the currently managed tasks, ordered by creation date.
   *
   * @param format
   *          requested encoding; {@link #FORMAT_HTML} (the default) yields
   *          table rows, {@link #FORMAT_JSON} yields a JSON object.
   * @return the encoded task list, or <code>Unsupported Format!</code> for
   *         any other format value.
   */
  @GET
  @Path("list")
  @Produces("text/plain")
  public String getIngestTaskList(
      @QueryParam("format") @DefaultValue(FORMAT_HTML) String format) {
    // Constant-first comparison keeps this safe should format ever be null.
    if (FORMAT_HTML.equals(format)) {
      return this.encodeTaskListAsHTML(this.taskList.getTaskList());
    } else if (FORMAT_JSON.equals(format)) {
      return this.encodeTaskListAsJSON(this.taskList.getTaskList());
    } else {
      return "Unsupported Format!";
    }
  }

  /**
   * Ingests every file of the identified task into the File Manager.
   *
   * <p>
   * For each file the staging metadata is extracted with the task's extractor
   * configuration and the file is handed to the {@link Ingester}. Processing
   * stops at the first failure. The task is marked
   * {@link IngestionTask#FINISHED} only once all of its files were ingested.
   * </p>
   *
   * @param ingestTaskId
   *          id of the task to run.
   * @return <code>Success</code> when all files were ingested; a plain-text
   *         error message when extraction or ingestion fails; a JSON-encoded
   *         error when the task id is unknown.
   */
  @GET
  @Path("start")
  @Produces("text/plain")
  public String doIngest(@QueryParam("taskId") String ingestTaskId) {
    IngestionTask task = this.taskList.getIngestionTaskById(ingestTaskId);
    if (task == null) {
      String errorMsg = "Task with ID [" + ingestTaskId
          + "] is not being managed by this Ingestion Resource!";
      LOG.log(Level.WARNING, errorMsg);
      return this.encodeIngestResponseAsJSON(false, errorMsg);
    }

    // The File Manager URL does not change per file: parse it once, and fail
    // fast instead of handing a null URL to the ingester.
    String fmUrlStr = CurationService.config.getFileMgrURL();
    URL fmUrl = safeGetUrl(fmUrlStr);
    if (fmUrl == null) {
      String errorMsg = "Invalid File Manager URL: [" + fmUrlStr + "]";
      LOG.log(Level.WARNING, errorMsg);
      return this.encodeIngestResponseAsHTML(false, errorMsg);
    }

    Ingester ingest = this.configureIngester();
    MetadataResource metService = new MetadataResource();
    for (String file : task.getFileList()) {
      Metadata fileMet = null;
      try {
        String vFilePath = this.getVirtualPath(CurationService.config
            .getStagingAreaPath(), file);
        LOG.log(Level.FINE,
            "IngestionResource: getting staging metadata for virtual path: ["
                + vFilePath + "]");
        fileMet = metService.getStagingMetadata(vFilePath, task.getExtConf()
            .getIdentifier(), false);
      } catch (Exception e) {
        LOG.log(Level.WARNING, "Unable to obtain staging metadata for file: ["
            + file + "]", e);
        return this.encodeIngestResponseAsHTML(false, describe(e));
      }

      try {
        ingest.ingest(fmUrl, new File(file), fileMet);
      } catch (IngestException e) {
        LOG.log(Level.WARNING, "Unable to ingest file: [" + file + "]", e);
        return this.encodeIngestResponseAsHTML(false, describe(e));
      }
    }

    // Only reached when every file was ingested without error.
    task.setStatus(IngestionTask.FINISHED);
    return this.encodeIngestResponseAsHTML(true, null);
  }

  /**
   * Renders the tasks as a sequence of HTML table rows (no enclosing table
   * element). Unfinished tasks get a "Start" button, finished ones a "Remove"
   * button. Values that originate from request parameters are HTML-escaped.
   */
  private String encodeTaskListAsHTML(List<IngestionTask> taskList) {
    StringBuilder out = new StringBuilder();
    for (IngestionTask task : taskList) {
      out.append("<tr>");
      out.append("<td>");
      out.append(task.getId());
      out.append("</td><td>");
      out.append(DateUtils.getDateAsISO8601String(task.getCreateDate()));
      out.append("</td><td>");
      out.append(task.getFileList().size());
      out.append("</td><td>");
      out.append(escapeHtml(task.getPolicy()));
      out.append("</td><td>");
      out.append(escapeHtml(task.getProductType()));
      out.append("</td><td>");
      out.append(escapeHtml(task.getExtConf().getIdentifier()));
      out.append("</td><td>");
      out.append(task.getExtConf().getConfigFiles().size());
      out.append("</td><td id='");
      out.append(task.getId());
      out.append("_Status'>");
      out.append(task.getStatus());
      out.append("</td>");
      if (!task.getStatus().equals(IngestionTask.FINISHED)) {
        out.append("<td><input type=\"button\" rel=\"_taskid_\" value=\"Start\" onclick=\"startIngestionTask('");
        out.append(task.getId());
        out.append("')\"/></td>");
      } else {
        out.append("<td><input type=\"button\" rel=\"_taskid_\" value=\"Remove\" onclick=\"removeIngestionTask('");
        out.append(task.getId());
        out.append("')\"></td>");
      }
      out.append("</tr>");
    }
    return out.toString();
  }

  /**
   * Renders the tasks as a JSON object with a single <code>taskList</code>
   * member holding one string-valued property map per task.
   */
  private String encodeTaskListAsJSON(List<IngestionTask> taskList) {
    List<Map<String, String>> jsonFriendlyTaskList = new Vector<Map<String, String>>();
    for (IngestionTask task : taskList) {
      Map<String, String> taskPropMap = new HashMap<String, String>();
      taskPropMap.put("id", task.getId());
      taskPropMap.put("createDate", DateUtils.getDateAsISO8601String(task
          .getCreateDate()));
      taskPropMap.put("policy", task.getPolicy());
      taskPropMap.put("productType", task.getProductType());
      taskPropMap.put("status", task.getStatus());
      taskPropMap.put("fileList", task.getFileList().toString());
      taskPropMap.put("extractorClass", task.getExtConf().getClassName());
      taskPropMap.put("extractorConfFiles", task.getExtConf().getConfigFiles()
          .toString());
      jsonFriendlyTaskList.add(taskPropMap);
    }
    JSONObject resObj = new JSONObject();
    resObj.put("taskList", jsonFriendlyTaskList);
    return resObj.toString();
  }

  /**
   * Encodes an ingest outcome as plain text: <code>Success</code> on success,
   * otherwise the given message (the text <code>null</code> when no message
   * is supplied).
   */
  private String encodeIngestResponseAsHTML(boolean success, String msg) {
    return success ? "Success" : String.valueOf(msg);
  }

  /**
   * Encodes an ingest outcome as a JSON object built from a
   * <code>success</code> flag and a <code>msg</code> entry.
   */
  private String encodeIngestResponseAsJSON(boolean success, String msg) {
    Map<String, Object> resMap = new HashMap<String, Object>();
    resMap.put(RESP_SUCCESS, success);
    resMap.put("msg", msg);
    JSONObject resObj = new JSONObject();
    resObj.putAll(resMap);
    return resObj.toString();
  }

  /**
   * Builds the {@link Ingester} used by {@link #doIngest(String)}.
   */
  private Ingester configureIngester() {
    return new StdIngester(DATA_TRANSFER_SERVICE);
  }

  /**
   * Parses the given string into a {@link URL}.
   *
   * @return the parsed URL, or <code>null</code> (after logging a warning)
   *         when the string cannot be parsed.
   */
  private URL safeGetUrl(String urlStr) {
    try {
      return new URL(urlStr);
    } catch (Exception e) {
      LOG.log(Level.WARNING, "Unable to parse URL: [" + urlStr + "]", e);
      return null;
    }
  }

  /**
   * Turns staging-area-relative paths into full paths by prefixing each with
   * the configured staging area path (a trailing slash is added to the
   * staging path when missing).
   *
   * <p>
   * NOTE(review): the relative paths come straight from a request parameter
   * and are not checked for <code>..</code> segments, so a caller may be able
   * to reference files outside the staging area -- consider validating.
   * </p>
   */
  private List<String> deducePaths(List<String> vPaths) {
    List<String> absolutePaths = new Vector<String>();
    String stagingIngestPath = CurationService.config.getStagingAreaPath();
    if (!stagingIngestPath.endsWith("/")) {
      stagingIngestPath += "/";
    }

    for (String vPath : vPaths) {
      absolutePaths.add(stagingIngestPath + vPath);
    }
    return absolutePaths;
  }

  /**
   * Strips the staging area prefix from a full file path.
   *
   * @return the remainder of <code>fullFilePath</code> after the first
   *         <code>stagingAreaPath.length()</code> characters, or
   *         <code>null</code> when the full path is missing or shorter than
   *         the staging area path.
   */
  private String getVirtualPath(String stagingAreaPath, String fullFilePath) {
    int startIdx = stagingAreaPath.length();
    if (fullFilePath == null || fullFilePath.length() < startIdx) {
      LOG.log(Level.WARNING, "Unable to derive virtual path of ["
          + fullFilePath + "] relative to staging area [" + stagingAreaPath
          + "]");
      return null;
    }
    return fullFilePath.substring(startIdx);
  }

  /**
   * Returns the exception's message, falling back to its string form when it
   * carries no message, so that error responses are never the bare text
   * <code>null</code>.
   */
  private static String describe(Exception e) {
    String msg = e.getMessage();
    return msg != null ? msg : e.toString();
  }

  /**
   * Escapes the characters that are significant in HTML text and attribute
   * values. A <code>null</code> value is rendered as the text
   * <code>null</code>, matching what appending it directly would produce.
   */
  private static String escapeHtml(Object value) {
    String text = String.valueOf(value);
    StringBuilder escaped = new StringBuilder(text.length());
    for (int i = 0; i < text.length(); i++) {
      char c = text.charAt(i);
      switch (c) {
      case '&':
        escaped.append("&amp;");
        break;
      case '<':
        escaped.append("&lt;");
        break;
      case '>':
        escaped.append("&gt;");
        break;
      case '"':
        escaped.append("&quot;");
        break;
      case '\'':
        escaped.append("&#39;");
        break;
      default:
        escaped.append(c);
      }
    }
    return escaped.toString();
  }

  /**
   * In-memory registry of {@link IngestionTask}s keyed by task id. All
   * accessors synchronize on the list instance so that readers and writers
   * of the backing map do not interleave.
   */
  class IngestionTaskList {

    private final Map<String, IngestionTask> taskMap;

    public IngestionTaskList() {
      this.taskMap = new HashMap<String, IngestionTask>();
    }

    /**
     * Assigns a fresh random id to the task, stores it and returns the id.
     */
    public synchronized String addIngestionTask(IngestionTask task) {
      this.provideTaskId(task);
      taskMap.put(task.getId(), task);
      return task.getId();
    }

    /**
     * Removes the task with the given id, if present.
     */
    public synchronized void removeIngestionTask(String taskId) {
      taskMap.remove(taskId);
    }

    /**
     * Looks up a task by id.
     *
     * @return the task, or <code>null</code> when the id is unknown.
     */
    public synchronized IngestionTask getIngestionTaskById(String taskId) {
      return taskMap.get(taskId);
    }

    /**
     * Returns a snapshot of the managed tasks sorted by ascending creation
     * date. The returned list is fixed-size and independent of the backing
     * map.
     */
    public synchronized List<IngestionTask> getTaskList() {
      List<IngestionTask> tasks = Arrays.asList(taskMap.values().toArray(
          new IngestionTask[taskMap.size()]));
      Collections.sort(tasks, new Comparator<IngestionTask>() {
        public int compare(IngestionTask o1, IngestionTask o2) {
          return o1.getCreateDate().compareTo(o2.getCreateDate());
        }
      });
      return tasks;
    }

    /** Stamps the task with a random UUID as its id. */
    private void provideTaskId(IngestionTask task) {
      task.setId(UUID.randomUUID().toString());
    }
  }
}