/*
 * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datatorrent.contrib.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * Simple Kafka consumer adaptor used by the Kafka input operator. Properties:<br>
 * <b>timeout</b>: Timeout for connection and ping <br>
 * <b>bufferSize</b>: Buffer size of the consumer <br>
 * <b>clientId</b>: Client id of the consumer <br>
 * <b>partitionIds</b>: The partitions that the consumer wants to consume <br>
 * <li>null or empty: consume every partition of the topic, using one thread and one client per leader broker</li> <br>
 * <b>metadataRefreshInterval</b>: The interval, in milliseconds, at which the monitor thread checks for broker leadership changes <br>
 * <b>metadataRefreshRetryLimit</b>: Maximum number of consecutive metadata retrieval failures that are tolerated<br>
 * default value is -1 <br>
 * -1: unlimited retry <br>
 * <br>
 *
 * Load balance: <br>
 * <li>The consumer creates several data-consuming threads to consume the data from the broker(s)</li>
 * <li>Each thread has only ONE kafka client, connected to ONE broker, and consumes one or more partitions from it</li>
 * <li>
 * There is ONE separate thread that monitors the leadership of all the partitions of the topic every
 * #metadataRefreshInterval milliseconds</li>
 * <li>Once a leadership
 * change is detected (leader broker failure, or server-side reassignment), it switches to the new leader broker</li> <li>
 * For server-side leadership change, see kafka-preferred-replica-election.sh and kafka-reassign-partitions.sh</li> <br>
 * <br>
 * Kafka broker failover: <br>
 * <li>Once a broker failure is detected, it waits #metadataRefreshInterval to reconnect to the new leader broker</li> <li>
 * If there are #metadataRefreshRetryLimit consecutive failures to retrieve the metadata for the topic, it will stop
 * consuming the partition</li> <br>
 *
 * @since 0.9.0
 */
public class SimpleKafkaConsumer extends KafkaConsumer
{

  /**
   * The data-consuming thread. It uses one simple kafka client connected to one broker, which is the leader of the
   * partition(s) that this consumer is interested in.
   * <p>
   * The set of partitions served by the thread grows when {@link #addPartitions(Set)} is called and shrinks whenever
   * the fetch response reports an error for a partition. The thread exits when no partition is left, when the owning
   * consumer is no longer alive, when the metadata refresh retry limit is reached, or when any exception is thrown.
   */
  static final class ConsumerThread implements Runnable
  {
    private final Broker broker;
    private final String clientName;
    // kafka simple consumer object, created at the beginning of run()
    private SimpleConsumer ksc;
    // The SimpleKafkaConsumer which holds this thread
    private final SimpleKafkaConsumer consumer;
    // partitions consumed in this thread; backed by a ConcurrentHashMap because addPartitions() is called from
    // another thread while run() iterates over the set
    private final Set<KafkaPartition> kpS;
    @SuppressWarnings("rawtypes")
    private Future threadItSelf;

    private ConsumerThread(Broker broker, Set<KafkaPartition> kpl, SimpleKafkaConsumer consumer)
    {
      this.broker = broker;
      this.clientName = consumer.getClientName(broker.host() + "_" + broker.port());
      this.consumer = consumer;
      this.kpS = Collections.newSetFromMap(new ConcurrentHashMap<KafkaPartition, Boolean>());
      this.kpS.addAll(kpl);
    }

