blob: e14e6964b51b53f374fba67a90ae85da318fba74 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.server.rest.profile;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.server.rest.ViewableWithPermissions;
import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman;
import org.glassfish.jersey.server.mvc.Viewable;
import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@Path("/")
@RolesAllowed(DrillUserPrincipal.AUTHENTICATED_ROLE)
public class ProfileResources {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileResources.class);
@Inject
UserAuthEnabled authEnabled;
@Inject
WorkManager work;
@Inject
DrillUserPrincipal principal;
@Inject
SecurityContext sc;
@Inject
HttpServletRequest request;
public static class ProfileInfo implements Comparable<ProfileInfo> {
private static final int QUERY_SNIPPET_MAX_CHAR = 150;
private static final int QUERY_SNIPPET_MAX_LINES = 8;
public static final SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
private final String queryId;
private final long startTime;
private final long endTime;
private final Date time;
private final String link;
private final String foreman;
private final String query;
private final String state;
private final String user;
private final double totalCost;
private final String queueName;
public ProfileInfo(DrillConfig drillConfig, String queryId, long startTime, long endTime, String foreman, String query,
String state, String user, double totalCost, String queueName) {
this.queryId = queryId;
this.startTime = startTime;
this.endTime = endTime;
this.time = new Date(startTime);
this.foreman = foreman;
this.link = generateLink(drillConfig, foreman, queryId);
this.query = extractQuerySnippet(query);
this.state = state;
this.user = user;
this.totalCost = totalCost;
this.queueName = queueName;
}
public String getUser() { return user; }
public String getQuery() { return query; }
public String getQueryId() { return queryId; }
public String getTime() { return format.format(time); }
public long getStartTime() { return startTime; }
public long getEndTime() { return endTime; }
public String getDuration() {
return (new SimpleDurationFormat(startTime, endTime)).verbose();
}
public String getState() { return state; }
public String getLink() { return link; }
public String getForeman() { return foreman; }
public double getTotalCost() { return totalCost; }
public String getQueueName() { return queueName; }
@Override
public int compareTo(ProfileInfo other) {
return time.compareTo(other.time);
}
/**
* Generates link which will return query profile in json representation.
*
* @param drillConfig drill configuration
* @param foreman foreman hostname
* @param queryId query id
* @return link
*/
private String generateLink(DrillConfig drillConfig, String foreman, String queryId) {
StringBuilder sb = new StringBuilder();
if (drillConfig.getBoolean(ExecConstants.HTTP_ENABLE_SSL)) {
sb.append("https://");
} else {
sb.append("http://");
}
sb.append(foreman);
sb.append(":");
sb.append(drillConfig.getInt(ExecConstants.HTTP_PORT));
sb.append("/profiles/");
sb.append(queryId);
sb.append(".json");
return sb.toString();
}
/**
* Extract only the first 150 characters of the query.
* If this spans more than 8 lines, we truncate excess lines for sake of readability
* @param queryText
* @return truncated text
*/
private String extractQuerySnippet(String queryText) {
//Extract upto max char limit as snippet
String sizeCappedQuerySnippet = queryText.substring(0, Math.min(queryText.length(), QUERY_SNIPPET_MAX_CHAR));
String[] queryParts = sizeCappedQuerySnippet.split(System.lineSeparator());
//Trimming down based on line-count
if (QUERY_SNIPPET_MAX_LINES < queryParts.length) {
int linesConstructed = 0;
StringBuilder lineCappedQuerySnippet = new StringBuilder();
for (String qPart : queryParts) {
lineCappedQuerySnippet.append(qPart);
if (++linesConstructed < QUERY_SNIPPET_MAX_LINES) {
lineCappedQuerySnippet.append(System.lineSeparator());
} else {
lineCappedQuerySnippet.append(" ... ");
break;
}
}
return lineCappedQuerySnippet.toString();
}
return sizeCappedQuerySnippet;
}
}
protected PersistentStoreProvider getProvider() {
return work.getContext().getStoreProvider();
}
protected ClusterCoordinator getCoordinator() {
return work.getContext().getClusterCoordinator();
}
@XmlRootElement
public class QProfiles {
private List<ProfileInfo> runningQueries;
private List<ProfileInfo> finishedQueries;
private List<String> errors;
public QProfiles(List<ProfileInfo> runningQueries, List<ProfileInfo> finishedQueries, List<String> errors) {
this.runningQueries = runningQueries;
this.finishedQueries = finishedQueries;
this.errors = errors;
}
public List<ProfileInfo> getRunningQueries() {
return runningQueries;
}
public List<ProfileInfo> getFinishedQueries() {
return finishedQueries;
}
public int getMaxFetchedQueries() {
return work.getContext().getConfig().getInt(ExecConstants.HTTP_MAX_PROFILES);
}
public String getQueriesPerPage() {
List<Integer> queriesPerPageOptions = work.getContext().getConfig().getIntList(ExecConstants.HTTP_PROFILES_PER_PAGE);
Collections.sort(queriesPerPageOptions);
return Joiner.on(",").join(queriesPerPageOptions);
}
public List<String> getErrors() { return errors; }
}
//max Param to cap listing of profiles
private static final String MAX_QPROFILES_PARAM = "max";
@GET
@Path("/profiles.json")
@Produces(MediaType.APPLICATION_JSON)
public QProfiles getProfilesJSON(@Context UriInfo uriInfo) {
try {
final QueryProfileStoreContext profileStoreContext = work.getContext().getProfileStoreContext();
final PersistentStore<QueryProfile> completed = profileStoreContext.getCompletedProfileStore();
final TransientStore<QueryInfo> running = profileStoreContext.getRunningProfileStore();
final List<String> errors = Lists.newArrayList();
final List<ProfileInfo> runningQueries = Lists.newArrayList();
final Iterator<Map.Entry<String, QueryInfo>> runningEntries = running.entries();
while (runningEntries.hasNext()) {
try {
final Map.Entry<String, QueryInfo> runningEntry = runningEntries.next();
final QueryInfo profile = runningEntry.getValue();
if (principal.canManageProfileOf(profile.getUser())) {
runningQueries.add(
new ProfileInfo(work.getContext().getConfig(),
runningEntry.getKey(), profile.getStart(), System.currentTimeMillis(),
profile.getForeman().getAddress(), profile.getQuery(),
ProfileUtil.getQueryStateDisplayName(profile.getState()),
profile.getUser(), profile.getTotalCost(), profile.getQueueName()));
}
} catch (Exception e) {
errors.add(e.getMessage());
logger.error("Error getting running query info.", e);
}
}
Collections.sort(runningQueries, Collections.reverseOrder());
final List<ProfileInfo> finishedQueries = Lists.newArrayList();
//Defining #Profiles to load
int maxProfilesToLoad = work.getContext().getConfig().getInt(ExecConstants.HTTP_MAX_PROFILES);
String maxProfilesParams = uriInfo.getQueryParameters().getFirst(MAX_QPROFILES_PARAM);
if (maxProfilesParams != null && !maxProfilesParams.isEmpty()) {
maxProfilesToLoad = Integer.valueOf(maxProfilesParams);
}
final Iterator<Map.Entry<String, QueryProfile>> range = completed.getRange(0, maxProfilesToLoad);
while (range.hasNext()) {
try {
final Map.Entry<String, QueryProfile> profileEntry = range.next();
final QueryProfile profile = profileEntry.getValue();
if (principal.canManageProfileOf(profile.getUser())) {
finishedQueries.add(
new ProfileInfo(work.getContext().getConfig(),
profileEntry.getKey(), profile.getStart(), profile.getEnd(),
profile.getForeman().getAddress(), profile.getQuery(),
ProfileUtil.getQueryStateDisplayName(profile.getState()),
profile.getUser(), profile.getTotalCost(), profile.getQueueName()));
}
} catch (Exception e) {
errors.add(e.getMessage());
logger.error("Error getting finished query profile.", e);
}
}
Collections.sort(finishedQueries, Collections.reverseOrder());
return new QProfiles(runningQueries, finishedQueries, errors);
} catch (Exception e) {
throw UserException.resourceError(e)
.message("Failed to get profiles from persistent or ephemeral store.")
.build(logger);
}
}
@GET
@Path("/profiles")
@Produces(MediaType.TEXT_HTML)
public Viewable getProfiles(@Context UriInfo uriInfo) {
QProfiles profiles = getProfilesJSON(uriInfo);
return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/list.ftl", sc, profiles);
}
private QueryProfile getQueryProfile(String queryId) {
QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
// first check local running
Foreman f = work.getBee().getForemanForQueryId(id);
if(f != null){
QueryProfile queryProfile = f.getQueryManager().getQueryProfile();
checkOrThrowProfileViewAuthorization(queryProfile);
return queryProfile;
}
// then check remote running
try {
final TransientStore<QueryInfo> running = work.getContext().getProfileStoreContext().getRunningProfileStore();
final QueryInfo info = running.get(queryId);
if (info != null) {
QueryProfile queryProfile = work.getContext()
.getController()
.getTunnel(info.getForeman())
.requestQueryProfile(id)
.checkedGet(2, TimeUnit.SECONDS);
checkOrThrowProfileViewAuthorization(queryProfile);
return queryProfile;
}
}catch(Exception e){
logger.trace("Failed to find query as running profile.", e);
}
// then check blob store
try {
final PersistentStore<QueryProfile> profiles = work.getContext().getProfileStoreContext().getCompletedProfileStore();
final QueryProfile queryProfile = profiles.get(queryId);
if (queryProfile != null) {
checkOrThrowProfileViewAuthorization(queryProfile);
return queryProfile;
}
} catch (final Exception e) {
throw new DrillRuntimeException("error while retrieving profile", e);
}
throw UserException.validationError()
.message("No profile with given query id '%s' exists. Please verify the query id.", queryId)
.build(logger);
}
@GET
@Path("/profiles/{queryid}.json")
@Produces(MediaType.APPLICATION_JSON)
public String getProfileJSON(@PathParam("queryid") String queryId) {
try {
return new String(work.getContext().getProfileStoreContext().getProfileStoreConfig().getSerializer().serialize(getQueryProfile(queryId)));
} catch (Exception e) {
logger.debug("Failed to serialize profile for: " + queryId);
return ("{ 'message' : 'error (unable to serialize profile)' }");
}
}
@GET
@Path("/profiles/{queryid}")
@Produces(MediaType.TEXT_HTML)
public Viewable getProfile(@PathParam("queryid") String queryId){
try {
ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId), work.getContext().getConfig(), request);
return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/profile.ftl", sc, wrapper);
} catch (Exception | Error e) {
logger.error("Exception was thrown when fetching profile {} :\n{}", queryId, e);
return ViewableWithPermissions.create(authEnabled.get(), "/rest/errorMessage.ftl", sc, e);
}
}
@GET
@Path("/profiles/cancel/{queryid}")
@Produces(MediaType.TEXT_PLAIN)
public String cancelQuery(@PathParam("queryid") String queryId) {
QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
// first check local running
if (work.getBee().cancelForeman(id, principal)) {
return String.format("Cancelled query %s on locally running node.", queryId);
}
// then check remote running
try {
final TransientStore<QueryInfo> running = work.getContext().getProfileStoreContext().getRunningProfileStore();
final QueryInfo info = running.get(queryId);
checkOrThrowQueryCancelAuthorization(info.getUser(), queryId);
Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);
if(a.getOk()){
return String.format("Query %s canceled on node %s.", queryId, info.getForeman().getAddress());
}else{
return String.format("Attempted to cancel query %s on %s but the query is no longer active on that node.", queryId, info.getForeman().getAddress());
}
}catch(Exception e){
logger.debug("Failure to find query as running profile.", e);
return String.format
("Failure attempting to cancel query %s. Unable to find information about where query is actively running.", queryId);
}
}
private void checkOrThrowProfileViewAuthorization(final QueryProfile profile) {
if (!principal.canManageProfileOf(profile.getUser())) {
throw UserException.permissionError()
.message("Not authorized to view the profile of query '%s'", profile.getId())
.build(logger);
}
}
private void checkOrThrowQueryCancelAuthorization(final String queryUser, final String queryId) {
if (!principal.canManageQueryOf(queryUser)) {
throw UserException.permissionError()
.message("Not authorized to cancel the query '%s'", queryId)
.build(logger);
}
}
}