/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.scan.v3.lifecycle;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
import org.apache.drill.exec.physical.impl.scan.v3.ReaderFactory;
import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
import org.apache.drill.exec.physical.impl.scan.v3.schema.ScanSchemaTracker.ProjectionType;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
/**
* Implementation of the schema negotiation between scan operator and
 * batch reader. Anticipates that the select list, and/or the list of
 * predefined fields (implicit, partition), might be set by the scanner.
 * For now, each reader has its own implementation of the select
 * set.
* <p>
* Handles both early- and late-schema readers. Early-schema readers
* provide a table schema, late-schema readers do not.
* <p>
* If the reader (or, later, the scanner) has a SELECT list, then that
* select list is pushed down into the result set loader created for
* the reader.
* <p>
 * Also handles parsing out various column types, filling in null
 * columns and, via the vector cache, minimizing schema changes across
 * readers. In the worst case, a reader might have a column "c" in
 * one file, skip "c" in the second file, and see "c" appear again
 * in a third file. This negotiator, along with the scan projection
 * and vector cache, "smooths out" schema changes by preserving the vector
 * for "c" across all three files. In the first and third files "c" is
 * a vector written by the reader; in the second, it is a null column
 * filled in by the scan projector (assuming, of course, that "c"
 * is nullable or an array).
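 * <p>
 * The following is a minimal sketch of how an early-schema reader might drive
 * this negotiation from its constructor. The reader class, column names and
 * batch size are hypothetical; they are not part of this API:
 * <pre>{@code
 * public class MyCsvReader implements ManagedReader {
 *   private final ResultSetLoader loader;
 *
 *   public MyCsvReader(SchemaNegotiator negotiator) {
 *     // Declare the schema discovered from the file header (early schema).
 *     TupleMetadata schema = new SchemaBuilder()
 *         .addNullable("a", MinorType.VARCHAR)
 *         .addNullable("b", MinorType.VARCHAR)
 *         .buildSchema();
 *     negotiator.tableSchema(schema, true); // complete: no new columns appear later
 *     negotiator.batchSize(1000);           // optional: cap rows per batch
 *     loader = negotiator.build();          // loader used to write rows
 *   }
 *
 *   @Override
 *   public boolean next() { return false; } // produces no rows in this sketch
 *
 *   @Override
 *   public void close() { }
 * }
 * }</pre>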
*/
public class SchemaNegotiatorImpl implements SchemaNegotiator {
protected final ReaderLifecycle readerLifecycle;
protected TupleMetadata readerSchema;
protected boolean isSchemaComplete;
protected int batchSize = ValueVector.MAX_ROW_COUNT;
protected CustomErrorContext baseErrorContext;
protected CustomErrorContext readerErrorContext;
public SchemaNegotiatorImpl(ReaderLifecycle readerLifecycle) {
this.readerLifecycle = readerLifecycle;
this.baseErrorContext = readerLifecycle.scanLifecycle().errorContext();
}
@Override
public boolean isProjectionEmpty() {
return readerLifecycle.schemaTracker().projectionType() == ProjectionType.NONE;
}
@Override
public ProjectedColumn projectionFor(String colName) {
return readerLifecycle.scanLifecycle().schemaTracker().columnProjection(colName);
}
@Override
public TupleMetadata providedSchema() {
return readerLifecycle.scanOptions().providedSchema();
}
@Override
public TupleMetadata inputSchema() {
return readerLifecycle.readerInputSchema();
}
@Override
public OperatorContext context() {
return readerLifecycle.scanLifecycle().context();
}
@Override
public CustomErrorContext parentErrorContext() {
return baseErrorContext;
}
@Override
public CustomErrorContext errorContext() {
return readerErrorContext == null ? baseErrorContext : readerErrorContext;
}
@Override
public void setErrorContext(CustomErrorContext errorContext) {
this.readerErrorContext = errorContext;
}
@Override
public void tableSchema(TupleMetadata schema, boolean isComplete) {
readerSchema = schema;
isSchemaComplete = schema != null && isComplete;
}
@Override
public void tableSchema(TupleMetadata schema) {
readerSchema = schema;
}
@Override
public void schemaIsComplete(boolean isComplete) {
isSchemaComplete = isComplete;
}
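  /**
   * Reports whether the reader has provided a schema and has declared that
   * schema to be complete (that is, no additional columns will appear later).
   */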
public boolean isSchemaComplete() { return readerSchema != null && isSchemaComplete; }
@Override
public void batchSize(int maxRecordsPerBatch) {
batchSize = maxRecordsPerBatch;
}
@Override
public String userName() {
return readerLifecycle.scanOptions().userName();
}
  /**
   * Callback invoked by the reader, once it has provided its schema
   * information, to build the schema from information provided by both the
   * table and the scan operator. Returns the result set loader to be used
   * by the reader to write to the table's value vectors.
   *
   * @return the result set loader to be used by the reader
   */
@Override
public ResultSetLoader build() {
return readerLifecycle.buildLoader();
}
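  /**
   * Returns the loader for static (implicit or partition) columns, if any.
   * This base implementation provides none; subclasses may override to
   * supply one.
   */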
public StaticBatchBuilder implicitColumnsLoader() {
return null;
}
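  /**
   * Creates the reader by asking the reader factory for the next reader,
   * passing this negotiator so the reader can declare its schema. The
   * unchecked cast narrows the wildcard factory to one that accepts this
   * negotiator type.
   */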
@SuppressWarnings("unchecked")
public ManagedReader newReader(ReaderFactory<?> readerFactory) throws EarlyEofException {
return ((ReaderFactory<SchemaNegotiator>) readerFactory).next(this);
}
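  /**
   * Notification that the current batch is complete. Does nothing by
   * default; subclasses may override.
   */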
protected void onEndBatch() { }
}