blob: 3f2c20b7a5363412fc831e14d2da8cd364a10571 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.codehaus.jettison.json.JSONObject;
import org.eclipse.jetty.websocket.WebSocket;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputByteBuffer;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.api.Stats.OperatorStats.PortStats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.metric.AutoMetricBuiltInTransport;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.MockContainer.MockOperatorStats;
import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
import com.datatorrent.stram.api.AppDataSource;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.ContainerContext;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.api.OperatorDeployInfo.InputDeployInfo;
import com.datatorrent.stram.api.OperatorDeployInfo.OutputDeployInfo;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeat;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState;
import com.datatorrent.stram.appdata.AppDataPushAgent;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.engine.DefaultUnifier;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestAppDataQueryOperator;
import com.datatorrent.stram.engine.TestAppDataResultOperator;
import com.datatorrent.stram.engine.TestAppDataSourceOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.physical.OperatorStatus.PortStatus;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.plan.physical.PhysicalPlanTest;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.EmbeddedWebSocketServer;
import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
import com.datatorrent.stram.support.StramTestSupport.TestMeta;
import com.datatorrent.stram.tuple.Tuple;
import com.datatorrent.stram.webapp.LogicalOperatorInfo;
public class StreamingContainerManagerTest
{
// JUnit rule shared by all tests; handed to StramTestSupport.createDAG in setup() and used for
// testMeta.getPath() by the storage agent tests.
@Rule
public TestMeta testMeta = new TestMeta();
// Logical plan under test; reassigned in setup() before every test method.
private LogicalPlan dag;
/**
 * Initializes {@code dag} from {@link StramTestSupport#createDAG} using the test rule; runs
 * before each test.
 */
@Before
public void setup()
{
  this.dag = StramTestSupport.createDAG(this.testMeta);
}
/**
 * Writes a {@code ContainerHeartbeatResponse} carrying one {@code OperatorDeployInfo} to a byte
 * buffer, reads it back into a new instance and checks name, type and the SPIN_MILLIS context
 * attribute on the copy.
 */
@Test
public void testDeployInfoSerialization() throws Exception
{
  // port descriptors attached to the operator below
  OperatorDeployInfo.InputDeployInfo input = new OperatorDeployInfo.InputDeployInfo();
  input.declaredStreamId = "streamToNode";
  input.portName = "inputPortNameOnNode";
  input.sourceNodeId = 99;

  OperatorDeployInfo.OutputDeployInfo output = new OperatorDeployInfo.OutputDeployInfo();
  output.declaredStreamId = "streamFromNode";
  output.portName = "outputPortNameOnNode";

  OperatorDeployInfo ndi = new OperatorDeployInfo();
  ndi.name = "node1";
  ndi.type = OperatorDeployInfo.OperatorType.GENERIC;
  ndi.id = 1;
  ndi.contextAttributes = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
  ndi.contextAttributes.put(OperatorContext.SPIN_MILLIS, 100);
  ndi.inputs = new ArrayList<>();
  ndi.inputs.add(input);
  ndi.outputs = new ArrayList<>();
  ndi.outputs.add(output);

  // serialize the response ...
  ContainerHeartbeatResponse original = new ContainerHeartbeatResponse();
  original.deployRequest = Collections.singletonList(ndi);
  DataOutputByteBuffer sink = new DataOutputByteBuffer();
  original.write(sink);

  // ... and read it back into a new instance
  DataInputByteBuffer source = new DataInputByteBuffer();
  source.reset(sink.getData());
  ContainerHeartbeatResponse copy = new ContainerHeartbeatResponse();
  copy.readFields(source);

  Assert.assertNotNull(copy.deployRequest);
  Assert.assertEquals(1, copy.deployRequest.size());
  OperatorDeployInfo ndiCopy = copy.deployRequest.get(0);
  Assert.assertEquals("name", ndi.name, ndiCopy.name);
  Assert.assertEquals("type", ndi.type, ndiCopy.type);

  // the string form of the source deploy info mentions both port names
  String description = ndi.toString();
  Assert.assertTrue(description.contains(input.portName));
  Assert.assertTrue(description.contains(output.portName));

  Assert.assertEquals("contextAttributes " + ndiCopy.contextAttributes, Integer.valueOf(100), ndiCopy.contextAttributes.get(OperatorContext.SPIN_MILLIS));
}
/**
 * Builds the chain o1 -> o2 -> o3 -> o4 limited to two containers and verifies the deploy info
 * produced for each container: which operators land where, the buffer server memory of each
 * container, and the input descriptors of the buffer server stream (o1 -> o2), the
 * CONTAINER_LOCAL stream (o2 -> o3) and the THREAD_LOCAL stream (o3 -> o4).
 */
@Test
public void testGenerateDeployInfo()
{
  TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
  GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
  GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
  GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);

