blob: ef44767d109e1cd358cae923af478580e4f26af3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.FutureTask;
import javax.validation.ValidationException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.collect.Sets;
import com.datatorrent.api.AffinityRule;
import com.datatorrent.api.AffinityRule.Type;
import com.datatorrent.api.AffinityRulesSet;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.requests.CreateOperatorRequest;
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.plan.physical.PhysicalPlanTest;
import com.datatorrent.stram.plan.physical.PlanModifier;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.TestMeta;
public class LogicalPlanModificationTest
{
/** Logical plan shared by the test methods; replaced with a new instance before every test. */
private LogicalPlan dag;

/** JUnit rule carrying per-test metadata; it seeds the DAG and supplies the storage agent path. */
@Rule
public TestMeta testMeta = new TestMeta();

/**
 * Builds the DAG for the upcoming test from the current {@link #testMeta}.
 */
@Before
public void setup()
{
  this.dag = StramTestSupport.createDAG(this.testMeta);
}
/**
 * Adds an operator and a stream to an already built physical plan.
 *
 * Starting from the chain o1 -> o2 -> o3 (three containers), a new operator "added1" is
 * wired into o3's second input port. The test checks that nothing is scheduled for
 * (un)deployment before the changes are applied, and that applying them yields four
 * containers, one undeployed operator and two deployed operators.
 */
@Test
public void testAddOperator()
{
  // Initial topology: o1 -> o2 -> o3
  GenericTestOperator head = dag.addOperator("o1", GenericTestOperator.class);
  GenericTestOperator middle = dag.addOperator("o2", GenericTestOperator.class);
  GenericTestOperator tail = dag.addOperator("o3", GenericTestOperator.class);
  dag.addStream("o1.outport1", head.outport1, middle.inport1);
  dag.addStream("o2.outport1", middle.outport1, tail.inport1);

  // The plan context doubles as the storage agent of the DAG.
  TestPlanContext planContext = new TestPlanContext();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, planContext);
  PhysicalPlan physicalPlan = new PhysicalPlan(dag, planContext);

  // Drop whatever plan construction recorded so only the modification is observed.
  planContext.deploy.clear();
  planContext.undeploy.clear();
  Assert.assertEquals("containers", 3, physicalPlan.getContainers().size());

  PlanModifier modifier = new PlanModifier(physicalPlan);
  GenericTestOperator extra = new GenericTestOperator();
  modifier.addOperator("added1", extra);
  modifier.addStream("added1.outport1", extra.outport1, tail.inport2);

  // Pending changes must not trigger any deployment activity yet.
  Assert.assertEquals("undeploy " + planContext.undeploy, 0, planContext.undeploy.size());
  Assert.assertEquals("deploy " + planContext.deploy, 0, planContext.deploy.size());

  modifier.applyChanges(planContext);

  Assert.assertEquals("containers post change", 4, physicalPlan.getContainers().size());
  Assert.assertEquals("undeploy " + planContext.undeploy, 1, planContext.undeploy.size());
  Assert.assertEquals("deploy " + planContext.deploy, 2, planContext.deploy.size());
}
/**
 * Adds an operator to an existing plan while the DAG carries affinity rules that refer to it.
 *
 * The DAG o1 -> o2 -> o3 is deployed first. Two rules are then registered: container-local
 * affinity between "o1" and "added1", and node-local anti-affinity between "o3" and "added1".
 * After "added1" is added and the changes are applied, the test checks the container and
 * (un)deploy counts and then inspects the container hosting "added1".
 */
