blob: 47f1ea0577f9a8243d630558737f4322a565c46c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.engine;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.api.Stats.OperatorStats.PortStats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.engine.StatsTest.TestCollector.TestCollectorStatsListener;
import com.datatorrent.stram.engine.StatsTest.TestOperator.TestInputStatsListener;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.support.StramTestSupport;
/**
* Tests the stats generated in the system.
*
*/
public class StatsTest
{
private static final Logger LOG = LoggerFactory.getLogger(StatsTest.class);
public static class TestOperator extends TestGeneratorInputOperator
{
transient long windowId;
boolean shutdown = false; // make sure shutdown happens after endwindow when stats are generated
@Override
public void beginWindow(long windowId)
{
if (shutdown) {
BaseOperator.shutdown();
}
this.windowId = windowId;
}
@Override
public void emitTuples()
{
try {
if (!shutdown) {
super.emitTuples();
}
} catch (ShutdownException ex) {
shutdown = true;
}
}
public static class TestInputStatsListener implements StatsListener, Serializable
{
private static final long serialVersionUID = 1L;
private List<OperatorStats> inputOperatorStats = new ArrayList<>();
@Override
public Response processStats(BatchedOperatorStats stats)
{
inputOperatorStats.addAll(stats.getLastWindowedStats());
Response rsp = new Response();
return rsp;
}
}
}
@StatsListener.DataQueueSize
public static class TestCollector extends GenericTestOperator implements StatsListener
{
transient long windowId;
List<OperatorStats> collectorOperatorStats = new ArrayList<>();
@Override
public Response processStats(BatchedOperatorStats stats)
{
collectorOperatorStats.addAll(stats.getLastWindowedStats());
Response rsp = new Response();
return rsp;
}
public void validateStats()
{
for (OperatorStats operatorStats : collectorOperatorStats) {
for (PortStats inputPortStats : operatorStats.inputPorts) {
Assert.assertTrue("Validate input port queue size " + inputPortStats.queueSize, inputPortStats.queueSize == 0);
}
}
}
@Override
public void beginWindow(long windowId)
{
this.windowId = windowId;
}
public static class TestCollectorStatsListener implements StatsListener, Serializable
{
private static final long serialVersionUID = 1L;
List<OperatorStats> collectorOperatorStats = new ArrayList<>();
@Override
public Response processStats(BatchedOperatorStats stats)
{
collectorOperatorStats.addAll(stats.getLastWindowedStats());
Response rsp = new Response();
return rsp;
}
public void validateStats()
{
for (OperatorStats operatorStats : collectorOperatorStats) {
for (PortStats inputPortStats : operatorStats.inputPorts) {
Assert.assertTrue("Validate input port queue size " + inputPortStats.queueSize, inputPortStats.queueSize >= 0);
}
}
}
}
@StatsListener.DataQueueSize
public static class QueueAwareTestCollectorStatsListener extends TestCollectorStatsListener
{
private static final long serialVersionUID = 2L;
public void validateStats()
{
for (OperatorStats operatorStats : collectorOperatorStats) {
for (PortStats inputPortStats : operatorStats.inputPorts) {
Assert.assertTrue("Validate input port queue size " + inputPortStats.queueSize, inputPortStats.queueSize == 0);
}
}
}
}
}
/**
* Verify buffer server bytes and tuple count.
*
* @throws Exception
*/
@Test
@SuppressWarnings("SleepWhileInLoop")
public void testPortStatsPropagation() throws Exception
{
int tupleCount = 10;
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target").getAbsolutePath();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
TestInputStatsListener testInputStatsListener = new TestInputStatsListener();
dag.setOperatorAttribute(testOper, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testInputStatsListener}));
testOper.setMaxTuples(tupleCount);
testOper.setEmitInterval(0);
TestCollector collector = dag.addOperator("Collector", new TestCollector());
TestCollectorStatsListener testCollectorStatsListener = new TestCollectorStatsListener();
dag.setOperatorAttribute(collector, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testCollectorStatsListener}));
dag.addStream("TestTuples", testOper.outport, collector.inport1).setLocality(null);
StramLocalCluster lc = new StramLocalCluster(dag);
lc.run();
Assert.assertFalse("input operator stats", testInputStatsListener.inputOperatorStats.isEmpty());
Assert.assertFalse("collector operator stats", testCollectorStatsListener.collectorOperatorStats.isEmpty());
try {
int outputPortTupleCount = 0;
long outputPortBufferServerBytes = 0L;
for (Iterator<OperatorStats> it = testInputStatsListener.inputOperatorStats.iterator(); it.hasNext(); ) {
OperatorStats operatorStats = it.next();
for (PortStats outputPortStats : operatorStats.outputPorts) {
outputPortTupleCount += outputPortStats.tupleCount;
outputPortBufferServerBytes += outputPortStats.bufferServerBytes;
}
}
int inputPortTupleCount = 0;
long inputPortBufferServerBytes = 0L;
for (Iterator<OperatorStats> it = testCollectorStatsListener.collectorOperatorStats.iterator(); it.hasNext(); ) {
OperatorStats operatorStats = it.next();
for (PortStats inputPortStats : operatorStats.inputPorts) {
inputPortTupleCount += inputPortStats.tupleCount;
inputPortBufferServerBytes += inputPortStats.bufferServerBytes;
}
}
Assert.assertEquals("Tuple Count emitted", tupleCount, outputPortTupleCount);
Assert.assertTrue("Buffer server bytes", inputPortBufferServerBytes > 0);
Assert.assertEquals("Tuple Count processed", tupleCount, inputPortTupleCount);
Assert.assertTrue("Buffer server bytes", outputPortBufferServerBytes > 0);
} finally {
lc.shutdown();
}
}
@SuppressWarnings("SleepWhileInLoop")
private void baseTestForQueueSize(int maxTuples, TestCollectorStatsListener statsListener, DAG.Locality locality) throws Exception
{
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target/baseTestForQueueSize").getAbsolutePath();
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 200);
TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
testOper.setMaxTuples(maxTuples);
TestCollector collector = dag.addOperator("Collector", new TestCollector());
if (statsListener != null) {
dag.setOperatorAttribute(collector, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{statsListener}));
}
dag.addStream("TestTuples", testOper.outport, collector.inport1).setLocality(locality);
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
StreamingContainerManager dnmgr = lc.getStreamingContainerManager();
Map<Integer, PTOperator> operatorMap = dnmgr.getPhysicalPlan().getAllOperators();
for (PTOperator p : operatorMap.values()) {
StramTestSupport.waitForActivation(lc, p);
}
long startTms = System.currentTimeMillis();
if (statsListener != null) {
while (statsListener.collectorOperatorStats.isEmpty() && (StramTestSupport.DEFAULT_TIMEOUT_MILLIS > System.currentTimeMillis() - startTms)) {
Thread.sleep(300);
LOG.debug("Waiting for stats");
}
} else {
while (collector.collectorOperatorStats.isEmpty() && (StramTestSupport.DEFAULT_TIMEOUT_MILLIS > System.currentTimeMillis() - startTms)) {
Thread.sleep(300);
LOG.debug("Waiting for stats");
}
}
if (statsListener != null) {
statsListener.validateStats();
} else {
collector.validateStats();
}
lc.shutdown();
}
/**
* Verify queue size.
*
* @throws Exception
*/
@Test
public void testQueueSizeForContainerLocalOperators() throws Exception
{
baseTestForQueueSize(10, new TestCollectorStatsListener(), DAG.Locality.CONTAINER_LOCAL);
}
@Test
public void testQueueSize() throws Exception
{
baseTestForQueueSize(10, new TestCollectorStatsListener(), null);
}
/**
* Verify queue size.
*
* @throws Exception
*/
@Test
public void testQueueSizeWithQueueAwareStatsListenerForContainerLocalOperators() throws Exception
{
baseTestForQueueSize(0, new TestCollector.QueueAwareTestCollectorStatsListener(), DAG.Locality.CONTAINER_LOCAL);
}
@Test
public void testQueueSizeWithQueueAwareStatsListener() throws Exception
{
baseTestForQueueSize(0, new TestCollector.QueueAwareTestCollectorStatsListener(), null);
}
@Test
public void testQueueSizeWithOperatorAsStatsListener() throws Exception
{
baseTestForQueueSize(0, null, null);
}
}