/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;

/**
 * BlockReaderLocal enables local short circuited reads. If the DFS client is on
 * the same machine as the datanode, then the client can read files directly
 * from the local file system rather than going through the datanode for better
 * performance. <br>
 * {@link BlockReaderLocal} works as follows:
 * <ul>
 * <li>The client performing short circuit reads must be configured at the
 * datanode.</li>
 * <li>The client gets the path to the file where block is stored using
 * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
 * RPC call</li>
 * <li>Client uses kerberos authentication to connect to the datanode over RPC,
 * if security is enabled.</li>
 * </ul>
 */
class BlockReaderLocal implements BlockReader {
  private static final Log LOG = LogFactory.getLog(DFSClient.class);

  /**
   * Per-datanode state kept by the client: the RPC proxy used to talk to a
   * local datanode and a bounded cache of block to local-path mappings.
   */
  private static class LocalDatanodeInfo {
    private ClientDatanodeProtocol proxy = null;
    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;

    /**
     * Builds an entry with no proxy and an empty path cache. The cache is an
     * access-ordered {@link LinkedHashMap} capped at a fixed number of
     * entries and wrapped in a synchronized map.
     */
    LocalDatanodeInfo() {
      final int maxEntries = 10000;
      final float loadFactor = 0.75f;
      // Initial capacity derived from the entry limit and the load factor.
      final int initialCapacity = (int) Math.ceil(maxEntries / loadFactor) + 1;
      Map<ExtendedBlock, BlockLocalPathInfo> lru =
          new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
              initialCapacity, loadFactor, true) {
            private static final long serialVersionUID = 1;

            @Override
            protected boolean removeEldestEntry(
                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
              // Drop the eldest entry as soon as the limit is exceeded.
              return size() > maxEntries;
            }
          };
      cache = Collections.synchronizedMap(lru);
    }