  dag.setOutputPortAttribute(o1.outport, PortContext.BUFFER_MEMORY_MB, 256);
  dag.addStream("o1.outport", o1.outport, o2.inport1);
  dag.setOutputPortAttribute(o1.outport, PortContext.SPIN_MILLIS, 99);
  dag.addStream("o2.outport1", o2.outport1, o3.inport1)
      .setLocality(Locality.CONTAINER_LOCAL);
  dag.addStream("o3.outport1", o3.outport1, o4.inport1)
      .setLocality(Locality.THREAD_LOCAL);
  dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());

  Assert.assertEquals("number operators", 4, dag.getAllOperators().size());
  Assert.assertEquals("number root operators", 1, dag.getRootOperators().size());

  StreamingContainerManager dnm = new StreamingContainerManager(dag);
  Assert.assertEquals("number containers", 2, dnm.getPhysicalPlan().getContainers().size());

  dnm.assignContainer(new ContainerResource(0, "container1Id", "host1", 1024, 0, null), InetSocketAddress.createUnresolved("host1", 9001));
  dnm.assignContainer(new ContainerResource(0, "container2Id", "host2", 1024, 0, null), InetSocketAddress.createUnresolved("host2", 9002));
  StreamingContainerAgent sca1 = dnm.getContainerAgent(dnm.getPhysicalPlan().getContainers().get(0).getExternalId());
  StreamingContainerAgent sca2 = dnm.getContainerAgent(dnm.getPhysicalPlan().getContainers().get(1).getExternalId());
  Assert.assertEquals("", dnm.getPhysicalPlan().getContainers().get(0), sca1.container);
  Assert.assertEquals("", PTContainer.State.ALLOCATED, sca1.container.getState());

  // container 1: only the input operator o1
  List<OperatorDeployInfo> c1 = sca1.getDeployInfoList(sca1.container.getOperators());
  Assert.assertEquals("number operators assigned to c1", 1, c1.size());
  OperatorDeployInfo o1DI = getNodeDeployInfo(c1, dag.getMeta(o1));
  Assert.assertNotNull(o1 + " assigned to " + sca1.container.getExternalId(), o1DI);
  Assert.assertEquals("type " + o1DI, OperatorDeployInfo.OperatorType.INPUT, o1DI.type);
  Assert.assertEquals("inputs " + o1DI.name, 0, o1DI.inputs.size());
  Assert.assertEquals("outputs " + o1DI.name, 1, o1DI.outputs.size());
  Assert.assertNotNull("contextAttributes " + o1DI.name, o1DI.contextAttributes);

  OutputDeployInfo c1o1outport = o1DI.outputs.get(0);
  Assert.assertNotNull("stream connection for container1", c1o1outport);
  Assert.assertEquals("stream connection for container1", "o1.outport", c1o1outport.declaredStreamId);
  Assert.assertEquals("stream connects to upstream host", sca1.container.host, c1o1outport.bufferServerHost);
  Assert.assertEquals("stream connects to upstream port", sca1.container.bufferServerAddress.getPort(), c1o1outport.bufferServerPort);
  Assert.assertNotNull("contextAttributes " + c1o1outport, c1o1outport.contextAttributes);
  Assert.assertEquals("contextAttributes " + c1o1outport, Integer.valueOf(99), c1o1outport.contextAttributes.get(PortContext.SPIN_MILLIS));

  // container 2: o2, o3 and o4
  List<OperatorDeployInfo> c2 = sca2.getDeployInfoList(sca2.container.getOperators());
  Assert.assertEquals("number operators assigned to container", 3, c2.size());
  OperatorDeployInfo o2DI = getNodeDeployInfo(c2, dag.getMeta(o2));
  OperatorDeployInfo o3DI = getNodeDeployInfo(c2, dag.getMeta(o3));
  Assert.assertNotNull(dag.getMeta(o2) + " assigned to " + sca2.container.getExternalId(), o2DI);
  Assert.assertNotNull(dag.getMeta(o3) + " assigned to " + sca2.container.getExternalId(), o3DI);

  // assertEquals (rather than assertTrue on ==) so a failure reports the actual value
  Assert.assertEquals("The buffer server memory for container 1", Integer.valueOf(256), sca1.getInitContext().getValue(ContainerContext.BUFFER_SERVER_MB));
  Assert.assertEquals("The buffer server memory for container 2", Integer.valueOf(0), sca2.getInitContext().getValue(ContainerContext.BUFFER_SERVER_MB));

  // buffer server input o2 from o1
  InputDeployInfo c2o2i1 = getInputDeployInfo(o2DI, "o1.outport");
  Assert.assertNotNull("stream connection for container2", c2o2i1);
  Assert.assertEquals("stream connects to upstream host", sca1.container.host, c2o2i1.bufferServerHost);
  Assert.assertEquals("stream connects to upstream port", sca1.container.bufferServerAddress.getPort(), c2o2i1.bufferServerPort);
  Assert.assertEquals("portName " + c2o2i1, dag.getMeta(o2).getMeta(o2.inport1).getPortName(), c2o2i1.portName);
  Assert.assertNull("partitionKeys " + c2o2i1, c2o2i1.partitionKeys);
  Assert.assertEquals("sourceNodeId " + c2o2i1, o1DI.id, c2o2i1.sourceNodeId);
  Assert.assertEquals("sourcePortName " + c2o2i1, TestGeneratorInputOperator.OUTPUT_PORT, c2o2i1.sourcePortName);
  Assert.assertNotNull("contextAttributes " + c2o2i1, c2o2i1.contextAttributes);

  // inline input o3 from o2
  InputDeployInfo c2o3i1 = getInputDeployInfo(o3DI, "o2.outport1");
  Assert.assertNotNull("input from o2.outport1", c2o3i1);
  Assert.assertEquals("portName " + c2o3i1, GenericTestOperator.IPORT1, c2o3i1.portName);
  Assert.assertNull("bufferServerHost " + c2o3i1, c2o3i1.bufferServerHost);
  Assert.assertEquals("bufferServerPort " + c2o3i1, 0, c2o3i1.bufferServerPort);
  Assert.assertNull("partitionKeys " + c2o3i1, c2o3i1.partitionKeys);
  Assert.assertEquals("sourceNodeId " + c2o3i1, o2DI.id, c2o3i1.sourceNodeId);
  Assert.assertEquals("sourcePortName " + c2o3i1, GenericTestOperator.OPORT1, c2o3i1.sourcePortName);
  Assert.assertEquals("locality " + c2o3i1, Locality.CONTAINER_LOCAL, c2o3i1.locality);

  // THREAD_LOCAL o4.inport1
  OperatorDeployInfo o4DI = getNodeDeployInfo(c2, dag.getMeta(o4));
  Assert.assertNotNull(dag.getMeta(o4) + " assigned to " + sca2.container.getExternalId(), o4DI);
  InputDeployInfo c2o4i1 = getInputDeployInfo(o4DI, "o3.outport1");
  Assert.assertNotNull("input from o3.outport1", c2o4i1);
  Assert.assertEquals("portName " + c2o4i1, GenericTestOperator.IPORT1, c2o4i1.portName);
  Assert.assertNull("bufferServerHost " + c2o4i1, c2o4i1.bufferServerHost);
  Assert.assertEquals("bufferServerPort " + c2o4i1, 0, c2o4i1.bufferServerPort);
  Assert.assertNull("partitionKeys " + c2o4i1, c2o4i1.partitionKeys);
  Assert.assertEquals("sourceNodeId " + c2o4i1, o3DI.id, c2o4i1.sourceNodeId);
  Assert.assertEquals("sourcePortName " + c2o4i1, GenericTestOperator.OPORT1, c2o4i1.sourcePortName);
  Assert.assertEquals("locality " + c2o4i1, Locality.THREAD_LOCAL, c2o4i1.locality);
}
/**
 * Partitions node2 three ways between node1 and node3 and verifies the generated deploy info:
 * one container per node1/node2 partition, the partition keys and stream codecs on each node2
 * input, and the unifier that merges the node2 partitions in the container hosting node3.
 */
