blob: ad409bc6033c7703f5d557da17b7e63c5a476cea [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.mobile;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import javax.validation.constraints.Min;
import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.util.HighLow;
/**
 * <p>
 * This operator generates GPS locations for the phone numbers it receives on the {@link #data} port.
 * Every phone is placed on a square grid whose coordinates lie in {@code [0, range)} and is moved
 * by at most one step per axis each time its number arrives.
 * It supports querying the locations of a given phone number (or range of phone numbers) through
 * the {@link #phoneQuery} port; results are emitted on {@link #locationQueryResult}.
 * This is a partitionable operator that can partition as the tuplesBlast increases.
 * </p>
 *
 * @since 0.3.2
 */
public class PhoneMovementGenerator extends BaseOperator
{
  /**
   * Phone numbers whose location should be advanced. A phone seen for the first time gets a random
   * starting location; the newly computed location is staged in {@code newgps} and only becomes the
   * current location in {@link #endWindow()}.
   */
  public final transient DefaultInputPort<Integer> data = new DefaultInputPort<Integer>()
  {
    @Override
    public void process(Integer tuple)
    {
      HighLow<Integer> loc = gps.get(tuple);
      if (loc == null) {
        // HighLow is used as an (x, y) pair: high holds x, low holds y.
        loc = new HighLow<Integer>(random.nextInt(range), random.nextInt(range));
        gps.put(tuple, loc);
      }

      // Masking instead of "% 4": identical for non-negative values, but it keeps cycling through
      // 0..3 after rotate overflows, whereas "%" would yield negative states and freeze the
      // direction pattern.
      int state = rotate & 3;

      // Compute new location: states 0 and 1 move x forward, states 1 and 3 move y forward.
      int xloc = step(loc.getHigh(), state < 2);
      int yloc = step(loc.getLow(), (state & 1) == 1);

      // Set new location
      HighLow<Integer> nloc = newgps.get(tuple);
      if (nloc == null) {
        newgps.put(tuple, new HighLow<Integer>(xloc, yloc));
      }
      else {
        nloc.setHigh(xloc);
        nloc.setLow(yloc);
      }
      rotate++;
    }
  };

  /**
   * Optional port for commands that manage the set of phones whose location is reported.
   * The command is read from {@link #KEY_COMMAND}; tuples without a command, or with a command
   * other than add / addRange / del / clear, are ignored.
   */
  @InputPortFieldAnnotation(optional=true)
  public final transient DefaultInputPort<Map<String,String>> phoneQuery = new DefaultInputPort<Map<String,String>>()
  {
    @Override
    public void process(Map<String,String> tuple)
    {
      LOG.info("new query {}", tuple);
      String command = tuple.get(KEY_COMMAND);
      if (command == null) {
        return;
      }
      if (command.equals(COMMAND_ADD)) {
        commandCounters.getCounter(CommandCounters.ADD).increment();
        registerPhone(tuple.get(KEY_PHONE));
      }
      else if (command.equals(COMMAND_ADD_RANGE)) {
        commandCounters.getCounter(CommandCounters.ADD_RANGE).increment();
        registerPhoneRange(tuple.get(KEY_START_PHONE), tuple.get(KEY_END_PHONE));
      }
      else if (command.equals(COMMAND_DELETE)) {
        commandCounters.getCounter(CommandCounters.DELETE).increment();
        deregisterPhone(tuple.get(KEY_PHONE));
      }
      else if (command.equals(COMMAND_CLEAR)) {
        commandCounters.getCounter(CommandCounters.CLEAR).increment();
        clearPhones();
      }
    }
  };

  public static final String KEY_COMMAND = "command";
  public static final String KEY_PHONE = "phone";
  public static final String KEY_LOCATION = "location";
  public static final String KEY_REMOVED = "removed";
  public static final String KEY_START_PHONE = "startPhone";
  public static final String KEY_END_PHONE = "endPhone";
  public static final String COMMAND_ADD = "add";
  public static final String COMMAND_ADD_RANGE = "addRange";
  public static final String COMMAND_DELETE = "del";
  public static final String COMMAND_CLEAR = "clear";

  /** Phones whose location is emitted on registration and at the end of every window. */
  final Set<Integer> phoneRegister = Sets.newHashSet();
  /** Current location of every phone seen so far, keyed by phone number. */
  private final transient HashMap<Integer, HighLow<Integer>> gps = new HashMap<Integer, HighLow<Integer>>();
  private final Random random = new Random();
  /** Exclusive upper bound of both grid coordinates. */
  private int range = 50;
  /** A coordinate moves only when a random draw from [0, 100) is at least this value. */
  private int threshold = 80;
  /** Number of data tuples processed; its two low bits select the movement direction. */
  private int rotate = 0;
  /** Counts how many commands of each {@link CommandCounters} kind were received. */
  protected BasicCounters<MutableLong> commandCounters;
  private transient OperatorContext context;
  /** Locations computed during the current window, merged into {@code gps} in {@link #endWindow()}. */
  private final transient HashMap<Integer, HighLow<Integer>> newgps = new HashMap<Integer, HighLow<Integer>>();

  public PhoneMovementGenerator()
  {
    this.commandCounters = new BasicCounters<MutableLong>(MutableLong.class);
  }

  /**
   * The bound must be at least 1: it is passed to {@link Random#nextInt(int)} and used as a
   * modulus, both of which fail for 0.
   *
   * @return the exclusive upper bound of the generated x and y coordinates
   */
  @Min(1)
  public int getRange()
  {
    return range;
  }

  /**
   * Sets the size of the grid on which locations are generated; coordinates lie in {@code [0, i)}.
   *
   * @param i the exclusive upper bound of the generated x and y coordinates
   */
  public void setRange(int i)
  {
    range = i;
  }

  /**
   * @return the threshold
   */
  @Min(0)
  public int getThreshold()
  {
    return threshold;
  }

  /**
   * Sets the threshold that decides how frequently the GPS locations are updated.
   * For each axis a number in {@code [0, 100)} is drawn and the coordinate moves only when the
   * draw is greater than or equal to the threshold, so a higher threshold means less movement.
   *
   * @param i the value that decides how frequently the GPS locations change.
   */
  public void setThreshold(int i)
  {
    threshold = i;
  }

  /**
   * Moves one coordinate by at most one step, wrapping around the grid edges.
   *
   * @param coordinate the current coordinate
   * @param forward {@code true} to step up, {@code false} to step down
   * @return the new coordinate, reduced modulo {@code range}
   */
  private int step(int coordinate, boolean forward)
  {
    int next = coordinate;
    if (random.nextInt(100) >= threshold) {
      next += forward ? 1 : -1;
      if (next < 0) {
        next += range;
      }
    }
    // Always reduced, so a location created under a larger range is folded back onto the grid.
    return next % range;
  }

  /**
   * Registers a single phone given as a decimal string; null, empty or non-numeric input is ignored
   * (the latter with a warning).
   */
  private void registerPhone(String phoneStr)
  {
    // register the phone channel
    if (Strings.isNullOrEmpty(phoneStr)) {
      return;
    }
    try {
      registerSinglePhone(Integer.parseInt(phoneStr));
    }
    catch (NumberFormatException nfe) {
      LOG.warn("Invalid no {}", phoneStr);
    }
  }

  /**
   * Registers every phone in the inclusive range {@code [startPhoneStr, endPhoneStr]}.
   * Missing, non-numeric or inverted bounds are logged and ignored.
   */
  private void registerPhoneRange(String startPhoneStr, String endPhoneStr)
  {
    if (Strings.isNullOrEmpty(startPhoneStr) || Strings.isNullOrEmpty(endPhoneStr)) {
      LOG.warn("Invalid phone range {} {}", startPhoneStr, endPhoneStr);
      return;
    }
    try {
      int startPhone = Integer.parseInt(startPhoneStr);
      int endPhone = Integer.parseInt(endPhoneStr);
      if (endPhone < startPhone) {
        LOG.warn("Invalid phone range {} {}", startPhone, endPhone);
        return;
      }
      // A long counter is required: with an int counter "i <= endPhone" can never become false
      // when endPhone is Integer.MAX_VALUE, because i++ wraps around to a negative value.
      for (long i = startPhone; i <= endPhone; i++) {
        registerSinglePhone((int)i);
      }
    }
    catch (NumberFormatException nfe) {
      LOG.warn("Invalid phone range <{},{}>", startPhoneStr, endPhoneStr);
    }
  }

  /**
   * Adds the phone to the register and immediately emits its current location, if one is known.
   */
  private void registerSinglePhone(int phone)
  {
    phoneRegister.add(phone);
    LOG.debug("Registered query id with phone {}", phone);
    emitQueryResult(phone);
  }

  /**
   * Removes a phone from the register and emits a removal notification if it was registered.
   * Null, empty or non-numeric input is ignored (the latter with a warning).
   */
  private void deregisterPhone(String phoneStr)
  {
    if (Strings.isNullOrEmpty(phoneStr)) {
      return;
    }
    try {
      Integer phone = Integer.valueOf(phoneStr);
      // remove the channel; remove() reports whether the phone was actually registered
      if (phoneRegister.remove(phone)) {
        LOG.debug("Removing query id {}", phone);
        emitPhoneRemoved(phone);
      }
    }
    catch (NumberFormatException nfe) {
      LOG.warn("Invalid phone {}", phoneStr);
    }
  }

  /**
   * Empties the register. No per-phone removal notifications are emitted.
   */
  private void clearPhones() {
    phoneRegister.clear();
    LOG.info("Clearing phones");
  }

  /**
   * Emits either a phone's location ({@link #KEY_PHONE}, {@link #KEY_LOCATION}) or a removal
   * notification ({@link #KEY_PHONE}, {@link #KEY_REMOVED}).
   */
  public final transient DefaultOutputPort<Map<String, String>> locationQueryResult = new DefaultOutputPort<Map<String, String>>();

  /**
   * Stores the context and creates one zeroed counter per {@link CommandCounters} value.
   */
  @Override
  public void setup(OperatorContext context)
  {
    this.context = context;
    commandCounters.setCounter(CommandCounters.ADD, new MutableLong());
    commandCounters.setCounter(CommandCounters.ADD_RANGE, new MutableLong());
    commandCounters.setCounter(CommandCounters.DELETE, new MutableLong());
    commandCounters.setCounter(CommandCounters.CLEAR, new MutableLong());
  }

  /**
   * Promotes the locations computed during this window to current locations, emits the location
   * of every registered phone, clears the staged locations and publishes the command counters.
   */
  @Override
  public void endWindow()
  {
    for (Map.Entry<Integer, HighLow<Integer>> e: newgps.entrySet()) {
      HighLow<Integer> staged = e.getValue();
      HighLow<Integer> loc = gps.get(e.getKey());
      if (loc == null) {
        gps.put(e.getKey(), staged);
      }
      else {
        loc.setHigh(staged.getHigh());
        loc.setLow(staged.getLow());
      }
    }
    if (phoneRegister.isEmpty()) {
      LOG.debug("No phone number");
    }
    for (Integer phone: phoneRegister) {
      emitQueryResult(phone);
    }
    newgps.clear();
    context.setCounters(commandCounters);
  }

  /**
   * Emits the current location of the phone; nothing is emitted when no location is known yet.
   */
  private void emitQueryResult(Integer phone) {
    HighLow<Integer> loc = gps.get(phone);
    if (loc != null) {
      Map<String, String> queryResult = new HashMap<String, String>();
      queryResult.put(KEY_PHONE, String.valueOf(phone));
      queryResult.put(KEY_LOCATION, loc.toString());
      locationQueryResult.emit(queryResult);
    }
  }

  /**
   * Emits a notification that the phone is no longer being reported.
   */
  private void emitPhoneRemoved(Integer phone)
  {
    Map<String,String> removedResult= Maps.newHashMap();
    removedResult.put(KEY_PHONE, String.valueOf(phone));
    removedResult.put(KEY_REMOVED,"true");
    locationQueryResult.emit(removedResult);
  }

  /** Keys of the per-command counters published through the operator context. */
  public enum CommandCounters
  {
    ADD, ADD_RANGE, DELETE, CLEAR
  }

  private static final Logger LOG = LoggerFactory.getLogger(PhoneMovementGenerator.class);
}