| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.benchmark.spillable; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListImpl; |
| import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListMultimapImpl; |
| import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl; |
| import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; |
| import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; |
| import org.apache.apex.malhar.lib.utils.serde.LongSerde; |
| import org.apache.apex.malhar.lib.utils.serde.StringSerde; |
| |
| import com.datatorrent.api.Context.OperatorContext; |
| import com.datatorrent.api.DefaultInputPort; |
| import com.datatorrent.api.Operator; |
| import com.datatorrent.api.Operator.ShutdownException; |
| import com.datatorrent.common.util.BaseOperator; |
| |
| /** |
| * @since 3.6.0 |
| */ |
| public class SpillableTestOperator extends BaseOperator implements Operator.CheckpointNotificationListener |
| { |
| private static final Logger logger = LoggerFactory.getLogger(SpillableTestOperator.class); |
| |
| public static final byte[] ID1 = new byte[] {(byte)1}; |
| public static final byte[] ID2 = new byte[] {(byte)2}; |
| public static final byte[] ID3 = new byte[] {(byte)3}; |
| |
| public SpillableArrayListMultimapImpl<String, String> multiMap; |
| |
| public ManagedStateSpillableStateStore store; |
| |
| public long totalCount = 0; |
| public transient long countInWindow; |
| public long minWinId = -1; |
| public long committedWinId = -1; |
| public long windowId; |
| |
| public SpillableMapImpl<Long, Long> windowToCount; |
| |
| public long shutdownCount = -1; |
| |
| public static Throwable errorTrace; |
| |
| private long lastLogTime; |
| private long beginTime; |
| |
| public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() |
| { |
| @Override |
| public void process(String tuple) |
| { |
| processTuple(tuple); |
| } |
| }; |
| |
| public void processTuple(String tuple) |
| { |
| if (++totalCount == shutdownCount) { |
| throw new RuntimeException("Test recovery. count = " + totalCount); |
| } |
| countInWindow++; |
| multiMap.put("" + windowId, tuple); |
| } |
| |
| @Override |
| public void setup(OperatorContext context) |
| { |
| super.setup(context); |
| if (windowToCount == null) { |
| windowToCount = createWindowToCountMap(store); |
| } |
| if (multiMap == null) { |
| multiMap = createMultimap(store); |
| } |
| |
| store.setup(context); |
| windowToCount.setup(context); |
| multiMap.setup(context); |
| |
| lastLogTime = System.currentTimeMillis(); |
| beginTime = lastLogTime; |
| |
| checkData(); |
| } |
| |
| public void checkData() |
| { |
| long startTime = System.currentTimeMillis(); |
| logger.debug("check data: totalCount: {}; minWinId: {}; committedWinId: {}; curWinId: {}", totalCount, |
| this.minWinId, committedWinId, this.windowId); |
| for (long winId = Math.max(committedWinId + 1, minWinId); winId < this.windowId; ++winId) { |
| Long count = this.windowToCount.get(winId); |
| SpillableArrayListImpl<String> datas = (SpillableArrayListImpl<String>)multiMap.get("" + winId); |
| String msg; |
| if (((datas == null && count != null) || (datas != null && count == null)) || (datas == null && count == null)) { |
| msg = "Invalid data/count. datas: " + datas + "; count: " + count; |
| logger.error(msg); |
| errorTrace = new RuntimeException(msg); |
| throw new ShutdownException(); |
| } else { |
| int dataSize = datas.size(); |
| if ((long)count != (long)dataSize) { |
| msg = String.format("data size not equal: window Id: %d; datas size: %d; count: %d", winId, dataSize, count); |
| logger.error(msg); |
| errorTrace = new RuntimeException(msg); |
| throw new ShutdownException(); |
| } |
| } |
| } |
| logger.info("check data took {} millis.", System.currentTimeMillis() - startTime); |
| } |
| |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void beginWindow(long windowId) |
| { |
| store.beginWindow(windowId); |
| multiMap.beginWindow(windowId); |
| if (minWinId < 0) { |
| minWinId = windowId; |
| } |
| |
| this.windowId = windowId; |
| countInWindow = 0; |
| } |
| |
| @Override |
| public void endWindow() |
| { |
| multiMap.endWindow(); |
| windowToCount.put(windowId, countInWindow); |
| windowToCount.endWindow(); |
| store.endWindow(); |
| |
| if (windowId % 10 == 0) { |
| checkData(); |
| logStatistics(); |
| } |
| } |
| |
| private long lastTotalCount = 0; |
| |
| public void logStatistics() |
| { |
| long countInPeriod = totalCount - lastTotalCount; |
| long timeInPeriod = System.currentTimeMillis() - lastLogTime; |
| long totalTime = System.currentTimeMillis() - beginTime; |
| logger.info( |
| "Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}", |
| totalCount, countInPeriod, totalCount * 1000 / totalTime, countInPeriod * 1000 / timeInPeriod); |
| } |
| |
| @Override |
| public void beforeCheckpoint(long windowId) |
| { |
| store.beforeCheckpoint(windowId); |
| } |
| |
| @Override |
| public void checkpointed(long windowId) |
| { |
| } |
| |
| @Override |
| public void committed(long windowId) |
| { |
| this.committedWinId = windowId; |
| store.committed(windowId); |
| } |
| |
| public static SpillableArrayListMultimapImpl<String, String> createMultimap(SpillableStateStore store) |
| { |
| return new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new StringSerde(), |
| new StringSerde()); |
| } |
| |
| public static SpillableMapImpl<String, String> createMap(SpillableStateStore store) |
| { |
| return new SpillableMapImpl<String, String>(store, ID2, 0L, new StringSerde(), |
| new StringSerde()); |
| } |
| |
| public static SpillableMapImpl<Long, Long> createWindowToCountMap(SpillableStateStore store) |
| { |
| return new SpillableMapImpl<Long, Long>(store, ID3, 0L, new LongSerde(), |
| new LongSerde()); |
| } |
| } |