@Test
public void testStaticPartitioning()
{
  //
  // ,---> node2----,
  // | |
  // node1---+---> node2----+---> unifier | node3
  // | |
  // '---> node2----'
  //
  GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
  PhysicalPlanTest.PartitioningTestOperator node2 = dag.addOperator("node2", PhysicalPlanTest.PartitioningTestOperator.class);
  node2.setPartitionCount(3);
  dag.setOperatorAttribute(node2, OperatorContext.SPIN_MILLIS, 10); /* this should not affect anything materially */
  dag.setOutputPortAttribute(node2.outport1, PortContext.QUEUE_CAPACITY, 1111);
  GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
  dag.setInputPortAttribute(node3.inport1, PortContext.QUEUE_CAPACITY, 2222);
  LogicalPlan.StreamMeta n1n2 = dag.addStream("n1n2", node1.outport1, node2.inport1);
  LogicalPlan.StreamMeta n2n3 = dag.addStream("n2n3", node2.outport1, node3.inport1);
  dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, Integer.MAX_VALUE);
  MemoryStorageAgent msa = new MemoryStorageAgent();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, msa);

  StreamingContainerManager dnm = new StreamingContainerManager(dag);
  PhysicalPlan plan = dnm.getPhysicalPlan();
  Assert.assertEquals("number containers", 5, plan.getContainers().size());
  List<StreamingContainerAgent> containerAgents = Lists.newArrayList();
  for (int i = 0; i < plan.getContainers().size(); i++) {
    containerAgents.add(assignContainer(dnm, "container" + (i + 1)));
  }

  // node1 container
  PTContainer c = plan.getOperators(dag.getMeta(node1)).get(0).getContainer();
  StreamingContainerAgent sca1 = dnm.getContainerAgent(c.getExternalId());
  List<OperatorDeployInfo> c1 = getDeployInfo(sca1);
  Assert.assertEquals("number operators assigned to container", 1, c1.size());
  // the message names node1, matching the operator that is actually checked
  Assert.assertTrue(dag.getMeta(node1) + " assigned to " + sca1.container.getExternalId(), containsNodeContext(c1, dag.getMeta(node1)));

  // node2 partitions, one container each
  List<PTOperator> o2Partitions = plan.getOperators(dag.getMeta(node2));
  Assert.assertEquals("number partitions", TestStaticPartitioningSerDe.partitions.length, o2Partitions.size());
  for (int i = 0; i < o2Partitions.size(); i++) {
    String containerId = o2Partitions.get(i).getContainer().getExternalId();
    List<OperatorDeployInfo> cc = getDeployInfo(dnm.getContainerAgent(containerId));
    Assert.assertEquals("number operators assigned to container", 1, cc.size());
    Assert.assertTrue(dag.getMeta(node2) + " assigned to " + containerId, containsNodeContext(cc, dag.getMeta(node2)));
    // n1n2 in, mergeStream out
    OperatorDeployInfo ndi = cc.get(0);
    Assert.assertEquals("type " + ndi, OperatorDeployInfo.OperatorType.GENERIC, ndi.type);
    Assert.assertEquals("inputs " + ndi, 1, ndi.inputs.size());
    Assert.assertEquals("outputs " + ndi, 1, ndi.outputs.size());
    InputDeployInfo nidi = ndi.inputs.get(0);
    Assert.assertEquals("stream " + nidi, n1n2.getName(), nidi.declaredStreamId);
    Assert.assertEquals("partition for " + containerId, Sets.newHashSet(node2.partitionKeys[i]), nidi.partitionKeys);
    Assert.assertEquals("number stream codecs for " + nidi, 1, nidi.streamCodecs.size());
  }

  // unifier, deployed in the container that hosts node3
  List<OperatorDeployInfo> cUnifier = getDeployInfo(dnm.getContainerAgent(plan.getOperators(dag.getMeta(node3)).get(0).getContainer().getExternalId()));
  Assert.assertEquals("number operators " + cUnifier, 2, cUnifier.size());
  OperatorDeployInfo mergeNodeDI = getNodeDeployInfo(cUnifier, dag.getMeta(node2).getMeta(node2.outport1).getUnifierMeta());
  Assert.assertNotNull("unifier for " + node2, mergeNodeDI);
  Assert.assertEquals("type " + mergeNodeDI, OperatorDeployInfo.OperatorType.UNIFIER, mergeNodeDI.type);
  Assert.assertEquals("inputs " + mergeNodeDI, 3, mergeNodeDI.inputs.size());

  // the expected merge port name is the same for every unifier input
  String mergePortName = "<merge#" + dag.getMeta(node2).getMeta(node2.outport1).getPortName() + ">";
  List<Integer> sourceNodeIds = Lists.newArrayList();
  for (InputDeployInfo nidi : mergeNodeDI.inputs) {
    Assert.assertEquals("streamName " + nidi, n2n3.getName(), nidi.declaredStreamId);
    Assert.assertEquals("portName " + nidi, mergePortName, nidi.portName);
    Assert.assertNotNull("sourceNodeId " + nidi, nidi.sourceNodeId);
    Assert.assertNotNull("contextAttributes " + nidi, nidi.contextAttributes);
    Assert.assertEquals("contextAttributes ", Integer.valueOf(1111), nidi.getValue(PortContext.QUEUE_CAPACITY));
    sourceNodeIds.add(nidi.sourceNodeId);
  }
  for (PTOperator node : dnm.getPhysicalPlan().getOperators(dag.getMeta(node2))) {
    Assert.assertTrue(sourceNodeIds + " contains " + node.getId(), sourceNodeIds.contains(node.getId()));
  }

  Assert.assertEquals("outputs " + mergeNodeDI, 1, mergeNodeDI.outputs.size());
  for (OutputDeployInfo odi : mergeNodeDI.outputs) {
    Assert.assertNotNull("contextAttributes " + odi, odi.contextAttributes);
    Assert.assertEquals("contextAttributes ", Integer.valueOf(2222), odi.getValue(PortContext.QUEUE_CAPACITY));
  }

  // the object stored for the unifier id is a DefaultUnifier
  try {
    Object operator = msa.load(mergeNodeDI.id, Stateless.WINDOW_ID);
    Assert.assertTrue("" + operator, operator instanceof DefaultUnifier);
  } catch (IOException ex) {
    throw new RuntimeException(ex);
  }

  // node3 container
  c = plan.getOperators(dag.getMeta(node3)).get(0).getContainer();
  List<OperatorDeployInfo> cmerge = getDeployInfo(dnm.getContainerAgent(c.getExternalId()));
  Assert.assertEquals("number operators " + cmerge, 2, cmerge.size());
  OperatorDeployInfo node3DI = getNodeDeployInfo(cmerge, dag.getMeta(node3));
  Assert.assertNotNull(dag.getMeta(node3) + " assigned", node3DI);
  Assert.assertEquals("inputs " + node3DI, 1, node3DI.inputs.size());
  InputDeployInfo node3In = node3DI.inputs.get(0);
  Assert.assertEquals("streamName " + node3In, n2n3.getName(), node3In.declaredStreamId);
  Assert.assertEquals("portName " + node3In, dag.getMeta(node3).getMeta(node3.inport1).getPortName(), node3In.portName);
  Assert.assertNotNull("sourceNodeId " + node3DI, node3In.sourceNodeId);
  Assert.assertEquals("sourcePortName " + node3DI, mergeNodeDI.outputs.get(0).portName, node3In.sourcePortName);
}
/**
 * Assigns containers "c1" and "c2", reports both operators as ACTIVE through heartbeats and then
 * reports {@code p1} as SHUTDOWN, asserting the operator states after each step.
 *
 * @param scm manager that receives the heartbeats
 * @param p1 operator that is reported as shut down at the end
 * @param p2 operator that is only reported as active
 */
private static void shutdownOperator(StreamingContainerManager scm, PTOperator p1, PTOperator p2)
{
  assignContainer(scm, "c1");
  assignContainer(scm, "c2");

  // initial container heartbeats without operator stats
  ContainerHeartbeat c1hb = new ContainerHeartbeat();
  c1hb.setContainerStats(new ContainerStats(p1.getContainer().getExternalId()));
  scm.processHeartbeat(c1hb);

  ContainerHeartbeat c2hb = new ContainerHeartbeat();
  c2hb.setContainerStats(new ContainerStats(p2.getContainer().getExternalId()));
  scm.processHeartbeat(c2hb);

  // p1 reports ACTIVE with checkpoint window 2, current window 3
  OperatorHeartbeat o1hb = new OperatorHeartbeat();
  c1hb.getContainerStats().addNodeStats(o1hb);
  o1hb.setNodeId(p1.getId());
  o1hb.setState(DeployState.ACTIVE);
  OperatorStats o1stats = new OperatorStats();
  o1hb.getOperatorStatsContainer().add(o1stats);
  o1stats.checkpoint = new Checkpoint(2, 0, 0);
  o1stats.windowId = 3;
  scm.processHeartbeat(c1hb);
  Assert.assertEquals(PTOperator.State.ACTIVE, p1.getState());

  // p2 reports ACTIVE with the same stats
  OperatorHeartbeat o2hb = new OperatorHeartbeat();
  c2hb.getContainerStats().addNodeStats(o2hb);
  o2hb.setNodeId(p2.getId());
  o2hb.setState(DeployState.ACTIVE);
  OperatorStats o2stats = new OperatorStats();
  // attach the stats to the heartbeat, mirroring the p1 path; otherwise they are never sent
  o2hb.getOperatorStatsContainer().add(o2stats);
  o2stats.checkpoint = new Checkpoint(2, 0, 0);
  o2stats.windowId = 3;
  scm.processHeartbeat(c2hb);
  Assert.assertEquals(PTOperator.State.ACTIVE, p1.getState());
  Assert.assertEquals(PTOperator.State.ACTIVE, p2.getState());

  // p1 reports SHUTDOWN with checkpoint window 4, current window 5
  o1hb.setState(DeployState.SHUTDOWN);
  o1stats.checkpoint = new Checkpoint(4, 0, 0);
  o1stats.windowId = 5;
  scm.processHeartbeat(c1hb);
  Assert.assertEquals(PTOperator.State.INACTIVE, p1.getState());
}
/**
 * After o1 is reported as shut down, checks that heartbeat monitoring requests no container
 * stop at first, and that after a short wait a stop is requested for the container of o2 but
 * not for the container of o1.
 */
@Test
public void testShutdownOperatorTimeout() throws Exception
{
  GenericTestOperator upstream = dag.addOperator("o1", GenericTestOperator.class);
  GenericTestOperator downstream = dag.addOperator("o2", GenericTestOperator.class);
  dag.addStream("s1", upstream.outport1, downstream.inport1);
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
  dag.setAttribute(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 50);
  dag.setAttribute(OperatorContext.TIMEOUT_WINDOW_COUNT, 1);

  StreamingContainerManager manager = new StreamingContainerManager(dag);
  PhysicalPlan physicalPlan = manager.getPhysicalPlan();
  PTOperator p1 = physicalPlan.getOperators(dag.getMeta(upstream)).get(0);
  PTOperator p2 = physicalPlan.getOperators(dag.getMeta(downstream)).get(0);
  shutdownOperator(manager, p1, p2);

  manager.monitorHeartbeat(false);
  Assert.assertTrue(manager.containerStopRequests.isEmpty());

  // presumably long enough to exceed the 50 ms window x 1 timeout window configured above
  Thread.sleep(100);
  manager.monitorHeartbeat(false);

  String p1ContainerId = p1.getContainer().getExternalId();
  String p2ContainerId = p2.getContainer().getExternalId();
  Assert.assertFalse(manager.containerStopRequests.containsKey(p1ContainerId));
  Assert.assertTrue(manager.containerStopRequests.containsKey(p2ContainerId));
}
/**
 * After o1 is reported as shut down, schedules a restart of its container and checks that the
 * resulting container start request still includes the operator.
 */
