/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * DataBlockScanner manages block scanning for all the block pools. For each
 * block pool a {@link BlockPoolSliceScanner} is created which runs in a separate
 * thread to scan the blocks for that block pool. When a {@link BPOfferService}
 * becomes alive or dies, blockPoolScannerMap in this class is updated.
 */
@InterfaceAudience.Private
public class DataBlockScanner implements Runnable {
  public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
  private final DataNode datanode;
  private final FSDataset dataset;
  private final Configuration conf;
  
  /**
   * Map to find the BlockPoolScanner for a given block pool id. This is updated
   * when a BPOfferService becomes alive or dies.
   */
  private final TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap = 
    new TreeMap<String, BlockPoolSliceScanner>();
  Thread blockScannerThread = null;
  
  DataBlockScanner(DataNode datanode, FSDataset dataset, Configuration conf) {
    this.datanode = datanode;
    this.dataset = dataset;
    this.conf = conf;
  }
  
  public void run() {
    String currentBpId = "";
    boolean firstRun = true;
    while (datanode.shouldRun && !Thread.interrupted()) {
      //Sleep everytime except in the first interation.
      if (!firstRun) {
        try {
          Thread.sleep(5000);
        } catch (InterruptedException ex) {
          // Interrupt itself again to set the interrupt status
          blockScannerThread.interrupt();
          continue;
        }
      } else {
        firstRun = false;
      }
      
      BlockPoolSliceScanner bpScanner = getNextBPScanner(currentBpId);
      if (bpScanner == null) {
        // Possible if thread is interrupted
        continue;
      }
      currentBpId = bpScanner.getBlockPoolId();
      // If BPOfferService for this pool is not alive, don't process it
      if (!datanode.isBPServiceAlive(currentBpId)) {
        LOG.warn("Block Pool " + currentBpId + " is not alive");
        // Remove in case BP service died abruptly without proper shutdown
        removeBlockPool(currentBpId);
        continue;
      }
      bpScanner.scanBlockPoolSlice();
    }
  }

