blob: 7999d45fcd39048fc9536f2cc7795a9bde10a417 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.output;
import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.OutputStatisticsReporter;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.KeyValuesWriter;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@SuppressWarnings({ "rawtypes", "unchecked" })
@RunWith(Parameterized.class)
public class TestOnFileSortedOutput {
private static final Random rnd = new Random();
private static final String UniqueID = "UUID";
private static final String HOST = "localhost";
private static final int PORT = 80;
private Configuration conf;
private FileSystem fs;
private Path workingDir;
//no of outputs
private int partitions;
//For sorter (pipelined / Default)
private SorterImpl sorterImpl;
private int sorterThreads;
final AtomicLong outputSize = new AtomicLong();
final AtomicLong numRecords = new AtomicLong();
private KeyValuesWriter writer;
private OrderedPartitionedKVOutput sortedOutput;
private boolean sendEmptyPartitionViaEvent;
//Partition index for which data should not be written to.
private int emptyPartitionIdx;
private ReportPartitionStats reportPartitionStats;
/**
* Constructor
*
* @param sendEmptyPartitionViaEvent
* @param sorterImpl Which sorter impl ( pipeline/legacy )
* @param sorterThreads number of threads needed for sorter (required only for pipelined sorter)
* @param emptyPartitionIdx for which data should not be generated
*/
public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent,
SorterImpl sorterImpl, int sorterThreads, int emptyPartitionIdx,
ReportPartitionStats reportPartitionStats) throws IOException {
this.sendEmptyPartitionViaEvent = sendEmptyPartitionViaEvent;
this.emptyPartitionIdx = emptyPartitionIdx;
this.sorterImpl = sorterImpl;
this.sorterThreads = sorterThreads;
this.reportPartitionStats = reportPartitionStats;
conf = new Configuration();
workingDir = new Path(".", this.getClass().getName());
String localDirs = workingDir.toString();
conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
fs = FileSystem.getLocal(conf);
}
@Before
public void setup() throws Exception {
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, sorterImpl.name());
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS, sorterThreads);
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5);
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
HashPartitioner.class.getName());
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED,
sendEmptyPartitionViaEvent);
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
reportPartitionStats.getType());
outputSize.set(0);
numRecords.set(0);
fs.mkdirs(workingDir);
this.partitions = Math.max(1, rnd.nextInt(10));
}
@After
public void cleanup() throws IOException {
fs.delete(workingDir, true);
}
@SuppressWarnings("deprecation")
@Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3}, {4}]")
public static Collection<Object[]> getParameters() {
Collection<Object[]> parameters = new ArrayList<Object[]>();
//empty_partition_via_events_enabled, noOfSortThreads,
// partitionToBeEmpty, reportPartitionStats
parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, -1,
ReportPartitionStats.ENABLED });
parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, 0,
ReportPartitionStats.ENABLED });
parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, -1,
ReportPartitionStats.ENABLED });
parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0,
ReportPartitionStats.ENABLED });
parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0,
ReportPartitionStats.PRECISE });
//Pipelined sorter
parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, -1,
ReportPartitionStats.ENABLED });
parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, 0,
ReportPartitionStats.ENABLED });
parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, -1,
ReportPartitionStats.ENABLED });
parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0,
ReportPartitionStats.ENABLED });
parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0,
ReportPartitionStats.PRECISE });
return parameters;
}
private void startSortedOutput(int partitions) throws Exception {
OutputContext context = createTezOutputContext();
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4);
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
doReturn(payLoad).when(context).getUserPayload();
sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
sortedOutput.initialize();
sortedOutput.start();
writer = sortedOutput.getWriter();
}
private void _testPipelinedShuffle(String sorterName) throws Exception {
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, sorterName);
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
OutputContext context = createTezOutputContext();
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
doReturn(payLoad).when(context).getUserPayload();
sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
sortedOutput.initialize();
sortedOutput.start();
assertFalse(sortedOutput.finalMergeEnabled);
assertTrue(sortedOutput.pipelinedShuffle);
}
@Test (timeout = 5000)
public void testPipelinedShuffle() throws Exception {
_testPipelinedShuffle(SorterImpl.PIPELINED.name());
}
@Test (timeout = 5000)
public void testPipelinedShuffleWithFinalMerge() throws Exception {
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.PIPELINED.name());
//wrong setting for final merge enable in output
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true);
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
OutputContext context = createTezOutputContext();
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
doReturn(payLoad).when(context).getUserPayload();
sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
sortedOutput.initialize();
sortedOutput.start();
assertFalse(sortedOutput.finalMergeEnabled); //should be disabled as pipelining is on
assertTrue(sortedOutput.pipelinedShuffle);
}
@Test
public void testPipelinedSettingsWithDefaultSorter() throws Exception {
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 3);
//negative. with default sorter
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS, SorterImpl.LEGACY.name());
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false);
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, true);
OutputContext context = createTezOutputContext();
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
doReturn(payLoad).when(context).getUserPayload();
sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
sortedOutput.initialize();
try {
sortedOutput.start();
fail("Should have thrown illegal argument exception as pipelining is enabled with "
+ "DefaultSorter");
} catch(IllegalArgumentException ie) {
assertTrue(ie.getMessage().contains("works with PipelinedSorter"));
}
}
@Test (timeout = 5000)
public void testSortBufferSize() throws Exception{
OutputContext context = createTezOutputContext();
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 2048);
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
doReturn(payLoad).when(context).getUserPayload();
sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
try {
//Memory limit checks are done in sorter impls. For e.g, defaultsorter does not support > 2GB
sortedOutput.initialize();
DefaultSorter sorter = new DefaultSorter(context, conf, 100, 3500*1024*1024l);
fail();
} catch(IllegalArgumentException e) {
assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB));
}
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 0);
payLoad = TezUtils.createUserPayloadFromConf(conf);
doReturn(payLoad).when(context).getUserPayload();
sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
try {
sortedOutput.initialize();
fail();
} catch(IllegalArgumentException e) {
assertTrue(e.getMessage().contains(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB));
}
}
@Test(timeout = 5000)
public void baseTest() throws Exception {
startSortedOutput(partitions);
//Write random set of keys
long recordsWritten = numRecords.get();
for (int i = 0; i < Math.max(1, rnd.nextInt(50)); i++) {
Text key = new Text(new BigInteger(256, rnd).toString());
LinkedList values = new LinkedList();
for (int j = 0; j < Math.max(2, rnd.nextInt(10)); j++) {
recordsWritten++;
values.add(new Text(new BigInteger(256, rnd).toString()));
}
writer.write(key, values);
}
List<Event> eventList = sortedOutput.close();
assertTrue(eventList != null && eventList.size() == 2);
assertEquals(recordsWritten, numRecords.get());
ShuffleUserPayloads.DataMovementEventPayloadProto
payload = ShuffleUserPayloads.DataMovementEventPayloadProto
.parseFrom(
ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()));
ShuffleUserPayloads.VertexManagerEventPayloadProto
vmPayload = ShuffleUserPayloads.VertexManagerEventPayloadProto
.parseFrom(
ByteString.copyFrom(((VertexManagerEvent) eventList.get(0)).getUserPayload()));
if (reportPartitionStats.isPrecise()) {
assertTrue(vmPayload.hasDetailedPartitionStats());
} else {
assertTrue(vmPayload.hasPartitionStats());
}
assertEquals(HOST, payload.getHost());
assertEquals(PORT, payload.getPort());
assertEquals(UniqueID, payload.getPathComponent());
}
@Test(timeout = 5000)
public void testWithSomeEmptyPartition() throws Exception {
//ensure atleast 2 partitions are available
partitions = Math.max(2, partitions);
startSortedOutput(partitions);
//write random data
for (int i = 0; i < 2 * partitions; i++) {
Text key = new Text(new BigInteger(256, rnd).toString());
Text value = new Text(new BigInteger(256, rnd).toString());
//skip writing to certain partitions
if (i % partitions != emptyPartitionIdx) {
writer.write(key, value);
}
}
List<Event> eventList = sortedOutput.close();
assertTrue(eventList != null && eventList.size() == 2);
ShuffleUserPayloads.DataMovementEventPayloadProto
payload = ShuffleUserPayloads.DataMovementEventPayloadProto
.parseFrom(ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()));
assertEquals(HOST, payload.getHost());
assertEquals(PORT, payload.getPort());
assertEquals(UniqueID, payload.getPathComponent());
}
@Test(timeout = 5000)
public void testAllEmptyPartition() throws Exception {
startSortedOutput(partitions);
//Close output without writing any data to it.
List<Event> eventList = sortedOutput.close();
assertTrue(eventList != null && eventList.size() == 2);
ShuffleUserPayloads.DataMovementEventPayloadProto
payload = ShuffleUserPayloads.DataMovementEventPayloadProto
.parseFrom(ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload()));
if (sendEmptyPartitionViaEvent) {
assertEquals("", payload.getHost());
assertEquals(0, payload.getPort());
assertEquals("", payload.getPathComponent());
} else {
assertEquals(HOST, payload.getHost());
assertEquals(PORT, payload.getPort());
assertEquals(UniqueID, payload.getPathComponent());
}
}
private OutputContext createTezOutputContext() throws IOException {
String[] workingDirs = { workingDir.toString() };
Configuration localConf = new Configuration(false);
UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
serviceProviderMetaData.writeInt(PORT);
TezCounters counters = new TezCounters();
OutputStatisticsReporter reporter = mock(OutputStatisticsReporter.class);
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
outputSize.set((Long) invocation.getArguments()[0]);
return null;
}
}).when(reporter).reportDataSize(anyLong());
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
numRecords.set((Long) invocation.getArguments()[0]);
return null;
}
}).when(reporter).reportItemsProcessed(anyLong());
OutputContext context = mock(OutputContext.class);
doReturn(localConf).when(context).getContainerConfiguration();
doReturn(counters).when(context).getCounters();
doReturn(workingDirs).when(context).getWorkDirs();
doReturn(payLoad).when(context).getUserPayload();
doReturn(5 * 1024 * 1024l).when(context).getTotalMemoryAvailableToTask();
doReturn(UniqueID).when(context).getUniqueIdentifier();
doReturn("v0").when(context).getTaskVertexName();
doReturn("v1").when(context).getDestinationVertexName();
doReturn(ByteBuffer.wrap(serviceProviderMetaData.getData())).when(context)
.getServiceProviderMetaData
(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
doAnswer(new Answer() {
@Override public Object answer(InvocationOnMock invocation) throws Throwable {
long requestedSize = (Long) invocation.getArguments()[0];
MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation
.getArguments()[1];
callback.memoryAssigned(requestedSize);
return null;
}
}).when(context).requestInitialMemory(anyLong(), any(MemoryUpdateCallback.class));
ExecutionContext ExecutionContext = mock(ExecutionContext.class);
doReturn(HOST).when(ExecutionContext).getHostName();
doReturn(reporter).when(context).getStatisticsReporter();
doReturn(ExecutionContext).when(context).getExecutionContext();
return context;
}
@Test(timeout=5000)
public void testInvalidSorter() throws Exception {
try {
_testPipelinedShuffle("Foo");
Assert.fail("Expected start to fail due to invalid sorter");
} catch (IllegalArgumentException e) {
// Expected
}
}
@Test(timeout=5000)
public void testLowerCaseNamedSorter() throws Exception {
_testPipelinedShuffle("Pipelined");
}
}