blob: f40315a6da013f8d864cd28fc824693e11fac43d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.slf4j.Logger;
/**
* Contains the input streams for the data and checksum of a replica.
*/
public class ReplicaInputStreams implements Closeable {
public static final Logger LOG = DataNode.LOG;
private InputStream dataIn;
private InputStream checksumIn;
private FsVolumeReference volumeRef;
private final FileIoProvider fileIoProvider;
private FileDescriptor dataInFd = null;
/** Create an object with a data input stream and a checksum input stream. */
public ReplicaInputStreams(
InputStream dataStream, InputStream checksumStream,
FsVolumeReference volumeRef, FileIoProvider fileIoProvider) {
this.volumeRef = volumeRef;
this.fileIoProvider = fileIoProvider;
this.dataIn = dataStream;
this.checksumIn = checksumStream;
if (dataIn instanceof FileInputStream) {
try {
dataInFd = ((FileInputStream) dataIn).getFD();
} catch (Exception e) {
LOG.warn("Could not get file descriptor for inputstream of class " +
this.dataIn.getClass());
}
} else {
LOG.debug("Could not get file descriptor for inputstream of class " +
this.dataIn.getClass());
}
}
/** @return the data input stream. */
public InputStream getDataIn() {
return dataIn;
}
/** @return the checksum input stream. */
public InputStream getChecksumIn() {
return checksumIn;
}
public FileDescriptor getDataInFd() {
return dataInFd;
}
public FsVolumeReference getVolumeRef() {
return volumeRef;
}
public void readDataFully(byte[] buf, int off, int len)
throws IOException {
IOUtils.readFully(dataIn, buf, off, len);
}
public void readChecksumFully(byte[] buf, int off, int len)
throws IOException {
IOUtils.readFully(checksumIn, buf, off, len);
}
public void skipDataFully(long len) throws IOException {
IOUtils.skipFully(dataIn, len);
}
public void skipChecksumFully(long len) throws IOException {
IOUtils.skipFully(checksumIn, len);
}
public void closeChecksumStream() throws IOException {
IOUtils.closeStream(checksumIn);
checksumIn = null;
}
public void dropCacheBehindReads(String identifier, long offset, long len,
int flags) throws NativeIOException {
assert this.dataInFd != null : "null dataInFd!";
fileIoProvider.posixFadvise(getVolumeRef().getVolume(),
identifier, dataInFd, offset, len, flags);
}
public void closeStreams() throws IOException {
IOException ioe = null;
if(checksumIn!=null) {
try {
checksumIn.close(); // close checksum file
} catch (IOException e) {
ioe = e;
}
checksumIn = null;
}
if(dataIn!=null) {
try {
dataIn.close(); // close data file
} catch (IOException e) {
ioe = e;
}
dataIn = null;
dataInFd = null;
}
if (volumeRef != null) {
IOUtils.cleanup(null, volumeRef);
volumeRef = null;
}
// throw IOException if there is any
if(ioe!= null) {
throw ioe;
}
}
@Override
public void close() {
IOUtils.closeStream(dataIn);
dataIn = null;
dataInFd = null;
IOUtils.closeStream(checksumIn);
checksumIn = null;
IOUtils.cleanup(null, volumeRef);
volumeRef = null;
}
}