@Test
public void testAddOperatorWithAffinityRules()
{
  GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
  GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
  GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
  dag.addStream("o1.outport1", o1.outport1, o2.inport1);
  dag.addStream("o2.outport1", o2.outport1, o3.inport1);

  TestPlanContext ctx = new TestPlanContext();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
  PhysicalPlan plan = new PhysicalPlan(dag, ctx);
  // Forget the bookkeeping of the initial plan construction.
  ctx.deploy.clear();
  ctx.undeploy.clear();
  Assert.assertEquals("containers", 3, plan.getContainers().size());

  // Rules are registered after the initial plan exists and before the modification.
  AffinityRulesSet ruleSet = new AffinityRulesSet();
  List<AffinityRule> rules = new ArrayList<>();
  ruleSet.setAffinityRules(rules);
  rules.add(new AffinityRule(Type.AFFINITY, Locality.CONTAINER_LOCAL, false, "o1", "added1"));
  rules.add(new AffinityRule(Type.ANTI_AFFINITY, Locality.NODE_LOCAL, false, "o3", "added1"));
  dag.setAttribute(DAGContext.AFFINITY_RULES_SET, ruleSet);

  PlanModifier pm = new PlanModifier(plan);
  GenericTestOperator added1 = new GenericTestOperator();
  pm.addOperator("added1", added1);
  pm.addStream("added1.outport1", added1.outport1, o3.inport2);

  // Nothing may be scheduled before the changes are applied.
  Assert.assertEquals("undeploy " + ctx.undeploy, 0, ctx.undeploy.size());
  Assert.assertEquals("deploy " + ctx.deploy, 0, ctx.deploy.size());

  pm.applyChanges(ctx);

  Assert.assertEquals("containers post change", 4, plan.getContainers().size());
  Assert.assertEquals("undeploy " + ctx.undeploy, 1, ctx.undeploy.size());
  Assert.assertEquals("deploy " + ctx.deploy, 2, ctx.deploy.size());

  // Validate affinity rules are applied
  // NOTE(review): this validation used to be guarded by List<PTOperator>.contains("added1"),
  // which can never be true for a String argument, so the assertions below never executed.
  // They are live now; in particular the co-location expectation should be confirmed against
  // the "containers post change" count of 4 asserted above.
  boolean added1Located = false;
  for (PTContainer c : plan.getContainers()) {
    // Containers hold PTOperator instances, so the match has to go through the logical name.
    boolean hostsAdded1 = false;
    for (PTOperator candidate : c.getOperators()) {
      if ("added1".equals(candidate.getOperatorMeta().getName())) {
        hostsAdded1 = true;
        break;
      }
    }
    if (!hostsAdded1) {
      continue;
    }
    added1Located = true;

    Assert.assertEquals("Operators O1 and added1 should be in the same container as per affinity rule", 2, c.getOperators().size());
    Assert.assertEquals("Operators O1 and added1 should be in the same container as per affinity rule", "o1", c.getOperators().get(0).getOperatorMeta().getName());
    Assert.assertEquals("Operators O1 and added1 should be in the same container as per affinity rule", "added1", c.getOperators().get(1).getOperatorMeta().getName());

    Set<PTContainer> antiAffinityList = c.getStrictAntiPrefs();
    Assert.assertEquals("There should be one container in antiaffinity list", 1, antiAffinityList.size());
    List<PTOperator> antiAffinityOperators = antiAffinityList.iterator().next().getOperators();
    // Expected value first, so a failure report labels the two values correctly.
    Assert.assertEquals("AntiAffinity operators should containn operator O3", "o3", antiAffinityOperators.iterator().next().getOperatorMeta().getName());
  }
  // Guard against the validation silently being skipped again.
  Assert.assertTrue("operator added1 not found in any container " + plan.getContainers(), added1Located);
}
/**
 * Checks property changes through {@link PlanModifier}.
 *
 * Setting a property on operator "o1", which is part of the plan the modifier was created
 * for, must raise a {@link ValidationException} whose message contains the string form of
 * the operator's meta object. Setting the same property on an operator added through the
 * modifier must be reflected on that operator instance.
 */
