/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl;

import io.netty.buffer.DrillBuf;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
* Record batch used for a particular scan. Operates against one or more
* {@link RecordReader}s, reading each in turn and presenting their output
* as a stream of value-vector batches.
*/
public class ScanBatch implements RecordBatch {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
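// Upper bound on records per batch: Character.MAX_VALUE (65535) keeps record
// indexes addressable by a two-byte selection vector.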
private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap();
private final VectorContainer container = new VectorContainer();
private VectorContainer tempContainer;
private int recordCount;
private final FragmentContext context;
private final OperatorContext oContext;
private Iterator<RecordReader> readers;
private RecordReader currentReader;
private BatchSchema schema;
private final Mutator mutator = new Mutator();
private Iterator<String[]> partitionColumns;
private String[] partitionValues;
private List<ValueVector> partitionVectors;
private List<Integer> selectedPartitionColumns;
private String partitionColumnDesignator;
private boolean first = true;
private boolean done = false;
private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
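
/**
* Creates a scan batch over the given readers. The first reader is set up
* eagerly so that setup failures surface at construction time; one
* partition-value array is consumed per reader as the scan advances.
*/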
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, OperatorContext oContext,
Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
this.context = context;
this.readers = readers;
if (!readers.hasNext()) {
throw new ExecutionSetupException("A scan batch must contain at least one reader.");
}
this.currentReader = readers.next();
this.oContext = oContext;
this.currentReader.setOperatorContext(this.oContext);
try {
oContext.getStats().startProcessing();
this.currentReader.setup(mutator);
} finally {
oContext.getStats().stopProcessing();
}
this.partitionColumns = partitionColumns.iterator();
this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
this.selectedPartitionColumns = selectedPartitionColumns;
DrillConfig config = context.getConfig();
this.partitionColumnDesignator = config == null ? "dir" : config.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
addPartitionVectors();
}
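
/**
* Convenience constructor for scans with no partition columns; creates its
* own OperatorContext, which is exempt from the fragment memory limit.
*/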
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
this(subScanConfig, context,
new OperatorContext(subScanConfig, context, false /* ScanBatch is not subject to fragment memory limit */),
readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
}
public FragmentContext getContext() {
return context;
}
public OperatorContext getOperatorContext() {
return oContext;
}
@Override
public BatchSchema getSchema() {
return schema;
}
@Override
public int getRecordCount() {
return recordCount;
}
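
/**
* Marks the scan as done when the kill comes from downstream
* (sendUpstream == true); otherwise releases the output vectors directly.
*/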
@Override
public void kill(boolean sendUpstream) {
if (sendUpstream) {
done = true;
} else {
releaseAssets();
}
}
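
// Releases the buffers held by the output vectors; the container itself
// stays usable.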
private void releaseAssets() {
container.zeroVectors();
}
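
/**
* Advances the scan: pre-allocates vectors, reads from the current reader,
* and rolls over to the next reader whenever the current one is exhausted.
* Returns OK_NEW_SCHEMA when the mutator observed a schema change, OK for a
* plain batch, NONE once all readers are drained, and OUT_OF_MEMORY or STOP
* on failure.
*/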
@Override
public IterOutcome next() {
if (done) {
return IterOutcome.NONE;
}
oContext.getStats().startProcessing();
try {
try {
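// Ask the reader to pre-allocate its output vectors; on OOM, free whatever
// was allocated and report OUT_OF_MEMORY so the fragment can react.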
currentReader.allocate(fieldVectorMap);
} catch (OutOfMemoryException e) {
logger.debug("Caught OutOfMemoryException");
for (ValueVector v : fieldVectorMap.values()) {
v.clear();
}
return IterOutcome.OUT_OF_MEMORY;
}
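// A zero-record result means the current reader is exhausted: clean it up
// and move to the next one (or finish the scan when none remain).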
while ((recordCount = currentReader.next()) == 0) {
try {
if (!readers.hasNext()) {
currentReader.cleanup();
releaseAssets();
done = true;
if (mutator.isNewSchema()) {
container.buildSchema(SelectionVectorMode.NONE);
schema = container.getSchema();
}
return IterOutcome.NONE;
}
currentReader.cleanup();
currentReader = readers.next();
partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
// Hand the reader its operator context before setup, mirroring the order
// used in the constructor.
currentReader.setOperatorContext(oContext);
currentReader.setup(mutator);
try {
currentReader.allocate(fieldVectorMap);
} catch (OutOfMemoryException e) {
logger.debug("Caught OutOfMemoryException");
for (ValueVector v : fieldVectorMap.values()) {
v.clear();
}
return IterOutcome.OUT_OF_MEMORY;
}
addPartitionVectors();
} catch (ExecutionSetupException e) {
this.context.fail(e);
releaseAssets();
return IterOutcome.STOP;
}
}
populatePartitionVectors();
if (mutator.isNewSchema()) {
container.buildSchema(SelectionVectorMode.NONE);
schema = container.getSchema();
return IterOutcome.OK_NEW_SCHEMA;
} else {
return IterOutcome.OK;
}
} catch (Exception ex) {
logger.debug("Failed to read the batch. Stopping...", ex);
context.fail(ex);
return IterOutcome.STOP;
} finally {
oContext.getStats().stopProcessing();
}
}
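
/**
* Recreates the partition-column vectors (e.g. dir0, dir1, ...) as nullable
* VARCHAR vectors, one per selected partition column, clearing any vectors
* left over from the previous reader.
*/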
private void addPartitionVectors() throws ExecutionSetupException{
try {
if (partitionVectors != null) {
for (ValueVector v : partitionVectors) {
v.clear();
}
}
partitionVectors = Lists.newArrayList();
for (int i : selectedPartitionColumns) {
MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR));
ValueVector v = mutator.addField(field, NullableVarCharVector.class);
partitionVectors.add(v);
}
} catch (SchemaChangeException e) {
throw new ExecutionSetupException(e);
}
}
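
/**
* Fills each partition vector with the current reader's partition value,
* repeated once per record; columns deeper than this reader's directory
* nesting are left entirely null.
*/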
private void populatePartitionVectors() {
for (int index = 0; index < selectedPartitionColumns.size(); index++) {
int i = selectedPartitionColumns.get(index);
NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
if (partitionValues.length > i) {
String val = partitionValues[i];
AllocationHelper.allocate(v, recordCount, val.length());
// Encode explicitly as UTF-8 rather than relying on the platform charset.
byte[] bytes = val.getBytes(StandardCharsets.UTF_8);
for (int j = 0; j < recordCount; j++) {
v.getMutator().setSafe(j, bytes, 0, bytes.length);
}
v.getMutator().setValueCount(recordCount);
} else {
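// No partition value at this depth for the current reader, so the column
// is allocated but every entry stays null.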
AllocationHelper.allocate(v, recordCount, 0);
v.getMutator().setValueCount(recordCount);
}
}
}
@Override
public SelectionVector2 getSelectionVector2() {
throw new UnsupportedOperationException();
}
@Override
public SelectionVector4 getSelectionVector4() {
throw new UnsupportedOperationException();
}
@Override
public TypedFieldId getValueVectorId(SchemaPath path) {
return container.getValueVectorId(path);
}
@Override
public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
return container.getValueAccessorById(clazz, ids);
}
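
/**
* The OutputMutator handed to each RecordReader. Routes new fields into the
* shared field map and output container, and tracks whether the batch
* schema has changed since the last check.
*/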
private class Mutator implements OutputMutator {
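// Starts true so the very first batch is reported as OK_NEW_SCHEMA.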
boolean schemaChange = true;
@SuppressWarnings("unchecked")
@Override
public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
// Check if the field exists
ValueVector v = fieldVectorMap.get(field.key());
if (v == null || v.getClass() != clazz) {
// Field does not exist: add it to the map and the output container
v = TypeHelper.getNewVector(field, oContext.getAllocator(), callBack);
if (!clazz.isAssignableFrom(v.getClass())) {
throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
}
ValueVector old = fieldVectorMap.put(field.key(), v);
if (old != null) {
old.clear();
container.remove(old);
}
container.add(v);
// Adding a new vector to the container marks that the schema has changed
schemaChange = true;
}
return (T) v;
}
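
/**
* Default allocation for all known vectors, using rough estimates (roughly
* 50 bytes per value and 10 child values per repeated entry) since readers
* have not declared their sizes at this point.
*/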
@Override
public void allocate(int recordCount) {
for (ValueVector v : fieldVectorMap.values()) {
AllocationHelper.allocate(v, recordCount, 50, 10);
}
}
@Override
public boolean isNewSchema() {
// Returns true if the top-level schema has changed; the second condition
// checks whether one of the nested map schemas has changed.
if (schemaChange || callBack.getSchemaChange()) {
schemaChange = false;
return true;
}
return false;
}
@Override
public DrillBuf getManagedBuffer() {
return oContext.getManagedBuffer();
}
}
@Override
public Iterator<VectorWrapper<?>> iterator() {
return container.iterator();
}
@Override
public WritableBatch getWritableBatch() {
return WritableBatch.get(this);
}
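
/**
* Releases all vectors and containers, cleans up the current reader, and
* closes the operator context.
*/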
public void cleanup() {
container.clear();
if (tempContainer != null) {
tempContainer.clear();
}
for (ValueVector v : partitionVectors) {
v.clear();
}
fieldVectorMap.clear();
currentReader.cleanup();
oContext.close();
}
@Override
public VectorContainer getOutgoingContainer() {
throw new UnsupportedOperationException(String.format("You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}
}