// blob: a56536dfe8a3da0049d0916c70706113c5ad6a91 (repository browser header: [file] [log] [blame])
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.sort.impl.dflt;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import com.google.protobuf.ByteString;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.OutputStatisticsReporter;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.util.StringInterner;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestDefaultSorter {
// Port number written into the mocked shuffle service-provider metadata.
private static final int PORT = 80;
// Unique identifier returned by the mocked OutputContext; tests also expect it in output path names.
private static final String UniqueID = "UUID";
// Local file system handle; (re)assigned by setup() before every test.
private static FileSystem localFs = null;
// Scratch directory under "test.build.data" (default "/tmp"); deleted by cleanup().
private static Path workingDir = null;
// Configuration rebuilt by setup() for every test.
private Configuration conf;
// Allocator over the configured local dirs; used to locate output files for permission checks.
private LocalDirAllocator dirAllocator;
/**
 * Builds a fresh {@link Configuration}, the local file system handle, the working directory
 * and the local-dir allocator before every test.
 */
@Before
public void setup() throws IOException {
  conf = new Configuration();
  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
  // LEGACY is the DefaultSorter implementation.
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.LEGACY.name());
  conf.set("fs.defaultFS", "file:///");
  localFs = FileSystem.getLocal(conf);

  // <test.build.data or /tmp>/<fully qualified test class name>, qualified against the local FS.
  final Path buildDataRoot = new Path(System.getProperty("test.build.data", "/tmp"));
  final Path classScratchDir = new Path(buildDataRoot, TestDefaultSorter.class.getName());
  workingDir = classScratchDir.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());

  // Keys and values default to Text; individual tests override this where needed.
  final String textClassName = Text.class.getName();
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, textClassName);
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, textClassName);
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
      HashPartitioner.class.getName());

  conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
  dirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
}
/**
 * Deletes the test working directory (recursively) once all tests of this class have run.
 * Also invoked by reset() after every individual test.
 *
 * @throws IOException if the local file system reports a failure
 */
@AfterClass
public static void cleanup() throws IOException {
localFs.delete(workingDir, true);
}
/**
 * Gives the next test an empty working directory: the directory is removed with all of its
 * contents and then created again.
 */
@After
public void reset() throws IOException {
  // Same recursive delete that cleanup() performs, followed by re-creation.
  localFs.delete(workingDir, true);
  localFs.mkdirs(workingDir);
}
/**
 * The sorter constructor must reject a spill percentage of 0.0 as well as one above 1.0 with an
 * {@link IllegalArgumentException} whose message names the offending configuration key.
 */
@Test(timeout = 5000)
public void testSortSpillPercent() throws Exception {
  final OutputContext outputContext = createTezOutputContext();
  final long sortMemoryBytes = 10 * 1024 * 1024L;
  final String spillPercentKey = TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT;

  for (float invalidSpillPercent : new float[] {0.0f, 1.1f}) {
    conf.setFloat(spillPercentKey, invalidSpillPercent);
    try {
      new DefaultSorter(outputContext, conf, 10, sortMemoryBytes);
      fail();
    } catch (IllegalArgumentException rejected) {
      assertTrue(rejected.getMessage().contains(spillPercentKey));
    }
  }
}
/**
 * Disabled, as this would need a 2047 MB sort buffer for testing.
 * Set DefaultSorter.MAX_IO_SORT_MB = 2047 for running this.
 */
