/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.plan.physical;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.Min;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.mutable.MutableInt;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Partitioner.Partition;
import com.datatorrent.api.Partitioner.PartitionKeys;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.stram.PartitioningTest;
import com.datatorrent.stram.PartitioningTest.TestInputOperator;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.physical.PTOperator.PTInput;
import com.datatorrent.stram.plan.physical.PTOperator.PTOutput;
import com.datatorrent.stram.plan.physical.PhysicalPlan.LoadIndicator;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
public class PhysicalPlanTest
{
  /**
   * Stats listener for throughput-based partitioning.
   * Used when min/max throughput thresholds are configured on the operator
   * through attributes.
   */
  public static class PartitionLoadWatch implements StatsListener, Serializable
{
private static final Logger logger = LoggerFactory.getLogger(PartitionLoadWatch.class);
private static final long serialVersionUID = 201312231633L;
public long evalIntervalMillis = 30 * 1000;
private final long tpsMin;
private final long tpsMax;
private long lastEvalMillis;
private long lastTps = 0;
private PartitionLoadWatch(long min, long max)
{
this.tpsMin = min;
this.tpsMax = max;
}
protected LoadIndicator getLoadIndicator(int operatorId, long tps)
{
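      // ignore a below-minimum reading until nonzero throughput has been
      // observed, so a new partition is not scaled down before doing any work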
if ((tps < tpsMin && lastTps != 0) || tps > tpsMax) {
lastTps = tps;
if (tps < tpsMin) {
return new LoadIndicator(-1, String.format("Tuples per second %d is less than the minimum %d", tps, tpsMin));
} else {
return new LoadIndicator(1, String.format("Tuples per second %d is greater than the maximum %d", tps, tpsMax));
}
}
lastTps = tps;
return new LoadIndicator(0, null);
}
@Override
public Response processStats(BatchedOperatorStats status)
{
long tps = status.getTuplesProcessedPSMA();
if (tps == 0L) {
tps = status.getTuplesEmittedPSMA();
}
Response rsp = new Response();
LoadIndicator loadIndicator = getLoadIndicator(status.getOperatorId(), tps);
rsp.loadIndicator = loadIndicator.indicator;
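      // rate-limit repartitioning: at most one request per evalIntervalMillis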
if (rsp.loadIndicator != 0) {
if (lastEvalMillis < (System.currentTimeMillis() - evalIntervalMillis)) {
lastEvalMillis = System.currentTimeMillis();
logger.debug("Requesting repartitioning {} {}", rsp.loadIndicator, tps);
rsp.repartitionRequired = true;
rsp.repartitionNote = loadIndicator.note;
}
}
return rsp;
}
}
private static class PartitioningTestStreamCodec extends DefaultStatefulStreamCodec<Object> implements Serializable
{
private static final long serialVersionUID = 201410301656L;
@Override
public int getPartition(Object o)
{
return 0;
}
}
public static class PartitioningTestOperator extends GenericTestOperator implements Partitioner<PartitioningTestOperator>
{
static final String INPORT_WITH_CODEC = "inportWithCodec";
public Integer[] partitionKeys = {0, 1, 2};
public String pks;
public transient Map<Integer, Partition<PartitioningTestOperator>> partitions;
public boolean fixedCapacity = true;
@Min(1)
private int partitionCount = 1;
public PartitioningTestOperator()
{
}
public void setPartitionCount(int partitionCount)
{
this.partitionCount = partitionCount;
}
public int getPartitionCount()
{
return partitionCount;
}
@InputPortFieldAnnotation(optional = true)
public final transient InputPort<Object> inportWithCodec = new DefaultInputPort<Object>()
{
@Override
public StreamCodec<Object> getStreamCodec()
{
return new PartitioningTestStreamCodec();
}
@Override
public final void process(Object payload)
{
}
};
@Override
public Collection<Partition<PartitioningTestOperator>> definePartitions(Collection<Partition<PartitioningTestOperator>> partitions, PartitioningContext context)
{
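      // use the partition count required by the context (set for parallel
      // partitioning), falling back to the configured partitionCount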
final int newPartitionCount = DefaultPartition.getRequiredPartitionCount(context, this.partitionCount);
if (!fixedCapacity) {
partitionKeys = new Integer[newPartitionCount];
for (int i = 0; i < partitionKeys.length; i++) {
partitionKeys[i] = i;
}
}
List<Partition<PartitioningTestOperator>> newPartitions = new ArrayList<>(this.partitionKeys.length);
for (Integer partitionKey : partitionKeys) {
PartitioningTestOperator temp = new PartitioningTestOperator();
temp.setPartitionCount(newPartitionCount);
Partition<PartitioningTestOperator> p = new DefaultPartition<>(temp);
PartitionKeys lpks = new PartitionKeys(2, Sets.newHashSet(partitionKey));
p.getPartitionKeys().put(this.inport1, lpks);
p.getPartitionKeys().put(this.inportWithCodec, lpks);
p.getPartitionedInstance().pks = p.getPartitionKeys().values().toString();
newPartitions.add(p);
}
return newPartitions;
}
@Override
public void partitioned(Map<Integer, Partition<PartitioningTestOperator>> partitions)
{
this.partitions = partitions;
}
}
@Test
public void testStaticPartitioning()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
TestGeneratorInputOperator node0 = dag.addOperator("node0", TestGeneratorInputOperator.class);
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
PartitioningTestOperator partitioned = dag.addOperator("partitioned", PartitioningTestOperator.class);
partitioned.setPartitionCount(partitioned.partitionKeys.length);
GenericTestOperator singleton1 = dag.addOperator("singleton1", GenericTestOperator.class);
GenericTestOperator singleton2 = dag.addOperator("singleton2", GenericTestOperator.class);
dag.addStream("n0.inport1", node0.outport, node1.inport1);
dag.addStream("n1.outport1", node1.outport1, partitioned.inport1, partitioned.inportWithCodec);
dag.addStream("mergeStream", partitioned.outport1, singleton1.inport1, singleton2.inport1);
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
OperatorMeta partitionedMeta = dag.getMeta(partitioned);
dag.validate();
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
Assert.assertEquals("number of containers", 2, plan.getContainers().size());
Assert.assertNotNull("partition map", partitioned.partitions);
Assert.assertEquals("partition map " + partitioned.partitions, 3, partitioned.partitions.size());
List<PTOperator> n2Instances = plan.getOperators(partitionedMeta);
Assert.assertEquals("partition instances " + n2Instances, partitioned.partitionKeys.length, n2Instances.size());
for (int i = 0; i < n2Instances.size(); i++) {
PTOperator po = n2Instances.get(i);
Map<String, PTInput> inputsMap = new HashMap<>();
for (PTInput input: po.getInputs()) {
inputsMap.put(input.portName, input);
Assert.assertEquals("partitions " + input, Sets.newHashSet(partitioned.partitionKeys[i]), input.partitions.partitions);
//Assert.assertEquals("codec " + input.logicalStream, PartitioningTestStreamCodec.class, input.logicalStream.getCodecClass());
}
Assert.assertEquals("number inputs " + inputsMap, Sets.newHashSet(PartitioningTestOperator.IPORT1, PartitioningTestOperator.INPORT_WITH_CODEC), inputsMap.keySet());
}
Collection<PTOperator> unifiers = plan.getMergeOperators(partitionedMeta);
Assert.assertEquals("number unifiers " + partitionedMeta, 0, unifiers.size());
}
@Test
public void testDefaultPartitioning()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
dag.addStream("node1.outport1", node1.outport1, node2.inport2, node2.inport1);
int initialPartitionCount = 5;
OperatorMeta node2Decl = dag.getMeta(node2);
node2Decl.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(initialPartitionCount));
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
List<PTOperator> n2Instances = plan.getOperators(node2Decl);
Assert.assertEquals("partition instances " + n2Instances, initialPartitionCount, n2Instances.size());
List<Integer> assignedPartitionKeys = Lists.newArrayList();
for (int i = 0; i < n2Instances.size(); i++) {
PTOperator n2Partition = n2Instances.get(i);
Assert.assertNotNull("partition keys null: " + n2Partition, n2Partition.getPartitionKeys());
Map<InputPort<?>, PartitionKeys> pkeys = n2Partition.getPartitionKeys();
Assert.assertEquals("partition keys size: " + pkeys, 1, pkeys.size()); // one port partitioned
InputPort<?> expectedPort = node2.inport2;
Assert.assertEquals("partition port: " + pkeys, expectedPort, pkeys.keySet().iterator().next());
Assert.assertEquals("partition mask: " + pkeys, "111", Integer.toBinaryString(pkeys.get(expectedPort).mask));
Set<Integer> pks = pkeys.get(expectedPort).partitions;
Assert.assertTrue("number partition keys: " + pkeys, pks.size() == 1 || pks.size() == 2);
assignedPartitionKeys.addAll(pks);
}
int expectedMask = Integer.parseInt("111", 2);
Assert.assertEquals("assigned partitions ", expectedMask + 1, assignedPartitionKeys.size());
for (int i = 0; i <= expectedMask; i++) {
Assert.assertTrue("" + assignedPartitionKeys, assignedPartitionKeys.contains(i));
}
}
@Test
public void testNumberOfUnifiers()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
dag.addStream("node1.outport1", node1.outport1, node2.inport1);
dag.setOperatorAttribute(node1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(5));
dag.setOutputPortAttribute(node1.outport1, PortContext.UNIFIER_LIMIT, 3);
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
List<PTContainer> containers = plan.getContainers();
int unifierCount = 0;
int totalOperators = 0;
for (PTContainer container : containers) {
List<PTOperator> operators = container.getOperators();
for (PTOperator operator : operators) {
totalOperators++;
if (operator.isUnifier()) {
unifierCount++;
}
}
}
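    // expected: 5 node1 partitions + 2 unifiers + 1 node2 = 8 operators;
    // UNIFIER_LIMIT 3 splits the 5 upstream outputs across ceil(5/3) = 2 unifiers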
Assert.assertEquals("Number of operators", 8, totalOperators);
Assert.assertEquals("Number of unifiers", 2, unifierCount);
}
@Test
public void testNumberOfUnifiersWithEvenPartitions()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
dag.addStream("node1.outport1", node1.outport1, node2.inport1);
dag.setOperatorAttribute(node1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(8));
dag.setOutputPortAttribute(node1.outport1, PortContext.UNIFIER_LIMIT, 4);
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
List<PTContainer> containers = plan.getContainers();
int unifierCount = 0;
int totalOperators = 0;
for (PTContainer container : containers) {
List<PTOperator> operators = container.getOperators();
for (PTOperator operator : operators) {
totalOperators++;
if (operator.isUnifier()) {
unifierCount++;
}
}
}
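    // expected: 8 node1 partitions + 3 unifiers + 1 node2 = 12 operators;
    // the even 8/4 split yields an additional unifier level (presumably two
    // first-level unifiers plus one final; compare with the 5/3 case above)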
Assert.assertEquals("Number of operators", 12, totalOperators);
Assert.assertEquals("Number of unifiers", 3, unifierCount);
}
@Test
public void testRepartitioningScaleUp()
{
LogicalPlan dag = new LogicalPlan();
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator mergeNode = dag.addOperator("mergeNode", GenericTestOperator.class);
dag.addStream("o1.outport1", o1.outport1, o2.inport1, o2.inport2);
dag.addStream("mergeStream", o2.outport1, mergeNode.inport1);
OperatorMeta o2Meta = dag.getMeta(o2);
o2Meta.getAttributes().put(OperatorContext.STATS_LISTENERS,
Lists.newArrayList((StatsListener)new PartitionLoadWatch(0, 5)));
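    // PartitionLoadWatch(0, 5): throughput above 5/s requests scale-up;
    // with min 0, scale-down is effectively disabled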
o2Meta.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(1));
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
Assert.assertEquals("number of operators", 3, plan.getAllOperators().size());
Assert.assertEquals("number of save requests", 3, ctx.backupRequests);
List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
Assert.assertEquals("partition count " + o2Meta, 1, o2Partitions.size());
PTOperator o2p1 = o2Partitions.get(0);
Assert.assertEquals("stats handlers " + o2p1, 1, o2p1.statsListeners.size());
StatsListener sl = o2p1.statsListeners.get(0);
Assert.assertTrue("stats handlers " + o2p1.statsListeners, sl instanceof PartitionLoadWatch);
((PartitionLoadWatch)sl).evalIntervalMillis = -1; // no delay
setThroughput(o2p1, 10);
plan.onStatusUpdate(o2p1);
Assert.assertEquals("partitioning triggered", 1, ctx.events.size());
ctx.backupRequests = 0;
ctx.events.remove(0).run();
o2Partitions = plan.getOperators(o2Meta);
Assert.assertEquals("partition count " + o2Partitions, 2, o2Partitions.size());
o2p1 = o2Partitions.get(0);
Assert.assertEquals("sinks " + o2p1.getOutputs(), 1, o2p1.getOutputs().size());
PTOperator o2p2 = o2Partitions.get(1);
Assert.assertEquals("sinks " + o2p2.getOutputs(), 1, o2p2.getOutputs().size());
Set<PTOperator> expUndeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
expUndeploy.add(o2p1);
expUndeploy.addAll(plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values());
// verify load update generates expected events per configuration
setThroughput(o2p1, 0);
plan.onStatusUpdate(o2p1);
Assert.assertEquals("load min", 0, ctx.events.size());
setThroughput(o2p1, 3);
plan.onStatusUpdate(o2p1);
Assert.assertEquals("load within range", 0, ctx.events.size());
setThroughput(o2p1, 10);
plan.onStatusUpdate(o2p1);
Assert.assertEquals("load exceeds max", 1, ctx.events.size());
ctx.backupRequests = 0;
ctx.events.remove(0).run();
Assert.assertEquals("new partitions", 3, plan.getOperators(o2Meta).size());
Assert.assertTrue("", plan.getOperators(o2Meta).contains(o2p2));
for (PTOperator partition : plan.getOperators(o2Meta)) {
Assert.assertNotNull("container null " + partition, partition.getContainer());
Assert.assertEquals("outputs " + partition, 1, partition.getOutputs().size());
Assert.assertEquals("downstream operators " + partition.getOutputs().get(0).sinks, 1, partition.getOutputs().get(0).sinks.size());
}
Assert.assertEquals("" + ctx.undeploy, expUndeploy, ctx.undeploy);
Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
expDeploy.addAll(plan.getOperators(o2Meta));
expDeploy.remove(o2p2);
expDeploy.addAll(plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values());
Assert.assertEquals("" + ctx.deploy, expDeploy, ctx.deploy);
Assert.assertEquals("Count of storage requests", 2, ctx.backupRequests);
// partitioning skipped on insufficient head room
o2p1 = plan.getOperators(o2Meta).get(0);
plan.setAvailableResources(0);
setThroughput(o2p1, 10);
plan.onStatusUpdate(o2p1);
Assert.assertEquals("not repartitioned", 1, ctx.events.size());
ctx.events.remove(0).run();
Assert.assertEquals("partition count unchanged", 3, plan.getOperators(o2Meta).size());
}
  /**
   * Test partitioning of an input operator (no input port).
   * Covers aspects that are not part of the generic operator test.
   * Tests scaling from one to multiple partitions with a unifier while one
   * partition remains unmodified.
   */
@Test
public void testInputOperatorPartitioning()
{
LogicalPlan dag = new LogicalPlan();
final TestInputOperator<Object> o1 = dag.addOperator("o1", new TestInputOperator<>());
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.addStream("o1.outport1", o1.output, o2.inport1);
OperatorMeta o1Meta = dag.getMeta(o1);
dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
TestPartitioner<TestInputOperator<Object>> partitioner = new TestPartitioner<>();
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, partitioner);
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
Assert.assertEquals("number of containers", 2, plan.getContainers().size());
List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
Assert.assertEquals("partitions " + o1Partitions, 1, o1Partitions.size());
PTOperator o1p1 = o1Partitions.get(0);
// verify load update generates expected events per configuration
Assert.assertEquals("stats handlers " + o1p1, 1, o1p1.statsListeners.size());
StatsListener l = o1p1.statsListeners.get(0);
Assert.assertTrue("stats handlers " + o1p1.statsListeners, l instanceof PartitioningTest.PartitionLoadWatch);
PartitioningTest.PartitionLoadWatch.put(o1p1, 1);
plan.onStatusUpdate(o1p1);
Assert.assertEquals("scale up triggered", 1, ctx.events.size());
// add another partition, keep existing as is
partitioner.extraPartitions.add(new DefaultPartition<>(o1));
Runnable r = ctx.events.remove(0);
r.run();
partitioner.extraPartitions.clear();
o1Partitions = plan.getOperators(o1Meta);
Assert.assertEquals("operators after scale up", 2, o1Partitions.size());
Assert.assertEquals("first partition unmodified", o1p1, o1Partitions.get(0));
Assert.assertEquals("single output", 1, o1p1.getOutputs().size());
Assert.assertEquals("output to unifier", 1, o1p1.getOutputs().get(0).sinks.size());
Set<PTOperator> expUndeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(o2)));
Set<PTOperator> expDeploy = Sets.newHashSet(o1Partitions.get(1));
expDeploy.addAll(plan.getMergeOperators(dag.getMeta(o1)));
expDeploy.addAll(expUndeploy);
expDeploy.add(o1p1.getOutputs().get(0).sinks.get(0).target);
Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy);
Assert.assertEquals("deploy", expDeploy, ctx.deploy);
for (PTOperator p : o1Partitions) {
Assert.assertEquals("activation window id " + p, Checkpoint.INITIAL_CHECKPOINT, p.recoveryCheckpoint);
Assert.assertEquals("checkpoints " + p + " " + p.checkpoints, Lists.newArrayList(), p.checkpoints);
PartitioningTest.PartitionLoadWatch.put(p, -1);
plan.onStatusUpdate(p);
}
ctx.events.remove(0).run();
Assert.assertEquals("operators after scale down", 1, plan.getOperators(o1Meta).size());
}
@Test
public void testRepartitioningScaleDown()
{
LogicalPlan dag = new LogicalPlan();
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3parallel = dag.addOperator("o3parallel", GenericTestOperator.class);
OperatorMeta o3Meta = dag.getMeta(o3parallel);
GenericTestOperator mergeNode = dag.addOperator("mergeNode", GenericTestOperator.class);
dag.addStream("o1.outport1", o1.outport1, o2.inport1, o2.inport2);
dag.addStream("o2.outport1", o2.outport1, o3parallel.inport1).setLocality(Locality.CONTAINER_LOCAL);
dag.setInputPortAttribute(o3parallel.inport1, PortContext.PARTITION_PARALLEL, true);
dag.addStream("o3parallel_outport1", o3parallel.outport1, mergeNode.inport1);
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
OperatorMeta node2Meta = dag.getMeta(o2);
node2Meta.getAttributes().put(OperatorContext.STATS_LISTENERS,
Lists.newArrayList((StatsListener)new PartitionLoadWatch(3, 5)));
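    // PartitionLoadWatch(3, 5): below 3/s requests scale-down, above 5/s scale-up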
node2Meta.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(8));
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
Assert.assertEquals("number of containers", 2, plan.getContainers().size());
Assert.assertEquals("Count of storage requests", plan.getAllOperators().size(), ctx.backupRequests);
List<PTOperator> n2Instances = plan.getOperators(node2Meta);
Assert.assertEquals("partition instances " + n2Instances, 8, n2Instances.size());
PTOperator po = n2Instances.get(0);
Collection<PTOperator> unifiers = plan.getMergeOperators(node2Meta);
Assert.assertEquals("unifiers " + node2Meta, 0, unifiers.size());
Collection<PTOperator> o3unifiers = plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values();
Assert.assertEquals("unifiers " + o3Meta, 1, o3unifiers.size());
PTOperator o3unifier = o3unifiers.iterator().next();
Assert.assertEquals("unifier inputs " + o3unifier, 8, o3unifier.getInputs().size());
Set<PTOperator> expUndeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
expUndeploy.addAll(n2Instances);
expUndeploy.addAll(plan.getOperators(o3Meta));
expUndeploy.addAll(o3unifiers);
// verify load update generates expected events per configuration
Assert.assertEquals("stats handlers " + po, 1, po.statsListeners.size());
StatsListener l = po.statsListeners.get(0);
Assert.assertTrue("stats handlers " + po.statsListeners, l instanceof PartitionLoadWatch);
((PartitionLoadWatch)l).evalIntervalMillis = -1; // no delay
setThroughput(po, 5);
plan.onStatusUpdate(po);
Assert.assertEquals("load upper bound", 0, ctx.events.size());
setThroughput(po, 3);
plan.onStatusUpdate(po);
Assert.assertEquals("load lower bound", 0, ctx.events.size());
setThroughput(po, 2);
plan.onStatusUpdate(po);
Assert.assertEquals("load below min", 1, ctx.events.size());
ctx.backupRequests = 0;
ctx.events.remove(0).run();
// expect operators unchanged
Assert.assertEquals("partitions unchanged", Sets.newHashSet(n2Instances), Sets.newHashSet(plan.getOperators(node2Meta)));
for (PTOperator o : n2Instances) {
setThroughput(o, 2);
plan.onStatusUpdate(o);
}
Assert.assertEquals("load below min", 1, ctx.events.size());
ctx.events.remove(0).run();
Assert.assertEquals("partitions merged", 4, plan.getOperators(node2Meta).size());
Assert.assertEquals("unifier inputs after scale down " + o3unifier, 4, o3unifier.getInputs().size());
for (PTOperator p : plan.getOperators(o3Meta)) {
Assert.assertEquals("outputs " + p.getOutputs(), 1, p.getOutputs().size());
}
for (PTOperator p : plan.getOperators(node2Meta)) {
PartitionKeys pks = p.getPartitionKeys().values().iterator().next();
Assert.assertEquals("partition mask " + p, 3, pks.mask);
Assert.assertEquals("inputs " + p, 2, p.getInputs().size());
boolean portConnected = false;
for (PTInput input : p.getInputs()) {
if (GenericTestOperator.IPORT1.equals(input.portName)) {
portConnected = true;
Assert.assertEquals("partition mask " + input, pks, input.partitions);
}
}
Assert.assertTrue("connected " + GenericTestOperator.IPORT1, portConnected);
}
Assert.assertEquals("" + ctx.undeploy, expUndeploy, ctx.undeploy);
o3unifiers = plan.getOperators(dag.getMeta(mergeNode)).get(0).upstreamMerge.values();
Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(dag.getMeta(mergeNode)));
expDeploy.addAll(plan.getOperators(node2Meta));
expDeploy.addAll(plan.getOperators(o3Meta));
expDeploy.addAll(o3unifiers);
Assert.assertEquals("" + ctx.deploy, expDeploy, ctx.deploy);
for (PTOperator oper : ctx.deploy) {
Assert.assertNotNull("container " + oper, oper.getContainer());
}
Assert.assertEquals("Count of storage requests", 8, ctx.backupRequests);
}
  /**
   * Test that the unifier is removed when the number of partitions drops to 1.
   */
@Test
public void testRepartitioningScaleDownSinglePartition()
{
LogicalPlan dag = new LogicalPlan();
TestInputOperator<?> o1 = dag.addOperator("o1", TestInputOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.addStream("o1.outport1", o1.output, o2.inport1);
OperatorMeta o1Meta = dag.getMeta(o1);
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
Assert.assertEquals("partitions " + o1Partitions, 2, o1Partitions.size());
PTOperator o1p1 = o1Partitions.get(0);
PTOperator p1Doper = o1p1.getOutputs().get(0).sinks.get(0).target;
Assert.assertSame("", p1Doper.getOperatorMeta(), o1Meta.getMeta(o1.output).getUnifierMeta());
Assert.assertTrue("unifier ", p1Doper.isUnifier());
Assert.assertEquals("Unifiers " + o1Meta, 1, o1p1.getOutputs().get(0).sinks.size());
Collection<PTOperator> o1Unifiers = new ArrayList<>(plan.getOperators(dag.getMeta(o2)).get(0).upstreamMerge.values());
StatsListener l = o1p1.statsListeners.get(0);
Assert.assertTrue("stats handlers " + o1p1.statsListeners, l instanceof PartitioningTest.PartitionLoadWatch);
PartitioningTest.PartitionLoadWatch.put(o1p1, -1);
PartitioningTest.PartitionLoadWatch.put(o1Partitions.get(1), -1);
plan.onStatusUpdate(o1p1);
plan.onStatusUpdate(o1Partitions.get(1));
Assert.assertEquals("partition scaling triggered", 1, ctx.events.size());
ctx.events.remove(0).run();
List<PTOperator> o1NewPartitions = plan.getOperators(o1Meta);
Assert.assertEquals("partitions " + o1NewPartitions, 1, o1NewPartitions.size());
List<PTOperator> o1NewUnifiers = new ArrayList<>(plan.getOperators(dag.getMeta(o2)).get(0).upstreamMerge.values());
Assert.assertEquals("unifiers " + o1Meta, 0, o1NewUnifiers.size());
p1Doper = o1p1.getOutputs().get(0).sinks.get(0).target;
Assert.assertTrue("", p1Doper.getOperatorMeta() == dag.getMeta(o2));
Assert.assertFalse("unifier ", p1Doper.isUnifier());
Assert.assertTrue("removed unifier from deployment " + ctx.undeploy, ctx.undeploy.containsAll(o1Unifiers));
Assert.assertFalse("removed unifier from deployment " + ctx.deploy, ctx.deploy.containsAll(o1Unifiers));
// scale up, ensure unifier is setup at activation checkpoint
setActivationCheckpoint(o1NewPartitions.get(0), 3);
PartitioningTest.PartitionLoadWatch.put(o1NewPartitions.get(0), 1);
plan.onStatusUpdate(o1NewPartitions.get(0));
Assert.assertEquals("partition scaling triggered", 1, ctx.events.size());
ctx.events.remove(0).run();
o1NewUnifiers.addAll(plan.getOperators(dag.getMeta(o2)).get(0).upstreamMerge.values());
Assert.assertEquals("unifiers " + o1Meta, 1, o1NewUnifiers.size());
Assert.assertEquals("unifier activation checkpoint " + o1Meta, 3, o1NewUnifiers.get(0).recoveryCheckpoint.windowId);
}
public static void setActivationCheckpoint(PTOperator oper, long windowId)
{
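    // save a checkpoint through the operator's storage agent and register it
    // as the recovery checkpoint, so subsequent plan changes use this window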
try {
oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT).save(oper.operatorMeta.getOperator(), oper.id, windowId);
Checkpoint cp = new Checkpoint(windowId, 0, 0);
oper.setRecoveryCheckpoint(cp);
oper.checkpoints.add(cp);
} catch (Exception e) {
Assert.fail(e.toString());
}
}
@Test
public void testDefaultRepartitioning()
{
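    // newPartitionKeys parses binary strings (see helper below):
    // newPartitionKeys("11", "01") is mask 0b11 with the single key 0b01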
List<PartitionKeys> twoBitPartitionKeys = Arrays.asList(
newPartitionKeys("11", "00"),
newPartitionKeys("11", "10"),
newPartitionKeys("11", "01"),
newPartitionKeys("11", "11"));
GenericTestOperator operator = new GenericTestOperator();
Set<PartitionKeys> initialPartitionKeys = Sets.newHashSet(
newPartitionKeys("1", "0"),
newPartitionKeys("1", "1"));
final ArrayList<Partition<Operator>> partitions = new ArrayList<>();
for (PartitionKeys pks : initialPartitionKeys) {
Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<>();
p1Keys.put(operator.inport1, pks);
partitions.add(new DefaultPartition<Operator>(operator, p1Keys, 1, null));
}
ArrayList<Partition<Operator>> lowLoadPartitions = new ArrayList<>();
for (Partition<Operator> p : partitions) {
lowLoadPartitions.add(new DefaultPartition<>(p.getPartitionedInstance(), p.getPartitionKeys(), -1, null));
}
// merge to single partition
List<Partition<Operator>> newPartitions = Lists.newArrayList();
Collection<Partition<Operator>> tempNewPartitions = StatelessPartitioner.repartition(lowLoadPartitions);
newPartitions.addAll(tempNewPartitions);
Assert.assertEquals("" + newPartitions, 1, newPartitions.size());
Assert.assertEquals("" + newPartitions.get(0).getPartitionKeys(), 0, newPartitions.get(0).getPartitionKeys().values().iterator().next().mask);
List<Partition<Operator>> tempList = Collections.singletonList((Partition<Operator>)new DefaultPartition<Operator>(operator, newPartitions.get(0).getPartitionKeys(), -1, null));
tempNewPartitions = StatelessPartitioner.repartition(tempList);
newPartitions.clear();
newPartitions.addAll(tempNewPartitions);
Assert.assertEquals("" + newPartitions, 1, newPartitions.size());
// split back into two
tempList = Collections.singletonList((Partition<Operator>)new DefaultPartition<Operator>(operator, newPartitions.get(0).getPartitionKeys(), 1, null));
tempNewPartitions = StatelessPartitioner.repartition(tempList);
newPartitions.clear();
newPartitions.addAll(tempNewPartitions);
Assert.assertEquals("" + newPartitions, 2, newPartitions.size());
// split partitions
tempNewPartitions = StatelessPartitioner.repartition(partitions);
newPartitions.clear();
newPartitions.addAll(tempNewPartitions);
Assert.assertEquals("" + newPartitions, 4, newPartitions.size());
Set<PartitionKeys> expectedPartitionKeys = Sets.newHashSet(twoBitPartitionKeys);
for (Partition<?> p: newPartitions) {
Assert.assertEquals("" + p.getPartitionKeys(), 1, p.getPartitionKeys().size());
Assert.assertEquals("" + p.getPartitionKeys(), operator.inport1, p.getPartitionKeys().keySet().iterator().next());
PartitionKeys pks = p.getPartitionKeys().values().iterator().next();
expectedPartitionKeys.remove(pks);
}
Assert.assertTrue("" + expectedPartitionKeys, expectedPartitionKeys.isEmpty());
// partition merge
List<HashSet<PartitionKeys>> expectedKeysSets = Arrays.asList(
Sets.newHashSet(newPartitionKeys("11", "00"), newPartitionKeys("11", "10"), newPartitionKeys("1", "1")),
Sets.newHashSet(newPartitionKeys("1", "0"), newPartitionKeys("11", "01"), newPartitionKeys("11", "11"))
);
for (Set<PartitionKeys> expectedKeys: expectedKeysSets) {
List<Partition<Operator>> clonePartitions = Lists.newArrayList();
for (PartitionKeys pks: twoBitPartitionKeys) {
Map<InputPort<?>, PartitionKeys> p1Keys = new HashMap<>();
p1Keys.put(operator.inport1, pks);
int load = expectedKeys.contains(pks) ? 0 : -1;
clonePartitions.add(new DefaultPartition<Operator>(operator, p1Keys, load, null));
}
tempNewPartitions = StatelessPartitioner.repartition(clonePartitions);
newPartitions.clear();
newPartitions.addAll(tempNewPartitions);
Assert.assertEquals("" + newPartitions, 3, newPartitions.size());
for (Partition<?> p: newPartitions) {
Assert.assertEquals("" + p.getPartitionKeys(), 1, p.getPartitionKeys().size());
Assert.assertEquals("" + p.getPartitionKeys(), operator.inport1, p.getPartitionKeys().keySet().iterator().next());
PartitionKeys pks = p.getPartitionKeys().values().iterator().next();
expectedKeys.remove(pks);
}
Assert.assertTrue("" + expectedKeys, expectedKeys.isEmpty());
}
// merge 2 into single partition
lowLoadPartitions = Lists.newArrayList();
for (Partition<?> p : partitions) {
lowLoadPartitions.add(new DefaultPartition<Operator>(operator, p.getPartitionKeys(), -1, null));
}
tempNewPartitions = StatelessPartitioner.repartition(lowLoadPartitions);
newPartitions.clear();
newPartitions.addAll(tempNewPartitions);
Assert.assertEquals("" + newPartitions, 1, newPartitions.size());
for (Partition<?> p: newPartitions) {
Assert.assertEquals("" + p.getPartitionKeys(), 1, p.getPartitionKeys().size());
PartitionKeys pks = p.getPartitionKeys().values().iterator().next();
Assert.assertEquals("" + pks, 0, pks.mask);
Assert.assertEquals("" + pks, Sets.newHashSet(0), pks.partitions);
}
}
private PartitionKeys newPartitionKeys(String mask, String key)
{
return new PartitionKeys(Integer.parseInt(mask, 2), Sets.newHashSet(Integer.parseInt(key, 2)));
}
private void setThroughput(PTOperator oper, long tps)
{
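    // update the tuples-processed moving average inside a stats revision
    // (checkout/commit) so concurrent readers see a consistent value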
oper.stats.statsRevs.checkout();
oper.stats.tuplesProcessedPSMA.set(tps);
oper.stats.statsRevs.commit();
}
@Test
@SuppressWarnings("unchecked")
public void testInline()
{
LogicalPlan dag = new LogicalPlan();
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
PartitioningTestOperator partOperator = dag.addOperator("partNode", PartitioningTestOperator.class);
partOperator.partitionKeys = new Integer[] {0,1};
dag.getMeta(partOperator).getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(partOperator.partitionKeys.length));
dag.addStream("o1_outport1", o1.outport1, o2.inport1, o3.inport1, partOperator.inport1)
.setLocality(null);
// same container for o2 and o3
dag.addStream("o2_outport1", o2.outport1, o3.inport2)
.setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("o3_outport1", o3.outport1, partOperator.inport2);
int maxContainers = 4;
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
dag.setAttribute(OperatorContext.STORAGE_AGENT, new TestPlanContext());
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
Assert.assertEquals("number of containers", maxContainers, plan.getContainers().size());
Assert.assertEquals("operators container 0", 1, plan.getContainers().get(0).getOperators().size());
Assert.assertEquals("operators container 0", 1, plan.getContainers().get(0).getOperators().size());
Set<OperatorMeta> c2ExpNodes = Sets.newHashSet(dag.getMeta(o2), dag.getMeta(o3));
Set<OperatorMeta> c2ActNodes = new HashSet<>();
PTContainer c2 = plan.getContainers().get(1);
for (PTOperator pNode : c2.getOperators()) {
c2ActNodes.add(pNode.getOperatorMeta());
}
Assert.assertEquals("operators " + c2, c2ExpNodes, c2ActNodes);
// one container per partition
OperatorMeta partOperMeta = dag.getMeta(partOperator);
List<PTOperator> partitions = plan.getOperators(partOperMeta);
for (PTOperator partition : partitions) {
Assert.assertEquals("operators container" + partition, 1, partition.getContainer().getOperators().size());
}
}
@Test
public void testInlineMultipleInputs()
{
LogicalPlan dag = new LogicalPlan();
GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class);
GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class);
GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class);
dag.addStream("n1Output1", node1.outport1, node3.inport1)
.setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("n2Output1", node2.outport1, node3.inport2)
.setLocality(Locality.CONTAINER_LOCAL);
int maxContainers = 5;
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
dag.setAttribute(OperatorContext.STORAGE_AGENT, new TestPlanContext());
PhysicalPlan deployer = new PhysicalPlan(dag, new TestPlanContext());
Assert.assertEquals("number of containers", 1, deployer.getContainers().size());
PTOutput node1Out = deployer.getOperators(dag.getMeta(node1)).get(0).getOutputs().get(0);
Assert.assertTrue("inline " + node1Out, node1Out.isDownStreamInline());
// per current logic, different container is assigned to second input node
PTOutput node2Out = deployer.getOperators(dag.getMeta(node2)).get(0).getOutputs().get(0);
Assert.assertTrue("inline " + node2Out, node2Out.isDownStreamInline());
}
@Test
public void testNodeLocality()
{
LogicalPlan dag = new LogicalPlan();
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator partitioned = dag.addOperator("partitioned", GenericTestOperator.class);
dag.getMeta(partitioned).getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
GenericTestOperator partitionedParallel = dag.addOperator("partitionedParallel", GenericTestOperator.class);
dag.addStream("o1_outport1", o1.outport1, partitioned.inport1).setLocality(null);
dag.addStream("partitioned_outport1", partitioned.outport1, partitionedParallel.inport2).setLocality(Locality.NODE_LOCAL);
dag.setInputPortAttribute(partitionedParallel.inport2, PortContext.PARTITION_PARALLEL, true);
GenericTestOperator single = dag.addOperator("single", GenericTestOperator.class);
dag.addStream("partitionedParallel_outport1", partitionedParallel.outport1, single.inport1);
int maxContainers = 6;
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
dag.setAttribute(OperatorContext.STORAGE_AGENT, new TestPlanContext());
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
Assert.assertEquals("number of containers", maxContainers, plan.getContainers().size());
PTContainer container1 = plan.getContainers().get(0);
Assert.assertEquals("number operators " + container1, 1, container1.getOperators().size());
Assert.assertEquals("operators " + container1, dag.getMeta(o1), container1.getOperators().get(0).getOperatorMeta());
for (int i = 1; i < 3; i++) {
PTContainer c = plan.getContainers().get(i);
Assert.assertEquals("number operators " + c, 1, c.getOperators().size());
Set<OperatorMeta> expectedLogical = Sets.newHashSet(dag.getMeta(partitioned));
Set<OperatorMeta> actualLogical = Sets.newHashSet();
for (PTOperator p : c.getOperators()) {
actualLogical.add(p.getOperatorMeta());
}
Assert.assertEquals("operators " + c, expectedLogical, actualLogical);
}
// in-node parallel partition
for (int i = 3; i < 5; i++) {
PTContainer c = plan.getContainers().get(i);
Assert.assertEquals("number operators " + c, 1, c.getOperators().size());
Set<OperatorMeta> expectedLogical = Sets.newHashSet(dag.getMeta(partitionedParallel));
Set<OperatorMeta> actualLogical = Sets.newHashSet();
for (PTOperator p : c.getOperators()) {
actualLogical.add(p.getOperatorMeta());
Assert.assertEquals("nodeLocal " + p.getNodeLocalOperators(), 2, p.getNodeLocalOperators().getOperatorSet()
.size());
}
Assert.assertEquals("operators " + c, expectedLogical, actualLogical);
}
}
@Test
public void testParallelPartitioning()
{
LogicalPlan dag = new LogicalPlan();
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
dag.addStream("o1Output1", o1.outport1, o2.inport1, o3.inport1).setLocality(null);
dag.addStream("o2Output1", o2.outport1, o3.inport2).setLocality(Locality.CONTAINER_LOCAL);
dag.setInputPortAttribute(o3.inport2, PortContext.PARTITION_PARALLEL, true);
// parallel partition two downstream operators
PartitioningTestOperator o3_1 = dag.addOperator("o3_1", PartitioningTestOperator.class);
o3_1.fixedCapacity = false;
dag.setInputPortAttribute(o3_1.inport1, PortContext.PARTITION_PARALLEL, true);
OperatorMeta o3_1Meta = dag.getMeta(o3_1);
GenericTestOperator o3_2 = dag.addOperator("o3_2", GenericTestOperator.class);
dag.setInputPortAttribute(o3_2.inport1, PortContext.PARTITION_PARALLEL, true);
OperatorMeta o3_2Meta = dag.getMeta(o3_2);
dag.addStream("o3outport1", o3.outport1, o3_1.inport1, o3_2.inport1).setLocality(Locality.CONTAINER_LOCAL);
// join within parallel partition
GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
dag.setInputPortAttribute(o4.inport1, PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(o4.inport2, PortContext.PARTITION_PARALLEL, true);
OperatorMeta o4Meta = dag.getMeta(o4);
dag.addStream("o3_1.outport1", o3_1.outport1, o4.inport1).setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("o3_2.outport1", o3_2.outport1, o4.inport2).setLocality(Locality.CONTAINER_LOCAL);
// non inline
GenericTestOperator o5single = dag.addOperator("o5single", GenericTestOperator.class);
dag.addStream("o4outport1", o4.outport1, o5single.inport1);
int maxContainers = 4;
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
dag.setAttribute(OperatorContext.STORAGE_AGENT, new TestPlanContext());
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
Assert.assertEquals("number of containers", 4, plan.getContainers().size());
PTContainer container1 = plan.getContainers().get(0);
Assert.assertEquals("number operators " + container1, 1, container1.getOperators().size());
Assert.assertEquals("operators " + container1, "o1", container1.getOperators().get(0).getOperatorMeta().getName());
for (int i = 1; i < 3; i++) {
PTContainer container2 = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container2, 5, container2.getOperators().size());
Set<String> expectedLogicalNames = Sets.newHashSet("o2", "o3", o3_1Meta.getName(), o3_2Meta.getName(), o4Meta.getName());
Set<String> actualLogicalNames = Sets.newHashSet();
for (PTOperator p: container2.getOperators()) {
actualLogicalNames.add(p.getOperatorMeta().getName());
}
Assert.assertEquals("operator names " + container2, expectedLogicalNames, actualLogicalNames);
}
List<OperatorMeta> inlineOperators = Lists.newArrayList(dag.getMeta(o2), o3_1Meta, o3_2Meta);
for (OperatorMeta ow: inlineOperators) {
List<PTOperator> partitionedInstances = plan.getOperators(ow);
Assert.assertEquals("" + partitionedInstances, 2, partitionedInstances.size());
for (PTOperator p: partitionedInstances) {
Assert.assertEquals("outputs " + p, 1, p.getOutputs().size());
Assert.assertTrue("downstream inline " + p.getOutputs().get(0), p.getOutputs().get(0).isDownStreamInline());
}
}
    // container 4: unifier for o4 and o5single
PTContainer container4 = plan.getContainers().get(3);
PTOperator ptOperatorO5 = plan.getOperators(dag.getMeta(o5single)).get(0);
PTOperator unifier = ptOperatorO5.upstreamMerge.values().iterator().next();
Assert.assertEquals("number operators " + container4, 2, container4.getOperators().size());
Assert.assertEquals("operators " + container4, o4Meta.getMeta(o4.outport1).getUnifierMeta(), unifier.getOperatorMeta());
Assert.assertEquals("unifier inputs" + unifier.getInputs(), 2, unifier.getInputs().size());
Assert.assertEquals("unifier outputs" + unifier.getOutputs(), 1, unifier.getOutputs().size());
OperatorMeta o5Meta = dag.getMeta(o5single);
Assert.assertEquals("operators " + container4, o5Meta, ptOperatorO5.getOperatorMeta());
List<PTOperator> o5Instances = plan.getOperators(o5Meta);
Assert.assertEquals("" + o5Instances, 1, o5Instances.size());
Assert.assertEquals("inputs" + ptOperatorO5.getInputs(), 1, ptOperatorO5.getInputs().size());
Assert.assertEquals("inputs" + ptOperatorO5.getInputs(), unifier, ptOperatorO5.getInputs().get(0).source.source);
// verify partitioner was called for parallel partition
Assert.assertNotNull("partitioner called " + o3_1, o3_1.partitions);
for (PTOperator p : plan.getOperators(o3_1Meta)) {
Assert.assertEquals("inputs " + p, 1, p.getInputs().size());
for (PTInput pti : p.getInputs()) {
Assert.assertNull("partition keys " + pti, pti.partitions);
}
}
}
@Test
public void testParallelPartitioningValidation()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new TestPlanContext());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
dag.setInputPortAttribute(o3.inport1, PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(o3.inport2, PortContext.PARTITION_PARALLEL, true);
dag.addStream("o1Output1", o1.outport1, o3.inport1);
dag.addStream("o2Output1", o2.outport1, o3.inport2);
    try {
      new PhysicalPlan(dag, new TestPlanContext());
      Assert.fail("validation error expected");
    } catch (AssertionError ae) {
Assert.assertThat("Parallel partition needs common ancestor", ae.getMessage(), RegexMatcher.matches("operator cannot extend multiple partitions.*"));
}
GenericTestOperator commonAncestor = dag.addOperator("commonAncestor", GenericTestOperator.class);
dag.setInputPortAttribute(o1.inport1, PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, true);
dag.addStream("commonAncestor.outport1", commonAncestor.outport1, o1.inport1);
dag.addStream("commonAncestor.outport2", commonAncestor.outport2, o2.inport1);
new PhysicalPlan(dag, new TestPlanContext());
// two operators with multiple streams
dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new TestPlanContext());
o1 = dag.addOperator("o1", GenericTestOperator.class);
o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(o2.inport2, PortContext.PARTITION_PARALLEL, true);
dag.addStream("o1.outport1", o1.outport1, o2.inport1);
dag.addStream("o2.outport2", o1.outport2, o2.inport2);
new PhysicalPlan(dag, new TestPlanContext());
}
  /**
   * MxN partitioning. When the source and sink of a stream are both
   * partitioned, a separate unifier is created container-local with each
   * downstream partition.
   */
@Test
public void testMxNPartitioning()
{
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
TestPartitioner<TestGeneratorInputOperator> o1Partitioner = new TestPartitioner<>();
o1Partitioner.setPartitionCount(2);
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, o1Partitioner);
dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
OperatorMeta o1Meta = dag.getMeta(o1);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(3));
dag.setOperatorAttribute(o2, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
OperatorMeta o2Meta = dag.getMeta(o2);
dag.addStream("o1.outport1", o1.outport, o2.inport1);
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
Assert.assertEquals("number of containers", 5, plan.getContainers().size());
List<PTOperator> inputOperators = new ArrayList<>();
for (int i = 0; i < 2; i++) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container, 1, container.getOperators().size());
Assert.assertEquals("operators " + container, o1Meta.getName(), container.getOperators().get(0).getOperatorMeta().getName());
inputOperators.add(container.getOperators().get(0));
}
for (int i = 2; i < 5; i++) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container, 2, container.getOperators().size());
Assert.assertEquals("operators " + container, o2Meta.getName(), container.getOperators().get(0).getOperatorMeta().getName());
Set<String> expectedLogicalNames = Sets.newHashSet(o1Meta.getMeta(o1.outport).getUnifierMeta().getName(), o2Meta.getName());
Map<String, PTOperator> actualOperators = new HashMap<>();
for (PTOperator p : container.getOperators()) {
actualOperators.put(p.getOperatorMeta().getName(), p);
}
Assert.assertEquals("", expectedLogicalNames, actualOperators.keySet());
PTOperator pUnifier = actualOperators.get(o1Meta.getMeta(o1.outport).getUnifierMeta().getName());
Assert.assertNotNull("" + pUnifier, pUnifier.getContainer());
Assert.assertTrue("" + pUnifier, pUnifier.isUnifier());
// input from each upstream partition
Assert.assertEquals("" + pUnifier, 2, pUnifier.getInputs().size());
int numberPartitionKeys = (i == 2) ? 2 : 1;
for (int inputIndex = 0; inputIndex < pUnifier.getInputs().size(); inputIndex++) {
PTInput input = pUnifier.getInputs().get(inputIndex);
Assert.assertEquals("" + pUnifier, "outport", input.source.portName);
Assert.assertEquals("" + pUnifier, inputOperators.get(inputIndex), input.source.source);
Assert.assertEquals("partition keys " + input.partitions, numberPartitionKeys, input.partitions.partitions.size());
}
// output to single downstream partition
Assert.assertEquals("" + pUnifier, 1, pUnifier.getOutputs().size());
Assert.assertTrue("" + actualOperators.get(o2Meta.getName()).getOperatorMeta().getOperator(),
actualOperators.get(o2Meta.getName()).getOperatorMeta().getOperator() instanceof GenericTestOperator);
PTOperator p = actualOperators.get(o2Meta.getName());
Assert.assertEquals("partition inputs " + p.getInputs(), 1, p.getInputs().size());
Assert.assertEquals("partition inputs " + p.getInputs(), pUnifier, p.getInputs().get(0).source.source);
Assert.assertEquals("input partition keys " + p.getInputs(), null, p.getInputs().get(0).partitions);
Assert.assertTrue("partitioned unifier container local " + p.getInputs().get(0).source, p.getInputs().get(0).source.isDownStreamInline());
}
    // test dynamic change for M x N partitioning:
    // scale down N from 3 to 2, then from 2 to 1
for (int i = 0; i < 2; i++) {
List<PTOperator> ptos = plan.getOperators(o2Meta);
Set<PTOperator> expUndeploy = Sets.newHashSet(ptos);
for (PTOperator ptOperator : ptos) {
expUndeploy.addAll(ptOperator.upstreamMerge.values());
PartitioningTest.PartitionLoadWatch.put(ptOperator, -1);
plan.onStatusUpdate(ptOperator);
}
ctx.backupRequests = 0;
ctx.events.remove(0).run();
Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(o2Meta));
      // either a unifier per downstream partition or a single unifier for a
      // single partition is expected to be deployed
expDeploy.addAll(plan.getMergeOperators(o1Meta));
for (PTOperator ptOperator : plan.getOperators(o2Meta)) {
expDeploy.addAll(ptOperator.upstreamMerge.values());
}
Assert.assertEquals("number of containers", 4 - i, plan.getContainers().size());
Assert.assertEquals("number of operators", 2 - i, plan.getOperators(o2Meta).size());
Assert.assertEquals("undeployed operators " + ctx.undeploy, expUndeploy, ctx.undeploy);
Assert.assertEquals("deployed operators " + ctx.deploy, expDeploy, ctx.deploy);
}
// scale up N from 1 to 2 and then from 2 to 3
for (int i = 0; i < 2; i++) {
List<PTOperator> unChangedOps = new LinkedList<>(plan.getOperators(o2Meta));
PTOperator o2p1 = unChangedOps.remove(0);
Set<PTOperator> expUndeploy = Sets.newHashSet(o2p1);
      // either the single unifier (one partition) or the per-partition merge
      // unifiers are expected to be undeployed
expUndeploy.addAll(plan.getMergeOperators(o1Meta));
expUndeploy.addAll(o2p1.upstreamMerge.values());
List<PTOperator> nOps = new LinkedList<>();
      for (PTOperator ptOperator : unChangedOps) {
        nOps.addAll(ptOperator.upstreamMerge.values());
      }
unChangedOps.addAll(nOps);
PartitioningTest.PartitionLoadWatch.put(o2p1, 1);
plan.onStatusUpdate(o2p1);
Assert.assertEquals("repartition event", 1, ctx.events.size());
ctx.backupRequests = 0;
ctx.events.remove(0).run();
Assert.assertEquals("N partitions after scale up " + o2Meta, 2 + i, plan.getOperators(o2Meta).size());
Assert.assertTrue("no unifiers", plan.getMergeOperators(o1Meta).isEmpty());
for (PTOperator o : plan.getOperators(o2Meta)) {
Assert.assertNotNull(o.container);
PTOperator unifier = o.upstreamMerge.values().iterator().next();
Assert.assertNotNull(unifier.container);
Assert.assertSame("unifier in same container", o.container, unifier.container);
Assert.assertEquals("container operators " + o.container, Sets.newHashSet(o.container.getOperators()), Sets.newHashSet(o, unifier));
}
Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(o2Meta));
for (PTOperator ptOperator : plan.getOperators(o2Meta)) {
expDeploy.addAll(ptOperator.upstreamMerge.values());
}
expDeploy.removeAll(unChangedOps);
Assert.assertEquals("number of containers", 4 + i, plan.getContainers().size());
Assert.assertEquals("undeployed operators" + ctx.undeploy, expUndeploy, ctx.undeploy);
Assert.assertEquals("deployed operators" + ctx.deploy, expDeploy, ctx.deploy);
}
// scale down M to 1
{
Set<PTOperator> expUndeploy = Sets.newHashSet();
Set<PTOperator> expDeploy = Sets.newHashSet();
for (PTOperator o2p : plan.getOperators(o2Meta)) {
expUndeploy.addAll(o2p.upstreamMerge.values());
expUndeploy.add(o2p);
expDeploy.add(o2p);
}
for (PTOperator o1p : plan.getOperators(o1Meta)) {
expUndeploy.add(o1p);
PartitioningTest.PartitionLoadWatch.put(o1p, -1);
plan.onStatusUpdate(o1p);
}
Assert.assertEquals("repartition event", 1, ctx.events.size());
ctx.events.remove(0).run();
Assert.assertEquals("M partitions after scale down " + o1Meta, 1, plan.getOperators(o1Meta).size());
expUndeploy.removeAll(plan.getOperators(o1Meta));
for (PTOperator o2p : plan.getOperators(o2Meta)) {
Assert.assertTrue("merge unifier " + o2p + " " + o2p.upstreamMerge, o2p.upstreamMerge.isEmpty());
}
Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy);
Assert.assertEquals("deploy", expDeploy, ctx.deploy);
}
// scale up M to 2
Assert.assertEquals("M partitions " + o1Meta, 1, plan.getOperators(o1Meta).size());
{
Set<PTOperator> expUndeploy = Sets.newHashSet();
Set<PTOperator> expDeploy = Sets.newHashSet();
for (PTOperator o1p : plan.getOperators(o1Meta)) {
PartitioningTest.PartitionLoadWatch.put(o1p, 1);
plan.onStatusUpdate(o1p);
}
Assert.assertEquals("repartition event", 1, ctx.events.size());
o1Partitioner.extraPartitions.add(new DefaultPartition<>(o1));
ctx.events.remove(0).run();
o1Partitioner.extraPartitions.clear();
List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
Assert.assertEquals("M partitions after scale up " + o1Meta, 2, o1Partitions.size());
expDeploy.add(o1Partitions.get(1)); // previous partition unchanged
for (PTOperator o1p : o1Partitions) {
Assert.assertEquals("outputs " + o1p, 1, o1p.getOutputs().size());
Assert.assertEquals("sinks " + o1p, o2Partitions.size(), o1p.getOutputs().get(0).sinks.size());
}
for (PTOperator o2p : plan.getOperators(o2Meta)) {
expUndeploy.add(o2p);
expDeploy.add(o2p);
Assert.assertEquals("merge unifier " + o2p + " " + o2p.upstreamMerge, 1, o2p.upstreamMerge.size());
expDeploy.addAll(o2p.upstreamMerge.values());
}
Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy);
Assert.assertEquals("deploy", expDeploy, ctx.deploy);
}
}
  /**
   * MxN partitioning with a single final unifier. With UNIFIER_SINGLE_FINAL
   * set on the output port, one unifier merges all upstream partitions before
   * the stream fans out to the downstream partitions.
   */
@Test
public void testSingleFinalMxNPartitioning()
{
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestGeneratorInputOperator>(2));
dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
dag.setOutputPortAttribute(o1.outport, PortContext.UNIFIER_SINGLE_FINAL, true);
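    // request a single final unifier that merges all o1 partitions before the
    // stream reaches the o2 partitions (verified below)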
OperatorMeta o1Meta = dag.getMeta(o1);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
dag.setOperatorAttribute(o2, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
OperatorMeta o2Meta = dag.getMeta(o2);
dag.addStream("o1.outport1", o1.outport, o2.inport1);
int maxContainers = 10;
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
Assert.assertEquals("number of containers", 6, plan.getContainers().size());
List<PTOperator> inputOperators = new ArrayList<>();
for (int i = 0; i < 2; i++) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container, 1, container.getOperators().size());
Assert.assertEquals("operators " + container, o1Meta.getName(), container.getOperators().get(0).getOperatorMeta().getName());
inputOperators.add(container.getOperators().get(0));
}
PTOperator inputUnifier = null;
{
PTContainer container = plan.getContainers().get(2);
Assert.assertEquals("number operators " + container, 1, container.getOperators().size());
PTOperator pUnifier = container.getOperators().get(0);
Assert.assertEquals("operators " + container, o1Meta.getMeta(o1.outport).getUnifierMeta().getName(), pUnifier.getOperatorMeta().getName());
Assert.assertTrue("single unifier " + pUnifier, pUnifier.isUnifier());
Assert.assertEquals("" + pUnifier, 2, pUnifier.getInputs().size());
for (int inputIndex = 0; inputIndex < pUnifier.getInputs().size(); inputIndex++) {
PTInput input = pUnifier.getInputs().get(inputIndex);
Assert.assertEquals("source port name " + pUnifier, "outport", input.source.portName);
Assert.assertEquals("" + pUnifier, inputOperators.get(inputIndex), input.source.source);
Assert.assertEquals("partition keys " + input.partitions, null, input.partitions);
}
Assert.assertEquals("number outputs " + pUnifier, 1, pUnifier.getOutputs().size());
PTOutput output = pUnifier.getOutputs().get(0);
Assert.assertEquals("number inputs " + output, 3, output.sinks.size());
for (int inputIndex = 0; inputIndex < output.sinks.size(); ++inputIndex) {
Assert.assertEquals("output sink " + output, o2Meta.getName(), output.sinks.get(inputIndex).target.getName());
Assert.assertEquals("destination port name " + output, GenericTestOperator.IPORT1, output.sinks.get(inputIndex).portName);
}
inputUnifier = pUnifier;
}
List<Integer> partitionKeySizes = new ArrayList<>();
for (int i = 3; i < 6; i++) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container, 1, container.getOperators().size());
Assert.assertEquals("operators " + container, o2Meta.getName(), container.getOperators().get(0).getOperatorMeta().getName());
PTOperator operator = container.getOperators().get(0);
Assert.assertEquals("operators " + container, o2Meta.getName(), operator.getOperatorMeta().getName());
Assert.assertEquals("number inputs " + operator, 1, operator.getInputs().size());
PTInput input = operator.getInputs().get(0);
Assert.assertEquals("" + operator, inputUnifier, input.source.source);
Assert.assertNotNull("input partitions " + operator, input.partitions);
partitionKeySizes.add(input.partitions.partitions.size());
}
Assert.assertEquals("input partition sizes count", 3, partitionKeySizes.size());
Collections.sort(partitionKeySizes);
Assert.assertEquals("input partition sizes", Arrays.asList(1, 1, 2), partitionKeySizes);
// Test dynamic changes to the M x N partitioning:
// scale down N from 3 to 2 and then from 2 to 1
for (int i = 0; i < 2; i++) {
List<PTOperator> ptos = plan.getOperators(o2Meta);
Set<PTOperator> expUndeploy = Sets.newHashSet(ptos);
for (PTOperator ptOperator : ptos) {
PartitioningTest.PartitionLoadWatch.put(ptOperator, -1);
plan.onStatusUpdate(ptOperator);
}
ctx.backupRequests = 0;
ctx.events.remove(0).run();
Assert.assertEquals("single unifier ", 1, plan.getMergeOperators(o1Meta).size());
// the remaining o2 operators are redeployed because their partition keys change
Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(o2Meta));
// one container is released at each scale-down step (6 -> 5 -> 4); the single final unifier
// keeps its own container even when only one downstream partition remains
Assert.assertEquals("number of containers", 5 - i, plan.getContainers().size());
Assert.assertEquals("number of operators", 2 - i, plan.getOperators(o2Meta).size());
Assert.assertEquals("undeployed operators " + ctx.undeploy, expUndeploy, ctx.undeploy);
Assert.assertEquals("deployed operators " + ctx.deploy, expDeploy, ctx.deploy);
}
// scale up N from 1 to 2 and then from 2 to 3
for (int i = 0; i < 2; i++) {
List<PTOperator> unChangedOps = new LinkedList<>(plan.getOperators(o2Meta));
PTOperator o2p1 = unChangedOps.remove(0);
Set<PTOperator> expUndeploy = Sets.newHashSet(o2p1);
PartitioningTest.PartitionLoadWatch.put(o2p1, 1);
plan.onStatusUpdate(o2p1);
Assert.assertEquals("repartition event", 1, ctx.events.size());
ctx.backupRequests = 0;
ctx.events.remove(0).run();
Assert.assertEquals("single unifier ", 1, plan.getMergeOperators(o1Meta).size());
Assert.assertEquals("N partitions after scale up " + o2Meta, 2 + i, plan.getOperators(o2Meta).size());
for (PTOperator o : plan.getOperators(o2Meta)) {
Assert.assertNotNull(o.container);
Assert.assertEquals("number operators ", 1, o.container.getOperators().size());
}
Set<PTOperator> expDeploy = Sets.newHashSet(plan.getOperators(o2Meta));
expDeploy.removeAll(unChangedOps);
Assert.assertEquals("number of containers", 5 + i, plan.getContainers().size());
Assert.assertEquals("undeployed operators" + ctx.undeploy, expUndeploy, ctx.undeploy);
Assert.assertEquals("deployed operators" + ctx.deploy, expDeploy, ctx.deploy);
}
// scale down M to 1
{
Set<PTOperator> expUndeploy = Sets.newHashSet();
Set<PTOperator> expDeploy = Sets.newHashSet();
expUndeploy.addAll(plan.getMergeOperators(o1Meta));
for (PTOperator o2p : plan.getOperators(o2Meta)) {
expUndeploy.add(o2p);
expDeploy.add(o2p);
}
for (PTOperator o1p : plan.getOperators(o1Meta)) {
expUndeploy.add(o1p);
PartitioningTest.PartitionLoadWatch.put(o1p, -1);
plan.onStatusUpdate(o1p);
}
Assert.assertEquals("repartition event", 1, ctx.events.size());
ctx.events.remove(0).run();
Assert.assertEquals("M partitions after scale down " + o1Meta, 1, plan.getOperators(o1Meta).size());
expUndeploy.removeAll(plan.getOperators(o1Meta));
Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy);
Assert.assertEquals("deploy", expDeploy, ctx.deploy);
}
// scale up M to 2
Assert.assertEquals("M partitions " + o1Meta, 1, plan.getOperators(o1Meta).size());
{
Set<PTOperator> expUndeploy = Sets.newHashSet();
Set<PTOperator> expDeploy = Sets.newHashSet();
for (PTOperator o1p : plan.getOperators(o1Meta)) {
expUndeploy.add(o1p);
PartitioningTest.PartitionLoadWatch.put(o1p, 1);
plan.onStatusUpdate(o1p);
}
Assert.assertEquals("repartition event", 1, ctx.events.size());
ctx.events.remove(0).run();
Assert.assertEquals("M partitions after scale up " + o1Meta, 2, plan.getOperators(o1Meta).size());
expDeploy.addAll(plan.getOperators(o1Meta));
expDeploy.addAll(plan.getMergeOperators(o1Meta));
for (PTOperator o2p : plan.getOperators(o2Meta)) {
expUndeploy.add(o2p);
expDeploy.add(o2p);
Assert.assertNotNull(o2p.container);
Assert.assertEquals("number operators ", 1, o2p.container.getOperators().size());
}
Assert.assertEquals("undeploy", expUndeploy, ctx.undeploy);
Assert.assertEquals("deploy", expDeploy, ctx.deploy);
}
}
/**
* Covers the scenario where dynamic partitioning only adds new partitions, leaving
* the existing partitions and the partition mapping unchanged.
*/
@Test
public void testAugmentedDynamicPartitioning()
{
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new TestAugmentingPartitioner<TestGeneratorInputOperator>(3));
dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)new PartitioningTest.PartitionLoadWatch()));
OperatorMeta o1Meta = dag.getMeta(o1);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
OperatorMeta o2Meta = dag.getMeta(o2);
dag.addStream("o1.outport1", o1.outport, o2.inport1);
int maxContainers = 10;
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
Assert.assertEquals("number of containers", 4, plan.getContainers().size());
List<PTOperator> o1ops = plan.getOperators(o1Meta);
Assert.assertEquals("number of o1 operators", 3, o1ops.size());
List<PTOperator> o2ops = plan.getOperators(o2Meta);
Assert.assertEquals("number of o2 operators", 1, o2ops.size());
Set<PTOperator> expUndeploy = Sets.newLinkedHashSet();
expUndeploy.addAll(plan.getOperators(o2Meta));
expUndeploy.add(plan.getOperators(o2Meta).get(0).upstreamMerge.values().iterator().next());
for (int i = 0; i < 2; ++i) {
PartitioningTest.PartitionLoadWatch.put(o1ops.get(i), 1);
plan.onStatusUpdate(o1ops.get(i));
}
ctx.backupRequests = 0;
ctx.events.remove(0).run();
Assert.assertEquals("number of containers", 6, plan.getContainers().size());
Assert.assertEquals("undeployed opertors", expUndeploy, ctx.undeploy);
}
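/**
* Partitioner that only augments the partition set: on repartition it adds one new
* partition per unit of aggregate reported load and returns the existing partitions
* unchanged, preserving the established partition mapping.
*/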
private class TestAugmentingPartitioner<T> implements Partitioner<T>
{
int initialPartitionCount = 1;
private TestAugmentingPartitioner(int initialPartitionCount)
{
this.initialPartitionCount = initialPartitionCount;
}
@Override
public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context)
{
Collection<Partition<T>> newPartitions = Lists.newArrayList(partitions);
int numTotal = partitions.size();
Partition<T> first = partitions.iterator().next();
if (first.getStats() == null) {
// Initial partition
numTotal = initialPartitionCount;
} else {
for (Partition<T> p : partitions) {
// Assumption: load is non-negative
numTotal += p.getLoad();
}
}
T partitionable = first.getPartitionedInstance();
for (int i = partitions.size(); i < numTotal; ++i) {
newPartitions.add(new DefaultPartition<>(partitionable));
}
return newPartitions;
}
@Override
public void partitioned(Map<Integer, Partition<T>> partitions)
{
}
}
@Test
public void testCascadingUnifier()
{
LogicalPlan dag = new LogicalPlan();
PartitioningTestOperator o1 = dag.addOperator("o1", PartitioningTestOperator.class);
o1.partitionKeys = new Integer[] {0,1,2,3};
o1.setPartitionCount(o1.partitionKeys.length);
dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_LIMIT, 2);
OperatorMeta o1Meta = dag.getMeta(o1);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
OperatorMeta o2Meta = dag.getMeta(o2);
dag.addStream("o1.outport1", o1.outport1, o2.inport1);
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10);
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
Assert.assertEquals("number of containers", 9, plan.getContainers().size());
List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
Assert.assertEquals("partitions " + o1Meta, 4, o1Partitions.size());
Assert.assertEquals("partitioned map " + o1.partitions, 4, o1.partitions.size());
List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
Assert.assertEquals("partitions " + o1Meta, 3, o2Partitions.size());
for (PTOperator o : o1Partitions) {
Assert.assertEquals("outputs " + o, 1, o.getOutputs().size());
for (PTOutput out : o.getOutputs()) {
Assert.assertEquals("sinks " + out, 1, out.sinks.size());
}
Assert.assertNotNull("container " + o, o.getContainer());
}
List<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta);
Assert.assertEquals("o1Unifiers " + o1Meta, 2, o1Unifiers.size()); // 2 cascadingUnifiers to per-downstream partition unifier(s)
for (PTOperator o : o1Unifiers) {
Assert.assertEquals("inputs " + o, 2, o.getInputs().size());
Assert.assertEquals("outputs " + o, 1, o.getOutputs().size());
for (PTOutput out : o.getOutputs()) {
Assert.assertEquals("sinks " + out, 3, out.sinks.size());
for (PTInput in : out.sinks) {
// MxN unifier
Assert.assertTrue(in.target.isUnifier());
Assert.assertEquals(1, in.target.getOutputs().get(0).sinks.size());
}
}
Assert.assertNotNull("container " + o, o.getContainer());
}
for (int i = 0; i < 4; i++) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container, 1, container.getOperators().size());
Assert.assertTrue(o1Partitions.contains(container.getOperators().get(0)));
}
for (int i = 4; i < 6; i++) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container, 1, container.getOperators().size());
Assert.assertTrue(o1Unifiers.contains(container.getOperators().get(0)));
}
for (int i = 6; i < 9; i++) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container, 2, container.getOperators().size());
Assert.assertTrue(o2Partitions.contains(container.getOperators().get(0)));
}
PTOperator p1 = o1Partitions.get(0);
StatsListener l = p1.statsListeners.get(0);
Assert.assertTrue("stats handlers " + p1.statsListeners, l instanceof PartitioningTest.PartitionLoadWatch);
PartitioningTest.PartitionLoadWatch.put(p1, 1);
plan.onStatusUpdate(p1);
Assert.assertEquals("partition scaling triggered", 1, ctx.events.size());
o1.partitionKeys = new Integer[] {0,1,2,3,4};
ctx.events.remove(0).run();
o1Partitions = plan.getOperators(o1Meta);
Assert.assertEquals("partitions " + o1Meta, 5, o1Partitions.size());
Assert.assertEquals("partitioned map " + o1.partitions, 5, o1.partitions.size());
o1Unifiers = plan.getMergeOperators(o1Meta);
Assert.assertEquals("o1Unifiers " + o1Meta, 3, o1Unifiers.size()); // 3(l1)x2(l2)
for (PTOperator o : o1Unifiers) {
Assert.assertNotNull("container null: " + o, o.getContainer());
}
}
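// Illustrative sketch (not used by the tests): with UNIFIER_LIMIT = limit, each
// cascading level merges at most `limit` inputs, so the number of first-level
// unifiers for a given upstream partition count is ceil(partitions / limit).
// This hypothetical helper only documents the arithmetic behind the unifier
// counts asserted above (2 for 4 partitions, 3 for 5 partitions).
private static int expectedFirstLevelUnifiers(int partitions, int unifierLimit)
{
return (partitions + unifierLimit - 1) / unifierLimit; // integer ceiling
}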
@Test
public void testSingleFinalCascadingUnifier()
{
LogicalPlan dag = new LogicalPlan();
PartitioningTestOperator o1 = dag.addOperator("o1", PartitioningTestOperator.class);
o1.partitionKeys = new Integer[]{0, 1, 2, 3};
o1.setPartitionCount(3);
dag.setOperatorAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_LIMIT, 2);
dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_SINGLE_FINAL, true);
OperatorMeta o1Meta = dag.getMeta(o1);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
OperatorMeta o2Meta = dag.getMeta(o2);
dag.addStream("o1.outport1", o1.outport1, o2.inport1);
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 12);
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
Assert.assertEquals("number of containers", 10, plan.getContainers().size());
List<PTOperator> o1Partitions = plan.getOperators(o1Meta);
Assert.assertEquals("partitions " + o1Meta, 4, o1Partitions.size());
Assert.assertEquals("partitioned map " + o1.partitions, 4, o1.partitions.size());
List<PTOperator> o2Partitions = plan.getOperators(o2Meta);
Assert.assertEquals("partitions " + o1Meta, 3, o2Partitions.size());
for (PTOperator o : o1Partitions) {
Assert.assertEquals("outputs " + o, 1, o.getOutputs().size());
for (PTOutput out : o.getOutputs()) {
Assert.assertEquals("sinks " + out, 1, out.sinks.size());
}
Assert.assertNotNull("container " + o, o.getContainer());
}
List<PTOperator> o1Unifiers = plan.getMergeOperators(o1Meta);
Assert.assertEquals("o1Unifiers " + o1Meta, 3, o1Unifiers.size()); // 2 cascadingUnifiers and one-downstream partition unifier
List<PTOperator> finalUnifiers = new ArrayList<>();
for (PTOperator o : o1Unifiers) {
Assert.assertEquals("inputs " + o, 2, o.getInputs().size());
Assert.assertEquals("outputs " + o, 1, o.getOutputs().size());
List<PTInput> sinks = o.getOutputs().get(0).sinks;
boolean finalUnifier = !sinks.isEmpty() && sinks.get(0).target.getOperatorMeta() == o2Meta;
if (!finalUnifier) {
for (PTOutput out : o.getOutputs()) {
Assert.assertEquals("sinks " + out, 1, out.sinks.size());
Assert.assertTrue(out.sinks.get(0).target.isUnifier());
}
} else {
for (PTOutput out : o.getOutputs()) {
Assert.assertEquals("sinks " + out, 3, out.sinks.size());
for (PTInput in : out.sinks) {
Assert.assertFalse(in.target.isUnifier());
}
}
finalUnifiers.add(o);
}
Assert.assertNotNull("container " + o, o.getContainer());
}
Assert.assertEquals("o1 final unifiers", 1, finalUnifiers.size());
for (int i = 0; i < 4; i++) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container, 1, container.getOperators().size());
Assert.assertTrue(o1Partitions.contains(container.getOperators().get(0)));
}
for (int i = 4; i < 7; i++) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container, 1, container.getOperators().size());
Assert.assertTrue(o1Unifiers.contains(container.getOperators().get(0)));
}
for (int i = 7; i < 10; i++) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("number operators " + container, 1, container.getOperators().size());
Assert.assertTrue(o2Partitions.contains(container.getOperators().get(0)));
}
PTOperator p1 = o1Partitions.get(0);
StatsListener l = p1.statsListeners.get(0);
Assert.assertTrue("stats handlers " + p1.statsListeners, l instanceof PartitioningTest.PartitionLoadWatch);
PartitioningTest.PartitionLoadWatch.put(p1, 1);
plan.onStatusUpdate(p1);
Assert.assertEquals("partition scaling triggered", 1, ctx.events.size());
o1.partitionKeys = new Integer[] {0,1,2,3,4};
ctx.events.remove(0).run();
o1Partitions = plan.getOperators(o1Meta);
Assert.assertEquals("partitions " + o1Meta, 5, o1Partitions.size());
Assert.assertEquals("partitioned map " + o1.partitions, 5, o1.partitions.size());
o1Unifiers = plan.getMergeOperators(o1Meta);
Assert.assertEquals("o1Unifiers " + o1Meta, 4, o1Unifiers.size()); // 3(l1)x2(l2)
for (PTOperator o : o1Unifiers) {
Assert.assertNotNull("container null: " + o, o.getContainer());
}
}
@Test
public void testSingleFinalUnifierInputOverride()
{
LogicalPlan dag = new LogicalPlan();
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
OperatorMeta o1Meta = dag.getMeta(o1);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
dag.setInputPortAttribute(o2.inport1, PortContext.UNIFIER_SINGLE_FINAL, true);
OperatorMeta o2Meta = dag.getMeta(o2);
dag.addStream("o1.outport1", o1.outport1, o2.inport1);
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10);
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
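// the UNIFIER_SINGLE_FINAL setting on the input port takes precedence over the
// output port attribute, as the three plan variants below demonstrate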
Assert.assertEquals("number of containers", 6, plan.getContainers().size());
Assert.assertEquals("o1 merge unifiers", 1, plan.getMergeOperators(o1Meta).size());
dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_SINGLE_FINAL, false);
ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
plan = new PhysicalPlan(dag, ctx);
Assert.assertEquals("number of containers", 6, plan.getContainers().size());
Assert.assertEquals("o1 merge unifiers", 1, plan.getMergeOperators(o1Meta).size());
dag.setOutputPortAttribute(o1.outport1, PortContext.UNIFIER_SINGLE_FINAL, true);
dag.setInputPortAttribute(o2.inport1, PortContext.UNIFIER_SINGLE_FINAL, false);
ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
plan = new PhysicalPlan(dag, ctx);
Assert.assertEquals("number of containers", 5, plan.getContainers().size());
Set<String> expectedNames = Sets.newHashSet(o1Meta.getMeta(o1.outport1).getUnifierMeta().getName(), o2Meta.getName());
for (int i = 3; i < 5; ++i) {
PTContainer container = plan.getContainers().get(i);
Assert.assertEquals("o2 container size", 2, container.getOperators().size());
Set<String> names = Sets.newHashSet();
for (PTOperator operator : container.getOperators()) {
names.add(operator.getOperatorMeta().getName());
}
Assert.assertEquals("o2 container operators", expectedNames, names);
}
}
@Test
public void testSingleFinalUnifierMultiInput()
{
LogicalPlan dag = new LogicalPlan();
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
OperatorMeta o1Meta = dag.getMeta(o1);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(4));
dag.setInputPortAttribute(o2.inport1, PortContext.UNIFIER_SINGLE_FINAL, true);
OperatorMeta o2Meta = dag.getMeta(o2);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
dag.setOperatorAttribute(o3, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
OperatorMeta o3Meta = dag.getMeta(o3);
dag.addStream("o1o2o3", o1.outport1, o2.inport1, o3.inport1);
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 12);
TestPlanContext ctx = new TestPlanContext();
dag.setAttribute(OperatorContext.STORAGE_AGENT, ctx);
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
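// expected layout: 3 o1 partitions, the single final unifier feeding o2, 4 o2 partitions,
// and 2 containers pairing each o3 partition with its own unifier = 10 containers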
Assert.assertEquals("number of containers", 10, plan.getContainers().size());
Assert.assertEquals("o1 merge unifiers", 1, plan.getMergeOperators(o1Meta).size());
// Check the merge unifier
{
PTContainer container = plan.getContainers().get(3);
Assert.assertEquals("number of operators " + container, 1, container.getOperators().size());
PTOperator operator = container.getOperators().get(0);
Assert.assertTrue("unifier check " + operator, operator.isUnifier());
Assert.assertEquals("operator meta " + operator, o1Meta.getMeta(o1.outport1).getUnifierMeta(), operator.getOperatorMeta());
}
int numberO2Containers = 0;
int numberO3Containers = 0;
Set<String> expectedNames = Sets.newHashSet(o1Meta.getMeta(o1.outport1).getUnifierMeta().getName(), o3Meta.getName());
for (int i = 4; i < 10; i++) {
PTContainer container = plan.getContainers().get(i);
List<PTOperator> operators = container.getOperators();
Assert.assertTrue("expected operator count " + container, (operators.size() <= 2) && (operators.size() > 0));
if (operators.size() == 1) {
Assert.assertEquals("operator in container " + container, o2Meta, operators.get(0).getOperatorMeta());
++numberO2Containers;
} else if (operators.size() == 2) {
Set<String> names = Sets.newHashSet();
for (PTOperator operator : container.getOperators()) {
names.add(operator.getOperatorMeta().getName());
}
Assert.assertEquals("container operators " + container, expectedNames, names);
++numberO3Containers;
}
}
Assert.assertEquals("number o2 containers", 4, numberO2Containers);
Assert.assertEquals("number o3 containers", 2, numberO3Containers);
}
@Test
public void testContainerSize()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.VCORES, 1);
dag.setOperatorAttribute(o2, OperatorContext.VCORES, 2);
dag.addStream("o1.outport1", o1.outport1, o2.inport1);
dag.addStream("o2.outport1", o2.outport1, o3.inport1);
dag.setOperatorAttribute(o2, OperatorContext.MEMORY_MB, 4000);
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
Assert.assertEquals("number of containers", 2, plan.getContainers().size());
Assert.assertEquals("memory container 1", 2560, plan.getContainers().get(0).getRequiredMemoryMB());
Assert.assertEquals("vcores container 1", 1, plan.getContainers().get(0).getRequiredVCores());
Assert.assertEquals("memory container 2", 4512, plan.getContainers().get(1).getRequiredMemoryMB());
Assert.assertEquals("vcores container 2", 2, plan.getContainers().get(1).getRequiredVCores());
Assert.assertEquals("number of operators in container 1", 2, plan.getContainers().get(0).getOperators().size());
}
@Test
public void testContainerCores()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
GenericTestOperator o5 = dag.addOperator("o5", GenericTestOperator.class);
GenericTestOperator o6 = dag.addOperator("o6", GenericTestOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.VCORES, 1);
dag.setOperatorAttribute(o2, OperatorContext.VCORES, 2);
dag.setOperatorAttribute(o3, OperatorContext.VCORES, 3);
dag.setOperatorAttribute(o4, OperatorContext.VCORES, 4);
dag.setOperatorAttribute(o5, OperatorContext.VCORES, 5);
dag.setOperatorAttribute(o6, OperatorContext.VCORES, 6);
dag.addStream("o1.outport1", o1.outport1, o2.inport1).setLocality(Locality.CONTAINER_LOCAL);
dag.addStream("o2.outport1", o2.outport1, o3.inport1, o4.inport1).setLocality(Locality.THREAD_LOCAL);
dag.addStream("o3.output1", o3.outport1, o5.inport1).setLocality(Locality.THREAD_LOCAL);
dag.addStream("o4.output1", o4.outport1, o5.inport2).setLocality(Locality.THREAD_LOCAL);
dag.addStream("o5.output1", o5.outport1, o6.inport1).setLocality(Locality.CONTAINER_LOCAL);
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
Assert.assertEquals("number of containers", 1, plan.getContainers().size());
Assert.assertEquals("vcores container 1 is 12", 12, plan.getContainers().get(0).getRequiredVCores());
}
@Test
public void testContainerSizeWithPartitioning()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3));
dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
dag.addStream("o1.outport1", o1.outport1, o2.inport1);
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 10);
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
Assert.assertEquals("number of containers", 5, plan.getContainers().size());
PTContainer container;
for (int i = 0; i < 5; i++) {
container = plan.getContainers().get(i);
if (container.getOperators().size() == 1) {
Assert.assertEquals("container memory is 1536 for container :" + container, 1536, container.getRequiredMemoryMB());
}
if (container.getOperators().size() == 2) {
Assert.assertEquals("container memory is 2048 for container :" + container, 2048, container.getRequiredMemoryMB());
}
}
}
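/**
* Stateless partitioner extended for tests: partitions supplied through
* {@link #extraPartitions} are injected verbatim on the next repartition, and
* parallel-partitioned operators are padded to the requested partition count.
*/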
private class TestPartitioner<T extends Operator> extends StatelessPartitioner<T>
{
private static final long serialVersionUID = 1L;
final List<Partition<T>> extraPartitions = Lists.newArrayList();
@Override
public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context)
{
if (!extraPartitions.isEmpty()) {
partitions.addAll(extraPartitions);
return partitions;
}
Collection<Partition<T>> newPartitions = super.definePartitions(partitions, context);
if (context.getParallelPartitionCount() > 0 && newPartitions.size() < context.getParallelPartitionCount()) {
// parallel partitioned, fill to requested count
for (int i = newPartitions.size(); i < context.getParallelPartitionCount(); i++) {
newPartitions.add(new DefaultPartition<>(partitions.iterator().next().getPartitionedInstance()));
}
}
return newPartitions;
}
}
@Test
public void testDefaultPartitionerWithParallel() throws InterruptedException
{
final MutableInt loadInd = new MutableInt();
StatsListener listener = new StatsListener()
{
@Override
public Response processStats(BatchedOperatorStats stats)
{
Response response = new Response();
response.repartitionRequired = true;
response.loadIndicator = loadInd.intValue();
return response;
}
};
LogicalPlan dag = new LogicalPlan();
GenericTestOperator nodeX = dag.addOperator("X", GenericTestOperator.class);
dag.setOperatorAttribute(nodeX, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
dag.setOperatorAttribute(nodeX, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList(listener));
GenericTestOperator nodeY = dag.addOperator("Y", GenericTestOperator.class);
dag.setOperatorAttribute(nodeY, Context.OperatorContext.PARTITIONER, new TestPartitioner<GenericTestOperator>());
GenericTestOperator nodeZ = dag.addOperator("Z", GenericTestOperator.class);
dag.addStream("Stream1", nodeX.outport1, nodeY.inport1, nodeZ.inport1);
dag.addStream("Stream2", nodeX.outport2, nodeY.inport2, nodeZ.inport2);
dag.setInputPortAttribute(nodeY.inport1, Context.PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(nodeY.inport2, Context.PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(nodeZ.inport1, Context.PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(nodeZ.inport2, Context.PortContext.PARTITION_PARALLEL, true);
StramTestSupport.MemoryStorageAgent msa = new StramTestSupport.MemoryStorageAgent();
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, msa);
TestPlanContext ctx = new TestPlanContext();
PhysicalPlan plan = new PhysicalPlan(dag, ctx);
LogicalPlan.OperatorMeta metaOfX = dag.getMeta(nodeX);
LogicalPlan.OperatorMeta metaOfY = dag.getMeta(nodeY);
Assert.assertEquals("number operators " + metaOfX.getName(), 2, plan.getOperators(metaOfX).size());
Assert.assertEquals("number operators " + metaOfY.getName(), 2, plan.getOperators(metaOfY).size());
List<PTOperator> ptOfX = plan.getOperators(metaOfX);
for (PTOperator physicalX : ptOfX) {
assertTwoDistinctSinksPerOutput(metaOfX, physicalX);
}
// Invoke re-partitioning of the PhysicalPlan; no partition change expected
loadInd.setValue(0);
for (PTOperator ptOperator : ptOfX) {
plan.onStatusUpdate(ptOperator);
}
ctx.events.remove(0).run();
for (PTOperator physicalX : ptOfX) {
assertTwoDistinctSinksPerOutput(metaOfX, physicalX);
}
// scale up by splitting the first partition
loadInd.setValue(1);
plan.onStatusUpdate(ptOfX.get(0));
ctx.events.get(0).run();
List<PTOperator> ptOfXScaleUp = plan.getOperators(metaOfX);
Assert.assertEquals("3 partitons " + ptOfXScaleUp, 3, ptOfXScaleUp.size());
for (PTOperator physicalX : ptOfXScaleUp) {
assertTwoDistinctSinksPerOutput(metaOfX, physicalX);
}
}
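/**
* Verifies that each of the two output ports of the given physical partition of X
* fans out to two sinks on two distinct downstream operators. Extracted here only
* to avoid repeating the identical verification loop three times in
* testDefaultPartitionerWithParallel; behavior is unchanged.
*/
private static void assertTwoDistinctSinksPerOutput(LogicalPlan.OperatorMeta metaOfX, PTOperator physicalX)
{
Assert.assertEquals("2 streams " + physicalX.getOutputs(), 2, physicalX.getOutputs().size());
for (PTOutput outputPort : physicalX.getOutputs()) {
Set<PTOperator> dopers = Sets.newHashSet();
Assert.assertEquals("sink of " + metaOfX.getName() + " id " + physicalX.id + " port " + outputPort.portName, 2, outputPort.sinks.size());
for (PTInput inputPort : outputPort.sinks) {
dopers.add(inputPort.target);
}
Assert.assertEquals("distinct sink operators " + dopers, 2, dopers.size());
}
}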
@Test
public void testContainersForSlidingWindow()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 2000);
dag.getOperatorMeta("o1").getMeta(o1.outport2).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 4000);
dag.addStream("o1.outport1", o1.outport1, o2.inport1);
dag.addStream("o1.outport2", o1.outport2, o2.inport2);
dag.addStream("o2.outport1", o2.outport1, o3.inport1);
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
Assert.assertEquals("number of containers", 5, plan.getContainers().size());
boolean sawOutput1Slider = false;
boolean sawOutput2Slider = false;
for (PTContainer container : plan.getContainers()) {
Assert.assertEquals("number of operators in each container is 1", container.operators.size(), 1);
if (container.operators.get(0).isUnifier()) {
String name = container.operators.get(0).getName();
if (name.equals("o1.outport1#slider")) {
sawOutput1Slider = true;
Assert.assertEquals("container memory is 2512", container.getRequiredMemoryMB(), 2512);
} else if (name.equals("o1.outport2#slider")) {
sawOutput2Slider = true;
Assert.assertEquals("container memory is 2512", container.getRequiredMemoryMB(), 4512);
}
}
}
Assert.assertEquals("Found output1 slider", true, sawOutput1Slider);
Assert.assertEquals("Found output2 slider", true, sawOutput2Slider);
}
@Test
public void testMxNPartitionForSlidingWindow()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
dag.getOperatorMeta("o1").getMeta(o1.outport1).getUnifierMeta().getAttributes().put(OperatorContext.MEMORY_MB, 1024);
dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
dag.setOperatorAttribute(o2, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
dag.setOperatorAttribute(o2, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
dag.addStream("o1.outport1", o1.outport1, o2.inport1);
dag.addStream("o2.outport1", o2.outport1, o3.inport1);
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
Assert.assertEquals("number of containers", 9, plan.getContainers().size());
}
@Test
public void testParallelPartitionForSlidingWindow()
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
dag.setOperatorAttribute(o1, OperatorContext.SLIDE_BY_WINDOW_COUNT, 2);
dag.setOperatorAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
dag.setInputPortAttribute(o2.inport1, PortContext.PARTITION_PARALLEL, true);
dag.setOperatorAttribute(o1, OperatorContext.APPLICATION_WINDOW_COUNT, 4);
dag.addStream("o1.outport1", o1.outport1, o2.inport1);
dag.addStream("o2.outport1", o2.outport1, o3.inport1);
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
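// a plausible reading of the layout (only the total is asserted): 2 o1 partitions
// + 2 slider unifiers + 2 parallel o2 partitions + 1 o3 = 7 containers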
Assert.assertEquals("number of containers", 7, plan.getContainers().size());
}
}