| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.lib.io.block; |
| |
| import java.util.Collection; |
| import java.util.List; |
| |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import org.apache.commons.lang.mutable.MutableLong; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| |
| import com.google.common.collect.Lists; |
| |
| import com.datatorrent.api.DefaultPartition; |
| import com.datatorrent.api.Partitioner; |
| import com.datatorrent.api.Stats; |
| import com.datatorrent.api.StatsListener; |
| import com.datatorrent.lib.counters.BasicCounters; |
| import com.datatorrent.lib.util.TestUtils; |
| import com.datatorrent.netlet.util.Slice; |
| |
| /** |
| * Stats and partitioning tests for {@link AbstractBlockReader} |
| */ |
| public class AbstractBlockReaderTest |
| { |
| |
| @Test |
| public void testAdjustedCount() |
| { |
| TestReader sliceReader = new TestReader(); |
| Assert.assertEquals("min", 1, sliceReader.getAdjustedCount(1)); |
| Assert.assertEquals("max", 16, sliceReader.getAdjustedCount(16)); |
| Assert.assertEquals("max-1", 8, sliceReader.getAdjustedCount(15)); |
| Assert.assertEquals("min+1", 2, sliceReader.getAdjustedCount(2)); |
| Assert.assertEquals("between 1", 4, sliceReader.getAdjustedCount(4)); |
| Assert.assertEquals("between 2", 4, sliceReader.getAdjustedCount(7)); |
| Assert.assertEquals("between 2", 8, sliceReader.getAdjustedCount(12)); |
| } |
| |
| @Test |
| public void testProcessStatsForPartitionCount() |
| { |
| |
| TestUtils.MockBatchedOperatorStats readerStats = new TestUtils.MockBatchedOperatorStats(2); |
| readerStats.operatorStats = Lists.newArrayList(); |
| readerStats.operatorStats.add(new ReaderStats(10, 1, 100, 1)); |
| |
| TestReader sliceReader = new TestReader(); |
| |
| StatsListener.Response response = sliceReader.processStats(readerStats); |
| |
| Assert.assertTrue("partition needed", response.repartitionRequired); |
| Assert.assertEquals("partition count changed", 8, sliceReader.getPartitionCount()); |
| } |
| |
| @Test |
| public void testProcessStatsForRepeatedPartitionCount() throws InterruptedException |
| { |
| |
| TestUtils.MockBatchedOperatorStats readerStats = new TestUtils.MockBatchedOperatorStats(2); |
| readerStats.operatorStats = Lists.newArrayList(); |
| readerStats.operatorStats.add(new ReaderStats(10, 1, 100, 1)); |
| |
| TestReader sliceReader = new TestReader(); |
| sliceReader.setIntervalMillis(500); |
| |
| sliceReader.processStats(readerStats); |
| Thread.sleep(500); |
| StatsListener.Response response = sliceReader.processStats(readerStats); |
| |
| Assert.assertFalse("partition needed", response.repartitionRequired); |
| Assert.assertEquals("partition count not changed", 8, sliceReader.getPartitionCount()); |
| } |
| |
| @Test |
| public void testPartitioning() throws Exception |
| { |
| TestUtils.MockBatchedOperatorStats readerStats = new TestUtils.MockBatchedOperatorStats(2); |
| readerStats.operatorStats = Lists.newArrayList(); |
| readerStats.operatorStats.add(new ReaderStats(10, 1, 100, 1)); |
| |
| TestReader sliceReader = new TestReader(); |
| |
| StatsListener.Response response = sliceReader.processStats(readerStats); |
| |
| Assert.assertTrue("partition needed", response.repartitionRequired); |
| Assert.assertEquals("partition count changed", 8, sliceReader.getPartitionCount()); |
| |
| List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>> |
| partitions = Lists |
| .newArrayList(); |
| |
| DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition = new |
| DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>( |
| sliceReader); |
| |
| TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> |
| pseudoParttion = new TestUtils.MockPartition<>( |
| apartition, readerStats); |
| |
| partitions.add(pseudoParttion); |
| |
| Collection<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>> |
| newPartitions = sliceReader |
| .definePartitions(partitions, null); |
| Assert.assertEquals(8, newPartitions.size()); |
| } |
| |
| @Test |
| public void testCountersTransfer() throws Exception |
| { |
| TestUtils.MockBatchedOperatorStats readerStats = new TestUtils.MockBatchedOperatorStats(2); |
| readerStats.operatorStats = Lists.newArrayList(); |
| readerStats.operatorStats.add(new ReaderStats(10, 1, 100, 1)); |
| |
| TestReader sliceReader = new TestReader(); |
| sliceReader.processStats(readerStats); |
| |
| List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>> |
| partitions = Lists |
| .newArrayList(); |
| |
| DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition = new |
| DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>( |
| sliceReader); |
| |
| TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> |
| pseudoParttion = new TestUtils.MockPartition<>(apartition, readerStats); |
| |
| partitions.add(pseudoParttion); |
| |
| Collection<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>> |
| newPartitions = sliceReader.definePartitions(partitions, null); |
| |
| List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>> |
| newMocks = Lists.newArrayList(); |
| |
| for (Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> partition |
| : newPartitions) { |
| |
| partition.getPartitionedInstance().counters |
| .setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(1)); |
| |
| newMocks.add(new TestUtils.MockPartition<>( |
| (DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>)partition, |
| readerStats)); |
| } |
| sliceReader.partitionCount = 1; |
| newPartitions = sliceReader.definePartitions(newMocks, null); |
| Assert.assertEquals(1, newPartitions.size()); |
| |
| AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream> last = newPartitions.iterator() |
| .next().getPartitionedInstance(); |
| Assert.assertEquals("num blocks", 8, |
| last.counters.getCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS).longValue()); |
| } |
| |
| static class ReaderStats extends Stats.OperatorStats |
| { |
| |
| ReaderStats(int backlog, long readBlocks, long bytes, long time) |
| { |
| BasicCounters<MutableLong> bc = new BasicCounters<>(MutableLong.class); |
| bc.setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(readBlocks)); |
| bc.setCounter(AbstractBlockReader.ReaderCounterKeys.BYTES, new MutableLong(bytes)); |
| bc.setCounter(AbstractBlockReader.ReaderCounterKeys.TIME, new MutableLong(time)); |
| |
| counters = bc; |
| |
| PortStats portStats = new PortStats("blocks"); |
| portStats.queueSize = backlog; |
| inputPorts = Lists.newArrayList(portStats); |
| } |
| } |
| |
| static class TestReader extends FSSliceReader |
| { |
| int getPartitionCount() |
| { |
| return partitionCount; |
| } |
| } |
| } |