@Test
public void testSetOperatorProperty()
{
  GenericTestOperator existing = dag.addOperator("o1", GenericTestOperator.class);
  OperatorMeta existingMeta = dag.getMeta(existing);

  TestPlanContext planContext = new TestPlanContext();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, planContext);
  PhysicalPlan physicalPlan = new PhysicalPlan(dag, planContext);
  planContext.deploy.clear();
  planContext.undeploy.clear();

  PlanModifier modifier = new PlanModifier(physicalPlan);

  // Case 1: operator already in the plan - the change has to be rejected.
  try {
    modifier.setOperatorProperty(existingMeta.getName(), "myStringProperty", "propertyValue");
    Assert.fail("validation error exepected");
  } catch (ValidationException rejected) {
    String detail = rejected.getMessage();
    Assert.assertTrue(detail.contains(existingMeta.toString()));
  }

  // Case 2: operator introduced by this modifier - the change goes through.
  GenericTestOperator fresh = new GenericTestOperator();
  modifier.addOperator("newOperator", fresh);
  modifier.setOperatorProperty("newOperator", "myStringProperty", "propertyValue");
  Assert.assertEquals("", "propertyValue", fresh.getMyStringProperty());
}
/**
 * Removes operators from a plan in which every operator has its own container.
 *
 * Topology: o1 feeds o2 and o12 through stream "o1.outport1"; o2 feeds o3 through stream
 * "o2.outport1". The test verifies that
 * <ul>
 * <li>removing o2 while its output stream is still connected is rejected,</li>
 * <li>after removing that stream, o2 can be removed and the logical and physical plans
 * shrink accordingly (one container released),</li>
 * <li>removing o12 while its input stream is still present is rejected, and succeeds once
 * the stream is removed.</li>
 * </ul>
 */
@Test
public void testRemoveOperator()
{
  GenericTestOperator source = dag.addOperator("o1", GenericTestOperator.class);
  OperatorMeta sourceMeta = dag.getMeta(source);
  GenericTestOperator sideSink = dag.addOperator("o12", GenericTestOperator.class);
  OperatorMeta sideSinkMeta = dag.getMeta(sideSink);
  GenericTestOperator relay = dag.addOperator("o2", GenericTestOperator.class);
  OperatorMeta relayMeta = dag.getMeta(relay);
  GenericTestOperator sink = dag.addOperator("o3", GenericTestOperator.class);
  OperatorMeta sinkMeta = dag.getMeta(sink);

  LogicalPlan.StreamMeta fanOut = dag.addStream("o1.outport1", source.outport1, relay.inport1, sideSink.inport1);
  LogicalPlan.StreamMeta relayOut = dag.addStream("o2.outport1", relay.outport1, sink.inport1);

  TestPlanContext planContext = new TestPlanContext();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, planContext);
  PhysicalPlan physicalPlan = new PhysicalPlan(dag, planContext);
  planContext.deploy.clear();
  planContext.undeploy.clear();

  // Baseline before any modification.
  Assert.assertEquals("containers " + physicalPlan.getContainers(), 4, physicalPlan.getContainers().size());
  Assert.assertEquals("physical operators " + physicalPlan.getAllOperators(), 4, physicalPlan.getAllOperators().size());
  Assert.assertEquals("sinks s1 " + fanOut.getSinks(), 2, fanOut.getSinks().size());

  List<PTOperator> relayInstances = physicalPlan.getOperators(relayMeta);
  Assert.assertEquals("instances " + relayMeta, 1, relayInstances.size());

  PlanModifier modifier = new PlanModifier(physicalPlan);
  try {
    modifier.removeOperator(relayMeta.getName());
    Assert.fail("validation error (connected output stream) expected");
  } catch (ValidationException expected) {
    // all good
  }

  // remove output stream required before removing operator
  modifier.removeStream(relayOut.getName());
  modifier.removeOperator(relayMeta.getName());
  modifier.applyChanges(planContext);

  Assert.assertEquals("sinks s1 " + fanOut.getSinks(), 1, fanOut.getSinks().size());
  Assert.assertTrue("undeploy " + planContext.undeploy, planContext.undeploy.containsAll(relayInstances));
  Assert.assertFalse("deploy " + planContext.deploy, planContext.deploy.containsAll(relayInstances));

  // Logical plan after the removal.
  Assert.assertEquals("streams " + dag.getAllStreams(), 1, dag.getAllStreams().size());
  Assert.assertEquals("operators " + dag.getAllOperators(), 3, dag.getAllOperators().size());
  Assert.assertTrue("operators " + dag.getAllOperators(), dag.getAllOperators().containsAll(Sets.newHashSet(sourceMeta, sinkMeta)));

  // Looking up the removed operator in the physical plan is expected to throw.
  try {
    physicalPlan.getOperators(relayMeta);
    Assert.fail("removed from physical plan: " + relayMeta);
  } catch (Exception expected) {
    // all good
  }

  // Physical plan after the removal.
  Assert.assertEquals("containers " + physicalPlan.getContainers(), 3, physicalPlan.getContainers().size());
  Assert.assertEquals("physical operators " + physicalPlan.getAllOperators(), 3, physicalPlan.getAllOperators().size());
  Assert.assertEquals("removed containers " + planContext.releaseContainers, 1, planContext.releaseContainers.size());

  try {
    modifier.removeOperator(sideSinkMeta.getName());
    Assert.fail("cannot remove operator prior to removing input stream");
  } catch (ValidationException rejected) {
    String detail = rejected.getMessage();
    Assert.assertTrue("" + detail, detail.matches(".*Operator o12 connected to input streams.*"));
  }

  modifier.removeStream(fanOut.getName());
  modifier.removeOperator(sideSinkMeta.getName());
}
/**
 * Removes a sink operator without removing the stream it is attached to.
 *
 * o1 feeds both o2 and o3 through one CONTAINER_LOCAL stream, so the initial plan has a
 * single container with three physical operators. Removing o2 directly must detach it from
 * the stream (one sink left), undeploy its physical instance, and leave one stream and the
 * operators o1 and o3 in the logical plan.
 */
