/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.core.iteratorsImpl.system;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.commons.lang3.mutable.MutableLong;

public class LocalityGroupIterator extends HeapIterator implements InterruptibleIterator {

  private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();

  public static class LocalityGroup {
    private LocalityGroup(LocalityGroup localityGroup, IteratorEnvironment env) {
      this(localityGroup.columnFamilies, localityGroup.isDefaultLocalityGroup);
      this.iterator = (InterruptibleIterator) localityGroup.iterator.deepCopy(env);
    }

    public LocalityGroup(InterruptibleIterator iterator,
        Map<ByteSequence,MutableLong> columnFamilies, boolean isDefaultLocalityGroup) {
      this(columnFamilies, isDefaultLocalityGroup);
      this.iterator = iterator;
    }

    public LocalityGroup(Map<ByteSequence,MutableLong> columnFamilies,
        boolean isDefaultLocalityGroup) {
      this.isDefaultLocalityGroup = isDefaultLocalityGroup;
      this.columnFamilies = columnFamilies;
    }

    public InterruptibleIterator getIterator() {
      return iterator;
    }

    protected boolean isDefaultLocalityGroup;
    protected Map<ByteSequence,MutableLong> columnFamilies;
    private InterruptibleIterator iterator;
  }

  public static class LocalityGroupContext {
    final List<LocalityGroup> groups;
    final LocalityGroup defaultGroup;
    final Map<ByteSequence,LocalityGroup> groupByCf;

    public LocalityGroupContext(LocalityGroup[] groups) {
      this.groups = Collections.unmodifiableList(Arrays.asList(groups));
      this.groupByCf = new HashMap<>();
      LocalityGroup foundDefault = null;

      for (LocalityGroup group : groups) {
        if (group.isDefaultLocalityGroup && group.columnFamilies == null) {
          if (foundDefault != null) {
            throw new IllegalStateException("Found multiple default locality groups");
          }
          foundDefault = group;
        } else {
          for (Entry<ByteSequence,MutableLong> entry : group.columnFamilies.entrySet()) {
            if (entry.getValue().longValue() > 0) {
              if (groupByCf.containsKey(entry.getKey())) {
                throw new IllegalStateException("Found the same cf in multiple locality groups");
              }
              groupByCf.put(entry.getKey(), group);
            }
          }
        }
      }
      defaultGroup = foundDefault;
    }
  }

  /**
   * This will cache the arguments used in the seek call along with the locality groups seeked.
   */
  public static class LocalityGroupSeekCache {
    private Set<ByteSequence> lastColumnFamilies;
    private volatile boolean lastInclusive;
    private Collection<LocalityGroup> lastUsed;

    public Set<ByteSequence> getLastColumnFamilies() {
      return lastColumnFamilies;
    }

    public boolean isLastInclusive() {
      return lastInclusive;
    }

    public Collection<LocalityGroup> getLastUsed() {
      return lastUsed;
    }

    public int getNumLGSeeked() {
      return (lastUsed == null ? 0 : lastUsed.size());
    }
  }

  private final LocalityGroupContext lgContext;
  private LocalityGroupSeekCache lgCache;
  private AtomicBoolean interruptFlag;

