blob: ec76e256b5ddfc75db84cec56700f517aaa9131a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crail;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.crail.utils.RingBuffer;
/**
 * Buffered input stream that reads a sequence of Crail files, named by an
 * iterator of paths, one after another.
 *
 * <p>Paths are resolved lazily: a file is looked up and opened only when
 * {@link #getStream()} is called and no stream is currently ready. Streams that
 * have been read up to their file's capacity are moved to {@code finalStreams}
 * and are closed together in {@link #close()}.
 */
class MultiFileBufferedInputStream extends CrailBufferedInputStream {
	private final CrailStore fs;
	private final Iterator<String> paths;
	// Holds the stream currently being read (constructed with argument 1).
	private final RingBuffer<CrailInputStream> readyStreams;
	// Streams that are done (or were never read) and still need to be closed.
	private final LinkedList<CrailInputStream> finalStreams;

	/**
	 * Creates a multi-file stream over the given paths.
	 *
	 * @param fs          store used to look up each path
	 * @param paths       paths of the files to read, in reading order
	 * @param outstanding forwarded to the superclass constructor
	 * @param files       not used by this class; kept for interface compatibility
	 * @throws Exception if the superclass constructor fails
	 */
	MultiFileBufferedInputStream(CrailStore fs, Iterator<String> paths, int outstanding, int files) throws Exception {
		super(fs, outstanding, 0);
		this.fs = fs;
		this.paths = paths;
		this.readyStreams = new RingBuffer<CrailInputStream>(1);
		this.finalStreams = new LinkedList<CrailInputStream>();
	}

	/**
	 * Returns the stream to read from next, opening further files as needed.
	 *
	 * <p>While no stream is ready, paths are consumed from the iterator. A path
	 * is skipped when its lookup yields no node or when the file's capacity is
	 * not positive.
	 *
	 * @return the result of {@code readyStreams.peek()}; when all paths are
	 *         exhausted the buffer is empty (presumably yielding {@code null} —
	 *         verify against {@code RingBuffer})
	 * @throws Exception if a lookup or opening a stream fails
	 */
	@Override
	public CrailInputStream getStream() throws Exception {
		while (readyStreams.isEmpty() && paths.hasNext()) {
			String path = paths.next();
			CrailNode node = fs.lookup(path).get();
			if (node != null) {
				CrailFile file = node.asFile();
				if (file.getCapacity() > 0) {
					CrailInputStream stream = file.getDirectInputStream(file.getCapacity());
					readyStreams.add(stream);
				}
			}
		}
		return readyStreams.peek();
	}

	/**
	 * Retires the current stream once it has been read up to its file's
	 * capacity, moving it from {@code readyStreams} to {@code finalStreams}.
	 * Does nothing when there is no current stream.
	 *
	 * @throws Exception if querying the stream fails
	 */
	public void putStream() throws Exception {
		// Guard against being called when no stream is ready (e.g. after the
		// paths are exhausted); dereferencing the head would otherwise fail.
		if (readyStreams.isEmpty()) {
			return;
		}
		CrailInputStream stream = readyStreams.peek();
		if (stream.position() >= stream.getFile().getCapacity()) {
			stream = readyStreams.poll();
			finalStreams.add(stream);
		}
	}

	/**
	 * Closes this stream and every underlying file stream.
	 *
	 * <p>After closing the superclass, any still-ready stream is retired, a
	 * stream is opened for every path not yet consumed, and all collected
	 * streams are closed.
	 *
	 * @throws IOException if the superclass close fails, or wrapping any
	 *                     exception raised while looking up, opening or closing
	 *                     the underlying streams
	 */
	@Override
	public void close() throws IOException {
		super.close();
		try {
			while (!readyStreams.isEmpty()) {
				finalStreams.add(readyStreams.poll());
			}
			// NOTE(review): unread paths are still opened and then closed;
			// presumably closing a stream has an effect the store relies on —
			// confirm before changing this.
			while (paths.hasNext()) {
				String path = paths.next();
				CrailNode node = fs.lookup(path).get();
				// Skip paths that do not resolve, mirroring getStream(); calling
				// asFile() on a missing node would abort the whole close.
				if (node == null) {
					continue;
				}
				CrailFile file = node.asFile();
				if (file != null) {
					CrailInputStream stream = file.getDirectInputStream(file.getCapacity());
					finalStreams.add(stream);
				}
			}
			while (!finalStreams.isEmpty()) {
				CrailInputStream stream = finalStreams.poll();
				stream.close();
			}
		} catch (Exception e) {
			throw new IOException(e);
		}
	}

	/**
	 * Seeking is not supported across multiple files.
	 *
	 * @param pos ignored
	 * @throws IOException always
	 */
	@Override
	public void seek(long pos) throws IOException {
		throw new IOException("Seek not supported on multistream");
	}
}