/*
 * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datatorrent.demos.mobile;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import javax.validation.constraints.Min;

import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;

import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.util.HighLow;

/**
 * <p>
 * This operator generates the GPS locations for the phone numbers specified.
 * The range of phone numbers or a specific phone number can be set for which the GPS locations will be generated.
 * It supports querying the locations of a given phone number.
 * This is a partionable operator that can partition as the tuplesBlast increases.
 * </p>
 *
 * @since 0.3.2
 */
public class PhoneMovementGenerator extends BaseOperator
{
  public final transient DefaultInputPort<Integer> data = new DefaultInputPort<Integer>()
  {
    @Override
    public void process(Integer tuple)
    {
      HighLow<Integer> loc = gps.get(tuple);
      if (loc == null) {
        loc = new HighLow<Integer>(random.nextInt(range), random.nextInt(range));
        gps.put(tuple, loc);
      }
      int xloc = loc.getHigh();
      int yloc = loc.getLow();
      int state = rotate % 4;

      // Compute new location
      int delta = random.nextInt(100);
      if (delta >= threshold) {
        if (state < 2) {
          xloc++;
        }
        else {
          xloc--;
        }
        if (xloc < 0) {
          xloc += range;
        }
      }
      delta = random.nextInt(100);
      if (delta >= threshold) {
        if ((state == 1) || (state == 3)) {
          yloc++;
        }
        else {
          yloc--;
        }
        if (yloc < 0) {
          yloc += range;
        }
      }
      xloc %= range;
      yloc %= range;

      // Set new location
      HighLow<Integer> nloc = newgps.get(tuple);
      if (nloc == null) {
        newgps.put(tuple, new HighLow<Integer>(xloc, yloc));
      }
      else {
        nloc.setHigh(xloc);
        nloc.setLow(yloc);
      }
      rotate++;
    }
  };

  @InputPortFieldAnnotation(optional=true)
  public final transient DefaultInputPort<Map<String,String>> phoneQuery = new DefaultInputPort<Map<String,String>>()
  {
    @Override
    public void process(Map<String,String> tuple)
    {
      LOG.info("new query {}", tuple);
      String command = tuple.get(KEY_COMMAND);
      if (command != null) {
        if (command.equals(COMMAND_ADD)) {
          commandCounters.getCounter(CommandCounters.ADD).increment();
          String phoneStr= tuple.get(KEY_PHONE);
          registerPhone(phoneStr);
        }
        else if (command.equals(COMMAND_ADD_RANGE)) {
          commandCounters.getCounter(CommandCounters.ADD_RANGE).increment();
          registerPhoneRange(tuple.get(KEY_START_PHONE), tuple.get(KEY_END_PHONE));
        }
        else if (command.equals(COMMAND_DELETE)) {
          commandCounters.getCounter(CommandCounters.DELETE).increment();
          String phoneStr= tuple.get(KEY_PHONE);
          deregisterPhone(phoneStr);
        }
        else if (command.equals(COMMAND_CLEAR)) {
          commandCounters.getCounter(CommandCounters.CLEAR).increment();
          clearPhones();
        }
      }
    }
  };

  public static final String KEY_COMMAND = "command";
  public static final String KEY_PHONE = "phone";
  public static final String KEY_LOCATION = "location";
  public static final String KEY_REMOVED = "removed";
  public static final String KEY_START_PHONE = "startPhone";
  public static final String KEY_END_PHONE = "endPhone";

  public static final String COMMAND_ADD = "add";
  public static final String COMMAND_ADD_RANGE = "addRange";
  public static final String COMMAND_DELETE = "del";
  public static final String COMMAND_CLEAR = "clear";

  final Set<Integer> phoneRegister = Sets.newHashSet();

  private final transient HashMap<Integer, HighLow<Integer>> gps = new HashMap<Integer, HighLow<Integer>>();
  private final Random random = new Random();
  private int range = 50;
  private int threshold = 80;
  private int rotate = 0;

  protected BasicCounters<MutableLong> commandCounters;

  private transient OperatorContext context;
  private final transient HashMap<Integer, HighLow<Integer>> newgps = new HashMap<Integer, HighLow<Integer>>();

  public PhoneMovementGenerator()
  {
    this.commandCounters = new BasicCounters<MutableLong>(MutableLong.class);
  }

  /**
   * @return the range of the phone numbers
   */
  @Min(0)
  public int getRange()
  {
    return range;
  }

  /**
   * Sets the range of phone numbers for which the GPS locations need to be generated.
   * 
   * @param i the range of phone numbers to set
   */
  public void setRange(int i)
  {
    range = i;
  }