    /**
     * Connects to the broker, resolves the start offset of every assigned partition and then keeps fetching until one
     * of the exit conditions described on the class is met. On exit the client is closed and the partitions that are
     * still mapped to this broker are unregistered from the owning consumer.
     */
    @Override
    public void run()
    {

      try {
        logger.info("Connecting to broker {} [ timeout:{}, buffersize:{}, clientId: {}]", broker, consumer.timeout, consumer.bufferSize, clientName);
        ksc = new SimpleConsumer(broker.host(), broker.port(), consumer.timeout, consumer.bufferSize, clientName);
        // Initialize all start offsets for all kafka partitions read from this consumer
        // read either from beginning of the broker or last offset committed by the operator
        for (KafkaPartition kpForConsumer : kpS) {
          initStartOffset(kpForConsumer);
        }

        // stop consuming only when the consumer container is stopped or the metadata can not be refreshed
        while (consumer.isAlive && (consumer.metadataRefreshRetryLimit == -1 || consumer.retryCounter.get() < consumer.metadataRefreshRetryLimit)) {

          // Work on a snapshot: kpS can grow between building the request and reading the response, and only the
          // partitions that were part of the request have an entry in the response
          Set<KafkaPartition> requested = new HashSet<KafkaPartition>(kpS);
          if (requested.isEmpty()) {
            return;
          }

          FetchResponse fetchResponse = ksc.fetch(buildFetchRequest(requested));
          for (KafkaPartition kafkaPartition : requested) {
            if (fetchResponse.hasError() && fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()) != ErrorMapping.NoError()) {
              // Kick off partition(s) which has error when fetch from this broker temporarily
              // Monitor will find out which broker it goes in monitor thread
              logger.warn("Error when consuming topic {} from broker {} with error code {} ", kafkaPartition, broker,  fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()));
              kpS.remove(kafkaPartition);
              // Only forget the mapping (and reset the stats) when the partition is still registered for THIS broker;
              // the monitor may already have moved it to its new leader
              if (consumer.partitionToBroker.remove(kafkaPartition, broker)) {
                consumer.stats.updatePartitionStats(kafkaPartition, -1, "");
              }
              continue;
            }
            // If the fetchResponse either has no error or the no error for $kafkaPartition get the data
            long offset = -1L;
            for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kafkaPartition.getPartitionId())) {
              offset = msg.nextOffset();
              consumer.putMessage(kafkaPartition, msg.message(), msg.offset());
            }
            if (offset != -1) {
              consumer.offsetTrack.put(kafkaPartition, offset);
            }
          }
        }
      } catch (Exception e){
        logger.error("The consumer encounters an unrecoverable exception. Close the connection to broker {} \n Caused by {}", broker, e);
      } finally {
        if (ksc != null) {
          ksc.close();
        }
        for (KafkaPartition kpForConsumer : kpS) {
          // Update consumer that these partitions are currently stop being consumed because of some unrecoverable
          // exception; a mapping that already points to another broker is left alone
          consumer.partitionToBroker.remove(kpForConsumer, broker);
        }

        logger.info("Exit the consumer thread for broker {} ", broker);
      }
    }

    /**
     * Builds one fetch request that covers all the given partitions.
     * A partition without a tracked offset (for example one that was added after the thread started, or whose offset
     * was cleared in the meantime) gets its start offset resolved here instead of failing on a missing value.
     */
    private FetchRequest buildFetchRequest(Set<KafkaPartition> partitions)
    {
      FetchRequestBuilder frb = new FetchRequestBuilder().clientId(clientName);
      // add all partition request in one Fetch request together
      for (KafkaPartition kpForConsumer : partitions) {
        Long nextOffset = consumer.offsetTrack.get(kpForConsumer);
        if (nextOffset == null) {
          nextOffset = initStartOffset(kpForConsumer);
        }
        frb.addFetch(consumer.topic, kpForConsumer.getPartitionId(), nextOffset, consumer.bufferSize);
      }
      return frb.build();
    }

    /**
     * Determines the offset to start reading the partition from and makes sure it is stored in the consumer's offset
     * track. An offset that is already tracked wins; otherwise the earliest or latest offset is requested from the
     * broker, depending on the consumer's initialOffset setting.
     *
     * @return the offset the next fetch for the partition starts at
     */
    private long initStartOffset(KafkaPartition kp)
    {
      logger.info("Start consuming data of topic {} ", kp);
      Long recovered = consumer.offsetTrack.get(kp);
      if (recovered != null) {
        // start from recovery
        logger.info("Partition {} initial offset {}", kp, recovered);
        return recovered;
      }
      long startOffsetReq = consumer.initialOffset.equalsIgnoreCase("earliest") ? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();
      logger.info("Partition {} initial offset {} {}", kp.getPartitionId(), startOffsetReq, consumer.initialOffset);
      long startOffset = KafkaMetadataUtil.getLastOffset(ksc, consumer.topic, kp.getPartitionId(), startOffsetReq, clientName);
      consumer.offsetTrack.put(kp, startOffset);
      return startOffset;
    }

    /**
     * Adds the partition(s) to this existing consumer thread because they are assigned to this thread's broker.
     *
     * @param newKps the partitions to consume in addition to the current ones
     */
    public void addPartitions(Set<KafkaPartition> newKps)
    {
      kpS.addAll(newKps);
    }

    /**
     * @return the future that was stored with {@link #setThreadItSelf(Future)}, or null if none was stored yet
     */
    @SuppressWarnings("rawtypes")
    public Future getThreadItSelf()
    {
      return threadItSelf;
    }

    /**
     * @param threadItSelf the future of the task that executes this runnable
     */
    public void setThreadItSelf(@SuppressWarnings("rawtypes") Future threadItSelf)
    {
      this.threadItSelf = threadItSelf;
    }
  }

  /**
   * Creates a consumer that keeps the default value of every property declared in this class.
   */
  public SimpleKafkaConsumer()
  {
  }

  /**
   * Creates a consumer for a topic without an explicit partition set.
   *
   * @param topic the topic handed to the parent class
   * @param timeout timeout used by the kafka clients
   * @param bufferSize buffer size used by the kafka clients
   * @param clientId prefix of the client name
   */
  public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId)
  {
    this(topic, timeout, bufferSize, clientId, null);
  }

  /**
   * Creates a consumer for a topic and a given set of partitions.
   *
   * @param topic the topic handed to the parent class
   * @param timeout timeout used by the kafka clients
   * @param bufferSize buffer size used by the kafka clients
   * @param clientId prefix of the client name
   * @param partitionIds the partitions to consume, may be null
   */
  public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
  {
    super(topic);
    this.kps = partitionIds;
    this.clientId = clientId;
    this.bufferSize = bufferSize;
    this.timeout = timeout;
  }

  /**
   * Creates a consumer for a topic and a given set of partitions, with the zookeeper settings handed to the parent
   * class.
   *
   * @param zks zookeeper settings handed to the parent class
   * @param topic the topic handed to the parent class
   * @param timeout timeout used by the kafka clients
   * @param bufferSize buffer size used by the kafka clients
   * @param clientId prefix of the client name
   * @param partitionIds the partitions to consume, may be null
   */
  public SimpleKafkaConsumer(SetMultimap<String, String> zks, String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
  {
    super(zks, topic);
    this.kps = partitionIds;
    this.clientId = clientId;
    this.bufferSize = bufferSize;
    this.timeout = timeout;
  }

  private static final Logger logger = LoggerFactory.getLogger(SimpleKafkaConsumer.class);

  /**
   * Tracks the consumer thread connected to each broker; partitions whose leader is the same broker are consumed by
   * the same thread. Entries of finished threads are removed by the metadata monitor. Key is the kafka broker object.
   */
  private final transient Map<Broker, ConsumerThread> simpleConsumerThreads = new HashMap<Broker, ConsumerThread>();


  // Executes the data-consuming threads
  private transient ExecutorService kafkaConsumerExecutor;

  // Executes the metadata monitor task that start() schedules
  private transient ScheduledExecutorService metadataRefreshExecutor;

  /**
   * The metadata refresh retry counter: incremented when the metadata can not be retrieved and reset to 0 after a
   * successful refresh
   */
  private final transient AtomicInteger retryCounter = new AtomicInteger(0);

  // Timeout handed to every kafka client created by this consumer (presumably milliseconds; confirm against the
  // SimpleConsumer API)
  private int timeout = 10000;

  /**
   * Default buffer size is 1M (1024 * 1024)
   */
  private int bufferSize = 1024 * 1024;

  /**
   * Default client id prefix is "Kafka_Simple_Client"
   */
  @NotNull
  private String clientId = "Kafka_Simple_Client";

  /**
   * Interval, in milliseconds, between two checks for metadata changes (broker changes). The metadata refresh
   * guarantees that the consumer automatically reconnects to a broker that is newly elected as leader. A value that
   * is not positive (for example -1) turns the periodic refresh off. WARN: Turning off the refresh will disable auto
   * reconnect to new broker
   */
  private int metadataRefreshInterval = 30000;

  /**
   * Maximum brokers' metadata refresh retry limit. -1 means unlimited retry
   */
  private int metadataRefreshRetryLimit = -1;

  /**
   * The particular kafka partitions this consumer client should consume. This can be used to share client and thread
   * and maximize the overall performance. Null or empty value: the consumer looks up all partitions of the topic and
   * creates as many threads and clients as there are brokers hosting them. Each thread consumes 1(+) partitions from
   * 1 broker
   */
  private Set<KafkaPartition> kps = new HashSet<KafkaPartition>();

  // This map maintains the mapping between a kafka partition and its leader broker; it is updated by the monitor
  // thread and by the consumer threads
  private transient final ConcurrentHashMap<KafkaPartition, Broker> partitionToBroker = new ConcurrentHashMap<KafkaPartition, Broker>();
  
  /**
   * Track offset for each partition, so operator could start from the last serialized state. Use ConcurrentHashMap to
   * avoid ConcurrentModificationException without blocking reads when updating in another thread (unlike hashtable or
   * synchronizedmap)
   */
  private final ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();

  /**
   * Prepares the consumer after the parent class has been set up. The partition set is normalized: a null set becomes
   * an empty one, and an empty set is filled with every partition that the metadata reports for the topic. A
   * partition set that was configured explicitly is left untouched, and no metadata is requested for it.
   *
   * @throws IllegalStateException if the partitions have to be looked up but no metadata could be retrieved
   */
  @Override
  public void create()
  {
    super.create();
    if (kps == null) {
      kps = new HashSet<KafkaPartition>();
    }
    if (!kps.isEmpty()) {
      return;
    }

    // if partition ids are null or not specified, find all the partition metadata for
    // the specific topic from broker
    Map<String, List<PartitionMetadata>> partitionMetas = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
    if (partitionMetas == null) {
      // fail with a clear message instead of a NullPointerException in the loop below
      throw new IllegalStateException("Can not retrieve the partition metadata for topic " + topic);
    }
    for (Entry<String, List<PartitionMetadata>> en : partitionMetas.entrySet()) {
      String clusterId = en.getKey();
      for (PartitionMetadata part : en.getValue()) {
        kps.add(new KafkaPartition(clusterId, topic, part.partitionId()));
      }
    }

  }

  /**
   * Starts the executors and the metadata monitor. The monitor looks up the leader broker of every partition in
   * scope, starts one consumer thread per broker that is not served yet, and hands partitions whose leader changed to
   * the thread of their new leader.
   * <p>
   * With a positive metadataRefreshInterval the monitor runs periodically. Otherwise it runs exactly once, so that the
   * consumer threads are still started; leadership changes are not followed in that case.
   */
  @Override
  public void start()
  {
    super.start();

    // thread to consume the kafka data
    kafkaConsumerExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kafka-consumer-" + topic + "-%d").build());

    // background thread to monitor the kafka metadata change
    metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-consumer-monitor-" + topic + "-%d").setDaemon(true).build());

    // one monitor task to monitor the leader broker change and trigger some action
    final SimpleKafkaConsumer ref = this;
    Runnable monitor = new Runnable() {

      // partitions that have to be (re)assigned, grouped by their current leader; entries survive a failed run and
      // are picked up by the next one
      private final SetMultimap<Broker, KafkaPartition> deltaPositive = HashMultimap.create();

      @Override
      public void run()
      {
        try {
          refresh();
        } catch (Exception e) {
          // An exception that escapes a periodic task cancels all its future executions, so it has to stop here
          logger.error("Failed to refresh the metadata for topic " + topic, e);
        }
      }

      private void refresh()
      {
        if (!(isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit))) {
          return;
        }
        logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
        Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
        if (pms == null) {
          // retrieve metadata fail add retry count and return
          retryCounter.getAndAdd(1);
          return;
        }

        for (Entry<String, List<PartitionMetadata>> pmLEntry : pms.entrySet()) {
          for (PartitionMetadata pm : pmLEntry.getValue()) {
            KafkaPartition kp = new KafkaPartition(pmLEntry.getKey(), topic, pm.partitionId());
            if (!kps.contains(kp)) {
              // Out of this consumer's scope
              continue;
            }
            Broker b = pm.leader();
            if (b == null) {
              // no leader is known for the partition right now (a ConcurrentHashMap does not accept null values
              // either); look at it again on the next refresh
              logger.warn("No leader available for partition {}", kp);
              continue;
            }
            Broker oldB = partitionToBroker.put(kp, b);
            if (b.equals(oldB)) {
              continue;
            }
            // add to positive
            deltaPositive.put(b, kp);

            // always update the latest connection information
            stats.updatePartitionStats(kp, b.id(), b.host() + ":" + b.port());
          }
        }

        // remove from map if the thread is done (partitions on this broker has all been reassigned to others(or
        // temporarily not available) for example)
        for (Iterator<Entry<Broker, ConsumerThread>> iterator = simpleConsumerThreads.entrySet().iterator(); iterator.hasNext();) {
          Entry<Broker, ConsumerThread> item = iterator.next();
          if (item.getValue().getThreadItSelf().isDone()) {
            iterator.remove();
          }
        }

        for (Broker b : deltaPositive.keySet()) {
          ConsumerThread running = simpleConsumerThreads.get(b);
          if (running == null) {
            // start thread for new broker
            ConsumerThread ct = new ConsumerThread(b, deltaPositive.get(b), ref);
            ct.setThreadItSelf(kafkaConsumerExecutor.submit(ct));
            simpleConsumerThreads.put(b, ct);
          } else {
            running.addPartitions(deltaPositive.get(b));
          }
        }

        deltaPositive.clear();

        // reset to 0 if it reconnect to the broker which has current broker metadata
        retryCounter.set(0);
      }
    };

    if (metadataRefreshInterval > 0) {
      metadataRefreshExecutor.scheduleAtFixedRate(monitor, 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
    } else {
      // periodic refresh is turned off: still run the monitor once, it is the only place that starts consumer threads
      metadataRefreshExecutor.execute(monitor);
    }
  }

  /**
   * Stops the metadata monitor and all consumer threads and shuts both executors down.
   * Safe to call when start() was never called, in which case there is nothing to stop.
   */
  @Override
  public void close()
  {
    logger.info("Stop all consumer threads");
    // stop the monitor first so that no further refresh is scheduled while the consumer threads are cancelled
    if (metadataRefreshExecutor != null) {
      metadataRefreshExecutor.shutdownNow();
    }
    for (ConsumerThread ct : simpleConsumerThreads.values()) {
      Future<?> task = ct.getThreadItSelf();
      if (task != null) {
        task.cancel(true);
      }
    }
    simpleConsumerThreads.clear();
    if (kafkaConsumerExecutor != null) {
      kafkaConsumerExecutor.shutdownNow();
    }
  }

  /**
   * @return the buffer size used for the kafka clients and the fetch requests of this consumer
   */
  public int getBufferSize()
  {
    return bufferSize;
  }

  /**
   * @param bufferSize the buffer size used for the kafka clients and the fetch requests of this consumer
   */
  public void setBufferSize(int bufferSize)
  {
    this.bufferSize = bufferSize;
  }

  /**
   * @return the prefix of the client name that the kafka clients of this consumer use
   */
  public String getClientId()
  {
    return clientId;
  }

  /**
   * @param clientId the prefix of the client name that the kafka clients of this consumer use
   */
  public void setClientId(String clientId)
  {
    this.clientId = clientId;
  }

  /**
   * @return the timeout handed to the kafka clients of this consumer
   */
  public int getTimeout()
  {
    return timeout;
  }

  /**
   * @param timeout the timeout handed to the kafka clients of this consumer
   */
  public void setTimeout(int timeout)
  {
    this.timeout = timeout;
  }

  /**
   * @return the interval between two metadata refreshes in milliseconds
   */
  public int getMetadataRefreshInterval()
  {
    return metadataRefreshInterval;
  }

  /**
   * @param reconnectInterval the interval between two metadata refreshes in milliseconds
   */
  public void setMetadataRefreshInterval(int reconnectInterval)
  {
    metadataRefreshInterval = reconnectInterval;
  }

  /**
   * @return the maximum number of metadata refresh retries, -1 means unlimited
   */
  public int getMetadataRefreshRetryLimit()
  {
    return metadataRefreshRetryLimit;
  }

  /**
   * @param metadataRefreshRetryLimit the maximum number of metadata refresh retries, -1 means unlimited
   */
  public void setMetadataRefreshRetryLimit(int metadataRefreshRetryLimit)
  {
    this.metadataRefreshRetryLimit = metadataRefreshRetryLimit;
  }

  /**
   * Intentionally does nothing: this consumer does not commit offsets to a server.
   */
  @Override
  protected void commitOffset()
  {
    // The simple consumer offset is kept in the offsetTrack.
    // It would be better to register the client's offsets on the server side in the future. Waiting for the kafka
    // community to come up with a more sophisticated offset management.
    // TODO https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management#
  }

  /**
   * Builds the client name used for the connection to one broker: the client id, followed by the simple consumer
   * suffix and the given broker name.
   *
   * @param brokerName the name identifying the broker
   * @return the client name
   */
  private String getClientName(String brokerName)
  {
    final String prefix = clientId + SIMPLE_CONSUMER_ID_SUFFIX;
    return prefix + brokerName;
  }

  /**
   * @return the live offset track of this consumer (not a copy); it maps each partition to its tracked offset
   */
  @Override
  protected Map<KafkaPartition, Long> getCurrentOffsets()
  {
    final Map<KafkaPartition, Long> tracked = offsetTrack;
    return tracked;
  }

  /**
   * Replaces the tracked offsets with the given ones. All tracked offsets are dropped first; afterwards only the
   * partitions assigned to this consumer that have an entry in the given map are tracked again.
   *
   * @param overrideOffset the offsets to start from; nothing happens when it is null
   */
  public void resetOffset(Map<KafkaPartition, Long> overrideOffset)
  {
    if (overrideOffset == null) {
      return;
    }
    offsetTrack.clear();
    if (kps == null) {
      // no partition is assigned (yet), so there is nothing to take over
      return;
    }
    // set offset of the partitions assigned to this consumer
    for (KafkaPartition kp : kps) {
      Long offsetForPar = overrideOffset.get(kp);
      if (offsetForPar != null) {
        offsetTrack.put(kp, offsetForPar);
      }
    }
  }

  /**
   * Pushes the given offsets into the stats and then returns the stats reported by the parent class.
   *
   * @param offsetStats the offsets handed to the stats object
   * @return the consumer stats as returned by the parent class
   */
  public KafkaMeterStats getConsumerStats(Map<KafkaPartition, Long> offsetStats)
  {
    stats.updateOffsets(offsetStats);
    final KafkaMeterStats refreshed = super.getConsumerStats();
    return refreshed;
  }

  /**
   * Assigns a new set of partitions to this consumer and resets the tracked offsets accordingly.
   *
   * @param partitionIds the partitions to consume from now on; null is treated as an empty set so that later
   *          lookups in the partition set do not fail
   * @param startOffset the offsets to start from, see {@link #resetOffset(Map)}
   */
  @Override
  protected void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset)
  {
    this.kps = (partitionIds != null) ? partitionIds : new HashSet<KafkaPartition>();
    resetOffset(startOffset);
  }
} // End of SimpleKafkaConsumer
