/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.orc.impl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.PhysicalWriter;
import org.apache.orc.TypeDescription;
import org.junit.Test;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import static org.junit.Assert.assertEquals;
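
/**
 * Tests for {@link PhysicalFsWriter} that write ORC data through a simple
 * in-memory {@link FileSystem} stub, so the physical stripe layout can be
 * checked without touching a real file system.
 */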
public class TestPhysicalFsWriter {
  final Configuration conf = new Configuration();
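
  /**
   * An OutputStream that records each write as a separate byte array, so
   * the test file system can report how many bytes were written.
   */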
  static class MemoryOutputStream extends OutputStream {
    private final List<byte[]> contents;

    MemoryOutputStream(List<byte[]> contents) {
      this.contents = contents;
    }

    @Override
    public void write(int b) {
      contents.add(new byte[]{(byte) b});
    }

    @Override
    public void write(byte[] a, int offset, int length) {
      byte[] buffer = new byte[length];
      System.arraycopy(a, offset, buffer, 0, length);
      contents.add(buffer);
    }
  }
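
  /**
   * A minimal in-memory FileSystem that implements only the calls the
   * writer needs: create, rename, delete, and getFileStatus. File contents
   * are kept as lists of byte arrays in {@link #fileContents}.
   */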
  static class MemoryFileSystem extends FileSystem {

    @Override
    public URI getUri() {
      try {
        return new URI("test:///");
      } catch (URISyntaxException e) {
        throw new IllegalStateException("bad url", e);
      }
    }

    @Override
    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
      return null;
    }

    @Override
    public FSDataOutputStream create(Path f, FsPermission permission,
                                     boolean overwrite, int bufferSize,
                                     short replication, long blockSize,
                                     Progressable progress) throws IOException {
      List<byte[]> contents = new ArrayList<>();
      fileContents.put(f, contents);
      // no statistics tracking is needed for the in-memory stream, so pass
      // null for the Statistics argument of the two-argument constructor
      return new FSDataOutputStream(new MemoryOutputStream(contents), null);
    }

    @Override
    public FSDataOutputStream append(Path f, int bufferSize,
                                     Progressable progress) {
      throw new UnsupportedOperationException("append not supported");
    }

    @Override
    public boolean rename(Path src, Path dst) {
      boolean result = fileContents.containsKey(src) &&
          !fileContents.containsKey(dst);
      if (result) {
        List<byte[]> contents = fileContents.remove(src);
        fileContents.put(dst, contents);
      }
      return result;
    }

    @Override
    public boolean delete(Path f, boolean recursive) {
      boolean result = fileContents.containsKey(f);
      fileContents.remove(f);
      return result;
    }

    @Override
    public FileStatus[] listStatus(Path f) {
      return new FileStatus[]{getFileStatus(f)};
    }

    @Override
    public void setWorkingDirectory(Path new_dir) {
      currentWorkingDirectory = new_dir;
    }

    @Override
    public Path getWorkingDirectory() {
      return currentWorkingDirectory;
    }

    @Override
    public boolean mkdirs(Path f, FsPermission permission) {
      return false;
    }

    @Override
    public FileStatus getFileStatus(Path f) {
      List<byte[]> contents = fileContents.get(f);
      if (contents != null) {
        long sum = 0;
        for(byte[] b: contents) {
          sum += b.length;
        }
        return new FileStatus(sum, false, 1, 256 * 1024, 0, f);
      }
      return null;
    }

    private final Map<Path, List<byte[]>> fileContents = new HashMap<>();
    private Path currentWorkingDirectory = new Path("/");
  }
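
  /**
   * With the default block padding, a stripe that would cross a block
   * boundary should be padded out to start at the next block, as long as
   * the required padding stays within the padding tolerance.
   */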
  @Test
  public void testStripePadding() throws IOException {
    TypeDescription schema = TypeDescription.fromString("int");
    OrcFile.WriterOptions opts =
        OrcFile.writerOptions(conf)
            .stripeSize(32 * 1024)
            .blockSize(64 * 1024)
            .compress(CompressionKind.NONE)
            .setSchema(schema);
    MemoryFileSystem fs = new MemoryFileSystem();
    PhysicalFsWriter writer = new PhysicalFsWriter(fs, new Path("test1.orc"),
        opts);
    writer.writeHeader();
    StreamName stream0 = new StreamName(0, OrcProto.Stream.Kind.DATA);
    PhysicalWriter.OutputReceiver output = writer.createDataStream(stream0);
    byte[] buffer = new byte[1024];
    for(int i=0; i < buffer.length; ++i) {
      buffer[i] = (byte) i;
    }
    for(int i=0; i < 63; ++i) {
      output.output(ByteBuffer.wrap(buffer));
    }
    OrcProto.StripeFooter.Builder footer = OrcProto.StripeFooter.newBuilder();
    OrcProto.StripeInformation.Builder dirEntry =
        OrcProto.StripeInformation.newBuilder();
    writer.finalizeStripe(footer, dirEntry);
    // the first stripe needs no padding; it starts at offset 3, immediately
    // after the 3 byte "ORC" magic written by writeHeader
    assertEquals(0L, dirEntry.getIndexLength());
    assertEquals(63 * 1024L, dirEntry.getDataLength());
    assertEquals(3, dirEntry.getOffset());
    for(int i=0; i < 62; ++i) {
      output.output(ByteBuffer.wrap(buffer));
    }
    footer = OrcProto.StripeFooter.newBuilder();
    dirEntry = OrcProto.StripeInformation.newBuilder();
    writer.finalizeStripe(footer, dirEntry);
    // the second stripe would cross the 64k block boundary, so it should be
    // padded out to start exactly at the second block
    assertEquals(64 * 1024, dirEntry.getOffset());
    assertEquals(62 * 1024, dirEntry.getDataLength());
    long endOfStripe = dirEntry.getOffset() + dirEntry.getIndexLength() +
        dirEntry.getDataLength() + dirEntry.getFooterLength();
    for(int i=0; i < 3; ++i) {
      output.output(ByteBuffer.wrap(buffer));
    }
    footer = OrcProto.StripeFooter.newBuilder();
    dirEntry = OrcProto.StripeInformation.newBuilder();
    writer.finalizeStripe(footer, dirEntry);
    // the third stripe would need more padding than the tolerance allows,
    // so it is written immediately after the second stripe
    assertEquals(endOfStripe, dirEntry.getOffset());
    assertEquals(3 * 1024, dirEntry.getDataLength());
  }
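
  /**
   * With blockPadding(false), a stripe that crosses a block boundary should
   * be written in place with no padding at all.
   */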
  @Test
  public void testNoStripePadding() throws IOException {
    TypeDescription schema = TypeDescription.fromString("int");
    OrcFile.WriterOptions opts =
        OrcFile.writerOptions(conf)
            .blockPadding(false)
            .stripeSize(32 * 1024)
            .blockSize(64 * 1024)
            .compress(CompressionKind.NONE)
            .setSchema(schema);
    MemoryFileSystem fs = new MemoryFileSystem();
    PhysicalFsWriter writer = new PhysicalFsWriter(fs, new Path("test1.orc"),
        opts);
    writer.writeHeader();
    StreamName stream0 = new StreamName(0, OrcProto.Stream.Kind.DATA);
    PhysicalWriter.OutputReceiver output = writer.createDataStream(stream0);
    byte[] buffer = new byte[1024];
    for(int i=0; i < buffer.length; ++i) {
      buffer[i] = (byte) i;
    }
    for(int i=0; i < 63; ++i) {
      output.output(ByteBuffer.wrap(buffer));
    }
    OrcProto.StripeFooter.Builder footer = OrcProto.StripeFooter.newBuilder();
    OrcProto.StripeInformation.Builder dirEntry =
        OrcProto.StripeInformation.newBuilder();
    writer.finalizeStripe(footer, dirEntry);
    // check to make sure that it laid it out without padding
    assertEquals(0L, dirEntry.getIndexLength());
    assertEquals(63 * 1024L, dirEntry.getDataLength());
    assertEquals(3, dirEntry.getOffset());
    long endOfStripe = dirEntry.getOffset() + dirEntry.getDataLength()
        + dirEntry.getFooterLength();
    for(int i=0; i < 62; ++i) {
      output.output(ByteBuffer.wrap(buffer));
    }
    footer = OrcProto.StripeFooter.newBuilder();
    dirEntry = OrcProto.StripeInformation.newBuilder();
    writer.finalizeStripe(footer, dirEntry);
    // no padding, because we turned it off
    assertEquals(endOfStripe, dirEntry.getOffset());
    assertEquals(62 * 1024, dirEntry.getDataLength());
  }
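
  /**
   * A HadoopShims stub that records the stream position of the last
   * variable length block ending, so the test can verify where the writer
   * asked for one.
   */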
  static class MockHadoopShim implements HadoopShims {
    long lastShortBlock = -1;

    @Override
    public DirectDecompressor getDirectDecompressor(DirectCompressionType codec) {
      return null;
    }

    @Override
    public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
                                                ByteBufferPoolShim pool) {
      return null;
    }

    @Override
    public boolean endVariableLengthBlock(OutputStream output) throws IOException {
      if (output instanceof FSDataOutputStream) {
        lastShortBlock = ((FSDataOutputStream) output).getPos();
        return true;
      }
      return false;
    }

    @Override
    public KeyProvider getHadoopKeyProvider(Configuration conf, Random random) {
      return null;
    }
  }
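
  /**
   * With writeVariableLengthBlocks(true), the writer should end the current
   * block early at the stripe boundary instead of padding, so the next
   * stripe starts at the beginning of a fresh block.
   */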
  @Test
  public void testShortBlock() throws IOException {
    MockHadoopShim shim = new MockHadoopShim();
    TypeDescription schema = TypeDescription.fromString("int");
    OrcFile.WriterOptions opts =
        OrcFile.writerOptions(conf)
            .blockPadding(false)
            .stripeSize(32 * 1024)
            .blockSize(64 * 1024)
            .compress(CompressionKind.NONE)
            .setSchema(schema)
            .setShims(shim)
            .writeVariableLengthBlocks(true);
    MemoryFileSystem fs = new MemoryFileSystem();
    PhysicalFsWriter writer = new PhysicalFsWriter(fs, new Path("test1.orc"),
        opts);
    writer.writeHeader();
    StreamName stream0 = new StreamName(0, OrcProto.Stream.Kind.DATA);
    PhysicalWriter.OutputReceiver output = writer.createDataStream(stream0);
    byte[] buffer = new byte[1024];
    for(int i=0; i < buffer.length; ++i) {
      buffer[i] = (byte) i;
    }
    for(int i=0; i < 63; ++i) {
      output.output(ByteBuffer.wrap(buffer));
    }
    OrcProto.StripeFooter.Builder footer = OrcProto.StripeFooter.newBuilder();
    OrcProto.StripeInformation.Builder dirEntry =
        OrcProto.StripeInformation.newBuilder();
    writer.finalizeStripe(footer, dirEntry);
    // check to make sure that it laid it out without padding
    assertEquals(0L, dirEntry.getIndexLength());
    assertEquals(63 * 1024L, dirEntry.getDataLength());
    assertEquals(3, dirEntry.getOffset());
    long endOfStripe = dirEntry.getOffset() + dirEntry.getDataLength()
        + dirEntry.getFooterLength();
    for(int i=0; i < 62; ++i) {
      output.output(ByteBuffer.wrap(buffer));
    }
    footer = OrcProto.StripeFooter.newBuilder();
    dirEntry = OrcProto.StripeInformation.newBuilder();
    writer.finalizeStripe(footer, dirEntry);
    // we should get a short block and no padding
    assertEquals(endOfStripe, dirEntry.getOffset());
    assertEquals(62 * 1024, dirEntry.getDataLength());
    assertEquals(endOfStripe, shim.lastShortBlock);
  }
}