blob: 5efb47f734b97391c99fa3a0ffa0ac95e4a4e863 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.cache;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import io.netty.buffer.DrillBuf;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Serializes vector containers to an output stream or from
* an input stream.
*/
public class VectorSerializer {
/**
* Writes multiple VectorAccessible or VectorContainer
* objects to an output stream.
*/
public static class Writer implements Closeable
{
static final MetricRegistry metrics = DrillMetrics.getRegistry();
static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
private final WritableByteChannel channel;
private final OutputStream output;
private long timeNs;
private int bytesWritten;
private Writer(WritableByteChannel channel) {
this.channel = channel;
output = Channels.newOutputStream(channel);
}
public int write(VectorAccessible va) throws IOException {
return write(va, null);
}
public int write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
checkNotNull(va);
WritableBatch batch = WritableBatch.getBatchNoHVWrap(
va.getRecordCount(), va, sv2 != null);
try {
return write(batch, sv2);
} finally {
batch.clear();
}
}
public int write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
checkNotNull(batch);
checkNotNull(channel);
final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
final DrillBuf[] incomingBuffers = batch.getBuffers();
final UserBitShared.RecordBatchDef batchDef = batch.getDef();
int bytesWritten = batchDef.getSerializedSize();
/* Write the metadata to the file */
batchDef.writeDelimitedTo(output);
/* If we have a selection vector, dump it to file first */
if (sv2 != null) {
final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
ByteBuffer buffer = sv2.getBuffer(false).nioBuffer(0, dataLength);
while (buffer.remaining() > 0) {
bytesWritten += channel.write(buffer);
}
}
/* Dump the array of ByteBuf's associated with the value vectors */
for (DrillBuf buf : incomingBuffers) {
/* dump the buffer into the OutputStream */
ByteBuffer buffer = buf.nioBuffer();
while (buffer.remaining() > 0) {
bytesWritten += channel.write(buffer);
}
}
timeNs += timerContext.stop();
this.bytesWritten += bytesWritten;
return bytesWritten;
}
@Override
public void close() throws IOException {
if (!channel.isOpen()) {
return;
}
channel.close();
}
public long time(TimeUnit unit) {
return unit.convert(timeNs, TimeUnit.NANOSECONDS);
}
public int getBytesWritten() { return bytesWritten; }
}
/**
* Read one or more vector containers from an input stream.
*/
public static class Reader {
private final InputStream stream;
private long timeNs;
private final VectorAccessibleSerializable vas;
public Reader(BufferAllocator allocator, InputStream stream) {
this.stream = stream;
vas = new VectorAccessibleSerializable(allocator);
}
public VectorContainer read() throws IOException {
vas.readFromStream(stream);
timeNs = vas.getTimeNs();
return vas.get();
}
public SelectionVector2 sv2() { return vas.getSv2(); }
public long timeNs() { return timeNs; }
}
public static Writer writer(WritableByteChannel channel) throws IOException {
return new Writer(channel);
}
public static Reader reader(BufferAllocator allocator, InputStream stream) {
return new Reader(allocator, stream);
}
}