| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.apex.examples.partition; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.Map; |
| |
| import javax.validation.constraints.Max; |
| import javax.validation.constraints.Min; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.datatorrent.api.Context; |
| import com.datatorrent.api.DefaultInputPort; |
| import com.datatorrent.api.DefaultPartition; |
| import com.datatorrent.api.Partitioner; |
| |
| import com.datatorrent.common.util.BaseOperator; |
| |
| /** |
| * Simple operator to test partitioning |
| * |
| * @since 3.7.0 |
| */ |
| public class TestPartition extends BaseOperator implements Partitioner<TestPartition> |
| { |
| private static final Logger LOG = LoggerFactory.getLogger(TestPartition.class); |
| |
| private transient int id; // operator/partition id |
| private transient long curWindowId; // current window id |
| private transient long cnt; // per-window tuple count |
| |
| @Min(1) |
| @Max(20) |
| private int nPartitions = 3; |
| |
| public final transient DefaultInputPort<Integer> in = new DefaultInputPort<Integer>() |
| { |
| @Override |
| public void process(Integer tuple) |
| { |
| LOG.debug("{}: tuple = {}, operator id = {}", cnt, tuple, id); |
| ++cnt; |
| } |
| }; |
| |
| //public final transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>(); |
| |
| @Override |
| public void setup(Context.OperatorContext context) |
| { |
| super.setup(context); |
| |
| long appWindowId = context.getValue(context.ACTIVATION_WINDOW_ID); |
| id = context.getId(); |
| LOG.debug("Started setup, appWindowId = {}, operator id = {}", appWindowId, id); |
| } |
| |
| @Override |
| public void beginWindow(long windowId) |
| { |
| cnt = 0; |
| curWindowId = windowId; |
| LOG.debug("window id = {}, operator id = {}", curWindowId, id); |
| } |
| |
| @Override |
| public void endWindow() |
| { |
| LOG.debug("window id = {}, operator id = {}, cnt = {}", curWindowId, id, cnt); |
| } |
| |
| @Override |
| public void partitioned(Map<Integer, Partition<TestPartition>> partitions) |
| { |
| //Do nothing |
| } |
| |
| @Override |
| public Collection<Partition<TestPartition>> definePartitions(Collection<Partition<TestPartition>> partitions, PartitioningContext context) |
| { |
| int oldSize = partitions.size(); |
| LOG.debug("partitionCount: current = {} requested = {}", oldSize, nPartitions); |
| |
| // each partition i in 0...nPartitions receives tuples divisible by i but not by any other |
| // j in that range; all other tuples ignored |
| // |
| if (3 != nPartitions) { |
| return getPartitions(partitions, context); |
| } |
| |
| // special case of 3 partitions: All odd numbers to partition 0; even numbers divisible |
| // by 4 to partition 1, those divisible by 2 but not 4 to partition 2. |
| |
| // mask used to extract discriminant from tuple hashcode |
| int mask = 0x03; |
| |
| Partition<TestPartition>[] newPartitions = new Partition[] { |
| new DefaultPartition<TestPartition>(new TestPartition()), |
| new DefaultPartition<TestPartition>(new TestPartition()), |
| new DefaultPartition<TestPartition>(new TestPartition()) }; |
| |
| HashSet<Integer>[] set |
| = new HashSet[] {new HashSet<>(), new HashSet<>(), new HashSet<>()}; |
| set[0].add(0); |
| set[1].add(1); |
| set[2].add(2); |
| |
| PartitionKeys[] keys = { |
| new PartitionKeys(mask, set[0]), |
| new PartitionKeys(mask, set[1]), |
| new PartitionKeys(mask, set[2]) }; |
| |
| for (int i = 0; i < 3; ++i ) { |
| Partition<TestPartition> partition = newPartitions[i]; |
| partition.getPartitionKeys().put(in, keys[i]); |
| } |
| |
| return new ArrayList<Partition<TestPartition>>(Arrays.asList(newPartitions)); |
| } // definePartitions |
| |
| private Collection<Partition<TestPartition>> getPartitions(Collection<Partition<TestPartition>> partitions, PartitioningContext context) |
| { |
| // create array of partitions to return |
| Collection<Partition<TestPartition>> result = new ArrayList<Partition<TestPartition>>(nPartitions); |
| |
| int mask = getMask(nPartitions); |
| for (int i = 0; i < nPartitions; ++i) { |
| HashSet<Integer> set = new HashSet<>(); |
| set.add(i); |
| PartitionKeys keys = new PartitionKeys(mask, set); |
| Partition partition = new DefaultPartition<TestPartition>(new TestPartition()); |
| partition.getPartitionKeys().put(in, keys); |
| } |
| |
| return result; |
| } // getPartitions |
| |
| // return mask with bits 0..N set where N is the highest set bit of argument |
| private int getMask(final int n) |
| { |
| return -1 >>> Integer.numberOfLeadingZeros(n); |
| } // getMask |
| |
| // accessors |
| public int getNPartitions() |
| { |
| return nPartitions; |
| } |
| |
| public void setNPartitions(int v) |
| { |
| nPartitions = v; |
| } |
| } |
| |