org.apache.hadoop.fs.FSDataInputStream
FSDataInputStream extends DataInputStream
The core behavior of FSDataInputStream
is defined by java.io.DataInputStream
, with extensions that add key assumptions to the system.
Callers may use Seekable.seek()
to move to offsets within the array of bytes, with future reads starting at this offset.
Files are opened via FileSystem.open(p)
, which, if successful, returns:
result = FSDataInputStream(0, FS.Files[p])
The stream can be modeled as:
FSDIS = (pos, data[], isOpen)
with access functions:
pos(FSDIS) data(FSDIS) isOpen(FSDIS)
Implicit invariant: the size of the data stream equals the size of the file as returned by FileSystem.getFileStatus(Path p)
forall p in dom(FS.Files) :
    len(data(FSDIS)) == FS.getFileStatus(p).length
Closeable.close()
The semantics of java.io.Closeable
are defined in the interface definition within the JRE.
The operation MUST be idempotent; the following sequence is not an error:
FSDIS.close(); FSDIS.close();
Implementations SHOULD be robust against failure. If an inner stream is closed, it should be checked for being null
first.
Implementations SHOULD NOT raise IOException
exceptions (or any other exception) during this operation. Client applications often ignore these, or may fail unexpectedly.
FSDIS' = ((undefined), (undefined), False)
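One way to meet the idempotency and robustness recommendations above is to guard the close with an atomic flag and to null-check the inner stream. The following is a minimal sketch using only the JDK; the class name GuardedStream and its fields are hypothetical and are not part of Hadoop.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical wrapper; illustrates an idempotent, failure-tolerant close(). */
final class GuardedStream extends InputStream {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private InputStream inner; // may be null if the open failed, or after close()

  GuardedStream(InputStream inner) {
    this.inner = inner;
  }

  @Override
  public int read() throws IOException {
    if (closed.get() || inner == null) {
      throw new IOException("Stream is closed");
    }
    return inner.read();
  }

  @Override
  public void close() {
    // Only the first caller performs the close; later calls are no-ops.
    if (!closed.compareAndSet(false, true)) {
      return;
    }
    InputStream toClose = inner;
    inner = null;
    if (toClose != null) {
      try {
        toClose.close();
      } catch (IOException ignored) {
        // Swallowed deliberately: close() SHOULD NOT raise exceptions.
      }
    }
  }

  boolean isClosed() {
    return closed.get();
  }
}
```

Calling close() twice, or on an instance whose inner stream is null, completes silently. A production implementation would probably log the swallowed exception rather than discard it.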
Seekable.getPos()
Return the current position. The outcome when a stream is closed is undefined.
isOpen(FSDIS)
result = pos(FSDIS)
InputStream.read()
Return the data at the current position.
There is no limit on how long read()
may take to complete.
isOpen(FSDIS)
if ( pos < len(data) ):
    FSDIS' = (pos + 1, data, True)
    result = data[pos]
else
    result = -1
InputStream.read(buffer[], offset, length)
Read length
bytes of data into the destination buffer, starting at offset offset
. The source of the data is the current position of the stream, as implicitly set in pos
.
isOpen(FSDIS)
buffer != null else raise NullPointerException
length >= 0
offset < len(buffer)
length <= len(buffer) - offset
pos >= 0 else raise EOFException, IOException
Exceptions that may be raised on precondition failure are:
IllegalArgumentException
ArrayIndexOutOfBoundsException
RuntimeException
Not all filesystems check the isOpen
state.
if length == 0 :
    result = 0
else if pos >= len(data):
    result = -1
else
    let l = min(length, len(data)-pos) :
        buffer' = buffer where forall i in [0..l-1]:
            buffer'[offset+i] = data[pos+i]
        FSDIS' = (pos+l, data, true)
        result = l
The java.io
API states that if the amount of data to be read (i.e. length
) is greater than zero, then the call must block until the amount of data available is greater than zero; that is, until there is some data. The call is not required to wait until the buffer is full, or to block until there is no data left in the stream.
That is, rather than l
being simply defined as min(length, len(data)-pos)
, it strictly is an integer in the range 1..min(length, len(data)-pos)
. While the caller may expect as much of the buffer as possible to be filled in, it is within the specification for an implementation to always return a smaller number, perhaps only ever 1 byte.
What is critical is that unless the destination buffer size is 0, the call must block until at least one byte is returned. Thus, for any data source of length greater than zero, repeated invocations of this read()
operation will eventually read all the data.
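Because a single call may return fewer bytes than requested, callers that need a given number of bytes have to loop. The sketch below shows such a loop over a plain java.io.InputStream; the class ReadLoop and the method readUpTo are illustrative names, not Hadoop APIs.

```java
import java.io.IOException;
import java.io.InputStream;

/** Illustrative helper (not part of Hadoop): loop until len bytes or EOF. */
final class ReadLoop {
  private ReadLoop() {}

  /**
   * Reads up to len bytes into buffer[offset..], tolerating short reads.
   *
   * @return the number of bytes read; less than len only if EOF was reached
   */
  static int readUpTo(InputStream in, byte[] buffer, int offset, int len)
      throws IOException {
    int total = 0;
    while (total < len) {
      // Each call blocks until at least one byte is available, or EOF.
      int n = in.read(buffer, offset + total, len - total);
      if (n < 0) {
        break; // end of stream
      }
      total += n;
    }
    return total;
  }
}
```

The loop terminates even against an implementation that only ever returns one byte per call, since every non-EOF call makes progress.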
Seekable.seek(s)
Not all subclasses implement the Seek operation:
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
If the operation is supported, the file SHOULD be open:
isOpen(FSDIS)
Some filesystems do not perform this check, relying on the read()
contract to reject reads on a closed stream (e.g. RawLocalFileSystem
).
A seek(0)
MUST always succeed. Otherwise, the seek position must be non-negative and less than the length of the stream:
s >= 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException]
Some FileSystems do not raise an exception if this condition is not met. They instead return -1 on any read()
operation where, at the time of the read, len(data(FSDIS)) < pos(FSDIS)
.
After a failed seek, the value of pos(FSDIS)
may change. As an example, seeking past the EOF may move the read position to the end of the file, as well as raising an EOFException
.
FSDIS' = (s, data, True)
There is an implicit invariant: a seek to the current position is a no-op
seek(getPos())
Implementations may recognise this operation and bypass all other precondition checks, leaving the input stream unchanged.
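The state model (pos, data[], isOpen) and the seek rules can be exercised with a small in-memory class. This is a toy sketch, not a Hadoop class: FsdisModel is a hypothetical name, offset 0 is treated as always valid, and a failed seek leaves the position unchanged, which is only one of the behaviors the specification permits.

```java
import java.io.EOFException;
import java.io.IOException;

/** Toy in-memory model of (pos, data[], isOpen); hypothetical, for illustration. */
final class FsdisModel {
  private final byte[] data;
  private long pos = 0;
  private boolean open = true;

  FsdisModel(byte[] data) {
    this.data = data.clone();
  }

  long getPos() {
    return pos;
  }

  /** Moves the read position, rejecting negative or out-of-range offsets. */
  void seek(long s) throws IOException {
    if (!open) {
      throw new IOException("Stream is closed");
    }
    if (s == pos) {
      return; // seek(getPos()) is a no-op
    }
    if (s < 0) {
      throw new EOFException("Cannot seek to a negative offset: " + s);
    }
    if (s != 0 && s >= data.length) {
      throw new EOFException("Seek past end of data: " + s);
    }
    pos = s;
  }

  /** Returns the next byte as a value in 0..255, or -1 at end of data. */
  int read() throws IOException {
    if (!open) {
      throw new IOException("Stream is closed");
    }
    if (pos < data.length) {
      return data[(int) pos++] & 0xff;
    }
    return -1;
  }

  void close() {
    open = false;
  }
}
```

With three bytes of data, seek(2) followed by read() returns the last byte, a further read() returns -1, and seek(3) from any other position raises EOFException.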
The most recent connectors to object stores all implement some form of “lazy-seek”: the seek()
call may appear to update the stream, and the value of getPos()
is updated, but the file is not opened/reopened until data is actually read. Implementations of lazy seek MUST still validate the new seek position against the known length of the file. However, the state of the file (i.e. whether it exists and what its current length is) does not need to be refreshed at this point. The fact that a file has been deleted or truncated may not surface until that read()
call.
Seekable.seekToNewSource(offset)
This operation instructs the source to retrieve data[]
from a different source from the current source. This is only relevant if the filesystem supports multiple replicas of a file and there is more than 1 replica of the data at offset offset
.
Not all subclasses implement this operation, and instead either raise an exception or return False
.
supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]
Examples: CompressionInputStream
, HttpFSFileSystem
If supported, the file must be open:
isOpen(FSDIS)
The majority of subclasses that do not implement this operation simply fail.
if not supported(FSDIS, Seekable.seekToNewSource(s)): result = False
Examples: RawLocalFileSystem
, HttpFSFileSystem
If the operation is supported and there is a new location for the data:
FSDIS' = (pos, data', true) result = True
The new data is the original data (or an updated version of it, as covered in the Consistency section below), but the block containing the data at offset
is sourced from a different replica.
If there is no other copy, FSDIS
is not updated; the response indicates this:
result = False
Outside of test methods, the primary use of this method is in the FSInputChecker class, which can react to a checksum error in a read by attempting to source the data elsewhere. If a new source can be found, it attempts to reread and recheck that portion of the file.
CanUnbuffer.unbuffer()
This operation instructs the source to release any system resources it is currently holding on to, such as buffers, sockets, file descriptors, etc. Any subsequent IO operation will likely have to reacquire these resources. Unbuffering is useful in situations where streams need to remain open, but no IO operation is expected from the stream in the immediate future (examples include file handle caching).
Not all subclasses implement this operation. In addition to implementing CanUnbuffer
, subclasses must implement the StreamCapabilities
interface and StreamCapabilities.hasCapability(UNBUFFER)
must return true. If a subclass implements CanUnbuffer
but does not report the functionality via StreamCapabilities
then the call to unbuffer
does nothing. If a subclass reports that it does implement UNBUFFER
, but does not implement the CanUnbuffer
interface, an UnsupportedOperationException
is thrown.
supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)
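The three cases described above (capability not reported, capability reported but interface missing, both present) can be written as a small dispatch routine. This sketch uses hypothetical stand-in interfaces so that it compiles without Hadoop on the classpath; the capability string "in:unbuffer" is an assumption about the value of the UNBUFFER constant.

```java
/** Illustrative dispatch logic for unbuffer(); all names here are hypothetical. */
final class UnbufferDispatch {
  private UnbufferDispatch() {}

  /** Stand-in for the CanUnbuffer interface. */
  interface CanUnbufferLike {
    void unbuffer();
  }

  /** Stand-in for the StreamCapabilities interface. */
  interface CapabilitiesLike {
    boolean hasCapability(String capability);
  }

  /** Assumed value of the UNBUFFER capability name. */
  static final String UNBUFFER = "in:unbuffer";

  /**
   * Applies the rules from the text to an arbitrary stream object.
   *
   * @return true if unbuffer() was invoked on the stream
   */
  static boolean unbuffer(Object stream) {
    boolean declared = stream instanceof CapabilitiesLike
        && ((CapabilitiesLike) stream).hasCapability(UNBUFFER);
    if (!declared) {
      return false; // capability not reported: the call does nothing
    }
    if (!(stream instanceof CanUnbufferLike)) {
      throw new UnsupportedOperationException(
          "Stream reports " + UNBUFFER + " but does not implement unbuffer()");
    }
    ((CanUnbufferLike) stream).unbuffer();
    return true;
  }
}
```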
This method is not thread-safe. If unbuffer
is called while a read
is in progress, the outcome is undefined.
unbuffer
can be called on a closed file, in which case unbuffer
will do nothing.
The majority of subclasses that do not implement this operation simply do nothing.
If the operation is supported, unbuffer
releases any and all system resources associated with the stream. The exact set of resources is implementation dependent; it may include buffers, sockets, file descriptors, etc.
PositionedReadable
The PositionedReadable
operations supply “positioned reads” (“pread”). They provide the ability to read data into a buffer from a specific position in the data stream. Positioned reads equate to a Seekable.seek
at a particular offset followed by an InputStream.read(buffer[], offset, length)
, except that there is a single method invocation rather than seek
then read
, and two positioned reads can optionally run concurrently over a single instance of an FSDataInputStream
stream.
The interface declares positioned reads thread-safe (some of the implementations do not follow this guarantee).
Any positional read run concurrently with a stream operation (e.g. Seekable.seek
, Seekable.getPos()
, and InputStream.read()
) MUST run in isolation; there must not be mutual interference.
Concurrent positional reads and stream operations MUST be serializable; one may block the other so they run in series but, for better throughput and ‘liveness’, they SHOULD run concurrently.
Given two parallel positional reads, one at pos1
for len1
into buffer dest1
, and another at pos2
for len2
into buffer dest2
, AND given a concurrent stream read run after a seek to pos3
, the resultant buffers MUST be filled as follows, even if the reads happen to overlap on the underlying stream:
// Positioned read #1
read(pos1, dest1, ... len1) -> dest1[0..len1 - 1] =
  [data(FS, path, pos1), data(FS, path, pos1 + 1) ... data(FS, path, pos1 + len1 - 1)]

// Positioned read #2
read(pos2, dest2, ... len2) -> dest2[0..len2 - 1] =
  [data(FS, path, pos2), data(FS, path, pos2 + 1) ... data(FS, path, pos2 + len2 - 1)]

// Stream read
seek(pos3);
read(dest3, ... len3) -> dest3[0..len3 - 1] =
  [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1)]
Note that implementations are not required to be atomic; the intermediate state of the operation (the change in the value of getPos()
) may be visible.
Not all FSDataInputStream
implementations support these operations. Those that do not implement Seekable.seek()
do not implement the PositionedReadable
interface.
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
This could be considered obvious: if a stream is not Seekable
, a client cannot seek to a location. It is also a side effect of the base class implementation, which uses Seekable.seek()
.
Implicit invariant: for all PositionedReadable
operations, the value of pos
is unchanged at the end of the operation
pos(FSDIS') == pos(FSDIS)
For any operations that fail, the contents of the destination buffer
are undefined. Implementations may overwrite part or all of the buffer before reporting a failure.
int PositionedReadable.read(position, buffer, offset, length)
Read as much data as possible into the buffer space allocated for it.
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
len(buffer) - offset >= length else raise [IndexOutOfBoundsException, RuntimeException]
length >= 0
offset >= 0
The amount of data read is the lesser of the length or the amount of data available from the specified position:
let available = min(length, len(data)-position)
buffer'[offset..(offset+available-1)] = data[position..position+available -1]
result = available
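The preconditions and the result above can be checked against a pure function over byte arrays. The sketch below is a model only: PreadModel is a hypothetical name, and returning -1 when the position is at or beyond the end of the data follows the InputStream convention, which is an assumption here rather than something stated in the postcondition.

```java
import java.io.EOFException;

/** Pure-function model of a positioned read; names are illustrative. */
final class PreadModel {
  private PreadModel() {}

  /** Copies up to length bytes of data, starting at position, into buffer. */
  static int read(byte[] data, long position, byte[] buffer, int offset, int length)
      throws EOFException {
    if (position < 0) {
      throw new EOFException("Negative position: " + position);
    }
    if (offset < 0 || length < 0 || buffer.length - offset < length) {
      throw new IndexOutOfBoundsException(
          "offset=" + offset + " length=" + length + " buffer=" + buffer.length);
    }
    if (length == 0) {
      return 0; // nothing requested: no IO and no end-of-file check
    }
    if (position >= data.length) {
      return -1; // assumption: signal end of data as InputStream does
    }
    int available = (int) Math.min(length, data.length - position);
    System.arraycopy(data, (int) position, buffer, offset, available);
    return available;
  }
}
```

No stream position is involved at all, which is the point of the model: a positioned read depends only on its arguments and the data.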
length==0
implicitly does not read any data; implementations may short-cut the operation and omit any IO. In such instances, checks for the stream being at the end of the file may be omitted.
If an IO exception occurs during the read operation(s), the final state of buffer
is undefined.
void PositionedReadable.readFully(position, buffer, offset, length)
Read exactly length
bytes of data into the buffer, failing if there is not enough data available.
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
length >= 0
offset >= 0
len(buffer) - offset >= length else raise [IndexOutOfBoundsException, RuntimeException]
(position + length) <= len(data) else raise [EOFException, IOException]
If an IO exception occurs during the read operation(s), the final state of buffer
is undefined.
If there is not enough data in the input stream to satisfy the requests, the final state of buffer
is undefined.
The buffer from offset offset
is filled with the data starting at position
buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]
PositionedReadable.readFully(position, buffer)
The semantics of this are exactly equivalent to
readFully(position, buffer, 0, len(buffer))
That is, the buffer is filled entirely with the contents of the input source from position position
.
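A readFully operation can be layered over a positioned read by looping until the requested length has been delivered and failing on a premature end of data. The sketch below illustrates that layering under stated assumptions: PositionedRead is a hypothetical stand-in interface, and this is not presented as the code of any particular Hadoop class.

```java
import java.io.EOFException;
import java.io.IOException;

/** Illustrative readFully() built on a positioned read; names are hypothetical. */
final class ReadFullyLoop {
  private ReadFullyLoop() {}

  /** Stand-in for a positioned read operation. */
  interface PositionedRead {
    int read(long position, byte[] buffer, int offset, int length) throws IOException;
  }

  /** Fills buffer[offset..offset+length-1] or raises EOFException. */
  static void readFully(PositionedRead in, long position, byte[] buffer,
      int offset, int length) throws IOException {
    int done = 0;
    while (done < length) {
      int n = in.read(position + done, buffer, offset + done, length - done);
      if (n < 0) {
        // The buffer may already be partially overwritten at this point.
        throw new EOFException("End of data after " + done + " of " + length + " bytes");
      }
      done += n;
    }
  }

  /** Equivalent to readFully(in, position, buffer, 0, buffer.length). */
  static void readFully(PositionedRead in, long position, byte[] buffer)
      throws IOException {
    readFully(in, position, buffer, 0, buffer.length);
  }
}
```

The partially filled buffer on failure matches the rule that the final state of the buffer is undefined when there is not enough data.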
void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)
Fully read the data for a list of ranges asynchronously. The default implementation iterates through the ranges, tries to coalesce them based on the values of minSeekForVectorReads
and maxReadSizeForVectorReads
, and then reads each merged range synchronously; the intent is that subclasses can provide more efficient implementations. Reading into both direct and heap byte buffers is supported. Clients are encouraged to use WeakReferencedElasticByteBufferPool
for allocating buffers such that even direct buffers are garbage collected when they are no longer referenced.
The position returned by getPos()
after readVectored()
is undefined.
If a file is changed while the readVectored()
operation is in progress, the output is undefined. Some ranges may have old data, some may have new, and some may have both.
While a readVectored()
operation is in progress, normal read API calls MAY block; the value of getPos()
is also undefined. Applications SHOULD NOT make such requests while waiting for the results of a vectored read.
Note: Don't use direct buffers for reading from ChecksumFileSystem
, as that may lead to the memory fragmentation explained in HADOOP-18296 (“Memory fragmentation in ChecksumFileSystem Vectored IO implementation”).
No empty lists.
if ranges = null raise NullPointerException
if allocate = null raise NullPointerException
For each requested range range[i]
in the list of ranges range[0..n]
sorted on getOffset()
ascending such that
for all i where i > 0
:
range[i].getOffset() > range[i-1].getOffset()
For all ranges 0..i
the preconditions are:
ranges[i] != null else raise IllegalArgumentException
ranges[i].getOffset() >= 0 else raise EOFException
ranges[i].getLength() >= 0 else raise IllegalArgumentException
if i > 0 and ranges[i].getOffset() < (ranges[i-1].getOffset() + ranges[i-1].getLength()) :
    raise IllegalArgumentException
If the length of the file is known during the validation phase:
if range[i].getOffset() + range[i].getLength() > data.length()
    raise EOFException
For each requested range range[i]
in the list of ranges range[0..n]
ranges[i]'.getData() = CompletableFuture<buffer: ByteBuffer>
and when getData().get()
completes:
let buffer = getData().get()
let len = ranges[i].getLength()
let data = new byte[len]
(buffer.limit() - buffer.position()) = len
buffer.get(data, 0, len) = readFully(ranges[i].getOffset(), data, 0, len)
That is: the result of every ranged read is the result of the (possibly asynchronous) call to PositionedReadable.readFully()
for the same offset and length.
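The relationship between a ranged read and its future can be modeled in memory: each range yields a CompletableFuture whose buffer, once complete, holds exactly the requested bytes between its position and its limit. This is a sketch under stated assumptions, not the Hadoop implementation: VectoredModel is a hypothetical name, ranges are plain {offset, length} pairs, and the allocator is assumed to return a buffer at position 0 with at least the requested capacity.

```java
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.IntFunction;

/** In-memory model of a vectored read; all names are illustrative. */
final class VectoredModel {
  private VectoredModel() {}

  /** Returns one future per {offset, length} range, in the order given. */
  static List<CompletableFuture<ByteBuffer>> readVectored(
      byte[] data, List<long[]> ranges, IntFunction<ByteBuffer> allocate) {
    List<CompletableFuture<ByteBuffer>> results = new ArrayList<>();
    for (long[] range : ranges) {
      final long offset = range[0];
      final int len = (int) range[1];
      results.add(CompletableFuture.supplyAsync(() -> {
        if (offset < 0 || offset + len > data.length) {
          // Checked exceptions must be wrapped inside a Supplier.
          throw new CompletionException(
              new EOFException("Range outside data: offset=" + offset));
        }
        ByteBuffer buffer = allocate.apply(len);
        // put() works for both heap and direct buffers.
        buffer.put(data, (int) offset, len);
        buffer.flip(); // position = 0, limit = len
        return buffer;
      }));
    }
    return results;
  }
}
```

After join() or get() on each future, buffer.remaining() equals the range length, whichever allocator was supplied.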
minSeekForVectorReads()
The smallest reasonable seek. Two ranges won't be merged together if the difference between end of first and start of next range is more than this value.
maxReadSizeForVectorReads()
Maximum number of bytes which can be read in one go after merging the ranges. Two ranges won't be merged if the combined data to be read is more than this value. Essentially, setting this to 0 will disable the merging of ranges.
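The effect of the two thresholds can be seen in a simplified coalescing pass over sorted, non-overlapping ranges. This is an illustrative sketch only: it is not the Hadoop merge code, RangeMerge is a hypothetical name, and ranges are represented as plain {offset, length} pairs.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified range coalescing; names and representation are illustrative. */
final class RangeMerge {
  private RangeMerge() {}

  /**
   * Merges sorted, non-overlapping {offset, length} pairs.
   *
   * @param minSeek largest gap between two ranges that is still merged
   * @param maxSize largest permitted size of a merged range
   */
  static List<long[]> merge(List<long[]> sorted, long minSeek, long maxSize) {
    List<long[]> spans = new ArrayList<>(); // each entry is {start, end}
    long[] current = null;
    for (long[] r : sorted) {
      long start = r[0];
      long end = r[0] + r[1];
      if (current != null
          && start - current[1] <= minSeek
          && end - current[0] <= maxSize) {
        current[1] = end; // close enough and small enough: extend the span
      } else {
        current = new long[] {start, end};
        spans.add(current);
      }
    }
    List<long[]> result = new ArrayList<>();
    for (long[] span : spans) {
      result.add(new long[] {span[0], span[1] - span[0]}); // back to {offset, length}
    }
    return result;
  }
}
```

For the ranges (0, 10), (15, 10) and (100, 10), a minSeek of 8 with a large maxSize merges the first two into (0, 25); a maxSize of 0 leaves all three separate.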
When readVectored()
is invoked while a separate thread is trying to read data through read()
/readFully()
, all operations MUST complete successfully.
Invoking read()
/readFully()
while a vector API call is in progress MUST be supported. The order in which calls return data is undefined.
The S3A connector closes any open stream when its synchronized readVectored()
method is invoked; it will then switch the read policy from normal to random so that any future invocations will be for limited ranges. This is because the expectation is that vector IO and large sequential reads are not mixed and that holding on to any open HTTP connection is wasteful.
Implementations MAY short-circuit reads for any range where range.getLength() = 0
and return an empty buffer.
In such circumstances, other validation checks MAY be omitted.
There are no guarantees that such optimizations take place; callers SHOULD NOT include empty ranges for this reason.
Consistency
All readers of a data stream FSDIS
provided from a FileSystem.open(p)
are expected to receive access to the data of FS.Files[p]
at the time of opening.
At time t0
FSDIS0 = FS'read(p) = (0, data0[])
At time t1
FS' = FS where FS'.Files[p] = data1
From time t >= t1
, the value of FSDIS0
is undefined.
It may be unchanged
FSDIS0.data == data0
forall l in len(FSDIS0.data):
    FSDIS0.read() == data0[l]
It may pick up the new data
FSDIS0.data == data1
forall l in len(FSDIS0.data):
    FSDIS0.read() == data1[l]
It may be inconsistent, such that a read of an offset returns data from either of the datasets
forall l in len(FSDIS0.data):
    (FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l])
That is, every value read may be from the original or updated file.
It may also be inconsistent on repeated reads of same offset, that is at time t2 > t1
:
r2 = FSDIS0.read(l)
While at time t3 > t2
:
r3 = FSDIS0.read(l)
It may be that r3 != r2
. (That is, some of the data may be cached or replicated, and on a subsequent read, a different version of the file's contents is returned.)
Similarly, if the data at the path p
, is deleted, this change MAY or MAY NOT be visible during read operations performed on FSDIS0
.
The readVectored()
API was shipped in Hadoop 3.3.5, with explicit local, raw local and S3A support, and a fallback implementation everywhere else.
Overlapping ranges
The restriction “no overlapping ranges” was only initially enforced in the S3A connector, which would raise UnsupportedOperationException
. Adding the range check as a precondition for all implementations guarantees consistent behavior everywhere. For reliable use with older Hadoop releases that include the API, sort the list of ranges and check for overlaps before calling readVectored()
.
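The client-side check suggested above, sorting the ranges and rejecting overlaps before the call, might look like the following. It is a sketch with hypothetical names: Range is a stand-in for a file range, and every failure is reported as IllegalArgumentException for simplicity, whereas the preconditions in this document call for EOFException on a negative offset.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative pre-flight validation for a list of ranges. */
final class RangeCheck {
  private RangeCheck() {}

  /** Hypothetical stand-in for a file range: an offset and a length. */
  static final class Range {
    final long offset;
    final int length;

    Range(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }
  }

  /** Returns a sorted copy of the ranges, rejecting invalid or overlapping ones. */
  static List<Range> sortAndValidate(List<Range> ranges) {
    List<Range> sorted = new ArrayList<>(ranges); // NullPointerException if null
    for (Range r : sorted) {
      if (r == null) {
        throw new IllegalArgumentException("Null range");
      }
    }
    sorted.sort(Comparator.comparingLong((Range r) -> r.offset));
    Range previous = null;
    for (Range r : sorted) {
      if (r.offset < 0 || r.length < 0) {
        throw new IllegalArgumentException("Invalid range at offset " + r.offset);
      }
      // Adjacent ranges are fine; only a start before the previous end overlaps.
      if (previous != null && r.offset < previous.offset + previous.length) {
        throw new IllegalArgumentException(
            "Range at " + r.offset + " overlaps range at " + previous.offset);
      }
      previous = r;
    }
    return sorted;
  }
}
```

The input list is left untouched; the sorted copy is what would be passed on to the vectored read.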
Direct Buffer Reads
Releases without the fix for HADOOP-19101 (“Vectored Read into off-heap buffer broken in fallback implementation”) can read data from the wrong offset with the default “fallback” implementation if the buffer allocator function returns off-heap “direct” buffers.
The custom implementations in local filesystem and S3A's non-prefetching stream are safe.
Anyone implementing support for the API, unless confident they only run against releases with the fixed implementation, SHOULD NOT use the API if the allocator is direct and the input stream does not declare support through an explicit hasCapability()
probe:
Stream.hasCapability("in:readvectored")
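The rule above reduces to a single boolean test. The sketch below expresses it with a hypothetical CapabilityProbe interface so that it compiles without Hadoop; the class and method names are illustrative, and only the capability string is taken from this document.

```java
/** Illustrative guard for vectored reads with direct buffers; names are hypothetical. */
final class VectoredReadGuard {
  private VectoredReadGuard() {}

  /** Stand-in for a stream that can be probed for capabilities. */
  interface CapabilityProbe {
    boolean hasCapability(String capability);
  }

  /** Capability name used in the probe above. */
  static final String VECTORED_IO = "in:readvectored";

  /**
   * @param stream the input stream object to probe
   * @param allocatorIsDirect true if the allocator returns direct buffers
   * @return false only when direct buffers would reach a stream that does
   *         not declare its own vectored read support
   */
  static boolean safeToUseVectoredRead(Object stream, boolean allocatorIsDirect) {
    if (!allocatorIsDirect) {
      return true; // heap buffers are not affected by the fallback bug
    }
    return stream instanceof CapabilityProbe
        && ((CapabilityProbe) stream).hasCapability(VECTORED_IO);
  }
}
```

A caller that gets false back can either switch to a heap allocator or fall back to ordinary positioned reads.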
Given the HADOOP-18296 problem with ChecksumFileSystem
and direct buffers, across all releases, it is best to avoid using this API in production with direct buffers.