@Test
public void testRemoveOperator2()
{
  GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
  OperatorMeta o1Meta = dag.getMeta(o1);
  GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
  OperatorMeta o2Meta = dag.getMeta(o2);
  GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
  OperatorMeta o3Meta = dag.getMeta(o3);

  LogicalPlan.StreamMeta s1 = dag.addStream("o1.outport1", o1.outport1, o2.inport1, o3.inport1).setLocality(Locality.CONTAINER_LOCAL);

  TestPlanContext ctx = new TestPlanContext();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
  PhysicalPlan plan = new PhysicalPlan(dag, ctx);
  // Only the effects of the modification below are of interest.
  ctx.deploy.clear();
  ctx.undeploy.clear();

  Assert.assertEquals("containers " + plan.getContainers(), 1, plan.getContainers().size());
  Assert.assertEquals("physical operators " + plan.getAllOperators(), 3, plan.getAllOperators().size());
  Assert.assertEquals("sinks s1 " + s1.getSinks(), 2, s1.getSinks().size());

  List<PTOperator> o2PhysicalOpers = plan.getOperators(o2Meta);
  Assert.assertEquals("instances " + o2Meta, 1, o2PhysicalOpers.size());

  PlanModifier pm = new PlanModifier(plan);
  pm.removeOperator(o2Meta.getName()); // remove operator w/o removing the stream
  pm.applyChanges(ctx);

  Assert.assertEquals("sinks s1 " + s1.getSinks(), 1, s1.getSinks().size());
  Assert.assertTrue("undeploy " + ctx.undeploy, ctx.undeploy.containsAll(o2PhysicalOpers));

  Set<PTOperator> expDeploy = Sets.newHashSet();
  // TODO: container local operators should be included in undeploy/deploy
  //expDeploy.addAll(plan.getOperators(o1Meta));
  //expDeploy.addAll(plan.getOperators(o3Meta));
  // Expected value goes first so that a failure report labels both sides correctly.
  Assert.assertEquals("deploy " + ctx.deploy, expDeploy, ctx.deploy);

  Assert.assertEquals("streams " + dag.getAllStreams(), 1, dag.getAllStreams().size());
  Assert.assertEquals("operators " + dag.getAllOperators(), 2, dag.getAllOperators().size());
  Assert.assertTrue("operators " + dag.getAllOperators(), dag.getAllOperators().containsAll(Sets.newHashSet(o1Meta, o3Meta)));
}
/**
 * Removes the only stream between two operators and checks that two operators are recorded
 * for undeploy and two for deploy once the change is applied.
 */
