blob: 7c4510654d3003143c814164ac9e482a33ad5023 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.benchmark.spillable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListImpl;
import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListMultimapImpl;
import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl;
import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
import org.apache.apex.malhar.lib.utils.serde.LongSerde;
import org.apache.apex.malhar.lib.utils.serde.StringSerde;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.ShutdownException;
import com.datatorrent.common.util.BaseOperator;
/**
 * Test/benchmark operator that stores every incoming tuple in a spillable multimap keyed by the
 * current window id, records the number of tuples seen in each window in a spillable map, and
 * periodically cross-checks the two structures against each other.
 *
 * <p>When the check finds a mismatch, the failure is stored in {@link #errorTrace} and a
 * {@link ShutdownException} is thrown. Setting {@link #shutdownCount} to a positive value makes
 * the operator throw a {@link RuntimeException} once that many tuples have been processed.
 *
 * @since 3.6.0
 */
public class SpillableTestOperator extends BaseOperator implements Operator.CheckpointNotificationListener
{
  private static final Logger logger = LoggerFactory.getLogger(SpillableTestOperator.class);

  /** Identifier handed to the multimap created by {@link #createMultimap(SpillableStateStore)}. */
  public static final byte[] ID1 = new byte[] {(byte)1};
  /** Identifier handed to the map created by {@link #createMap(SpillableStateStore)}. */
  public static final byte[] ID2 = new byte[] {(byte)2};
  /** Identifier handed to the map created by {@link #createWindowToCountMap(SpillableStateStore)}. */
  public static final byte[] ID3 = new byte[] {(byte)3};

  /** Tuples received, keyed by the string form of the window id in which they arrived. */
  public SpillableArrayListMultimapImpl<String, String> multiMap;
  /** Backing store shared by {@link #multiMap} and {@link #windowToCount}; must be set before setup. */
  public ManagedStateSpillableStateStore store;
  /** Number of tuples processed so far. */
  public long totalCount = 0;
  /** Number of tuples processed in the current window; reset in {@link #beginWindow(long)}. */
  public transient long countInWindow;
  /** First window id seen by this operator, or a negative value if no window has begun yet. */
  public long minWinId = -1;
  /** Window id most recently passed to {@link #committed(long)}, or -1 if none. */
  public long committedWinId = -1;
  /** Id of the window most recently begun. */
  public long windowId;
  /** Per-window tuple count, written in {@link #endWindow()}. */
  public SpillableMapImpl<Long, Long> windowToCount;
  /** Tuple count at which {@link #processTuple(String)} throws; the default -1 never matches. */
  public long shutdownCount = -1;
  /**
   * Last validation failure recorded by {@link #checkData()}.
   * NOTE(review): presumably read by test code outside this class - confirm.
   */
  public static Throwable errorTrace;

  /** Wall-clock time (millis) of the previous statistics log line, or of setup if none yet. */
  private long lastLogTime;
  /** Wall-clock time (millis) at which {@link #setup(OperatorContext)} ran. */
  private long beginTime;
  /** Value of {@link #totalCount} when statistics were last logged. */
  private long lastTotalCount = 0;

