/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.writers;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter.SpillInfo;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(value = Parameterized.class)
public class TestUnorderedPartitionedKVWriter {
private static final Logger LOG = LoggerFactory.getLogger(TestUnorderedPartitionedKVWriter.class);
private static final String HOST_STRING = "localhost";
private static final int SHUFFLE_PORT = 4000;
private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
private static final Path TEST_ROOT_DIR = new Path(testTmpDir,
TestUnorderedPartitionedKVWriter.class.getSimpleName());
private static FileSystem localFs;
private boolean shouldCompress;
private ReportPartitionStats reportPartitionStats;
private Configuration defaultConf = new Configuration();
public TestUnorderedPartitionedKVWriter(boolean shouldCompress,
ReportPartitionStats reportPartitionStats) {
this.shouldCompress = shouldCompress;
this.reportPartitionStats = reportPartitionStats;
}
@SuppressWarnings("deprecation")
@Parameterized.Parameters(name = "test[{0}, {1}]")
public static Collection<Object[]> data() {
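// Every combination of {compression on/off} x {partition-stats reporting mode}.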
Object[][] data = new Object[][] {
{ false, ReportPartitionStats.DISABLED },
{ false, ReportPartitionStats.ENABLED },
{ false, ReportPartitionStats.NONE },
{ false, ReportPartitionStats.MEMORY_OPTIMIZED },
{ false, ReportPartitionStats.PRECISE },
{ true, ReportPartitionStats.DISABLED },
{ true, ReportPartitionStats.ENABLED },
{ true, ReportPartitionStats.NONE },
{ true, ReportPartitionStats.MEMORY_OPTIMIZED },
{ true, ReportPartitionStats.PRECISE }};
return Arrays.asList(data);
}
@Before
public void setup() throws IOException {
LOG.info("Setup. Using test dir: " + TEST_ROOT_DIR);
localFs = FileSystem.getLocal(new Configuration());
localFs.delete(TEST_ROOT_DIR, true);
localFs.mkdirs(TEST_ROOT_DIR);
}
@After
public void cleanup() throws IOException {
LOG.info("CleanUp");
localFs.delete(TEST_ROOT_DIR, true);
}
@Test(timeout = 10000)
public void testBufferSizing() throws IOException {
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
final int maxSingleBufferSizeBytes = 2047;
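// Expected usable size per buffer: the writer appears to reserve 64 bytes per
// buffer and align the remainder down to a multiple of 4 bytes.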
final long sizePerBuffer = maxSingleBufferSizeBytes - 64 - maxSingleBufferSizeBytes % 4;
Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
false, maxSingleBufferSizeBytes);
int numOutputs = 10;
UnorderedPartitionedKVWriter kvWriter = null;
// Not enough memory so divide into 2 buffers.
kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 2048);
assertEquals(2, kvWriter.numBuffers);
assertEquals(1024, kvWriter.sizePerBuffer);
assertEquals(1024, kvWriter.lastBufferSize);
assertEquals(1, kvWriter.numInitializedBuffers);
assertEquals(1, kvWriter.spillLimit);
// allocate exact
kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
maxSingleBufferSizeBytes * 3);
assertEquals(3, kvWriter.numBuffers);
assertEquals(sizePerBuffer, kvWriter.sizePerBuffer);
assertEquals(sizePerBuffer, kvWriter.lastBufferSize);
assertEquals(1, kvWriter.numInitializedBuffers);
assertEquals(1, kvWriter.spillLimit);
// under allocate
kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
maxSingleBufferSizeBytes * 2 + maxSingleBufferSizeBytes / 2);
assertEquals(2, kvWriter.numBuffers);
assertEquals(sizePerBuffer, kvWriter.sizePerBuffer);
assertEquals(sizePerBuffer, kvWriter.lastBufferSize);
assertEquals(1, kvWriter.numInitializedBuffers);
assertEquals(1, kvWriter.spillLimit);
// over allocate
kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
maxSingleBufferSizeBytes * 2 + maxSingleBufferSizeBytes / 2 + 1);
assertEquals(3, kvWriter.numBuffers);
assertEquals(sizePerBuffer, kvWriter.sizePerBuffer);
assertEquals(maxSingleBufferSizeBytes / 2 + 1, kvWriter.lastBufferSize);
assertEquals(1, kvWriter.numInitializedBuffers);
assertEquals(1, kvWriter.spillLimit);
// spill limit 1.
kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
4 * maxSingleBufferSizeBytes + 1);
assertEquals(4, kvWriter.numBuffers);
assertEquals(sizePerBuffer, kvWriter.sizePerBuffer);
assertEquals(sizePerBuffer, kvWriter.lastBufferSize);
assertEquals(1, kvWriter.numInitializedBuffers);
assertEquals(1, kvWriter.spillLimit);
// spill limit 2.
conf.setInt(
TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT,
50);
kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
4 * maxSingleBufferSizeBytes + 1);
assertEquals(4, kvWriter.numBuffers);
assertEquals(sizePerBuffer, kvWriter.sizePerBuffer);
assertEquals(sizePerBuffer, kvWriter.lastBufferSize);
assertEquals(1, kvWriter.numInitializedBuffers);
assertEquals(2, kvWriter.spillLimit);
// Available memory is less than buffer size.
conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
2048);
assertEquals(2, kvWriter.numBuffers);
assertEquals(1024, kvWriter.sizePerBuffer);
assertEquals(1024, kvWriter.lastBufferSize);
assertEquals(1, kvWriter.numInitializedBuffers);
assertEquals(1, kvWriter.spillLimit);
}
@Test(timeout = 10000)
public void testNoSpill() throws IOException, InterruptedException {
baseTest(10, 10, null, shouldCompress, -1, 0);
}
@Test(timeout = 10000)
public void testSingleSpill() throws IOException, InterruptedException {
baseTest(50, 10, null, shouldCompress, -1, 0);
}
@Test(timeout = 10000)
public void testMultipleSpills() throws IOException, InterruptedException {
baseTest(200, 10, null, shouldCompress, -1, 0);
}
@Test(timeout = 10000)
public void testMultipleSpillsWithSmallBuffer() throws IOException, InterruptedException {
// numBuffers is much higher than available threads.
baseTest(200, 10, null, shouldCompress, 512, 0, 9600, false);
}
@Test(timeout = 10000)
public void testMergeBuffersAndSpill() throws IOException, InterruptedException {
baseTest(200, 10, null, shouldCompress, 2048, 10);
}
@Test(timeout = 10000)
public void testNoRecords() throws IOException, InterruptedException {
baseTest(0, 10, null, shouldCompress, -1, 0);
}
@Test(timeout = 10000)
public void testNoRecords_SinglePartition() throws IOException, InterruptedException {
// skipBuffers
baseTest(0, 1, null, shouldCompress, -1, 0, 2048, false);
// Check with data via events
baseTest(0, 1, null, shouldCompress, -1, 0, 2048, true);
}
@Test(timeout = 10000)
public void testSkippedPartitions() throws IOException, InterruptedException {
baseTest(200, 10, Sets.newHashSet(2, 5), shouldCompress, -1, 0);
}
@Test(timeout = 10000)
public void testNoSpill_SinglePartition() throws IOException, InterruptedException {
baseTest(10, 1, null, shouldCompress, -1, 0);
}
@Test(timeout = 10000)
public void testSpill_SinglePartition() throws IOException, InterruptedException {
baseTest(1000, 1, null, shouldCompress, -1, 0, 2048, true);
}
@Test(timeout = 10000)
public void testRandomText() throws IOException, InterruptedException {
textTest(100, 10, 2048, 0, 0, 0, false, true);
}
@Test(timeout = 10000)
public void testLargeKeys() throws IOException, InterruptedException {
textTest(0, 10, 2048, 10, 0, 0, false, true);
}
@Test(timeout = 10000)
public void testLargeValues() throws IOException, InterruptedException {
textTest(0, 10, 2048, 0, 10, 0, false, true);
}
@Test(timeout = 10000)
public void testLargeKvPairs() throws IOException, InterruptedException {
textTest(0, 10, 2048, 0, 0, 10, false, true);
}
@Test(timeout = 10000)
public void testTextMixedRecords() throws IOException, InterruptedException {
textTest(100, 10, 2048, 10, 10, 10, false, true);
}
@Test(timeout = 10000)
public void testRandomTextWithoutFinalMerge() throws IOException, InterruptedException {
textTest(100, 10, 2048, 0, 0, 0, false, false);
}
@Test(timeout = 10000)
public void testLargeKeysWithoutFinalMerge() throws IOException, InterruptedException {
textTest(0, 10, 2048, 10, 0, 0, false, false);
}
@Test(timeout = 10000)
public void testLargeValuesWithoutFinalMerge() throws IOException, InterruptedException {
textTest(0, 10, 2048, 0, 10, 0, false, false);
}
@Test(timeout = 10000)
public void testLargeKvPairsWithoutFinalMerge() throws IOException, InterruptedException {
textTest(0, 10, 2048, 0, 0, 10, false, false);
}
@Test(timeout = 10000)
public void testTextMixedRecordsWithoutFinalMerge() throws IOException, InterruptedException {
textTest(100, 10, 2048, 10, 10, 10, false, false);
}
public void textTest(int numRegularRecords, int numPartitions, long availableMemory,
int numLargeKeys, int numLargeValues, int numLargeKvPairs,
boolean pipeliningEnabled, boolean isFinalMergeEnabled) throws IOException,
InterruptedException {
Partitioner partitioner = new HashPartitioner();
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
int dagId = 1;
String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
Random random = new Random();
Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress,
-1, HashPartitioner.class);
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, pipeliningEnabled);
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, isFinalMergeEnabled);
CompressionCodec codec = null;
if (shouldCompress) {
codec = new DefaultCodec();
((Configurable) codec).setConf(conf);
}
int numRecordsWritten = 0;
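// Track the expected (key, value) pairs per partition so the data read back
// after close() can be validated.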
Map<Integer, Multimap<String, String>> expectedValues = new HashMap<Integer, Multimap<String, String>>();
for (int i = 0; i < numPartitions; i++) {
expectedValues.put(i, LinkedListMultimap.<String, String> create());
}
UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext,
conf, numPartitions, availableMemory);
int sizePerBuffer = kvWriter.sizePerBuffer;
BitSet partitionsWithData = new BitSet(numPartitions);
Text keyText = new Text();
Text valText = new Text();
for (int i = 0; i < numRegularRecords; i++) {
String key = createRandomString(Math.abs(random.nextInt(10)));
String val = createRandomString(Math.abs(random.nextInt(20)));
keyText.set(key);
valText.set(val);
int partition = partitioner.getPartition(keyText, valText, numPartitions);
partitionsWithData.set(partition);
expectedValues.get(partition).put(key, val);
kvWriter.write(keyText, valText);
numRecordsWritten++;
}
// Write large key records
for (int i = 0; i < numLargeKeys; i++) {
String key = createRandomString(sizePerBuffer + Math.abs(random.nextInt(100)));
String val = createRandomString(Math.abs(random.nextInt(20)));
keyText.set(key);
valText.set(val);
int partition = partitioner.getPartition(keyText, valText, numPartitions);
partitionsWithData.set(partition);
expectedValues.get(partition).put(key, val);
kvWriter.write(keyText, valText);
numRecordsWritten++;
}
if (pipeliningEnabled) {
verify(outputContext, times(numLargeKeys)).sendEvents(anyList());
}
// Write large value records
for (int i = 0; i < numLargeValues; i++) {
String key = createRandomString(Math.abs(random.nextInt(10)));
String val = createRandomString(sizePerBuffer + Math.abs(random.nextInt(100)));
keyText.set(key);
valText.set(val);
int partition = partitioner.getPartition(keyText, valText, numPartitions);
partitionsWithData.set(partition);
expectedValues.get(partition).put(key, val);
kvWriter.write(keyText, valText);
numRecordsWritten++;
}
if (pipeliningEnabled) {
verify(outputContext, times(numLargeValues + numLargeKeys)).sendEvents(anyList());
}
// Write records where key + val are large (but both can fit in the buffer individually)
for (int i = 0; i < numLargeKvPairs; i++) {
String key = createRandomString(sizePerBuffer / 2 + Math.abs(random.nextInt(100)));
String val = createRandomString(sizePerBuffer / 2 + Math.abs(random.nextInt(100)));
keyText.set(key);
valText.set(val);
int partition = partitioner.getPartition(keyText, valText, numPartitions);
partitionsWithData.set(partition);
expectedValues.get(partition).put(key, val);
kvWriter.write(keyText, valText);
numRecordsWritten++;
}
if (pipeliningEnabled) {
verify(outputContext, times(numLargeValues + numLargeKeys + numLargeKvPairs))
.sendEvents(anyList());
}
List<Event> events = kvWriter.close();
verify(outputContext, never()).reportFailure(any(), any(), any());
if (!pipeliningEnabled) {
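// Without pipelining, close() must return exactly one VertexManagerEvent.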
VertexManagerEvent vmEvent = null;
for (Event event : events) {
if (event instanceof VertexManagerEvent) {
assertNull(vmEvent);
vmEvent = (VertexManagerEvent) event;
}
}
VertexManagerEventPayloadProto vmEventPayload =
VertexManagerEventPayloadProto.parseFrom(
ByteString.copyFrom(vmEvent.getUserPayload().asReadOnlyBuffer()));
assertEquals(numRecordsWritten, vmEventPayload.getNumRecord());
}
TezCounter outputLargeRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_LARGE_RECORDS);
assertEquals(numLargeKeys + numLargeValues + numLargeKvPairs,
outputLargeRecordsCounter.getValue());
if (pipeliningEnabled || !isFinalMergeEnabled) {
// verify spill data files and index file exist
for (int i = 0; i < kvWriter.numSpills.get(); i++) {
assertTrue(localFs.exists(kvWriter.outputFileHandler.getSpillFileForWrite(i, 0)));
assertTrue(localFs.exists(kvWriter.outputFileHandler.getSpillIndexFileForWrite(i, 0)));
}
return;
}
// Validate the events
assertEquals(2, events.size());
assertTrue(events.get(0) instanceof VertexManagerEvent);
VertexManagerEvent vme = (VertexManagerEvent) events.get(0);
verifyPartitionStats(vme, partitionsWithData);
assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1);
assertEquals(0, cdme.getSourceIndexStart());
assertEquals(numPartitions, cdme.getCount());
DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(
ByteString.copyFrom(cdme.getUserPayload()));
assertFalse(eventProto.hasData());
BitSet emptyPartitionBits = null;
if (partitionsWithData.cardinality() != numPartitions) {
assertTrue(eventProto.hasEmptyPartitions());
byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(
eventProto.getEmptyPartitions());
emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
assertEquals(numPartitions - partitionsWithData.cardinality(),
emptyPartitionBits.cardinality());
} else {
assertFalse(eventProto.hasEmptyPartitions());
emptyPartitionBits = new BitSet(numPartitions);
}
assertEquals(HOST_STRING, eventProto.getHost());
assertEquals(SHUFFLE_PORT, eventProto.getPort());
assertEquals(uniqueId, eventProto.getPathComponent());
// Verify the actual data
TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
Path outputFilePath = kvWriter.finalOutPath;
Path spillFilePath = kvWriter.finalIndexPath;
if (numRecordsWritten > 0) {
assertTrue(localFs.exists(outputFilePath));
assertTrue(localFs.exists(spillFilePath));
checkPermissions(outputFilePath, spillFilePath);
} else {
// Special case for 0 records: there is no output data to validate.
return;
}
TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
DataInputBuffer keyBuffer = new DataInputBuffer();
DataInputBuffer valBuffer = new DataInputBuffer();
Text keyDeser = new Text();
Text valDeser = new Text();
for (int i = 0; i < numPartitions; i++) {
if (emptyPartitionBits.get(i)) {
continue;
}
TezIndexRecord indexRecord = spillRecord.getIndex(i);
FSDataInputStream inStream = FileSystem.getLocal(conf).open(outputFilePath);
inStream.seek(indexRecord.getStartOffset());
IFile.Reader reader = new IFile.Reader(inStream, indexRecord.getPartLength(), codec, null,
null, false, 0, -1);
while (reader.nextRawKey(keyBuffer)) {
reader.nextRawValue(valBuffer);
keyDeser.readFields(keyBuffer);
valDeser.readFields(valBuffer);
int partition = partitioner.getPartition(keyDeser, valDeser, numPartitions);
assertTrue(expectedValues.get(partition).remove(keyDeser.toString(), valDeser.toString()));
}
inStream.close();
}
for (int i = 0; i < numPartitions; i++) {
assertEquals(0, expectedValues.get(i).size());
expectedValues.remove(i);
}
assertEquals(0, expectedValues.size());
}
private int[] getPartitionStats(VertexManagerEvent vme) throws IOException {
RoaringBitmap partitionStats = new RoaringBitmap();
VertexManagerEventPayloadProto payload = VertexManagerEventPayloadProto.parseFrom(
ByteString.copyFrom(vme.getUserPayload()));
if (!reportPartitionStats.isEnabled()) {
assertFalse(payload.hasPartitionStats());
assertFalse(payload.hasDetailedPartitionStats());
return null;
}
if (reportPartitionStats.isPrecise()) {
assertTrue(payload.hasDetailedPartitionStats());
List<Integer> sizeInMBList =
payload.getDetailedPartitionStats().getSizeInMbList();
int[] stats = new int[sizeInMBList.size()];
for (int i = 0; i < sizeInMBList.size(); i++) {
stats[i] = sizeInMBList.get(i);
}
return stats;
} else {
assertTrue(payload.hasPartitionStats());
ByteString compressedPartitionStats = payload.getPartitionStats();
byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(
compressedPartitionStats);
ByteArrayInputStream bin = new ByteArrayInputStream(rawData);
partitionStats.deserialize(new DataInputStream(bin));
int[] stats = new int[partitionStats.getCardinality()];
Iterator<Integer> it = partitionStats.iterator();
final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values();
final int RANGE_LEN = RANGES.length;
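// Each set bit encodes partitionIndex * RANGE_LEN + rangeIndex; decode both
// parts and credit the range's representative size (in MB) to that partition.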
while (it.hasNext()) {
int pos = it.next();
int index = pos / RANGE_LEN;
int rangeIndex = pos % RANGE_LEN;
if (RANGES[rangeIndex].getSizeInMB() > 0) {
stats[index] += RANGES[rangeIndex].getSizeInMB();
}
}
return stats;
}
}
private void verifyPartitionStats(VertexManagerEvent vme,
BitSet expectedPartitionsWithData) throws IOException {
int[] stats = getPartitionStats(vme);
if (stats == null) {
return;
}
for (int i = 0; i < stats.length; i++) {
// The stats should be greater than zero if and only if
// the partition has data
assertTrue(expectedPartitionsWithData.get(i) == (stats[i] > 0));
}
}
@Test(timeout = 10000)
public void testNoSpill_WithPipelinedShuffle() throws IOException, InterruptedException {
baseTestWithPipelinedTransfer(10, 10, null, shouldCompress);
}
@Test(timeout = 10000)
public void testSingleSpill_WithPipelinedShuffle() throws IOException, InterruptedException {
baseTestWithPipelinedTransfer(50, 10, null, shouldCompress);
}
@Test(timeout = 10000)
public void testMultipleSpills_WithPipelinedShuffle() throws IOException, InterruptedException {
baseTestWithPipelinedTransfer(200, 10, null, shouldCompress);
}
@Test(timeout = 10000)
public void testNoRecords_WithPipelinedShuffle() throws IOException, InterruptedException {
baseTestWithPipelinedTransfer(0, 10, null, shouldCompress);
}
@Test(timeout = 10000)
public void testNoRecords_SinglePartition_WithPipelinedShuffle() throws IOException, InterruptedException {
// skipBuffers
baseTestWithPipelinedTransfer(0, 1, null, shouldCompress);
}
@Test(timeout = 10000)
public void testSkippedPartitions_WithPipelinedShuffle() throws IOException, InterruptedException {
baseTestWithPipelinedTransfer(200, 10, Sets.newHashSet(2, 5), shouldCompress);
}
@Test(timeout = 10000)
public void testLargeKvPairs_WithPipelinedShuffle() throws IOException, InterruptedException {
textTest(0, 10, 2048, 10, 20, 50, true, false);
}
@SuppressWarnings("unchecked")
private void baseTestWithPipelinedTransfer(int numRecords, int numPartitions, Set<Integer>
skippedPartitions, boolean shouldCompress) throws IOException, InterruptedException {
PartitionerForTest partitioner = new PartitionerForTest();
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
int dagId = 1;
String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
shouldCompress, -1);
conf.setBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
conf.setBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
CompressionCodec codec = null;
if (shouldCompress) {
codec = new DefaultCodec();
((Configurable) codec).setConf(conf);
}
int numOutputs = numPartitions;
long availableMemory = 2048;
int numRecordsWritten = 0;
UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext,
conf, numOutputs, availableMemory);
int sizePerBuffer = kvWriter.sizePerBuffer;
int sizePerRecord = 4 + 8; // IntW + LongW
int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + META_OVERHEAD
BitSet partitionsWithData = new BitSet(numPartitions);
IntWritable intWritable = new IntWritable();
LongWritable longWritable = new LongWritable();
for (int i = 0; i < numRecords; i++) {
intWritable.set(i);
longWritable.set(i);
int partition = partitioner.getPartition(intWritable, longWritable, numOutputs);
if (skippedPartitions != null && skippedPartitions.contains(partition)) {
continue;
}
partitionsWithData.set(partition);
kvWriter.write(intWritable, longWritable);
numRecordsWritten++;
}
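// Each buffer holds recordsPerBuffer records; with pipelining enabled, every
// filled buffer should trigger a spill and a corresponding sendEvents() call.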
int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class);
List<Event> lastEvents = kvWriter.close();
if (numPartitions == 1) {
assertFalse(kvWriter.skipBuffers);
}
// With pipelining, events are sent out as spills happen; close() itself returns none.
assertEquals(0, lastEvents.size());
verify(outputContext, atLeast(numExpectedSpills)).sendEvents(eventCaptor.capture());
int numOfCapturedEvents = eventCaptor.getAllValues().size();
lastEvents = eventCaptor.getAllValues().get(numOfCapturedEvents - 1);
VertexManagerEvent VMEvent = (VertexManagerEvent)lastEvents.get(0);
for (int i = 0; i < numOfCapturedEvents; i++) {
List<Event> events = eventCaptor.getAllValues().get(i);
if (i < numOfCapturedEvents - 1) {
assertEquals(1, events.size());
assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
} else {
assertEquals(2, events.size());
assertTrue(events.get(0) instanceof VertexManagerEvent);
assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
}
}
verifyPartitionStats(VMEvent, partitionsWithData);
verify(outputContext, never()).reportFailure(any(),
any(), any());
assertNull(kvWriter.currentBuffer);
assertEquals(0, kvWriter.availableBuffers.size());
// Verify the counters
TezCounter outputRecordBytesCounter =
counters.findCounter(TaskCounter.OUTPUT_BYTES);
TezCounter outputRecordsCounter =
counters.findCounter(TaskCounter.OUTPUT_RECORDS);
TezCounter outputBytesWithOverheadCounter =
counters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
TezCounter fileOutputBytesCounter =
counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
TezCounter spilledRecordsCounter =
counters.findCounter(TaskCounter.SPILLED_RECORDS);
TezCounter additionalSpillBytesWritternCounter = counters
.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
TezCounter additionalSpillBytesReadCounter = counters
.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
TezCounter numAdditionalSpillsCounter = counters
.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
assertEquals(numRecordsWritten * sizePerRecord,
outputRecordBytesCounter.getValue());
assertEquals(numRecordsWritten, outputRecordsCounter.getValue());
assertEquals(numRecordsWritten * sizePerRecordWithOverhead,
outputBytesWithOverheadCounter.getValue());
long fileOutputBytes = fileOutputBytesCounter.getValue();
if (numRecordsWritten > 0) {
assertTrue(fileOutputBytes > 0);
if (!shouldCompress) {
assertTrue(fileOutputBytes > outputRecordBytesCounter.getValue());
}
} else {
assertEquals(0, fileOutputBytes);
}
// due to multiple threads, buffers could be merged in chunks in scheduleSpill.
assertTrue(recordsPerBuffer * numExpectedSpills >= spilledRecordsCounter.getValue());
long additionalSpillBytesWritten =
additionalSpillBytesWritternCounter.getValue();
long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
// No additional spill bytes are written when final merge is disabled.
assertEquals(0, additionalSpillBytesWritten);
// Reads should match writes, i.e. also zero.
assertEquals(additionalSpillBytesWritten, additionalSpillBytesRead);
// No additional spills when final merge is disabled.
assertEquals(0, numAdditionalSpillsCounter.getValue());
assertTrue(lastEvents.size() > 0);
//Get the last event
int index = lastEvents.size() - 1;
assertTrue(lastEvents.get(index) instanceof CompositeDataMovementEvent);
CompositeDataMovementEvent cdme =
(CompositeDataMovementEvent)lastEvents.get(index);
assertEquals(0, cdme.getSourceIndexStart());
assertEquals(numOutputs, cdme.getCount());
DataMovementEventPayloadProto eventProto =
DataMovementEventPayloadProto.parseFrom(
ByteString.copyFrom(cdme.getUserPayload()));
//Ensure that this is the last event
assertTrue(eventProto.getLastEvent());
verifyEmptyPartitions(eventProto, numRecordsWritten, numPartitions, skippedPartitions);
verify(outputContext, atLeast(1)).notifyProgress();
// Verify if all spill files are available.
TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
if (numRecordsWritten > 0) {
int numSpills = kvWriter.numSpills.get();
for (int i = 0; i < numSpills; i++) {
Path outputFile = taskOutput.getSpillFileForWrite(i, 10);
Path indexFile = taskOutput.getSpillIndexFileForWrite(i, 10);
assertTrue(localFs.exists(outputFile));
assertTrue(localFs.exists(indexFile));
checkPermissions(outputFile, indexFile);
}
} else {
return;
}
}
private void checkPermissions(Path outputFile, Path indexFile) throws IOException {
assertEquals("Incorrect output permissions (user)", FsAction.READ_WRITE,
localFs.getFileStatus(outputFile).getPermission().getUserAction());
assertEquals("Incorrect output permissions (group)", FsAction.READ,
localFs.getFileStatus(outputFile).getPermission().getGroupAction());
assertEquals("Incorrect index permissions (user)", FsAction.READ_WRITE,
localFs.getFileStatus(indexFile).getPermission().getUserAction());
assertEquals("Incorrect index permissions (group)", FsAction.READ,
localFs.getFileStatus(indexFile).getPermission().getGroupAction());
}
private void verifyEmptyPartitions(DataMovementEventPayloadProto eventProto,
int numRecordsWritten, int numPartitions, Set<Integer> skippedPartitions)
throws IOException {
if (eventProto.hasEmptyPartitions()) {
byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(
eventProto.getEmptyPartitions());
BitSet emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
if (numRecordsWritten == 0) {
assertEquals(numPartitions, emptyPartitionBits.cardinality());
} else {
if (skippedPartitions != null) {
for (Integer e : skippedPartitions) {
assertTrue(emptyPartitionBits.get(e));
}
assertEquals(skippedPartitions.size(), emptyPartitionBits.cardinality());
}
}
if (emptyPartitionBits.cardinality() != numPartitions) {
assertEquals(HOST_STRING, eventProto.getHost());
assertEquals(SHUFFLE_PORT, eventProto.getPort());
assertTrue(eventProto.hasPathComponent());
} else {
assertFalse(eventProto.hasHost());
assertFalse(eventProto.hasPort());
assertFalse(eventProto.hasPathComponent());
}
} else {
assertEquals(HOST_STRING, eventProto.getHost());
assertEquals(SHUFFLE_PORT, eventProto.getPort());
assertTrue(eventProto.hasPathComponent());
}
}
@Test(timeout = 10000)
public void testNoSpill_WithFinalMergeDisabled() throws IOException, InterruptedException {
baseTestWithFinalMergeDisabled(10, 10, null, shouldCompress);
}
@Test(timeout = 10000)
public void testSingleSpill_WithFinalMergeDisabled() throws IOException, InterruptedException {
baseTestWithFinalMergeDisabled(50, 10, null, shouldCompress);
}
@Test(timeout = 10000)
public void testSinglePartition_WithFinalMergeDisabled() throws IOException, InterruptedException {
baseTestWithFinalMergeDisabled(0, 1, null, shouldCompress);
}
@Test(timeout = 10000)
public void testMultipleSpills_WithFinalMergeDisabled() throws IOException, InterruptedException {
baseTestWithFinalMergeDisabled(200, 10, null, shouldCompress);
}
@Test(timeout = 10000)
public void testNoRecords_WithFinalMergeDisabled() throws IOException, InterruptedException {
baseTestWithFinalMergeDisabled(0, 10, null, shouldCompress);
}
@Test(timeout = 10000)
public void testNoRecords_SinglePartition_WithFinalMergeDisabled() throws IOException, InterruptedException {
baseTestWithFinalMergeDisabled(0, 1, null, shouldCompress);
}
@Test(timeout = 10000)
public void testSkippedPartitions_WithFinalMergeDisabled() throws IOException, InterruptedException {
baseTestWithFinalMergeDisabled(200, 10, Sets.newHashSet(2, 5), shouldCompress);
}
@SuppressWarnings("unchecked")
private void baseTestWithFinalMergeDisabled(int numRecords, int numPartitions,
Set<Integer> skippedPartitions, boolean shouldCompress) throws IOException, InterruptedException {
PartitionerForTest partitioner = new PartitionerForTest();
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
int dagId = 1;
String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
shouldCompress, -1);
conf.setBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
conf.setBoolean(TezRuntimeConfiguration
.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, false);
CompressionCodec codec = null;
if (shouldCompress) {
codec = new DefaultCodec();
((Configurable) codec).setConf(conf);
}
int numOutputs = numPartitions;
long availableMemory = 2048;
int numRecordsWritten = 0;
UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext,
conf, numOutputs, availableMemory);
int sizePerBuffer = kvWriter.sizePerBuffer;
int sizePerRecord = 4 + 8; // IntW + LongW
int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + META_OVERHEAD
BitSet partitionsWithData = new BitSet(numPartitions);
IntWritable intWritable = new IntWritable();
LongWritable longWritable = new LongWritable();
for (int i = 0; i < numRecords; i++) {
intWritable.set(i);
longWritable.set(i);
int partition = partitioner.getPartition(intWritable, longWritable, numOutputs);
if (skippedPartitions != null && skippedPartitions.contains(partition)) {
continue;
}
partitionsWithData.set(partition);
kvWriter.write(intWritable, longWritable);
numRecordsWritten++;
}
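// Estimate the number of spills from buffer capacity; used below as an upper
// bound on SPILLED_RECORDS, since buffers may be merged before spilling.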
int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
ArgumentCaptor<List> eventCaptor = ArgumentCaptor.forClass(List.class);
List<Event> lastEvents = kvWriter.close();
if (numPartitions == 1) {
assertTrue(kvWriter.skipBuffers);
}
// Max events sent are spills + one VM event. If there are no spills, at least
// the empty partitions would be sent out in a final event.
int spills = Math.max(1, kvWriter.numSpills.get());
assertEquals(spills + 1, lastEvents.size()); // spills + VM event
verify(outputContext, atMost(0)).sendEvents(eventCaptor.capture());
for (int i = 0; i < lastEvents.size(); i++) {
Event event = lastEvents.get(i);
if (event instanceof VertexManagerEvent) {
// When there are no records, an empty 6-byte IFile is still created and would
// show up in the stats, so only verify the stats when records were written.
if (numRecordsWritten > 0) {
verifyPartitionStats((VertexManagerEvent) event, partitionsWithData);
}
}
}
verify(outputContext, never()).reportFailure(any(),
any(), any());
assertNull(kvWriter.currentBuffer);
assertEquals(0, kvWriter.availableBuffers.size());
// Verify the counters
TezCounter outputRecordBytesCounter =
counters.findCounter(TaskCounter.OUTPUT_BYTES);
TezCounter outputRecordsCounter =
counters.findCounter(TaskCounter.OUTPUT_RECORDS);
TezCounter outputBytesWithOverheadCounter =
counters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
TezCounter fileOutputBytesCounter =
counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
TezCounter spilledRecordsCounter =
counters.findCounter(TaskCounter.SPILLED_RECORDS);
TezCounter additionalSpillBytesWritternCounter = counters
.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
TezCounter additionalSpillBytesReadCounter = counters
.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
TezCounter numAdditionalSpillsCounter = counters
.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
assertEquals(numRecordsWritten * sizePerRecord,
outputRecordBytesCounter.getValue());
assertEquals(numRecordsWritten, outputRecordsCounter.getValue());
if (outputRecordsCounter.getValue() > 0) {
assertEquals(numRecordsWritten * sizePerRecordWithOverhead,
outputBytesWithOverheadCounter.getValue());
} else {
assertEquals(0, outputBytesWithOverheadCounter.getValue());
}
long fileOutputBytes = fileOutputBytesCounter.getValue();
if (numRecordsWritten > 0) {
assertTrue(fileOutputBytes > 0);
if (!shouldCompress) {
assertTrue("fileOutputBytes=" + fileOutputBytes + ", outputRecordBytes="
+outputRecordBytesCounter.getValue(),
fileOutputBytes > outputRecordBytesCounter.getValue());
}
} else {
assertEquals(0, fileOutputBytes);
}
// due to multiple threads, buffers could be merged in chunks in scheduleSpill.
assertTrue(recordsPerBuffer * numExpectedSpills >= spilledRecordsCounter.getValue());
long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue();
long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
// No additional spill bytes are written when final merge is disabled.
assertEquals(0, additionalSpillBytesWritten);
// Reads should match writes, i.e. also zero.
assertEquals(additionalSpillBytesWritten, additionalSpillBytesRead);
// No additional spills when final merge is disabled.
assertEquals(0, numAdditionalSpillsCounter.getValue());
assertTrue(lastEvents.size() > 0);
//Get the last event
int index = lastEvents.size() - 1;
assertTrue(lastEvents.get(index) instanceof CompositeDataMovementEvent);
CompositeDataMovementEvent cdme =
(CompositeDataMovementEvent)lastEvents.get(index);
assertEquals(0, cdme.getSourceIndexStart());
assertEquals(numOutputs, cdme.getCount());
DataMovementEventPayloadProto eventProto =
DataMovementEventPayloadProto.parseFrom(
ByteString.copyFrom(cdme.getUserPayload()));
verifyEmptyPartitions(eventProto, numRecordsWritten, numPartitions, skippedPartitions);
if (outputRecordsCounter.getValue() > 0) {
//Ensure that this is the last event
assertTrue(eventProto.getLastEvent());
}
// Verify if all path components have spillIds when final merge is disabled
Pattern mergePathComponentPattern = Pattern.compile("(.*)(_\\d+)");
for(Event event : lastEvents) {
if (!(event instanceof CompositeDataMovementEvent)) {
continue;
}
cdme = (CompositeDataMovementEvent)event;
eventProto = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
assertFalse(eventProto.getPipelined());
if (eventProto.hasPathComponent()) {
//for final merge disabled cases, it should have _spillId
Matcher matcher = mergePathComponentPattern.matcher(eventProto.getPathComponent());
assertTrue("spill id should be present in path component " + eventProto.getPathComponent(), matcher.matches());
assertEquals(2, matcher.groupCount());
assertEquals(uniqueId, matcher.group(1));
assertTrue("spill id should be present in path component", matcher.group(2) != null);
Path outputPath = new Path(outputContext.getWorkDirs()[0],
"output/" + eventProto.getPathComponent() + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
Path indexPath = outputPath.suffix(Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
checkPermissions(outputPath, indexPath);
} else {
assertEquals(0, eventProto.getSpillId());
if (outputRecordsCounter.getValue() > 0) {
assertTrue(eventProto.getLastEvent());
} else {
byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto
.getEmptyPartitions());
BitSet emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
assertEquals(numPartitions, emptyPartitionBits.cardinality());
}
}
}
verify(outputContext, atLeast(1)).notifyProgress();
// Verify if all spill files are available.
TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
if (numRecordsWritten > 0) {
int numSpills = kvWriter.numSpills.get();
for (int i = 0; i < numSpills; i++) {
assertTrue(localFs.exists(taskOutput.getSpillFileForWrite(i, 10)));
assertTrue(localFs.exists(taskOutput.getSpillIndexFileForWrite(i, 10)));
}
} else {
return;
}
}
private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions,
boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent)
throws IOException, InterruptedException {
baseTest(numRecords, numPartitions, skippedPartitions, shouldCompress,
maxSingleBufferSizeBytes, bufferMergePercent, 2048, false);
}
private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions,
boolean shouldCompress, int maxSingleBufferSizeBytes, int bufferMergePercent, int
availableMemory, boolean dataViaEventEnabled)
throws IOException, InterruptedException {
PartitionerForTest partitioner = new PartitionerForTest();
ApplicationId appId = ApplicationId.newInstance(10000000, 1);
TezCounters counters = new TezCounters();
String uniqueId = UUID.randomUUID().toString();
int dagId = 1;
String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
shouldCompress, maxSingleBufferSizeBytes);
conf.setInt(
TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT,
bufferMergePercent);
conf.setBoolean(
TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED,
dataViaEventEnabled);
CompressionCodec codec = null;
if (shouldCompress) {
codec = new DefaultCodec();
((Configurable) codec).setConf(conf);
}
int numOutputs = numPartitions;
int numRecordsWritten = 0;
Map<Integer, Multimap<Integer, Long>> expectedValues = new HashMap<Integer, Multimap<Integer, Long>>();
for (int i = 0; i < numOutputs; i++) {
expectedValues.put(i, LinkedListMultimap.<Integer, Long> create());
}
UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext,
conf, numOutputs, availableMemory);
int sizePerBuffer = kvWriter.sizePerBuffer;
int sizePerRecord = 4 + 8; // IntW + LongW
int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + META_OVERHEAD
IntWritable intWritable = new IntWritable();
LongWritable longWritable = new LongWritable();
BitSet partitionsWithData = new BitSet(numPartitions);
for (int i = 0; i < numRecords; i++) {
intWritable.set(i);
longWritable.set(i);
int partition = partitioner.getPartition(intWritable, longWritable, numOutputs);
if (skippedPartitions != null && skippedPartitions.contains(partition)) {
continue;
}
partitionsWithData.set(partition);
expectedValues.get(partition).put(intWritable.get(), longWritable.get());
kvWriter.write(intWritable, longWritable);
numRecordsWritten++;
}
List<Event> events = kvWriter.close();
if (numPartitions == 1) {
assertTrue(kvWriter.skipBuffers);
// VM & DME events
assertEquals(2, events.size());
Event event1 = events.get(1);
assertTrue(event1 instanceof CompositeDataMovementEvent);
CompositeDataMovementEvent dme = (CompositeDataMovementEvent) event1;
ByteBuffer bb = dme.getUserPayload();
ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(bb));
assertEquals(kvWriter.outputRecordsCounter.getValue(), shufflePayload.getNumRecord());
}
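// spillLimit buffers are merged per spill, so the expected spill count scales
// down by that factor.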
int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
int numExpectedSpills = numRecordsWritten / recordsPerBuffer / kvWriter.spillLimit;
verify(outputContext, never()).reportFailure(any(), any(), any());
assertNull(kvWriter.currentBuffer);
assertEquals(0, kvWriter.availableBuffers.size());
// Verify the counters
TezCounter outputRecordBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES);
TezCounter outputRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_RECORDS);
TezCounter outputBytesWithOverheadCounter = counters
.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
TezCounter fileOutputBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
TezCounter spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
TezCounter additionalSpillBytesWritternCounter = counters
.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
TezCounter additionalSpillBytesReadCounter = counters
.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
TezCounter numAdditionalSpillsCounter = counters
.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
assertEquals(numRecordsWritten * sizePerRecord, outputRecordBytesCounter.getValue());
if (numPartitions > 1) {
assertEquals(numRecordsWritten * sizePerRecordWithOverhead, outputBytesWithOverheadCounter.getValue());
}
assertEquals(numRecordsWritten, outputRecordsCounter.getValue());
long fileOutputBytes = fileOutputBytesCounter.getValue();
if (numRecordsWritten > 0) {
assertTrue(fileOutputBytes > 0);
if ((!shouldCompress) && (!dataViaEventEnabled)) {
assertTrue(fileOutputBytes > outputRecordBytesCounter.getValue());
}
} else {
assertEquals(0, fileOutputBytes);
}
if (!dataViaEventEnabled) {
assertEquals(recordsPerBuffer * numExpectedSpills,
spilledRecordsCounter.getValue());
}
long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue();
long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
if (numExpectedSpills == 0) {
assertEquals(0, additionalSpillBytesWritten);
assertEquals(0, additionalSpillBytesRead);
} else {
assertTrue(additionalSpillBytesWritten > 0);
if (!dataViaEventEnabled) {
assertTrue(additionalSpillBytesRead > 0);
if (!shouldCompress) {
assertTrue(additionalSpillBytesWritten >
(recordsPerBuffer * numExpectedSpills * sizePerRecord));
assertTrue(additionalSpillBytesRead >
(recordsPerBuffer * numExpectedSpills * sizePerRecord));
}
} else {
if (kvWriter.writer.getCompressedLength() >
TezRuntimeConfiguration.TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE_DEFAULT) {
assertTrue(additionalSpillBytesWritten > 0);
}
}
}
if (!dataViaEventEnabled) {
assertEquals(additionalSpillBytesWritten, additionalSpillBytesRead);
}
// due to multiple threads, buffers could be merged in chunks in scheduleSpill.
assertTrue(numExpectedSpills >= numAdditionalSpillsCounter.getValue());
BitSet emptyPartitionBits = null;
// Verify the events returned
assertEquals(2, events.size());
assertTrue(events.get(0) instanceof VertexManagerEvent);
VertexManagerEvent vme = (VertexManagerEvent) events.get(0);
verifyPartitionStats(vme, partitionsWithData);
assertTrue(events.get(1) instanceof CompositeDataMovementEvent);
CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(1);
assertEquals(0, cdme.getSourceIndexStart());
assertEquals(numOutputs, cdme.getCount());
DataMovementEventPayloadProto eventProto =
DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(
cdme.getUserPayload()));
if (skippedPartitions == null && numRecordsWritten > 0) {
assertFalse(eventProto.hasEmptyPartitions());
emptyPartitionBits = new BitSet(numPartitions);
} else {
assertTrue(eventProto.hasEmptyPartitions());
byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto
.getEmptyPartitions());
emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
if (numRecordsWritten == 0) {
assertEquals(numPartitions, emptyPartitionBits.cardinality());
} else {
for (Integer e : skippedPartitions) {
assertTrue(emptyPartitionBits.get(e));
}
assertEquals(skippedPartitions.size(), emptyPartitionBits.cardinality());
}
}
if (emptyPartitionBits.cardinality() != numPartitions) {
assertEquals(HOST_STRING, eventProto.getHost());
assertEquals(SHUFFLE_PORT, eventProto.getPort());
assertEquals(uniqueId, eventProto.getPathComponent());
} else {
assertFalse(eventProto.hasHost());
assertFalse(eventProto.hasPort());
assertFalse(eventProto.hasPathComponent());
}
// Verify the actual data
TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId);
Path outputFilePath = kvWriter.finalOutPath;
Path spillFilePath = kvWriter.finalIndexPath;
if (numRecordsWritten <= 0) {
return;
}
boolean isInMem = eventProto.getData().hasData();
assertTrue(localFs.exists(outputFilePath));
assertEquals("Incorrect output permissions (user)", FsAction.READ_WRITE,
localFs.getFileStatus(outputFilePath).getPermission().getUserAction());
assertEquals("Incorrect output permissions (group)", FsAction.READ,
localFs.getFileStatus(outputFilePath).getPermission().getGroupAction());
if (!isInMem) {
assertTrue(localFs.exists(spillFilePath));
assertEquals("Incorrect index permissions (user)", FsAction.READ_WRITE,
localFs.getFileStatus(spillFilePath).getPermission().getUserAction());
assertEquals("Incorrect index permissions (group)", FsAction.READ,
localFs.getFileStatus(spillFilePath).getPermission().getGroupAction());
// verify no intermediate spill files have been left around
synchronized (kvWriter.spillInfoList) {
for (SpillInfo spill : kvWriter.spillInfoList) {
assertFalse("lingering intermediate spill file " + spill.outPath,
localFs.exists(spill.outPath));
}
}
}
// Read back and validate the actual key/value data.
DataInputBuffer keyBuffer = new DataInputBuffer();
DataInputBuffer valBuffer = new DataInputBuffer();
IntWritable keyDeser = new IntWritable();
LongWritable valDeser = new LongWritable();
for (int i = 0; i < numOutputs; i++) {
IFile.Reader reader = null;
InputStream inStream;
if (isInMem) {
// Read from in memory payload
int dataLoadSize = eventProto.getData().getData().size();
inStream = new ByteArrayInputStream(eventProto.getData().getData().toByteArray());
reader = new IFile.Reader(inStream, dataLoadSize, codec, null,
null, false, 0, -1);
} else {
TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
TezIndexRecord indexRecord = spillRecord.getIndex(i);
if (skippedPartitions != null && skippedPartitions.contains(i)) {
assertFalse("The Index Record for partition " + i + " should not have any data", indexRecord.hasData());
continue;
}
FSDataInputStream tmpStream = FileSystem.getLocal(conf).open(outputFilePath);
tmpStream.seek(indexRecord.getStartOffset());
inStream = tmpStream;
reader = new IFile.Reader(tmpStream, indexRecord.getPartLength(), codec, null,
null, false, 0, -1);
}
while (reader.nextRawKey(keyBuffer)) {
reader.nextRawValue(valBuffer);
keyDeser.readFields(keyBuffer);
valDeser.readFields(valBuffer);
int partition = partitioner.getPartition(keyDeser, valDeser, numOutputs);
assertTrue(expectedValues.get(partition).remove(keyDeser.get(), valDeser.get()));
}
inStream.close();
}
for (int i = 0; i < numOutputs; i++) {
assertEquals(0, expectedValues.get(i).size());
expectedValues.remove(i);
}
assertEquals(0, expectedValues.size());
verify(outputContext, atLeast(1)).notifyProgress();
}
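/** Returns a random string of uppercase ASCII letters of the given length. */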
private static String createRandomString(int size) {
StringBuilder sb = new StringBuilder(size);
Random random = new Random();
for (int i = 0; i < size; i++) {
int r = random.nextInt(26); // avoids the Math.abs(Integer.MIN_VALUE) edge case
sb.append((char) ('A' + r));
}
return sb.toString();
}
private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
String uniqueId, String auxiliaryService) {
OutputContext outputContext = mock(OutputContext.class);
doReturn(counters).when(outputContext).getCounters();
doReturn(appId).when(outputContext).getApplicationId();
doReturn(1).when(outputContext).getDAGAttemptNumber();
doReturn("dagName").when(outputContext).getDAGName();
doReturn("destinationVertexName").when(outputContext).getDestinationVertexName();
doReturn(1).when(outputContext).getOutputIndex();
doReturn(1).when(outputContext).getTaskAttemptNumber();
doReturn(1).when(outputContext).getTaskIndex();
doReturn(1).when(outputContext).getTaskVertexIndex();
doReturn("vertexName").when(outputContext).getTaskVertexName();
doReturn(uniqueId).when(outputContext).getUniqueIdentifier();
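// The shuffle auxiliary service advertises its port via service metadata as a
// 4-byte big-endian int; mock that handshake here.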
doAnswer(new Answer<ByteBuffer>() {
@Override
public ByteBuffer answer(InvocationOnMock invocation) throws Throwable {
ByteBuffer portBuffer = ByteBuffer.allocate(4);
portBuffer.mark();
portBuffer.putInt(SHUFFLE_PORT);
portBuffer.reset();
return portBuffer;
}
}).when(outputContext).getServiceProviderMetaData(eq(auxiliaryService));
Path outDirBase = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId);
String[] outDirs = new String[] { outDirBase.toString() };
doReturn(outDirs).when(outputContext).getWorkDirs();
return outputContext;
}
private Configuration createConfiguration(OutputContext outputContext,
Class<? extends Writable> keyClass, Class<? extends Writable> valClass,
boolean shouldCompress, int maxSingleBufferSizeBytes) {
return createConfiguration(outputContext, keyClass, valClass, shouldCompress,
maxSingleBufferSizeBytes, PartitionerForTest.class);
}
private Configuration createConfiguration(OutputContext outputContext,
Class<? extends Writable> keyClass, Class<? extends Writable> valClass,
boolean shouldCompress, int maxSingleBufferSizeBytes,
Class<? extends Partitioner> partitionerClass) {
Configuration conf = new Configuration(false);
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, outputContext.getWorkDirs());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClass.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valClass.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClass.getName());
if (maxSingleBufferSizeBytes >= 0) {
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES,
maxSingleBufferSizeBytes);
}
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, shouldCompress);
if (shouldCompress) {
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC,
DefaultCodec.class.getName());
}
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
reportPartitionStats.getType());
return conf;
}
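/** Deterministic partitioner for tests: partition = key mod numPartitions (IntWritable keys only). */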
public static class PartitionerForTest implements Partitioner {
@Override
public int getPartition(Object key, Object value, int numPartitions) {
if (key instanceof IntWritable) {
return ((IntWritable) key).get() % numPartitions;
} else {
throw new UnsupportedOperationException(
"Test partitioner expected to be called with IntWritable only");
}
}
}
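/** Subclass that pins the host name so DataMovementEvent payloads are deterministic. */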
private static class UnorderedPartitionedKVWriterForTest extends UnorderedPartitionedKVWriter {
public UnorderedPartitionedKVWriterForTest(OutputContext outputContext, Configuration conf,
int numOutputs, long availableMemoryBytes) throws IOException {
super(outputContext, conf, numOutputs, availableMemoryBytes);
}
@Override
String getHost() {
return HOST_STRING;
}
}
}