  /**
   * @return the threshold 
   */
  @Min(0)
  public int getThreshold()
  {
    return threshold;
  }

  /**
   * Sets the threshold that decides how frequently the GPS locations are updated.
   * 
   * @param i the value that decides how frequently the GPS locations change.
   */
  public void setThreshold(int i)
  {
    threshold = i;
  }

  private void registerPhone(String phoneStr)
  {
    // register the phone channel
    if (Strings.isNullOrEmpty(phoneStr)) {
      return;
    }
    try {
      Integer phone = new Integer(phoneStr);
      registerSinglePhone(phone);
    }
    catch (NumberFormatException nfe) {
      LOG.warn("Invalid no {}", phoneStr);
    }
  }

  private void registerPhoneRange(String startPhoneStr, String endPhoneStr)
  {
    if (Strings.isNullOrEmpty(startPhoneStr) || Strings.isNullOrEmpty(endPhoneStr)) {
      LOG.warn("Invalid phone range {} {}", startPhoneStr, endPhoneStr);
      return;
    }
    try {
      Integer startPhone = new Integer(startPhoneStr);
      Integer endPhone = new Integer(endPhoneStr);
      if (endPhone < startPhone) {
        LOG.warn("Invalid phone range {} {}", startPhone, endPhone);
        return;
      }
      for (int i = startPhone; i <= endPhone; i++) {
        registerSinglePhone(i);
      }
    }
    catch (NumberFormatException nfe) {
      LOG.warn("Invalid phone range <{},{}>", startPhoneStr, endPhoneStr);
    }
  }

  private void registerSinglePhone(int phone)
  {
    phoneRegister.add(phone);
    LOG.debug("Registered query id with phone {}", phone);
    emitQueryResult(phone);
  }

  private void deregisterPhone(String phoneStr)
  {
    if (Strings.isNullOrEmpty(phoneStr)) {
      return;
    }
    try {
      Integer phone = new Integer(phoneStr);
      // remove the channel
      if (phoneRegister.contains(phone)) {
        phoneRegister.remove(phone);
        LOG.debug("Removing query id {}", phone);
        emitPhoneRemoved(phone);
      }
    }
    catch (NumberFormatException nfe) {
      LOG.warn("Invalid phone {}", phoneStr);
    }
  }

  private void clearPhones() {
    phoneRegister.clear();
    LOG.info("Clearing phones");
  }

  public final transient DefaultOutputPort<Map<String, String>> locationQueryResult = new DefaultOutputPort<Map<String, String>>();

  @Override
  public void setup(OperatorContext context)
  {
    this.context = context;
    commandCounters.setCounter(CommandCounters.ADD, new MutableLong());
    commandCounters.setCounter(CommandCounters.ADD_RANGE, new MutableLong());
    commandCounters.setCounter(CommandCounters.DELETE, new MutableLong());
    commandCounters.setCounter(CommandCounters.CLEAR, new MutableLong());
  }

  /**
   * Emit all the data and clear the hash
   */
  @Override
  public void endWindow()
  {
    for (Map.Entry<Integer, HighLow<Integer>> e: newgps.entrySet()) {
      HighLow<Integer> loc = gps.get(e.getKey());
      if (loc == null) {
        gps.put(e.getKey(), e.getValue());
      }
      else {
        loc.setHigh(e.getValue().getHigh());
        loc.setLow(e.getValue().getLow());
      }
    }
    boolean found = false;
    for (Integer phone: phoneRegister) {
      emitQueryResult( phone);
      found = true;
    }
    if (!found) {
      LOG.debug("No phone number");
    }
    newgps.clear();
    context.setCounters(commandCounters);
  }

  private void emitQueryResult(Integer phone) {
    HighLow<Integer> loc = gps.get(phone);
    if (loc != null) {
      Map<String, String> queryResult = new HashMap<String, String>();
      queryResult.put(KEY_PHONE, String.valueOf(phone));
      queryResult.put(KEY_LOCATION, loc.toString());
      locationQueryResult.emit(queryResult);
    }
  }

  private void emitPhoneRemoved(Integer phone)
  {
    Map<String,String> removedResult= Maps.newHashMap();
    removedResult.put(KEY_PHONE, String.valueOf(phone));
    removedResult.put(KEY_REMOVED,"true");
    locationQueryResult.emit(removedResult);
  }

  public static enum CommandCounters
  {
    ADD, ADD_RANGE, DELETE, CLEAR
  }

  private static final Logger LOG = LoggerFactory.getLogger(PhoneMovementGenerator.class);
}
