blob: 82f786172fb6a1d2283da8174c1d8d31651d5c75 [file] [log] [blame]
package com.datatorrent.demos.mobile;
import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.validation.constraints.Min;
import java.util.Map;
import java.util.Random;
/**
* Generates the seed mobile numbers that are displayed in the mobile demo just after launch.<br>
* <b>Operator attributes:</b>
* <ul>
* <li>initialDisplayCount: Number of seed phone numbers that will be generated.</li>
* <li>maxSeedPhoneNumber: The largest seed phone number.</li>
* </ul>
*
* @since 0.3.5
*/
public class PhoneEntryOperator extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(PhoneEntryOperator.class);

  /** Base value that every generated seed phone number is offset from. */
  private static final int PHONE_NUMBER_BASE = 5550000;

  /** Set once the seed numbers have been emitted so that later windows do not emit them again. */
  private boolean seedGenerationDone = false;

  @Min(0)
  private int initialDisplayCount = 0;
  private int maxSeedPhoneNumber = 0;
  private int rangeLowerEndpoint;
  private int rangeUpperEndpoint;

  /**
   * Sets the number of seed phone numbers to generate.
   *
   * @param i the count of initial phone numbers to display
   */
  public void setInitialDisplayCount(int i)
  {
    initialDisplayCount = i;
  }

  /**
   * Sets the range used to validate the maximum seed phone number.
   * Only the two endpoints of the range are stored.
   *
   * @param phoneRange the range of phone numbers; both endpoints are read, so the range
   *                   must be bounded on both sides
   * @throws IllegalStateException if the range has no lower or no upper endpoint
   *                               (raised by Guava's {@code Range} endpoint accessors)
   */
  public void setPhoneRange(Range<Integer> phoneRange)
  {
    this.rangeLowerEndpoint = phoneRange.lowerEndpoint();
    this.rangeUpperEndpoint = phoneRange.upperEndpoint();
  }

  /**
   * Sets the largest phone number that may be generated as a seed.
   * The value is only honoured when it lies within the configured phone range;
   * otherwise the upper endpoint of that range is used instead.
   *
   * @param number the largest seed phone number
   */
  public void setMaxSeedPhoneNumber(int number)
  {
    this.maxSeedPhoneNumber = number;
  }

  /**
   * Optional input port; every tuple received is forwarded unchanged to {@link #seedPhones}.
   */
  @InputPortFieldAnnotation(optional = true)
  public final transient DefaultInputPort<Map<String, String>> locationQuery = new DefaultInputPort<Map<String, String>>()
  {
    @Override
    public void process(Map<String, String> tuple)
    {
      seedPhones.emit(tuple);
    }
  };

  /**
   * Output port carrying the generated seed commands as well as the tuples
   * forwarded from {@link #locationQuery}.
   */
  public final transient DefaultOutputPort<Map<String, String>> seedPhones = new DefaultOutputPort<Map<String, String>>();

  /**
   * Emits the seed phone numbers the first time a window begins and does nothing afterwards.
   * <p>
   * Each seed is an "add" command for a phone number drawn at random between
   * {@code PHONE_NUMBER_BASE} and the effective maximum (inclusive). Numbers are drawn
   * independently, so the same number may be emitted more than once. The number of seeds
   * is capped by the distance between the effective maximum and the base; when that
   * distance is not positive, no seeds are emitted.
   *
   * @param windowId id of the window that is beginning (not used)
   */
  @Override
  public void beginWindow(long windowId)
  {
    if (seedGenerationDone) {
      return;
    }

    Random random = new Random();

    // Fall back to the range's upper endpoint when the configured maximum is outside the range.
    boolean maxSeedWithinRange = maxSeedPhoneNumber >= rangeLowerEndpoint && maxSeedPhoneNumber <= rangeUpperEndpoint;
    int maxOffset = (maxSeedWithinRange ? maxSeedPhoneNumber : rangeUpperEndpoint) - PHONE_NUMBER_BASE;

    // A non-positive maxOffset yields a non-positive count, so the loop below is skipped
    // and random.nextInt is never called with an invalid bound.
    int phonesToDisplay = Math.min(initialDisplayCount, maxOffset);

    for (int i = 0; i < phonesToDisplay; i++) {
      int phoneNo = PHONE_NUMBER_BASE + random.nextInt(maxOffset + 1);
      LOG.info("seed no: {}", phoneNo);

      Map<String, String> valueMap = Maps.newHashMap();
      valueMap.put(PhoneMovementGenerator.KEY_COMMAND, PhoneMovementGenerator.COMMAND_ADD);
      valueMap.put(PhoneMovementGenerator.KEY_PHONE, Integer.toString(phoneNo));
      seedPhones.emit(valueMap);
    }

    // done generating data
    seedGenerationDone = true;
    LOG.info("Finished generating seed data.");
  }
}