/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.executor.data;
import org.apache.commons.io.FileUtils;
import org.apache.nemo.common.HashRange;
import org.apache.nemo.common.KeyRange;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.coder.IntDecoderFactory;
import org.apache.nemo.common.coder.IntEncoderFactory;
import org.apache.nemo.common.coder.PairDecoderFactory;
import org.apache.nemo.common.coder.PairEncoderFactory;
import org.apache.nemo.common.ir.IdManager;
import org.apache.nemo.common.ir.edge.executionproperty.CompressionProperty;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.local.LocalMessageDispatcher;
import org.apache.nemo.runtime.common.message.local.LocalMessageEnvironment;
import org.apache.nemo.runtime.common.state.BlockState;
import org.apache.nemo.runtime.executor.data.block.Block;
import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
import org.apache.nemo.runtime.executor.data.stores.*;
import org.apache.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
import org.apache.nemo.runtime.executor.data.streamchainer.DecompressionStreamChainer;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
import org.apache.nemo.runtime.master.BlockManagerMaster;
import org.apache.nemo.runtime.master.RuntimeMaster;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static org.apache.nemo.runtime.common.RuntimeTestUtil.getRangedNumList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests write and read for {@link BlockStore}s.
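* <p>
* All stores are exercised through the same write/read protocol; the following is a sketch of
* the calls these tests make (see the private helper methods below):
* <pre>{@code
* final Block block = store.createBlock(blockId);
* data.forEach(element -> block.write(key, element));
* block.commit();
* store.writeBlock(block);
* // ... later, on the reader side ...
* final Optional<Block> read = store.readBlock(blockId);
* final Iterable<NonSerializedPartition> partitions = read.get().readPartitions(keyRange);
* }</pre>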
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({BlockManagerMaster.class, RuntimeMaster.class, SerializerManager.class})
public final class BlockStoreTest {
private static final String TMP_FILE_DIRECTORY = "./tmpFiles";
private static final Serializer SERIALIZER = new Serializer(
PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of()),
PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of()),
Collections.singletonList(new CompressionStreamChainer(CompressionProperty.Value.LZ4)),
Collections.singletonList(new DecompressionStreamChainer(CompressionProperty.Value.LZ4)));
private static final SerializerManager serializerManager = mock(SerializerManager.class);
private BlockManagerMaster blockManagerMaster;
private Injector baseInjector;
// Variables for shuffle test
private static final int NUM_WRITE_VERTICES = 3;
private static final int NUM_READ_VERTICES = 3;
private static final int DATA_SIZE = 1000;
private List<String> blockIdList;
private List<List<NonSerializedPartition<Integer>>> partitionsPerBlock;
// Variables for concurrent read test
private static final int NUM_CONC_READ_TASKS = 10;
private static final int CONC_READ_DATA_SIZE = 1000;
private String concBlockId;
private NonSerializedPartition<Integer> concBlockPartition;
// Variables for shuffle in range test
private static final int NUM_WRITE_HASH_TASKS = 2;
private static final int NUM_READ_HASH_TASKS = 3;
private static final int HASH_DATA_SIZE = 1000;
private static final int HASH_RANGE = 4;
private List<String> hashedBlockIdList;
private List<List<NonSerializedPartition<Integer>>> hashedBlockPartitionList;
private List<KeyRange> readKeyRangeList;
private List<List<Iterable>> expectedDataInRange;
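// Builds the id of the task with the given index in a dummy stage ("STAGE"), attempt 0.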
private String getTaskId(final int index) {
return RuntimeIdManager.generateTaskId("STAGE", index, 0);
}
/**
* Generates the ids and the data which will be used for the block store tests.
*
* @throws Exception exception on the way.
*/
@Before
public void setUp() throws Exception {
baseInjector = LocalMessageDispatcher.getInjector();
final Injector injector = LocalMessageEnvironment
.forkInjector(baseInjector, MessageEnvironment.MASTER_COMMUNICATION_ID);
blockManagerMaster = injector.getInstance(BlockManagerMaster.class);
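// Every serializer lookup returns the same int-pair serializer (with LZ4 (de)compression chained).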
when(serializerManager.getSerializer(any())).thenReturn(SERIALIZER);
// The following part is for the shuffle test.
final List<String> writeVertexIdList = new ArrayList<>(NUM_WRITE_VERTICES);
final List<String> readTaskIdList = new ArrayList<>(NUM_READ_VERTICES);
blockIdList = new ArrayList<>(NUM_WRITE_VERTICES);
partitionsPerBlock = new ArrayList<>(NUM_WRITE_VERTICES);
// Generates the ids of the tasks to be used.
IntStream.range(0, NUM_WRITE_VERTICES).forEach(number -> writeVertexIdList.add("Write_IR_vertex"));
IntStream.range(0, NUM_READ_VERTICES).forEach(number -> readTaskIdList.add("Read_IR_vertex"));
// Generates the ids and the data of the blocks to be used.
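// Partition (w, r) of writer w's block is keyed by reader index r and holds the consecutive
// integers [(w * NUM_READ_VERTICES + r) * DATA_SIZE, (w * NUM_READ_VERTICES + r + 1) * DATA_SIZE).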
final String shuffleEdge = IdManager.newEdgeId();
IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx -> {
// Create a block for each writer task.
final String taskId = getTaskId(writeTaskIdx);
final String blockId = RuntimeIdManager.generateBlockId(shuffleEdge, taskId);
blockIdList.add(blockId);
// Create the partitions composing this block.
final List<NonSerializedPartition<Integer>> partitionsForBlock = new ArrayList<>(NUM_READ_VERTICES);
partitionsPerBlock.add(partitionsForBlock);
IntStream.range(0, NUM_READ_VERTICES).forEach(readTaskIdx -> {
final int partitionsCount = writeTaskIdx * NUM_READ_VERTICES + readTaskIdx;
partitionsForBlock.add(new NonSerializedPartition(
readTaskIdx, getRangedNumList(partitionsCount * DATA_SIZE, (partitionsCount + 1) * DATA_SIZE), -1, -1));
});
});
// The following part is for the concurrent read test.
final List<String> concReadTaskIdList = new ArrayList<>(NUM_CONC_READ_TASKS);
final String concEdge = IdManager.newEdgeId();
// Generates the ids and the data to be used.
concBlockId = RuntimeIdManager.generateBlockId(concEdge, getTaskId(NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1));
IntStream.range(0, NUM_CONC_READ_TASKS).forEach(number -> concReadTaskIdList.add("conc_read_IR_vertex"));
concBlockPartition = new NonSerializedPartition(0, getRangedNumList(0, CONC_READ_DATA_SIZE), -1, -1);
// The following part is for the shuffle in hash range test.
final int numHashedBlocks = NUM_WRITE_HASH_TASKS;
final List<String> writeHashTaskIdList = new ArrayList<>(NUM_WRITE_HASH_TASKS);
final List<String> readHashTaskIdList = new ArrayList<>(NUM_READ_HASH_TASKS);
readKeyRangeList = new ArrayList<>(NUM_READ_HASH_TASKS);
hashedBlockIdList = new ArrayList<>(numHashedBlocks);
hashedBlockPartitionList = new ArrayList<>(numHashedBlocks);
expectedDataInRange = new ArrayList<>(NUM_READ_HASH_TASKS);
// Generates the ids of the tasks to be used.
IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(number -> writeHashTaskIdList.add("hash_write_IR_vertex"));
IntStream.range(0, NUM_READ_HASH_TASKS).forEach(number -> readHashTaskIdList.add("hash_read_IR_vertex"));
final String hashEdge = IdManager.newEdgeId();
// Generates the ids and the data of the blocks to be used.
IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx -> {
final String taskId = getTaskId(NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1 + writeTaskIdx);
final String blockId = RuntimeIdManager.generateBlockId(hashEdge, taskId);
hashedBlockIdList.add(blockId);
final List<NonSerializedPartition<Integer>> hashedBlock = new ArrayList<>(HASH_RANGE);
// Generates the data for each hash value.
IntStream.range(0, HASH_RANGE).forEach(hashValue ->
hashedBlock.add(new NonSerializedPartition(hashValue, getFixedKeyRangedNumList(
hashValue,
writeTaskIdx * HASH_DATA_SIZE * HASH_RANGE + hashValue * HASH_DATA_SIZE,
writeTaskIdx * HASH_DATA_SIZE * HASH_RANGE + (hashValue + 1) * HASH_DATA_SIZE), -1, -1)));
hashedBlockPartitionList.add(hashedBlock);
});
// Generates the range of hash values to read for each read task.
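// With NUM_WRITE_HASH_TASKS = 2 and NUM_READ_HASH_TASKS = 3, smallDataRangeEnd = 2:
// the first reader takes hashes [0, 2) and each remaining reader takes a single hash value.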
final int smallDataRangeEnd = 1 + NUM_READ_HASH_TASKS - NUM_WRITE_HASH_TASKS;
readKeyRangeList.add(HashRange.of(0, smallDataRangeEnd));
IntStream.range(0, NUM_READ_HASH_TASKS - 1).forEach(readTaskIdx -> {
readKeyRangeList.add(HashRange.of(smallDataRangeEnd + readTaskIdx,
smallDataRangeEnd + readTaskIdx + 1));
});
// Generates the expected result of hash range retrieval for each read task.
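// For each (reader, writer) pair, the expected data is the concatenation, in ascending hash order,
// of the writer's partitions whose hash falls in the reader's key range.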
for (int readTaskIdx = 0; readTaskIdx < NUM_READ_HASH_TASKS; readTaskIdx++) {
final KeyRange<Integer> hashRange = readKeyRangeList.get(readTaskIdx);
final List<Iterable> expectedRangePartitions = new ArrayList<>(NUM_WRITE_HASH_TASKS);
for (int writeTaskIdx = 0; writeTaskIdx < NUM_WRITE_HASH_TASKS; writeTaskIdx++) {
final List<Iterable> appendingList = new ArrayList<>();
for (int hashVal = hashRange.rangeBeginInclusive(); hashVal < hashRange.rangeEndExclusive(); hashVal++) {
appendingList.add(hashedBlockPartitionList.get(writeTaskIdx).get(hashVal).getData());
}
final List concatStreamBase = new ArrayList<>();
Stream<Object> concatStream = concatStreamBase.stream();
for (final Iterable data : appendingList) {
concatStream = Stream.concat(concatStream, StreamSupport.stream(data.spliterator(), false));
}
expectedRangePartitions.add(concatStream.collect(Collectors.toList()));
}
expectedDataInRange.add(expectedRangePartitions);
}
}
/**
* Test {@link MemoryStore}.
*
* @throws Exception exception on the way.
*/
@Test(timeout = 10000)
public void testMemoryStore() throws Exception {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileInstance(SerializerManager.class, serializerManager);
injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
final BlockStore memoryStore = injector.getInstance(MemoryStore.class);
shuffle(memoryStore, memoryStore);
concurrentRead(memoryStore, memoryStore);
shuffleInHashRange(memoryStore, memoryStore);
}
/**
* Test {@link SerializedMemoryStore}.
*
* @throws Exception exception on the way.
*/
@Test(timeout = 10000)
public void testSerMemoryStore() throws Exception {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileInstance(SerializerManager.class, serializerManager);
injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
final BlockStore serMemoryStore = injector.getInstance(SerializedMemoryStore.class);
shuffle(serMemoryStore, serMemoryStore);
concurrentRead(serMemoryStore, serMemoryStore);
shuffleInHashRange(serMemoryStore, serMemoryStore);
}
/**
* Test {@link LocalFileStore}.
*
* @throws Exception exception on the way.
*/
@Test(timeout = 10000)
public void testLocalFileStore() throws Exception {
FileUtils.deleteDirectory(new File(TMP_FILE_DIRECTORY));
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(JobConf.FileDirectory.class, TMP_FILE_DIRECTORY);
injector.bindVolatileInstance(SerializerManager.class, serializerManager);
injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
final BlockStore localFileStore = injector.getInstance(LocalFileStore.class);
shuffle(localFileStore, localFileStore);
concurrentRead(localFileStore, localFileStore);
shuffleInHashRange(localFileStore, localFileStore);
FileUtils.deleteDirectory(new File(TMP_FILE_DIRECTORY));
}
/**
* Test {@link GlusterFileStore}.
* Since we cannot create an actual GFS volume here, this test mimics the GFS setting
* by performing the reads and the writes on separate file stores backed by the same directory.
*
* @throws Exception exception on the way.
*/
@Test(timeout = 10000)
public void testGlusterFileStore() throws Exception {
FileUtils.deleteDirectory(new File(TMP_FILE_DIRECTORY));
final RemoteFileStore writerSideRemoteFileStore =
createGlusterFileStore("writer");
final RemoteFileStore readerSideRemoteFileStore =
createGlusterFileStore("reader");
shuffle(writerSideRemoteFileStore, readerSideRemoteFileStore);
concurrentRead(writerSideRemoteFileStore, readerSideRemoteFileStore);
shuffleInHashRange(writerSideRemoteFileStore, readerSideRemoteFileStore);
FileUtils.deleteDirectory(new File(TMP_FILE_DIRECTORY));
}
private GlusterFileStore createGlusterFileStore(final String executorId)
throws InjectionException {
final Injector injector = LocalMessageEnvironment.forkInjector(baseInjector, executorId);
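// All stores created here share the same local directory, which stands in for a shared GFS volume.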
injector.bindVolatileParameter(JobConf.GlusterVolumeDirectory.class, TMP_FILE_DIRECTORY);
injector.bindVolatileParameter(JobConf.JobId.class, "GFS test");
injector.bindVolatileParameter(JobConf.ExecutorId.class, executorId);
injector.bindVolatileInstance(SerializerManager.class, serializerManager);
injector.bindVolatileParameter(JobConf.MaxOffheapMb.class, 128);
return injector.getInstance(GlusterFileStore.class);
}
/**
* Tests shuffle for {@link BlockStore}s.
* Assumes the following circumstances:
* Task 1 (write)->         (read)-> Task 4
* Task 2 (write)-> shuffle (read)-> Task 5
* Task 3 (write)->         (read)-> Task 6
* It checks that no writer or reader throws an exception,
* and that the read data is identical to the written data (including the order).
*/
private void shuffle(final BlockStore writerSideStore,
final BlockStore readerSideStore) {
final ExecutorService writeExecutor = Executors.newFixedThreadPool(NUM_WRITE_VERTICES);
final ExecutorService readExecutor = Executors.newFixedThreadPool(NUM_READ_VERTICES);
final List<Future<Boolean>> writeFutureList = new ArrayList<>(NUM_WRITE_VERTICES);
final List<Future<Boolean>> readFutureList = new ArrayList<>(NUM_READ_VERTICES);
final long startNano = System.nanoTime();
// Write concurrently
IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx ->
writeFutureList.add(writeExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
try {
final String blockId = blockIdList.get(writeTaskIdx);
final Block block = writerSideStore.createBlock(blockId);
for (final NonSerializedPartition<Integer> partition : partitionsPerBlock.get(writeTaskIdx)) {
final Iterable data = partition.getData();
data.forEach(element -> block.write(partition.getKey(), element));
}
block.commit();
writerSideStore.writeBlock(block);
blockManagerMaster.onProducerTaskScheduled(getTaskId(writeTaskIdx), Collections.singleton(blockId));
blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE,
"Writer side of the shuffle edge");
return true;
} catch (final Exception e) {
e.printStackTrace();
return false;
}
}
})));
// Wait for each writer to succeed
IntStream.range(0, NUM_WRITE_VERTICES).forEach(writer -> {
try {
assertTrue(writeFutureList.get(writer).get());
} catch (final Exception e) {
throw new RuntimeException(e);
}
});
final long writeEndNano = System.nanoTime();
// Read concurrently and check whether the result is equal to the input
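// Reader r fetches key range [r, r + 1), i.e. its own partition, from every writer's block.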
IntStream.range(0, NUM_READ_VERTICES).forEach(readTaskIdx ->
readFutureList.add(readExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
try {
for (int writeTaskIdx = 0; writeTaskIdx < NUM_WRITE_VERTICES; writeTaskIdx++) {
readResultCheck(blockIdList.get(writeTaskIdx),
HashRange.of(readTaskIdx, readTaskIdx + 1),
readerSideStore, partitionsPerBlock.get(writeTaskIdx).get(readTaskIdx).getData());
}
return true;
} catch (final Exception e) {
e.printStackTrace();
return false;
}
}
})));
// Wait for each reader to succeed
IntStream.range(0, NUM_READ_VERTICES).forEach(reader -> {
try {
assertTrue(readFutureList.get(reader).get());
} catch (final Exception e) {
throw new RuntimeException(e);
}
});
// Remove all blocks
blockIdList.forEach(blockId -> {
final boolean exist = readerSideStore.deleteBlock(blockId);
if (!exist) {
throw new RuntimeException("The result of deleteBlock(" + blockId + ") is false");
}
});
final long readEndNano = System.nanoTime();
writeExecutor.shutdown();
readExecutor.shutdown();
System.out.println(
"Shuffle - write time in millis: " + (writeEndNano - startNano) / 1000000 +
", Read time in millis: " + (readEndNano - writeEndNano) / 1000000 + " in store " +
writerSideStore.getClass().toString());
}
/**
* Tests concurrent read for {@link BlockStore}s.
* Assumes the following circumstances:
*                                             -> Task 2
* Task 1 (write)-> broadcast (concurrent read)-> ...
*                                             -> Task 11
* It checks that no writer or reader throws an exception,
* and that the read data is identical to the written data (including the order).
*/
private void concurrentRead(final BlockStore writerSideStore,
final BlockStore readerSideStore) {
final ExecutorService writeExecutor = Executors.newSingleThreadExecutor();
final ExecutorService readExecutor = Executors.newFixedThreadPool(NUM_CONC_READ_TASKS);
final Future<Boolean> writeFuture;
final List<Future<Boolean>> readFutureList = new ArrayList<>(NUM_CONC_READ_TASKS);
final long startNano = System.nanoTime();
// Write a block
writeFuture = writeExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
try {
final Block block = writerSideStore.createBlock(concBlockId);
final Iterable data = concBlockPartition.getData();
data.forEach(element -> block.write(concBlockPartition.getKey(), element));
block.commit();
writerSideStore.writeBlock(block);
blockManagerMaster.onProducerTaskScheduled(getTaskId(0), Collections.singleton(block.getId()));
blockManagerMaster.onBlockStateChanged(
concBlockId, BlockState.State.AVAILABLE, "Writer side of the concurrent read edge");
return true;
} catch (final Exception e) {
e.printStackTrace();
return false;
}
}
});
// Wait for the writer to succeed
try {
assertTrue(writeFuture.get());
} catch (final Exception e) {
throw new RuntimeException(e);
}
final long writeEndNano = System.nanoTime();
// Read the single block concurrently and check whether the result is equal to the input
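// Every reader fetches the whole key range (HashRange.all()) of the single block.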
IntStream.range(0, NUM_CONC_READ_TASKS).forEach(readTaskIdx ->
readFutureList.add(readExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
try {
readResultCheck(concBlockId, HashRange.all(), readerSideStore, concBlockPartition.getData());
return true;
} catch (final Exception e) {
e.printStackTrace();
return false;
}
}
})));
// Wait for each reader to succeed
IntStream.range(0, NUM_CONC_READ_TASKS).forEach(reader -> {
try {
assertTrue(readFutureList.get(reader).get());
} catch (final Exception e) {
throw new RuntimeException(e);
}
});
// Remove the block
final boolean exist = writerSideStore.deleteBlock(concBlockId);
if (!exist) {
throw new RuntimeException("The result of deleteBlock(" + concBlockId + ") is false");
}
final long readEndNano = System.nanoTime();
writeExecutor.shutdown();
readExecutor.shutdown();
System.out.println(
"Concurrent read - write time in millis: " + (writeEndNano - startNano) / 1000000 +
", Read time in millis: " + (readEndNano - writeEndNano) / 1000000 + " in store " +
writerSideStore.getClass().toString());
}
/**
* Tests shuffle in hash range for {@link BlockStore}s.
* Assumes the following circumstances:
* Task 1 (write (hash 0~3))->         (read (hash 0~1))-> Task 3
* Task 2 (write (hash 0~3))-> shuffle (read (hash 2))  -> Task 4
*                                     (read (hash 3))  -> Task 5
* It checks that no writer or reader throws an exception,
* and that the data read in each hash range matches the expected written data (including the order).
*/
private void shuffleInHashRange(final BlockStore writerSideStore,
final BlockStore readerSideStore) {
final ExecutorService writeExecutor = Executors.newFixedThreadPool(NUM_WRITE_HASH_TASKS);
final ExecutorService readExecutor = Executors.newFixedThreadPool(NUM_READ_HASH_TASKS);
final List<Future<Boolean>> writeFutureList = new ArrayList<>(NUM_WRITE_HASH_TASKS);
final List<Future<Boolean>> readFutureList = new ArrayList<>(NUM_READ_HASH_TASKS);
final long startNano = System.nanoTime();
// Write concurrently
IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx ->
writeFutureList.add(writeExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
try {
final String blockId = hashedBlockIdList.get(writeTaskIdx);
final Block block = writerSideStore.createBlock(blockId);
for (final NonSerializedPartition<Integer> partition : hashedBlockPartitionList.get(writeTaskIdx)) {
final Iterable data = partition.getData();
data.forEach(element -> block.write(partition.getKey(), element));
}
block.commit();
writerSideStore.writeBlock(block);
blockManagerMaster.onProducerTaskScheduled(getTaskId(writeTaskIdx), Collections.singleton(blockId));
blockManagerMaster.onBlockStateChanged(blockId, BlockState.State.AVAILABLE,
"Writer side of the shuffle in hash range edge");
return true;
} catch (final Exception e) {
e.printStackTrace();
return false;
}
}
})));
// Wait for each writer to succeed
IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writer -> {
try {
assertTrue(writeFutureList.get(writer).get());
} catch (final Exception e) {
throw new RuntimeException(e);
}
});
final long writeEndNano = System.nanoTime();
// Read concurrently and check whether the result is equal to the expected data
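// Each reader fetches its key range from every writer's block and compares the result
// against the expectation precomputed in setUp().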
IntStream.range(0, NUM_READ_HASH_TASKS).forEach(readTaskIdx ->
readFutureList.add(readExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
try {
for (int writeTaskIdx = 0; writeTaskIdx < NUM_WRITE_HASH_TASKS; writeTaskIdx++) {
final KeyRange<Integer> hashRangeToRetrieve = readKeyRangeList.get(readTaskIdx);
readResultCheck(hashedBlockIdList.get(writeTaskIdx), hashRangeToRetrieve,
readerSideStore, expectedDataInRange.get(readTaskIdx).get(writeTaskIdx));
}
return true;
} catch (final Exception e) {
e.printStackTrace();
return false;
}
}
})));
// Wait for each reader to succeed
IntStream.range(0, NUM_READ_HASH_TASKS).forEach(reader -> {
try {
assertTrue(readFutureList.get(reader).get());
} catch (final Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
});
final long readEndNano = System.nanoTime();
// Remove stored blocks
IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writer -> {
final boolean exist = writerSideStore.deleteBlock(hashedBlockIdList.get(writer));
if (!exist) {
throw new RuntimeException("The result of deleteBlock(" +
hashedBlockIdList.get(writer) + ") is false");
}
});
writeExecutor.shutdown();
readExecutor.shutdown();
System.out.println(
"Shuffle in hash range - write time in millis: " + (writeEndNano - startNano) / 1000000 +
", Read time in millis: " + (readEndNano - writeEndNano) / 1000000 + " in store " +
writerSideStore.getClass().toString());
}
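/**
* Builds a list of (key, number) pairs for each number in [start, end), all sharing the given key.
*/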
private List getFixedKeyRangedNumList(final int key,
final int start,
final int end) {
final List numList = new ArrayList<>(end - start);
IntStream.range(start, end).forEach(number -> numList.add(Pair.of(key, number)));
return numList;
}
/**
* Compares the expected iterable with the data read from a {@link BlockStore}.
*/
private void readResultCheck(final String blockId,
final KeyRange hashRange,
final BlockStore blockStore,
final Iterable expectedResult) throws IOException {
final Optional<Block> optionalBlock = blockStore.readBlock(blockId);
if (!optionalBlock.isPresent()) {
throw new IOException("The result block " + blockId + " is empty.");
}
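// Read the block through both paths: as non-serialized partitions, and as serialized partitions
// decoded with the test serializer. Both must yield the expected data, in order.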
final Iterable<NonSerializedPartition> nonSerializedResult = optionalBlock.get().readPartitions(hashRange);
final Iterable serToNonSerialized = DataUtil.convertToNonSerPartitions(
SERIALIZER, optionalBlock.get().readSerializedPartitions(hashRange));
assertEquals(expectedResult, DataUtil.concatNonSerPartitions(nonSerializedResult));
assertEquals(expectedResult, DataUtil.concatNonSerPartitions(serToNonSerialized));
}
}