blob: 74ba658418418b5ae90c2e00c8fae6912b69bf5e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.benchmark.state;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
import com.google.common.collect.Maps;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.netlet.util.Slice;
/**
* @since 3.6.0
*/
public class StoreOperator extends BaseOperator implements Operator.CheckpointNotificationListener
{
private static final Logger logger = LoggerFactory.getLogger(StoreOperator.class);
public enum ExecMode
{
INSERT,
UPDATE_SYNC,
UPDATE_ASYNC,
GET_SYNC,
DO_NOTHING
}
private static final int numOfWindowPerStatistics = 120;
//this is the store we are going to use
private ManagedTimeUnifiedStateImpl store;
private long tupleCount = 0;
private int windowCountPerStatistics = 0;
private long statisticsBeginTime = 0;
private long applicationBeginTime = 0;
private long totalTupleCount = 0;
private ExecMode execMode = ExecMode.INSERT;
private int timeRange = 1000 * 60;
public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
{
@Override
public void process(KeyValPair<byte[], byte[]> tuple)
{
processTuple(tuple);
}
};
@Override
public void setup(OperatorContext context)
{
logger.info("The execute mode is: {}", execMode.name());
store.setup(context);
}
@Override
public void teardown()
{
}
@Override
public void beginWindow(long windowId)
{
store.beginWindow(windowId);
if (statisticsBeginTime <= 0) {
statisticsBeginTime = System.currentTimeMillis();
}
if (applicationBeginTime <= 0) {
applicationBeginTime = System.currentTimeMillis();
}
}
@Override
public void endWindow()
{
store.endWindow();
if (++windowCountPerStatistics >= numOfWindowPerStatistics) {
logStatistics();
windowCountPerStatistics = 0;
}
}
private transient Queue<Future<Slice>> taskQueue = new LinkedList<Future<Slice>>();
private transient Map<Future<Slice>, KeyValPair<byte[], byte[]>> taskToPair = Maps.newHashMap();
/**
* we verify 3 type of operation
* @param tuple
*/
private Slice keySliceForRead = new Slice(null, 0, 0);
private void processTuple(KeyValPair<byte[], byte[]> tuple)
{
switch (execMode) {
case UPDATE_ASYNC:
//handle it specially
updateAsync(tuple);
break;
case UPDATE_SYNC:
keySliceForRead.buffer = tuple.getKey();
keySliceForRead.offset = 0;
keySliceForRead.length = tuple.getKey().length;
store.getSync(getTimeByKey(tuple.getKey()), keySliceForRead);
insertValueToStore(tuple);
break;
case GET_SYNC:
store.getSync(getTimeByKey(tuple.getKey()), new Slice(tuple.getKey()));
break;
case DO_NOTHING:
break;
default: //insert
insertValueToStore(tuple);
}
++tupleCount;
}
private transient long sameKeyCount = 0;
private transient long preKey = -1;
private long getTimeByKey(byte[] key)
{
long lKey = ByteBuffer.wrap(key).getLong();
lKey = lKey - (lKey % timeRange);
if (preKey == lKey) {
sameKeyCount++;
} else {
logger.info("key: {} count: {}", preKey, sameKeyCount);
preKey = lKey;
sameKeyCount = 1;
}
return lKey;
}
// give a barrier to avoid used up memory
private final int taskBarrier = 100000;
/**
* This method first send request of get to the state manager, then handle all the task(get) which already done and update the value.
* @param tuple
*/
private void updateAsync(KeyValPair<byte[], byte[]> tuple)
{
if (taskQueue.size() > taskBarrier) {
//slow down to avoid too much task waiting.
try {
logger.info("Queue Size: {}, wait time(milli-seconds): {}", taskQueue.size(), taskQueue.size() / taskBarrier);
Thread.sleep(taskQueue.size() / taskBarrier);
} catch (Exception e) {
//ignore
}
}
//send request of get to the state manager and add to the taskQueue and taskToPair.
//the reason of an extra taskQueue to make sure the tasks are ordered
{
Slice key = new Slice(tuple.getKey());
Future<Slice> task = store.getAsync(getTimeByKey(tuple.getKey()), key);
taskQueue.add(task);
taskToPair.put(task, tuple);
}
//handle all the tasks which have finished
while (!taskQueue.isEmpty()) {
//assume task finished in order.
if (!taskQueue.peek().isDone()) {
break;
}
Future<Slice> task = taskQueue.poll();
insertValueToStore(taskToPair.remove(task));
}
}
private void insertValueToStore(KeyValPair<byte[], byte[]> tuple)
{
Slice key = new Slice(tuple.getKey());
Slice value = new Slice(tuple.getValue());
store.put(System.currentTimeMillis(), key, value);
}
@Override
public void checkpointed(long windowId)
{
}
@Override
public void committed(long windowId)
{
store.committed(windowId);
}
@Override
public void beforeCheckpoint(long windowId)
{
store.beforeCheckpoint(windowId);
logger.debug("beforeCheckpoint {}", windowId);
}
public ManagedTimeUnifiedStateImpl getStore()
{
return store;
}
public void setStore(ManagedTimeUnifiedStateImpl store)
{
this.store = store;
}
private void logStatistics()
{
final long now = System.currentTimeMillis();
long spentTime = now - statisticsBeginTime;
long totalSpentTime = now - applicationBeginTime;
totalTupleCount += tupleCount;
logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime,
totalTupleCount * 1000 / totalSpentTime);
statisticsBeginTime = System.currentTimeMillis();
tupleCount = 0;
}
public ExecMode getExecMode()
{
return execMode;
}
public void setExecMode(ExecMode execMode)
{
this.execMode = execMode;
}
public String getExecModeString()
{
return execMode.name();
}
public void setExecModeStr(String execModeStr)
{
//this method used for set configuration. so, use case-insensitive
for (ExecMode em : ExecMode.values()) {
if (em.name().equalsIgnoreCase(execModeStr)) {
this.execMode = em;
}
}
}
public int getTimeRange()
{
return timeRange;
}
public void setTimeRange(int timeRange)
{
this.timeRange = timeRange;
}
}