blob: d9c69e8bb021084bd21b94964131f107d4e9f7c3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.view.hive.resources.jobs;
import com.google.inject.Inject;
import org.apache.ambari.view.ViewResourceHandler;
import org.apache.ambari.view.hive.BaseService;
import org.apache.ambari.view.hive.backgroundjobs.BackgroundJobController;
import org.apache.ambari.view.hive.client.Cursor;
import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSRequestsDelegate;
import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSRequestsDelegateImpl;
import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSParser;
import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser;
import org.apache.ambari.view.hive.resources.jobs.viewJobs.*;
import org.apache.ambari.view.hive.utils.*;
import org.apache.ambari.view.hive.utils.HdfsApi;
import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.*;
import javax.ws.rs.core.*;
import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.Callable;
/**
* Servlet for queries
* API:
* GET /:id
* read job
* POST /
* create new job
* Required: title, queryFile
* GET /
* get all Jobs of current user
*/
public class JobService extends BaseService {
@Inject
ViewResourceHandler handler;
protected JobResourceManager resourceManager;
private IOperationHandleResourceManager opHandleResourceManager;
protected final static Logger LOG =
LoggerFactory.getLogger(JobService.class);
protected synchronized JobResourceManager getResourceManager() {
if (resourceManager == null) {
SharedObjectsFactory connectionsFactory = getSharedObjectsFactory();
resourceManager = new JobResourceManager(connectionsFactory, context);
}
return resourceManager;
}
private IOperationHandleResourceManager getOperationHandleResourceManager() {
if (opHandleResourceManager == null) {
opHandleResourceManager = new OperationHandleResourceManager(getSharedObjectsFactory());
}
return opHandleResourceManager;
}
/**
* Get single item
*/
@GET
@Path("{jobId}")
@Produces(MediaType.APPLICATION_JSON)
public Response getOne(@PathParam("jobId") String jobId) {
try {
JobController jobController = getResourceManager().readController(jobId);
JSONObject jsonJob = jsonObjectFromJob(jobController);
return Response.ok(jsonJob).build();
} catch (WebApplicationException ex) {
throw ex;
} catch (ItemNotFound itemNotFound) {
throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
} catch (Exception ex) {
throw new ServiceFormattedException(ex.getMessage(), ex);
}
}
private JSONObject jsonObjectFromJob(JobController jobController) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
Map createdJobMap = PropertyUtils.describe(jobController.getJob());
createdJobMap.remove("class"); // no need to show Bean class on client
JSONObject jobJson = new JSONObject();
jobJson.put("job", createdJobMap);
return jobJson;
}
/**
* Get job results in csv format
*/
@GET
@Path("{jobId}/results/csv")
@Produces("text/csv")
public Response getResultsCSV(@PathParam("jobId") String jobId,
@Context HttpServletResponse response,
@QueryParam("columns") final String requestedColumns) {
try {
JobController jobController = getResourceManager().readController(jobId);
final Cursor resultSet = jobController.getResults();
resultSet.selectColumns(requestedColumns);
StreamingOutput stream = new StreamingOutput() {
@Override
public void write(OutputStream os) throws IOException, WebApplicationException {
Writer writer = new BufferedWriter(new OutputStreamWriter(os));
CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
try {
while (resultSet.hasNext()) {
csvPrinter.printRecord(resultSet.next().getRow());
writer.flush();
}
} finally {
writer.close();
}
}
};
return Response.ok(stream).build();
} catch (WebApplicationException ex) {
throw ex;
} catch (ItemNotFound itemNotFound) {
throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
} catch (Exception ex) {
throw new ServiceFormattedException(ex.getMessage(), ex);
}
}
/**
* Get job results in csv format
*/
@GET
@Path("{jobId}/results/csv/saveToHDFS")
@Produces(MediaType.APPLICATION_JSON)
public Response getResultsToHDFS(@PathParam("jobId") String jobId,
@QueryParam("commence") String commence,
@QueryParam("file") final String targetFile,
@QueryParam("stop") final String stop,
@QueryParam("columns") final String requestedColumns,
@Context HttpServletResponse response) {
try {
final JobController jobController = getResourceManager().readController(jobId);
String backgroundJobId = "csv" + String.valueOf(jobController.getJob().getId());
if (commence != null && commence.equals("true")) {
if (targetFile == null)
throw new MisconfigurationFormattedException("targetFile should not be empty");
BackgroundJobController.getInstance(context).startJob(String.valueOf(backgroundJobId), new Runnable() {
@Override
public void run() {
try {
Cursor resultSet = jobController.getResults();
resultSet.selectColumns(requestedColumns);
FSDataOutputStream stream = getSharedObjectsFactory().getHdfsApi().create(targetFile, true);
Writer writer = new BufferedWriter(new OutputStreamWriter(stream));
CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
try {
while (resultSet.hasNext() && !Thread.currentThread().isInterrupted()) {
csvPrinter.printRecord(resultSet.next().getRow());
writer.flush();
}
} finally {
writer.close();
}
stream.close();
} catch (IOException e) {
throw new ServiceFormattedException("Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e);
} catch (InterruptedException e) {
throw new ServiceFormattedException("Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e);
} catch (ItemNotFound itemNotFound) {
throw new NotFoundFormattedException("Job results are expired", itemNotFound);
}
}
});
}
if (stop != null && stop.equals("true")) {
BackgroundJobController.getInstance(context).interrupt(backgroundJobId);
}
JSONObject object = new JSONObject();
object.put("stopped", BackgroundJobController.getInstance(context).isInterrupted(backgroundJobId));
object.put("jobId", jobController.getJob().getId());
object.put("backgroundJobId", backgroundJobId);
object.put("operationType", "CSV2HDFS");
object.put("status", BackgroundJobController.getInstance(context).state(backgroundJobId).toString());
return Response.ok(object).build();
} catch (WebApplicationException ex) {
throw ex;
} catch (ItemNotFound itemNotFound) {
throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
} catch (Exception ex) {
throw new ServiceFormattedException(ex.getMessage(), ex);
}
}
/**
* Get next results page
*/
@GET
@Path("{jobId}/results")
@Produces(MediaType.APPLICATION_JSON)
public Response getResults(@PathParam("jobId") String jobId,
@QueryParam("first") String fromBeginning,
@QueryParam("count") Integer count,
@QueryParam("searchId") String searchId,
@QueryParam("columns") final String requestedColumns) {
try {
final JobController jobController = getResourceManager().readController(jobId);
return ResultsPaginationController.getInstance(context)
.request(jobId, searchId, true, fromBeginning, count,
new Callable<Cursor>() {
@Override
public Cursor call() throws Exception {
Cursor cursor = jobController.getResults();
cursor.selectColumns(requestedColumns);
return cursor;
}
}).build();
} catch (WebApplicationException ex) {
throw ex;
} catch (ItemNotFound itemNotFound) {
throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
} catch (Exception ex) {
throw new ServiceFormattedException(ex.getMessage(), ex);
}
}
/**
* Renew expiration time for results
*/
@GET
@Path("{jobId}/results/keepAlive")
public Response keepAliveResults(@PathParam("jobId") String jobId,
@QueryParam("first") String fromBeginning,
@QueryParam("count") Integer count) {
try {
if (!ResultsPaginationController.getInstance(context).keepAlive(jobId, ResultsPaginationController.DEFAULT_SEARCH_ID)) {
throw new NotFoundFormattedException("Results already expired", null);
}
return Response.ok().build();
} catch (WebApplicationException ex) {
throw ex;
} catch (Exception ex) {
throw new ServiceFormattedException(ex.getMessage(), ex);
}
}
/**
* Delete single item
*/
@DELETE
@Path("{id}")
public Response delete(@PathParam("id") String id,
@QueryParam("remove") final String remove) {
try {
JobController jobController;
try {
jobController = getResourceManager().readController(id);
} catch (ItemNotFound itemNotFound) {
throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
}
jobController.cancel();
if (remove != null && remove.compareTo("true") == 0) {
getResourceManager().delete(id);
}
// getResourceManager().delete(Integer.valueOf(queryId));
return Response.status(204).build();
} catch (WebApplicationException ex) {
throw ex;
} catch (ItemNotFound itemNotFound) {
throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
} catch (Exception ex) {
throw new ServiceFormattedException(ex.getMessage(), ex);
}
}
/**
* Get all Jobs
*/
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getList() {
try {
LOG.debug("Getting all job");
ATSRequestsDelegate transport = new ATSRequestsDelegateImpl(context, "http://127.0.0.1:8188");
IATSParser atsParser = new ATSParser(transport);
Aggregator aggregator = new Aggregator(getResourceManager(), getOperationHandleResourceManager(), atsParser);
List allJobs = aggregator.readAll(context.getUsername());
JSONObject object = new JSONObject();
object.put("jobs", allJobs);
return Response.ok(object).build();
} catch (WebApplicationException ex) {
throw ex;
} catch (Exception ex) {
throw new ServiceFormattedException(ex.getMessage(), ex);
}
}
/**
* Create job
*/
@POST
@Consumes(MediaType.APPLICATION_JSON)
public Response create(JobRequest request, @Context HttpServletResponse response,
@Context UriInfo ui) {
try {
Map jobInfo = PropertyUtils.describe(request.job);
Job job = new JobImpl(jobInfo);
getResourceManager().create(job);
JobController createdJobController = getResourceManager().readController(job.getId());
createdJobController.submit();
response.setHeader("Location",
String.format("%s/%s", ui.getAbsolutePath().toString(), job.getId()));
JSONObject jobObject = jsonObjectFromJob(createdJobController);
return Response.ok(jobObject).status(201).build();
} catch (WebApplicationException ex) {
throw ex;
} catch (ItemNotFound itemNotFound) {
throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
} catch (Exception ex) {
throw new ServiceFormattedException(ex.getMessage(), ex);
}
}
/**
* Wrapper object for json mapping
*/
public static class JobRequest {
public JobImpl job;
}
}