@Test
public void testShutdownOperatorRecovery() throws Exception
{
  GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
  GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
  dag.addStream("s1", o1.outport1, o2.inport1);
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());

  StreamingContainerManager scm = new StreamingContainerManager(dag);
  // discard two pending start requests so the next poll sees the restart request
  scm.containerStartRequests.poll();
  scm.containerStartRequests.poll();

  PhysicalPlan plan = scm.getPhysicalPlan();
  PTOperator p1 = plan.getOperators(dag.getMeta(o1)).get(0);
  PTOperator p2 = plan.getOperators(dag.getMeta(o2)).get(0);
  shutdownOperator(scm, p1, p2);

  scm.scheduleContainerRestart(p1.getContainer().getExternalId());
  ContainerStartRequest dr = scm.containerStartRequests.poll();
  // fail with a clear message instead of a NullPointerException when nothing was queued
  Assert.assertNotNull("container start request after restart", dr);
  Assert.assertTrue(dr.container.getOperators().contains(p1));
}
/**
 * With node1 -> node2 -> node3 spread over two containers, restarts the first container and
 * checks the resulting operator states: the restarted container's operators stay
 * PENDING_DEPLOY, while the operator in the other container becomes PENDING_UNDEPLOY.
 */
@Test
public void testRecoveryOrder() throws Exception
{
  GenericTestOperator first = dag.addOperator("node1", GenericTestOperator.class);
  GenericTestOperator second = dag.addOperator("node2", GenericTestOperator.class);
  GenericTestOperator third = dag.addOperator("node3", GenericTestOperator.class);
  dag.addStream("n1n2", first.outport1, second.inport1);
  dag.addStream("n2n3", second.outport1, third.inport1);
  dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());

  StreamingContainerManager manager = new StreamingContainerManager(dag);
  Assert.assertEquals("" + manager.containerStartRequests, 2, manager.containerStartRequests.size());
  manager.containerStartRequests.clear();

  PhysicalPlan physicalPlan = manager.getPhysicalPlan();
  List<PTContainer> containers = physicalPlan.getContainers();
  Assert.assertEquals("" + containers, 2, physicalPlan.getContainers().size());

  PTContainer c1 = containers.get(0);
  Assert.assertEquals("c1.operators " + c1.getOperators(), 2, c1.getOperators().size());
  PTContainer c2 = containers.get(1);
  Assert.assertEquals("c2.operators " + c2.getOperators(), 1, c2.getOperators().size());

  assignContainer(manager, "container1");
  assignContainer(manager, "container2");
  StreamingContainerAgent agent1 = manager.getContainerAgent(c1.getExternalId());
  StreamingContainerAgent agent2 = manager.getContainerAgent(c2.getExternalId());

  // before the restart: both operators of the first container wait for deployment
  Assert.assertEquals("", 0, countState(agent1.container, PTOperator.State.PENDING_UNDEPLOY));
  Assert.assertEquals("", 2, countState(agent1.container, PTOperator.State.PENDING_DEPLOY));

  manager.scheduleContainerRestart(c1.getExternalId());

  // after the restart: unchanged for the first container, one new start request queued
  Assert.assertEquals("", 0, countState(agent1.container, PTOperator.State.PENDING_UNDEPLOY));
  Assert.assertEquals("", 2, countState(agent1.container, PTOperator.State.PENDING_DEPLOY));
  Assert.assertEquals("" + manager.containerStartRequests, 1, manager.containerStartRequests.size());
  ContainerStartRequest startRequest = manager.containerStartRequests.peek();
  Assert.assertNotNull(startRequest);

  // the operator in the second container is marked for undeploy
  Assert.assertEquals("" + agent2.container, 1, countState(agent2.container, PTOperator.State.PENDING_UNDEPLOY));
  Assert.assertEquals("" + agent2.container, 0, countState(agent2.container, PTOperator.State.PENDING_DEPLOY));
}
/**
 * With o1 and o3 sharing the first container and o2 feeding o3 from the second, restarts the
 * second container and checks that every operator of the first container switches from
 * PENDING_DEPLOY to PENDING_UNDEPLOY.
 */
@Test
public void testRecoveryUpstreamInline() throws Exception
{
  GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
  GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
  GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
  dag.addStream("o1o3", o1.outport1, o3.inport1);
  dag.addStream("o2o3", o2.outport1, o3.inport2);
  dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());

  StreamingContainerManager scm = new StreamingContainerManager(dag);
  PhysicalPlan plan = scm.getPhysicalPlan();
  Assert.assertEquals(2, plan.getContainers().size());

  // first container holds o1 and o3
  PTContainer c1 = plan.getContainers().get(0);
  Assert.assertEquals(Sets.newHashSet(plan.getOperators(dag.getMeta(o1)).get(0), plan.getOperators(dag.getMeta(o3)).get(0)), Sets.newHashSet(c1.getOperators()));
  PTContainer c2 = plan.getContainers().get(1);

  assignContainer(scm, "c1");
  assignContainer(scm, "c2");
  for (PTOperator oper : c1.getOperators()) {
    Assert.assertEquals("state " + oper, PTOperator.State.PENDING_DEPLOY, oper.getState());
  }

  scm.scheduleContainerRestart(c2.getExternalId());
  for (PTOperator oper : c1.getOperators()) {
    Assert.assertEquals("state " + oper, PTOperator.State.PENDING_UNDEPLOY, oper.getState());
  }
}
/**
 * Saves three objects under operator id 1 with an {@code FSStorageAgent} and checks that
 * {@code getWindowIds(1)} returns the same window ids, ignoring order.
 */
@Test
public void testCheckpointWindowIds() throws Exception
{
  FSStorageAgent agent = new FSStorageAgent(testMeta.getPath(), null);
  long[] expected = new long[]{123L, 345L, 234L};
  for (int i = 0; i < expected.length; i++) {
    // the window id doubles as the saved object
    agent.save(expected[i], 1, expected[i]);
  }

  long[] actual = agent.getWindowIds(1);
  // both sides are sorted so the comparison does not depend on the returned order
  Arrays.sort(expected);
  Arrays.sort(actual);
  Assert.assertArrayEquals("Saved windowIds", expected, actual);
}
/**
 * Saves three objects under operator id 1 with an {@code AsyncFSStorageAgent}, calls
 * {@code copyToHDFS} for each, and checks that {@code getWindowIds(1)} returns the same window
 * ids, ignoring order.
 */
@Test
public void testAsyncCheckpointWindowIds() throws Exception
{
  AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.getPath(), null);
  long[] expected = new long[]{123L, 345L, 234L};
  for (int i = 0; i < expected.length; i++) {
    // the window id doubles as the saved object
    agent.save(expected[i], 1, expected[i]);
    agent.copyToHDFS(1, expected[i]);
  }

  long[] actual = agent.getWindowIds(1);
  // both sides are sorted so the comparison does not depend on the returned order
  Arrays.sort(expected);
  Arrays.sort(actual);
  Assert.assertArrayEquals("Saved windowIds", expected, actual);
}
/**
 * Drives one input operator through container assignment, the deploy request and two operator
 * heartbeats, checking container/operator states, the accumulated tuple and port statistics,
 * and that {@code processEvents()} moves the queued listener stats into the last windowed stats.
 */
