/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.DrillBuf;
/**
* Record batch used for a particular scan. Operates against one or more
* record readers, advancing to the next reader as each one is exhausted.
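* <p>A minimal usage sketch (the variable names below are illustrative
* assumptions, not part of this class's contract):
* <pre>{@code
* ScanBatch scan = new ScanBatch(subScanConfig, fragmentContext, readers);
* IterOutcome outcome;
* while ((outcome = scan.next()) != IterOutcome.NONE) {
*   // OK_NEW_SCHEMA: downstream operators must rebuild against the new schema;
*   // OK: consume scan.getRecordCount() rows from the container.
* }
* scan.close();
* }</pre>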
*/
public class ScanBatch implements CloseableRecordBatch {
private static final Logger logger = LoggerFactory.getLogger(ScanBatch.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScanBatch.class);
/** Main collection of fields' value vectors. */
private final VectorContainer container = new VectorContainer();
private int recordCount;
private final FragmentContext context;
private final OperatorContext oContext;
private Iterator<? extends RecordReader> readers;
private RecordReader currentReader;
private BatchSchema schema;
private final Mutator mutator;
private boolean done;
private final Iterator<Map<String, String>> implicitColumns;
private Map<String, String> implicitValues;
private final BufferAllocator allocator;
private final List<Map<String, String>> implicitColumnList;
private String currentReaderClassName;
private final RecordBatchStatsContext batchStatsContext;
// Represents the last outcome of next(). If an exception is thrown
// during the method's execution, IterOutcome.STOP is assigned.
private IterOutcome lastOutcome;
private List<RecordReader> readerList; // needed for repeatable scanners
private boolean isRepeatableScan; // needed for repeatable scanners
/**
*
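* Builds a scan batch over the given readers.
* <p>A minimal sketch of assembling {@code implicitColumnList}; the column name
* {@code "filename"} and the path are illustrative assumptions:
* <pre>{@code
* List<Map<String, String>> implicitColumnList = new ArrayList<>();
* for (RecordReader reader : readerList) {
*   Map<String, String> cols = new HashMap<>(); // same key set for every reader
*   cols.put("filename", "/data/region.parquet");
*   implicitColumnList.add(cols);
* }
* }</pre>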
* @param context fragment context
* @param oContext operator context
* @param readerList the readers this scan batch iterates over
* @param implicitColumnList either an empty list (when none of the readers has implicit
* columns), or a list with a one-to-one mapping between readers and their implicit columns.
*/
public ScanBatch(FragmentContext context,
OperatorContext oContext, List<? extends RecordReader> readerList,
List<Map<String, String>> implicitColumnList) {
this.context = context;
this.readers = readerList.iterator();
this.implicitColumns = implicitColumnList.iterator();
if (!readers.hasNext()) {
throw UserException.internalError(
new ExecutionSetupException("A scan batch must contain at least one reader."))
.build(logger);
}
this.oContext = oContext;
allocator = oContext.getAllocator();
mutator = new Mutator(oContext, allocator, container);
oContext.getStats().startProcessing();
try {
if (!verifyImplicitColumns(readerList.size(), implicitColumnList)) {
Exception ex = new ExecutionSetupException("Either the implicit column list does not have the same cardinality as the reader list, "
+ "or the implicit columns are not the same across all the record readers!");
throw UserException.internalError(ex)
.addContext("Setup failed for", readerList.get(0).getClass().getSimpleName())
.build(logger);
}
this.implicitColumnList = implicitColumnList;
addImplicitVectors();
currentReader = null;
batchStatsContext = new RecordBatchStatsContext(context, oContext);
} finally {
oContext.getStats().stopProcessing();
}
}
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
List<RecordReader> readers)
throws ExecutionSetupException {
this(context, context.newOperatorContext(subScanConfig),
readers, Collections.<Map<String, String>> emptyList());
}
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
List<RecordReader> readerList, boolean isRepeatableScan)
throws ExecutionSetupException {
this(context, context.newOperatorContext(subScanConfig),
readerList, Collections.<Map<String, String>> emptyList());
this.readerList = readerList;
this.isRepeatableScan = isRepeatableScan;
}
@Override
public FragmentContext getContext() {
return context;
}
public OperatorContext getOperatorContext() { return oContext; }
@Override
public BatchSchema getSchema() {
return schema;
}
@Override
public int getRecordCount() {
return recordCount;
}
@Override
public void kill(boolean sendUpstream) {
if (sendUpstream) {
done = true;
} else {
releaseAssets();
}
}
/**
* Performs scan-specific actions when the scan needs to clean up or reset
* its readers and return the NONE status.
*
* @return NONE
*/
private IterOutcome cleanAndReturnNone() {
if (isRepeatableScan) {
readers = readerList.iterator();
} else {
releaseAssets(); // All data has been read. Release resources.
done = true;
}
return IterOutcome.NONE;
}
/**
* Invoked when the current reader returns zero records: updates the reader
* state accordingly and decides whether the iteration should continue.
* @return whether the iteration should continue
* @throws Exception if closing the current reader fails
*/
private boolean shouldContinueAfterNoRecords() throws Exception {
logger.trace("scan got 0 record.");
if (isRepeatableScan) {
if (!currentReader.hasNext()) {
currentReader = null;
readers = readerList.iterator();
return false;
}
return true;
} else { // Regular scan
currentReader.close();
currentReader = null;
return true; // Always continue in the regular case; if no reader remains, we break out at the head of the loop.
}
}
private IterOutcome internalNext() throws Exception {
while (true) {
if (currentReader == null && !getNextReaderIfHas()) {
logger.trace("currentReader is null");
return cleanAndReturnNone();
}
injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
currentReader.allocate(mutator.fieldVectorMap());
recordCount = currentReader.next();
logger.trace("currentReader.next return recordCount={}", recordCount);
Preconditions.checkArgument(recordCount >= 0, "recordCount from RecordReader.next() should not be negative");
boolean isNewSchema = mutator.isNewSchema();
populateImplicitVectors();
mutator.container.setValueCount(recordCount);
oContext.getStats().batchReceived(0, recordCount, isNewSchema);
boolean toContinueIter = true;
if (recordCount == 0) {
// With 0 records we may need to clean up and exit, but the block below may still need to
// return a new schema, so toContinueIter records whether the iteration should continue.
toContinueIter = shouldContinueAfterNoRecords();
}
if (isNewSchema) {
// Even when recordCount == 0, we should return OK_NEW_SCHEMA if the current reader presents
// a new schema. This can happen when a data source has a non-trivial schema but 0 rows.
container.buildSchema(SelectionVectorMode.NONE);
schema = container.getSchema();
lastOutcome = IterOutcome.OK_NEW_SCHEMA;
return lastOutcome;
}
// Handle case of same schema.
if (recordCount == 0) {
if (toContinueIter) {
continue; // Skip to the next iteration if the reader returned 0 rows with an unchanged schema.
} else {
// Return NONE if recordCount == 0 && !isNewSchema
lastOutcome = IterOutcome.NONE;
return lastOutcome;
}
} else {
// Return OK if recordCount > 0 && !isNewSchema
lastOutcome = IterOutcome.OK;
return lastOutcome;
}
}
}
@Override
public IterOutcome next() {
if (done) {
lastOutcome = IterOutcome.NONE;
return lastOutcome;
}
oContext.getStats().startProcessing();
try {
return internalNext();
} catch (OutOfMemoryException ex) {
clearFieldVectorMap();
lastOutcome = IterOutcome.STOP;
throw UserException.memoryError(ex).build(logger);
} catch (ExecutionSetupException e) {
if (currentReader != null) {
try {
currentReader.close();
} catch (final Exception e2) {
logger.error("Close failed for reader " + currentReaderClassName, e2);
}
}
lastOutcome = IterOutcome.STOP;
throw UserException.internalError(e)
.addContext("Setup failed for", currentReaderClassName)
.build(logger);
} catch (UserException ex) {
lastOutcome = IterOutcome.STOP;
throw ex;
} catch (Exception ex) {
lastOutcome = IterOutcome.STOP;
throw UserException.internalError(ex).build(logger);
} finally {
oContext.getStats().stopProcessing();
}
}
private void releaseAssets() {
container.zeroVectors();
}
private void clearFieldVectorMap() {
for (final ValueVector v : mutator.fieldVectorMap().values()) {
v.clear();
}
for (final ValueVector v : mutator.implicitFieldVectorMap.values()) {
v.clear();
}
}
private boolean getNextReaderIfHas() throws ExecutionSetupException {
if (!readers.hasNext()) {
return false;
}
currentReader = readers.next();
if (!isRepeatableScan && readers.hasNext()) {
readers.remove();
}
implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
currentReader.setup(oContext, mutator);
currentReaderClassName = currentReader.getClass().getSimpleName();
return true;
}
private void addImplicitVectors() {
try {
if (!implicitColumnList.isEmpty()) {
for (String column : implicitColumnList.get(0).keySet()) {
final MaterializedField field = MaterializedField.create(column, Types.optional(MinorType.VARCHAR));
mutator.addField(field, NullableVarCharVector.class, true /*implicit field*/);
}
}
} catch (SchemaChangeException e) {
// No exception should be thrown here.
throw UserException.internalError(e)
.addContext("Failure while allocating implicit vectors")
.build(logger);
}
}
private void populateImplicitVectors() {
mutator.populateImplicitVectors(implicitValues, recordCount);
}
@Override
public SelectionVector2 getSelectionVector2() {
return null;
}
@Override
public SelectionVector4 getSelectionVector4() {
throw new UnsupportedOperationException();
}
@Override
public TypedFieldId getValueVectorId(SchemaPath path) {
return container.getValueVectorId(path);
}
@Override
public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
return container.getValueAccessorById(clazz, ids);
}
@SuppressWarnings("unused")
private void logRecordBatchStats() {
final int MAX_FQN_LENGTH = 50;
if (recordCount == 0) {
return; // NOOP
}
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
}
/** Might truncate the FQN if too long */
private String getFQNForLogging(int maxLength) {
final String FQNKey = "FQN";
final ValueVector v = mutator.implicitFieldVectorMap.get(FQNKey);
final Object fqnObj;
if (v == null
|| v.getAccessor().getValueCount() == 0
|| (fqnObj = ((NullableVarCharVector) v).getAccessor().getObject(0)) == null) {
return "NA";
}
String fqn = fqnObj.toString();
if (fqn != null && fqn.length() > maxLength) {
fqn = fqn.substring(fqn.length() - maxLength, fqn.length());
}
return fqn;
}
/**
* Row set mutator implementation provided to record readers created by
* this scan batch. Made visible so that tests can create this mutator
* without also needing a ScanBatch instance. (This class is really independent
* of the ScanBatch, but resides here for historical reasons. This is,
* in turn, the only use of the generated vector readers in the vector
* package.)
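* <p>A minimal sketch of standalone use in a test, assuming the test already owns an
* {@link OperatorContext} and a {@link BufferAllocator}:
* <pre>{@code
* VectorContainer container = new VectorContainer();
* Mutator mutator = new Mutator(oContext, allocator, container);
* MaterializedField field = MaterializedField.create("name",
*     Types.optional(MinorType.VARCHAR));
* NullableVarCharVector v = mutator.addField(field, NullableVarCharVector.class);
* }</pre>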
*/
@VisibleForTesting
public static class Mutator implements OutputMutator {
/** Flag keeping track of whether the top-level schema has changed since the last inquiry
* (via {@link #isNewSchema()}). It is initialized to false, and reset to false by
* {@link #isNewSchema()} and {@link #clear()}, until a new value vector, or a value vector
* with a different type, is added to fieldVectorMap.
**/
private boolean schemaChanged;
/** Regular fields' value vectors indexed by fields' keys. */
private final CaseInsensitiveMap<ValueVector> regularFieldVectorMap =
CaseInsensitiveMap.newHashMap();
/** Implicit fields' value vectors indexed by fields' keys. */
private final CaseInsensitiveMap<ValueVector> implicitFieldVectorMap =
CaseInsensitiveMap.newHashMap();
private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private final BufferAllocator allocator;
private final VectorContainer container;
private final OperatorContext oContext;
public Mutator(OperatorContext oContext, BufferAllocator allocator, VectorContainer container) {
this.oContext = oContext;
this.allocator = allocator;
this.container = container;
this.schemaChanged = false;
}
public Map<String, ValueVector> fieldVectorMap() {
return regularFieldVectorMap;
}
public Map<String, ValueVector> implicitFieldVectorMap() {
return implicitFieldVectorMap;
}
@Override
public <T extends ValueVector> T addField(MaterializedField field,
Class<T> clazz) throws SchemaChangeException {
return addField(field, clazz, false);
}
@Override
public void allocate(int recordCount) {
for (final ValueVector v : regularFieldVectorMap.values()) {
AllocationHelper.allocate(v, recordCount, 50, 10);
}
}
/**
* Reports whether schema has changed (field was added or re-added) since
* last call to {@link #isNewSchema}. Returns true at first call.
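* <p>For example, after a single field is added, only the first inquiry reports a change:
* <pre>{@code
* mutator.addField(field, NullableVarCharVector.class);
* mutator.isNewSchema(); // true: a field was added
* mutator.isNewSchema(); // false: the flag was reset and nothing changed since
* }</pre>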
*/
@Override
public boolean isNewSchema() {
// Check if top-level schema or any of the deeper map schemas has changed.
// Note: Callback's getSchemaChangedAndReset() must get called in order
// to reset it and avoid false reports of schema changes in future. (Be
// careful with short-circuit OR (||) operator.)
final boolean deeperSchemaChanged = callBack.getSchemaChangedAndReset();
if (schemaChanged || deeperSchemaChanged) {
schemaChanged = false;
return true;
}
return false;
}
@Override
public DrillBuf getManagedBuffer() {
return oContext.getManagedBuffer();
}
@Override
public CallBack getCallBack() {
return callBack;
}
@Override
public void clear() {
regularFieldVectorMap.clear();
implicitFieldVectorMap.clear();
container.clear();
schemaChanged = false;
}
private <T extends ValueVector> T addField(MaterializedField field,
Class<T> clazz, boolean isImplicitField) throws SchemaChangeException {
Map<String, ValueVector> fieldVectorMap;
if (isImplicitField) {
fieldVectorMap = implicitFieldVectorMap;
} else {
fieldVectorMap = regularFieldVectorMap;
}
if (!isImplicitField && implicitFieldVectorMap.containsKey(field.getName()) ||
isImplicitField && regularFieldVectorMap.containsKey(field.getName())) {
throw new SchemaChangeException(
String.format(
"It's not allowed to have regular field and implicit field share common name %s. "
+ "Either change regular field name in datasource, or change the default implicit field names.",
field.getName()));
}
// Check if the field exists.
ValueVector v = fieldVectorMap.get(field.getName());
// for the cases when fields have a different scale or precision,
// the new vector should be used to handle the value correctly
if (v == null || !v.getField().getType().equals(field.getType())) {
// Field does not exist--add it to the map and the output container.
v = TypeHelper.getNewVector(field, allocator, callBack);
if (!clazz.isAssignableFrom(v.getClass())) {
throw new SchemaChangeException(
String.format(
"The class that was provided, %s, does not correspond to the "
+ "expected vector type of %s.",
clazz.getSimpleName(), v.getClass().getSimpleName()));
}
final ValueVector old = fieldVectorMap.put(field.getName(), v);
if (old != null) {
old.clear();
container.remove(old);
}
container.add(v);
// Only mark schema change for regular vectors added to the container; implicit schema is constant.
if (!isImplicitField) {
schemaChanged = true;
}
}
return clazz.cast(v);
}
private void populateImplicitVectors(Map<String, String> implicitValues, int recordCount) {
if (implicitValues != null) {
for (Map.Entry<String, String> entry : implicitValues.entrySet()) {
final NullableVarCharVector v = (NullableVarCharVector) implicitFieldVectorMap.get(entry.getKey());
final String val = entry.getValue();
if (val != null) {
// An implicit column carries the same value for every row in the batch.
AllocationHelper.allocate(v, recordCount, val.length());
final byte[] bytes = val.getBytes();
for (int j = 0; j < recordCount; j++) {
v.getMutator().setSafe(j, bytes, 0, bytes.length);
}
v.getMutator().setValueCount(recordCount);
} else {
// A null value leaves every row of the nullable vector unset.
AllocationHelper.allocate(v, recordCount, 0);
v.getMutator().setValueCount(recordCount);
}
}
}
}
}
@Override
public Iterator<VectorWrapper<?>> iterator() {
return container.iterator();
}
@Override
public WritableBatch getWritableBatch() {
return WritableBatch.get(this);
}
@Override
public void close() throws Exception {
container.clear();
mutator.clear();
if (currentReader != null) {
currentReader.close();
}
}
@Override
public VectorContainer getOutgoingContainer() {
throw new UnsupportedOperationException(
String.format("You should not call getOutgoingContainer() for class %s",
this.getClass().getCanonicalName()));
}
@Override
public VectorContainer getContainer() {
return container;
}
/**
* Verifies that the list of implicit column values is valid input:
* - Either the implicit column list is empty;
* - Or it has the same size as the reader list, and the key set is the same across all the readers.
* @param numReaders number of readers in this scan batch
* @param implicitColumnList implicit column values, one map per reader
* @return true if the implicit column list is valid, false otherwise
*/
private boolean verifyImplicitColumns(int numReaders, List<Map<String, String>> implicitColumnList) {
if (implicitColumnList.isEmpty()) {
return true;
}
if (numReaders != implicitColumnList.size()) {
return false;
}
Map<String, String> firstMap = implicitColumnList.get(0);
for (int i = 1; i< implicitColumnList.size(); i++) {
Map<String, String> nonFirstMap = implicitColumnList.get(i);
if (!firstMap.keySet().equals(nonFirstMap.keySet())) {
return false;
}
}
return true;
}
@Override
public boolean hasFailed() {
return lastOutcome == IterOutcome.STOP;
}
@Override
public void dump() {
logger.error("ScanBatch[container={}, currentReader={}, schema={}]", container, currentReader, schema);
}
}