// blob: 7e795dd55361a7bd7c2281e81f01247aaa3673f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.validate;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.VectorValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
private static final Logger logger =
    LoggerFactory.getLogger(IteratorValidatorBatchIterator.class);

/**
 * Compile-time switch for vector validation. While true, next() runs
 * VectorValidator on every OK / OK_NEW_SCHEMA batch, and validateBatch()
 * runs BatchValidator regardless of the per-instance validateBatches flag.
 */
static final boolean VALIDATE_VECTORS = true;

/**
 * Number of instances constructed so far; for logging/debugability only.
 * Held in an AtomicInteger because {@code ++} on a volatile int is a
 * non-atomic read-modify-write: instances constructed concurrently could
 * otherwise end up with the same instance number.
 */
private static final AtomicInteger instanceCount = new AtomicInteger();

/** @see org.apache.drill.exec.physical.config.IteratorValidator */
private final boolean isRepeatable;

/** For logging/debugability only. Numbering starts at 1. */
private final int instNum;
{
  // Instance initializer: runs before the constructor body, so instNum is
  // already set when the constructor logs it.
  instNum = instanceCount.incrementAndGet();
}
/**
 * The upstream batch, calls to which and return values from which are
 * checked by this validator. Most accessors of this class delegate to it.
 */
private final RecordBatch incoming;
/** Incoming batch's type (simple class name); for logging/debugability
 * only. */
private final String batchTypeName;
/**
 * Exception state: the last RuntimeException or Error that propagated out of
 * this validator's next() method, whether thrown by the incoming batch's
 * next() or by one of the validation checks. Null if none. Once set, further
 * next() calls are rejected and hasFailed() returns true.
 */
private Throwable exceptionState;
/** Main state of incoming batch; last value returned by its next() method.
 * Null until the first next() call has returned. */
private IterOutcome batchState;
/** Last schema retrieved after OK_NEW_SCHEMA or OK from next(). Null if none
 * yet. Currently for logging/debugability only. */
private BatchSchema lastSchema;
/** Last schema retrieved after OK_NEW_SCHEMA from next(). Null if none yet.
 * Currently for logging/debugability only. */
private BatchSchema lastNewSchema;
/**
 * {@link IterOutcome} return value sequence validation state, advanced by
 * next(). (Only needs enough to validate returns of OK; EMIT is checked the
 * same way as OK.)
 */
private enum ValidationState {
/** Initial state: Have not gotten any OK_NEW_SCHEMA yet and not
 * terminated. OK (and EMIT) is not allowed yet. */
INITIAL_NO_SCHEMA,
/** Have gotten OK_NEW_SCHEMA already and not terminated. OK (and EMIT) is
 * allowed now. */
HAVE_SCHEMA,
/** Terminal state: Have seen STOP, or NONE on a non-repeatable validator.
 * Nothing more is allowed. */
TERMINAL
}
/** High-level IterOutcome sequence state; starts in INITIAL_NO_SCHEMA and is
 * updated by next(). */
private ValidationState validationState = ValidationState.INITIAL_NO_SCHEMA;
/**
 * Enable/disable per-batch vector validation. Enable only to debug vector
 * corruption issues. Set through enableBatchValidation(); false until then.
 * Note that validateBatch() also validates whenever VALIDATE_VECTORS is
 * true, regardless of this flag.
 */
private boolean validateBatches;
/**
 * Creates a validator that wraps the given upstream batch.
 *
 * @param incoming  upstream batch whose next() protocol is checked
 * @param isRepeatable  when true, next() may be called again after the
 *        incoming batch has returned NONE
 */
public IteratorValidatorBatchIterator(RecordBatch incoming, boolean isRepeatable) {
  this.isRepeatable = isRepeatable;
  this.incoming = incoming;
  this.batchTypeName = incoming.getClass().getSimpleName();
  // Construction and close() are traced at the same level so that the two
  // log lines bracket this instance's activity.
  logger.trace("[#{}; on {}; repeatable: {}]: Being constructed.",
      instNum, batchTypeName, isRepeatable);
}

/**
 * Creates a non-repeatable validator that wraps the given upstream batch.
 *
 * @param incoming  upstream batch whose next() protocol is checked
 */
public IteratorValidatorBatchIterator(RecordBatch incoming) {
  this(incoming, false);
}
/**
 * Turns per-batch validation on or off for this instance by setting the
 * validateBatches flag consulted by validateBatch().
 *
 * @param option  true to enable, false to disable
 */
