blob: f0199f90ae24927ec7dff7c57a33d4488cd91923 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.WaitCondition;
import static java.lang.Thread.sleep;
public class PartitioningTest
{
private static final Logger LOG = LoggerFactory.getLogger(PartitioningTest.class);
private static final File TEST_OUTPUT_DIR = new File("target", PartitioningTest.class.getName());
@Before
public void setup() throws IOException
{
StreamingContainer.eventloop.start();
}
@After
public void teardown()
{
StreamingContainer.eventloop.stop();
}
public static class CollectorOperator extends BaseOperator
{
/*
* Received tuples are stored in a map keyed with the system assigned operator id.
*/
public static final ConcurrentHashMap<String, List<Object>> receivedTuples = new ConcurrentHashMap<>();
private transient int operatorId;
public String prefix = "";
@Override
public void setup(OperatorContext context)
{
this.operatorId = context.getId();
}
public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
{
@Override
public void process(Object tuple)
{
LOG.debug("===received tuple {}===", tuple);
Assert.assertNotNull(CollectorOperator.this.operatorId);
String id = prefix + CollectorOperator.this.operatorId;
synchronized (receivedTuples) {
List<Object> l = receivedTuples.get(id);
if (l == null) {
l = Collections.synchronizedList(new ArrayList<>());
//LOG.debug("adding {} {}", id, l);
receivedTuples.put(id, l);
}
l.add(tuple);
}
if (output.isConnected()) {
output.emit(tuple);
}
}
};
@OutputPortFieldAnnotation( optional = true)
public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
}
public static class TestInputOperator<T> extends BaseOperator implements InputOperator
{
public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();
transient boolean first;
transient long windowId;
boolean blockEndStream = false;
/**
* Tuples to be emitted by the operator, with one entry per window.
*/
public List<List<T>> testTuples;
@Override
public void emitTuples()
{
if (testTuples == null || testTuples.isEmpty()) {
if (blockEndStream) {
return;
}
BaseOperator.shutdown();
}
if (first) {
List<T> tuples = testTuples.remove(0);
for (T t: tuples) {
output.emit(t);
LOG.debug("sent tuple ==={}===", t);
}
first = false;
}
}
@Override
public void beginWindow(long windowId)
{
this.windowId = windowId;
first = true;
}
}
@Test
public void testDefaultPartitioning() throws Exception
{
LogicalPlan dag = new LogicalPlan();
File checkpointDir = new File(TEST_OUTPUT_DIR, "testDefaultPartitioning");
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null));
Integer[][] testData = {
{4, 5}
};
CollectorOperator.receivedTuples.clear();
TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>());
input.testTuples = new ArrayList<>();
for (Integer[] tuples: testData) {
input.testTuples.add(new ArrayList<>(Arrays.asList(tuples)));
}
CollectorOperator collector = dag.addOperator("collector", new CollectorOperator());
collector.prefix = "" + System.identityHashCode(collector);
dag.getMeta(collector).getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<CollectorOperator>(2));
dag.addStream("fromInput", input.output, collector.input);
CollectorOperator merged = dag.addOperator("merged", new CollectorOperator());
merged.prefix = "" + System.identityHashCode(merged);
dag.addStream("toMerged", collector.output, merged.input);
StramLocalCluster lc = new StramLocalCluster(dag);
lc.setHeartbeatMonitoringEnabled(false);
lc.run(); // terminates on end of stream
List<PTOperator> operators = lc.getPlanOperators(dag.getMeta(collector));
Assert.assertEquals("number operator instances " + operators, 2, operators.size());
// one entry for each partition + merged output
Assert.assertEquals("received tuples " + CollectorOperator.receivedTuples, 3, CollectorOperator.receivedTuples.size());
//Assert.assertEquals("received tuples " + operators.get(0), Arrays.asList(4), CollectorOperator.receivedTuples.get(collector.prefix + operators.get(0).getId()));
Assert.assertEquals("received tuples " + operators.get(1), Arrays.asList(5), CollectorOperator.receivedTuples.get(collector.prefix + operators.get(1).getId()));
PTOperator pmerged = lc.findByLogicalNode(dag.getMeta(merged));
List<Object> tuples = CollectorOperator.receivedTuples.get(merged.prefix + pmerged.getId());
Assert.assertNotNull("merged tuples " + pmerged, tuples);
Assert.assertEquals("merged tuples " + pmerged, Sets.newHashSet(testData[0]), Sets.newHashSet(tuples));
}
public static class PartitionLoadWatch implements StatsListener, java.io.Serializable
{
private static final long serialVersionUID = 1L;
private static final ThreadLocal<Map<Integer, Integer>> loadIndicators = new ThreadLocal<>();
@Override
public Response processStats(BatchedOperatorStats status)
{
Response hbr = null;
Map<Integer, Integer> m = loadIndicators.get();
if (m != null) {
Integer l = m.get(status.getOperatorId());
if (l != null) {
hbr = new Response();
hbr.repartitionRequired = true;
hbr.loadIndicator = l.intValue();
}
}
return hbr;
}
public static void put(PTOperator oper, int load)
{
Map<Integer, Integer> m = loadIndicators.get();
if (m == null) {
loadIndicators.set(m = new ConcurrentHashMap<>());
}
m.put(oper.getId(), load);
}
public static void remove(PTOperator oper)
{
loadIndicators.get().remove(oper.getId());
}
}
private static List<PTOperator> assertNumberPartitions(final int count, final StramLocalCluster lc, final LogicalPlan.OperatorMeta ow) throws Exception
{
WaitCondition c = new WaitCondition()
{
@Override
public boolean isComplete()
{
List<PTOperator> operators = lc.getPlanOperators(ow);
LOG.debug("Number of operators {}, expected number {}", operators.size(), count);
return (operators.size() == count);
}
};
StramTestSupport.awaitCompletion(c, 10000);
Assert.assertTrue("Number partitions match " + ow, c.isComplete());
return lc.getPlanOperators(ow);
}
//@Ignore
@Test
@SuppressWarnings("SleepWhileInLoop")
public void testDynamicDefaultPartitioning() throws Exception
{
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5);
File checkpointDir = new File(TEST_OUTPUT_DIR, "testDynamicDefaultPartitioning");
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null));
CollectorOperator.receivedTuples.clear();
TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>());
input.blockEndStream = true;
CollectorOperator collector = dag.addOperator("partitionedCollector", new CollectorOperator());
collector.prefix = "" + System.identityHashCode(collector);
dag.setOperatorAttribute(collector, OperatorContext.PARTITIONER, new StatelessPartitioner<CollectorOperator>(2));
dag.setOperatorAttribute(collector, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()}));
dag.addStream("fromInput", input.output, collector.input);
CollectorOperator singleCollector = dag.addOperator("singleCollector", new CollectorOperator());
singleCollector.prefix = "" + System.identityHashCode(singleCollector);
dag.addStream("toSingleCollector", collector.output, singleCollector.input);
StramLocalCluster lc = new StramLocalCluster(dag);
lc.setHeartbeatMonitoringEnabled(false);
lc.runAsync();
List<PTOperator> partitions = assertNumberPartitions(2, lc, dag.getMeta(collector));
Set<PTContainer> containers = Sets.newHashSet();
for (PTOperator oper : partitions) {
containers.add(oper.getContainer());
}
Assert.assertTrue("Number of containers are 4", 4 == lc.dnmgr.getPhysicalPlan().getContainers().size());
PTOperator splitPartition = partitions.get(0);
PartitionLoadWatch.put(splitPartition, 1);
LOG.debug("Triggered split for {}", splitPartition);
int count = 0;
long startMillis = System.currentTimeMillis();
while (count == 0 && startMillis > System.currentTimeMillis() - StramTestSupport.DEFAULT_TIMEOUT_MILLIS) {
sleep(20); // yield
count += lc.dnmgr.processEvents();
}
partitions = assertNumberPartitions(3, lc, dag.getMeta(collector));
Assert.assertTrue("container reused", lc.dnmgr.getPhysicalPlan().getContainers().containsAll(containers));
// check deployment
for (PTOperator p: partitions) {
StramTestSupport.waitForActivation(lc, p);
}
PartitionLoadWatch.remove(splitPartition);
//Assert.assertTrue("Number of containers are 5", 5 == lc.dnmgr.getPhysicalPlan().getContainers().size());
for (PTContainer container : lc.dnmgr.getPhysicalPlan().getContainers()) {
int memory = 0;
for (PTOperator operator : container.getOperators()) {
memory += operator.getBufferServerMemory();
memory += operator.getOperatorMeta().getValue(OperatorContext.MEMORY_MB);
}
Assert.assertEquals("memory", memory, container.getRequiredMemoryMB());
}
PTOperator planInput = lc.findByLogicalNode(dag.getMeta(input));
LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, planInput);
Map<Integer, Node<?>> nodeMap = c.getNodes();
Assert.assertEquals("number operators " + nodeMap, 1, nodeMap.size());
@SuppressWarnings({"unchecked"})
TestInputOperator<Integer> inputDeployed = (TestInputOperator<Integer>)nodeMap.get(planInput.getId()).getOperator();
Assert.assertNotNull("" + nodeMap, inputDeployed);
// add tuple that matches the partition key and check that each partition receives it
ArrayList<Integer> inputTuples = new ArrayList<>();
LOG.debug("Number of partitions {}", partitions.size());
for (PTOperator p: partitions) {
// default partitioning has one port mapping with a single partition key
LOG.debug("Partition key map size: {}", p.getPartitionKeys().size());
inputTuples.add(p.getPartitionKeys().values().iterator().next().partitions.iterator().next());
}
inputDeployed.testTuples = Collections.synchronizedList(new ArrayList<List<Integer>>());
inputDeployed.testTuples.add(inputTuples);
for (PTOperator p: partitions) {
Integer expectedTuple = p.getPartitionKeys().values().iterator().next().partitions.iterator().next();
List<Object> receivedTuples;
int i = 0;
while ((receivedTuples = CollectorOperator.receivedTuples.get(collector.prefix + p.getId())) == null || receivedTuples.isEmpty()) {
if (i++ % 100 == 0) {
LOG.debug("Waiting for tuple: " + p);
}
sleep(10);
}
Assert.assertEquals("received " + p, Arrays.asList(expectedTuple), receivedTuples);
}
// single output operator to receive tuple from each partition
List<PTOperator> operators = lc.getPlanOperators(dag.getMeta(singleCollector));
Assert.assertEquals("number output operator instances " + operators, 1, operators.size());
StramTestSupport.waitForActivation(lc, operators.get(0)); // ensure redeploy
List<Object> receivedTuples;
while ((receivedTuples = CollectorOperator.receivedTuples.get(singleCollector.prefix + operators.get(0).getId())) == null || receivedTuples.size() < inputTuples.size()) {
LOG.debug("Waiting for tuple: " + operators.get(0) + " expected: " + inputTuples + " received: " + receivedTuples);
sleep(20);
}
Assert.assertEquals("output tuples " + receivedTuples, Sets.newHashSet(inputTuples), Sets.newHashSet(receivedTuples));
lc.shutdown();
}
public static class PartitionableInputOperator extends BaseOperator implements InputOperator, Partitioner<PartitionableInputOperator>
{
String partitionProperty = "partition";
@Override
public void emitTuples()
{
}
@Override
public Collection<Partition<PartitionableInputOperator>> definePartitions(Collection<Partition<PartitionableInputOperator>> partitions, PartitioningContext context)
{
List<Partition<PartitionableInputOperator>> newPartitions = new ArrayList<>(3);
Iterator<? extends Partition<PartitionableInputOperator>> iterator = partitions.iterator();
Partition<PartitionableInputOperator> templatePartition;
for (int i = 0; i < 3; i++) {
PartitionableInputOperator op = new PartitionableInputOperator();
if (iterator.hasNext()) {
templatePartition = iterator.next();
op.partitionProperty = templatePartition.getPartitionedInstance().partitionProperty;
}
op.partitionProperty += "_" + i;
newPartitions.add(new DefaultPartition<>(op));
}
return newPartitions;
}
@Override
public void partitioned(Map<Integer, Partition<PartitionableInputOperator>> partitions)
{
}
/**
* Tests input operator partitioning with state modification
*
* @throws Exception
*/
private void testInputOperatorPartitioning(LogicalPlan dag) throws Exception
{
File checkpointDir = new File(TEST_OUTPUT_DIR, "testInputOperatorPartitioning");
dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, checkpointDir.getPath());
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null));
PartitionableInputOperator input = dag.addOperator("input", new PartitionableInputOperator());
dag.setOperatorAttribute(input, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()}));
StramLocalCluster lc = new StramLocalCluster(dag);
lc.setHeartbeatMonitoringEnabled(false);
lc.runAsync();
List<PTOperator> partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
Set<String> partProperties = new HashSet<>();
for (PTOperator p : partitions) {
LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
Map<Integer, Node<?>> nodeMap = c.getNodes();
Assert.assertEquals("number operators " + nodeMap, 1, nodeMap.size());
PartitionableInputOperator inputDeployed = (PartitionableInputOperator)nodeMap.get(p.getId()).getOperator();
Assert.assertNotNull(inputDeployed);
partProperties.add(inputDeployed.partitionProperty);
// move to checkpoint to verify that checkpoint state is updated upon repartition
Checkpoint checkpoint = new Checkpoint(10L, 0, 0);
p.checkpoints.add(checkpoint);
p.setRecoveryCheckpoint(checkpoint);
AsyncFSStorageAgent agent = new AsyncFSStorageAgent(checkpointDir.getPath(), null);
agent.save(inputDeployed, p.getId(), 10L);
agent.copyToHDFS(p.getId(), 10L);
}
Assert.assertEquals("", Sets.newHashSet("partition_0", "partition_1", "partition_2"), partProperties);
PartitionLoadWatch.put(partitions.get(0), 1);
int count = 0;
long startMillis = System.currentTimeMillis();
while (count == 0 && startMillis > System.currentTimeMillis() - StramTestSupport.DEFAULT_TIMEOUT_MILLIS) {
count += lc.dnmgr.processEvents();
}
PartitionLoadWatch.remove(partitions.get(0));
partitions = assertNumberPartitions(3, lc, dag.getMeta(input));
partProperties = new HashSet<>();
for (PTOperator p: partitions) {
LocalStreamingContainer c = StramTestSupport.waitForActivation(lc, p);
Map<Integer, Node<?>> nodeMap = c.getNodes();
Assert.assertEquals("number operators " + nodeMap, 1, nodeMap.size());
PartitionableInputOperator inputDeployed = (PartitionableInputOperator)nodeMap.get(p.getId()).getOperator();
Assert.assertNotNull(inputDeployed);
partProperties.add(inputDeployed.partitionProperty);
}
Assert.assertEquals("", Sets.newHashSet("partition_0_0", "partition_1_1", "partition_2_2"), partProperties);
lc.shutdown();
}
@Test
public void testInputOperatorPartitioningWithAsyncStorageAgent() throws Exception
{
LogicalPlan dag = new LogicalPlan();
testInputOperatorPartitioning(dag);
}
}
}