| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hdfs.web; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.HttpURLConnection; |
| import java.net.URL; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.StringTokenizer; |
| |
| import org.apache.commons.io.input.BoundedInputStream; |
| import org.apache.hadoop.fs.FSExceptionMessages; |
| import org.apache.hadoop.fs.FSInputStream; |
| import org.apache.http.HttpStatus; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.net.HttpHeaders; |
| |
| import javax.annotation.Nonnull; |
| |
| /** |
| * To support HTTP byte streams, a new connection to an HTTP server needs to be |
| * created each time. This class hides the complexity of those multiple |
| * connections from the client. Whenever seek() is called, a new connection |
| * is made on the successive read(). The normal input stream functions are |
| * connected to the currently active input stream. |
| */ |
| public abstract class ByteRangeInputStream extends FSInputStream { |
| |
| /** |
| * This class wraps a URL and provides method to open connection. |
| * It can be overridden to change how a connection is opened. |
| */ |
| public static abstract class URLOpener { |
| protected URL url; |
| |
| public URLOpener(URL u) { |
| url = u; |
| } |
| |
| public void setURL(URL u) { |
| url = u; |
| } |
| |
| public URL getURL() { |
| return url; |
| } |
| |
| /** Connect to server with a data offset. */ |
| protected abstract HttpURLConnection connect(final long offset, |
| final boolean resolved) throws IOException; |
| } |
| |
| static class InputStreamAndFileLength { |
| final Long length; |
| final InputStream in; |
| |
| InputStreamAndFileLength(Long length, InputStream in) { |
| this.length = length; |
| this.in = in; |
| } |
| } |
| |
| enum StreamStatus { |
| NORMAL, SEEK, CLOSED |
| } |
| protected InputStream in; |
| protected final URLOpener originalURL; |
| protected final URLOpener resolvedURL; |
| protected long startPos = 0; |
| protected long currentPos = 0; |
| protected Long fileLength = null; |
| |
| StreamStatus status = StreamStatus.SEEK; |
| |
| /** |
| * Create with the specified URLOpeners. Original url is used to open the |
| * stream for the first time. Resolved url is used in subsequent requests. |
| * @param o Original url |
| * @param r Resolved url |
| */ |
| public ByteRangeInputStream(URLOpener o, URLOpener r) throws IOException { |
| this.originalURL = o; |
| this.resolvedURL = r; |
| getInputStream(); |
| } |
| |
| protected abstract URL getResolvedUrl(final HttpURLConnection connection |
| ) throws IOException; |
| |
| @VisibleForTesting |
| protected InputStream getInputStream() throws IOException { |
| switch (status) { |
| case NORMAL: |
| break; |
| case SEEK: |
| if (in != null) { |
| in.close(); |
| } |
| InputStreamAndFileLength fin = openInputStream(startPos); |
| in = fin.in; |
| fileLength = fin.length; |
| status = StreamStatus.NORMAL; |
| break; |
| case CLOSED: |
| throw new IOException("Stream closed"); |
| } |
| return in; |
| } |
| |
| @VisibleForTesting |
| protected InputStreamAndFileLength openInputStream(long startOffset) |
| throws IOException { |
| if (startOffset < 0) { |
| throw new EOFException("Negative Position"); |
| } |
| // Use the original url if no resolved url exists, eg. if |
| // it's the first time a request is made. |
| final boolean resolved = resolvedURL.getURL() != null; |
| final URLOpener opener = resolved? resolvedURL: originalURL; |
| |
| final HttpURLConnection connection = opener.connect(startOffset, resolved); |
| resolvedURL.setURL(getResolvedUrl(connection)); |
| |
| InputStream in = connection.getInputStream(); |
| final Long length; |
| final Map<String, List<String>> headers = connection.getHeaderFields(); |
| if (isChunkedTransferEncoding(headers)) { |
| // file length is not known |
| length = null; |
| } else { |
| // for non-chunked transfer-encoding, get content-length |
| long streamlength = getStreamLength(connection, headers); |
| length = startOffset + streamlength; |
| |
| // Java has a bug with >2GB request streams. It won't bounds check |
| // the reads so the transfer blocks until the server times out |
| in = new BoundedInputStream(in, streamlength); |
| } |
| |
| return new InputStreamAndFileLength(length, in); |
| } |
| |
| private static long getStreamLength(HttpURLConnection connection, |
| Map<String, List<String>> headers) throws IOException { |
| String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH); |
| if (cl == null) { |
| // Try to get the content length by parsing the content range |
| // because HftpFileSystem does not return the content length |
| // if the content is partial. |
| if (connection.getResponseCode() == HttpStatus.SC_PARTIAL_CONTENT) { |
| cl = connection.getHeaderField(HttpHeaders.CONTENT_RANGE); |
| return getLengthFromRange(cl); |
| } else { |
| throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: " |
| + headers); |
| } |
| } |
| return Long.parseLong(cl); |
| } |
| |
| private static long getLengthFromRange(String cl) throws IOException { |
| try { |
| |
| String[] str = cl.substring(6).split("[-/]"); |
| return Long.parseLong(str[1]) - Long.parseLong(str[0]) + 1; |
| } catch (Exception e) { |
| throw new IOException( |
| "failed to get content length by parsing the content range: " + cl |
| + " " + e.getMessage()); |
| } |
| } |
| |
| private static boolean isChunkedTransferEncoding( |
| final Map<String, List<String>> headers) { |
| return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked") |
| || contains(headers, HttpHeaders.TE, "chunked"); |
| } |
| |
| /** Does the HTTP header map contain the given key, value pair? */ |
| private static boolean contains(final Map<String, List<String>> headers, |
| final String key, final String value) { |
| final List<String> values = headers.get(key); |
| if (values != null) { |
| for(String v : values) { |
| for(final StringTokenizer t = new StringTokenizer(v, ","); |
| t.hasMoreTokens(); ) { |
| if (value.equalsIgnoreCase(t.nextToken())) { |
| return true; |
| } |
| } |
| } |
| } |
| return false; |
| } |
| |
| private int update(final int n) throws IOException { |
| if (n != -1) { |
| currentPos += n; |
| } else if (fileLength != null && currentPos < fileLength) { |
| throw new IOException("Got EOF but currentPos = " + currentPos |
| + " < filelength = " + fileLength); |
| } |
| return n; |
| } |
| |
| @Override |
| public int read() throws IOException { |
| final int b = getInputStream().read(); |
| update((b == -1) ? -1 : 1); |
| return b; |
| } |
| |
| @Override |
| public int read(@Nonnull byte b[], int off, int len) throws IOException { |
| return update(getInputStream().read(b, off, len)); |
| } |
| |
| /** |
| * Seek to the given offset from the start of the file. |
| * The next read() will be from that location. Can't |
| * seek past the end of the file. |
| */ |
| @Override |
| public void seek(long pos) throws IOException { |
| if (pos != currentPos) { |
| startPos = pos; |
| currentPos = pos; |
| if (status != StreamStatus.CLOSED) { |
| status = StreamStatus.SEEK; |
| } |
| } |
| } |
| |
| @Override |
| public int read(long position, byte[] buffer, int offset, int length) |
| throws IOException { |
| validatePositionedReadArgs(position, buffer, offset, length); |
| if (length == 0) { |
| return 0; |
| } |
| try (InputStream in = openInputStream(position).in) { |
| return in.read(buffer, offset, length); |
| } |
| } |
| |
| @Override |
| public void readFully(long position, byte[] buffer, int offset, int length) |
| throws IOException { |
| validatePositionedReadArgs(position, buffer, offset, length); |
| if (length == 0) { |
| return; |
| } |
| final InputStreamAndFileLength fin = openInputStream(position); |
| try { |
| if (fin.length != null && length + position > fin.length) { |
| throw new EOFException("The length to read " + length |
| + " exceeds the file length " + fin.length); |
| } |
| int nread = 0; |
| while (nread < length) { |
| int nbytes = fin.in.read(buffer, offset + nread, length - nread); |
| if (nbytes < 0) { |
| throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); |
| } |
| nread += nbytes; |
| } |
| } finally { |
| fin.in.close(); |
| } |
| } |
| |
| /** |
| * Return the current offset from the start of the file |
| */ |
| @Override |
| public long getPos() throws IOException { |
| return currentPos; |
| } |
| |
| /** |
| * Seeks a different copy of the data. Returns true if |
| * found a new source, false otherwise. |
| */ |
| @Override |
| public boolean seekToNewSource(long targetPos) throws IOException { |
| return false; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| if (in != null) { |
| in.close(); |
| in = null; |
| } |
| status = StreamStatus.CLOSED; |
| } |
| |
| @Override |
| public synchronized int available() throws IOException{ |
| getInputStream(); |
| if(fileLength != null){ |
| long remaining = fileLength - currentPos; |
| return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE; |
| }else { |
| return Integer.MAX_VALUE; |
| } |
| } |
| } |