public void enableBatchValidation(boolean option) {
validateBatches = option;
}
/**
 * Renders the default Object description followed by this validator's
 * instance number, validation state, last batch state, and the incoming
 * batch.
 */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder();
  text.append(super.toString());
  text.append("[");
  text.append("instNum = ").append(instNum);
  text.append(", validationState = ").append(validationState);
  text.append(", batchState = ").append(batchState);
  text.append(", ... ");
  text.append("; incoming = ").append(incoming);
  text.append("]");
  return text.toString();
}
/**
 * Verifies that batch data may be read in the current state: next() must
 * have been called, and its last return value must be OK, OK_NEW_SCHEMA,
 * NONE or EMIT.
 *
 * @param operation  name of the read operation being attempted; used only
 *        in the exception message
 * @throws IllegalStateException if next() has not returned yet, or if its
 *         last return value is any other outcome
 */
private void validateReadState(String operation) {
  final IterOutcome lastOutcome = batchState;

  if (lastOutcome == null) {
    throw new IllegalStateException(
        String.format(
            "Batch data read operation (%s) attempted before first next() call"
            + " on batch [#%d, %s].",
            operation, instNum, batchTypeName));
  }

  final boolean readable =
      lastOutcome == OK
      || lastOutcome == OK_NEW_SCHEMA
      || lastOutcome == NONE
      || lastOutcome == IterOutcome.EMIT;
  if (readable) {
    return;
  }

  // The message names only OK and OK_NEW_SCHEMA, although NONE and EMIT are
  // accepted above as well.
  throw new IllegalStateException(
      String.format(
          "Batch data read operation (%s) attempted when last next() call"
          + " on batch [#%d, %s] returned %s (not %s or %s).",
          operation, instNum, batchTypeName, lastOutcome, OK, OK_NEW_SCHEMA));
}
/**
 * Returns the incoming batch's iterator after checking the read state.
 *
 * @throws IllegalStateException if validateReadState() rejects the current
 *         state
 */
@Override
public Iterator<VectorWrapper<?>> iterator() {
validateReadState("iterator()");
return incoming.iterator();
}
/** Returns the incoming batch's fragment context; no state check is done. */
@Override
public FragmentContext getContext() {
return incoming.getContext();
}
/**
 * Returns the incoming batch's schema. Unlike most data accessors of this
 * class, this one does not call validateReadState().
 */
@Override
public BatchSchema getSchema() {
return incoming.getSchema();
}
/**
 * Returns the incoming batch's record count after checking the read state.
 *
 * @throws IllegalStateException if validateReadState() rejects the current
 *         state
 */
@Override
public int getRecordCount() {
validateReadState("getRecordCount()");
return incoming.getRecordCount();
}
/**
 * Forwards the kill request to the incoming batch unchanged; no state check
 * is done and this validator's own state is not modified.
 *
 * @param sendUpstream  passed through to the incoming batch's kill()
 */
@Override
public void kill(boolean sendUpstream) {
incoming.kill(sendUpstream);
}
/**
 * Returns the incoming batch's SelectionVector2 after checking the read
 * state.
 *
 * @throws IllegalStateException if validateReadState() rejects the current
 *         state
 */
@Override
public SelectionVector2 getSelectionVector2() {
validateReadState("getSelectionVector2()");
return incoming.getSelectionVector2();
}
/**
 * Returns the incoming batch's SelectionVector4 after checking the read
 * state.
 *
 * @throws IllegalStateException if validateReadState() rejects the current
 *         state
 */
@Override
public SelectionVector4 getSelectionVector4() {
validateReadState("getSelectionVector4()");
return incoming.getSelectionVector4();
}
/**
 * Looks up the field id for the given path on the incoming batch after
 * checking the read state.
 *
 * @param path  schema path passed through to the incoming batch
 * @return whatever the incoming batch's getValueVectorId() returns
 * @throws IllegalStateException if validateReadState() rejects the current
 *         state
 */
@Override
public TypedFieldId getValueVectorId(SchemaPath path) {
validateReadState("getValueVectorId(SchemaPath)");
return incoming.getValueVectorId(path);
}
/**
 * Delegates to the incoming batch's getValueAccessorById(). Unlike the other
 * vector accessors, this one currently performs no read-state check.
 *
 * @param clazz  passed through to the incoming batch
 * @param ids  passed through to the incoming batch
 * @return whatever the incoming batch returns
 */