@Test
@Ignore
public void testSortLimitsWithSmallRecord() throws IOException {
  // Text keys with NullWritable values keep the average record size tiny.
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, NullWritable.class.getName());
  final OutputContext outputContext = createTezOutputContext();
  doReturn(2800 * 1024 * 1024L).when(outputContext).getTotalMemoryAvailableToTask();

  // Setting IO_SORT_MB to 2047 MB
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047);
  final long initialMemory = ExternalSorter.getInitialMemoryRequirement(conf,
      outputContext.getTotalMemoryAvailableToTask());
  outputContext.requestInitialMemory(initialMemory, new MemoryUpdateCallbackHandler());
  final DefaultSorter sorter = new DefaultSorter(outputContext, conf, 2, 2047 << 20);

  // Reset key/value in conf back to Text for other test cases
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());

  /*
   * If io.sort.mb is not capped to 1800, this would end up throwing
   * "java.lang.ArrayIndexOutOfBoundsException" after many spills.
   * Intentionally written as an infinite loop; the key cycles through 0..9.
   */
  for (int keyId = 0; ; keyId = (keyId + 1) % 10) {
    // test for the avg record size 2 (in lower spectrum)
    sorter.write(new Text(String.valueOf(keyId)), NullWritable.get());
  }
}
/**
 * Disabled, as this would need 2047 MB io.sort.mb for testing.
 * Provide more than 2 GB to the JVM when running this test to avoid OOM in string generation.
 *
 * Set DefaultSorter.MAX_IO_SORT_MB = 2047 for running this.
 */
@Test
@Ignore
public void testSortLimitsWithLargeRecords() throws IOException {
  final OutputContext outputContext = createTezOutputContext();
  doReturn(2800 * 1024 * 1024L).when(outputContext).getTotalMemoryAvailableToTask();

  // Setting IO_SORT_MB to 2047 MB
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2047);
  final long initialMemory = ExternalSorter.getInitialMemoryRequirement(conf,
      outputContext.getTotalMemoryAvailableToTask());
  outputContext.requestInitialMemory(initialMemory, new MemoryUpdateCallbackHandler());
  final DefaultSorter sorter = new DefaultSorter(outputContext, conf, 2, 2047 << 20);

  /*
   * If io.sort.mb is not capped to 1800, this would end up throwing
   * "java.lang.ArrayIndexOutOfBoundsException" after many spills.
   * Intentionally written as an infinite loop; the key cycles through 0..9.
   */
  for (int keyId = 0; ; keyId = (keyId + 1) % 10) {
    final Text key = new Text(String.valueOf(keyId));
    // Generate random size between 1 MB to 100 MB.
    final int valueLength =
        ThreadLocalRandom.current().nextInt(1 * 1024 * 1024, 100 * 1024 * 1024);
    final String payload = StringInterner.intern(StringUtils.repeat("v", valueLength));
    sorter.write(key, new Text(payload));
  }
}
/**
 * Exercises {@code DefaultSorter.computeSortBufferSize}: 4096 and 2047 are expected to come back
 * as {@code MAX_IO_SORT_MB}, 1024 is expected unchanged, and zero or negative sizes are expected
 * to raise a {@link RuntimeException}.
 */
@Test(timeout = 5000)
public void testSortMBLimits() throws Exception {
  final String cappedMessage = "Expected " + DefaultSorter.MAX_IO_SORT_MB;
  for (int oversizedMb : new int[] {4096, 2047}) {
    assertTrue(cappedMessage,
        DefaultSorter.computeSortBufferSize(oversizedMb, "") == DefaultSorter.MAX_IO_SORT_MB);
  }

  assertTrue("Expected 1024", DefaultSorter.computeSortBufferSize(1024, "") == 1024);

  try {
    DefaultSorter.computeSortBufferSize(0, "");
    fail("Should have thrown error for setting buffer size to 0");
  } catch (RuntimeException expected) {
    // A zero-sized buffer is invalid; nothing further to verify.
  }

  try {
    DefaultSorter.computeSortBufferSize(-100, "");
    fail("Should have thrown error for setting buffer size to negative value");
  } catch (RuntimeException expected) {
    // A negative buffer size is invalid; nothing further to verify.
  }
}
/**
 * Test TEZ-1977.
 *
 * <p>First checks that requesting a sort buffer larger than the memory available to the task
 * (the mocked context reports 5 MB) is rejected, then sorts 1000 key/value pairs of 1000
 * characters each with a 1 MB sort buffer and verifies spill count, counters and the
 * permissions of the final output.
 *
 * @throws IOException if the sorter or the local file system fails; the exception is propagated
 *     unchanged so that the full stack trace shows up in the test report
 */
