blob: aa88bb78e6ba6e5674f99280b750a24a6b9858e4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.http.HttpStatus;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HttpHeaders;
import javax.annotation.Nonnull;
/**
* To support HTTP byte streams, a new connection to an HTTP server needs to be
* created each time. This class hides the complexity of those multiple
* connections from the client. Whenever seek() is called, a new connection
* is made on the successive read(). The normal input stream functions are
* connected to the currently active input stream.
*/
public abstract class ByteRangeInputStream extends FSInputStream {
/**
* This class wraps a URL and provides method to open connection.
* It can be overridden to change how a connection is opened.
*/
public static abstract class URLOpener {
protected URL url;
public URLOpener(URL u) {
url = u;
}
public void setURL(URL u) {
url = u;
}
public URL getURL() {
return url;
}
/** Connect to server with a data offset. */
protected abstract HttpURLConnection connect(final long offset,
final boolean resolved) throws IOException;
}
static class InputStreamAndFileLength {
final Long length;
final InputStream in;
InputStreamAndFileLength(Long length, InputStream in) {
this.length = length;
this.in = in;
}
}
enum StreamStatus {
NORMAL, SEEK, CLOSED
}
protected InputStream in;
protected final URLOpener originalURL;
protected final URLOpener resolvedURL;
protected long startPos = 0;
protected long currentPos = 0;
protected Long fileLength = null;
StreamStatus status = StreamStatus.SEEK;
/**
* Create with the specified URLOpeners. Original url is used to open the
* stream for the first time. Resolved url is used in subsequent requests.
* @param o Original url
* @param r Resolved url
*/
public ByteRangeInputStream(URLOpener o, URLOpener r) throws IOException {
this.originalURL = o;
this.resolvedURL = r;
getInputStream();
}
protected abstract URL getResolvedUrl(final HttpURLConnection connection
) throws IOException;
@VisibleForTesting
protected InputStream getInputStream() throws IOException {
switch (status) {
case NORMAL:
break;
case SEEK:
if (in != null) {
in.close();
}
InputStreamAndFileLength fin = openInputStream(startPos);
in = fin.in;
fileLength = fin.length;
status = StreamStatus.NORMAL;
break;
case CLOSED:
throw new IOException("Stream closed");
}
return in;
}
@VisibleForTesting
protected InputStreamAndFileLength openInputStream(long startOffset)
throws IOException {
if (startOffset < 0) {
throw new EOFException("Negative Position");
}
// Use the original url if no resolved url exists, eg. if
// it's the first time a request is made.
final boolean resolved = resolvedURL.getURL() != null;
final URLOpener opener = resolved? resolvedURL: originalURL;
final HttpURLConnection connection = opener.connect(startOffset, resolved);
resolvedURL.setURL(getResolvedUrl(connection));
InputStream in = connection.getInputStream();
final Long length;
final Map<String, List<String>> headers = connection.getHeaderFields();
if (isChunkedTransferEncoding(headers)) {
// file length is not known
length = null;
} else {
// for non-chunked transfer-encoding, get content-length
long streamlength = getStreamLength(connection, headers);
length = startOffset + streamlength;
// Java has a bug with >2GB request streams. It won't bounds check
// the reads so the transfer blocks until the server times out
in = new BoundedInputStream(in, streamlength);
}
return new InputStreamAndFileLength(length, in);
}
private static long getStreamLength(HttpURLConnection connection,
Map<String, List<String>> headers) throws IOException {
String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
if (cl == null) {
// Try to get the content length by parsing the content range
// because HftpFileSystem does not return the content length
// if the content is partial.
if (connection.getResponseCode() == HttpStatus.SC_PARTIAL_CONTENT) {
cl = connection.getHeaderField(HttpHeaders.CONTENT_RANGE);
return getLengthFromRange(cl);
} else {
throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
+ headers);
}
}
return Long.parseLong(cl);
}
private static long getLengthFromRange(String cl) throws IOException {
try {
String[] str = cl.substring(6).split("[-/]");
return Long.parseLong(str[1]) - Long.parseLong(str[0]) + 1;
} catch (Exception e) {
throw new IOException(
"failed to get content length by parsing the content range: " + cl
+ " " + e.getMessage());
}
}
private static boolean isChunkedTransferEncoding(
final Map<String, List<String>> headers) {
return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
|| contains(headers, HttpHeaders.TE, "chunked");
}
/** Does the HTTP header map contain the given key, value pair? */
private static boolean contains(final Map<String, List<String>> headers,
final String key, final String value) {
final List<String> values = headers.get(key);
if (values != null) {
for(String v : values) {
for(final StringTokenizer t = new StringTokenizer(v, ",");
t.hasMoreTokens(); ) {
if (value.equalsIgnoreCase(t.nextToken())) {
return true;
}
}
}
}
return false;
}
private int update(final int n) throws IOException {
if (n != -1) {
currentPos += n;
} else if (fileLength != null && currentPos < fileLength) {
throw new IOException("Got EOF but currentPos = " + currentPos
+ " < filelength = " + fileLength);
}
return n;
}
@Override
public int read() throws IOException {
final int b = getInputStream().read();
update((b == -1) ? -1 : 1);
return b;
}
@Override
public int read(@Nonnull byte b[], int off, int len) throws IOException {
return update(getInputStream().read(b, off, len));
}
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
* seek past the end of the file.
*/
@Override
public void seek(long pos) throws IOException {
if (pos != currentPos) {
startPos = pos;
currentPos = pos;
if (status != StreamStatus.CLOSED) {
status = StreamStatus.SEEK;
}
}
}
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return 0;
}
try (InputStream in = openInputStream(position).in) {
return in.read(buffer, offset, length);
}
}
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return;
}
final InputStreamAndFileLength fin = openInputStream(position);
try {
if (fin.length != null && length + position > fin.length) {
throw new EOFException("The length to read " + length
+ " exceeds the file length " + fin.length);
}
int nread = 0;
while (nread < length) {
int nbytes = fin.in.read(buffer, offset + nread, length - nread);
if (nbytes < 0) {
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
nread += nbytes;
}
} finally {
fin.in.close();
}
}
/**
* Return the current offset from the start of the file
*/
@Override
public long getPos() throws IOException {
return currentPos;
}
/**
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*/
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public void close() throws IOException {
if (in != null) {
in.close();
in = null;
}
status = StreamStatus.CLOSED;
}
@Override
public synchronized int available() throws IOException{
getInputStream();
if(fileLength != null){
long remaining = fileLength - currentPos;
return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
}else {
return Integer.MAX_VALUE;
}
}
}