@Test
public void testProcessHeartbeat() throws Exception
{
  TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
  StatsListener[] listeners = new StatsListener[]{new PartitioningTest.PartitionLoadWatch()};
  dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(listeners));
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());

  StreamingContainerManager manager = new StreamingContainerManager(dag);
  PhysicalPlan physicalPlan = manager.getPhysicalPlan();
  Assert.assertEquals("number required containers", 1, physicalPlan.getContainers().size());
  PTOperator oper = physicalPlan.getOperators(dag.getMeta(o1)).get(0);

  // assign container
  String containerId = "container1";
  ContainerResource resource = new ContainerResource(0, containerId, "localhost", 512, 0, null);
  StreamingContainerAgent agent = manager.assignContainer(resource, InetSocketAddress.createUnresolved("localhost", 0));
  Assert.assertNotNull(agent);
  Assert.assertEquals(PTContainer.State.ALLOCATED, oper.getContainer().getState());
  Assert.assertEquals(PTOperator.State.PENDING_DEPLOY, oper.getState());

  // container heartbeat without operators: get deploy request
  ContainerStats containerStats = new ContainerStats(containerId);
  ContainerHeartbeat heartbeat = new ContainerHeartbeat();
  heartbeat.setContainerStats(containerStats);
  ContainerHeartbeatResponse response = manager.processHeartbeat(heartbeat);
  Assert.assertNotNull(response.deployRequest);
  Assert.assertEquals("" + response.deployRequest, 1, response.deployRequest.size());
  Assert.assertEquals(PTContainer.State.ACTIVE, oper.getContainer().getState());
  Assert.assertEquals("state " + oper, PTOperator.State.PENDING_DEPLOY, oper.getState());

  // first operator heartbeat
  PortStats firstPort = new PortStats(TestGeneratorInputOperator.OUTPUT_PORT);
  firstPort.bufferServerBytes = 101;
  firstPort.tupleCount = 1;
  OperatorStats firstStats = new OperatorStats();
  firstStats.checkpoint = new Checkpoint(2, 0, 0);
  firstStats.windowId = 3;
  firstStats.outputPorts = Lists.newArrayList();
  firstStats.outputPorts.add(firstPort);

  OperatorHeartbeat operatorHeartbeat = new OperatorHeartbeat();
  operatorHeartbeat.setNodeId(oper.getId());
  operatorHeartbeat.setState(OperatorHeartbeat.DeployState.ACTIVE);
  operatorHeartbeat.windowStats = Lists.newArrayList(firstStats);
  containerStats.operators.add(operatorHeartbeat);
  manager.processHeartbeat(heartbeat); // activate operator

  Assert.assertEquals(PTContainer.State.ACTIVE, oper.getContainer().getState());
  Assert.assertEquals("state " + oper, PTOperator.State.ACTIVE, oper.getState());
  Assert.assertEquals("tuples " + oper, 1, oper.stats.totalTuplesEmitted.get());
  Assert.assertEquals("tuples " + oper, 0, oper.stats.totalTuplesProcessed.get());
  Assert.assertEquals("window " + oper, 3, oper.stats.currentWindowId.get());
  Assert.assertEquals("port stats", 1, oper.stats.outputPortStatusList.size());
  PortStatus portStatus = oper.stats.outputPortStatusList.get(TestGeneratorInputOperator.OUTPUT_PORT);
  Assert.assertNotNull("port stats", portStatus);
  Assert.assertEquals("port stats", 1, portStatus.totalTuples);

  // second operator heartbeat
  PortStats secondPort = new PortStats(TestGeneratorInputOperator.OUTPUT_PORT);
  secondPort.bufferServerBytes = 1;
  secondPort.tupleCount = 1;
  OperatorStats secondStats = new OperatorStats();
  secondStats.checkpoint = new Checkpoint(2, 0, 0);
  secondStats.windowId = 4;
  secondStats.outputPorts = Lists.newArrayList();
  secondStats.outputPorts.add(secondPort);

  operatorHeartbeat.windowStats = Lists.newArrayList(secondStats);
  containerStats.operators.clear();
  containerStats.operators.add(operatorHeartbeat);
  manager.processHeartbeat(heartbeat);

  Assert.assertEquals("tuples " + oper, 2, oper.stats.totalTuplesEmitted.get());
  Assert.assertEquals("window " + oper, 4, oper.stats.currentWindowId.get());
  Assert.assertEquals("statsQueue " + oper, 2, oper.stats.listenerStats.size());

  // processing events drains the listener queue into the last windowed stats
  manager.processEvents();
  Assert.assertEquals("statsQueue " + oper, 0, oper.stats.listenerStats.size());
  Assert.assertEquals("lastStats " + oper, 2, oper.stats.lastWindowedStats.size());
}
/**
 * Stream codec for partitioning tests: maps every payload object to the first entry of
 * {@link #partitions} and rejects control tuples.
 */
public static class TestStaticPartitioningSerDe extends DefaultStatefulStreamCodec<Object>
{
  public static final int[] partitions = new int[]{0, 1, 2};

  /**
   * @param o object to partition
   * @return always {@code partitions[0]}
   * @throws UnsupportedOperationException if {@code o} is a {@code Tuple}
   */
  @Override
  public int getPartition(Object o)
  {
    final boolean controlTuple = o instanceof Tuple;
    if (!controlTuple) {
      return partitions[0];
    }
    throw new UnsupportedOperationException("should not be called with control tuple");
  }
}
/**
 * Counts the operators of a container that are currently in the given state.
 *
 * @param c container whose operators are inspected
 * @param state state to look for
 * @return number of operators whose {@code getState()} is {@code state}
 */
private int countState(PTContainer c, PTOperator.State state)
{
  int matches = 0;
  for (PTOperator oper : c.getOperators()) {
    matches += (state == oper.getState()) ? 1 : 0;
  }
  return matches;
}
/**
 * Tells whether the list holds a deploy info for the given logical operator.
 *
 * @param di deploy infos to search
 * @param nodeConf logical operator to look up
 * @return {@code true} if {@code getNodeDeployInfo} finds an entry
 */
private boolean containsNodeContext(List<OperatorDeployInfo> di, OperatorMeta nodeConf)
{
  final OperatorDeployInfo match = getNodeDeployInfo(di, nodeConf);
  return null != match;
}
/**
 * Returns the deploy info list the agent generates for all operators of its own container.
 *
 * @param sca container agent to query
 * @return result of {@code sca.getDeployInfoList} for the container's operators
 */
public static List<OperatorDeployInfo> getDeployInfo(StreamingContainerAgent sca)
{
  final PTContainer container = sca.container;
  return sca.getDeployInfoList(container.getOperators());
}
/**
 * Finds the deploy info whose name equals the name of the given logical operator.
 *
 * @param di deploy infos to search
 * @param nodeConf logical operator whose name is matched
 * @return the first matching deploy info, or {@code null} if none matches
 */
private static OperatorDeployInfo getNodeDeployInfo(List<OperatorDeployInfo> di, OperatorMeta nodeConf)
{
  OperatorDeployInfo found = null;
  for (OperatorDeployInfo candidate : di) {
    if (nodeConf.getName().equals(candidate.name)) {
      found = candidate;
      break;
    }
  }
  return found;
}
/**
 * Finds the input of an operator deploy info that is connected to the given stream.
 *
 * @param ndi operator deploy info whose inputs are searched
 * @param streamId declared stream id to match
 * @return the first input with that declared stream id, or {@code null} if none matches
 */
private static InputDeployInfo getInputDeployInfo(OperatorDeployInfo ndi, String streamId)
{
  InputDeployInfo found = null;
  for (InputDeployInfo candidate : ndi.inputs) {
    if (streamId.equals(candidate.declaredStreamId)) {
      found = candidate;
      break;
    }
  }
  return found;
}
/**
 * Assigns a 1024 MB container resource on "localhost" with the given id to the manager, using
 * an unresolved buffer server address of {@code containerId + "Host"} and port 0.
 *
 * @param scm manager that receives the container
 * @param containerId external id of the container
 * @return the agent returned by {@code scm.assignContainer}
 */
public static StreamingContainerAgent assignContainer(StreamingContainerManager scm, String containerId)
{
  final ContainerResource resource = new ContainerResource(0, containerId, "localhost", 1024, 0, null);
  final InetSocketAddress bufferServerAddress = InetSocketAddress.createUnresolved(containerId + "Host", 0);
  return scm.assignContainer(resource, bufferServerAddress);
}
/**
 * Connects a GenericTestOperator to a ValidGenericOperator and checks that the first deploy
 * info of the container hosting the latter has type GENERIC.
 */