@Test(timeout = 30000)
public void basicTest() throws IOException {
  OutputContext context = createTezOutputContext();
  MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
  try {
    //Setting IO_SORT_MB to greater than available mem limit (should throw exception)
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 300);
    context.requestInitialMemory(
        ExternalSorter.getInitialMemoryRequirement(conf,
            context.getTotalMemoryAvailableToTask()), new MemoryUpdateCallbackHandler());
    fail("Expected IllegalArgumentException for a sort buffer larger than the available memory");
  } catch (IllegalArgumentException e) {
    assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB));
  }

  conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
  context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
      context.getTotalMemoryAvailableToTask()), handler);
  SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 5, handler.getMemoryAssigned());
  DefaultSorter sorter = sorterWrapper.getSorter();

  //Write 1000 keys each of size 1000 (more than 2 spills should happen)
  Text[] keys = generateData(1000, 1000);
  Text[] values = generateData(1000, 1000);
  for (int i = 0; i < keys.length; i++) {
    sorterWrapper.writeKeyValue(keys[i], values[i]);
  }
  sorterWrapper.close();
  assertTrue(sorter.getNumSpills() > 2);
  verifyCounters(sorter, context);

  verifyOutputPermissions(context.getUniqueIdentifier());
}
/**
 * Runs testEmptyCaseFileLengthsHelper over a few small inputs; the helper checks the
 * empty-partition count and the length of the final output file.
 */
@Test(timeout = 30000)
public void testEmptyCaseFileLengths() throws IOException {
// 50 partitions, two distinct keys.
testEmptyCaseFileLengthsHelper(50, new String[] {"a", "b"}, new String[] {"1", "2"});
// 50 partitions, the same key written twice.
testEmptyCaseFileLengthsHelper(50, new String[] {"a", "a"}, new String[] {"1", "2"});
// 50 partitions, a repeated key with a different key written in between.
testEmptyCaseFileLengthsHelper(50, new String[] {"aaa", "bbb", "aaa"}, new String[] {"1", "2", "3"});
// Single partition, one ten-character key and value.
testEmptyCaseFileLengthsHelper(1, new String[] {"abcdefghij"}, new String[] {"1234567890"});
}
/**
 * Sorts the given key/value pairs with a 1 MB sort buffer, generates the spill event for the
 * single resulting spill and verifies
 * <ul>
 *   <li>that the empty-partition information in the event payload (if present) matches the
 *       number of partitions that received no record,</li>
 *   <li>the byte length of the final output file, and</li>
 *   <li>that exactly one spill happened and the counters are consistent.</li>
 * </ul>
 *
 * @param numPartitions number of partitions the sorter is created with
 * @param keys keys to write, in order
 * @param values values to write; must have the same length as {@code keys}
 * @throws IOException if the sorter, event generation or the local file system fails
 */