@Test
public void testRemoveStream()
{
  GenericTestOperator upstream = dag.addOperator("o1", GenericTestOperator.class);
  GenericTestOperator downstream = dag.addOperator("o2", GenericTestOperator.class);
  dag.addStream("o1.outport1", upstream.outport1, downstream.inport1);

  TestPlanContext planContext = new TestPlanContext();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, planContext);
  PhysicalPlan physicalPlan = new PhysicalPlan(dag, planContext);

  PlanModifier modifier = new PlanModifier(physicalPlan);
  modifier.removeStream("o1.outport1");
  modifier.applyChanges(planContext);

  Assert.assertEquals("undeploy " + planContext.undeploy, 2, planContext.undeploy.size());
  Assert.assertEquals("deploy " + planContext.deploy, 2, planContext.deploy.size());
}
/**
 * Connects two initially unconnected operators through a stream added at runtime.
 *
 * The stream "o1.outport1" is added twice with different sink ports of o2. After applying
 * the changes, o1's physical instance must expose one output, o2's instance two inputs
 * (named after both input ports), and two operators each must be recorded for undeploy and
 * deploy.
 */
@Test
public void testAddStream()
{
  GenericTestOperator producer = dag.addOperator("o1", GenericTestOperator.class);
  GenericTestOperator consumer = dag.addOperator("o2", GenericTestOperator.class);

  TestPlanContext planContext = new TestPlanContext();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, planContext);
  PhysicalPlan physicalPlan = new PhysicalPlan(dag, planContext);

  // Exactly one physical instance per logical operator is expected.
  List<PTOperator> producerInstances = physicalPlan.getOperators(dag.getMeta(producer));
  Assert.assertEquals("o1Instances " + producerInstances, 1, producerInstances.size());
  PTOperator producerInstance = producerInstances.get(0);

  OperatorMeta consumerMeta = dag.getMeta(consumer);
  List<PTOperator> consumerInstances = physicalPlan.getOperators(consumerMeta);
  Assert.assertEquals("o2Instances " + consumerInstances, 1, consumerInstances.size());
  PTOperator consumerInstance = consumerInstances.get(0);

  // No connections exist yet.
  Assert.assertEquals("outputs " + producerInstance, 0, producerInstance.getOutputs().size());
  Assert.assertEquals("inputs " + consumerInstance, 0, consumerInstance.getInputs().size());

  PlanModifier modifier = new PlanModifier(physicalPlan);
  modifier.addStream("o1.outport1", producer.outport1, consumer.inport1);
  modifier.addStream("o1.outport1", producer.outport1, consumer.inport2);
  modifier.applyChanges(planContext);

  Assert.assertEquals("undeploy " + planContext.undeploy, 2, planContext.undeploy.size());
  Assert.assertEquals("deploy " + planContext.deploy, 2, planContext.deploy.size());

  Assert.assertEquals("outputs " + producerInstance, 1, producerInstance.getOutputs().size());
  Assert.assertEquals("inputs " + consumerInstance, 2, consumerInstance.getInputs().size());

  // Both input ports of o2 must show up among the physical inputs.
  Set<String> expectedPorts = Sets.newHashSet(GenericTestOperator.IPORT1, GenericTestOperator.IPORT2);
  Set<String> actualPorts = Sets.newHashSet();
  for (PTOperator.PTInput input : consumerInstance.getInputs()) {
    actualPorts.add(input.portName);
  }
  Assert.assertEquals("input port names " + consumerInstance.getInputs(), expectedPorts, actualPorts);
}
/**
 * Shared scenario for the execution manager tests: submits a create-operator request to a
 * {@link StreamingContainerManager} built on an initially empty DAG and verifies the
 * resulting container start request.
 *
 * @param agent storage agent installed on the DAG before the manager is created
 * @throws Exception if the plan modification task fails or processing events throws
 */
