| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.runtime.library.common.sort.impl; |
| |
| import com.google.common.collect.Maps; |
| import org.apache.commons.lang.RandomStringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeys; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.DataInputBuffer; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.WritableComparator; |
| import org.apache.hadoop.io.serializer.Deserializer; |
| import org.apache.hadoop.io.serializer.SerializationFactory; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.tez.common.TezRuntimeFrameworkConfigs; |
| import org.apache.tez.common.counters.TaskCounter; |
| import org.apache.tez.common.counters.TezCounter; |
| import org.apache.tez.common.counters.TezCounters; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.runtime.api.Event; |
| import org.apache.tez.runtime.api.ExecutionContext; |
| import org.apache.tez.runtime.api.OutputContext; |
| import org.apache.tez.runtime.api.OutputStatisticsReporter; |
| import org.apache.tez.runtime.api.TaskContext; |
| import org.apache.tez.runtime.api.impl.ExecutionContextImpl; |
| import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; |
| import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; |
| import org.apache.tez.runtime.library.common.Constants; |
| import org.apache.tez.runtime.library.common.combine.Combiner; |
| import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; |
| import org.apache.tez.runtime.library.partitioner.HashPartitioner; |
| import org.apache.tez.runtime.library.testutils.RandomTextGenerator; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.TreeMap; |
| import java.util.UUID; |
| |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.any; |
| import static org.mockito.Mockito.atLeastOnce; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.internal.verification.VerificationModeFactory.times; |
| |
| public class TestPipelinedSorter { |
  // Shared configuration; rebuilt before every test in setup().
  private static Configuration conf;
  // Local filesystem used for spill/output files under the work dir.
  private static FileSystem localFs = null;
  // Per-class scratch directory (qualified against the local FS).
  private static Path workDir = null;
  // Allocator resolving paths against TezRuntimeFrameworkConfigs.LOCAL_DIRS.
  private static LocalDirAllocator dirAllocator;
  // Mocked output context handed to each PipelinedSorter under test.
  private OutputContext outputContext;

  // Number of output partitions for the sorter being constructed.
  private int numOutputs;
  // Memory (bytes) handed to the sorter at construction time.
  private long initialAvailableMem;

  //TODO: Need to make it nested structure so that multiple partition cases can be validated
  private static TreeMap<Text, Text> sortedDataMap = Maps.newTreeMap();
| |
  static {
    conf = getConf();
    try {
      localFs = FileSystem.getLocal(conf);
      // Qualified path under test.build.data (defaults to /tmp) for this test class.
      workDir = new Path(
          new Path(System.getProperty("test.build.data", "/tmp")),
          TestPipelinedSorter.class.getName())
          .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
      dirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
    } catch (IOException e) {
      // Fail class initialization loudly if the local FS cannot be set up.
      throw new RuntimeException(e);
    }
  }
| |
| @AfterClass |
| public static void cleanup() throws IOException { |
| localFs.delete(workDir, true); |
| } |
| |
| @Before |
| public void setup() throws IOException { |
| conf = getConf(); |
| ApplicationId appId = ApplicationId.newInstance(10000, 1); |
| TezCounters counters = new TezCounters(); |
| String uniqueId = UUID.randomUUID().toString(); |
| String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, |
| TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); |
| this.outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); |
| } |
| |
| public static Configuration getConf() { |
| Configuration conf = new Configuration(); |
| conf.set("fs.defaultFS", "file:///"); |
| conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); |
| //To enable PipelinedSorter |
| conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name()); |
| |
| conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); |
| conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); |
| conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName()); |
| |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| |
| //Setup localdirs |
| if (workDir != null) { |
| String localDirs = workDir.toString(); |
| conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs); |
| } |
| return conf; |
| } |
| |
| @After |
| public void reset() throws IOException { |
| cleanup(); |
| localFs.mkdirs(workDir); |
| } |
| |
| @Test |
| public void basicTest() throws IOException { |
| //TODO: need to support multiple partition testing later |
| |
| //# partition, # of keys, size per key, InitialMem, blockSize |
| conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); |
| basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| } |
| |
| @Test |
| public void testWithoutPartitionStats() throws IOException { |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, false); |
| //# partition, # of keys, size per key, InitialMem, blockSize |
| basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20); |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, true); |
| } |
| |
| @Test |
| public void testWithEmptyData() throws IOException { |
| conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); |
| //# partition, # of keys, size per key, InitialMem, blockSize |
| basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20); |
| } |
| |
| @Test |
| public void testEmptyDataWithPipelinedShuffle() throws IOException { |
| this.numOutputs = 1; |
| this.initialAvailableMem = 1 *1024 * 1024; |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| writeData(sorter, 0, 1<<20); |
| |
| // final merge is disabled. Final output file would not be populated in this case. |
| assertTrue(sorter.finalOutputFile == null); |
| TezCounter numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT); |
| // assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue()); |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| |
| } |
| |
| @Test |
| public void testEmptyPartitionsTwoSpillsNoEmptyEvents() throws Exception { |
| testEmptyPartitionsHelper(2, false); |
| } |
| |
| @Test |
| public void testEmptyPartitionsTwoSpillsWithEmptyEvents() throws Exception { |
| testEmptyPartitionsHelper(2, true); |
| } |
| |
| @Test |
| public void testEmptyPartitionsNoSpillsNoEmptyEvents() throws Exception { |
| testEmptyPartitionsHelper(0, false); |
| } |
| |
| @Test |
| public void testEmptyPartitionsNoSpillsWithEmptyEvents() throws Exception { |
| testEmptyPartitionsHelper(0, true); |
| } |
| |
| public void testEmptyPartitionsHelper(int numKeys, boolean sendEmptyPartitionDetails) throws IOException, InterruptedException { |
| int partitions = 50; |
| this.numOutputs = partitions; |
| this.initialAvailableMem = 1 *1024 * 1024; |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, sendEmptyPartitionDetails); |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, partitions, |
| initialAvailableMem); |
| |
| writeData(sorter, numKeys, 1000000); |
| if (numKeys == 0) { |
| assertTrue(sorter.getNumSpills() == 1); |
| } else { |
| assertTrue(sorter.getNumSpills() == numKeys + 1); |
| } |
| verifyCounters(sorter, outputContext); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| Path indexFile = sorter.getFinalIndexFile(); |
| TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf); |
| for (int i = 0; i < partitions; i++) { |
| TezIndexRecord tezIndexRecord = spillRecord.getIndex(i); |
| if (tezIndexRecord.hasData()) { |
| continue; |
| } |
| if (sendEmptyPartitionDetails) { |
| Assert.assertEquals("Unexpected raw length for " + i + "th partition", 0, tezIndexRecord.getRawLength()); |
| } else { |
| Assert.assertEquals("Unexpected raw length for " + i + "th partition", 6, tezIndexRecord.getRawLength()); |
| } |
| } |
| } |
| |
| @Test |
| public void basicTestWithSmallBlockSize() throws IOException { |
| //3 MB key & 3 MB value, whereas block size is just 3 MB |
| basicTest(1, 5, (3 << 20), (10 * 1024l * 1024l), 3 << 20); |
| } |
| |
| @Test |
| public void testWithLargeKeyValue() throws IOException { |
| //15 MB key & 15 MB value, 48 MB sort buffer. block size is 48MB (or 1 block) |
| //meta would be 16 MB |
| basicTest(1, 5, (15 << 20), (48 * 1024l * 1024l), 48 << 20); |
| } |
| |
| @Test |
| public void testKVExceedsBuffer() throws IOException { |
| // a single block of 1mb, 2KV pair, key 1mb, value 1mb |
| basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 1<<20); |
| } |
| |
  @Test
  public void testKVExceedsBuffer2() throws IOException {
    // a list of 4 blocks each 256kb, 2KV pair, key 1mb, value 1mb
    // NOTE(review): 256<<20 is 256 MB, which does not match the 256 KB blocks
    // described above (256<<10 would) — confirm the intended block size.
    basicTest(1, 2, (1 << 20), (1 * 1024l * 1024l), 256<<20);
  }