@Test
public void testValidGenericOperatorDeployInfoType()
{
  GenericTestOperator source = dag.addOperator("o1", GenericTestOperator.class);
  TestGeneratorInputOperator.ValidGenericOperator target = dag.addOperator("o2", TestGeneratorInputOperator.ValidGenericOperator.class);
  dag.addStream("stream1", source.outport1, target.input);
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());

  StreamingContainerManager manager = new StreamingContainerManager(dag);
  PhysicalPlan physicalPlan = manager.getPhysicalPlan();
  List<PTContainer> containers = physicalPlan.getContainers();
  // one assignment per planned container, ids container1..containerN
  for (int number = 1; number <= containers.size(); number++) {
    assignContainer(manager, "container" + number);
  }

  PTOperator targetPhysical = physicalPlan.getOperators(dag.getMeta(target)).get(0);
  String containerId = targetPhysical.getContainer().getExternalId();
  StreamingContainerAgent agent = manager.getContainerAgent(containerId);
  OperatorDeployInfo deployInfo = getDeployInfo(agent).get(0);
  Assert.assertEquals("type " + deployInfo, OperatorDeployInfo.OperatorType.GENERIC, deployInfo.type);
}
/**
 * Verifies that the first deploy info of the container hosting "o1" (a {@code ValidInputOperator}
 * feeding "stream1") is of type {@code INPUT}.
 */
@Test
public void testValidInputOperatorDeployInfoType()
{
  TestGeneratorInputOperator.ValidInputOperator inputOperator = dag.addOperator("o1", TestGeneratorInputOperator.ValidInputOperator.class);
  GenericTestOperator downstream = dag.addOperator("o2", GenericTestOperator.class);
  dag.addStream("stream1", inputOperator.outport, downstream.inport1);
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());

  StreamingContainerManager manager = new StreamingContainerManager(dag);
  PhysicalPlan plan = manager.getPhysicalPlan();
  List<PTContainer> plannedContainers = plan.getContainers();
  // container ids are 1-based: container1, container2, ...
  for (int n = 1; n <= plannedContainers.size(); n++) {
    assignContainer(manager, "container" + n);
  }

  PTOperator physicalInput = plan.getOperators(dag.getMeta(inputOperator)).get(0);
  String hostingContainerId = physicalInput.getContainer().getExternalId();
  List<OperatorDeployInfo> deployInfos = getDeployInfo(manager.getContainerAgent(hostingContainerId));
  OperatorDeployInfo firstDeployInfo = deployInfos.get(0);
  Assert.assertEquals("type " + firstDeployInfo, OperatorDeployInfo.OperatorType.INPUT, firstDeployInfo.type);
}
/**
 * Walks a three-operator chain (o1 -> o2 -> o3, with o2 configured with StatelessPartitioner(2))
 * through mock container heartbeats, reports o1 as SHUTDOWN, and asserts that the physical plan
 * ends up without any containers after the remaining operators have reported window 2.
 */
@Test
public void testOperatorShutdown()
{
dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
dag.addStream("stream1", o1.outport1, o2.inport1);
dag.addStream("stream2", o2.outport1, o3.inport1);
dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
StreamingContainerManager scm = new StreamingContainerManager(dag);
PhysicalPlan physicalPlan = scm.getPhysicalPlan();
// one MockContainer per container of the physical plan, keyed by that container
Map<PTContainer, MockContainer> mockContainers = new HashMap<>();
for (PTContainer c : physicalPlan.getContainers()) {
MockContainer mc = new MockContainer(scm, c);
mockContainers.put(c, mc);
}
// deploy all containers
for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) {
ce.getValue().deploy();
}
for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) {
// skip buffer server purge in monitorHeartbeat
ce.getKey().bufferServerAddress = null;
}
// o1 (single partition): report window 1 as current and checkpointed, state ACTIVE
List<PTOperator> o1p = physicalPlan.getOperators(dag.getMeta(o1));
Assert.assertEquals("o1 partitions", 1, o1p.size());
PTOperator o1p1 = o1p.get(0);
MockContainer mc1 = mockContainers.get(o1p1.getContainer());
MockOperatorStats o1p1mos = mc1.stats(o1p1.getId());
o1p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
mc1.sendHeartbeat();
// first o2 partition: same report through its own container
PTOperator o2p1 = physicalPlan.getOperators(dag.getMeta(o2)).get(0);
MockContainer mc2 = mockContainers.get(o2p1.getContainer());
MockOperatorStats o2p1mos = mc2.stats(o2p1.getId());
o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
mc2.sendHeartbeat();
// second o2 partition
Assert.assertEquals("o2 partitions", 2, physicalPlan.getOperators(dag.getMeta(o2)).size());
PTOperator o2p2 = physicalPlan.getOperators(dag.getMeta(o2)).get(1);
MockContainer mc3 = mockContainers.get(o2p2.getContainer());
MockOperatorStats o2p2mos = mc3.stats(o2p2.getId());
o2p2mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
mc3.sendHeartbeat();
// o3 and the first operator of its upstreamMerge report through the same mock container (mc4)
Assert.assertEquals("o3 partitions", 1, physicalPlan.getOperators(dag.getMeta(o3)).size());
PTOperator o3p1 = physicalPlan.getOperators(dag.getMeta(o3)).get(0);
MockContainer mc4 = mockContainers.get(o3p1.getContainer());
MockOperatorStats o3p1mos = mc4.stats(o3p1.getId());
MockOperatorStats unifierp1mos = mc4.stats(o3p1.upstreamMerge.values().iterator().next().getId());
unifierp1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
o3p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
mc4.sendHeartbeat();
// o1 reports SHUTDOWN at window 2: it is still listed in the plan, but its state is INACTIVE
o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN);
mc1.sendHeartbeat();
o1p = physicalPlan.getOperators(dag.getMeta(o1));
Assert.assertEquals("o1 partitions", 1, o1p.size());
Assert.assertEquals("o1p1 present", o1p1, o1p.get(0));
Assert.assertEquals("input operator state", PTOperator.State.INACTIVE, o1p1.getState());
// the first monitor cycle still reports -1 as committed window
scm.monitorHeartbeat(false);
Assert.assertEquals("committedWindowId", -1, scm.getCommittedWindowId());
scm.monitorHeartbeat(false); // committedWindowId updated in next cycle
Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
scm.processEvents();
Assert.assertEquals("containers at committedWindowId=1", 4, physicalPlan.getContainers().size());
// checkpoint window 2
o1p1mos.checkpointWindowId(2);
mc1.sendHeartbeat();
scm.monitorHeartbeat(false);
// o1 checkpointing window 2 on its own leaves the committed window at 1
Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
// now the remaining operators report window 2 as current and checkpointed
o2p1mos.currentWindowId(2).checkpointWindowId(2);
o2p2mos.currentWindowId(2).checkpointWindowId(2);
o3p1mos.currentWindowId(2).checkpointWindowId(2);
unifierp1mos.currentWindowId(2).checkpointWindowId(2);
mc2.sendHeartbeat();
mc3.sendHeartbeat();
mc4.sendHeartbeat();
scm.monitorHeartbeat(false);
// Operators are shutdown when both operators reach window Id 2
Assert.assertEquals(0, o1p1.getContainer().getOperators().size());
Assert.assertEquals(0, o2p1.getContainer().getOperators().size());
Assert.assertEquals(0, physicalPlan.getContainers().size());
}
/**
 * Builds an input operator feeding a two-way partitioned operator over a stream with the given
 * locality, checks that the physical plan consists of one container holding three operators, and
 * then runs the DAG in a local cluster.
 *
 * @param locality locality set on the stream between the two operators
 * @throws Exception if validation, planning or the local cluster run fails
 */
