// blob: 06b3027e2cf8e842afa0486042f6a08bd7b9cafc [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.dimensions.generic;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.google.common.collect.Maps;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import javax.validation.constraints.Min;
import java.util.Map;
import java.util.Random;
/**
 * Generates sales events data and sends them out as JSON encoded byte arrays.
 * <p>
 * Sales events are JSON strings encoded as byte arrays. All ids are expected to be positive integers.
 * Transaction amounts are double values truncated to two decimal places. Timestamp is unix epoch in milliseconds.
 * Product categories are not assigned by default (the field is left at 0). They are expected to be added by the
 * Enrichment operator, but can be enabled with the addProductCategory override.
 * <p>
 * Example Sales Event
 * <pre>
 * {
 *   "productId": 1,
 *   "customerId": 12345,
 *   "productCategory": 0,
 *   "regionId": 2,
 *   "channelId": 3,
 *   "amount": 107.99,
 *   "tax": 7.99,
 *   "discount": 15.73,
 *   "timestamp": 1412897574000
 * }
 * </pre>
 *
 * @displayName JSON Sales Event Generator
 * @category Input
 * @tags input, generator, json
 *
 * @since 2.0.0
 */
public class JsonSalesGenerator implements InputOperator
{
  @Min(1)
  private int maxProductId = 100;
  @Min(1)
  private int maxCustomerId = 1000000;
  @Min(1)
  private int maxChannelId = 3;
  @Min(1)
  private int maxRegionId = 10;
  private double minAmount = 0.99;
  private double maxAmount = 100.0;
  // Tax rates, one of which is randomly assigned to each region during setup
  private double[] taxTiers = new double[] {0.0, 0.04, 0.055, 0.0625, 0.0725, 0.085};
  // Discount rates, randomly assigned to channels and regions every discountCycle windows
  private double[] discountTiers = new double[] {0.0, 0.025, 0.05, 0.10, 0.15, 0.50};
  // Upper bound for the combined channel + region discount rate
  private double maxDiscountPercent = 0.75;
  // Should not be included by default - only used for testing when running without enrichment operator
  private boolean addProductCategory = false;
  @Min(1)
  private int maxProductCategories = 5;
  // Limit number of emitted tuples per window
  @Min(0)
  private long maxTuplesPerWindow = 40000;
  // Maximum amount of deviation below the maximum tuples per window
  @Min(0)
  private int tuplesPerWindowDeviation = 20000;
  // Number of windows to maintain the same deviation before selecting another
  @Min(1)
  private int tuplesRateCycle = 40;
  // Number of windows to maintain the same discount before selecting another
  @Min(1)
  private int discountCycle = 600;
  // Number of windows between shifts of the regional selection weights
  @Min(1)
  private int regionalCycle = 60;
  // Number of windows between shifts of the channel selection weights
  @Min(1)
  private int channelCycle = 60;

  /**
   * Outputs sales event in JSON format as a byte array
   */
  public final transient DefaultOutputPort<byte[]> jsonBytes = new DefaultOutputPort<byte[]>();