public void testEmptyCaseFileLengthsHelper(int numPartitions, String[] keys, String[] values)
    throws IOException {
  OutputContext context = createTezOutputContext();

  MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
  context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
      context.getTotalMemoryAvailableToTask()), handler);
  String auxService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
      TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
  SorterWrapper sorterWrapper =
      new SorterWrapper(context, conf, numPartitions, handler.getMemoryAssigned());
  DefaultSorter sorter = sorterWrapper.getSorter();
  assertEquals("Key and Values must have the same number of elements", keys.length, values.length);
  for (int i = 0; i < keys.length; i++) {
    sorterWrapper.writeKeyValue(new Text(keys[i]), new Text(values[i]));
  }
  sorterWrapper.close();

  List<Event> events = new ArrayList<>();
  String pathComponent = (context.getUniqueIdentifier() + "_" + 0);
  ShuffleUtils.generateEventOnSpill(events, true, true, context, 0,
      sorter.indexCacheList.get(0), 0, true, pathComponent, sorter.getPartitionStats(),
      sorter.reportDetailedPartitionStats(), auxService,
      TezCommonUtils.newBestCompressionDeflater());

  // The composite data movement event is expected at index 1 of the generated events.
  CompositeDataMovementEvent compositeDataMovementEvent =
      (CompositeDataMovementEvent) events.get(1);
  ByteBuffer bb = compositeDataMovementEvent.getUserPayload();
  ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
      ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));

  if (shufflePayload.hasEmptyPartitions()) {
    byte[] emptyPartitionsBytesString =
        TezCommonUtils.decompressByteStringToByteArray(
            shufflePayload.getEmptyPartitions());
    BitSet emptyPartitionBitSet = TezUtilsInternal.fromByteArray(emptyPartitionsBytesString);
    // Expected value comes from the wrapper's own bookkeeping, actual from the event payload.
    Assert.assertEquals("Number of empty partitions did not match!",
        sorterWrapper.getEmptyPartitionsCount(), emptyPartitionBitSet.cardinality());
  } else {
    Assert.assertEquals(0, sorterWrapper.getEmptyPartitionsCount());
  }

  // Each non-empty partition adds 4 bytes for header, 2 bytes for EOF_MARKER, 4 bytes for checksum
  int expectedFileOutLength = sorterWrapper.getNonEmptyPartitionsCount() * 10;
  for (int i = 0; i < keys.length; i++) {
    // Each Record adds 1 byte for key length, 1 byte Text overhead (length), key.length bytes for key
    expectedFileOutLength += keys[i].length() + 2;
    // Each Record adds 1 byte for value length, 1 byte Text overhead (length), value.length bytes for value
    expectedFileOutLength += values[i].length() + 2;
  }
  assertEquals("Unexpected Output File Size!", expectedFileOutLength,
      localFs.getFileStatus(sorter.getFinalOutputFile()).getLen());
  assertEquals(1, sorter.getNumSpills());
  verifyCounters(sorter, context);
}
/**
 * Flushing and closing a sorter that never received a record must succeed, leave the sorter
 * closed and place the final output file in a directory named after the unique identifier.
 *
 * @throws IOException if the test fixture cannot be created
 */
@Test
public void testWithEmptyData() throws IOException {
  OutputContext context = createTezOutputContext();
  conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
  MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
  context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
      context.getTotalMemoryAvailableToTask()), handler);
  DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned());

  //no data written. Empty
  try {
    sorter.flush();
    sorter.close();
    assertTrue(sorter.isClosed());
    assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID));
    verifyCounters(sorter, context);
  } catch (Exception e) {
    // Still fails the test, but keeps the original exception as the cause for diagnosis.
    throw new AssertionError("Sorting empty input must not throw: " + e, e);
  }
}
/**
 * Same as the empty-input scenario, but with the final merge disabled and five partitions: the
 * final output file is then expected in a directory named {@code <unique id>_0}.
 *
 * @throws IOException if the test fixture cannot be created
 */
@Test(timeout = 30000)
public void testWithEmptyDataWithFinalMergeDisabled() throws IOException {
  OutputContext context = createTezOutputContext();
  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
  conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
  MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
  context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
      context.getTotalMemoryAvailableToTask()), handler);
  DefaultSorter sorter = new DefaultSorter(context, conf, 5, handler.getMemoryAssigned());

  //no data written. Empty
  try {
    sorter.flush();
    sorter.close();
    assertTrue(sorter.isClosed());
    assertTrue(sorter.getFinalOutputFile().getParent().getName().equalsIgnoreCase(UniqueID +
        "_0"));
    verifyCounters(sorter, context);
  } catch (Exception e) {
    // Still fails the test, but keeps the original exception as the cause for diagnosis.
    throw new AssertionError("Sorting empty input must not throw: " + e, e);
  }
}
/**
 * Covers all four combinations of "some keys / no keys" and "empty-partition details sent via
 * events / not sent". The original list ran the (0, true) combination twice and never
 * exercised (0, false).
 */
@Test
public void testEmptyPartitions() throws Exception {
  testEmptyPartitionsHelper(2, false);
  testEmptyPartitionsHelper(2, true);
  testEmptyPartitionsHelper(0, true);
  testEmptyPartitionsHelper(0, false);
}
/**
 * Sorts {@code numKeys} records of 1,000,000 characters each (key and value) into 50 partitions
 * with a 1 MB sort buffer and final merge enabled, then verifies the spill count, the counters,
 * the output permissions and the raw length recorded for every partition without data, both in
 * the cached per-spill index records and in the final index file.
 *
 * @param numKeys number of records to write; 0 exercises the no-data path
 * @param sendEmptyPartitionDetails value for
 *     {@code TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED}
 * @throws IOException if the sorter or the local file system fails
 */