private void testDownStreamPartition(Locality locality) throws Exception
{
  TestGeneratorInputOperator source = dag.addOperator("o1", TestGeneratorInputOperator.class);
  GenericTestOperator partitioned = dag.addOperator("o2", GenericTestOperator.class);
  StatelessPartitioner<GenericTestOperator> twoWay = new StatelessPartitioner<GenericTestOperator>(2);
  dag.setOperatorAttribute(partitioned, OperatorContext.PARTITIONER, twoWay);
  dag.addStream("o1Output1", source.outport, partitioned.inport1).setLocality(locality);

  final int containerLimit = 5;
  dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, containerLimit);
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
  dag.validate();

  PhysicalPlan physicalPlan = new PhysicalPlan(dag, new TestPlanContext());
  Assert.assertEquals("number of containers", 1, physicalPlan.getContainers().size());
  PTContainer onlyContainer = physicalPlan.getContainers().get(0);
  Assert.assertEquals("number operators " + onlyContainer, 3, onlyContainer.getOperators().size());

  StramLocalCluster localCluster = new StramLocalCluster(dag);
  localCluster.run(5000);
}
/**
 * Runs the downstream-partition scenario with a {@code THREAD_LOCAL} stream.
 */
@Test
public void testOIODownstreamPartition() throws Exception
{
  final Locality streamLocality = Locality.THREAD_LOCAL;
  testDownStreamPartition(streamLocality);
}
/**
 * Runs the downstream-partition scenario with a {@code CONTAINER_LOCAL} stream.
 */
@Test
public void testContainerLocalDownstreamPartition() throws Exception
{
  final Locality streamLocality = Locality.CONTAINER_LOCAL;
  testDownStreamPartition(streamLocality);
}
/**
 * Starts the DAG in an asynchronous local cluster, sets the "maxTuples" property of the physical
 * o1 operator to "2", and verifies that reading the property back returns 2.
 *
 * @throws Exception if activation, the property round trip or the shutdown fails
 */
@Test
public void testPhysicalPropertyUpdate() throws Exception
{
  TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
  GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
  dag.addStream("o1.outport", o1.outport, o2.inport1);
  StramLocalCluster lc = new StramLocalCluster(dag);
  lc.runAsync();
  // The cluster runs in the background from here on; always shut it down, even when a wait or an
  // assertion below fails, so that it does not keep running into subsequent tests.
  try {
    StreamingContainerManager dnmgr = lc.dnmgr;
    Map<Integer,PTOperator> operatorMap = dnmgr.getPhysicalPlan().getAllOperators();
    for (PTOperator p: operatorMap.values()) {
      StramTestSupport.waitForActivation(lc, p);
    }
    dnmgr.setPhysicalOperatorProperty(lc.getPlanOperators(dag.getMeta(o1)).get(0).getId(),"maxTuples","2");
    Future<?> future = dnmgr.getPhysicalOperatorProperty(lc.getPlanOperators(dag.getMeta(o1)).get(0).getId(), "maxTuples", 10000);
    Object object = future.get(10000, TimeUnit.MILLISECONDS);
    Assert.assertNotNull(object);
    @SuppressWarnings("unchecked")
    Map<String, Object> propertyValue = (Map<String, Object>)object;
    Assert.assertEquals(2,propertyValue.get("maxTuples"));
  } finally {
    lc.shutdown();
  }
}
/**
 * Populates the DAG with a generator ("o1"), a query operator ("q"), a result operator ("r") and a
 * data source operator ("ds"), configures URL and topic on the query and result operators, and
 * wires the three streams between them.
 *
 * @param qClass class of the query operator
 * @param dsClass class of the data source operator
 * @param rClass class of the result operator
 */
private void setupAppDataSourceLogicalPlan(Class<? extends TestAppDataQueryOperator> qClass,
    Class<? extends TestAppDataSourceOperator> dsClass, Class<? extends TestAppDataResultOperator> rClass)
{
  TestGeneratorInputOperator generator = dag.addOperator("o1", TestGeneratorInputOperator.class);
  TestAppDataQueryOperator queryOperator = dag.addOperator("q", qClass);
  TestAppDataResultOperator resultOperator = dag.addOperator("r", rClass);
  TestAppDataSourceOperator dataSource = dag.addOperator("ds", dsClass);

  // query side
  queryOperator.setAppDataUrl("ws://123.123.123.123:9090/pubsub");
  queryOperator.setTopic("xyz.query");

  // result side
  resultOperator.setAppDataUrl("ws://123.123.123.124:9090/pubsub");
  resultOperator.setTopic("xyz.result");

  dag.addStream("o1-to-ds", generator.outport, dataSource.inport1);
  dag.addStream("q-to-ds", queryOperator.outport, dataSource.query);
  dag.addStream("ds-to-r", dataSource.result, resultOperator.inport);
}
/**
 * Checks the single app data source reported by the manager for the current DAG: its name, the
 * query operator/topic/URL, the result operator/topic/URL, and the append-QID flag.
 *
 * @param appendQIDToTopic expected value of the result's {@code appendQIDToTopic} flag
 * @throws Exception if the local cluster cannot be created
 */
private void testAppDataSources(boolean appendQIDToTopic) throws Exception
{
  StramLocalCluster cluster = new StramLocalCluster(dag);
  StreamingContainerManager manager = cluster.dnmgr;

  List<AppDataSource> sources = manager.getAppDataSources();
  Assert.assertEquals("There should be exactly one data source", 1, sources.size());
  AppDataSource source = sources.get(0);
  Assert.assertEquals("Data Source name verification", "ds.result", source.getName());

  AppDataSource.QueryInfo queryInfo = source.getQuery();
  Assert.assertEquals("Query operator name verification", "q", queryInfo.operatorName);
  Assert.assertEquals("Query topic verification", "xyz.query", queryInfo.topic);
  Assert.assertEquals("Query URL verification", "ws://123.123.123.123:9090/pubsub", queryInfo.url);

  AppDataSource.ResultInfo resultInfo = source.getResult();
  Assert.assertEquals("Result operator name verification", "r", resultInfo.operatorName);
  Assert.assertEquals("Result topic verification", "xyz.result", resultInfo.topic);
  Assert.assertEquals("Result URL verification", "ws://123.123.123.124:9090/pubsub", resultInfo.url);
  Assert.assertEquals("Result QID append verification", appendQIDToTopic, resultInfo.appendQIDToTopic);
}
/**
 * App data source check with {@code ResultOperator1}; expects the append-QID flag to be true.
 */
@Test
public void testGetAppDataSources1() throws Exception
{
  Class<? extends TestAppDataResultOperator> resultClass = TestAppDataResultOperator.ResultOperator1.class;
  setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, resultClass);
  testAppDataSources(true);
}
/**
 * App data source check with {@code ResultOperator2}; expects the append-QID flag to be false.
 */
@Test
public void testGetAppDataSources2() throws Exception
{
  Class<? extends TestAppDataResultOperator> resultClass = TestAppDataResultOperator.ResultOperator2.class;
  setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, resultClass);
  testAppDataSources(false);
}
/**
 * App data source check with {@code ResultOperator3}; expects the append-QID flag to be false.
 */
@Test
public void testGetAppDataSources3() throws Exception
{
  Class<? extends TestAppDataResultOperator> resultClass = TestAppDataResultOperator.ResultOperator3.class;
  setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, resultClass);
  testAppDataSources(false);
}
/**
 * Pushes app data through an {@code AppDataPushAgent} to an embedded websocket server and checks
 * the structure of the first JSON message received (topic, type, app identification and the
 * per-operator statistics fields for "o1" and "o2").
 *
 * @throws Exception if the server, the push agent or the JSON parsing fails
 */
