blob: d77b1ae32570b9dd41a21f170e7ea02becc9dd78 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.common.partitioner;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Partitioner;
/**
* This is a simple partitioner which creates partitionCount number of clones of an operator.
*
* @param <T> The type of the operator
* @since 2.0.0
*/
public class StatelessPartitioner<T extends Operator> implements Partitioner<T>, Serializable
{
  private static final Logger logger = LoggerFactory.getLogger(StatelessPartitioner.class);
  private static final long serialVersionUID = 201411071710L;

  /**
   * The number of partitions for the default partitioner to create.
   */
  @Min(1)
  private int partitionCount = 1;

  /**
   * This creates a partitioner which creates only one partition.
   */
  public StatelessPartitioner()
  {
  }

  /**
   * This constructor is used to create the partitioner from a property.
   *
   * @param value A string which is an integer of the number of partitions to create
   * @throws NumberFormatException if {@code value} does not parse as an integer
   */
  public StatelessPartitioner(String value)
  {
    // Trim so that incidental whitespace in a configuration value does not break parsing.
    this(Integer.parseInt(value.trim()));
  }

  /**
   * This creates a partitioner which creates partitionCount partitions.
   *
   * @param partitionCount The number of partitions to create.
   */
  public StatelessPartitioner(int partitionCount)
  {
    this.partitionCount = partitionCount;
  }

  /**
   * This method sets the number of partitions for the StatelessPartitioner to create.
   *
   * @param partitionCount The number of partitions to create.
   */
  public void setPartitionCount(int partitionCount)
  {
    this.partitionCount = partitionCount;
  }

  /**
   * This method gets the number of partitions for the StatelessPartitioner to create.
   *
   * @return The number of partitions to create.
   */
  public int getPartitionCount()
  {
    return partitionCount;
  }

