blob: a0d8e0ba23b75db3c7b48082bef1165cc7ed4124 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.server.api.query;
import static org.apache.lens.server.api.LensConfConstants.*;
import java.util.*;
import java.util.concurrent.Future;
import org.apache.lens.api.LensConf;
import org.apache.lens.api.query.FailedAttempt;
import org.apache.lens.api.query.LensQuery;
import org.apache.lens.api.query.QueryHandle;
import org.apache.lens.api.query.QueryStatus;
import org.apache.lens.api.query.QueryStatus.Status;
import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.driver.*;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint;
import org.apache.lens.server.api.retry.BackOffRetryHandler;
import org.apache.lens.server.api.retry.FailureContext;
import org.apache.lens.server.api.util.LensUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* The Class QueryContext.
*/
@Slf4j
public class QueryContext extends AbstractQueryContext implements FailureContext {
/**
* The Constant serialVersionUID.
*/
private static final long serialVersionUID = 1L;
/**
* The query handle.
*/
@Getter
@Setter
private QueryHandle queryHandle;
/**
* The is persistent.
*/
@Getter
private final boolean isPersistent;
/**
* The is driver persistent.
*/
@Getter
private boolean isDriverPersistent;
/**
* The status.
*/
@Getter
private QueryStatus status;
/**
* The result set path.
*/
@Getter
@Setter
private String resultSetPath;
/**
* The hdfsout path.
*/
@Getter
@Setter
private String driverResultPath;
/**
* The submission time.
*/
@Getter
private final long submissionTime;
/**
* The launch time.
*/
@Getter
@Setter
private long launchTime;
/**
* The end time.
*/
@Getter
@Setter
private long endTime;
/**
* The closed time.
*/
@Getter
@Setter
private long closedTime;
/**
* The driver op handle.
*/
@Getter
@Setter
private String driverOpHandle;
/**
* The driver status.
*/
@Getter
final DriverQueryStatus driverStatus;
/**
* The query output formatter.
*/
@Getter
@Setter
private QueryOutputFormatter queryOutputFormatter;
/**
* The finished query persisted.
*/
@Getter
@Setter
private boolean finishedQueryPersisted = false;
@Getter
@Setter
private boolean isQueryClosedOnDriver = false;
/**
* The query name.
*/
@Getter
@Setter
private String queryName;
/**
* This is the timeout that the client may have provided while initiating query execution
* This value is used for pre fetching in-memory result if applicable
*
* Note: in case the timeout is not provided, this value will not be set.
*/
@Setter
@Getter
private transient long executeTimeoutMillis;
/**
* Query result registered by driver
*/
@Getter
private transient LensResultSet driverResult;
/**
* True if driver has registered the result
*/
@Getter
private transient boolean isDriverResultRegistered;
@Getter
@Setter
private byte[] queryConfHash;
transient StatusUpdateFailureContext statusUpdateFailures = new StatusUpdateFailureContext();
@Getter
@Setter
private transient boolean isLaunching = false;
@Getter
@Setter
private transient Future queryLauncher;
transient List<QueryDriverStatusUpdateListener> driverStatusUpdateListeners = Lists.newCopyOnWriteArrayList();
@Getter
@Setter
List<FailedAttempt> failedAttempts = Lists.newArrayList();
@Getter
@Setter
private BackOffRetryHandler<QueryContext> driverRetryPolicy;
@Getter
@Setter
private BackOffRetryHandler<QueryContext> serverRetryPolicy;
/**
* Creates context from query
*
* @param query the query
* @param user the user
* @param qconf The query lensconf
* @param conf the conf
*/
public QueryContext(String query, String user, LensConf qconf, Configuration conf, Collection<LensDriver> drivers) {
this(query, user, qconf, conf, drivers, null, true);
}
/**
* Creates context from PreparedQueryContext
*
* @param prepared the prepared
* @param user the user
* @param qconf the qconf
* @param conf the conf
*/
public QueryContext(PreparedQueryContext prepared, String user, LensConf qconf, Configuration conf) {
this(prepared.getUserQuery(), user, qconf, mergeConf(prepared.getConf(), conf), prepared.getDriverContext()
.getDriverQueryContextMap().keySet(), prepared.getDriverContext().getSelectedDriver(), true);
setDriverContext(prepared.getDriverContext());
setSelectedDriverQuery(prepared.getSelectedDriverQuery());
setSelectedDriverQueryCost(prepared.getSelectedDriverQueryCost());
}
/**
* Create context by passing drivers and selected driver
*
* @param userQuery The user query
* @param user Submitted user
* @param qconf The query LensConf
* @param conf The configuration object
* @param drivers All the drivers
* @param selectedDriver SelectedDriver
*/
QueryContext(String userQuery, String user, LensConf qconf, Configuration conf, Collection<LensDriver> drivers,
LensDriver selectedDriver, boolean mergeDriverConf) {
this(userQuery, user, qconf, conf, drivers, selectedDriver, System.currentTimeMillis(), mergeDriverConf);
}
/**
* Instantiates a new query context.
*
* @param userQuery the user query
* @param user the user
* @param qconf the qconf
* @param conf the conf
* @param drivers All the drivers
* @param selectedDriver the selected driver
* @param submissionTime the submission time
*/
QueryContext(String userQuery, String user, LensConf qconf, Configuration conf, Collection<LensDriver> drivers,
LensDriver selectedDriver, long submissionTime, boolean mergeDriverConf) {
super(userQuery, user, qconf, conf, drivers, mergeDriverConf);
this.submissionTime = submissionTime;
this.queryHandle = new QueryHandle(UUID.randomUUID());
this.status = new QueryStatus(0.0f, null, Status.NEW, "Query just got created", false, null, null, null);
this.lensConf = qconf;
this.conf = conf;
this.isPersistent = conf.getBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_SET,
LensConfConstants.DEFAULT_PERSISTENT_RESULT_SET);
this.isDriverPersistent = conf.getBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER,
LensConfConstants.DEFAULT_DRIVER_PERSISTENT_RESULT_SET);
this.userQuery = userQuery;
if (selectedDriver != null) {
this.setSelectedDriver(selectedDriver);
}
this.driverStatus = new DriverQueryStatus();
}
/**
* Utility create method to create context with single driver.
*
* @param query The user query
* @param user The submitted query
* @param qconf The query lens conf
* @param conf Query configuration object - merged with session
* @param driver The driver
* @param lensSessionPublicId The session id
*
* @return QueryContext object
*/
public static QueryContext createContextWithSingleDriver(String query, String user, LensConf qconf,
Configuration conf, LensDriver driver, String lensSessionPublicId, boolean mergeDriverConf) {
QueryContext ctx = new QueryContext(query, user, qconf, conf, Lists.newArrayList(driver), driver, mergeDriverConf);
ctx.setLensSessionIdentifier(lensSessionPublicId);
return ctx;
}
public void initTransientState() {
super.initTransientState();
statusUpdateFailures = new StatusUpdateFailureContext();
driverStatusUpdateListeners = Lists.newArrayList();
}
/**
* Merge conf.
*
* @param prepared the prepared
* @param current the current
* @return the configuration
*/
private static Configuration mergeConf(Configuration prepared, Configuration current) {
Configuration conf = new Configuration(false);
for (Map.Entry<String, String> entry : prepared) {
conf.set(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, String> entry : current) {
conf.set(entry.getKey(), entry.getValue());
}
return conf;
}
public String getResultSetParentDir() {
if (getSelectedDriver() != null && getSelectedDriverConf().get(LensConfConstants.RESULT_SET_PARENT_DIR) != null) {
log.info("Fetching Parent Dir from driver conf:- "
+ getSelectedDriverConf().get(LensConfConstants.RESULT_SET_PARENT_DIR));
return getSelectedDriverConf().get(LensConfConstants.RESULT_SET_PARENT_DIR);
}
return conf.get(LensConfConstants.RESULT_SET_PARENT_DIR, LensConfConstants.RESULT_SET_PARENT_DIR_DEFAULT);
}
public Path getHDFSResultDir() {
return new Path(new Path(getResultSetParentDir(), conf.get(LensConfConstants.QUERY_HDFS_OUTPUT_PATH,
LensConfConstants.DEFAULT_HDFS_OUTPUT_PATH)), queryHandle.toString());
}
/**
* To lens query.
*
* @return the lens query
*/
public LensQuery toLensQuery() {
return new LensQuery(queryHandle, userQuery, super.getSubmittedUser(), getPriority(), isPersistent,
getSelectedDriver() != null ? getSelectedDriver().getFullyQualifiedName() : null,
getSelectedDriverQuery(),
status,
resultSetPath, driverOpHandle, lensConf, submissionTime, launchTime, driverStatus.getDriverStartTime(),
driverStatus.getDriverFinishTime(), endTime, closedTime, queryName, getFailedAttempts());
}
public boolean isResultAvailableInDriver() {
// result is available in driver if driverStatus.isResultSetAvailable() - will be true for fetching inmemory
// result set.
// if result is persisted in driver driverStatus.isResultSetAvailable() will be false but isDriverPersistent will
// be true. So, for select queries, if result is persisted in driver, we return true so that the result can be
// fetched thru persistent resultset
return driverStatus.isSuccessful() && (isDriverPersistent() || driverStatus.isResultSetAvailable());
}
/**
* Set whether result is persisted on driver to false. Set by drivers when drivers are not persisting
*
*/
public void unSetDriverPersistent() {
isDriverPersistent = false;
}
/*
* Introduced for Recovering finished query.
*/
public void setStatusSkippingTransitionTest(final QueryStatus newStatus) {
this.status = newStatus;
}
public synchronized void setStatus(final QueryStatus newStatus) throws LensException {
validateTransition(newStatus);
log.info("Updating status of {} from {} to {}", getQueryHandle(), this.status, newStatus);
this.status = newStatus;
}
/**
* Update status from selected driver
*
* @param statusUpdateRetryHandler The exponential retry handler
*
* @throws LensException Throws exception if update from driver has failed.
*/
public synchronized void updateDriverStatus(BackOffRetryHandler statusUpdateRetryHandler)
throws LensException {
if (statusUpdateRetryHandler.canTryOpNow(statusUpdateFailures)) {
try {
getSelectedDriver().updateStatus(this);
statusUpdateFailures.clear();
} catch (LensException exc) {
if (LensUtil.isSocketException(exc)) {
statusUpdateFailures.updateFailure();
if (!statusUpdateRetryHandler.hasExhaustedRetries(statusUpdateFailures)) {
// retries are not exhausted, so failure is ignored and update will be tried later
log.warn("Exception during update status from driver and update will be tried again at {}",
statusUpdateRetryHandler.getOperationNextTime(statusUpdateFailures), exc);
return;
}
}
throw exc;
}
}
}
public String getResultHeader() {
return getConf().get(LensConfConstants.QUERY_OUTPUT_HEADER);
}
public String getResultFooter() {
return getConf().get(LensConfConstants.QUERY_OUTPUT_FOOTER);
}
public String getResultEncoding() {
return conf.get(LensConfConstants.QUERY_OUTPUT_CHARSET_ENCODING, LensConfConstants.DEFAULT_OUTPUT_CHARSET_ENCODING);
}
public String getOuptutFileExtn() {
return conf.get(LensConfConstants.QUERY_OUTPUT_FILE_EXTN, LensConfConstants.DEFAULT_OUTPUT_FILE_EXTN);
}
public boolean getCompressOutput() {
return conf.getBoolean(LensConfConstants.QUERY_OUTPUT_ENABLE_COMPRESSION,
LensConfConstants.DEFAULT_OUTPUT_ENABLE_COMPRESSION);
}
public long getMaxResultSplitRows() {
return conf.getLong(LensConfConstants.RESULT_SPLIT_MULTIPLE_MAX_ROWS,
LensConfConstants.DEFAULT_RESULT_SPLIT_MULTIPLE_MAX_ROWS);
}
/**
* Split result into multiple files.
*
* @return true, if successful
*/
public boolean splitResultIntoMultipleFiles() {
return conf.getBoolean(LensConfConstants.RESULT_SPLIT_INTO_MULTIPLE,
LensConfConstants.DEFAULT_RESULT_SPLIT_INTO_MULTIPLE);
}
public String getClusterUser() {
return conf.get(LensConfConstants.SESSION_CLUSTER_USER, getSubmittedUser());
}
/**
* @return query handle string
*/
@Override
public String getLogHandle() {
return getQueryHandleString();
}
public String getQueryHandleString() {
return queryHandle.getHandleIdString();
}
public void validateTransition(final QueryStatus newStatus) throws LensException {
if (!this.status.isValidTransition(newStatus.getStatus())) {
throw new LensException("Invalid state transition:from[" + this.status.getStatus() + " to "
+ newStatus.getStatus() + "]");
}
}
public boolean finished() {
return this.status.finished();
}
public boolean successful() {
return this.status.successful();
}
public boolean executed() {
return this.status.executed();
}
public boolean launched() {
return this.status.launched();
}
public boolean running() {
return this.status.running();
}
public boolean queued() {
return this.status.queued();
}
public ImmutableSet<QueryLaunchingConstraint> getSelectedDriverQueryConstraints() {
return getSelectedDriver().getQueryConstraints();
}
public synchronized void registerDriverResult(LensResultSet result) throws LensException {
if (isDriverResultRegistered) {
return; //already registered
}
log.info("Registering driver resultset for query {}", getQueryHandleString());
this.isDriverResultRegistered = true;
/*
* Check if results needs to be streamed to client in which case driver result needs to be wrapped in
* PartiallyFetchedInMemoryResultSet
*
* 1. Driver Result should be of type InMemory (for streaming) as only such results can be streamed fast
* 2. Query result should be server persistent. Only in this case, an early streaming is required by client
* that starts even before server level result persistence/formatting finishes.
* 3. When execiteTimeout is 0, it refers to an async query. In this case streaming result does not make sense
* 4. PREFETCH_INMEMORY_RESULTSET = true, implies client intends to get early streamed result
* 5. rowsToPreFetch should be > 0
*/
if (isPersistent && executeTimeoutMillis > 0
&& result instanceof InMemoryResultSet
&& conf.getBoolean(PREFETCH_INMEMORY_RESULTSET, DEFAULT_PREFETCH_INMEMORY_RESULTSET)) {
int rowsToPreFetch = conf.getInt(PREFETCH_INMEMORY_RESULTSET_ROWS, DEFAULT_PREFETCH_INMEMORY_RESULTSET_ROWS);
if (rowsToPreFetch > 0) {
long executeTimeOutTime = submissionTime + executeTimeoutMillis;
if (System.currentTimeMillis() < executeTimeOutTime) {
this.driverResult = new PartiallyFetchedInMemoryResultSet((InMemoryResultSet) result, rowsToPreFetch);
return;
} else {
log.info("Skipping creation of PartiallyFetchedInMemoryResultSet as the query {} has already timed out",
getQueryHandleString());
}
}
}
this.driverResult = result;
}
public void setDriverStatus(DriverQueryStatus.DriverQueryState state, String message) {
if (getDriverStatus().getState().getOrder() > state.getOrder()) {
log.info("current driver status: {}, ignoring transition request to {}", getDriverStatus().getState(), state);
return;
}
switch (state) {
case NEW:
case INITIALIZED:
case PENDING:
getDriverStatus().setProgress(0.0);
case RUNNING:
if (getDriverStatus().getDriverStartTime() == null || getDriverStatus().getDriverStartTime() <= 0) {
getDriverStatus().setDriverStartTime(System.currentTimeMillis());
}
break;
case SUCCESSFUL:
case FAILED:
case CANCELED:
getDriverStatus().setProgress(1.0);
if (getDriverStatus().getDriverFinishTime() == null || getDriverStatus().getDriverFinishTime() <= 0) {
getDriverStatus().setDriverFinishTime(System.currentTimeMillis());
}
break;
default:
break;
}
if (message != null) {
if (state == DriverQueryStatus.DriverQueryState.FAILED) {
getDriverStatus().setErrorMessage(message);
} else {
getDriverStatus().setStatusMessage(message);
}
}
if (getDriverStatus().getStatusMessage() == null) {
getDriverStatus().setStatusMessage("Query " + getQueryHandleString() + " " + state.name().toLowerCase());
}
getDriverStatus().setState(state);
for (QueryDriverStatusUpdateListener listener : this.driverStatusUpdateListeners) {
listener.onDriverStatusUpdated(getQueryHandle(), getDriverStatus());
}
}
public String toString() {
return queryHandle + ":" + this.status;
}
public void setDriverStatus(DriverQueryStatus.DriverQueryState state) {
setDriverStatus(state, null);
}
public void registerStatusUpdateListener(QueryDriverStatusUpdateListener driverStatusUpdateListener) {
this.driverStatusUpdateListeners.add(driverStatusUpdateListener);
}
@Override
public long getLastFailedTime() {
if (getFailCount() == 0) {
return 0;
}
return getFailedAttempts().get(getFailedAttempts().size() - 1).getDriverFinishTime();
}
@Override
public int getFailCount() {
return getFailedAttempts().size();
}
public BackOffRetryHandler<QueryContext> getRetryPolicy() {
return driverRetryPolicy != null ? driverRetryPolicy : serverRetryPolicy;
}
public void extractFailedAttempt() {
extractFailedAttempt(getSelectedDriver());
}
public void extractFailedAttempt(LensDriver selectedDriver) {
failedAttempts.add(new FailedAttempt(selectedDriver.getFullyQualifiedName(), getDriverStatus().getProgress(),
getDriverStatus().getProgressMessage(), getDriverStatus().getErrorMessage(),
getDriverStatus().getDriverStartTime(), getDriverStatus().getDriverFinishTime()));
getDriverStatus().clear();
}
public boolean hasTimedout() {
if (status.running()) {
long runtimeMillis = System.currentTimeMillis() - launchTime;
long timeoutMillis = getSelectedDriverConf().getInt(QUERY_TIMEOUT_MILLIS, DEFAULT_QUERY_TIMEOUT_MILLIS);
return runtimeMillis > timeoutMillis;
}
return false;
}
}