blob: 7d5c78ac7be66533be47f7c0f75532b2018f8469 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.ws.rs.resources;
import com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.error.Errors.ResultCode;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.NonForwardQueryResultFileScanner;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.PartitionedTableScanNode;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.session.Session;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.ws.rs.*;
import org.apache.tajo.ws.rs.resources.outputs.AbstractStreamingOutput;
import org.apache.tajo.ws.rs.resources.outputs.RestOutputFactory;
import org.apache.tajo.ws.rs.responses.GetQueryResultDataResponse;
import org.apache.tajo.ws.rs.responses.ResultSetInfoResponse;
import javax.ws.rs.*;
import javax.ws.rs.core.*;
import javax.ws.rs.core.Response.Status;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Optional;
public class QueryResultResource {
private static final Log LOG = LogFactory.getLog(QueryResultResource.class);
private UriInfo uriInfo;
private Application application;
private String queryId;
private JerseyResourceDelegateContext context;
private static final String queryIdKeyName = "queryId";
private static final String sessionIdKeyName = "sessionId";
private static final String cacheIdKeyName = "cacheId";
private static final String offsetKeyName = "offset";
private static final String countKeyName = "count";
private static final String acceptTypeKeyName = "accept";
private static final String tajoDigestHeaderName = "X-Tajo-Digest";
private static final String tajoOffsetHeaderName = "X-Tajo-Offset";
private static final String tajoCountHeaderName = "X-Tajo-Count";
private static final String tajoEOSHeaderName = "X-Tajo-EOS";
public UriInfo getUriInfo() {
return uriInfo;
}
public void setUriInfo(UriInfo uriInfo) {
this.uriInfo = uriInfo;
}
public Application getApplication() {
return application;
}
public void setApplication(Application application) {
this.application = application;
}
public String getQueryId() {
return queryId;
}
public void setQueryId(String queryId) {
this.queryId = queryId;
}
private void initializeContext() {
context = new JerseyResourceDelegateContext();
JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class);
context.put(uriInfoKey, uriInfo);
JerseyResourceDelegateContextKey<String> queryIdKey =
JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class);
context.put(queryIdKey, queryId);
}
private static NonForwardQueryResultScanner getNonForwardQueryResultScanner(
MasterContext masterContext,
Session session,
QueryId queryId) throws IOException, TajoException {
NonForwardQueryResultScanner resultScanner = session.getNonForwardQueryResultScanner(queryId);
if (resultScanner == null) {
QueryInfo queryInfo = masterContext.getQueryJobManager().getFinishedQuery(queryId);
if (queryInfo == null) {
throw new RuntimeException("QueryInfo isnull.");
}
TableDesc resultTableDesc = queryInfo.getResultDesc();
if (resultTableDesc == null) {
throw new RuntimeException("Result Table Desc is null.");
}
ScanNode scanNode;
if (resultTableDesc.hasPartition()) {
scanNode = LogicalPlan.createNodeWithoutPID(PartitionedTableScanNode.class);
scanNode.init(resultTableDesc);
} else {
scanNode = LogicalPlan.createNodeWithoutPID(ScanNode.class);
scanNode.init(resultTableDesc);
}
resultScanner = new NonForwardQueryResultFileScanner(
masterContext.asyncTaskExecutor(),
masterContext.getConf(),
session.getSessionId(),
queryId,
scanNode,
Integer.MAX_VALUE,
Optional.empty());
resultScanner.init();
session.addNonForwardQueryResultScanner(resultScanner);
}
return resultScanner;
}
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getQueryResult(@HeaderParam(QueryResource.tajoSessionIdHeaderName) String sessionId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Client sent a get query result request.");
}
Response response = null;
try {
initializeContext();
JerseyResourceDelegateContextKey<String> sessionIdKey =
JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class);
if (sessionId == null || sessionId.isEmpty()) {
return ResourcesUtil.createBadRequestResponse(LOG, "Session id is required. Please refer the header " +
QueryResource.tajoSessionIdHeaderName);
}
context.put(sessionIdKey, sessionId);
response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
new GetQueryResultDelegate(),
application,
context,
LOG);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
}
return response;
}
private static class GetQueryResultDelegate implements JerseyResourceDelegate {
@Override
public Response run(JerseyResourceDelegateContext context) {
JerseyResourceDelegateContextKey<String> sessionIdKey =
JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class);
String sessionId = context.get(sessionIdKey);
JerseyResourceDelegateContextKey<String> queryIdKey =
JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class);
String queryId = context.get(queryIdKey);
JerseyResourceDelegateContextKey<ClientApplication> clientApplicationKey =
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.ClientApplicationKey, ClientApplication.class);
ClientApplication clientApplication = context.get(clientApplicationKey);
JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class);
MasterContext masterContext = context.get(masterContextKey);
JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class);
UriInfo uriInfo = context.get(uriInfoKey);
try {
masterContext.getSessionManager().touch(sessionId);
Session session = masterContext.getSessionManager().getSession(sessionId);
QueryId queryIdObj = TajoIdUtils.parseQueryId(queryId);
masterContext.getSessionManager().touch(sessionId);
QueryInfo queryInfo = masterContext.getQueryJobManager().getFinishedQuery(queryIdObj);
GetQueryResultDataResponse response = new GetQueryResultDataResponse();
if (queryInfo == null) {
response.setResultCode(ResultCode.INTERNAL_ERROR);
response.setErrorMessage("Unable to find a query info for requested id : " + queryId);
return Response.status(Status.NOT_FOUND).entity(response).build();
}
NonForwardQueryResultScanner queryResultScanner = getNonForwardQueryResultScanner(masterContext, session, queryIdObj);
if (queryInfo.getResultDesc() != null && queryInfo.getResultDesc().getSchema() != null) {
response.setSchema(queryInfo.getResultDesc().getSchema());
} else {
response.setSchema(queryResultScanner.getLogicalSchema());
}
long cacheId = clientApplication.generateCacheIdIfAbsent(queryIdObj);
clientApplication.setCachedNonForwardResultScanner(queryIdObj, cacheId, queryResultScanner);
URI resultSetCacheUri = uriInfo.getBaseUriBuilder()
.path(QueryResource.class)
.path(QueryResource.class, "getQueryResult")
.path(QueryResultResource.class, "getQueryResultSet")
.build(queryId, cacheId);
ResultSetInfoResponse resultSetInfoResponse = new ResultSetInfoResponse();
resultSetInfoResponse.setId(cacheId);
resultSetInfoResponse.setLink(resultSetCacheUri);
response.setResultset(resultSetInfoResponse);
response.setResultCode(ResultCode.OK);
return Response.status(Status.OK).entity(response).build();
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
GetQueryResultDataResponse response = new GetQueryResultDataResponse();
response.setResultCode(ResultCode.INTERNAL_ERROR);
response.setErrorMessage(e.getMessage());
response.setErrorTrace(org.apache.hadoop.util.StringUtils.stringifyException(e));
return Response.status(Status.INTERNAL_SERVER_ERROR).entity(response).build();
}
}
}
@GET
@Path("{cacheId}")
public Response getQueryResultSet(@HeaderParam(QueryResource.tajoSessionIdHeaderName) String sessionId,
@HeaderParam(HttpHeaders.ACCEPT) String acceptType,
@PathParam("cacheId") String cacheId,
@DefaultValue("100") @QueryParam("count") int count) {
if (LOG.isDebugEnabled()) {
LOG.debug("Client sent a get query result set request.");
}
Response response = null;
try {
initializeContext();
JerseyResourceDelegateContextKey<String> sessionIdKey =
JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class);
if (sessionId == null || sessionId.isEmpty()) {
return ResourcesUtil.createBadRequestResponse(LOG, "Session id is required. Please refer the header " +
QueryResource.tajoSessionIdHeaderName);
}
context.put(sessionIdKey, sessionId);
JerseyResourceDelegateContextKey<Long> cacheIdKey =
JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class);
context.put(cacheIdKey, Long.valueOf(cacheId));
JerseyResourceDelegateContextKey<Integer> countKey =
JerseyResourceDelegateContextKey.valueOf(countKeyName, Integer.class);
context.put(countKey, count);
JerseyResourceDelegateContextKey<String> acceptTypeKey =
JerseyResourceDelegateContextKey.valueOf(acceptTypeKeyName, String.class);
context.put(acceptTypeKey, acceptType);
response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
new GetQueryResultSetDelegate(),
application,
context,
LOG);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
}
return response;
}
private static class GetQueryResultSetDelegate implements JerseyResourceDelegate {
@Override
public Response run(JerseyResourceDelegateContext context) {
JerseyResourceDelegateContextKey<String> sessionIdKey =
JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class);
String sessionId = context.get(sessionIdKey);
JerseyResourceDelegateContextKey<String> queryIdKey =
JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class);
String queryId = context.get(queryIdKey);
JerseyResourceDelegateContextKey<String> acceptTypeKey =
JerseyResourceDelegateContextKey.valueOf(acceptTypeKeyName, String.class);
String acceptType = context.get(acceptTypeKey);
JerseyResourceDelegateContextKey<Long> cacheIdKey =
JerseyResourceDelegateContextKey.valueOf(cacheIdKeyName, Long.class);
Long cacheId = context.get(cacheIdKey);
JerseyResourceDelegateContextKey<ClientApplication> clientApplicationKey =
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.ClientApplicationKey, ClientApplication.class);
ClientApplication clientApplication = context.get(clientApplicationKey);
JerseyResourceDelegateContextKey<Integer> countKey =
JerseyResourceDelegateContextKey.valueOf(countKeyName, Integer.class);
int count = context.get(countKey);
if (sessionId == null || sessionId.isEmpty()) {
return ResourcesUtil.createBadRequestResponse(LOG, "Session id is required. Please refer the header " +
QueryResource.tajoSessionIdHeaderName);
}
if (queryId == null || queryId.isEmpty()) {
return ResourcesUtil.createBadRequestResponse(LOG, "Query id is required. Please specify the query id");
}
QueryId queryIdObj;
try {
queryIdObj = TajoIdUtils.parseQueryId(queryId);
} catch (Throwable e) {
return ResourcesUtil.createExceptionResponse(LOG, "Invalid query id : " + queryId);
}
if (cacheId == null || cacheId.longValue() == 0) {
return ResourcesUtil.createBadRequestResponse(LOG, "Cache id is null or empty.");
}
if (count < 0) {
return ResourcesUtil.createBadRequestResponse(LOG, "Invalid count value : " + count);
}
NonForwardQueryResultScanner cachedQueryResultScanner =
clientApplication.getCachedNonForwardResultScanner(queryIdObj, cacheId.longValue());
try {
int startOffset = cachedQueryResultScanner.getCurrentRowNumber();
AbstractStreamingOutput restOutput = RestOutputFactory.get(acceptType, cachedQueryResultScanner, count, startOffset);
if (restOutput == null) {
return ResourcesUtil.createExceptionResponse(null, acceptType);
}
int size = restOutput.count();
boolean eos = count != size;
Response.ResponseBuilder builder = Response.ok(restOutput)
.header(tajoOffsetHeaderName, startOffset)
.header(tajoCountHeaderName, size)
.header(tajoEOSHeaderName, eos)
.header(HttpHeaders.CONTENT_TYPE, restOutput.contentType());
if (restOutput.hasLength()) {
builder.header(HttpHeaders.CONTENT_LENGTH, restOutput.length());
}
return builder.build();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
return ResourcesUtil.createExceptionResponse(null, e.getMessage());
}
}
}
}