blob: 7ef34df7b8f4fe7669c55b2f51c51193761932e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.xsort;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Represents a group of batches, either buffered in memory or spilled to disk.
 * <p>
 * The batches are defined by a schema which can change over time. When the schema changes,
 * all existing and new batches are coerced into the new schema. Provides a
 * uniform way to iterate over records for one or more batches whether
 * the batches are in memory or on disk.
 * <p>
 * The <code>BatchGroup</code> operates in two modes, one per subclass:
 * <ul>
 * <li>Input mode ({@link InputBatch}): buffers in-memory batches
 * prior to spilling.</li>
 * <li>Spill mode ({@link SpilledRun}): holds a "memento" for a set
 * of batches written to disk, acting as both a reader and a writer for
 * those batches.</li>
 * </ul>
 */
public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
  static final Logger logger = LoggerFactory.getLogger(BatchGroup.class);

  /** Allocator handed to schema coercion in {@link #setSchema(BatchSchema)}. */
  protected final BufferAllocator allocator;

  /** The container whose records this group currently exposes. */
  protected VectorContainer currentContainer;

  /**
   * Cursor for the merge phase: the position of the next record that
   * {@link #getNextIndex()} will hand out. This class doubles as a holder
   * for a vector container and as an iterator over it once the sort starts
   * merging; that dual role should be revisited.
   */
  protected int mergeIndex;

  /**
   * Schema most recently supplied through {@link #setSchema(BatchSchema)}.
   * The constructor leaves it unset.
   */
  protected BatchSchema schema;

  /**
   * Wraps an existing container.
   *
   * @param container the initial container of records
   * @param allocator allocator used later when coercing to a new schema
   */
  public BatchGroup(VectorContainer container, BufferAllocator allocator) {
    this.allocator = allocator;
    this.currentContainer = container;
  }

  /**
   * Changes the schema of this group. The current container is replaced by
   * the result of coercing it to {@code schema}, and {@code schema} is
   * remembered for batches that are read later.
   *
   * @param schema the schema that the current and any deserialized batches
   *        are coerced to
   */
  public void setSchema(BatchSchema schema) {
    final VectorContainer coerced =
        SchemaUtil.coerceContainer(currentContainer, schema, allocator);
    currentContainer = coerced;
    this.schema = schema;
  }

  /**
   * Advances the merge cursor.
   *
   * @return the index of the next record to merge, or -1 once the cursor
   *         has reached {@link #getRecordCount()}
   */
  public int getNextIndex() {
    if (mergeIndex != getRecordCount()) {
      final int current = mergeIndex;
      mergeIndex = current + 1;
      // The handed-out index must address a row of the underlying container.
      assert current < currentContainer.getRecordCount();
      return current;
    }
    return -1;
  }

  /** @return the container currently held by this group */
  public VectorContainer getContainer() {
    return currentContainer;
  }

  /**
   * Calls {@code zeroVectors()} on the current container (presumably
   * releasing its vector buffers). The container reference itself is kept.
   */
  @Override
  public void close() throws IOException {
    currentContainer.zeroVectors();
  }

  /** Delegates to the current container. */
  @Override
  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
    return currentContainer.getValueAccessorById(clazz, ids);
  }

  /** Delegates to the current container. */
  @Override
  public TypedFieldId getValueVectorId(SchemaPath path) {
    return currentContainer.getValueVectorId(path);
  }

  /**
   * Returns the schema reported by the current container. This method reads
   * the container's schema, not the {@code schema} field.
   */
  @Override
  public BatchSchema getSchema() {
    return currentContainer.getSchema();
  }

  /** Returns the record count of the current container. */
  @Override
  public int getRecordCount() {
    return currentContainer.getRecordCount();
  }

  /**
   * Returns the current container's record count directly, without going
   * through {@link #getRecordCount()}.
   */
  public int getUnfilteredRecordCount() {
    return currentContainer.getRecordCount();
  }

  /** Iterates over the vector wrappers of the current container. */
  @Override
  public Iterator<VectorWrapper<?>> iterator() {
    return currentContainer.iterator();
  }

  /** Not supported by this base class; always throws. */
  @Override
  public SelectionVector2 getSelectionVector2() {
    throw new UnsupportedOperationException();
  }

  /** Not supported by this base class; always throws. */
  @Override
  public SelectionVector4 getSelectionVector4() {
    throw new UnsupportedOperationException();
  }

  /**
   * Closes every group in {@code groups}. Any failure raised while closing
   * is rethrown as a data-write {@link UserException}.
   *
   * @param groups the batch groups to close
   */
  public static void closeAll(Collection<? extends BatchGroup> groups) {
    try {
      AutoCloseables.close(groups);
    } catch (Exception cause) {
      throw UserException.dataWriteError(cause)
          .message("Failure while flushing spilled data")
          .build(logger);
    }
  }
}