blob: 7193d1c41f9a446956575ee805dd2483a9018251 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.protocol;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* State machine that drives the operator executable. Converts
* between the iterator protocol and the operator executable protocol.
* Implemented as a separate class in anticipation of eventually
* changing the record batch (iterator) protocol.
*
* <h4>Schema-Only Batch</h4>
*
* The scan operator is designed to provide an initial, empty, schema-only
* batch. At the time that this code was written, it was (mis-?) understood
* that Drill used a "fast schema" path that provided a schema-only batch
* as the first batch. However, it turns out that most operators fail when
* presented with an empty batch: many do not properly set the offset
* vector for variable-width vectors to an initial 0 position, causing all
* kinds of issues.
* <p>
* To work around this issue, the code defaults to *not* providing the
* schema batch.
*/
public class OperatorDriver {
  private static final Logger logger = LoggerFactory.getLogger(OperatorDriver.class);

  /**
   * Life-cycle states of the driver. The driver begins in {@link #START}
   * and ends in {@link #CLOSED} once {@link OperatorDriver#close()} has run.
   */
  public enum State {

    /**
     * Before the first call to next().
     */
    START,

    /**
     * Attempting to start the operator.
     */
    STARTING,

    /**
     * Read from readers.
     */
    RUN,

    /**
     * No more data to deliver.
     */
    END,

    /**
     * An error occurred.
     */
    FAILED,

    /**
     * Operation was cancelled. No more batches will be returned,
     * but close() has not yet been called.
     */
    CANCELED,

    /**
     * close() has been called and resources are released. No more
     * batches will be returned. The separate END, FAILED and CANCELED
     * states exist just in case an implementation needs to know how
     * the operator finished before it was closed.
     */
    CLOSED
  }

  private OperatorDriver.State state = State.START;

  /**
   * Operator context. The driver "owns" the context and is responsible
   * for closing it.
   */
  private final OperatorContext opContext;
  private final OperatorExec operatorExec;
  private final BatchAccessor batchAccessor;

  /**
   * Schema version seen on the most recent batch reported downstream as
   * a new schema; reset to -1 when the operator is started.
   */
  private int schemaVersion;
  private final boolean enableSchemaBatch;

  /**
   * Create a driver for the given operator executable.
   *
   * @param opContext operator context; closed by {@link #close()}
   * @param opExec the operator executable to drive
   * @param enableSchemaBatch if true, the first call to {@link #next()}
   * asks the operator to build a schema-only batch; if false, the first
   * call fetches the first data batch directly
   */
  public OperatorDriver(OperatorContext opContext, OperatorExec opExec, boolean enableSchemaBatch) {
    this.opContext = opContext;
    this.operatorExec = opExec;
    batchAccessor = operatorExec.batchAccessor();
    this.enableSchemaBatch = enableSchemaBatch;
  }

  /**
   * Get the next batch. Performs initialization on the first call.
   * Any failure cancels the operator (best effort) and moves the
   * driver to the FAILED state before the exception is rethrown.
   *
   * @return the iteration outcome to send downstream
   * @throws UserException if the operator fails; exceptions that are
   * not already a {@code UserException} are wrapped as an execution error
   */
  public IterOutcome next() {
    try {
      switch (state) {
        case START:
          return start();
        case RUN:
          return doNext();
        default:
          logger.debug("Extra call to next() in state {}: {}", state, operatorLabel());
          return IterOutcome.NONE;
      }
    } catch (UserException e) {
      cancelSilently();
      state = State.FAILED;
      throw e;
    } catch (Throwable t) {
      cancelSilently();
      state = State.FAILED;
      throw UserException.executionError(t)
        .addContext("Exception thrown from", operatorLabel())
        .build(logger);
    }
  }

  /**
   * Cancels the operator before reaching EOF. Has no effect once the
   * driver has been closed: the resources are already released, and
   * leaving the CLOSED state would allow a later {@link #close()} to
   * release them a second time.
   */
  public void cancel() {
    if (state == State.CLOSED) {
      return;
    }
    try {
      switch (state) {
        case START:
        case RUN:
          cancelSilently();
          break;
        default:
          break;
      }
    } finally {
      state = State.CANCELED;
    }
  }

