blob: 937d34e2bbf242f152696e656ed8b780a08b9015 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
/**
* To support HTTP byte streams, a new connection to an HTTP server needs to be
* created each time. This class hides the complexity of those multiple
* connections from the client. Whenever seek() is called, a new connection
* is made on the successive read(). The normal input stream functions are
* connected to the currently active input stream.
*/
class ByteRangeInputStream extends FSInputStream {
/**
* This class wraps a URL to allow easy mocking when testing. The URL class
* cannot be easily mocked because it is public.
*/
static class URLOpener {
protected URL url;
public URLOpener(URL u) {
url = u;
}
public void setURL(URL u) {
url = u;
}
public URL getURL() {
return url;
}
public HttpURLConnection openConnection() throws IOException {
return (HttpURLConnection)url.openConnection();
}
}
protected InputStream in;
protected URLOpener originalURL;
protected URLOpener resolvedURL;
protected long startPos = 0;
protected long currentPos = 0;
protected long filelength;
protected int status = STATUS_SEEK;
protected static final int STATUS_NORMAL = 0;
protected static final int STATUS_SEEK = 1;
ByteRangeInputStream(final URL url) {
this(new URLOpener(url), new URLOpener(null));
}
ByteRangeInputStream(URLOpener o, URLOpener r) {
this.originalURL = o;
this.resolvedURL = r;
}
private InputStream getInputStream() throws IOException {
if (status != STATUS_NORMAL) {
if (in != null) {
in.close();
in = null;
}
// use the original url if no resolved url exists (e.g., if it's
// the first time a request is made)
final URLOpener o = resolvedURL.getURL() == null? originalURL: resolvedURL;
final HttpURLConnection connection = o.openConnection();
try {
connection.setRequestMethod("GET");
if (startPos != 0) {
connection.setRequestProperty("Range", "bytes="+startPos+"-");
}
connection.connect();
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
filelength = cl == null? -1: Long.parseLong(cl);
if (HftpFileSystem.LOG.isDebugEnabled()) {
HftpFileSystem.LOG.debug("filelength = " + filelength);
}
in = connection.getInputStream();
} catch(IOException ioe) {
HftpFileSystem.throwIOExceptionFromConnection(connection, ioe);
}
if (startPos != 0 && connection.getResponseCode() != 206) {
// we asked for a byte range but did not receive a partial content
// response...
throw new IOException("206 expected, but received "
+ connection.getResponseCode());
} else if(startPos == 0 && connection.getResponseCode() != 200) {
// we asked for all bytes from the beginning but didn't receive a 200
// response (none of the other 2xx codes are valid here)
throw new IOException("200 expected, but received "
+ connection.getResponseCode());
}
resolvedURL.setURL(connection.getURL());
status = STATUS_NORMAL;
}
return in;
}
private void update(final boolean isEOF, final int n
) throws IOException {
if (!isEOF) {
currentPos += n;
} else if (currentPos < filelength) {
throw new IOException("Got EOF but currentPos = " + currentPos
+ " < filelength = " + filelength);
}
}
public int read() throws IOException {
final int b = getInputStream().read();
update(b == -1, 1);
return b;
}
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
* seek past the end of the file.
*/
public void seek(long pos) throws IOException {
if (pos != currentPos) {
startPos = pos;
currentPos = pos;
status = STATUS_SEEK;
}
}
/**
* Return the current offset from the start of the file
*/
public long getPos() throws IOException {
return currentPos; // keep total count?
}
/**
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*/
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
}