| |
| @Test |
| public void testExceedsKVWithMultiplePartitions() throws IOException { |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| this.numOutputs = 5; |
| this.initialAvailableMem = 1 * 1024 * 1024; |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| writeData(sorter, 100, 1<<20); |
| verifyCounters(sorter, outputContext); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| } |
| |
| @Test |
| public void testExceedsKVWithPipelinedShuffle() throws IOException { |
| this.numOutputs = 1; |
| this.initialAvailableMem = 1 *1024 * 1024; |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| writeData(sorter, 5, 1<<20); |
| |
| // final merge is disabled. Final output file would not be populated in this case. |
| assertTrue(sorter.finalOutputFile == null); |
| TezCounter numShuffleChunks = outputContext.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT); |
| assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue()); |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| } |
| |
| @Test |
| public void test_TEZ_2602_50mb() throws IOException { |
| this.numOutputs = 1; |
| this.initialAvailableMem = 1 *1024 * 1024; |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| Text value = new Text("1"); |
| long size = 50 * 1024 * 1024; |
| while(size > 0) { |
| Text key = RandomTextGenerator.generateSentence(); |
| sorter.write(key, value); |
| size -= key.getLength(); |
| } |
| |
| sorter.flush(); |
| sorter.close(); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| } |
| |
| //@Test |
| public void testLargeDataWithMixedKV() throws IOException { |
| this.numOutputs = 1; |
| this.initialAvailableMem = 48 *1024 * 1024; |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| //write 10 MB KV |
| Text key = new Text(RandomStringUtils.randomAlphanumeric(10 << 20)); |
| Text value = new Text(RandomStringUtils.randomAlphanumeric(10 << 20)); |
| sorter.write(key, value); |
| |
| //write 24 MB KV. This should cause single record spill |
| key = new Text(RandomStringUtils.randomAlphanumeric(24 << 20)); |
| value = new Text(RandomStringUtils.randomAlphanumeric(24 << 20)); |
| sorter.write(key, value); |
| |
| //write 10 MB KV |
| key = new Text(RandomStringUtils.randomAlphanumeric(10 << 20)); |
| value = new Text(RandomStringUtils.randomAlphanumeric(10 << 20)); |
| sorter.write(key, value); |
| |
| sorter.flush(); |
| sorter.close(); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| } |
| |
| |
| @Test |
| // first write a KV which dosnt fit into span, this will spill to disk |
| // next write smaller keys, which will update the hint |
| public void testWithVariableKVLength1() throws IOException { |
| int numkeys[] = {2, 2}; |
| int keylens[] = {32 << 20, 7 << 20}; |
| basicTest2(1, numkeys, keylens, 64 << 20, 32 << 20); |
| } |
| |
| @Test |
| // first write a kv pair which fits into buffer, |
| // next try to write a kv pair which doesnt fit into remaining buffer |
| public void testWithVariableKVLength() throws IOException { |
| //2 KVpairs of 2X2mb, 2 KV of 2X7mb |
| int numkeys[] = {2, 2}; |
| int keylens[] = {2 << 20, 7<<20}; |
| basicTest2(1, numkeys, keylens, 64 << 20, 32 << 20); |
| } |
| |
| @Test |
| // first write KV which fits into span |
| // then write KV which doesnot fit in buffer. this will be spilled to disk |
| // all keys should be merged properly |
| public void testWithVariableKVLength2() throws IOException { |
| // 20 KVpairs of 2X10kb, 10 KV of 2X200kb, 20KV of 2X10kb |
| int numkeys[] = {20, 10, 20}; |
| int keylens[] = {10<<10, 200<<10, 10<<10}; |
| basicTest2(1, numkeys, keylens, (10 * 1024l * 1024l), 2); |
| } |
| |
| @Test |
| public void testWithCustomComparator() throws IOException { |
| //Test with custom comparator |
| conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, |
| CustomComparator.class.getName()); |
| basicTest(1, 100000, 100, (10 * 1024l * 1024l), 3 << 20); |
| } |
| |
| @Test |
| public void testWithPipelinedShuffle() throws IOException { |
| this.numOutputs = 1; |
| this.initialAvailableMem = 5 *1024 * 1024; |
| conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| //Write 100 keys each of size 10 |
| writeData(sorter, 10000, 100, false); |
| sorter.flush(); |
| List<Event> events = sorter.close(); |
| |
| //final merge is disabled. Final output file would not be populated in this case. |
| assertTrue(sorter.finalOutputFile == null); |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| verify(outputContext, times(0)).sendEvents(any()); |
| assertTrue(events.size() > 0); |
| } |
| |
| @Test |
| public void testCountersWithMultiplePartitions() throws IOException { |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| this.numOutputs = 5; |
| this.initialAvailableMem = 5 * 1024 * 1024; |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| writeData(sorter, 10000, 100); |
| verifyCounters(sorter, outputContext); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| } |
| |
| @Test |
| public void testMultipleSpills() throws IOException { |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| this.numOutputs = 5; |
| this.initialAvailableMem = 5 * 1024 * 1024; |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| writeData(sorter, 25000, 1000); |
| assertFalse("Expecting needsRLE to be false", sorter.needsRLE()); |
| verifyCounters(sorter, outputContext); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| } |
| |
| @Test |
| public void testWithCombiner() throws IOException { |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, DummyCombiner.class.getName()); |
| this.numOutputs = 5; |
| this.initialAvailableMem = 5 * 1024 * 1024; |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| writeData(sorter, 1, 20); |
| |
| Path outputFile = sorter.finalOutputFile; |
| FileSystem fs = outputFile.getFileSystem(conf); |
| IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096); |
| verifyData(reader); |
| reader.close(); |
| |
| verifyCounters(sorter, outputContext); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| } |
| |
| // for testWithCombiner |
| public static class DummyCombiner implements Combiner { |
| public DummyCombiner(TaskContext ctx) { |
| // do nothing |
| } |
| |
| @Override |
| public void combine(TezRawKeyValueIterator rawIter, IFile.Writer writer) throws InterruptedException, IOException { |
| while (rawIter.next()) { |
| writer.append(rawIter.getKey(), rawIter.getValue()); |
| } |
| } |
| } |
| |
| @Test |
| public void testMultipleSpills_WithRLE() throws IOException { |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); |
| this.numOutputs = 5; |
| this.initialAvailableMem = 5 * 1024 * 1024; |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| writeSimilarKeys(sorter, 25000, 1000, true); |
| assertTrue("Expecting needsRLE to be true", sorter.needsRLE()); |
| verifyCounters(sorter, outputContext); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| } |
| |
| @Test |
| /** |
| * Verify whether all buffers are used evenly in sorter. |
| */ |
| public void basicTestForBufferUsage() throws IOException { |
| this.numOutputs = 1; |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); |
| |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (100 << 20)); |
| Assert.assertTrue(sorter.maxNumberOfBlocks >= 2); |
| |
| // Start filling in with data 1MB Key, 1MB Val. |
| for (int i = 0; i < 200; i++) { |
| writeData(sorter, 1, 1024 * 1024, false); |
| } |
| |
| // Check if all buffers are evenly used |
| int avg = (int) sorter.bufferUsage.stream().mapToDouble(d -> d).average().orElse(0.0); |
| |
| for(int i = 0; i< sorter.bufferUsage.size(); i++) { |
| int usage = sorter.bufferUsage.get(i); |
| Assert.assertTrue("Buffer index " + i + " is not used correctly. " |
| + " usage: " + usage + ", avg: " + avg, usage >= avg); |
| } |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false); |
| } |
| |
| public void basicTest2(int partitions, int[] numkeys, int[] keysize, |
| long initialAvailableMem, int blockSize) throws IOException { |
| this.numOutputs = partitions; // single output |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 100); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| writeData2(sorter, numkeys, keysize); |
| verifyCounters(sorter, outputContext); |
| } |
| |
| private void writeData2(ExternalSorter sorter, |
| int[] numKeys, int[] keyLen) throws IOException { |
| sortedDataMap.clear(); |
| int counter = 0; |
| for (int numkey : numKeys) { |
| int curKeyLen = keyLen[counter]; |
| char[] buffer = new char[curKeyLen]; |
| for (int i = 0; i < numkey; i++) { |
| Text random = new Text(randomAlphanumeric(buffer)); |
| sorter.write(random, random); |
| } |
| counter++; |
| } |
| sorter.flush(); |
| sorter.close(); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| } |
| |
| public void basicTest(int partitions, int numKeys, int keySize, |
| long initialAvailableMem, int minBlockSize) throws IOException { |
| this.numOutputs = partitions; // single output |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, minBlockSize >> 20); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, |
| initialAvailableMem); |
| |
| writeData(sorter, numKeys, keySize); |
| |
| //partition stats; |
| ReportPartitionStats partitionStats = |
| ReportPartitionStats.fromString(conf.get( |
| TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, |
| TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT)); |
| if (partitionStats.isEnabled()) { |
| assertTrue(sorter.getPartitionStats() != null); |
| } |
| |
| verifyCounters(sorter, outputContext); |
| verifyOutputPermissions(outputContext.getUniqueIdentifier()); |
| Path outputFile = sorter.finalOutputFile; |
| FileSystem fs = outputFile.getFileSystem(conf); |
| TezCounter finalOutputBytes = |
| outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); |
| if (finalOutputBytes.getValue() > 0) { |
| IFile.Reader reader = new IFile.Reader(fs, outputFile, null, null, null, false, -1, 4096); |
| verifyData(reader); |
| reader.close(); |
| } |
| //Verify dataset |
| verify(outputContext, atLeastOnce()).notifyProgress(); |
| } |
| |
| private void verifyCounters(PipelinedSorter sorter, OutputContext context) { |
| TezCounter numShuffleChunks = context.getCounters().findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT); |
| TezCounter additionalSpills = |
| context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT); |
| TezCounter additionalSpillBytesWritten = |
| context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN); |
| TezCounter additionalSpillBytesRead = |
| context.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ); |
| |
| if (sorter.isFinalMergeEnabled()) { |
| assertTrue(additionalSpills.getValue() == (sorter.getNumSpills() - 1)); |
| //Number of files served by shuffle-handler |
| assertTrue(1 == numShuffleChunks.getValue()); |
| if (sorter.getNumSpills() > 1) { |
| assertTrue(additionalSpillBytesRead.getValue() > 0); |
| assertTrue(additionalSpillBytesWritten.getValue() > 0); |
| } |
| } else { |
| assertTrue(0 == additionalSpills.getValue()); |
| //Number of files served by shuffle-handler |
| assertTrue(sorter.getNumSpills() == numShuffleChunks.getValue()); |
| assertTrue(additionalSpillBytesRead.getValue() == 0); |
| assertTrue(additionalSpillBytesWritten.getValue() == 0); |
| } |
| |
| TezCounter finalOutputBytes = |
| context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL); |
| assertTrue(finalOutputBytes.getValue() >= 0); |
| |
| TezCounter outputBytesWithOverheadCounter = context.getCounters().findCounter |
| (TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD); |
| assertTrue(outputBytesWithOverheadCounter.getValue() >= 0); |
| } |
| |
| |
  @Test
  //Intentionally not having timeout
  //Its not possible to allocate > 2 GB in test environment. Carry out basic checks here.
  public void memTest() throws IOException {
    //Verify if > 2 GB can be set via config
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3076);
    long size = ExternalSorter.getInitialMemoryRequirement(conf, 4096 * 1024 * 1024l);
    Assert.assertTrue(size == (3076l << 20));

    //Verify number of block buffers allocated
    //10 MB available with 1 MB min block size --> 10 blocks
    this.initialAvailableMem = 10 * 1024 * 1024;
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1);
    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
        initialAvailableMem);
    Assert.assertTrue(sorter.maxNumberOfBlocks == 10);

    //10 MB available, request for 3 MB chunk. Last 1 MB gets added to previous chunk.
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3);
    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
        initialAvailableMem);
    Assert.assertTrue(sorter.maxNumberOfBlocks == 3);

    //10 MB available, request for 10 MB min chunk. Would get 1 block.
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 10);
    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
        initialAvailableMem);
    Assert.assertTrue(sorter.maxNumberOfBlocks == 1);

    //Verify block sizes (10 MB min chunk size), but available mem is zero.
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 10);
    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, initialAvailableMem);
    Assert.assertTrue(sorter.maxNumberOfBlocks == 1);
    int blockSize = sorter.computeBlockSize(0, (10 << 20));
    //available is zero. Can't allocate any more buffer.
    Assert.assertTrue(blockSize == 0);

    //300 MB available. Request for 200 MB min block size. It would allocate a block with 200 MB,
    // but last 100 would get clubbed. Hence, it would return 300 MB block.
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 200);
    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (300 << 20));
    Assert.assertTrue(sorter.maxNumberOfBlocks == 1);
    blockSize = sorter.computeBlockSize((300 << 20), (300 << 20));
    Assert.assertTrue(blockSize == (300 << 20));

    //300 MB available. Request for 3500 MB min block size. throw exception
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 3500);
    try {
      sorter = new PipelinedSorter(this.outputContext, conf, numOutputs,
          (300 << 20));
    } catch(IllegalArgumentException iae ) {
      // NOTE(review): there is no Assert.fail() after the constructor, so this
      // test would also pass silently if no exception were thrown — confirm intent.
      assertTrue(iae.getMessage().contains("positive value between 0 and 2047"));
    }

    //64 MB available. Request for 32 MB min block size.
    conf.setBoolean(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 32);
    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (64 << 20));
    Assert.assertTrue(sorter.maxNumberOfBlocks == 2);
    blockSize = sorter.computeBlockSize((64 << 20), (64 << 20));
    Assert.assertTrue(blockSize == (32 << 20));

    //With only 32 MB of the 64 MB remaining, a full 32 MB block is still given out.
    blockSize = sorter.computeBlockSize((32 << 20), (64 << 20));
    Assert.assertTrue(blockSize == (32 << 20));

    //With 48 MB remaining, the trailing 16 MB gets clubbed into one 48 MB block.
    blockSize = sorter.computeBlockSize((48 << 20), (64 << 20));
    Assert.assertTrue(blockSize == (48 << 20));

    //64 MB available. Request for 8 MB min block size.
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 8);
    sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (64 << 20));
    Assert.assertTrue(sorter.maxNumberOfBlocks == 8);
    blockSize = sorter.computeBlockSize((64 << 20), (64 << 20));
    // NOTE(review): an earlier comment here claimed 16 MB should be returned, but
    // the assertion (and current behavior) pin the 8 MB min block size — confirm.
    Assert.assertTrue(blockSize == (8 << 20));
  }