  private static final ObjectMapper mapper = new ObjectMapper().setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);

  private final Random random = new Random();
  // Number of emit attempts made in the current window; reset in beginWindow
  private long tuplesCounter = 0;
  // Emission limit applied to the current window
  private long tuplesPerCurrentWindow = maxTuplesPerWindow;
  private transient Map<Integer, Double> channelDiscount = Maps.newHashMap();
  private transient Map<Integer, Double> regionalDiscount = Maps.newHashMap();
  private transient Map<Integer, Double> regionalTax = Maps.newHashMap();
  private transient RandomWeightedMovableGenerator<Integer> regionalGenerator = new RandomWeightedMovableGenerator<Integer>();
  private transient RandomWeightedMovableGenerator<Integer> channelGenerator = new RandomWeightedMovableGenerator<Integer>();

  /**
   * Resets the per-window tuple counter and, on the configured cycle boundaries, picks a new output
   * rate, regenerates discounts and shifts the channel / region selection weights.
   *
   * @param windowId id of the window that is starting
   */
  @Override
  public void beginWindow(long windowId)
  {
    tuplesCounter = 0;
    // Generate new output rate after tuplesRateCycle windows ONLY if tuplesPerWindowDeviation is non-zero
    if (windowId % tuplesRateCycle == 0 && tuplesPerWindowDeviation > 0) {
      tuplesPerCurrentWindow = maxTuplesPerWindow - random.nextInt(tuplesPerWindowDeviation);
    }
    // Generate new discounts after every discountCycle windows
    if (windowId % discountCycle == 0) {
      generateDiscounts();
    }
    // Update channel selection probability (represents sales volume shifts between channels)
    if (windowId % channelCycle == 0) {
      channelGenerator.move();
    }
    // Update region selection probability (represents sales volume shifts between regions)
    if (windowId % regionalCycle == 0) {
      regionalGenerator.move();
    }
  }

  @Override
  public void endWindow()
  {
  }

  /**
   * Resets the output rate to the configured maximum and populates the discount, tax and
   * id generator state that the event generation methods read.
   *
   * @param context operator context (not used)
   */
  @Override
  public void setup(Context.OperatorContext context)
  {
    tuplesPerCurrentWindow = maxTuplesPerWindow;
    generateDiscounts();
    generateRegionalTax();
    initializeDataGenerators();
  }

  @Override
  public void teardown()
  {
  }

  /**
   * Emits JSON encoded sales events until the limit for the current window has been reached.
   * Any failure while generating or serializing an event is rethrown as a RuntimeException.
   */
  @Override
  public void emitTuples()
  {
    while (tuplesCounter++ < tuplesPerCurrentWindow) {
      try {
        SalesEvent salesEvent = generateSalesEvent();
        this.jsonBytes.emit(mapper.writeValueAsBytes(salesEvent));
      } catch (Exception ex) {
        throw new RuntimeException(ex);
      }
    }
  }

  /**
   * Initialize regional and channel data generators
   */
  void initializeDataGenerators()
  {
    // Channels - with increased level of change
    channelGenerator.setMoveDeviation(channelGenerator.getMoveDeviation() * 2);
    for (int i = 1; i <= getMaxChannelId(); i++) {
      channelGenerator.add(i);
    }
    // Regions - default level of change
    for (int i = 1; i <= getMaxRegionId(); i++) {
      regionalGenerator.add(i);
    }
  }

  /**
   * Generate discounts per sales channel and region based on discount tiers
   */
  void generateDiscounts()
  {
    // Discounts per sales channel
    for (int i = 1; i <= getMaxChannelId(); i++) {
      // 50% chance of applying discount to a channel
      channelDiscount.put(i, (random.nextBoolean()) ? discountTiers[random.nextInt(discountTiers.length)] : 0.0);
    }
    // Discounts per region
    for (int i = 1; i <= getMaxRegionId(); i++) {
      // 50% chance of applying discount to a region
      regionalDiscount.put(i, (random.nextBoolean()) ? discountTiers[random.nextInt(discountTiers.length)] : 0.0);
    }
  }

  /**
   * Generate taxes based on each region
   */
  void generateRegionalTax()
  {
    for (int i = 1; i <= getMaxRegionId(); i++) {
      regionalTax.put(i, taxTiers[random.nextInt(taxTiers.length)]);
    }
  }

  /**
   * Builds one random sales event stamped with the current system time.
   * The product category is only filled in when addProductCategory is enabled.
   *
   * @return newly generated sales event
   * @throws Exception declared for callers; the visible implementation does not throw checked exceptions itself
   */
  SalesEvent generateSalesEvent() throws Exception
  {
    SalesEvent salesEvent = new SalesEvent();
    salesEvent.timestamp = System.currentTimeMillis();
    salesEvent.productId = randomId(maxProductId);
    salesEvent.channelId = channelGenerator.next();
    salesEvent.regionId = regionalGenerator.next();
    salesEvent.customerId = randomCustomerByRegion(salesEvent.regionId);
    salesEvent.amount = randomAmount(minAmount, maxAmount);
    salesEvent.tax = calculateTax(salesEvent.amount, salesEvent.regionId);
    salesEvent.discount = calculateDiscount(salesEvent.amount, salesEvent.channelId, salesEvent.regionId);
    if (addProductCategory) {
      salesEvent.productCategory = 1 + (salesEvent.productId % maxProductCategories);
    }
    return salesEvent;
  }

  /**
   * Picks a uniformly distributed id between 1 and max inclusive.
   *
   * @param max largest id that may be returned
   * @return random id, or 1 when max is smaller than 1
   */
  private int randomId(int max)
  {
    // Provide safe default for invalid max
    if (max < 1) {
      return 1;
    }
    return 1 + random.nextInt(max);
  }

  /**
   * Picks a customer id from the slice of the customer id space that belongs to the given region.
   * Each region owns a slice of maxCustomerId / maxRegionId consecutive ids ending at
   * regionId * sliceSize.
   *
   * @param regionId region the customer belongs to
   * @return random customer id within the region's slice
   */
  private int randomCustomerByRegion(int regionId)
  {
    // Integer division yields 0 when there are fewer customers than regions, and
    // Random.nextInt(0) throws IllegalArgumentException - fall back to one id per region
    int regionMultiplier = Math.max(1, getMaxCustomerId() / getMaxRegionId());
    return (regionId * regionMultiplier) - random.nextInt(regionMultiplier);
  }

  /**
   * Computes the tax for an amount using the tax rate assigned to the region.
   *
   * @param amount transaction amount
   * @param regionId region whose tax rate applies
   * @return tax truncated to two decimal places
   */
  private double calculateTax(double amount, int regionId)
  {
    double rawAmount = amount * regionalTax.get(regionId);
    return Math.floor(rawAmount * 100) / 100;
  }

  /**
   * Computes the discount for an amount from the current channel and region discount rates.
   *
   * @param amount transaction amount
   * @param channelId channel whose discount rate applies
   * @param regionId region whose discount rate applies
   * @return discount truncated to two decimal places
   */
  private double calculateDiscount(double amount, int channelId, int regionId)
  {
    // Total discount is combination of channel and region discounts up to maximum allowed discount percentage
    double rawAmount = amount * Math.min(channelDiscount.get(channelId) + regionalDiscount.get(regionId), maxDiscountPercent);
    return Math.floor(rawAmount * 100) / 100;
  }

  /**
   * Generate random double with gaussian distribution between min and max and two decimal places.
   * The distribution is centered between min and max with a standard deviation of one sixth of the
   * range; samples falling outside [min, max] are discarded and redrawn.
   *
   * @param min lower bound of the accepted samples
   * @param max upper bound of the accepted samples
   * @return random amount truncated to two decimal places, or truncated min when max is not greater than min
   */
  private double randomAmount(double min, double max)
  {
    // Provide safe default for invalid min/max relationships
    if (max <= min) {
      return Math.floor(min * 100) / 100;
    }
    double mean = (min + max) / 2.0;
    double deviation = (max - min) / 2.0 / 3.0;
    double rawAmount;
    do {
      rawAmount = random.nextGaussian() * deviation + mean;
      // Bounds must be the method parameters: mean and deviation are derived from them
    } while (rawAmount < min || rawAmount > max);
    return Math.floor(rawAmount * 100) / 100;
  }

  /**
   * Generate a random portion of the amount, between zero and the given fraction of it.
   * Not referenced by the rest of this class.
   *
   * @param amount transaction amount
   * @param percent upper bound of the fraction applied to the amount
   * @return random portion of amount truncated to two decimal places
   */
  private double randomPercent(double amount, double percent)
  {
    double portion = amount * (random.nextDouble() * percent);
    return Math.floor(portion * 100) / 100;
  }

  public long getMaxTuplesPerWindow()
  {
    return maxTuplesPerWindow;
  }

  public void setMaxTuplesPerWindow(long maxTuplesPerWindow)
  {
    this.maxTuplesPerWindow = maxTuplesPerWindow;
  }

  public int getMaxProductId()
  {
    return maxProductId;
  }

  /**
   * @param maxProductId new maximum product id; values below 1 are ignored
   */
  public void setMaxProductId(int maxProductId)
  {
    if (maxProductId >= 1) {
      this.maxProductId = maxProductId;
    }
  }

  public int getMaxCustomerId()
  {
    return maxCustomerId;
  }

  /**
   * @param maxCustomerId new maximum customer id; values below 1 are ignored
   */
  public void setMaxCustomerId(int maxCustomerId)
  {
    if (maxCustomerId >= 1) {
      this.maxCustomerId = maxCustomerId;
    }
  }

  public int getMaxChannelId()
  {
    return maxChannelId;
  }

  /**
   * @param maxChannelId new maximum channel id; values below 1 are ignored
   */
  public void setMaxChannelId(int maxChannelId)
  {
    if (maxChannelId >= 1) {
      this.maxChannelId = maxChannelId;
    }
  }

  public double getMinAmount()
  {
    return minAmount;
  }

  public void setMinAmount(double minAmount)
  {
    this.minAmount = minAmount;
  }

  public double getMaxAmount()
  {
    return maxAmount;
  }

  public void setMaxAmount(double maxAmount)
  {
    this.maxAmount = maxAmount;
  }

  public boolean isAddProductCategory()
  {
    return addProductCategory;
  }

  public void setAddProductCategory(boolean addProductCategory)
  {
    this.addProductCategory = addProductCategory;
  }

  public int getMaxProductCategories()
  {
    return maxProductCategories;
  }

  public void setMaxProductCategories(int maxProductCategories)
  {
    this.maxProductCategories = maxProductCategories;
  }

  public int getTuplesPerWindowDeviation()
  {
    return tuplesPerWindowDeviation;
  }

  public void setTuplesPerWindowDeviation(int tuplesPerWindowDeviation)
  {
    this.tuplesPerWindowDeviation = tuplesPerWindowDeviation;
  }

  public int getTuplesRateCycle()
  {
    return tuplesRateCycle;
  }

  public void setTuplesRateCycle(int tuplesRateCycle)
  {
    this.tuplesRateCycle = tuplesRateCycle;
  }

  public int getMaxRegionId()
  {
    return maxRegionId;
  }

  /**
   * @param maxRegionId new maximum region id; values below 1 are ignored
   */
  public void setMaxRegionId(int maxRegionId)
  {
    if (maxRegionId >= 1) {
      this.maxRegionId = maxRegionId;
    }
  }

  public double getMaxDiscountPercent()
  {
    return maxDiscountPercent;
  }

  public void setMaxDiscountPercent(double maxDiscountPercent)
  {
    this.maxDiscountPercent = maxDiscountPercent;
  }

  public int getDiscountCycle()
  {
    return discountCycle;
  }

  public void setDiscountCycle(int discountCycle)
  {
    this.discountCycle = discountCycle;
  }

  public int getRegionalCycle()
  {
    return regionalCycle;
  }

  public void setRegionalCycle(int regionalCycle)
  {
    this.regionalCycle = regionalCycle;
  }

  public int getChannelCycle()
  {
    return channelCycle;
  }

  public void setChannelCycle(int channelCycle)
  {
    this.channelCycle = channelCycle;
  }
}
/**
* A single sales event.
* <p>
* Plain holder with public fields; JsonSalesGenerator fills one instance per event and
* passes it to the JSON ObjectMapper for serialization.
*/
class SalesEvent {
/* dimension keys */
// Creation time taken from System.currentTimeMillis()
public long timestamp;
public int productId;
public int customerId;
public int channelId;
public int regionId;
// Only assigned when the generator's addProductCategory flag is set; otherwise left at 0
public int productCategory;
/* metrics */
// Transaction amount, truncated to two decimal places by the generator
public double amount;
// Discount derived from amount and the channel/region discount rates, truncated to two decimal places
public double discount;
// Tax derived from amount and the regional tax rate, truncated to two decimal places
public double tax;
}