  public LocalityGroupIterator(LocalityGroup[] groups) {
    super(groups.length);
    this.lgContext = new LocalityGroupContext(groups);
  }

  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
      IteratorEnvironment env) throws IOException {
    throw new UnsupportedOperationException();
  }

  /**
   * This is the seek work horse for a HeapIterator with locality groups (uses by the InMemory and
   * RFile mechanisms). This method will find the locality groups to use in the
   * LocalityGroupContext, and will seek those groups.
   *
   * @param hiter
   *          The heap iterator
   * @param lgContext
   *          The locality groups
   * @param range
   *          The range to seek
   * @param columnFamilies
   *          The column fams to seek
   * @param inclusive
   *          The inclusiveness of the column fams
   * @return The locality groups seeked
   * @throws IOException
   *           thrown if an locality group seek fails
   */
  static final Collection<LocalityGroup> _seek(HeapIterator hiter, LocalityGroupContext lgContext,
      Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
    hiter.clear();

    Set<ByteSequence> cfSet;
    if (columnFamilies.isEmpty()) {
      cfSet = Collections.emptySet();
    } else {
      if (columnFamilies instanceof Set<?>) {
        cfSet = (Set<ByteSequence>) columnFamilies;
      } else {
        cfSet = new HashSet<>();
        cfSet.addAll(columnFamilies);
      }
    }

    // determine the set of groups to use
    Collection<LocalityGroup> groups = Collections.emptyList();

    // if no column families specified, then include all groups unless !inclusive
    if (cfSet.isEmpty()) {
      if (!inclusive) {
        groups = lgContext.groups;
      }
    } else {
      groups = new HashSet<>();

      // do not know what column families are in the default locality group,
      // only know what column families are not in it
      if (lgContext.defaultGroup != null) {
        if (inclusive) {
          if (!lgContext.groupByCf.keySet().containsAll(cfSet)) {
            // default LG may contain wanted and unwanted column families
            groups.add(lgContext.defaultGroup);
          } // else - everything wanted is in other locality groups, so nothing to do
        } else {
          // must include the default group as it may include cfs not in our cfSet
          groups.add(lgContext.defaultGroup);
        }
      }

      /*
       * Need to consider the following cases for inclusive and exclusive (lgcf:locality group
       * column family set, cf:column family set) lgcf and cf are disjoint lgcf and cf are the same
       * cf contains lgcf lgcf contains cf lgccf and cf intersect but neither is a subset of the
       * other
       */
      if (!inclusive) {
        for (Entry<ByteSequence,LocalityGroup> entry : lgContext.groupByCf.entrySet()) {
          if (!cfSet.contains(entry.getKey())) {
            groups.add(entry.getValue());
          }
        }
      } else if (lgContext.groupByCf.size() <= cfSet.size()) {
        for (Entry<ByteSequence,LocalityGroup> entry : lgContext.groupByCf.entrySet()) {
          if (cfSet.contains(entry.getKey())) {
            groups.add(entry.getValue());
          }
        }
      } else {
        for (ByteSequence cf : cfSet) {
          LocalityGroup group = lgContext.groupByCf.get(cf);
          if (group != null) {
            groups.add(group);
          }
        }
      }
    }

    for (LocalityGroup lgr : groups) {
      lgr.getIterator().seek(range, EMPTY_CF_SET, false);
      hiter.addSource(lgr.getIterator());
    }

    return groups;
  }

  /**
   * This seek method will reuse the supplied LocalityGroupSeekCache if it can. Otherwise it will
   * delegate to the _seek method.
   *
   * @param hiter
   *          The heap iterator
   * @param lgContext
   *          The locality groups
   * @param range
   *          The range to seek
   * @param columnFamilies
   *          The column fams to seek
   * @param inclusive
   *          The inclusiveness of the column fams
   * @param lgSeekCache
   *          A cache returned by the previous call to this method
   * @return A cache for this seek call
   * @throws IOException
   *           thrown if an locality group seek fails
   */
  public static LocalityGroupSeekCache seek(HeapIterator hiter, LocalityGroupContext lgContext,
      Range range, Collection<ByteSequence> columnFamilies, boolean inclusive,
      LocalityGroupSeekCache lgSeekCache) throws IOException {
    if (lgSeekCache == null) {
      lgSeekCache = new LocalityGroupSeekCache();
    }

    // determine if the arguments have changed since the last time
    boolean sameArgs = false;
    Set<ByteSequence> cfSet = null;
    if (lgSeekCache.lastUsed != null && inclusive == lgSeekCache.lastInclusive) {
      if (columnFamilies instanceof Set) {
        sameArgs = lgSeekCache.lastColumnFamilies.equals(columnFamilies);
      } else {
        cfSet = Set.copyOf(columnFamilies);
        sameArgs = lgSeekCache.lastColumnFamilies.equals(cfSet);
      }
    }

    // if the column families and inclusiveness have not changed, then we can simply re-seek the
    // locality groups we discovered last round and rebuild the heap.
    if (sameArgs) {
      hiter.clear();
      for (LocalityGroup lgr : lgSeekCache.lastUsed) {
        lgr.getIterator().seek(range, EMPTY_CF_SET, false);
        hiter.addSource(lgr.getIterator());
      }
    } else { // otherwise capture the parameters, and use the static seek method to locate the
             // locality groups to use.
      lgSeekCache.lastColumnFamilies = (cfSet == null ? Set.copyOf(columnFamilies) : cfSet);
      lgSeekCache.lastInclusive = inclusive;
      lgSeekCache.lastUsed = _seek(hiter, lgContext, range, columnFamilies, inclusive);
    }

    return lgSeekCache;
  }

  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
      throws IOException {
    lgCache = seek(this, lgContext, range, columnFamilies, inclusive, lgCache);
  }

  @Override
  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
    LocalityGroup[] groupsCopy = new LocalityGroup[lgContext.groups.size()];

    for (int i = 0; i < lgContext.groups.size(); i++) {
      groupsCopy[i] = new LocalityGroup(lgContext.groups.get(i), env);
      if (interruptFlag != null) {
        groupsCopy[i].getIterator().setInterruptFlag(interruptFlag);
      }
    }

    return new LocalityGroupIterator(groupsCopy);
  }

  @Override
  public void setInterruptFlag(AtomicBoolean flag) {
    this.interruptFlag = flag;
    for (LocalityGroup lgr : lgContext.groups) {
      lgr.getIterator().setInterruptFlag(flag);
    }
  }

}