| |
  @Test
  //Intentionally not having timeout
  public void test_without_lazyMemAllocation() throws IOException {
    this.numOutputs = 10;

    //128 MB. Pre-allocate. Request for default block size. Get 1 buffer
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB,
        TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB_DEFAULT);
    conf.setBoolean(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
    PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf,
        numOutputs, (128l << 20));
    assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(),
        sorter.buffers.size() == 1);

    //128 MB. Pre-allocate. 62 MB min block size. Get 2 buffers
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128);
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 62);
    conf.setBoolean(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
    sorter = new PipelinedSorter(this.outputContext, conf,
        numOutputs, (128l << 20));
    assertTrue("Expected 2 sort buffers. current len=" + sorter.buffers.size(),
        sorter.buffers.size() == 2);

    //48 MB. Pre-allocate. Requested min block size (62 MB) exceeds the available
    //48 MB, so everything lands in a single buffer. (The original comment said
    //"Get 2 buffer", but the assertion below expects 1.)
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 48);
    conf.setInt(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 62);
    conf.setBoolean(TezRuntimeConfiguration
        .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false);
    sorter = new PipelinedSorter(this.outputContext, conf,
        numOutputs, (48l << 20));
    assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(),
        sorter.buffers.size() == 1);
  }
| |
| @Test |
| //Intentionally not having timeout |
| public void test_with_lazyMemAllocation() throws IOException { |
| this.numOutputs = 10; |
| |
| //128 MB. Do not pre-allocate. |
| // Get 32 MB buffer first and the another buffer with 96 on filling up |
| // the 32 MB buffer. |
| conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); |
| conf.setBoolean(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, |
| numOutputs, (128l << 20)); |
| assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(), |
| sorter.buffers.size() == 1); |
| assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024 - 64); |
| writeData(sorter, 100, 1024*1024, false); //100 1 MB KV. Will spill |
| |
| //Now it should have created 2 buffers, 32 & 96 MB buffers. |
| assertTrue(sorter.buffers.size() == 2); |
| assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024 - 64); |
| assertTrue(sorter.buffers.get(1).capacity() == 96 * 1024 * 1024 + 64); |
| closeSorter(sorter); |
| verifyCounters(sorter, outputContext); |
| |
| //TODO: Not sure if this would fail in build machines due to mem |
| //300 MB. Do not pre-allocate. |
| // Get 1 buffer with 62 MB. But grow to 2 buffers as data is written |
| conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 300); |
| conf.setBoolean(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); |
| sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, (300l << 20)); |
| assertTrue(sorter.buffers.size() == 1); |
| assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024 - 64); |
| |
| writeData(sorter, 50, 1024*1024, false); //50 1 MB KV to allocate 2nd buf |
| assertTrue(sorter.buffers.size() == 2); |
| assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024 - 64); |
| assertTrue(sorter.buffers.get(1).capacity() == 268 * 1024 * 1024 + 64); |
| |
| //48 MB. Do not pre-allocate. |
| // Get 32 MB buffer first invariably and proceed with the rest. |
| conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 48); |
| conf.setBoolean(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); |
| sorter = new PipelinedSorter(this.outputContext, conf, |
| numOutputs, (48l << 20)); |
| assertTrue("Expected 1 sort buffers. current len=" + sorter.buffers.size(), |
| sorter.buffers.size() == 1); |
| assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024 - 64); |
| writeData(sorter, 20, 1024*1024, false); //100 1 MB KV. Will spill |
| |
| //Now it should have created 2 buffers, 32 & 96 MB buffers. |
| assertTrue(sorter.buffers.size() == 2); |
| assertTrue(sorter.buffers.get(0).capacity() == 32 * 1024 * 1024 - 64); |
| assertTrue(sorter.buffers.get(1).capacity() == 16 * 1024 * 1024 + 64); |
| closeSorter(sorter); |
| } |
| |
| @Test |
| //Intentionally not having timeout |
| public void testLazyAllocateMem() throws IOException { |
| this.numOutputs = 10; |
| conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); |
| conf.setBoolean(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, false); |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 4500); |
| try { |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, |
| numOutputs, (4500l << 20)); |
| } catch (IllegalArgumentException iae) { |
| assertTrue(iae.getMessage().contains(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB)); |
| assertTrue(iae.getMessage().contains("value between 0 and 2047")); |
| } |
| |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, -1); |
| try { |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, |
| numOutputs, (4500l << 20)); |
| } catch (IllegalArgumentException iae) { |
| assertTrue(iae.getMessage().contains(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB)); |
| assertTrue(iae.getMessage().contains("value between 0 and 2047")); |
| } |
| |
| conf.setBoolean(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); |
| conf.setInt(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, -1); |
| try { |
| PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, |
| numOutputs, (4500l << 20)); |
| } catch (IllegalArgumentException iae) { |
| assertTrue(iae.getMessage().contains(TezRuntimeConfiguration |
| .TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB)); |
| assertTrue(iae.getMessage().contains("value between 0 and 2047")); |
| } |
| |
| } |
| |
| @Test |
| //Intentionally not having timeout |
| public void testWithLargeKeyValueWithMinBlockSize() throws IOException { |
| //2 MB key & 2 MB value, 48 MB sort buffer. block size is 16MB |
| basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20); |
| } |
| |
| private void verifyOutputPermissions(String spillId) throws IOException { |
| String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId |
| + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING; |
| Path outputPath = dirAllocator.getLocalPathToRead(subpath, conf); |
| Path indexPath = dirAllocator.getLocalPathToRead(subpath + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING, conf); |
| Assert.assertEquals("Incorrect output permissions", (short)0640, |
| localFs.getFileStatus(outputPath).getPermission().toShort()); |
| Assert.assertEquals("Incorrect index permissions", (short)0640, |
| localFs.getFileStatus(indexPath).getPermission().toShort()); |
| } |
| |
  // Convenience overload: writes numKeys random KV pairs of keyLen characters
  // each and auto-closes (flush + close) the sorter afterwards.
  private void writeData(ExternalSorter sorter, int numKeys, int keyLen) throws IOException {
    writeData(sorter, numKeys, keyLen, true);
  }