  // Wait for at least one block pool to be up
  private void waitForInit(String bpid) {
    UpgradeManagerDatanode um = null;
    if(bpid != null && !bpid.equals(""))
      um = datanode.getUpgradeManagerDatanode(bpid);
    
    while ((um != null && ! um.isUpgradeCompleted())
        || (getBlockPoolSetSize() < datanode.getAllBpOs().length)
        || (getBlockPoolSetSize() < 1)) {
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        blockScannerThread.interrupt();
        return;
      }
    }
  }
  
  /**
   * Find next block pool id to scan. There should be only one current
   * verification log file. Find which block pool contains the current
   * verification log file and that is used as the starting block pool id. If no
   * current files are found start with first block-pool in the blockPoolSet.
   * However, if more than one current files are found, the one with latest 
   * modification time is used to find the next block pool id.
   */
  private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
    
    String nextBpId = null;
    while ((nextBpId == null) && datanode.shouldRun
        && !blockScannerThread.isInterrupted()) {
      waitForInit(currentBpId);
      synchronized (this) {
        if (getBlockPoolSetSize() > 0) {          
          // Find nextBpId by finding the last modified current log file, if any
          long lastScanTime = -1;
          Iterator<String> bpidIterator = blockPoolScannerMap.keySet()
              .iterator();
          while (bpidIterator.hasNext()) {
            String bpid = bpidIterator.next();
            for (FSDataset.FSVolume vol : dataset.volumes.getVolumes()) {
              try {
                File currFile = BlockPoolSliceScanner.getCurrentFile(vol, bpid);
                if (currFile.exists()) {
                  long lastModified = currFile.lastModified();
                  if (lastScanTime < lastModified) {
                    lastScanTime = lastModified;
                    nextBpId = bpid;
                  }
                }
              } catch (IOException e) {
                LOG.warn("Received exception: ", e);
              }
            }
          }
          
          // nextBpId can still be null if no current log is found,
          // find nextBpId sequentially.
          if (nextBpId == null) {
            if ("".equals(currentBpId)) {
              nextBpId = blockPoolScannerMap.firstKey();
            } else {
              nextBpId = blockPoolScannerMap.higherKey(currentBpId);
              if (nextBpId == null) {
                nextBpId = blockPoolScannerMap.firstKey();
              }
            }
          }
          if (nextBpId != null) {
            return getBPScanner(nextBpId);
          }
        }
      }
      LOG.warn("No block pool is up, going to wait");
      try {
        Thread.sleep(5000);
      } catch (InterruptedException ex) {
        LOG.warn("Received exception: " + ex);
        blockScannerThread.interrupt();
        return null;
      }
    }
    return null;
  }

  private synchronized int getBlockPoolSetSize() {
    return blockPoolScannerMap.size();
  }
  
  private synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
    return blockPoolScannerMap.get(bpid);
  }
  
  private synchronized String[] getBpIdList() {
    return blockPoolScannerMap.keySet().toArray(
        new String[blockPoolScannerMap.keySet().size()]);
  }
  
  public void addBlock(ExtendedBlock block) {
    BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
    if (bpScanner != null) {
      bpScanner.addBlock(block);
    } else {
      LOG.warn("No block pool scanner found for block pool id: "
          + block.getBlockPoolId());
    }
  }
  
  public synchronized boolean isInitialized(String bpid) {
    BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
    if (bpScanner != null) {
      return bpScanner.isInitialized();
    }
    return false;
  }

  public synchronized void printBlockReport(StringBuilder buffer,
      boolean summary) {
    String[] bpIdList = getBpIdList();
    if (bpIdList == null || bpIdList.length == 0) {
      buffer.append("Periodic block scanner is not yet initialized. "
          + "Please check back again after some time.");
      return;
    }
    for (String bpid : bpIdList) {
      BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
      buffer.append("\n\nBlock report for block pool: "+bpid + "\n");
      bpScanner.printBlockReport(buffer, summary);
      buffer.append("\n");
    }
  }
  
  public void deleteBlock(String poolId, Block toDelete) {
    BlockPoolSliceScanner bpScanner = getBPScanner(poolId);
    if (bpScanner != null) {
      bpScanner.deleteBlock(toDelete);
    } else {
      LOG.warn("No block pool scanner found for block pool id: "
          + poolId);
    }
  }

  public void deleteBlocks(String poolId, Block[] toDelete) {
    BlockPoolSliceScanner bpScanner = getBPScanner(poolId);
    if (bpScanner != null) {
      bpScanner.deleteBlocks(toDelete);
    } else {
      LOG.warn("No block pool scanner found for block pool id: "
          + poolId);
    }
  }
  
  public synchronized void shutdown() {
    if (blockScannerThread != null) {
      blockScannerThread.interrupt();
    }
  }

  public synchronized void addBlockPool(String blockPoolId) {
    if (blockPoolScannerMap.get(blockPoolId) != null) {
      return;
    }
    BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(datanode, dataset,
        conf, blockPoolId);
    try {
      bpScanner.init();
    } catch (IOException ex) {
      LOG.warn("Failed to initialized block scanner for pool id="+blockPoolId);
      return;
    }
    blockPoolScannerMap.put(blockPoolId, bpScanner);
    LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
        + blockPoolScannerMap.size());
  }
  
  public synchronized void removeBlockPool(String blockPoolId) {
    blockPoolScannerMap.remove(blockPoolId);
    LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
  }
  
  // This method is used for testing
  long getBlocksScannedInLastRun(String bpid) throws IOException {
    BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
    if (bpScanner == null) {
      throw new IOException("Block Pool: "+bpid+" is not running");
    } else {
      return bpScanner.getBlocksScannedInLastRun();
    }
  }

  public void start() {
    blockScannerThread = new Thread(this);
    blockScannerThread.setDaemon(true);
    blockScannerThread.start();
  }
  
  @InterfaceAudience.Private
  public static class Servlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    public void doGet(HttpServletRequest request, 
                      HttpServletResponse response) throws IOException {
      response.setContentType("text/plain");
      
      DataNode datanode = (DataNode) getServletContext().getAttribute("datanode");
      DataBlockScanner blockScanner = datanode.blockScanner;
      
      boolean summary = (request.getParameter("listblocks") == null);
      
      StringBuilder buffer = new StringBuilder(8*1024);
      if (blockScanner == null) {
        LOG.warn("Periodic block scanner is not running");
        buffer.append("Periodic block scanner is not running. " +
                      "Please check the datanode log if this is unexpected.");
      } else {
        blockScanner.printBlockReport(buffer, summary);
      }
      response.getWriter().write(buffer.toString()); // extra copy!
    }
  }

}