private void testExecutionManager(StorageAgent agent) throws Exception
{
  dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);

  StreamingContainerManager dnm = new StreamingContainerManager(dag);
  // Expected value goes first so that a failure report labels both sides correctly.
  Assert.assertEquals("" + dnm.containerStartRequests, 0, dnm.containerStartRequests.size());

  CreateOperatorRequest cor = new CreateOperatorRequest();
  cor.setOperatorFQCN(TestGeneratorInputOperator.class.getName());
  cor.setOperatorName("o1");

  FutureTask<?> lpmf = dnm.logicalPlanModification(Collections.<LogicalPlanRequest>singletonList(cor));
  // Drive the manager's event processing from this thread until the request completes.
  while (!lpmf.isDone()) {
    dnm.processEvents();
  }
  // Surfaces any exception raised while the modification was carried out.
  lpmf.get();

  Assert.assertEquals("" + dnm.containerStartRequests, 1, dnm.containerStartRequests.size());
  PTContainer c = dnm.containerStartRequests.poll().container;
  Assert.assertEquals("operators " + c, 1, c.getOperators().size());

  // Count the operators of the new container that are waiting to be deployed.
  int deployStatusCnt = 0;
  for (PTOperator oper : c.getOperators()) {
    if (oper.getState() == PTOperator.State.PENDING_DEPLOY) {
      deployStatusCnt++;
    }
  }
  Assert.assertEquals("deploy requests " + c, 1, deployStatusCnt);

  PTOperator oper = c.getOperators().get(0);
  Assert.assertEquals("operator name", "o1", oper.getOperatorMeta().getName());
  Assert.assertEquals("operator class", TestGeneratorInputOperator.class, oper.getOperatorMeta().getOperator().getClass());
}
/**
 * Runs the execution manager scenario with an {@link FSStorageAgent} rooted at the test path.
 *
 * @throws Exception propagated from the shared scenario
 */
@Test
public void testExecutionManagerWithSyncStorageAgent() throws Exception
{
  StorageAgent syncAgent = new FSStorageAgent(testMeta.getPath(), null);
  testExecutionManager(syncAgent);
}
/**
 * Runs the execution manager scenario with an {@link AsyncFSStorageAgent} rooted at the test path.
 *
 * @throws Exception propagated from the shared scenario
 */
@Test
public void testExecutionManagerWithAsyncStorageAgent() throws Exception
{
  StorageAgent asyncAgent = new AsyncFSStorageAgent(testMeta.getPath(), null);
  testExecutionManager(asyncAgent);
}
/**
 * Verifies the recovery checkpoint assigned to operators added downstream of an operator
 * that already has an activation checkpoint.
 *
 * o1's only partition gets activation checkpoint 10. Operators o2 and o3 are then added as
 * the chain o1 -> o2 -> o3, and after applying the changes the first partition of each new
 * operator must report recovery checkpoint window id 10.
 */
@Test
public void testNewOperatorRecoveryWindowIds()
{
  GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);

  TestPlanContext ctx = new TestPlanContext();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
  PhysicalPlan plan = new PhysicalPlan(dag, ctx);
  ctx.deploy.clear();
  ctx.undeploy.clear();

  LogicalPlan.OperatorMeta o1Meta = dag.getMeta(o1);
  List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
  PhysicalPlanTest.setActivationCheckpoint(o1Partitions.get(0), 10);

  PlanModifier pm = new PlanModifier(plan);
  GenericTestOperator o2 = new GenericTestOperator();
  GenericTestOperator o3 = new GenericTestOperator();
  pm.addOperator("o2", o2);
  pm.addOperator("o3", o3);
  pm.addStream("s1", o1.outport1, o2.inport2);
  pm.addStream("s2", o2.outport1, o3.inport1);
  pm.applyChanges(ctx);

  // The new operators are looked up through the plan's own logical plan.
  LogicalPlan.OperatorMeta o2Meta = plan.getLogicalPlan().getMeta(o2);
  List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
  Assert.assertEquals("o2 activation checkpoint " + o2Meta, 10, o2Partitions.get(0).getRecoveryCheckpoint().windowId);

  LogicalPlan.OperatorMeta o3Meta = plan.getLogicalPlan().getMeta(o3);
  List<PTOperator> o3Partitions = plan.getOperators(o3Meta);
  // The failure message reports o3's meta, matching the operator under assertion.
  Assert.assertEquals("o3 activation checkpoint " + o3Meta, 10, o3Partitions.get(0).getRecoveryCheckpoint().windowId);
}
}