/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.samoa.tasks;

import com.github.javacliparser.ClassOption;
import java.util.Properties;

import org.apache.samoa.topology.ComponentFactory;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.topology.Topology;
import org.apache.samoa.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.javacliparser.Configurable;
import com.github.javacliparser.IntOption;
import com.github.javacliparser.StringOption;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.samoa.streams.kafka.KafkaDeserializer;
import org.apache.samoa.streams.kafka.KafkaDestinationProcessor;
import org.apache.samoa.streams.kafka.KafkaEntranceProcessor;
import org.apache.samoa.streams.kafka.KafkaSerializer;

/**
 * Kafka task.
 * <p>
 * Builds a two-node topology: a {@link KafkaEntranceProcessor} that consumes
 * from the input topic, connected through a shuffle stream to a
 * {@link KafkaDestinationProcessor} that produces to the output topic.
 * <p>
 * The task is configured either from its command line options (no-argument
 * constructor) or from explicit values handed to the parameterized
 * constructor.
 *
 * @author Jakub Jankowski
 * @version 0.5.0-incubating-SNAPSHOT
 * @since 0.5.0-incubating
 *
 */
public class KafkaTask implements Task, Configurable {

  private static final long serialVersionUID = 3984474041982397855L;
  private static final Logger logger = LoggerFactory.getLogger(KafkaTask.class);

  /**
   * Default broker list for both broker options. The previous default
   * ("inputTopic") was a copy-paste of the topic default and is not a valid
   * {@code host:port} list for {@code bootstrap.servers}.
   */
  private static final String DEFAULT_BROKERS = "localhost:9092";

  Properties producerProps;
  Properties consumerProps;
  int timeout;
  private KafkaDeserializer deserializer;
  private KafkaSerializer serializer;
  private String inTopic;
  private String outTopic;

  /**
   * True when the configuration was supplied through the parameterized
   * constructor; in that case {@link #init()} must not replace it with the
   * option values.
   */
  private boolean explicitlyConfigured;

  private TopologyBuilder builder;
  private Topology kafkaTopology;

  public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p',
          "Number of destination Processors", 1, 1, Integer.MAX_VALUE);

  public IntOption timeoutOption = new IntOption("timeout", 't',
          "Kafka consumer timeout", 1, 1, Integer.MAX_VALUE);

  public StringOption inputBrokerOption = new StringOption("inputBroker", 'r', "Input brokers addresses",
          DEFAULT_BROKERS);

  public StringOption outputBrokerOption = new StringOption("outputBroker", 's', "Output brokers name",
          DEFAULT_BROKERS);

  public StringOption inputTopicOption = new StringOption("inputTopic", 'i', "Input topic name",
          "inputTopic");

  public StringOption outputTopicOption = new StringOption("outputTopic", 'o', "Output topic name",
          "outputTopic");

  public ClassOption serializerOption = new ClassOption("serializer", 'w',
          "Serializer class name",
          KafkaSerializer.class, KafkaSerializer.class.getName());

  public ClassOption deserializerOption = new ClassOption("deserializer", 'd',
          "Deserializer class name",
          KafkaDeserializer.class, KafkaDeserializer.class.getName());

  public StringOption taskNameOption = new StringOption("taskName", 'n', "Identifier of the task",
          "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));

  /**
   * Class constructor (for tests purposes). Values passed here are kept by
   * {@link #init()} instead of being replaced by the option values.
   *
   * @param producerProps Properties of Kafka Producer
   * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka
   * Producer configuration</a>
   * @param consumerProps Properties of Kafka Consumer
   * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka
   * Consumer configuration</a>
   * @param inTopic Topic from which the entrance processor will read
   * @param outTopic Topic to which the destination processor will write
   * @param timeout Timeout handed to the entrance processor
   * @param serializer Implementation of KafkaSerializer handed to the
   * destination processor
   * @param deserializer Implementation of KafkaDeserializer handed to the
   * entrance processor
   */
  public KafkaTask(Properties producerProps, Properties consumerProps, String inTopic, String outTopic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) {
    this.producerProps = producerProps;
    this.consumerProps = consumerProps;
    this.deserializer = deserializer;
    this.serializer = serializer;
    this.inTopic = inTopic;
    this.outTopic = outTopic;
    this.timeout = timeout;
    this.explicitlyConfigured = true;
  }

  /**
   * Class constructor. The configuration is read from the options when
   * {@link #init()} is invoked.
   */
  public KafkaTask() {

  }

  /**
   * Resolves the configuration (from the options unless it was passed to the
   * parameterized constructor), creates the topology builder if
   * {@link #setFactory(ComponentFactory)} has not provided one, and builds
   * the entrance-processor / stream / destination-processor topology.
   */
  @Override
  public void init() {
    // Only fall back to the options when nothing was supplied explicitly;
    // otherwise the constructor arguments would be silently discarded.
    if (!explicitlyConfigured) {
      loadConfigurationFromOptions();
    }

    logger.info("Invoking init");
    if (builder == null) {
      builder = new TopologyBuilder();
      logger.info("Successfully instantiating TopologyBuilder");

      builder.initTopology(taskNameOption.getValue());
      logger.info("Successfully initializing SAMOA topology with name {}", taskNameOption.getValue());
    }

    // create entrance processor
    KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, inTopic, timeout, deserializer);
    builder.addEntranceProcessor(sourceProcessor);

    // create stream
    Stream stream = builder.createStream(sourceProcessor);

    // create destination processor
    KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, outTopic, serializer);
    builder.addProcessor(destProcessor, kafkaParallelismOption.getValue());
    builder.connectInputShuffleStream(stream, destProcessor);

    // build topology
    kafkaTopology = builder.build();
    logger.info("Successfully built the topology");
  }

  /**
   * Returns the topology built by {@link #init()}.
   *
   * @return the built topology, or {@code null} if {@link #init()} has not
   * been invoked yet
   */
  @Override
  public Topology getTopology() {
    return kafkaTopology;
  }

  /**
   * Creates the topology builder on top of the given factory and initializes
   * the topology with the configured task name.
   *
   * @param factory component factory handed to the {@link TopologyBuilder}
   */
  @Override
  public void setFactory(ComponentFactory factory) {
    // Parameterized logging: no eager toString() call on the factory.
    logger.info("Invoking setFactory: {}", factory);
    builder = new TopologyBuilder(factory);
    logger.info("Successfully instantiating TopologyBuilder");

    builder.initTopology(taskNameOption.getValue());
    logger.info("Successfully initializing SAMOA topology with name {}", taskNameOption.getValue());

  }

  /** Copies the current option values into the task's configuration fields. */
  private void loadConfigurationFromOptions() {
    producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", outputBrokerOption.getValue());

    consumerProps = new Properties();
    consumerProps.setProperty("bootstrap.servers", inputBrokerOption.getValue());

    serializer = serializerOption.getValue();

    deserializer = deserializerOption.getValue();

    inTopic = inputTopicOption.getValue();
    outTopic = outputTopicOption.getValue();

    timeout = timeoutOption.getValue();
  }

}