    /**
     * Returns the proxy for this datanode, creating it on first use.
     *
     * @param node datanode to connect to
     * @param conf configuration passed to the proxy factory
     * @param socketTimeout timeout passed to the proxy factory
     * @return the existing or newly created proxy
     * @throws IOException if the proxy cannot be created
     */
    private synchronized ClientDatanodeProtocol getDatanodeProxy(
        DatanodeInfo node, Configuration conf, int socketTimeout)
        throws IOException {
      if (proxy != null) {
        return proxy;
      }
      proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
          socketTimeout);
      return proxy;
    }

    /** Stops and forgets the current proxy, if there is one. */
    private synchronized void resetDatanodeProxy() {
      if (proxy == null) {
        return;
      }
      RPC.stopProxy(proxy);
      proxy = null;
    }

    /** Looks up the cached local path information for a block, or null. */
    private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
      return cache.get(b);
    }

    /** Records the local path information for a block in the cache. */
    private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
      cache.put(b, info);
    }

    /** Evicts a block's local path information from the cache. */
    private void removeBlockLocalPathInfo(ExtendedBlock b) {
      cache.remove(b);
    }
  }
  
  // Multiple datanodes could be running on the local machine. Store proxies in
  // a map keyed by the ipc port of the datanode.
  // Within this file the map is only touched from the static synchronized
  // getLocalDatanodeInfo(int).
  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();

  private final FileInputStream dataIn; // reader for the data file
  private FileInputStream checksumIn;   // reader for the checksum file

  // Number of bytes between the start of the first checksum chunk and the
  // requested position; read() repositions the data buffer by this amount and
  // then resets it to zero.
  private int offsetFromChunkBoundary;
  
  // Scratch array, lazily allocated by skip(), that receives bytes which are
  // read only to be discarded.
  private byte[] skipBuf = null;
  // Buffers for block data and for the matching checksums; obtained from
  // bufferPool in the constructor and handed back in close().
  private ByteBuffer dataBuff = null;
  private ByteBuffer checksumBuff = null;
  private DataChecksum checksum;
  // When false, read() and skip() go straight to dataIn without buffering or
  // checksum verification.
  private final boolean verifyChecksum;

  // Pool shared by all readers for the data and checksum buffers above.
  private static DirectBufferPool bufferPool = new DirectBufferPool();

  // Cached from the checksum object in the constructor.
  private int bytesPerChecksum;
  private int checksumSize;

  /** offset in block where reader wants to actually read */
  private long startOffset;
  // Name passed to checksum verification (the HDFS file name given to the
  // constructor).
  private final String filename;
  
  /**
   * The only way this object can be instantiated.
   * <p>
   * Resolves the local paths of the block (from the per-datanode cache, or
   * via RPC to the datanode on a cache miss), opens the block file and,
   * unless checksum verification is configured to be skipped, the metadata
   * file, and builds a reader positioned at {@code startOffset}.
   * <p>
   * If an {@link IOException} occurs while opening the files or building the
   * reader, the block is evicted from the path cache and the exception is
   * rethrown. Any stream opened before a failure is closed.
   *
   * @param conf configuration, consulted for the skip-checksum setting
   * @param file name of the HDFS file the block belongs to
   * @param blk block to read
   * @param token block access token passed to the datanode RPC
   * @param node local datanode holding the block
   * @param socketTimeout timeout used when creating the datanode proxy
   * @param startOffset offset in the block at which reading starts
   * @param length number of bytes the caller intends to read
   * @return a reader for the local block file
   * @throws IOException if the paths cannot be resolved or the local files
   *           cannot be opened or positioned
   */
  static BlockReaderLocal newBlockReader(Configuration conf, String file,
      ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
      int socketTimeout, long startOffset, long length) throws IOException {

    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
        .getIpcPort());
    // check the cache first
    BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
    if (pathinfo == null) {
      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
    }

    // check to see if the file exists. It may so happen that the
    // HDFS file has been deleted and this block-lookup is occurring
    // on behalf of a new HDFS file. This time, the block file could
    // be residing in a different portion of the fs.data.dir directory.
    // In this case, we remove this entry from the cache. The next
    // call to this method will re-populate the cache.
    FileInputStream dataIn = null;
    FileInputStream checksumIn = null;
    BlockReaderLocal localBlockReader = null;
    boolean skipChecksumCheck = skipChecksumCheck(conf);
    try {
      // get a local file system
      File blkfile = new File(pathinfo.getBlockPath());
      dataIn = new FileInputStream(blkfile);

      if (LOG.isDebugEnabled()) {
        LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
            + blkfile.length() + " startOffset " + startOffset + " length "
            + length + " short circuit checksum " + skipChecksumCheck);
      }

      if (!skipChecksumCheck) {
        // get the metadata file
        File metafile = new File(pathinfo.getMetaPath());
        checksumIn = new FileInputStream(metafile);

        // read and handle the common header here. For now just a version
        BlockMetadataHeader header = BlockMetadataHeader
            .readHeader(new DataInputStream(checksumIn));
        short version = header.getVersion();
        if (version != BlockMetadataHeader.VERSION) {
          LOG.warn("Wrong version (" + version + ") for metadata file for "
              + blk + " ignoring ...");
        }
        DataChecksum checksum = header.getChecksum();
        // Reads must begin on a checksum chunk boundary so that the first
        // chunk can be verified as a whole.
        long firstChunkOffset = startOffset
            - (startOffset % checksum.getBytesPerChecksum());
        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
            startOffset, length, pathinfo, checksum, true, dataIn,
            firstChunkOffset, checksumIn);
      } else {
        localBlockReader = new BlockReaderLocal(conf, file, blk, token,
            startOffset, length, pathinfo, dataIn);
      }
    } catch (IOException e) {
      // remove from cache
      localDatanodeInfo.removeBlockLocalPathInfo(blk);
      DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
          + " from cache because local file " + pathinfo.getBlockPath()
          + " could not be opened.");
      throw e;
    } finally {
      if (localBlockReader == null) {
        // No reader took ownership of the streams, so release them here.
        // The nested finally makes sure the checksum stream is closed even
        // when closing the data stream itself fails.
        try {
          if (dataIn != null) {
            dataIn.close();
          }
        } finally {
          if (checksumIn != null) {
            checksumIn.close();
          }
        }
      }
    }
    return localBlockReader;
  }
  
  /**
   * Returns the bookkeeping entry for the local datanode listening on the
   * given ipc port, registering a fresh entry the first time a port is seen.
   *
   * @param port ipc port of the local datanode
   * @return the entry associated with the port, never null
   */
  private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
    LocalDatanodeInfo known = localDatanodeInfoMap.get(port);
    if (known != null) {
      return known;
    }
    LocalDatanodeInfo created = new LocalDatanodeInfo();
    localDatanodeInfoMap.put(port, created);
    return created;
  }
  
  /**
   * Asks the local datanode, over RPC, for the local paths of a block and
   * caches a non-null answer in that datanode's path cache.
   * <p>
   * If the RPC fails with an {@link IOException}, the datanode proxy is
   * stopped and dropped so that the next call creates a new one, and the
   * exception is rethrown.
   *
   * @param blk block whose local paths are wanted
   * @param node local datanode to ask
   * @param conf configuration used when creating the proxy
   * @param timeout socket timeout used when creating the proxy
   * @param token block access token forwarded to the datanode
   * @return the path information returned by the datanode (whatever the RPC
   *         returned, which is not cached when it is null)
   * @throws IOException if the proxy cannot be created or the RPC fails
   */
  private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
      DatanodeInfo node, Configuration conf, int timeout,
      Token<BlockTokenIdentifier> token) throws IOException {
    // Use the accessor, as newBlockReader does, so both lookups are keyed
    // the same way.
    LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
        .getIpcPort());
    BlockLocalPathInfo pathinfo = null;
    ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
        conf, timeout);
    try {
      // make RPC to local datanode to find local pathnames of blocks
      pathinfo = proxy.getBlockLocalPathInfo(blk, token);
      if (pathinfo != null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Cached location of block " + blk + " as " + pathinfo);
        }
        localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
      }
    } catch (IOException e) {
      localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
      throw e;
    }
    return pathinfo;
  }
  
  /**
   * Reads from the configuration whether short-circuit reads should skip
   * checksum verification, falling back to the declared default.
   *
   * @param conf configuration to consult
   * @return true if checksum verification is to be skipped
   */
  private static boolean skipChecksumCheck(Configuration conf) {
    final boolean skip = conf.getBoolean(
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
    return skip;
  }
  
  /**
   * Creates a reader that does not verify checksums. Delegates to the full
   * constructor with a {@code CHECKSUM_NULL} checksum object, checksum
   * verification switched off, no checksum stream, and the first chunk
   * offset equal to {@code startOffset}, so the data stream is positioned
   * exactly at the requested offset.
   *
   * @param conf configuration, passed through to the full constructor
   * @param hdfsfile name of the HDFS file the block belongs to
   * @param block block being read
   * @param token block access token
   * @param startOffset offset in the block at which reading starts
   * @param length number of bytes the caller intends to read
   * @param pathinfo local path information of the block
   * @param dataIn open stream on the local block file
   * @throws IOException if the data stream cannot be positioned
   */
  private BlockReaderLocal(Configuration conf, String hdfsfile,
      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
      long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
      throws IOException {
    // NOTE(review): the literal 4 is presumably the bytes-per-checksum value
    // for the null checksum - confirm against DataChecksum.newDataChecksum.
    this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
        DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, 4), false,
        dataIn, startOffset, null);
  }

  /**
   * Creates a reader over already opened local streams. Takes two buffers
   * from the shared pool, then advances the data stream to
   * {@code firstChunkOffset} and, when present, the checksum stream to the
   * checksum of that chunk.
   * <p>
   * If positioning either stream fails, the pooled buffers are handed back
   * before the exception propagates; closing the streams is left to the
   * caller.
   *
   * @param conf configuration (not read by this constructor)
   * @param hdfsfile name of the HDFS file, used when reporting checksum
   *          verification results
   * @param block block being read (not read by this constructor)
   * @param token block access token (not read by this constructor)
   * @param startOffset offset in the block at which reading starts
   * @param length number of bytes the caller intends to read (not read by
   *          this constructor)
   * @param pathinfo local path information (not read by this constructor)
   * @param checksum checksum object describing chunk and checksum sizes
   * @param verifyChecksum whether reads go through checksum verification
   * @param dataIn open stream on the local block file
   * @param firstChunkOffset offset the data stream is advanced to; the
   *          distance from it to {@code startOffset} is discarded on the
   *          first read
   * @param checksumIn open stream on the metadata file, positioned after
   *          its header, or null when there is none
   * @throws IOException if either stream cannot be advanced
   */
  private BlockReaderLocal(Configuration conf, String hdfsfile,
      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
      long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
      boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
      FileInputStream checksumIn) throws IOException {
    this.filename = hdfsfile;
    this.checksum = checksum;
    this.verifyChecksum = verifyChecksum;
    this.startOffset = Math.max(startOffset, 0);

    bytesPerChecksum = this.checksum.getBytesPerChecksum();
    checksumSize = this.checksum.getChecksumSize();

    this.dataIn = dataIn;
    this.checksumIn = checksumIn;
    this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
    // Each buffer holds 64 chunks worth of data / checksums.
    dataBuff = bufferPool.getBuffer(bytesPerChecksum*64);
    checksumBuff = bufferPool.getBuffer(checksumSize*64);
    //Initially the buffers have nothing to read.
    dataBuff.flip();
    checksumBuff.flip();

    boolean positioned = false;
    try {
      long toSkip = firstChunkOffset;
      while (toSkip > 0) {
        long skipped = dataIn.skip(toSkip);
        if (skipped == 0) {
          throw new IOException("Couldn't initialize input stream");
        }
        toSkip -= skipped;
      }
      if (checksumIn != null) {
        long checkSumOffset = (firstChunkOffset / bytesPerChecksum)
            * checksumSize;
        while (checkSumOffset > 0) {
          long skipped = checksumIn.skip(checkSumOffset);
          if (skipped == 0) {
            throw new IOException("Couldn't initialize checksum input stream");
          }
          checkSumOffset -= skipped;
        }
      }
      positioned = true;
    } finally {
      if (!positioned) {
        // The half-built reader is discarded by the caller and close() will
        // never run on it, so give the pooled buffers back here.
        bufferPool.returnBuffer(dataBuff);
        bufferPool.returnBuffer(checksumBuff);
        dataBuff = null;
        checksumBuff = null;
      }
    }
  }

  /**
   * Fills the buffer from the stream's channel, reading repeatedly until the
   * buffer has no space left or the channel reports end of file.
   *
   * @param stream stream whose channel is read
   * @param buf buffer to fill
   * @return the number of bytes placed in the buffer, or the negative value
   *         reported by the channel if the very first read hit end of file
   * @throws IOException if reading from the channel fails
   */
  private int readIntoBuffer(FileInputStream stream, ByteBuffer buf)
      throws IOException {
    int total = stream.getChannel().read(buf);
    if (total >= 0) {
      // Keep going until the buffer is full; a later EOF just ends the loop
      // and reports what was collected so far.
      while (buf.hasRemaining()) {
        int count = stream.getChannel().read(buf);
        if (count < 0) {
          break;
        }
        total += count;
      }
    }
    return total;
  }
  
  /**
   * Reads up to {@code len} bytes into {@code buf} starting at {@code off}.
   * <p>
   * Without checksum verification the call goes straight to the data file
   * stream. With verification, data is served from an internal buffer; when
   * that buffer is exhausted it is refilled together with the checksum
   * buffer and the new contents are verified before any byte is handed out.
   * A single call never returns more than what is currently buffered.
   *
   * @param buf destination array
   * @param off offset in {@code buf} at which to store the data
   * @param len maximum number of bytes to read
   * @return the number of bytes read, 0 if nothing could be copied from the
   *         current buffer, or -1 at end of file
   * @throws IOException if reading or checksum verification fails
   */
  @Override
  public synchronized int read(byte[] buf, int off, int len) throws IOException {
    if (LOG.isDebugEnabled()) {
      // Emitted at debug level to match the guard above (and skip()).
      LOG.debug("read off " + off + " len " + len);
    }
    if (!verifyChecksum) {
      return dataIn.read(buf, off, len);
    }

    int dataRead;
    if (dataBuff.remaining() == 0) {
      // Buffer exhausted: refill data and checksums together so they stay
      // aligned, then verify the freshly read chunks.
      dataBuff.clear();
      checksumBuff.clear();
      dataRead = readIntoBuffer(dataIn, dataBuff);
      readIntoBuffer(checksumIn, checksumBuff);
      checksumBuff.flip();
      dataBuff.flip();
      checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
          this.startOffset);
    } else {
      dataRead = dataBuff.remaining();
    }

    if (dataRead <= 0) {
      return -1;
    }

    int nRead = Math.min(dataRead - offsetFromChunkBoundary, len);
    if (offsetFromChunkBoundary > 0) {
      // Discard the bytes between the chunk boundary and the position the
      // caller actually asked for.
      dataBuff.position(offsetFromChunkBoundary);
      // Its either end of file or dataRead is greater than the
      // offsetFromChunkBoundary
      offsetFromChunkBoundary = 0;
    }
    if (nRead > 0) {
      dataBuff.get(buf, off, nRead);
      return nRead;
    }
    return 0;
  }

  /**
   * Skips {@code n} bytes of block data.
   * <p>
   * Without checksum verification the data stream is skipped directly. With
   * verification there are three cases:
   * <ul>
   * <li>the target is inside the current buffer: the buffer is
   * repositioned;</li>
   * <li>the target is at most one checksum chunk past the buffer: the gap is
   * read (and therefore verified) into a scratch array;</li>
   * <li>otherwise the buffered data is discarded, both streams are skipped
   * to the start of the chunk containing the target, and the leading part
   * of that chunk is read into the scratch array.</li>
   * </ul>
   * The caller is expected to make sure the target is not beyond end of
   * file.
   *
   * @param n number of bytes to skip
   * @return the number of bytes actually skipped, including buffered bytes
   *         that were discarded; 0 if {@code n} is not positive
   * @throws IOException if either underlying stream cannot be skipped as far
   *           as required, or a read performed while skipping fails
   */
  @Override
  public synchronized long skip(long n) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("skip " + n);
    }
    if (n <= 0) {
      return 0;
    }
    if (!verifyChecksum) {
      return dataIn.skip(n);
    }
  
    // caller made sure newPosition is not beyond EOF.
    int remaining = dataBuff.remaining();
    int position = dataBuff.position();
    // Computed as a long so that a large n cannot wrap around.
    long newPosition = position + n;
  
    // if the new offset is already read into dataBuff, just reposition
    if (n <= remaining) {
      assert offsetFromChunkBoundary == 0;
      dataBuff.position((int) newPosition);
      return n;
    }

    if (skipBuf == null) {
      skipBuf = new byte[bytesPerChecksum];
    }
  
    // for small gap, read through to keep the data/checksum in sync
    if (n - remaining <= bytesPerChecksum) {
      // Consume what is buffered, then read the rest of the gap.
      dataBuff.position(position + remaining);
      int ret = read(skipBuf, 0, (int)(n - remaining));
      // The buffered bytes were skipped too; an EOF (-1) from read adds
      // nothing.
      return remaining + Math.max(ret, 0);
    }
  
    // optimize for big gap: discard the current buffer, skip to
    // the beginning of the appropriate checksum chunk and then
    // read to the middle of that chunk to be in sync with checksums.
    //
    // The in-chunk offset is kept in a local rather than in
    // this.offsetFromChunkBoundary: it is passed to read() as the length to
    // consume, and read() would otherwise also reposition the buffer by the
    // field value and skip those bytes twice.
    // NOTE(review): like the original, this path assumes the data stream is
    // at a chunk boundary and no initial offset is still pending - confirm
    // callers never skip a large gap before the first read of an unaligned
    // start offset.
    int chunkOffset = (int) (newPosition % bytesPerChecksum);
    long toskip = n - remaining - chunkOffset;
  
    // Mark both buffers as fully consumed so the next read() refills them.
    // clear() would instead make the whole stale buffer look readable.
    dataBuff.position(dataBuff.limit());
    checksumBuff.position(checksumBuff.limit());
  
    long dataSkipped = dataIn.skip(toskip);
    if (dataSkipped != toskip) {
      throw new IOException("skip error in data input stream");
    }
    long checkSumOffset = (dataSkipped / bytesPerChecksum) * checksumSize;
    if (checkSumOffset > 0) {
      long skipped = checksumIn.skip(checkSumOffset);
      if (skipped != checkSumOffset) {
        throw new IOException("skip error in checksum input stream");
      }
    }

    // read into the middle of the chunk
    assert skipBuf.length == bytesPerChecksum;
    assert chunkOffset < bytesPerChecksum;
    int ret = read(skipBuf, 0, chunkOffset);

    // Discarded buffer contents plus the stream skip, plus whatever the
    // final partial-chunk read consumed (nothing on EOS).
    long totalSkipped = remaining + toskip;
    if (ret > 0) {
      totalSkipped += ret;
    }
    return totalSkipped;
  }

  /**
   * Closes both file streams and hands the data and checksum buffers back
   * to the shared pool. The checksum stream is closed and the buffers are
   * returned even if closing an earlier resource throws; in that case the
   * exception still propagates to the caller.
   *
   * @throws IOException if closing either stream fails
   */
  @Override
  public synchronized void close() throws IOException {
    try {
      try {
        dataIn.close();
      } finally {
        if (checksumIn != null) {
          checksumIn.close();
        }
      }
    } finally {
      // Always give the buffers back; nulling the fields keeps a repeated
      // close() from returning the same buffer twice.
      if (dataBuff != null) {
        bufferPool.returnBuffer(dataBuff);
        dataBuff = null;
      }
      if (checksumBuff != null) {
        bufferPool.returnBuffer(checksumBuff);
        checksumBuff = null;
      }
      startOffset = -1;
      checksum = null;
    }
  }

  /**
   * Delegates to {@code BlockReaderUtil.readAll} with this reader as the
   * source.
   *
   * @param buf destination array
   * @param offset offset in {@code buf} at which to store the data
   * @param len number of bytes requested
   * @return the value returned by {@code BlockReaderUtil.readAll}
   * @throws IOException if the underlying reads fail
   */
  @Override
  public int readAll(byte[] buf, int offset, int len) throws IOException {
    return BlockReaderUtil.readAll(this, buf, offset, len);
  }

  /**
   * Delegates to {@code BlockReaderUtil.readFully} with this reader as the
   * source.
   *
   * @param buf destination array
   * @param off offset in {@code buf} at which to store the data
   * @param len number of bytes requested
   * @throws IOException if the underlying reads fail
   */
  @Override
  public void readFully(byte[] buf, int off, int len) throws IOException {
    BlockReaderUtil.readFully(this, buf, off, len);
  }

  /**
   * Always returns null: this reader works on local file streams and holds
   * no socket.
   */
  @Override
  public Socket takeSocket() {
    return null;
  }

  /**
   * Always returns false.
   * NOTE(review): presumably because there is no datanode connection over
   * which a status code could be sent - confirm against the BlockReader
   * contract.
   */
  @Override
  public boolean hasSentStatusCode() {
    return false;
  }
}