public void testEmptyPartitionsHelper(int numKeys, boolean sendEmptyPartitionDetails) throws IOException {
  OutputContext context = createTezOutputContext();
  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, sendEmptyPartitionDetails);
  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
  conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 1);
  MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler();
  context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf,
      context.getTotalMemoryAvailableToTask()), handler);
  int partitions = 50;
  SorterWrapper sorterWrapper = new SorterWrapper(context, conf, partitions, handler.getMemoryAssigned());
  DefaultSorter sorter = sorterWrapper.getSorter();

  Text[] keys = generateData(numKeys, 1000000);
  Text[] values = generateData(numKeys, 1000000);
  for (int i = 0; i < keys.length; i++) {
    sorterWrapper.writeKeyValue(keys[i], values[i]);
  }
  sorterWrapper.close();

  // Without any record a single spill is still expected; otherwise one spill per record.
  int expectedSpills = (numKeys == 0) ? 1 : numKeys;
  assertEquals("Unexpected number of spills", expectedSpills, sorter.getNumSpills());
  verifyCounters(sorter, context);
  verifyOutputPermissions(context.getUniqueIdentifier());

  // Per-spill index records, if the sorter cached any.
  if (sorter.indexCacheList.size() != 0) {
    for (int i = 0; i < sorter.getNumSpills(); i++) {
      TezSpillRecord record = sorter.indexCacheList.get(i);
      for (int j = 0; j < partitions; j++) {
        assertEmptyPartitionRawLength(
            "Unexpected raw length for " + j + "th partition of spill " + i,
            record.getIndex(j), sendEmptyPartitionDetails);
      }
    }
  }

  // Index file of the final output.
  Path indexFile = sorter.getFinalIndexFile();
  TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
  for (int i = 0; i < partitions; i++) {
    assertEmptyPartitionRawLength("Unexpected raw length for " + i + "th partition",
        spillRecord.getIndex(i), sendEmptyPartitionDetails);
  }
}

/**
 * Checks the raw length of an index record that reports no data: 0 when empty-partition details
 * are sent via events, 6 otherwise. Records that have data are ignored.
 */
private static void assertEmptyPartitionRawLength(String message, TezIndexRecord indexRecord,
    boolean sendEmptyPartitionDetails) {
  if (indexRecord.hasData()) {
    return;
  }
  // NOTE(review): 6 presumably is the 4-byte header plus the 2-byte EOF marker - confirm.
  int expectedRawLength = sendEmptyPartitionDetails ? 0 : 6;
  Assert.assertEquals(message, expectedRawLength, indexRecord.getRawLength());
}
/**
 * Sorts 1000 small records into a single partition with final merge disabled and checks that
 * partition statistics are present exactly when {@code withStats} is set.
 *
 * @param withStats value for {@code TEZ_RUNTIME_REPORT_PARTITION_STATS}
 */
