blob: f77fbbf3e137524e51cfd41159fe0c0ee81a0683 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
import org.apache.tinkerpop.gremlin.structure.io.gryo.VertexTerminator;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
/**
 * A Hadoop {@code RecordReader} that splits a binary Gryo file into vertices. Each call to
 * {@link #nextKeyValue()} scans forward to the next vertex-terminator sequence and deserializes
 * the accumulated bytes into a {@link VertexWritable}. Keys are always {@link NullWritable}.
 * <p>
 * Not thread-safe; Hadoop uses one instance per task.
 *
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public final class GryoRecordReader extends RecordReader<NullWritable, VertexWritable> {

    private FSDataInputStream inputStream;
    // byte sequence that prefixes every Gryo stream; used to align a split to a record boundary
    private static final byte[] PATTERN = GryoMapper.HEADER;
    // byte sequence written after every vertex; marks the end of one serialized record
    private static final byte[] TERMINATOR = VertexTerminator.instance().terminal;

    private GryoReader gryoReader;
    // reused holder for the current value; returned by getCurrentValue() after each nextKeyValue()
    private final VertexWritable vertexWritable = new VertexWritable();
    private long currentLength = 0; // bytes consumed from this split so far (drives getProgress())
    private long splitLength;       // effective length of this split after seeking to the header

    public GryoRecordReader() {
    }

    /**
     * Opens the split's file, verifies it is uncompressed, and seeks to the first Gryo header
     * at or after the split's start offset, shrinking the effective split length accordingly.
     *
     * @throws IllegalStateException if the file is compressed (Gryo is a binary format and
     *                               cannot be split when compressed)
     */
    @Override
    public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
        final FileSplit split = (FileSplit) genericSplit;
        final Configuration configuration = context.getConfiguration();
        HadoopPools.initialize(configuration);
        this.gryoReader = HadoopPools.getGryoPool().takeReader();
        final long start = split.getStart();
        final Path file = split.getPath();
        if (null != new CompressionCodecFactory(configuration).getCodec(file)) {
            throw new IllegalStateException("Compression is not supported for the (binary) Gryo format");
        }
        // open the file and seek to the start of the split
        this.inputStream = file.getFileSystem(configuration).open(split.getPath());
        this.splitLength = split.getLength();
        // bytes skipped while aligning to the header do not belong to this split
        if (this.splitLength > 0) this.splitLength -= (seekToHeader(this.inputStream, start) - start);
    }

    /**
     * Advances byte-by-byte from {@code start} until the stream matches {@code PATTERN},
     * then positions the stream at the start of that match and returns its offset.
     *
     * @throws IOException (including EOFException) if the end of the stream is reached
     *                     before a header is found
     */
    private static long seekToHeader(final FSDataInputStream inputStream, final long start) throws IOException {
        inputStream.seek(start);
        long nextStart = start;
        final byte[] buffer = new byte[PATTERN.length];
        while (true) {
            // readByte() advances the cursor; the positional readFully below does not
            if ((buffer[0] = PATTERN[0]) == inputStream.readByte()) {
                // readFully (not read) so a short positional read cannot corrupt the comparison
                inputStream.readFully(nextStart + 1, buffer, 1, PATTERN.length - 1);
                if (patternMatch(buffer)) {
                    inputStream.seek(nextStart);
                    return nextStart;
                }
            }
            // advance past the rejected candidate position; this must also happen when the
            // first byte matched but the full pattern did not, otherwise nextStart goes stale
            // and later candidates are compared against bytes from the wrong offset
            nextStart = nextStart + 1;
            inputStream.seek(nextStart);
        }
    }

    /**
     * Returns true if {@code bytes} matches the header pattern.
     * <p>
     * NOTE(review): only the first {@code PATTERN.length - 1} bytes are compared — the final
     * header byte (presumably a format-version byte) is ignored. Preserved as-is; confirm
     * whether skipping it is intentional before tightening the bound.
     */
    private static boolean patternMatch(final byte[] bytes) {
        for (int i = 0; i < PATTERN.length - 1; i++) {
            if (bytes[i] != PATTERN[i])
                return false;
        }
        return true;
    }

    /**
     * Reads bytes until the vertex terminator is seen, then deserializes the buffered bytes
     * (terminator included) into the reusable {@link VertexWritable}.
     *
     * @return true if a vertex was read; false if the split (or the stream) is exhausted
     * @throws IllegalStateException if the stream ends mid-vertex
     */
    @Override
    public boolean nextKeyValue() throws IOException {
        if (this.currentLength >= this.splitLength)
            return false;
        final ByteArrayOutputStream output = new ByteArrayOutputStream();
        long currentVertexLength = 0;
        int terminatorLocation = 0; // how many consecutive terminator bytes have been matched
        while (true) {
            final int currentByte = this.inputStream.read();
            if (-1 == currentByte) {
                if (currentVertexLength > 0)
                    throw new IllegalStateException("Remainder of stream exhausted without matching a vertex");
                else
                    return false;
            }
            this.currentLength++;
            currentVertexLength++;
            output.write(currentByte);
            // naive streaming match against TERMINATOR; resets on any mismatch
            terminatorLocation = ((byte) currentByte) == TERMINATOR[terminatorLocation] ? terminatorLocation + 1 : 0;
            if (terminatorLocation >= TERMINATOR.length) {
                try (InputStream in = new ByteArrayInputStream(output.toByteArray())) {
                    this.vertexWritable.set(this.gryoReader.readVertex(in, Attachable::get)); // I know how GryoReader works, so I'm cheating here
                    return true;
                }
            }
        }
    }

    @Override
    public NullWritable getCurrentKey() {
        return NullWritable.get();
    }

    @Override
    public VertexWritable getCurrentValue() {
        return this.vertexWritable;
    }

    /**
     * Fraction of the split consumed so far; 0.0 before any bytes are read.
     */
    @Override
    public float getProgress() throws IOException {
        return (0 == this.currentLength || 0 == this.splitLength) ? 0.0f : (float) this.currentLength / (float) this.splitLength;
    }

    /**
     * Closes the underlying stream and returns the pooled {@link GryoReader}. Safe to call
     * even if {@link #initialize} was never invoked, and the reader is returned to the pool
     * even when closing the stream throws (previously a failed close leaked it).
     */
    @Override
    public synchronized void close() throws IOException {
        try {
            if (null != this.inputStream)
                this.inputStream.close();
        } finally {
            if (null != this.gryoReader) {
                HadoopPools.getGryoPool().offerReader(this.gryoReader);
                this.gryoReader = null;
            }
        }
    }
}