blob: 866b988d9f2625b06818c4a8a6556be4ea99fe3d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.vector.accessor.InvalidConversionError;
import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manages a row batch reader through its lifecycle. Created when the reader
* is opened, discarded when the reader is closed. Encapsulates state that
* follows the life of the reader. This moves common scanner state out of
* each reader in order to make the reader as simple as possible.
* <p>
* This class is private to the scan operator and is not meant to be used,
* or even visible, outside of that operator itself. Instead, all reader-specific
* functionality should be in the {@link RowBatchReader} subclass.
* <p>
* Each reader is managed in the context of a scan operator. A prior reader
* may have established a schema that this reader will (hopefully) continue
* to use. This reader may have no data (and thus no schema). The scan
* operator will try to quietly pass over this reader, using a schema from
* either a prior reader or a later reader.
*
* <h4>Error Handling</h4>
*
* Handles all possible reader errors to control the exceptions thrown
* to callers. Drill uses unchecked exceptions which makes error handling a
* bit hard to follow. The reader can report any kind of error. If might
* nicely report an (unchecked) UserException with details of the failure.
* Or, it might throw some other unchecked exception (NPE, illegal state
* or whatever.) This method ensues that all errors are mapped to a
* UserException for two reasons. First, this ensures that errors are
* attributed to the reader itself. Second, callers need not also go
* through the "which kind of exception" dance: they are assured that
* only UserException is thrown with proper explanations filled in.
*
* <h4>Schema Handling</h4>
*
* Readers can report schema in one of two ways. An "early schema" reader
* reports the schema during the reader open. For example, Parquet can
* obtain the schema from the file header. CSV with headers finds the
* schema by reading the header row. This is the "happy path": the
* reader provides a schema on open, calls to next() just provide data.
* <p>
* "Late schema" readers are more complex. Opening the reader is not
* sufficient to discover the schema. Instead, the reader has to read a
* batch of data in order to infer the schema. The scanner, however, must
* report schema before the first row of data. This class acts as a shim:
* it will read ahead one batch of data on open in order to obtain a
* schema. It will then deliver that "look-ahead" batch on the first
* call to next().
* <p>
* Readers are free to alter the schema at any time by discovering new
* columns. (Think JSON.) Considerable complexity (mostly implemented
* elsewhere) is needed to properly handle this case, and to do so in
* a way that is transparent to the reader.
* <p>
* Because schema handling is complex, the logic is split into two
* parts. This class orchestrates schema actions. A reader-level shim
* handles the details of schema coordination in a way that is specific
* to each kind of reader (file reader, generic reader, etc.)
*
* <h4>Batch Overflow Handling<h4>
*
* A key driving principle of this design is the need to limit batch sizes.
* Readers, in general, cannot easily determine when to stop reading rows
* to just fill a batch, minimizing internal fragmentation. Instead, this
* is the job of the result set loader. This class simply needs to be
* aware that, even if the reader itself says it may have no more data,
* there could still be an overflow row in the look-ahead batch maintained
* by the result set loader. This class hides that complexity from the
* reader, but provides a clean protocol for the scan operator.
*
* <h4>Advice to Developers</hr>
*
* The protocol supported here creates a very
* simple API for readers to implement. We expect contributors to create
* a large number of readers: our job is to maintain a very clean API
* that hides as many routine details as possible from the reader
* author.
* <p>
* While Drill supports many readers, there is just one scan operator,
* with this class as the shim between the two. When considering changes,
* understand how those changes will affect a wide range of readers. A fix
* that make sense for one might not make sense for another.
* <p>
* Some of the considerations include:
* <ul>
* <li>Keep the number of methods small: focus on the essentials.</li>
* <li>Most of the complex interplay between scan operator and reader
* is due to schema negotiation. Handle all that via the schema
* negotiator class rather than by adding methods to the reader
* interface.</li>
* <li>As explained above, error handling is complex due to the
* use of unchecked exceptions. Encourage reader authors to handle all
* errors themselves by throwing a UserException that clearly states
* the problem, if it is caused by the user's query, by an environment
* issue or other item that the user can correct. Use any Java
* exception to report unexpected errors (assertions, NPEs, etc.) This
* class wraps all those errors handle whatever the author chooses to
* do while providing a clean error interface to callers.</li>
* <li>Readers are simplest if the don't have to worry about repeated
* calls to next() after the reader reports EOF. This class buffers
* those calls to enforce the protocol.</li>
* <li>Handles both early and late schema readers to provide a uniform
* API, and to allow odd-ball hybrid cases.</li>
* </ul>
* When considering changes and fixes to this class (or to the readers),
* think about how to preserve the clean separation that this class
* attempts to provide.
*/
class ReaderState {
static final Logger logger = LoggerFactory.getLogger(ReaderState.class);
private enum State {
/**
* Initial state before opening the reader.
*/
START,
/**
* The scan operator is obligated to provide a "fast schema", without data,
* before the first row of data. "Early schema" readers (those that provide
* a schema) can simply provide the schema. However "late schema" readers
* (those that discover the schema during read) must fetch a batch of rows
* to infer schema.
* <p>
* Note that the reader must fetch an entire batch of rows, not just sample
* the first dozen or so. This allows the "schema smoothing" features to work,
* such as finding the first non-null row in JSON, etc. The larger the sample
* size, the more likely that ambiguities can be resolved (though there is
* no guarantee, unfortunately.)
* <p>
* This state indicates that the reader has read the look-ahead batch so that
* the next call to {@link ReaderState#next()} will return this look-ahead
* batch rather than reading a new one.
*/
LOOK_AHEAD,
/**
* As above, but the reader hit EOF during the read of the look-ahead batch.
* The {@link ReaderState#next()} checks if the lookahead batch has any
* rows. If so it will return it and move to the EOF state.
* <p>
* The goal of the EOF states is to avoid calling the reader's next
* method after the reader reports EOF. This is done to avoid the need
* for readers to handle repeated reads after EOF.
* <p>
* Note that the look-ahead batch here is distinct from the look-ahead
* row in the result set loader. That look-ahead is handled by the
* (shim) reader which this class manages.
*/
LOOK_AHEAD_WITH_EOF,
/**
* Normal state: the reader has supplied data but not yet reported EOF.
*/
ACTIVE,
/**
* The reader has reported EOF. No look-ahead batch is active. The
* reader's next() method will no longer be called.
*/
EOF,
/**
* The reader is closed: no further operations are allowed.
*/
CLOSED
};
final ScanOperatorExec scanOp;
private final RowBatchReader reader;
private State state = State.START;
private VectorContainer lookahead;
public ReaderState(ScanOperatorExec scanOp, RowBatchReader reader) {
this.scanOp = scanOp;
this.reader = reader;
}
/**
* Open the next available reader, if any, preparing both the
* reader and table loader.
*
* @return true if the reader has data, false if EOF was found on
* open
* @throws UserException for all errors
*/
boolean open() {
// Open the reader. This can fail. if it does, clean up.
try {
// The reader can return a "soft" failure: the open worked, but
// the file is empty, non-existent or some other form of "no data."
// Handle this by immediately moving to EOF. The scanner will quietly
// pass over this reader and move onto the next, if any.
if (! reader.open()) {
state = State.EOF;
return false;
}
// When catching errors, leave the reader member set;
// we must close it on close() later.
} catch (UserException e) {
// Throw user exceptions as-is
throw e;
} catch (UnsupportedConversionError e) {
// Occurs if the provided schema asks to convert a reader-provided
// schema in a way that Drill (or the reader) cannot support.
// Example: implicit conversion of a float to an INTERVAL
// In such a case, there are no "natural" rules, a reader would have
// to provide ad-hoc rules or no conversion is possible.
throw UserException.validationError(e)
.message("Invalid runtime type conversion")
.build(logger);
} catch (Throwable t) {
// Wrap all others in a user exception.
throw UserException.executionError(t)
.addContext("Open failed for reader", reader.name())
.build(logger);
}
state = State.ACTIVE;
return true;
}
/**
* Prepare the schema for this reader. Called for the first reader within a
* scan batch, if the reader returns <tt>true</tt> from <tt>open()</tt>.
* Asks the reader if it can provide a schema-only empty batch by calling
* the reader's <tt>defineSchema()</tt> method. If this is an early-schema
* reader, and it can provide a schema, then it should create an empty
* batch so that the the result set loader already has
* the proper value vectors set up. If this is a late-schema reader, we must
* read one batch to get the schema, then set aside the data for the next
* call to <tt>next()</tt>.
* <p>
* Semantics for all readers:
* <ul>
* <li>If the file was not found, <tt>open()</tt> returned false and this
* method should never be called.</li>
* </ul>
* <p>
* Semantics for early-schema readers:
* <ul>
* <li>If if turned out that the file was
* empty when trying to read the schema, <tt>open()</tt> returned false
* and this method should never be called.</tt>
* <li>Otherwise, the reader does not know if it is the first reader or
* not. The call to <tt>defineSchema()</tt> notifies the reader that it
* is the first one. The reader should set up in the result set loader
* with an empty batch.
* </ul>
* <p>
* Semantics for late-schema readers:
* <ul>
* <li>This method will ask the reader to
* read a batch. If the reader hits EOF before finding any data, this method
* will return false, indicating that no schema is available.</li>
* <li>If the reader can read enough of the file to
* figure out the schema, but the file has no data, then this method will
* return <tt>true</tt> and a schema will be available. The first call to
* <tt>next()</tt> will report EOF.</li>
* <li>Otherwise, this method returns true, sets up an empty batch with the
* schema, saves the data batch, and will return that look-ahead batch on the
* first call to <tt>next()</tt>.</li>
* </ul>
* @return true if the schema was read, false if EOF was reached while trying
* to read the schema.
* @throws UserException for all errors
*/
protected boolean buildSchema() {
if (reader.defineSchema()) {
// Bind the output container to the output of the scan operator.
// This returns an empty batch with the schema filled in.
scanOp.containerAccessor.setSchema(reader.output());
return true;
}
// Late schema. Read a batch.
if (! next()) {
return false;
}
VectorContainer container = reader.output();
if (container.getRecordCount() == 0) {
return true;
}
// The reader returned actual data. Just forward the schema
// in the operator's container, saving the data for next time
// in a dummy container.
assert lookahead == null;
lookahead = new VectorContainer(scanOp.context.getAllocator(), scanOp.containerAccessor.getSchema());
lookahead.setRecordCount(0);
lookahead.exchange(scanOp.containerAccessor.getOutgoingContainer());
state = state == State.EOF ? State.LOOK_AHEAD_WITH_EOF : State.LOOK_AHEAD;
return true;
}
/**
* Read another batch of data from the reader. Can be called repeatedly
* even after EOF. Handles look-ahead batches.
*
* @return true if a batch of rows is available in the scan operator,
* false if EOF was hit
* @throws UserException for all reader errors
*/
protected boolean next() {
switch (state) {
case LOOK_AHEAD:
case LOOK_AHEAD_WITH_EOF:
// Use batch previously read.
assert lookahead != null;
lookahead.exchange(scanOp.containerAccessor.getOutgoingContainer());
assert lookahead.getRecordCount() == 0;
lookahead = null;
if (state == State.LOOK_AHEAD_WITH_EOF) {
state = State.EOF;
} else {
state = State.ACTIVE;
}
return true;
case ACTIVE:
return readBatch();
case EOF:
return false;
default:
throw new IllegalStateException("Unexpected state: " + state);
}
}
/**
* Read a batch from the reader.
* <p>
* Expected semantics for the reader's <tt>next()</tt> method:
* <ul>
* <li>Non-empty batch and return true: data returned and more
* data is (probably) available.</li>
* <li>Empty batch and return true: data returned but it is the last
* batch; EOF was reached while reading the batch.</li>
* <li>Empty batch and return false: EOF reached, discard the
* empty batch. (An inefficient way to indicate EOF since a set
* of vectors is allocated, then discarded. The previous result
* is preferred when possible.</li>
* <li>Empty batch and return true: An odd case that is allowed;
* the batch is discarded and <tt>next()</tt> is called again.</li>
* </ul>
* In short:
* <ul>
* <li>A non-empty batch says that there is data to return.</li>
* <li>The return code says whether <tt>next()</tt> should be called
* again.</li>
* </ul>
*
* @return true if a batch was read, false if the reader hit EOF
* @throws UserException for all reader errors
*/
private boolean readBatch() {
// Try to read a batch. This may fail. If so, clean up the mess.
boolean more;
try {
more = reader.next();
} catch (UserException e) {
throw e;
} catch (InvalidConversionError e) {
// Occurs when a specific data value to be converted to another type
// is not valid for that conversion. For example, providing the value
// "foo" to a string-to-int conversion.
throw UserException.unsupportedError(e)
.message("Invalid data value for automatic type conversion")
.addContext("Read failed for reader", reader.name())
.build(logger);
} catch (Throwable t) {
throw UserException.executionError(t)
.addContext("Read failed for reader", reader.name())
.build(logger);
}
VectorContainer output = reader.output();
if (! more) {
state = State.EOF;
// The reader can indicate EOF (they can't return any more rows)
// while returning a non-empty final batch. This is the typical
// case with files: the reader read some records and then hit
// EOF. Avoids the need for the reader to keep an EOF state.
if (output.getRecordCount() == 0) {
// No results, possibly from the first batch.
// If the scan has no schema, but this (possibly empty) reader
// does have a schema, then pass along this empty batch
// as a candidate empty result set of the entire scan.
if (scanOp.containerAccessor.schemaVersion() == 0 &&
reader.schemaVersion() > 0) {
scanOp.containerAccessor.setSchema(output);
}
return false;
}
// EOF (the reader can provide no more batches), but
// the reader did provide rows in this batch. Fall through.
}
// Late schema readers may change their schema between batches.
// Early schema changes only on the first batch of the next
// reader. (This is not a hard and fast rule, only a definition:
// a reader that starts with a schema, but later changes it, has
// morphed from an early- to late-schema reader.)
scanOp.containerAccessor.addBatch(output);
return true;
}
/**
* Close the current reader. The hard part is handling the possible
* error conditions, and cleaning up despite those errors.
*/
void close() {
if (state == State.CLOSED) {
return;
}
// Close the reader. This can fail.
try {
reader.close();
} catch (UserException e) {
throw e;
} catch (Throwable t) {
throw UserException.executionError(t)
.addContext("Close failed for reader", reader.name())
.build(logger);
} finally {
// Will not throw exceptions
if (lookahead != null) {
lookahead.clear();
lookahead = null;
}
state = State.CLOSED;
}
}
}