@Override
public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
// TODO: the validateReadState() call is disabled here; fix and re-enable it.
return incoming.getValueAccessorById(clazz, ids);
}
/**
 * Advances the incoming batch and validates both the call itself and the
 * returned outcome against the {@link IterOutcome} protocol.
 * <p>
 * Any RuntimeException or Error that escapes, whether thrown by the incoming
 * batch or by one of the validation checks, is recorded in exceptionState
 * before being rethrown, so that any later next() call is rejected.
 * </p>
 *
 * @return the outcome returned by the incoming batch's next()
 * @throws IllegalStateException if next() is called after an earlier call
 *         threw, after STOP, or after NONE on a non-repeatable validator;
 *         if OK or EMIT is returned before any OK_NEW_SCHEMA; if an OK /
 *         OK_NEW_SCHEMA batch has a null schema or more than
 *         MAX_BATCH_ROW_COUNT records; or if batch validation fails
 */
@Override
public IterOutcome next() {
  logger.trace( "[#{}; on {}]: next() called.", instNum, batchTypeName);
  final IterOutcome prevBatchState = batchState;
  try {
    // Check whether next() should even have been called in current state.
    checkNextCallAllowed();

    // Now get result from upstream next().
    batchState = incoming.next();
    logger.trace("[#{}; on {}]: incoming next() return: ({} ->) {}",
        instNum, batchTypeName, prevBatchState, batchState);

    // Check state transition and update high-level state.
    checkOutcomeAndUpdateValidationState();

    // Validate schema when available. (EMIT batches are not covered by this
    // check.)
    if (batchState == OK || batchState == OK_NEW_SCHEMA) {
      validateSchemaAndRowCount();
    }
    return batchState;
  } catch (RuntimeException | Error e) {
    exceptionState = e;
    // The trailing argument makes the logger print the stack trace as well.
    logger.trace("[#{}, on {}]: incoming next() exception: ({} ->) {}",
        instNum, batchTypeName, prevBatchState, exceptionState,
        exceptionState);
    throw e;
  }
}

/** Rejects a next() call made after an earlier failure or a terminal outcome. */
private void checkNextCallAllowed() {
  if (null != exceptionState) {
    throw new IllegalStateException(
        String.format(
            "next() [on #%d; %s] called again after it threw %s (after"
            + " returning %s). Caller should not have called next() again.",
            instNum, batchTypeName, exceptionState, batchState));
  }
  // (Note: This could use validationState.)
  if ((!isRepeatable && batchState == NONE) || batchState == STOP) {
    throw new IllegalStateException(
        String.format(
            "next() [on #%d, %s] called again after it returned %s."
            + " Caller should not have called next() again.",
            instNum, batchTypeName, batchState));
  }
}

/** Checks the outcome just stored in batchState against validationState and advances that state. */
private void checkOutcomeAndUpdateValidationState() {
  switch (batchState) {
    case OK_NEW_SCHEMA:
      // OK_NEW_SCHEMA is allowed at any time, except if terminated (checked
      // by the caller). It moves to the have-seen-schema state.
      validationState = ValidationState.HAVE_SCHEMA;
      validateBatch();
      break;
    case OK:
    case EMIT:
      // OK / EMIT is allowed as long as OK_NEW_SCHEMA was seen, except if
      // terminated (checked by the caller).
      if (validationState != ValidationState.HAVE_SCHEMA) {
        throw new IllegalStateException(
            String.format(
                "next() returned %s without first returning %s [#%d, %s]",
                batchState, OK_NEW_SCHEMA, instNum, batchTypeName));
      }
      validateBatch();
      // OK / EMIT doesn't change high-level state.
      break;
    case NONE:
      // NONE is allowed even without seeing a OK_NEW_SCHEMA. Such NONE is
      // called FAST NONE.
      // NONE moves to TERMINAL high-level state if NOT repeatable.
      if (!isRepeatable) {
        validationState = ValidationState.TERMINAL;
      }
      break;
    case STOP:
      // STOP is allowed at any time, except if already terminated (checked
      // by the caller). It moves to the terminal high-level state.
      validationState = ValidationState.TERMINAL;
      break;
    case NOT_YET:
      // NOT_YET is allowed at any time, except if terminated (checked by the
      // caller). It doesn't change high-level state.
      break;
    default:
      throw new AssertionError(
          "Unhandled new " + IterOutcome.class.getSimpleName() + " value "
          + batchState);
  }
}