void testPartitionStats(boolean withStats) throws IOException {
  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, withStats);
  final OutputContext outputContext = createTezOutputContext();

  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
  conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
  final MemoryUpdateCallbackHandler memoryHandler = new MemoryUpdateCallbackHandler();
  final long initialMemory = ExternalSorter.getInitialMemoryRequirement(conf,
      outputContext.getTotalMemoryAvailableToTask());
  outputContext.requestInitialMemory(initialMemory, memoryHandler);

  final SorterWrapper wrapper =
      new SorterWrapper(outputContext, conf, 1, memoryHandler.getMemoryAssigned());
  final DefaultSorter sorter = wrapper.getSorter();

  final Text[] keys = generateData(1000, 10);
  final Text[] values = generateData(1000, 10);
  int index = 0;
  for (Text key : keys) {
    wrapper.writeKeyValue(key, values[index++]);
  }
  wrapper.close();

  assertTrue(1 == sorter.getNumSpills());
  verifyCounters(sorter, outputContext);

  if (!withStats) {
    Assert.assertNull(sorter.getPartitionStats());
  } else {
    Assert.assertNotNull(sorter.getPartitionStats());
  }
}
/** Runs the partition-stats scenario with TEZ_RUNTIME_REPORT_PARTITION_STATS set to true. */
@Test(timeout = 60000)
public void testWithPartitionStats() throws IOException {
testPartitionStats(true);
}
/** Runs the partition-stats scenario with TEZ_RUNTIME_REPORT_PARTITION_STATS set to false. */
@Test(timeout = 60000)
public void testWithoutPartitionStats() throws IOException {
testPartitionStats(false);
}
/**
 * With final merge disabled and input small enough for a single spill, exactly one batch of
 * events must be sent, and every composite data movement event in it must point at the path
 * component {@code <unique id>_0}, whose files must carry the expected permissions.
 */
@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testWithSingleSpillWithFinalMergeDisabled() throws IOException {
  final OutputContext outputContext = createTezOutputContext();

  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
  conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
  final MemoryUpdateCallbackHandler memoryHandler = new MemoryUpdateCallbackHandler();
  final long initialMemory = ExternalSorter.getInitialMemoryRequirement(conf,
      outputContext.getTotalMemoryAvailableToTask());
  outputContext.requestInitialMemory(initialMemory, memoryHandler);

  final SorterWrapper wrapper =
      new SorterWrapper(outputContext, conf, 1, memoryHandler.getMemoryAssigned());
  final DefaultSorter sorter = wrapper.getSorter();

  final Text[] keys = generateData(1000, 10);
  final Text[] values = generateData(1000, 10);
  int index = 0;
  for (Text key : keys) {
    wrapper.writeKeyValue(key, values[index++]);
  }
  wrapper.close();
  assertTrue(1 == sorter.getNumSpills());

  final ArgumentCaptor<List> sentEvents = ArgumentCaptor.forClass(List.class);
  verify(outputContext, times(1)).sendEvents(sentEvents.capture());
  final List<Event> events = sentEvents.getValue();

  final String expectedPathComponent = UniqueID + "_0";
  for (Event event : events) {
    if (!(event instanceof CompositeDataMovementEvent)) {
      continue;
    }
    final ByteBuffer userPayload = ((CompositeDataMovementEvent) event).getUserPayload();
    final ShuffleUserPayloads.DataMovementEventPayloadProto payload =
        ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(
            ByteString.copyFrom(userPayload));
    assertTrue(payload.getPathComponent().equalsIgnoreCase(expectedPathComponent));
    verifyOutputPermissions(payload.getPathComponent());
  }

  verifyCounters(sorter, outputContext);
}
/**
 * With final merge disabled and enough data for several spills, the events must be sent in one
 * batch, and the n-th composite data movement event must point at the path component
 * {@code <unique id>_n}. The number of such events has to equal the number of spills.
 */
