blob: a855c1559092d3b1c44f20f8760e5ca5a27c8c80 [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.scalability;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>InputItemGenerator class.</p>
*
* @since 0.3.2
*/
public class InputItemGenerator implements InputOperator
{
private static final Logger LOG = LoggerFactory.getLogger(InputItemGenerator.class);
public static int numPublishers = 50;
public static int numAdvertisers = 100;
public static int numAdUnits = 5;
private double expectedClickThruRate = 0.005;
private int blastCount = 10000;
private Random random = new Random();
public final transient DefaultOutputPort<AdInfo> outputPort = new DefaultOutputPort<AdInfo>();
public double getExpectedClickThruRate()
{
return expectedClickThruRate;
}
public void setExpectedClickThruRate(double expectedClickThruRate)
{
this.expectedClickThruRate = expectedClickThruRate;
}
public void setBlastCount(int blastCount)
{
this.blastCount = blastCount;
}
@Override
public void beginWindow(long windowId)
{
}
@Override
public void endWindow()
{
}
@Override
public void setup(OperatorContext context)
{
}
@Override
public void teardown()
{
}
private int nextRandomId(int max)
{
int id;
do {
id = (int)Math.abs(Math.round(random.nextGaussian() * max / 2));
}
while (id >= max);
return id;
}
@Override
public void emitTuples()
{
try {
long timestamp;
for (int i = 0; i <blastCount; ++i) {
int advertiserId = nextRandomId(numAdvertisers);
//int publisherId = (advertiserId * 10 / numAdvertisers) * numPublishers / 10 + nextRandomId(numPublishers / 10);
int publisherId = nextRandomId(numPublishers);
int adUnit = random.nextInt(numAdUnits);
double cost = 0.5 + 0.25 * random.nextDouble();
timestamp = System.currentTimeMillis();
emitTuple(false, publisherId, advertiserId, adUnit, cost, timestamp);
if (random.nextDouble() < expectedClickThruRate) {
double revenue = 0.5 + 0.5 * random.nextDouble();
timestamp = System.currentTimeMillis();
// generate fake click
emitTuple(true, publisherId, advertiserId, adUnit, revenue, timestamp);
}
}
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
private void emitTuple(boolean click, int publisherId, int advertiserId, int adUnit, double value, long timestamp) {
AdInfo adInfo = new AdInfo();
adInfo.setPublisherId(publisherId);
adInfo.setAdvertiserId(advertiserId);
adInfo.setAdUnit(adUnit);
adInfo.setClick(click);
adInfo.setValue(value);
adInfo.setTimestamp(timestamp);
this.outputPort.emit(adInfo);
}
}