  /** Input port; every tuple is forwarded to {@link #processTuple(String)}. */
  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String tuple)
    {
      processTuple(tuple);
    }
  };

  /**
   * Counts the tuple and appends it to the multimap entry of the current window.
   *
   * @param tuple the incoming tuple
   * @throws RuntimeException when the running total reaches {@link #shutdownCount}
   */
  public void processTuple(String tuple)
  {
    totalCount++;
    if (totalCount == shutdownCount) {
      throw new RuntimeException("Test recovery. count = " + totalCount);
    }
    countInWindow++;
    multiMap.put("" + windowId, tuple);
  }

  /**
   * Creates the spillable structures if they are not already present, sets up the store and both
   * structures, initializes the statistics clocks and validates the data that is already stored.
   *
   * @param context the operator context, passed on to the store and the spillable structures
   */
  @Override
  public void setup(OperatorContext context)
  {
    super.setup(context);

    if (windowToCount == null) {
      windowToCount = createWindowToCountMap(store);
    }
    if (multiMap == null) {
      multiMap = createMultimap(store);
    }

    store.setup(context);
    windowToCount.setup(context);
    multiMap.setup(context);

    beginTime = System.currentTimeMillis();
    lastLogTime = beginTime;

    checkData();
  }

  /**
   * Verifies, for every window id from {@code max(committedWinId + 1, minWinId)} up to but not
   * including the current window id, that a recorded count and a stored tuple list both exist and
   * that the list size equals the count.
   *
   * @throws ShutdownException on the first missing entry or size mismatch; the details are logged
   *         and stored in {@link #errorTrace}
   */
  public void checkData()
  {
    final long startTime = System.currentTimeMillis();
    logger.debug("check data: totalCount: {}; minWinId: {}; committedWinId: {}; curWinId: {}", totalCount,
        this.minWinId, committedWinId, this.windowId);

    final long firstWinId = Math.max(committedWinId + 1, minWinId);
    for (long winId = firstWinId; winId < this.windowId; ++winId) {
      Long count = this.windowToCount.get(winId);
      SpillableArrayListImpl<String> datas = (SpillableArrayListImpl<String>)multiMap.get("" + winId);

      // Either side missing (including both missing) is treated as a failure.
      if (datas == null || count == null) {
        throw recordFailure("Invalid data/count. datas: " + datas + "; count: " + count);
      }

      int dataSize = datas.size();
      if (count.longValue() != dataSize) {
        throw recordFailure(
            String.format("data size not equal: window Id: %d; datas size: %d; count: %d", winId, dataSize, count));
      }
    }

    logger.info("check data took {} millis.", System.currentTimeMillis() - startTime);
  }

  /**
   * Logs the message, stores it in {@link #errorTrace} and returns the exception to throw.
   */
  private static ShutdownException recordFailure(String msg)
  {
    logger.error(msg);
    errorTrace = new RuntimeException(msg);
    return new ShutdownException();
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void beginWindow(long windowId)
  {
    store.beginWindow(windowId);
    multiMap.beginWindow(windowId);

    // Remember the first window this operator ever saw; checkData() starts no earlier than this.
    if (minWinId < 0) {
      minWinId = windowId;
    }
    this.windowId = windowId;
    countInWindow = 0;
  }

  /**
   * Records the tuple count of the window that is ending, closes the window on the spillable
   * structures and the store, and validates and logs statistics whenever the window id is a
   * multiple of 10.
   */
  @Override
  public void endWindow()
  {
    multiMap.endWindow();
    windowToCount.put(windowId, countInWindow);
    windowToCount.endWindow();
    store.endWindow();

    if (windowId % 10 == 0) {
      checkData();
      logStatistics();
    }
  }

  /**
   * Logs the total tuple count and rate since setup together with the count and rate since the
   * previous call (or since setup for the first call), then starts a new measurement period.
   */
  public void logStatistics()
  {
    final long now = System.currentTimeMillis();
    final long countInPeriod = totalCount - lastTotalCount;
    // Clamp to 1 ms: a call within the same millisecond as setup or as the previous log line
    // would otherwise divide by zero.
    final long timeInPeriod = Math.max(1L, now - lastLogTime);
    final long totalTime = Math.max(1L, now - beginTime);

    logger.info(
        "Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}",
        totalCount, countInPeriod, totalCount * 1000 / totalTime, countInPeriod * 1000 / timeInPeriod);

    // Start the next period here so the "period" figures do not simply repeat the totals.
    lastLogTime = now;
    lastTotalCount = totalCount;
  }

  /**
   * Forwards the pre-checkpoint notification to the store.
   *
   * @param windowId the window id supplied with the notification
   */
  @Override
  public void beforeCheckpoint(long windowId)
  {
    store.beforeCheckpoint(windowId);
  }

  /**
   * No action is taken on this notification.
   *
   * @param windowId the window id supplied with the notification
   */
  @Override
  public void checkpointed(long windowId)
  {
  }

  /**
   * Remembers the committed window id, which moves the start of the range validated by
   * {@link #checkData()}, and forwards the notification to the store.
   *
   * @param windowId the committed window id
   */
  @Override
  public void committed(long windowId)
  {
    this.committedWinId = windowId;
    store.committed(windowId);
  }

  /**
   * Creates a String-to-String spillable multimap on the given store, identified by {@link #ID1}.
   *
   * @param store the store backing the multimap
   * @return the new multimap
   */
  public static SpillableArrayListMultimapImpl<String, String> createMultimap(SpillableStateStore store)
  {
    return new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new StringSerde(),
        new StringSerde());
  }

  /**
   * Creates a String-to-String spillable map on the given store, identified by {@link #ID2}.
   *
   * @param store the store backing the map
   * @return the new map
   */
  public static SpillableMapImpl<String, String> createMap(SpillableStateStore store)
  {
    return new SpillableMapImpl<String, String>(store, ID2, 0L, new StringSerde(),
        new StringSerde());
  }

  /**
   * Creates a Long-to-Long spillable map on the given store, identified by {@link #ID3}; used for
   * the window id to tuple count mapping.
   *
   * @param store the store backing the map
   * @return the new map
   */
  public static SpillableMapImpl<Long, Long> createWindowToCountMap(SpillableStateStore store)
  {
    return new SpillableMapImpl<Long, Long>(store, ID3, 0L, new LongSerde(),
        new LongSerde());
  }
}