blob: 9359ea188d805af281c6c40ce543a4c4cf78bdd0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.xsort;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.cache.VectorAccessibleSerializable;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Stopwatch;
public class BatchGroup implements VectorAccessible {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
private VectorContainer currentContainer;
private SelectionVector2 sv2;
private int pointer = 0;
private FSDataInputStream inputStream;
private FSDataOutputStream outputStream;
private Path path;
private FileSystem fs;
private BufferAllocator allocator;
private int spilledBatches = 0;
public BatchGroup(VectorContainer container, SelectionVector2 sv2) {
this.sv2 = sv2;
this.currentContainer = container;
}
public BatchGroup(VectorContainer container, FileSystem fs, String path, BufferAllocator allocator) {
currentContainer = container;
this.fs = fs;
this.path = new Path(path);
this.allocator = allocator;
}
public SelectionVector2 getSv2() {
return sv2;
}
public void addBatch(VectorContainer newContainer) throws IOException {
assert fs != null;
assert path != null;
if (outputStream == null) {
outputStream = fs.create(path);
}
int recordCount = newContainer.getRecordCount();
WritableBatch batch = WritableBatch.getBatchNoHVWrap(recordCount, newContainer, false);
VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
Stopwatch watch = new Stopwatch();
watch.start();
outputBatch.writeToStream(outputStream);
newContainer.zeroVectors();
logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount);
spilledBatches++;
}
private VectorContainer getBatch() throws IOException {
assert fs != null;
assert path != null;
if (inputStream == null) {
inputStream = fs.open(path);
}
VectorAccessibleSerializable vas = new VectorAccessibleSerializable(allocator);
Stopwatch watch = new Stopwatch();
watch.start();
vas.readFromStream(inputStream);
VectorContainer c = vas.get();
// logger.debug("Took {} us to read {} records", watch.elapsed(TimeUnit.MICROSECONDS), c.getRecordCount());
spilledBatches--;
currentContainer.zeroVectors();
Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
for (VectorWrapper w : currentContainer) {
TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
pair.transfer();
}
currentContainer.setRecordCount(c.getRecordCount());
c.zeroVectors();
return c;
}
public int getNextIndex() {
int val;
if (pointer == getRecordCount()) {
if (spilledBatches == 0) {
return -1;
}
try {
currentContainer.zeroVectors();
getBatch();
} catch (IOException e) {
throw new RuntimeException(e);
}
pointer = 1;
return 0;
}
if (sv2 == null) {
val = pointer;
pointer++;
assert val < currentContainer.getRecordCount();
} else {
val = pointer;
pointer++;
assert val < currentContainer.getRecordCount();
val = sv2.getIndex(val);
}
return val;
}
public VectorContainer getContainer() {
return currentContainer;
}
public void cleanup() throws IOException {
if (sv2 != null) {
sv2.clear();
}
if (outputStream != null) {
outputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
if (fs != null && fs.exists(path)) {
fs.delete(path, false);
}
}
public void closeOutputStream() throws IOException {
if (outputStream != null) {
outputStream.close();
}
}
@Override
public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
return currentContainer.getValueAccessorById(clazz, ids);
}
@Override
public TypedFieldId getValueVectorId(SchemaPath path) {
return currentContainer.getValueVectorId(path);
}
@Override
public BatchSchema getSchema() {
return currentContainer.getSchema();
}
@Override
public int getRecordCount() {
if (sv2 != null) {
return sv2.getCount();
} else {
return currentContainer.getRecordCount();
}
}
@Override
public Iterator<VectorWrapper<?>> iterator() {
return currentContainer.iterator();
}
}