  /**
   * Start the operator executor. When the schema batch is enabled,
   * ask the operator to build its schema and report it downstream;
   * otherwise go straight to fetching the first data batch.
   *
   * @return result of the first batch: a new schema, or EOF
   */
  private IterOutcome start() {
    state = State.STARTING;
    schemaVersion = -1;
    if (!enableSchemaBatch) {
      return doNext();
    }
    if (operatorExec.buildSchema()) {
      schemaVersion = batchAccessor.schemaVersion();

      // Report schema change.
      batchAccessor.container().schemaChanged();
      state = State.RUN;
      return IterOutcome.OK_NEW_SCHEMA;
    } else {
      state = State.END;
      return IterOutcome.NONE;
    }
  }

  /**
   * Fetch a record batch, detecting EOF and a new schema.
   *
   * @return {@code NONE} at EOF, {@code OK_NEW_SCHEMA} for the first
   * batch or when the schema version changed, {@code OK} otherwise
   */
  private IterOutcome doNext() {
    if (!operatorExec.next()) {
      state = State.END;
      return IterOutcome.NONE;
    }
    int newVersion = batchAccessor.schemaVersion();
    boolean schemaChanged = newVersion != schemaVersion;

    // Set the container schema changed based on whether the
    // current schema differs from that of the last time through
    // this method. That is, we take "schema changed" to be
    // "schema changed since last call to next." The result hides
    // trivial changes within this operator.
    if (schemaChanged) {
      batchAccessor.container().schemaChanged();
    }
    if (state == State.STARTING || schemaChanged) {
      schemaVersion = newVersion;
      state = State.RUN;
      return IterOutcome.OK_NEW_SCHEMA;
    }
    state = State.RUN;
    return IterOutcome.OK;
  }

  /**
   * Implement a cancellation, and ignore any exception that is
   * thrown. We're already in trouble here, no need to keep track
   * of additional things that go wrong.
   * <p>
   * Cancellation is done only if the operator is doing work.
   * The request is not propagated if either the operator never
   * started, or is already finished.
   */
  private void cancelSilently() {
    try {
      if (state == State.STARTING || state == State.RUN) {
        operatorExec.cancel();
      }
    } catch (Throwable t) {
      // Ignore; we're already in a bad state.
      logger.error("Exception thrown from cancel() for {}", operatorLabel(), t);
    }
  }

  /**
   * Name of the operator executable class, used in log messages and
   * error context.
   *
   * @return the canonical class name, or the binary class name when the
   * class has no canonical name (anonymous and local classes)
   */
  private String operatorLabel() {
    Class<?> execClass = operatorExec.getClass();
    String label = execClass.getCanonicalName();
    return label != null ? label : execClass.getName();
  }

  /**
   * Close the operator executable and then the operator context.
   * The context is closed even if closing the operator fails, and the
   * driver always ends in the CLOSED state, so a repeated call is a
   * no-op and never releases resources twice.
   *
   * @throws UserException if closing the operator fails; exceptions that
   * are not already a {@code UserException} are wrapped as an execution error
   */
  public void close() {
    if (state == State.CLOSED) {
      return;
    }
    try {
      operatorExec.close();
    } catch (UserException e) {
      throw e;
    } catch (Throwable t) {
      throw UserException.executionError(t)
        .addContext("Exception thrown from", operatorLabel())
        .build(logger);
    } finally {
      try {
        opContext.close();
      } finally {
        // Record the closed state even if the context close throws;
        // otherwise a retry would close the operator a second time.
        state = State.CLOSED;
      }
    }
  }

  /**
   * @return the batch accessor obtained from the operator executable
   */
  public BatchAccessor batchAccessor() {
    return batchAccessor;
  }

  /**
   * @return the operator context passed to the constructor
   */
  public OperatorContext operatorContext() {
    return opContext;
  }
}