@Test
public void testAppDataPush() throws Exception
{
  if (StramTestSupport.isInTravis()) {
    // disable this test in travis because of an intermittent problem similar to this:
    // http://stackoverflow.com/questions/32172925/travis-ci-sporadic-timeouts-to-localhost
    // We should remove this when we find a solution to this.
    LOG.info("Test testAppDataPush is disabled in Travis");
    return;
  }
  final String topic = "xyz";
  // onMessage is invoked by the websocket server while the test thread reads the list, so the
  // list has to be safe for use from more than one thread.
  final List<String> messages = Collections.synchronizedList(new ArrayList<String>());
  EmbeddedWebSocketServer server = new EmbeddedWebSocketServer(0);
  server.setWebSocket(new WebSocket.OnTextMessage()
  {
    @Override
    public void onMessage(String data)
    {
      messages.add(data);
    }

    @Override
    public void onOpen(WebSocket.Connection connection)
    {
    }

    @Override
    public void onClose(int closeCode, String message)
    {
    }
  });
  try {
    server.start();
    int port = server.getPort();
    TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
    dag.addStream("o1.outport", o1.outport, o2.inport1);
    dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new AutoMetricBuiltInTransport(topic));
    dag.setAttribute(LogicalPlan.GATEWAY_CONNECT_ADDRESS, "localhost:" + port);
    dag.setAttribute(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS, 2000);
    StramLocalCluster lc = new StramLocalCluster(dag);
    StreamingContainerManager dnmgr = lc.dnmgr;
    StramAppContext appContext = new StramTestSupport.TestAppContext(dag.getAttributes());
    AppDataPushAgent pushAgent = new AppDataPushAgent(dnmgr, appContext);
    pushAgent.init();
    // Close the agent even when no message arrived and the assertion fails.
    try {
      pushAgent.pushData();
      Thread.sleep(1000);
      Assert.assertTrue(messages.size() > 0);
    } finally {
      pushAgent.close();
    }
    JSONObject message = new JSONObject(messages.get(0));
    Assert.assertEquals(topic, message.getString("topic"));
    Assert.assertEquals("publish", message.getString("type"));
    JSONObject data = message.getJSONObject("data");
    Assert.assertTrue(StringUtils.isNotBlank(data.getString("appId")));
    Assert.assertTrue(StringUtils.isNotBlank(data.getString("appUser")));
    Assert.assertTrue(StringUtils.isNotBlank(data.getString("appName")));
    JSONObject logicalOperators = data.getJSONObject("logicalOperators");
    for (String opName : new String[]{"o1", "o2"}) {
      JSONObject opObj = logicalOperators.getJSONObject(opName);
      Assert.assertTrue(opObj.has("totalTuplesProcessed"));
      Assert.assertTrue(opObj.has("totalTuplesEmitted"));
      Assert.assertTrue(opObj.has("tuplesProcessedPSMA"));
      Assert.assertTrue(opObj.has("tuplesEmittedPSMA"));
      Assert.assertTrue(opObj.has("latencyMA"));
    }
  } finally {
    server.stop();
  }
}
/**
 * Metrics transport for tests: every pushed JSON string is recorded in a static list, prefixed
 * with the prefix given at construction time and a colon.
 */
public static class TestMetricTransport implements AutoMetric.Transport, Serializable
{
  private String prefix;
  private static List<String> messages = new ArrayList<>();

  /**
   * @param prefix text put in front of every recorded message
   */
  public TestMetricTransport(String prefix)
  {
    this.prefix = prefix;
  }

  /**
   * Records {@code prefix + ":" + jsonData} in the static message list.
   *
   * @param jsonData the data to record
   * @throws IOException declared by the interface; not thrown by this implementation
   */
  @Override
  public void push(String jsonData) throws IOException
  {
    final String tagged = prefix + ":" + jsonData;
    messages.add(tagged);
  }

  /**
   * @return always 0
   */
  @Override
  public long getSchemaResendInterval()
  {
    return 0L;
  }
}
/**
 * Configures a {@code TestMetricTransport} with prefix "xyz" as the metrics transport, pushes data
 * once, and verifies that a message carrying that prefix was recorded.
 *
 * @throws Exception if the push agent fails
 */
@Test
public void testCustomMetricsTransport() throws Exception
{
  // The recorded messages live in a static list; start empty so that the assertions below can
  // only be satisfied by what this test pushes.
  TestMetricTransport.messages.clear();
  TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
  GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
  dag.addStream("o1.outport", o1.outport, o2.inport1);
  dag.setAttribute(LogicalPlan.METRICS_TRANSPORT, new TestMetricTransport("xyz"));
  StramLocalCluster lc = new StramLocalCluster(dag);
  StreamingContainerManager dnmgr = lc.dnmgr;
  StramAppContext appContext = new StramTestSupport.TestAppContext(dag.getAttributes());
  AppDataPushAgent pushAgent = new AppDataPushAgent(dnmgr, appContext);
  pushAgent.init();
  // Close the agent even when nothing was pushed and the assertion fails.
  try {
    pushAgent.pushData();
    Assert.assertTrue(TestMetricTransport.messages.size() > 0);
  } finally {
    pushAgent.close();
  }
  String msg = TestMetricTransport.messages.get(0);
  Assert.assertTrue(msg.startsWith("xyz:"));
}
/**
 * {@code GenericTestOperator} that sleeps for a configurable number of milliseconds in every
 * {@link #endWindow()} call.
 */
public static class HighLatencyTestOperator extends GenericTestOperator
{
  private long latency;

  /**
   * Sleeps for the configured latency. If the thread is interrupted while sleeping, the sleep is
   * cut short and the interrupt status is restored so that the caller can still observe it.
   */
  @Override
  public void endWindow()
  {
    try {
      Thread.sleep(latency);
    } catch (InterruptedException ex) {
      // move on, but do not lose the interrupt request
      Thread.currentThread().interrupt();
    }
  }

  /**
   * @param latency time in milliseconds to sleep in each {@link #endWindow()} call
   */
  public void setLatency(long latency)
  {
    this.latency = latency;
  }
}
/**
 * Runs a diamond-shaped DAG (o1 feeds o2 and o3, both feed o4) in which o3 sleeps 5 seconds in
 * every end-window call, and checks the reported latencies and the critical path (o1, o3, o4).
 *
 * @throws Exception if the local cluster fails or the test thread is interrupted
 */
@Test
public void testLatency() throws Exception
{
  TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
  GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
  HighLatencyTestOperator o3 = dag.addOperator("o3", HighLatencyTestOperator.class);
  GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
  long latency = 5000; // 5 seconds
  o3.setLatency(latency);
  dag.addStream("o1.outport", o1.outport, o2.inport1, o3.inport1);
  dag.addStream("o2.outport1", o2.outport1, o4.inport1);
  dag.addStream("o3.outport1", o3.outport1, o4.inport2);
  dag.setAttribute(Context.DAGContext.STATS_MAX_ALLOWABLE_WINDOWS_LAG, 2); // 1 second
  StramLocalCluster lc = new StramLocalCluster(dag);
  StreamingContainerManager dnmgr = lc.dnmgr;
  lc.runAsync();
  // The cluster runs in the background from here on; always shut it down, even when an assertion
  // below fails, so that it does not keep running into subsequent tests.
  try {
    Thread.sleep(10000);
    LogicalOperatorInfo o1Info = dnmgr.getLogicalOperatorInfo("o1");
    LogicalOperatorInfo o2Info = dnmgr.getLogicalOperatorInfo("o2");
    LogicalOperatorInfo o3Info = dnmgr.getLogicalOperatorInfo("o3");
    LogicalOperatorInfo o4Info = dnmgr.getLogicalOperatorInfo("o4");
    Assert.assertEquals("Input operator latency must be zero", 0, o1Info.latencyMA);
    Assert.assertTrue("Latency must be greater than or equal to zero", o2Info.latencyMA >= 0);
    Assert.assertTrue("Actual latency must be greater than the artificially introduced latency",
        o3Info.latencyMA > latency);
    Assert.assertTrue("Latency must be greater than or equal to zero", o4Info.latencyMA >= 0);
    StreamingContainerManager.CriticalPathInfo criticalPathInfo = dnmgr.getCriticalPathInfo();
    Assert.assertArrayEquals("Critical Path must be the path in the DAG that includes the HighLatencyTestOperator",
        new Integer[]{o1Info.partitions.iterator().next(), o3Info.partitions.iterator().next(), o4Info.partitions.iterator().next()},
        criticalPathInfo.path.toArray());
    Assert.assertTrue("Whole DAG latency must be greater than the artificially introduced latency",
        criticalPathInfo.latency > latency);
  } finally {
    lc.shutdown();
  }
}
// SLF4J logger for this test class.
private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerManagerTest.class);
}