blob: 617b4732066a47731c95b56537feebd578225c54 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.simple.filedata;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * An input stream that reads file data stored in one or more Accumulo values. Used by
 * {@link ChunkInputFormat} to present input streams to a mapper.
 *
 * <p>
 * The stream consumes entries of a single row from a {@link PeekingIterator}. Entries whose column
 * family equals {@link FileDataIngest#CHUNK_CF} are treated as chunks; the chunk size and chunk
 * number are decoded from the column qualifier with {@code FileDataIngest.bytesToInt} at offsets 0
 * and 4 respectively. A chunk with an empty value marks the end of the data. The column
 * visibilities of all chunks seen are collected and become available through
 * {@link #getVisibilities()} once the stream has been fully read or closed.
 */
public class ChunkInputStream extends InputStream {
  private static final Logger log = LoggerFactory.getLogger(ChunkInputStream.class);

  /** Entries being read; {@code null} when no source is attached or after {@link #clear()}. */
  protected PeekingIterator<Entry<Key,Value>> source;
  /** Key of the chunk currently held in {@link #buf}. */
  protected Key currentKey;
  /** Column visibilities of every chunk accepted so far for the current source. */
  protected Set<Text> currentVis;
  /** Chunk number of the chunk currently held in {@link #buf}. */
  protected int currentChunk;
  /** Chunk size taken from the first chunk; chunks with a different size are skipped. */
  protected int currentChunkSize;
  /** Whether the empty-valued chunk that terminates the data has been seen. */
  protected boolean gotEndMarker;

  /** Bytes of the current chunk. */
  protected byte[] buf;
  /** Number of valid bytes in {@link #buf}. */
  protected int count;
  /** Index in {@link #buf} of the next byte to hand out. */
  protected int pos;

  /**
   * Creates a stream with no source attached. Call {@link #setSource(PeekingIterator)} before
   * reading.
   */
  public ChunkInputStream() {
    source = null;
  }

  /**
   * Creates a stream reading from the given entries.
   *
   * @param in
   *          iterator over the entries to read
   * @throws IOException
   *           if the first chunk found is not chunk number 0
   */
  public ChunkInputStream(PeekingIterator<Entry<Key,Value>> in) throws IOException {
    setSource(in);
  }

  /**
   * Attaches a new source and positions the stream on its first chunk, skipping any leading
   * entries that are not chunks.
   *
   * @param in
   *          iterator over the entries to read
   * @throws IOException
   *           if a previous source is still attached, or if the first chunk found is not chunk
   *           number 0
   */
  public void setSource(PeekingIterator<Entry<Key,Value>> in) throws IOException {
    if (source != null)
      throw new IOException("setting new source without closing old one");
    this.source = in;
    currentVis = new TreeSet<>();
    count = pos = 0;
    if (!source.hasNext()) {
      log.debug("source has no next");
      // nothing to read at all: behave like an already terminated stream
      gotEndMarker = true;
      return;
    }

    // read forward until we reach a chunk
    Entry<Key,Value> entry = source.next();
    currentKey = entry.getKey();
    buf = entry.getValue().get();
    while (!currentKey.getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
      log.debug("skipping key: {}", currentKey);
      // ran out of entries without finding a chunk; count stays 0 and gotEndMarker is left as is
      if (!source.hasNext())
        return;
      entry = source.next();
      currentKey = entry.getKey();
      buf = entry.getValue().get();
    }
    log.debug("starting chunk: {}", currentKey);

    count = buf.length;
    currentVis.add(currentKey.getColumnVisibility());
    byte[] qualifier = currentKey.getColumnQualifier().getBytes();
    currentChunk = FileDataIngest.bytesToInt(qualifier, 4);
    currentChunkSize = FileDataIngest.bytesToInt(qualifier, 0);
    // an empty first chunk is itself the end marker
    gotEndMarker = buf.length == 0;
    if (currentChunk != 0) {
      source = null;
      throw new IOException("starting chunk number isn't 0 for " + currentKey.getRow());
    }
  }

  /**
   * Advances to the next chunk of the current row and loads it into {@link #buf}.
   *
   * <p>
   * Entries that are not chunks, chunks with a different chunk size, and chunks whose column
   * qualifier equals the current one (same chunk under another visibility) are consumed and
   * skipped. Skipping is done in a loop rather than by recursion so that a row with many skipped
   * entries cannot exhaust the call stack.
   *
   * @return the number of bytes now available in the buffer; {@code 0} if the source is exhausted
   *         after the end marker was seen; {@code -1} if the next entry belongs to another row
   *         after the end marker was seen
   * @throws IOException
   *           if the data ends or the row changes before an end marker was seen, if a chunk
   *           follows the end marker, or if a chunk number is skipped
   */
  private int fill() throws IOException {
    while (true) {
      if (source == null || !source.hasNext()) {
        if (gotEndMarker)
          return count = pos = 0;
        throw new IOException("no end chunk marker but source has no data");
      }

      // only peek: an entry of the next row must stay in the source for whoever reads it next
      Entry<Key,Value> entry = source.peek();
      Key thisKey = entry.getKey();
      log.debug("evaluating key: {}", thisKey);

      // check that we're still on the same row
      if (!thisKey.equals(currentKey, PartialKey.ROW)) {
        if (gotEndMarker)
          return -1;
        // capture the row before clear() drops currentKey
        String currentRow = currentKey.getRow().toString();
        clear();
        throw new IOException("got to the end of the row without end chunk marker " + currentRow);
      }
      log.debug("matches current key");

      // ok to advance the iterator
      source.next();

      // check that this is part of a chunk
      if (!thisKey.getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
        log.debug("skipping non-chunk key");
        continue;
      }
      log.debug("is a chunk");

      // check that the chunk size is the same as the one being read
      int thisChunkSize = FileDataIngest.bytesToInt(thisKey.getColumnQualifier().getBytes(), 0);
      if (currentChunkSize != thisChunkSize) {
        log.debug("skipping chunk of different size");
        continue;
      }

      // remember the visibility; the set ignores duplicates
      currentVis.add(thisKey.getColumnVisibility());

      // check to see if it is an identical chunk with a different visibility
      if (thisKey.getColumnQualifier().equals(currentKey.getColumnQualifier())) {
        log.debug("skipping identical chunk with different visibility");
        continue;
      }

      if (gotEndMarker) {
        log.debug("got another chunk after end marker: {} {}", currentKey, thisKey);
        clear();
        throw new IOException("found extra chunk after end marker");
      }

      // got new chunk of the same file, check that it's the next chunk
      int thisChunk = FileDataIngest.bytesToInt(thisKey.getColumnQualifier().getBytes(), 4);
      if (thisChunk != currentChunk + 1) {
        log.debug("new chunk same file, unexpected chunkID: {} {}", currentKey, thisKey);
        clear();
        throw new IOException("missing chunks between " + currentChunk + " and " + thisChunk);
      }

      currentKey = thisKey;
      currentChunk = thisChunk;
      buf = entry.getValue().get();
      pos = 0;

      // check to see if it's the last chunk
      if (buf.length == 0) {
        gotEndMarker = true;
        // keep going so trailing entries of the row are consumed and validated
        continue;
      }

      return count = buf.length;
    }
  }

  /**
   * Returns the column visibilities of the chunks that were read.
   *
   * @return the set of visibilities collected for the most recent source; this is the stream's
   *         own set, not a copy, and is {@code null} if no source was ever set
   * @throws IllegalStateException
   *           if a source is still attached, i.e. the stream has not been read to the end or
   *           closed
   */
  public Set<Text> getVisibilities() {
    if (source != null)
      throw new IllegalStateException(
          "don't get visibilities before chunks have been completely read");
    return currentVis;
  }

  /**
   * Reads the next byte of file data.
   *
   * @return the byte as a value from 0 to 255, or {@code -1} if no source is attached or the end of
   *         the data has been reached
   * @throws IOException
   *           if the chunk sequence is found to be invalid while loading the next chunk
   */
  @Override
  public int read() throws IOException {
    if (source == null)
      return -1;
    // guarded because this runs once per byte
    if (log.isDebugEnabled())
      log.debug("pos: {} count: {}", pos, count);
    if (pos >= count) {
      if (fill() <= 0) {
        finishReading();
        return -1;
      }
    }
    return buf[pos++] & 0xff;
  }

  /**
   * Reads up to {@code len} bytes of file data into {@code b}, crossing chunk boundaries as needed.
   *
   * @param b
   *          destination buffer
   * @param off
   *          offset in {@code b} at which to start storing bytes
   * @param len
   *          maximum number of bytes to read
   * @return the number of bytes stored, {@code 0} if {@code len} is 0, or {@code -1} if the end of
   *         the data was reached before any byte could be read
   * @throws NullPointerException
   *           if {@code b} is null
   * @throws IndexOutOfBoundsException
   *           if {@code off} and {@code len} do not describe a range inside {@code b}
   * @throws IOException
   *           if the chunk sequence is found to be invalid while loading the next chunk
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (b == null) {
      throw new NullPointerException();
    } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
        || ((off + len) < 0)) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return 0;
    }

    log.debug("filling buffer {} {}", off, len);
    int total = 0;
    while (total < len) {
      int avail = count - pos;
      log.debug("{} available in current local buffer", avail);
      if (avail <= 0) {
        if (fill() <= 0) {
          finishReading();
          log.debug("filled {} bytes", total);
          return total == 0 ? -1 : total;
        }
        avail = count - pos;
      }

      int cnt = Math.min(avail, len - total);
      log.debug("copying from local buffer: local pos {} into pos {} len {}", pos, off, cnt);
      System.arraycopy(buf, pos, b, off, cnt);
      pos += cnt;
      off += cnt;
      total += cnt;
    }
    log.debug("filled {} bytes", total);
    return total;
  }

  /** Logs where reading stopped and releases the stream state via {@link #clear()}. */
  private void finishReading() throws IOException {
    log.debug("done reading input stream at key: {}", currentKey);
    if (source != null && source.hasNext())
      log.debug("next key: {}", source.peek().getKey());
    clear();
  }

  /**
   * Detaches the source and resets the buffer, current key, chunk number and read position. The
   * collected visibilities and the end-marker flag are kept.
   */
  public void clear() {
    source = null;
    buf = null;
    currentKey = null;
    currentChunk = 0;
    pos = count = 0;
  }

  /**
   * Consumes the remaining chunks of the current row, so that they are validated and their
   * visibilities collected, then releases the stream state.
   *
   * @throws IOException
   *           wrapping any error raised while consuming the remaining chunks; the stream state is
   *           cleared in that case as well
   */
  @Override
  public void close() throws IOException {
    try {
      while (fill() > 0) {
        // discard the data; only the side effects of fill() matter here
      }
    } catch (IOException e) {
      throw new IOException(e);
    } finally {
      clear();
    }
  }
}