@Test(timeout = 60000)
@SuppressWarnings("unchecked")
public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException {
  final OutputContext outputContext = createTezOutputContext();

  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
  conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
  conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1);
  final MemoryUpdateCallbackHandler memoryHandler = new MemoryUpdateCallbackHandler();
  final long initialMemory = ExternalSorter.getInitialMemoryRequirement(conf,
      outputContext.getTotalMemoryAvailableToTask());
  outputContext.requestInitialMemory(initialMemory, memoryHandler);

  final SorterWrapper wrapper =
      new SorterWrapper(outputContext, conf, 1, memoryHandler.getMemoryAssigned());
  final DefaultSorter sorter = wrapper.getSorter();

  final Text[] keys = generateData(10000, 1000);
  final Text[] values = generateData(10000, 1000);
  int index = 0;
  for (Text key : keys) {
    wrapper.writeKeyValue(key, values[index++]);
  }
  wrapper.close();

  final int spillCount = sorter.getNumSpills();
  final ArgumentCaptor<List> sentEvents = ArgumentCaptor.forClass(List.class);
  verify(outputContext, times(1)).sendEvents(sentEvents.capture());
  final List<Event> events = sentEvents.getValue();

  // Counts composite events and doubles as the expected spill suffix of the next one.
  int compositeEventsSeen = 0;
  for (Event event : events) {
    if (!(event instanceof CompositeDataMovementEvent)) {
      continue;
    }
    final ByteBuffer userPayload = ((CompositeDataMovementEvent) event).getUserPayload();
    final ShuffleUserPayloads.DataMovementEventPayloadProto payload =
        ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(
            ByteString.copyFrom(userPayload));
    assertTrue(payload.getPathComponent().equalsIgnoreCase(UniqueID + "_" + compositeEventsSeen));
    verifyOutputPermissions(payload.getPathComponent());
    compositeEventsSeen++;
  }
  assertTrue(spillCount == compositeEventsSeen);

  verifyCounters(sorter, outputContext);
}
/**
 * Asserts that the output file and the index file stored under the given spill id both have
 * permission bits 0640.
 *
 * @param spillId directory name below the task output dir that holds the files to check
 * @throws IOException if a file cannot be located or its status cannot be read
 */
private void verifyOutputPermissions(String spillId) throws IOException {
  final short expectedPermissions = (short) 0640;
  final String outputSubpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId
      + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;
  final String indexSubpath =
      outputSubpath + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING;

  final Path outputFile = dirAllocator.getLocalPathToRead(outputSubpath, conf);
  final Path indexFile = dirAllocator.getLocalPathToRead(indexSubpath, conf);

  final short outputPermissions = localFs.getFileStatus(outputFile).getPermission().toShort();
  Assert.assertEquals("Incorrect output permissions", expectedPermissions, outputPermissions);

  final short indexPermissions = localFs.getFileStatus(indexFile).getPermission().toShort();
  Assert.assertEquals("Incorrect index permissions", expectedPermissions, indexPermissions);
}
/**
 * Cross-checks the spill-related task counters against the sorter's state and verifies that
 * progress was reported at least once.
 *
 * @param sorter sorter that has already been flushed and closed
 * @param context mocked output context the sorter was created with
 */
private void verifyCounters(DefaultSorter sorter, OutputContext context) {
  final TezCounters counters = context.getCounters();
  final long shuffleChunks = counters.findCounter(TaskCounter.SHUFFLE_CHUNK_COUNT).getValue();
  final long extraSpills = counters.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT).getValue();
  final long extraSpillBytesWritten =
      counters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN).getValue();
  final long extraSpillBytesRead =
      counters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ).getValue();

  if (!sorter.isFinalMergeEnabled()) {
    // No merge pass: no additional spills are counted and every spill is served on its own.
    assertTrue(extraSpills == 0);
    //Number of files served by shuffle-handler
    assertTrue(shuffleChunks == sorter.getNumSpills());
    assertTrue(0 == extraSpillBytesRead);
    assertTrue(0 == extraSpillBytesWritten);
  } else {
    // Merged output: all spills but one count as additional, and a single file is served.
    assertTrue((sorter.getNumSpills() - 1) == extraSpills);
    //Number of files served by shuffle-handler
    assertTrue(shuffleChunks == 1);
    if (sorter.getNumSpills() > 1) {
      assertTrue(extraSpillBytesRead > 0);
      assertTrue(extraSpillBytesWritten > 0);
    }
  }

  final long physicalOutputBytes =
      counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL).getValue();
  assertTrue(physicalOutputBytes >= 0);

  final long outputBytesWithOverhead =
      counters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD).getValue();
  assertTrue(outputBytesWithOverhead >= 0);

  verify(context, atLeastOnce()).notifyProgress();
}
/**
 * Test helper around a {@link DefaultSorter} that records which partitions received at least
 * one record and whether a written key equals the previous key of the same partition.
 */
private static class SorterWrapper {

  private final DefaultSorter sorter;
  private final Partitioner partitioner;
  /** One bit per partition; set once a record has been routed to that partition. */
  private final BitSet nonEmptyPartitions;
  /** Most recently written key per partition, {@code null} until the first write. */
  private final Object[] lastKeys;
  private final int numPartitions;

