/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.examples.filedata;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.PeekingIterator;

/**
 * An input stream that reads file data stored in one or more Accumulo values. Used by
 * {@link ChunkInputFormat} to present input streams to a mapper.
 */
public class ChunkInputStream extends InputStream {
  private static final Logger log = LoggerFactory.getLogger(ChunkInputStream.class);

  protected PeekingIterator<Entry<Key,Value>> source;
  protected Key currentKey;
  protected Set<Text> currentVis;
  protected int currentChunk;
  protected int currentChunkSize;
  protected boolean gotEndMarker;

  protected byte[] buf;
  protected int count;
  protected int pos;

  public ChunkInputStream() {
    source = null;
  }

  public ChunkInputStream(PeekingIterator<Entry<Key,Value>> in) throws IOException {
    setSource(in);
  }

  public void setSource(PeekingIterator<Entry<Key,Value>> in) throws IOException {
    if (source != null)
      throw new IOException("setting new source without closing old one");
    this.source = in;
    currentVis = new TreeSet<>();
    count = pos = 0;
    if (!source.hasNext()) {
      log.debug("source has no next");
      gotEndMarker = true;
      return;
    }

    // read forward until we reach a chunk
    Entry<Key,Value> entry = source.next();
    currentKey = entry.getKey();
    buf = entry.getValue().get();
    while (!currentKey.getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
      log.debug("skipping key: " + currentKey.toString());
      if (!source.hasNext())
        return;
      entry = source.next();
      currentKey = entry.getKey();
      buf = entry.getValue().get();
    }
    log.debug("starting chunk: " + currentKey.toString());
    count = buf.length;
    currentVis.add(currentKey.getColumnVisibility());
    currentChunk = FileDataIngest.bytesToInt(currentKey.getColumnQualifier().getBytes(), 4);
    currentChunkSize = FileDataIngest.bytesToInt(currentKey.getColumnQualifier().getBytes(), 0);
    gotEndMarker = buf.length == 0;
    if (currentChunk != 0) {
      source = null;
      throw new IOException("starting chunk number isn't 0 for " + currentKey.getRow());
    }
  }

  private int fill() throws IOException {
    if (source == null || !source.hasNext()) {
      if (gotEndMarker)
        return count = pos = 0;
      else
        throw new IOException("no end chunk marker but source has no data");
    }

    Entry<Key,Value> entry = source.peek();
    Key thisKey = entry.getKey();
    log.debug("evaluating key: " + thisKey.toString());

    // check that we're still on the same row
    if (!thisKey.equals(currentKey, PartialKey.ROW)) {
      if (gotEndMarker)
        return -1;
      else {
        String currentRow = currentKey.getRow().toString();
        clear();
        throw new IOException("got to the end of the row without end chunk marker " + currentRow);
      }
    }
    log.debug("matches current key");

    // ok to advance the iterator
    source.next();

    // check that this is part of a chunk
    if (!thisKey.getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
      log.debug("skipping non-chunk key");
      return fill();
    }
    log.debug("is a chunk");

    // check that the chunk size is the same as the one being read
    if (currentChunkSize != FileDataIngest.bytesToInt(thisKey.getColumnQualifier().getBytes(), 0)) {
      log.debug("skipping chunk of different size");
      return fill();
    }

    // add the visibility to the list if it's not there
    currentVis.add(thisKey.getColumnVisibility());

    // check to see if it is an identical chunk with a different visibility
    if (thisKey.getColumnQualifier().equals(currentKey.getColumnQualifier())) {
      log.debug("skipping identical chunk with different visibility");
      return fill();
    }

    if (gotEndMarker) {
      log.debug("got another chunk after end marker: " + currentKey.toString() + " "
          + thisKey.toString());
      clear();
      throw new IOException("found extra chunk after end marker");
    }

    // got new chunk of the same file, check that it's the next chunk
    int thisChunk = FileDataIngest.bytesToInt(thisKey.getColumnQualifier().getBytes(), 4);
    if (thisChunk != currentChunk + 1) {
      log.debug("new chunk same file, unexpected chunkID: " + currentKey.toString() + " "
          + thisKey.toString());
      clear();
      throw new IOException("missing chunks between " + currentChunk + " and " + thisChunk);
    }

    currentKey = thisKey;
    currentChunk = thisChunk;
    buf = entry.getValue().get();
    pos = 0;

    // check to see if it's the last chunk
    if (buf.length == 0) {
      gotEndMarker = true;
      return fill();
    }

    return count = buf.length;
  }

  public Set<Text> getVisibilities() {
    if (source != null)
      throw new IllegalStateException(
          "don't get visibilities before chunks have been completely read");
    return currentVis;
  }

  @Override
  public int read() throws IOException {
    if (source == null)
      return -1;
    log.debug("pos: " + pos + " count: " + count);
    if (pos >= count) {
      if (fill() <= 0) {
        log.debug("done reading input stream at key: "
            + (currentKey == null ? "null" : currentKey.toString()));
        if (source != null && source.hasNext())
          log.debug("next key: " + source.peek().getKey());
        clear();
        return -1;
      }
    }
    return buf[pos++] & 0xff;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (b == null) {
      throw new NullPointerException();
    } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
        || ((off + len) < 0)) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return 0;
    }

    log.debug("filling buffer " + off + " " + len);
    int total = 0;
    while (total < len) {
      int avail = count - pos;
      log.debug(avail + " available in current local buffer");
      if (avail <= 0) {
        if (fill() <= 0) {
          log.debug("done reading input stream at key: "
              + (currentKey == null ? "null" : currentKey.toString()));
          if (source != null && source.hasNext())
            log.debug("next key: " + source.peek().getKey());
          clear();
          log.debug("filled " + total + " bytes");
          return total == 0 ? -1 : total;
        }
        avail = count - pos;
      }

      int cnt = (avail < len - total) ? avail : len - total;
      log.debug("copying from local buffer: local pos " + pos + " into pos " + off + " len " + cnt);
      System.arraycopy(buf, pos, b, off, cnt);
      pos += cnt;
      off += cnt;
      total += cnt;
    }
    log.debug("filled " + total + " bytes");
    return total;
  }

  public void clear() {
    source = null;
    buf = null;
    currentKey = null;
    currentChunk = 0;
    pos = count = 0;
  }

  @Override
  public void close() throws IOException {
    try {
      while (fill() > 0) {}
    } catch (IOException e) {
      clear();
      throw new IOException(e);
    }
    clear();
  }
}