/** Records the schema of an OK / OK_NEW_SCHEMA batch and checks schema presence, row count and vectors. */
private void validateSchemaAndRowCount() {
  final BatchSchema prevLastNewSchema = lastNewSchema;
  lastSchema = incoming.getSchema();
  if (batchState == OK_NEW_SCHEMA) {
    lastNewSchema = lastSchema;
  }

  // Reject a null schema before anything dereferences it: the trace
  // statement below calls lastSchema.equals(...), which would otherwise
  // turn this condition into a NullPointerException when trace logging is
  // enabled.
  if (lastSchema == null) {
    throw new IllegalStateException(
        String.format(
            "Incoming batch [#%d, %s] has a null schema. This is not allowed.",
            instNum, batchTypeName));
  }

  final int recordCount = incoming.getRecordCount();
  if (logger.isTraceEnabled()) {
    logger.trace("[#{}; on {}]: incoming next() return: #records = {}, "
        + "\n schema:"
        + "\n {}, "
        + "\n prev. new ({}):"
        + "\n {}",
        instNum, batchTypeName, recordCount,
        lastSchema,
        lastSchema.equals(prevLastNewSchema) ? "equal" : "not equal",
        prevLastNewSchema);
  }

  // It's legal for a batch to have zero fields. For instance, a relational
  // table could have zero columns. Querying such a table requires execution
  // operators to process batches with 0 fields, so only the row count is
  // bounded here.
  if (recordCount > MAX_BATCH_ROW_COUNT) {
    throw new IllegalStateException(
        String.format(
            "Incoming batch [#%d, %s] has size %d, which is beyond the"
            + " limit of %d",
            instNum, batchTypeName, recordCount, MAX_BATCH_ROW_COUNT
            ));
  }

  if (VALIDATE_VECTORS) {
    VectorValidator.validate(incoming);
  }
}
/**
 * Runs BatchValidator over the incoming batch when validation is switched
 * on, either per instance (validateBatches) or globally (VALIDATE_VECTORS).
 *
 * @throws IllegalStateException if BatchValidator reports the batch as
 *         invalid
 */
private void validateBatch() {
  final boolean validationEnabled = validateBatches || VALIDATE_VECTORS;
  if (!validationEnabled) {
    return;
  }

  final boolean batchIsValid = BatchValidator.validate(incoming);
  if (batchIsValid) {
    return;
  }

  final String sourceOperator = incoming.getClass().getSimpleName();
  throw new IllegalStateException(
      "Batch validation failed. Source operator: " +
      sourceOperator);
}
/**
 * Returns the incoming batch's writable batch after checking the read state.
 *
 * @throws IllegalStateException if validateReadState() rejects the current
 *         state
 */
@Override
public WritableBatch getWritableBatch() {
validateReadState("getWritableBatch()");
return incoming.getWritableBatch();
}
/**
 * Logs that close() was called, together with the last batch state and
 * exception state. This method itself does nothing else; in particular it
 * does not call anything on the incoming batch.
 */
@Override
public void close() {
// (Log construction and close() calls at same logging level to bracket
// instance's activity.)
logger.trace( "[#{}; on {}]: close() called, state = {} / {}.",
instNum, batchTypeName, batchState, exceptionState);
}
/**
 * Not supported by this class.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public VectorContainer getOutgoingContainer() {
  final String className = this.getClass().getCanonicalName();
  final String message = String.format(
      "You should not call getOutgoingContainer() for class %s", className);
  throw new UnsupportedOperationException(message);
}
/** Returns the incoming batch's container; no state check is done. */
@Override
public VectorContainer getContainer() {
return incoming.getContainer();
}
/** Returns the upstream batch wrapped by this validator. */
public RecordBatch getIncoming() { return incoming; }
/**
 * Reports whether this validator has observed a failure.
 *
 * @return true if next() has thrown (exceptionState is set) or if the last
 *         outcome returned by the incoming batch was STOP
 */
@Override
public boolean hasFailed() {
  if (exceptionState != null) {
    return true;
  }
  return batchState == STOP;
}
/**
 * Logs, at error level, a one-line summary of this validator: the container
 * obtained from getContainer(), the instance number, the incoming batch type
 * name, and the last schema / last new schema seen.
 */
@Override
public void dump() {
logger.error("IteratorValidatorBatchIterator[container={}, instNum={}, batchTypeName={}, lastSchema={}, "
+ "lastNewSchema={}]", getContainer(), instNum, batchTypeName, lastSchema, lastNewSchema);
}
}