blob: 4b7236053066aa5e2272f36a7320ca577b49b29d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.engine;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.validation.constraints.NotNull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.engine.AutoMetricTest.TestOperator.TestStatsListener;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.support.StramTestSupport;
public class AutoMetricTest
{
private static final Logger LOG = LoggerFactory.getLogger(AutoMetricTest.class);
public static class TestOperator extends TestGeneratorInputOperator implements Partitioner<TestOperator>, StatsListener
{
static class TestOperatorStats implements Serializable
{
private String message;
private boolean attributeListenerCalled;
private static final long serialVersionUID = -8096838101190642798L;
private boolean currentPropVal;
}
public static class TestStatsListener implements StatsListener, Serializable
{
private static final long serialVersionUID = 1L;
private boolean lastPropVal;
@Override
public Response processStats(BatchedOperatorStats stats)
{
for (OperatorStats os : stats.getLastWindowedStats()) {
Assert.assertNotNull("metrics", os.metrics.get("operatorMetric"));
TestOperatorStats loperatorStats = (TestOperatorStats)os.metrics.get("operatorMetric");
loperatorStats.attributeListenerCalled = true;
lastPropVal = loperatorStats.currentPropVal;
}
if (lastPropVal) {
Assert.assertNotNull(stats.getOperatorResponse());
Assert.assertTrue(1 == stats.getOperatorResponse().size());
Assert.assertEquals("test", stats.getOperatorResponse().get(0).getResponse());
}
Response rsp = new Response();
rsp.operatorRequests = Lists.newArrayList(new SetPropertyRequest());
return rsp;
}
public static class SetPropertyRequest implements OperatorRequest, Serializable
{
private static final long serialVersionUID = 1L;
@Override
public OperatorResponse execute(Operator oper, int arg1, long arg2) throws IOException
{
if (oper instanceof TestOperator) {
LOG.debug("Setting property");
((TestOperator)oper).propVal = true;
}
return new TestOperatorResponse();
}
}
public static class TestOperatorResponse implements OperatorResponse, Serializable
{
private static final long serialVersionUID = 2L;
@Override
public Object getResponseId()
{
return 1;
}
@Override
public Object getResponse()
{
return "test";
}
}
}
private static TestOperatorStats lastMetric = null;
private static Thread processStatsThread = null;
private static Thread definePartitionsThread = null;
private transient boolean propVal;
@AutoMetric
private TestOperatorStats operatorMetric;
@Override
public void partitioned(Map<Integer, Partition<TestOperator>> partitions)
{
}
@Override
public Collection<Partition<TestOperator>> definePartitions(Collection<Partition<TestOperator>> partitions, PartitioningContext context)
{
List<Partition<TestOperator>> newPartitions = Lists.newArrayList();
newPartitions.addAll(partitions);
for (Partition<?> p : partitions) {
BatchedOperatorStats stats = p.getStats();
if (stats != null) {
definePartitionsThread = Thread.currentThread();
for (OperatorStats os : stats.getLastWindowedStats()) {
if (os.metrics.get("operatorMetric") != null) {
lastMetric = (TestOperatorStats)os.metrics.get("operatorMetric");
}
}
}
}
return newPartitions;
}
@Override
public void endWindow()
{
super.endWindow();
operatorMetric = new TestOperatorStats();
operatorMetric.message = "interesting";
operatorMetric.currentPropVal = this.propVal;
}
@Override
public Response processStats(BatchedOperatorStats stats)
{
processStatsThread = Thread.currentThread();
for (OperatorStats os : stats.getLastWindowedStats()) {
Assert.assertNotNull("metric in listener", os.metrics.get("operatorMetric"));
}
Response rsp = new Response();
rsp.repartitionRequired = true; // trigger definePartitions
return rsp;
}
}
private LogicalPlan dag;
@Before
public void setup()
{
dag = StramTestSupport.createDAG(testMeta);
}
/**
* Verify custom stats generated by operator are propagated and trigger repartition.
*
* @throws Exception
*/
@Test
@SuppressWarnings("SleepWhileInLoop")
public void testMetricPropagation() throws Exception
{
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class);
TestStatsListener sl = new TestStatsListener();
dag.setOperatorAttribute(testOper, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
GenericTestOperator collector = dag.addOperator("Collector", new GenericTestOperator());
dag.addStream("TestTuples", testOper.outport, collector.inport1).setLocality(Locality.CONTAINER_LOCAL);
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
long startTms = System.currentTimeMillis();
while (TestOperator.lastMetric == null && StramTestSupport.DEFAULT_TIMEOUT_MILLIS > System.currentTimeMillis() - startTms) {
Thread.sleep(300);
LOG.debug("Waiting for stats");
}
while (StramTestSupport.DEFAULT_TIMEOUT_MILLIS > System.currentTimeMillis() - startTms) {
if (sl.lastPropVal) {
break;
}
Thread.sleep(100);
LOG.debug("Waiting for property set");
}
lc.shutdown();
Assert.assertNotNull("metric received", TestOperator.lastMetric);
Assert.assertEquals("metric message", "interesting", TestOperator.lastMetric.message);
Assert.assertTrue("attribute defined stats listener called", TestOperator.lastMetric.attributeListenerCalled);
Assert.assertSame("single thread", TestOperator.definePartitionsThread, TestOperator.processStatsThread);
Assert.assertTrue("property set", sl.lastPropVal);
}
@Rule
public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
@Test
public void testMetrics() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
MockAggregator aggregator = new MockAggregator(latch);
dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
lpc.prepareDAG(dag, null, "AutoMetricTest");
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
latch.await();
Assert.assertEquals("progress", 1L, ((Long)aggregator.result.get("progress")).longValue());
lc.shutdown();
}
@Test
@Ignore
public void testMetricsAggregations() throws Exception
{
CountDownLatch latch = new CountDownLatch(2);
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class);
MockAggregator aggregator = new MockAggregator(latch);
dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
dag.setOperatorAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2));
dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
lpc.prepareDAG(dag, null, "AutoMetricTest");
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
latch.await();
Assert.assertEquals("progress", 2L, ((Long)aggregator.result.get("progress")).longValue());
lc.shutdown();
}
@Test
public void testInjectionOfDefaultMetricsAggregator() throws Exception
{
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
lpc.prepareDAG(dag, null, "AutoMetricTest");
LogicalPlan.OperatorMeta o1meta = dag.getOperatorMeta("o1");
Assert.assertNotNull("default aggregator injected", o1meta.getMetricAggregatorMeta().getAggregator());
}
@Test
public void testDefaultMetricsAggregator() throws Exception
{
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
CountDownLatch latch = new CountDownLatch(1);
OperatorAndAggregator o1 = dag.addOperator("o1", new OperatorAndAggregator(latch));
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
lpc.prepareDAG(dag, null, "AutoMetricTest");
LogicalPlan.OperatorMeta o1meta = dag.getOperatorMeta("o1");
Assert.assertNotNull("default aggregator injected", o1meta.getMetricAggregatorMeta().getAggregator());
lpc.prepareDAG(dag, null, "AutoMetricTest");
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
latch.await();
Assert.assertEquals("progress", 1, o1.result.get("progress"));
lc.shutdown();
}
private static class MockAggregator implements AutoMetric.Aggregator, Serializable
{
long cachedSum = -1;
Map<String, Object> result = Maps.newHashMap();
transient CountDownLatch latch;
private MockAggregator(CountDownLatch latch)
{
this.latch = Preconditions.checkNotNull(latch);
}
@Override
public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics)
{
long sum = 0;
int myMetricSum = 0;
for (AutoMetric.PhysicalMetricsContext physicalMetricsContext : physicalMetrics) {
sum += (Integer)physicalMetricsContext.getMetrics().get("progress");
if (physicalMetricsContext.getMetrics().containsKey("myMetric")) {
myMetricSum += (Integer)physicalMetricsContext.getMetrics().get("myMetric");
}
}
cachedSum = sum;
result.put("progress", cachedSum);
result.put("myMetric", myMetricSum);
latch.countDown();
return result;
}
private static final long serialVersionUID = 201503311744L;
}
public static class OperatorWithMetrics extends GenericTestOperator
{
@AutoMetric
protected int progress;
@Override
public void endWindow()
{
progress = 1;
super.endWindow();
}
}
public static class OperatorWithMetricMethod extends OperatorWithMetrics
{
@AutoMetric
public int getMyMetric()
{
return 3;
}
}
public static class OperatorAndAggregator extends OperatorWithMetrics implements AutoMetric.Aggregator
{
Map<String, Object> result = Maps.newHashMap();
private final transient CountDownLatch latch;
private OperatorAndAggregator()
{
latch = null;
}
OperatorAndAggregator(@NotNull CountDownLatch latch)
{
this.latch = Preconditions.checkNotNull(latch);
}
@Override
public Map<String, Object> aggregate(long windowId, Collection<AutoMetric.PhysicalMetricsContext> physicalMetrics)
{
result.put("progress", physicalMetrics.iterator().next().getMetrics().get("progress"));
latch.countDown();
return result;
}
}
@Test
public void testMetricsAnnotatedMethod() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration());
TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class);
OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class);
MockAggregator aggregator = new MockAggregator(latch);
dag.setOperatorAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, aggregator);
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
dag.addStream("TestTuples", inputOperator.outport, o1.inport1);
lpc.prepareDAG(dag, null, "AutoMetricTest");
StramLocalCluster lc = new StramLocalCluster(dag);
lc.runAsync();
latch.await();
Assert.assertEquals("myMetric", 3, ((Integer)aggregator.result.get("myMetric")).intValue());
lc.shutdown();
}
}