  @Override
  public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context)
  {
    final int newPartitionCount = DefaultPartition.getRequiredPartitionCount(context, this.partitionCount);
    logger.debug("define partitions, partitionCount current {} requested {}", partitions.size(), newPartitionCount);

    // Get a partition; the platform always supplies at least one existing partition.
    DefaultPartition<T> partition = (DefaultPartition<T>)partitions.iterator().next();
    Collection<Partition<T>> newPartitions;

    if (partition.getStats() == null) {
      // Absent stats means this is the first call to define partitions: clone the
      // single operator instance into the requested number of partitions.
      newPartitions = Lists.newArrayList();
      for (int partitionCounter = 0; partitionCounter < newPartitionCount; partitionCounter++) {
        newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance()));
      }

      // Partition the stream that was first connected in the DAG and send full data to remaining input ports.
      // This gives control over which stream to partition under default partitioning to the DAG writer.
      List<InputPort<?>> inputPortList = context.getInputPorts();
      if (inputPortList != null && !inputPortList.isEmpty()) {
        DefaultPartition.assignPartitionKeys(newPartitions, inputPortList.iterator().next());
      }
    } else {
      // define partitions is being called again (dynamic repartitioning based on load)
      if (context.getParallelPartitionCount() != 0) {
        newPartitions = repartitionParallel(partitions, context);
      } else if (partition.getPartitionKeys().isEmpty()) {
        newPartitions = repartitionInputOperator(partitions);
      } else {
        newPartitions = repartition(partitions);
      }
    }

    logger.debug("new partition size {}", newPartitions.size());
    return newPartitions;
  }

  @Override
  public void partitioned(Map<Integer, Partition<T>> partitions)
  {
    //Do nothing
  }

  /**
   * Change existing partitioning based on runtime state (load). Unlike
   * implementations of {@link Partitioner}, decisions are made
   * solely based on load indicator and operator state is not
   * considered in the event of partition split or merge.
   *
   * @param <T> The operator type
   * @param partitions
   *          List of new partitions
   * @return The new operators.
   */
  public static <T extends Operator> Collection<Partition<T>> repartition(Collection<Partition<T>> partitions)
  {
    List<Partition<T>> newPartitions = new ArrayList<>();
    // Under-loaded partitions keyed by their partition key with the leading mask bit
    // removed; two partitions mapping to the same reduced key are siblings and merge.
    HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<>();
    for (Partition<T> p: partitions) {
      int load = p.getLoad();
      if (load < 0) {
        // combine neighboring underutilized partitions
        PartitionKeys pks = p.getPartitionKeys().values().iterator().next(); // one port partitioned
        // look for the sibling partition by excluding leading bit
        int reducedMask = pks.mask >>> 1;
        for (int partitionKey: pks.partitions) {
          Partition<T> siblingPartition = lowLoadPartitions.remove(partitionKey & reducedMask);
          if (siblingPartition == null) {
            // no sibling seen yet; park this partition until its sibling shows up
            lowLoadPartitions.put(partitionKey & reducedMask, p);
          } else {
            // both of the partitions are low load, combine them under the reduced mask
            PartitionKeys newPks = new PartitionKeys(reducedMask, Sets.newHashSet(partitionKey & reducedMask));
            // put new value so the map gets marked as modified
            InputPort<?> port = siblingPartition.getPartitionKeys().keySet().iterator().next();
            siblingPartition.getPartitionKeys().put(port, newPks);
            // add as new partition
            newPartitions.add(siblingPartition);
          }
        }
      } else if (load > 0) {
        // split bottlenecks
        Map<InputPort<?>, PartitionKeys> keys = p.getPartitionKeys();
        Map.Entry<InputPort<?>, PartitionKeys> e = keys.entrySet().iterator().next();

        final int newMask;
        final Set<Integer> newKeys;

        if (e.getValue().partitions.size() == 1) {
          // split single key: widen the mask by one bit and derive the second key
          // by setting the newly-added bit on the original key
          newMask = (e.getValue().mask << 1) | 1;
          int key = e.getValue().partitions.iterator().next();
          int key2 = (newMask ^ e.getValue().mask) | key;
          newKeys = Sets.newHashSet(key, key2);
        } else {
          // assign keys to separate partitions
          newMask = e.getValue().mask;
          newKeys = e.getValue().partitions;
        }

        for (int key: newKeys) {
          Partition<T> newPartition = new DefaultPartition<>(p.getPartitionedInstance());
          newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
          newPartitions.add(newPartition);
        }
      } else {
        // leave unchanged
        newPartitions.add(p);
      }
    }
    // put back low load partitions that could not be combined
    newPartitions.addAll(lowLoadPartitions.values());
    return newPartitions;
  }

  /**
   * Adjust the partitions of an input operator (operator with no connected input stream).
   * Pairs of under-loaded partitions are merged (one of each pair is dropped), each
   * over-loaded partition is replaced by two clones, and balanced partitions are kept.
   *
   * @param <T> The operator type
   * @param partitions The existing partitions
   * @return The new operators.
   */
  public static <T extends Operator> Collection<Partition<T>> repartitionInputOperator(Collection<Partition<T>> partitions)
  {
    List<Partition<T>> newPartitions = new ArrayList<>();
    List<Partition<T>> lowLoadPartitions = new ArrayList<>();
    for (Partition<T> p: partitions) {
      int load = p.getLoad();
      if (load < 0) {
        if (!lowLoadPartitions.isEmpty()) {
          // pair this under-loaded partition with a previous one: keep only one of the two
          newPartitions.add(lowLoadPartitions.remove(0));
        } else {
          lowLoadPartitions.add(p);
        }
      } else if (load > 0) {
        // over-loaded: replace with two clones of the same operator instance
        newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
        newPartitions.add(new DefaultPartition<>(p.getPartitionedInstance()));
      } else {
        newPartitions.add(p);
      }
    }
    // keep any under-loaded partition that found no merge partner
    newPartitions.addAll(lowLoadPartitions);
    return newPartitions;
  }

  /**
   * Adjust the partitions of a parallel partitioned operator to match the
   * parallel partition count required by the context, either dropping excess
   * partitions or adding clones of an existing instance.
   *
   * @param partitions existing partitions
   * @param context partition context
   * @param <T> the operator type
   * @return new adjusted partitions
   */
  public static <T extends Operator> Collection<Partition<T>> repartitionParallel(Collection<Partition<T>> partitions,
      PartitioningContext context)
  {
    List<Partition<T>> newPartitions = Lists.newArrayList();
    newPartitions.addAll(partitions);

    int morePartitionsToCreate = context.getParallelPartitionCount() - newPartitions.size();
    if (morePartitionsToCreate < 0) {
      // Delete partitions from the front until the count matches
      Iterator<Partition<T>> partitionIterator = newPartitions.iterator();
      while (morePartitionsToCreate++ < 0) {
        partitionIterator.next();
        partitionIterator.remove();
      }
    } else {
      // Add more partitions, cloning from an arbitrary existing instance
      T anOperator = newPartitions.iterator().next().getPartitionedInstance();
      while (morePartitionsToCreate-- > 0) {
        DefaultPartition<T> partition = new DefaultPartition<>(anOperator);
        newPartitions.add(partition);
      }
    }
    return newPartitions;
  }
}