| |
| // duplicate some of the keys |
| private void writeSimilarKeys(ExternalSorter sorter, int numKeys, int keyLen, |
| boolean autoClose) throws IOException { |
| sortedDataMap.clear(); |
| char[] buffer = new char[keyLen]; |
| String keyStr = randomAlphanumeric(buffer); |
| for (int i = 0; i < numKeys; i++) { |
| if (i % 4 == 0) { |
| keyStr = randomAlphanumeric(buffer); |
| } |
| Text key = new Text(keyStr); |
| Text value = new Text(RandomStringUtils.randomAlphanumeric(keyLen)); |
| sorter.write(key, value); |
| sortedDataMap.put(key, value); //for verifying data later |
| } |
| if (autoClose) { |
| closeSorter(sorter); |
| } |
| } |
| static private final Random RANDOM = new Random(); |
| int start = ' '; |
| int end = 'z' + 1; |
| int gap = end - start; |
| private String randomAlphanumeric(char[] buffer) { |
| for (int i = 0; i < buffer.length; ++i) { |
| buffer[i] = (char)(RANDOM.nextInt(gap) + start); |
| } |
| return new String(buffer); |
| } |
| |
| private void writeData(ExternalSorter sorter, int numKeys, int keyLen, |
| boolean autoClose) throws IOException { |
| char[] buffer = new char[keyLen]; |
| sortedDataMap.clear(); |
| for (int i = 0; i < numKeys; i++) { |
| String randomStr = randomAlphanumeric(buffer); |
| Text random = new Text(randomStr); |
| sorter.write(random, random); |
| sortedDataMap.put(random, random); //for verifying data later |
| } |
| if (autoClose) { |
| closeSorter(sorter); |
| } |
| } |
| |
| private void closeSorter(ExternalSorter sorter) throws IOException { |
| if (sorter != null) { |
| sorter.flush(); |
| sorter.close(); |
| } |
| } |
| |
| private void verifyData(IFile.Reader reader) |
| throws IOException { |
| Text readKey = new Text(); |
| Text readValue = new Text(); |
| DataInputBuffer keyIn = new DataInputBuffer(); |
| DataInputBuffer valIn = new DataInputBuffer(); |
| SerializationFactory serializationFactory = new SerializationFactory(conf); |
| Deserializer<Text> keyDeserializer = serializationFactory.getDeserializer(Text.class); |
| Deserializer<Text> valDeserializer = serializationFactory.getDeserializer(Text.class); |
| keyDeserializer.open(keyIn); |
| valDeserializer.open(valIn); |
| |
| int numRecordsRead = 0; |
| |
| for (Map.Entry<Text, Text> entry : sortedDataMap.entrySet()) { |
| Text key = entry.getKey(); |
| Text val = entry.getValue(); |
| if (reader.nextRawKey(keyIn)) { |
| reader.nextRawValue(valIn); |
| readKey = keyDeserializer.deserialize(readKey); |
| readValue = valDeserializer.deserialize(readValue); |
| Assert.assertTrue(key.equals(readKey)); |
| Assert.assertTrue(val.equals(readValue)); |
| numRecordsRead++; |
| } |
| } |
| Assert.assertTrue(numRecordsRead == sortedDataMap.size()); |
| } |
| |
  // Builds a Mockito-stubbed OutputContext with fixed identifiers (task/vertex
  // indices of 1, names "dagName"/"vertexName" etc.), the given counters and
  // appId, shuffle-port metadata of 80 for the given auxiliary service, and a
  // single work dir under workDir derived from uniqueId.
  private static OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
      String uniqueId, String auxiliaryService) throws IOException {
    OutputContext outputContext = mock(OutputContext.class);

    ExecutionContext execContext = new ExecutionContextImpl("localhost");

    // Serialized shuffle port (80) returned as the service provider metadata.
    DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
    serviceProviderMetaData.writeInt(80);
    doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(outputContext)
        .getServiceProviderMetaData(auxiliaryService);

    doReturn(execContext).when(outputContext).getExecutionContext();
    doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter();
    doReturn(counters).when(outputContext).getCounters();
    doReturn(appId).when(outputContext).getApplicationId();
    doReturn(1).when(outputContext).getDAGAttemptNumber();
    doReturn("dagName").when(outputContext).getDAGName();
    doReturn("destinationVertexName").when(outputContext).getDestinationVertexName();
    doReturn(1).when(outputContext).getOutputIndex();
    doReturn(1).when(outputContext).getTaskAttemptNumber();
    doReturn(1).when(outputContext).getTaskIndex();
    doReturn(1).when(outputContext).getTaskVertexIndex();
    doReturn("vertexName").when(outputContext).getTaskVertexName();
    doReturn(uniqueId).when(outputContext).getUniqueIdentifier();
    // One local output dir, unique per test invocation.
    Path outDirBase = new Path(workDir, "outDir_" + uniqueId);
    String[] outDirs = new String[] { outDirBase.toString() };
    doReturn(outDirs).when(outputContext).getWorkDirs();
    return outputContext;
  }
| |
| /** |
| * E.g Hive uses TezBytesComparator which internally makes use of WritableComparator's comparison. |
| * Any length mismatches are handled there. |
| * |
| * However, custom comparators can handle this differently and might throw |
| * IndexOutOfBoundsException in case of invalid lengths. |
| * |
| * This comparator (similar to comparator in BinInterSedes of pig) would thrown exception when |
| * wrong lengths are mentioned. |
| */ |
| public static class CustomComparator extends WritableComparator { |
| @Override |
| public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { |
| //wrapping is done so that it would throw exceptions on wrong lengths |
| ByteBuffer bb1 = ByteBuffer.wrap(b1, s1, l1); |
| ByteBuffer bb2 = ByteBuffer.wrap(b2, s2, l2); |
| |
| return bb1.compareTo(bb2); |
| } |
| |
| } |
| } |