  public SorterWrapper(OutputContext context, Configuration conf, int numPartitions, long memoryAssigned) throws IOException {
    this.sorter = new DefaultSorter(context, conf, numPartitions, memoryAssigned);
    this.partitioner = TezRuntimeUtils.instantiatePartitioner(conf);
    this.numPartitions = numPartitions;
    this.nonEmptyPartitions = new BitSet(numPartitions);
    this.lastKeys = new Object[numPartitions];
  }

  /**
   * Writes one record to the sorter and updates the bookkeeping.
   *
   * @return {@code true} if {@code key} equals the key previously written to the same partition
   */
  public boolean writeKeyValue(Object key, Object value) throws IOException {
    final int target = partitioner.getPartition(key, value, numPartitions);
    nonEmptyPartitions.set(target);
    sorter.write(key, value);

    final boolean repeatsPreviousKey = key.equals(lastKeys[target]);
    lastKeys[target] = key;
    return repeatsPreviousKey;
  }

  /** Number of partitions that received at least one record. */
  public int getNonEmptyPartitionsCount() {
    return nonEmptyPartitions.cardinality();
  }

  /** Number of partitions that received no record. */
  public int getEmptyPartitionsCount() {
    return numPartitions - nonEmptyPartitions.cardinality();
  }

  /** Flushes and then closes the wrapped sorter. */
  public void close () throws IOException {
    sorter.flush();
    sorter.close();
  }

  public DefaultSorter getSorter() {
    return sorter;
  }
}
/**
 * Creates {@code numKeys} random alphanumeric {@link Text} items of {@code keyLen} characters.
 *
 * @param numKeys number of items to generate
 * @param keyLen length of every generated string
 * @return array holding the generated items
 */
private static Text[] generateData(int numKeys, int keyLen) {
  final Text[] generated = new Text[numKeys];
  int next = 0;
  while (next < numKeys) {
    generated[next] = new Text(RandomStringUtils.randomAlphanumeric(keyLen));
    next++;
  }
  return generated;
}
/**
 * Creates a Mockito-backed {@link OutputContext} for the sorter under test.
 *
 * <p>The mock reports 5 MB of task memory, the test working directory as its only work dir,
 * a fresh {@link TezCounters} instance, a user payload built from the current {@code conf},
 * and service-provider metadata carrying {@code PORT}. Calls to
 * {@code requestInitialMemory} grant the full requested amount to the supplied callback.
 *
 * @return the configured mock
 * @throws IOException if the user payload or the service metadata cannot be serialized
 */
private OutputContext createTezOutputContext() throws IOException {
  String[] workingDirs = { workingDir.toString() };
  // Snapshot of conf at this point; later conf changes are not reflected in the payload.
  UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
  DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
  serviceProviderMetaData.writeInt(PORT);

  TezCounters counters = new TezCounters();

  OutputContext context = mock(OutputContext.class);
  ExecutionContext execContext = new ExecutionContextImpl("localhost");
  doReturn(mock(OutputStatisticsReporter.class)).when(context).getStatisticsReporter();
  doReturn(execContext).when(context).getExecutionContext();
  doReturn(counters).when(context).getCounters();
  doReturn(workingDirs).when(context).getWorkDirs();
  doReturn(payLoad).when(context).getUserPayload();
  doReturn(5 * 1024 * 1024L).when(context).getTotalMemoryAvailableToTask();
  doReturn(UniqueID).when(context).getUniqueIdentifier();
  doReturn("v1").when(context).getDestinationVertexName();
  doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context)
      .getServiceProviderMetaData
          (conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
              TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
  // Grant exactly what was asked for; the callback is the second argument of the call.
  doAnswer(new Answer<Void>() {
    @Override public Void answer(InvocationOnMock invocation) throws Throwable {
      long requestedSize = (Long) invocation.getArguments()[0];
      MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation
          .getArguments()[1];
      callback.memoryAssigned(requestedSize);
      return null;
    }
  }).when(context).requestInitialMemory(anyLong(), any());
  return context;
}
}