| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hive.service.cli.operation; |
| |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; |
| import org.apache.hive.service.cli.FetchOrientation; |
| import org.apache.hive.service.cli.HiveSQLException; |
| import org.apache.hive.service.cli.OperationHandle; |
| import org.apache.hive.service.cli.OperationState; |
| import org.apache.hive.service.cli.OperationStatus; |
| import org.apache.hive.service.cli.OperationType; |
| import org.apache.hive.service.cli.RowSet; |
| import org.apache.hive.service.cli.SessionHandle; |
| import org.apache.hive.service.cli.TableSchema; |
| import org.apache.hive.service.rpc.thrift.TProtocolVersion; |
| import org.apache.livy.thriftserver.LivyThriftServer$; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public abstract class Operation { |
| protected final SessionHandle sessionHandle; |
| private volatile OperationState state = OperationState.INITIALIZED; |
| private final OperationHandle opHandle; |
| public static final FetchOrientation DEFAULT_FETCH_ORIENTATION = FetchOrientation.FETCH_NEXT; |
| public static final Logger LOG = LoggerFactory.getLogger(Operation.class.getName()); |
| protected boolean hasResultSet; |
| protected volatile HiveSQLException operationException; |
| protected volatile Future<?> backgroundHandle; |
| private ScheduledExecutorService scheduledExecutorService; |
| |
| private long operationTimeout; |
| private volatile long lastAccessTime; |
| private final long beginTime; |
| |
| protected long operationStart; |
| protected long operationComplete; |
| |
| protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET = |
| EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); |
| |
| |
| protected Operation(SessionHandle sessionHandle, OperationType opType) { |
| this(sessionHandle, null, opType); |
| } |
| |
| protected Operation(SessionHandle sessionHandle, |
| Map<String, String> confOverlay, OperationType opType) { |
| this.sessionHandle = sessionHandle; |
| this.opHandle = new OperationHandle(opType, sessionHandle.getProtocolVersion()); |
| beginTime = System.currentTimeMillis(); |
| lastAccessTime = beginTime; |
| HiveConf conf = LivyThriftServer$.MODULE$.getInstance().get().getHiveConf(); |
| operationTimeout = HiveConf.getTimeVar(conf, |
| HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS); |
| scheduledExecutorService = Executors.newScheduledThreadPool(1); |
| |
| } |
| |
| public Future<?> getBackgroundHandle() { |
| return backgroundHandle; |
| } |
| |
| protected void setBackgroundHandle(Future<?> backgroundHandle) { |
| this.backgroundHandle = backgroundHandle; |
| } |
| |
| public boolean shouldRunAsync() { |
| return false; // Most operations cannot run asynchronously. |
| } |
| |
| public SessionHandle getSessionHandle() { |
| return sessionHandle; |
| } |
| |
| public OperationHandle getHandle() { |
| return opHandle; |
| } |
| |
| public TProtocolVersion getProtocolVersion() { |
| return opHandle.getProtocolVersion(); |
| } |
| |
| public OperationType getType() { |
| return opHandle.getOperationType(); |
| } |
| |
| public OperationStatus getStatus() { |
| String taskStatus = null; |
| try { |
| taskStatus = getTaskStatus(); |
| } catch (HiveSQLException sqlException) { |
| LOG.error("Error getting task status for " + opHandle.toString(), sqlException); |
| } |
| return new OperationStatus(state, taskStatus, operationStart, operationComplete, hasResultSet, operationException); |
| } |
| |
| public boolean hasResultSet() { |
| return hasResultSet; |
| } |
| |
| protected void setHasResultSet(boolean hasResultSet) { |
| this.hasResultSet = hasResultSet; |
| opHandle.setHasResultSet(hasResultSet); |
| } |
| |
| protected final OperationState setState(OperationState newState) throws HiveSQLException { |
| state.validateTransition(newState); |
| OperationState prevState = state; |
| this.state = newState; |
| onNewState(state, prevState); |
| this.lastAccessTime = System.currentTimeMillis(); |
| return this.state; |
| } |
| |
| public boolean isTimedOut(long current) { |
| if (operationTimeout == 0) { |
| return false; |
| } |
| if (operationTimeout > 0) { |
| // check only when it's in terminal state |
| return state.isTerminal() && lastAccessTime + operationTimeout <= current; |
| } |
| return lastAccessTime + -operationTimeout <= current; |
| } |
| |
| public long getLastAccessTime() { |
| return lastAccessTime; |
| } |
| |
| public long getOperationTimeout() { |
| return operationTimeout; |
| } |
| |
| public void setOperationTimeout(long operationTimeout) { |
| this.operationTimeout = operationTimeout; |
| } |
| |
| protected void setOperationException(HiveSQLException operationException) { |
| this.operationException = operationException; |
| } |
| |
| protected final void assertState(List<OperationState> states) throws HiveSQLException { |
| if (!states.contains(state)) { |
| throw new HiveSQLException("Expected states: " + states.toString() + ", but found " |
| + this.state); |
| } |
| this.lastAccessTime = System.currentTimeMillis(); |
| } |
| |
| public boolean isDone() { |
| return state.isTerminal(); |
| } |
| |
| /** |
| * Invoked before runInternal(). |
| * Set up some preconditions, or configurations. |
| */ |
| protected void beforeRun() { |
| // For Livy operations, this currently does nothing |
| } |
| |
| /** |
| * Invoked after runInternal(), even if an exception is thrown in runInternal(). |
| * Clean up resources, which was set up in beforeRun(). |
| */ |
| protected void afterRun() { |
| // For Livy operations, this currently does nothing |
| } |
| |
| /** |
| * Implemented by subclass of Operation class to execute specific behaviors. |
| * @throws HiveSQLException |
| */ |
| protected abstract void runInternal() throws HiveSQLException; |
| |
| public void run() throws HiveSQLException { |
| beforeRun(); |
| try { |
| runInternal(); |
| } finally { |
| afterRun(); |
| } |
| } |
| |
| public abstract void cancel(OperationState stateAfterCancel) throws HiveSQLException; |
| |
| public abstract void close() throws HiveSQLException; |
| |
| public abstract TableSchema getResultSetSchema() throws HiveSQLException; |
| |
| public abstract RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException; |
| |
| public String getTaskStatus() throws HiveSQLException { |
| return null; |
| } |
| |
| /** |
| * Verify if the given fetch orientation is part of the default orientation types. |
| * @param orientation |
| * @throws HiveSQLException |
| */ |
| protected void validateDefaultFetchOrientation(FetchOrientation orientation) |
| throws HiveSQLException { |
| validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET); |
| } |
| |
| /** |
| * Verify if the given fetch orientation is part of the supported orientation types. |
| * @param orientation |
| * @param supportedOrientations |
| * @throws HiveSQLException |
| */ |
| protected void validateFetchOrientation(FetchOrientation orientation, |
| EnumSet<FetchOrientation> supportedOrientations) throws HiveSQLException { |
| if (!supportedOrientations.contains(orientation)) { |
| throw new HiveSQLException("The fetch type " + orientation.toString() + |
| " is not supported for this resultset", "HY106"); |
| } |
| } |
| |
| protected HiveSQLException toSQLException(String prefix, CommandProcessorResponse response) { |
| HiveSQLException ex = new HiveSQLException(prefix + ": " + response.getErrorMessage(), |
| response.getSQLState(), response.getResponseCode()); |
| if (response.getException() != null) { |
| ex.initCause(response.getException()); |
| } |
| return ex; |
| } |
| |
| public long getBeginTime() { |
| return beginTime; |
| } |
| |
| protected OperationState getState() { |
| return state; |
| } |
| |
| protected void onNewState(OperationState state, OperationState prevState) { |
| switch(state) { |
| case RUNNING: |
| markOperationStartTime(); |
| break; |
| case ERROR: |
| case FINISHED: |
| case CANCELED: |
| markOperationCompletedTime(); |
| break; |
| } |
| } |
| |
| public long getOperationComplete() { |
| return operationComplete; |
| } |
| |
| public long getOperationStart() { |
| return operationStart; |
| } |
| |
| protected void markOperationStartTime() { |
| operationStart = System.currentTimeMillis(); |
| } |
| |
| protected void markOperationCompletedTime() { |
| operationComplete = System.currentTimeMillis(); |
| } |
| } |