blob: 8988f8d4f5f70cf4d865887bc0b94bf508799127 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.xmlquery;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import org.apache.oodt.product.ProductException;
import org.apache.oodt.product.Retriever;
/**
 * Input stream over a product that is retrieved chunk by chunk.
 *
 * <p>The stream asks its {@link Retriever} for one block of at most {@code BLOCK_SIZE} bytes
 * at a time, on the calling thread, and serves reads out of that block. The next block is
 * requested only when the current one has been fully consumed, so any read that crosses a
 * block boundary may block on the retriever.
 *
 * <p>NOTE(review): no synchronization is visible in this class, so an instance is presumably
 * meant to be used by one thread at a time; confirm against callers.
 *
 * @author Kelly
 * @version $Revision: 1.5 $
 */
final class ChunkedProductInputStream extends InputStream {
    /**
     * Creates a new <code>ChunkedProductInputStream</code> instance.
     *
     * @param id Product ID, passed back to the retriever on every chunk request.
     * @param retriever Retriever to use to fetch chunks and to notify on close.
     * @param size How big the product is, in bytes.
     */
    ChunkedProductInputStream(String id, Retriever retriever, long size) {
        this.id = id;
        this.size = size;
        this.retriever = retriever;
        open = true;
        eof = false; // Not yet at EOF, even if size=0; the first read discovers that.
    }

    /**
     * Read a single byte.
     *
     * This method reads from the current block and may call the retriever for the
     * next block if the current one is exhausted.
     *
     * @return The next byte as a value in the range 0..255, or -1 at end of file.
     *         Once -1 has been returned, every later call also returns -1.
     * @throws IOException if the stream is closed or a chunk cannot be retrieved.
     */
    @Override
    public int read() throws IOException {
        checkOpen();
        if (eof) {
            return -1;
        }
        fetchBlock();
        if (eof) {
            return -1;
        }
        // Mask to an unsigned value: a plain byte-to-int promotion sign-extends, which
        // would turn 0xFF into -1 and be mistaken for end of file by the caller.
        return block[blockIndex++] & 0xff;
    }

    /**
     * Read into an array.
     *
     * At most the remainder of the current block is transferred per call, so fewer
     * than <var>length</var> bytes may be returned even when more data follows.
     *
     * @param b a <code>byte[]</code> value.
     * @param offset Where in <var>b</var> to save read bytes.
     * @param length How many bytes to try to read.
     * @return Number of bytes actually read, 0 if <var>length</var> is 0, or -1 at end of file.
     * @throws IOException if the stream is closed or a chunk cannot be retrieved.
     * @throws IllegalArgumentException if <var>offset</var> and <var>length</var> do not
     *         describe a range inside <var>b</var>.
     */
    @Override
    public int read(byte[] b, int offset, int length) throws IOException {
        checkOpen();
        // The last clause catches int overflow of offset + length.
        if (offset < 0 || offset > b.length || length < 0 || (offset + length) > b.length || (offset + length) < 0) {
            throw new IllegalArgumentException("Illegal offset=" + offset + "/length=" + length
                + " for byte array of length " + b.length);
        }
        if (length == 0) {
            return 0;
        }
        if (eof) {
            return -1;
        }
        fetchBlock();
        if (eof) {
            return -1;
        }
        int amount = Math.min(length, block.length - blockIndex);
        System.arraycopy(block, blockIndex, b, offset, amount);
        blockIndex += amount;
        return amount;
    }

