// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.integration_test.core;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

import com.twitter.heron.api.bolt.IOutputCollector;
import com.twitter.heron.api.bolt.IRichBolt;
import com.twitter.heron.api.bolt.OutputCollector;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.api.topology.IUpdatable;
import com.twitter.heron.api.topology.OutputFieldsDeclarer;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Fields;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.api.tuple.Values;

public class IntegrationTestBolt implements IRichBolt, IUpdatable {
  private static final long serialVersionUID = 6304554167838679097L;
  private static final Logger LOG = Logger.getLogger(IntegrationTestBolt.class.getName());
  private final IRichBolt delegateBolt;
  private int terminalsToReceive = 0;
  private long tuplesReceived = 0;
  private long tuplesProcessed = 0;
  // For ack/fail
  private Tuple currentTupleProcessing = null;
  private OutputCollector collector;

  public IntegrationTestBolt(IRichBolt delegate) {
    this.delegateBolt = delegate;
  }

  @Override
  public void update(TopologyContext topologyContext) {
    LOG.info("update called with TopologyContext: " + topologyContext);
    // if we get a new topology context we reset the terminalsToReceive regardless of if we've
    // already received any. The expectation is that after a change in physical plan, upstream
    // spouts will re-emit and send new terminals.
    this.terminalsToReceive = calculateTerminalsToReceive(topologyContext);
  }

  @Override
  public void prepare(Map<String, Object> map,
                      TopologyContext context,
                      OutputCollector outputCollector) {
    update(context);
    this.collector = new OutputCollector(new IntegrationTestBoltCollector(outputCollector));
    this.delegateBolt.prepare(map, context, collector);
  }

  private int calculateTerminalsToReceive(TopologyContext context) {
    int total = 0;
    // Set the # of terminal Signal to receive, = the # number all instance of upstream components
    HashSet<String> upstreamComponents = new HashSet<String>();
    for (TopologyAPI.StreamId streamId : context.getThisSources().keySet()) {
      upstreamComponents.add(streamId.getComponentName());
    }
    for (String name : upstreamComponents) {
      total += context.getComponentTasks(name).size();
    }

    LOG.info("TerminalsToReceive: " + total);
    return total;
  }

  @Override
  public void execute(Tuple tuple) {
    String streamID = tuple.getSourceStreamId();

    LOG.info("Received a tuple: " + tuple + " ; from: " + streamID);

    if (streamID.equals(Constants.INTEGRATION_TEST_CONTROL_STREAM_ID)) {
      terminalsToReceive--;
      if (terminalsToReceive == 0) {

        // invoke the finishBatch() callback if necessary
        if (IBatchBolt.class.isInstance(delegateBolt)) {
          LOG.fine("Invoke bolt to do finishBatch!");
          ((IBatchBolt) delegateBolt).finishBatch();
        }

        LOG.info("Received the last terminal, populating the terminals to downstream");
        collector.emit(Constants.INTEGRATION_TEST_CONTROL_STREAM_ID,
            tuple,
            new Values(Constants.INTEGRATION_TEST_TERMINAL));
      } else {
        LOG.info(String.format(
            "Received a terminal, need to receive %s more", terminalsToReceive));
      }
    } else {
      tuplesReceived++;
      currentTupleProcessing = tuple;
      delegateBolt.execute(tuple);
      // We ack only the tuples in user's logic
      collector.ack(tuple);
    }
  }

  @Override
  public void cleanup() {
    delegateBolt.cleanup();
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declareStream(Constants.INTEGRATION_TEST_CONTROL_STREAM_ID,
        new Fields(Constants.INTEGRATION_TEST_TERMINAL));
    delegateBolt.declareOutputFields(outputFieldsDeclarer);
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    return delegateBolt.getComponentConfiguration();
  }

  private class IntegrationTestBoltCollector implements IOutputCollector {
    private final IOutputCollector delegate;

    IntegrationTestBoltCollector(IOutputCollector delegate) {
      this.delegate = delegate;
    }

    @Override
    public List<Integer> emit(String s, Collection<Tuple> tuples, List<Object> objects) {
      if (tuples == null) {
        return delegate.emit(s, Arrays.asList(currentTupleProcessing), objects);
      }
      return delegate.emit(s, tuples, objects);
    }

    @Override
    public void emitDirect(int i, String s, Collection<Tuple> tuples, List<Object> objects) {
      if (tuples == null) {
        delegate.emitDirect(i, s, Arrays.asList(currentTupleProcessing), objects);
      }
      delegate.emitDirect(i, s, tuples, objects);
    }

    @Override
    public void ack(Tuple tuple) {
      LOG.fine("Try to do a ack. tuplesProcessed: "
          + tuplesProcessed + " ; tuplesReceived: " + tuplesReceived);
      if (tuplesProcessed < tuplesReceived) {
        delegate.ack(tuple);
        tuplesProcessed++;
      }
    }

    @Override
    public void fail(Tuple tuple) {
      LOG.fine("Try to do a fail. tuplesProcessed: "
          + tuplesProcessed + " ; tuplesReceived: " + tuplesReceived);
      if (tuplesProcessed < tuplesReceived) {
        delegate.fail(tuple);
        tuplesProcessed++;
      }
    }

    @Override
    public void reportError(Throwable throwable) {
      delegate.reportError(throwable);
    }
  }
}
