/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl.xsort;

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.cache.VectorSerializer;
import org.apache.drill.exec.cache.VectorSerializer.Writer;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;

import com.google.common.base.Stopwatch;

/**
 * Holds a set of spilled batches, represented by a file on disk.
 * Handles reads from, and writes to, the spill file. The data structure
 * is:
 * <ul>
 * <li>A pointer to a file that contains serialized batches.</li>
 * <li>When writing, each batch is appended to the output file.</li>
 * <li>When reading, iterates over each spilled batch, and for each
 * of those, over each spilled record.</li>
 * </ul>
 * <p>
 * Starts out with no current batch. The current batch is then defined
 * to be the shell (schema without data) of the last batch spilled to
 * disk.
 * <p>
 * When reading, has destructive read-once behavior: closing the
 * batch (after reading) deletes the underlying spill file.
 * <p>
 * This single class does three tasks: loading, holding, and reading
 * data. It should be split into three separate classes, but the
 * original (combined) structure is retained for expedience at present.
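 * <p>
 * A minimal usage sketch (illustrative only; {@code spillSet},
 * {@code allocator}, {@code path}, and {@code batchesToSpill} are
 * assumed to be supplied by the caller, as they are in the external
 * sort):
 * <pre>{@code
 * SpilledRun run = new SpilledRun(spillSet, path, allocator);
 * for (VectorContainer batch : batchesToSpill) {
 *   run.spillBatch(batch);     // Appends each batch to the spill file
 * }
 * run.closeWriter();           // Finish writing before reading back
 * int index;
 * while ((index = run.getNextIndex()) != -1) {
 *   // Process the record at the given index in the current batch
 * }
 * run.close();                 // Deletes the underlying spill file
 * }</pre>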
 */
public class SpilledRun extends BatchGroup {
  private InputStream inputStream;
  private final String path;
  private final SpillSet spillSet;
  private final BufferAllocator allocator;
  private int spilledBatches;
  private long batchSizeBytes;
  private Writer writer;
  private VectorSerializer.Reader reader;
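
  /**
   * Creates a spilled run backed by the given file path. Opens a writer
   * on the spill file immediately so that batches can be written as
   * they arrive.
   *
   * @throws IOException if the spill file cannot be opened for writing
   */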
  public SpilledRun(SpillSet spillSet, String path, BufferAllocator allocator) throws IOException {
    super(null, allocator);
    this.spillSet = spillSet;
    this.path = path;
    this.allocator = allocator;
    writer = spillSet.writer(path);
  }
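
  /**
   * Writes the given batch to the spill file, then releases the batch's
   * data buffers, keeping only the empty "husk" (schema, no rows) as the
   * current container for later reads.
   *
   * @param newContainer the batch to spill
   * @throws IOException if the write to the spill file fails
   */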
  public void spillBatch(VectorContainer newContainer) throws IOException {
    writer.write(newContainer);
    newContainer.zeroVectors();
    logger.trace("Wrote {} records in {} us", newContainer.getRecordCount(), writer.time(TimeUnit.MICROSECONDS));
    spilledBatches++;

    // Hold onto the husk of the last added container so that we have a
    // current container when starting to read rows back later.

    currentContainer = newContainer;
    currentContainer.setEmpty();
  }

  public void setBatchSize(long batchSize) {
    this.batchSizeBytes = batchSize;
  }

  public long getBatchSize() { return batchSizeBytes; }
  public String getPath() { return path; }
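
  /**
   * Returns the index of the next record to merge. When the current
   * batch is exhausted, loads the next spilled batch from disk; returns
   * -1 once all spilled batches have been consumed.
   */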
  @Override
  public int getNextIndex() {
    if (mergeIndex == getRecordCount()) {
      if (spilledBatches == 0) {
        return -1;
      }
      readBatch();

      // The pointer indicates the NEXT index, not the one we
      // return here. At this point, we just started reading a
      // new batch and have returned index 0. So, the next index
      // is 1.

      mergeIndex = 1;
      return 0;
    }
    return super.getNextIndex();
  }
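
  /**
   * Loads the next batch from the spill file into the current container,
   * opening the input stream on first use and coercing the batch to the
   * run's schema when one has been set.
   */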
  private void readBatch() {
    try {
      if (inputStream == null) {
        inputStream = spillSet.openForInput(path);
        reader = VectorSerializer.reader(allocator, inputStream);
      }
      Stopwatch watch = Stopwatch.createStarted();
      long start = allocator.getAllocatedMemory();
      VectorContainer c = reader.read();
      long end = allocator.getAllocatedMemory();
      logger.trace("Read {} records in {} us; size = {}, memory = {}",
                   c.getRecordCount(),
                   watch.elapsed(TimeUnit.MICROSECONDS),
                   (end - start), end);
      if (schema != null) {
        c = SchemaUtil.coerceContainer(c, schema, allocator);
      }
      spilledBatches--;
      currentContainer.zeroVectors();
      Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
      for (VectorWrapper<?> w : currentContainer) {
        TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
        pair.transfer();
      }
      currentContainer.setRecordCount(c.getRecordCount());
      c.zeroVectors();
    } catch (IOException e) {
      // Release any partially-loaded data.
      currentContainer.clear();
      throw UserException.dataReadError(e)
          .message("Failure while reading spilled data")
          .build(logger);
    }
  }

  /**
   * Close the resources owned by this batch group. Each close step can
   * fail; only the first error is reported. This is cluttered because
   * this class tries to do multiple tasks. TODO: Split into multiple
   * classes.
   */
  @Override
  public void close() throws IOException {
    try {
      AutoCloseables.close(super::close, this::closeWriter,
                           this::closeInputStream, () -> spillSet.delete(path));
    } catch (Exception e) {
      throw (e instanceof IOException) ? (IOException) e : new IOException(e);
    }
  }
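
  /**
   * Closes the input stream, if open, and tallies the number of bytes
   * read from the spill file against the spill set's statistics.
   */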
  private void closeInputStream() throws IOException {
    if (inputStream == null) {
      return;
    }
    long readLength = spillSet.getPosition(inputStream);
    spillSet.tallyReadBytes(readLength);
    inputStream.close();
    inputStream = null;
    reader = null;
    logger.trace("Summary: Read {} bytes from {}", readLength, path);
  }
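
  /**
   * Closes the writer, if open, leaving the spill file on disk so that
   * it can be read back later. Safe to call multiple times.
   */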
  public void closeWriter() throws IOException {
    if (writer != null) {
      spillSet.close(writer);
      logger.trace("Summary: Wrote {} bytes in {} us to {}",
          writer.getBytesWritten(), writer.time(TimeUnit.MICROSECONDS), path);
      writer = null;
    }
  }
}