    /**
     * Make sure there is a current block with unread bytes, fetching the next one if needed.
     *
     * On return either <code>eof</code> is set (and there is no current block), or
     * <code>block</code> holds at least one unread byte at <code>blockIndex</code>.
     *
     * @throws IOException if the retriever fails or hands back no data before the
     *         declared product size has been reached.
     */
    private void fetchBlock() throws IOException {
        if (block != null && blockIndex < block.length) {
            return; // Current block still has unread bytes.
        }
        // ">=" rather than "==" so a negative size, or a retriever that returned more
        // than was asked for, ends the stream instead of requesting a negative length.
        if (productIndex >= size) {
            block = null;
            eof = true;
            return;
        }
        int wanted = (int) Math.min(BLOCK_SIZE, size - productIndex);
        byte[] chunk;
        try {
            chunk = retriever.retrieveChunk(id, productIndex, wanted);
        } catch (ProductException ex) {
            throw asIOException(ex);
        }
        // An empty chunk would never advance productIndex, so readers would either
        // index past the end of the block or spin forever on 0-byte reads.
        if (chunk == null || chunk.length == 0) {
            throw new IOException("Retriever returned no data for product " + id + " at offset "
                + productIndex + " (requested " + wanted + " bytes, product size " + size + ")");
        }
        block = chunk;
        blockIndex = 0;
        productIndex += chunk.length;
    }

    /**
     * Return number of bytes currently available.
     *
     * If we have a block, the amount of available bytes is whatever's left in the block.
     * Otherwise we don't know how many bytes, and a read could block, so say zero are available.
     *
     * @return an <code>int</code> value.
     * @throws IOException if the stream is closed.
     */
    @Override
    public int available() throws IOException {
        checkOpen();
        return block == null ? 0 : block.length - blockIndex;
    }

    /**
     * Close the stream and tell the retriever the product is no longer needed.
     *
     * Closing an already-closed stream has no effect. If the retriever reports an error
     * the stream stays open, so the close may be attempted again.
     *
     * @throws IOException if the retriever fails to close the product.
     */
    @Override
    public void close() throws IOException {
        if (!open) {
            return;
        }
        try {
            retriever.close(id);
        } catch (ProductException ex) {
            throw asIOException(ex);
        }
        open = false;
        block = null; // Nothing can be read any more; let the buffer be collected.
    }

    /**
     * Mark is not supported.
     *
     * @param limit Unused parameter.
     * @throws UnsupportedOperationException always.
     */
    @Override
    public void mark(int limit) {
        throw new UnsupportedOperationException("Mark not supported");
    }

    /**
     * Reset is not supported.
     *
     * @throws UnsupportedOperationException always.
     */
    @Override
    public void reset() {
        throw new UnsupportedOperationException("Reset not supported");
    }

    /**
     * Mark/reset operations are not supported.
     *
     * @return False.
     */
    @Override
    public boolean markSupported() {
        return false;
    }

    /**
     * Misspelled predecessor of {@link #markSupported()}, kept so existing callers still compile.
     *
     * @return False.
     * @deprecated Use {@link #markSupported()} instead.
     */
    @Deprecated
    public boolean markSuppoted() {
        return markSupported();
    }

    /**
     * Throw an exception if the stream's closed.
     *
     * @throws IOException if the stream's closed.
     */
    private void checkOpen() throws IOException {
        if (!open) {
            throw new IOException("Stream closed");
        }
    }

    /**
     * Convert a retriever failure into an <code>IOException</code> that keeps the
     * original exception as its cause.
     *
     * @param ex Failure reported by the retriever.
     * @return An <code>IOException</code> with the same message and <var>ex</var> as cause.
     */
    private static IOException asIOException(ProductException ex) {
        IOException wrapped = new IOException(ex.getMessage());
        wrapped.initCause(ex);
        return wrapped;
    }

    /** Product ID. */
    private final String id;

    /** What can retrieve the product. */
    private final Retriever retriever;

    /** How big the product is. */
    private final long size;

    /** Current block or null if there's no current block. */
    private byte[] block;

    /** From where in <code>block</code> to read. */
    private int blockIndex;

    /** From where in the product to read the next block. */
    private long productIndex = 0L;

    /** True if we got to the end of file. */
    private boolean eof;

    /** Is the stream open? */
    private boolean open;

    /**
     * Maximum size of a chunk request, in bytes. Taken from the system property
     * <code>org.apache.oodt.xmlquery.blockSize</code>, defaulting to 4096.
     */
    private static final int BLOCK_SIZE = Integer.getInteger("org.apache.oodt.xmlquery.blockSize", 4096).intValue();
}