blob: b79056278316653440ae8479191da50dcbd57d0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.adl;
import com.squareup.okhttp.mockwebserver.Dispatcher;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import okio.Buffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Tests concurrent positional reads against the mock ADL server: several
 * threads each reading a different file through their own stream, and several
 * threads reading the same file from different offsets through one shared
 * stream.
 */
@RunWith(Parameterized.class)
public class TestConcurrentDataReadOperations extends AdlMockWebServer {
  private static final Logger LOG = LoggerFactory
      .getLogger(TestConcurrentDataReadOperations.class);
  /** Guards lazy creation and closing of {@link #commonHandle}. */
  private static final Object LOCK = new Object();
  /** Size of the file used by the shared-stream test. */
  private static final int ONE_MB = 1024 * 1024;
  /** Upper bound, in seconds, for all read tasks of one run to finish. */
  private static final int READ_TIMEOUT_SECONDS = 120;
  // Compiled once; the dispatcher is invoked for every mocked request.
  private static final Pattern OFFSET_PATTERN =
      Pattern.compile("offset=([0-9]+)");
  private static final Pattern LENGTH_PATTERN =
      Pattern.compile("length=([0-9]+)");

  /** Stream shared by all reader tasks when a run uses the same stream. */
  private static FSDataInputStream commonHandle = null;

  private final int concurrencyLevel;

  public TestConcurrentDataReadOperations(int concurrencyLevel) {
    this.concurrencyLevel = concurrencyLevel;
  }

  /**
   * @return the concurrency levels the tests are parameterized with.
   */
  @Parameterized.Parameters(name = "{index}")
  public static Collection<?> testDataNumberOfConcurrentRun() {
    return Arrays.asList(new Object[][] {{1}, {2}, {3}, {4}, {5}});
  }

  /**
   * Creates a byte array filled with random data.
   *
   * @param size number of bytes to generate
   * @return a new array of {@code size} random bytes
   */
  public static byte[] getRandomByteArrayData(int size) {
    byte[] b = new byte[size];
    Random rand = new Random();
    rand.nextBytes(b);
    return b;
  }

  /**
   * Extracts the first numeric capture of {@code pattern} from the request
   * line.
   *
   * @param pattern pattern with one numeric capture group
   * @param requestLine HTTP request line to search
   * @return the parsed value, or 0 when the pattern does not match
   */
  private static int parseQueryInt(Pattern pattern, String requestLine) {
    Matcher matcher = pattern.matcher(requestLine);
    if (!matcher.find()) {
      return 0;
    }
    String value = matcher.group(1);
    LOG.debug(value);
    return Integer.parseInt(value);
  }

  /**
   * Closes the shared stream, if one was opened, and clears the reference so
   * the next run starts with a fresh handle.
   *
   * @throws IOException if closing the stream fails
   */
  private static void closeCommonHandle() throws IOException {
    synchronized (LOCK) {
      if (commonHandle != null) {
        try {
          commonHandle.close();
        } finally {
          commonHandle = null;
        }
      }
    }
  }

  /**
   * Installs a dispatcher on the mock server that answers GETFILESTATUS and
   * OPEN requests from the given test files; any other request gets a 501.
   *
   * @param testData files the mock server should serve, matched by path
   */
  private void setDispatcher(final ArrayList<CreateTestData> testData) {
    getMockServer().setDispatcher(new Dispatcher() {
      @Override
      public MockResponse dispatch(RecordedRequest recordedRequest)
          throws InterruptedException {
        CreateTestData currentRequest = null;
        for (CreateTestData local : testData) {
          if (recordedRequest.getPath().contains(local.path.toString())) {
            currentRequest = local;
            break;
          }
        }

        if (currentRequest == null) {
          // Must return here: everything below dereferences currentRequest.
          return new MockResponse().setBody("Request data not found")
              .setResponseCode(501);
        }

        String requestLine = recordedRequest.getRequestLine();
        if (requestLine.contains("op=GETFILESTATUS")) {
          return new MockResponse().setResponseCode(200).setBody(
              TestADLResponseData
                  .getGetFileStatusJSONResponse(currentRequest.data.length));
        }

        if (requestLine.contains("op=OPEN")) {
          int offset = parseQueryInt(OFFSET_PATTERN, requestLine);
          int byteCount = parseQueryInt(LENGTH_PATTERN, requestLine);
          Buffer buf = new Buffer();
          // Serve at most the bytes remaining after the requested offset.
          buf.write(currentRequest.data, offset,
              Math.min(currentRequest.data.length - offset, byteCount));
          return new MockResponse().setResponseCode(200)
              .setChunkedBody(buf, 4 * 1024 * 1024);
        }

        return new MockResponse().setBody("NOT SUPPORTED").setResponseCode(501);
      }
    });
  }

  @Before
  public void resetHandle() {
    commonHandle = null;
  }

  /**
   * Reads {@code concurrencyLevel} different randomly sized files in
   * parallel, each task through its own stream starting at offset 0.
   */
  @Test
  public void testParallelReadOnDifferentStreams()
      throws IOException, InterruptedException, ExecutionException {
    ArrayList<CreateTestData> createTestData = new ArrayList<CreateTestData>();

    Random random = new Random();

    for (int i = 0; i < concurrencyLevel; i++) {
      CreateTestData testData = new CreateTestData();
      testData
          .set(new Path("/test/concurrentRead/" + UUID.randomUUID().toString()),
              getRandomByteArrayData(random.nextInt(ONE_MB)));
      createTestData.add(testData);
    }

    setDispatcher(createTestData);

    ArrayList<ReadTestData> readTestData = new ArrayList<ReadTestData>();
    for (CreateTestData local : createTestData) {
      ReadTestData localReadData = new ReadTestData();
      localReadData.set(local.path, local.data, 0);
      readTestData.add(localReadData);
    }

    runReadTest(readTestData, false);
  }

  /**
   * Reads one 1 MB file fully through a private stream first, then issues
   * {@code concurrencyLevel * 5} parallel reads from random offsets to the end
   * of the file, all through one shared stream.
   */
  @Test
  public void testParallelReadOnSameStreams()
      throws IOException, InterruptedException, ExecutionException {
    ArrayList<CreateTestData> createTestData = new ArrayList<CreateTestData>();

    Random random = new Random();

    CreateTestData testFile = new CreateTestData();
    testFile
        .set(new Path("/test/concurrentRead/" + UUID.randomUUID().toString()),
            getRandomByteArrayData(ONE_MB));
    createTestData.add(testFile);

    setDispatcher(createTestData);

    ArrayList<ReadTestData> readTestData = new ArrayList<ReadTestData>();

    // Initial full read of the file on a dedicated stream.
    ReadTestData readInitially = new ReadTestData();
    readInitially.set(testFile.path, Arrays.copyOf(testFile.data, ONE_MB), 0);
    readTestData.add(readInitially);
    runReadTest(readTestData, false);

    readTestData.clear();

    for (int i = 0; i < concurrencyLevel * 5; i++) {
      ReadTestData localReadData = new ReadTestData();
      int offset = random.nextInt(ONE_MB - 1);
      // Expected bytes are everything from the random offset to end of file.
      byte[] expectedData = Arrays.copyOfRange(testFile.data, offset, ONE_MB);
      localReadData.set(testFile.path, expectedData, offset);
      readTestData.add(localReadData);
    }

    try {
      runReadTest(readTestData, true);
    } finally {
      // The shared stream is never closed by the reader tasks themselves.
      closeCommonHandle();
    }
  }

  /**
   * Runs one reader task per entry of {@code testData} on a dedicated thread
   * pool and asserts that every task reports success.
   *
   * @param testData reads to perform, one task per entry
   * @param useSameStream whether all tasks share a single input stream
   * @throws InterruptedException if interrupted while waiting for the tasks
   * @throws ExecutionException if a task failed with an exception or a failed
   *     assertion
   */
  void runReadTest(ArrayList<ReadTestData> testData, boolean useSameStream)
      throws InterruptedException, ExecutionException {
    // newFixedThreadPool rejects a size of 0, so keep at least one thread.
    ExecutorService executor =
        Executors.newFixedThreadPool(Math.max(1, testData.size()));
    ArrayList<Future<Boolean>> subtasks =
        new ArrayList<Future<Boolean>>(testData.size());

    try {
      for (ReadTestData read : testData) {
        subtasks.add(executor.submit(
            new ReadConcurrentRunnable(read.data, read.path, read.offset,
                useSameStream)));
      }

      executor.shutdown();

      // Fail on timeout instead of blocking without bound in Future.get().
      Assert.assertTrue(
          "Read tasks did not finish within " + READ_TIMEOUT_SECONDS
              + " seconds",
          executor.awaitTermination(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS));

      for (Future<Boolean> subtask : subtasks) {
        Assert.assertTrue(subtask.get());
      }
    } finally {
      // No-op once the pool has terminated; stops stragglers otherwise.
      executor.shutdownNow();
    }
  }

  /** Describes one read: which file, from which offset, and expected bytes. */
  class ReadTestData {
    private Path path;
    private byte[] data;
    private int offset;

    public void set(Path filePath, byte[] dataToBeRead, int fromOffset) {
      this.path = filePath;
      this.data = dataToBeRead;
      this.offset = fromOffset;
    }
  }

  /** Describes one file served by the mock server: its path and content. */
  class CreateTestData {
    private Path path;
    private byte[] data;

    public void set(Path filePath, byte[] dataToBeWritten) {
      this.path = filePath;
      this.data = dataToBeWritten;
    }
  }

  /**
   * Task that reads {@code expectedData.length} bytes from {@code path} at
   * {@code offset} and compares them with the expected bytes.
   */
  class ReadConcurrentRunnable implements Callable<Boolean> {
    private Path path;
    private int offset;
    private byte[] expectedData;
    private boolean useSameStream;

    public ReadConcurrentRunnable(byte[] expectedData, Path path, int offset,
        boolean useSameStream) {
      this.path = path;
      this.offset = offset;
      this.expectedData = expectedData;
      this.useSameStream = useSameStream;
    }

    /**
     * Performs the read and verifies the data.
     *
     * @return {@code true} when the read succeeded and the data matched;
     *     {@code false} when an {@link IOException} occurred. A data mismatch
     *     surfaces as an {@link AssertionError}.
     */
    public Boolean call() throws IOException {
      try {
        FSDataInputStream in;
        if (useSameStream) {
          // Only the first task opens the shared stream; the rest reuse it.
          synchronized (LOCK) {
            if (commonHandle == null) {
              commonHandle = getMockAdlFileSystem().open(path);
            }
            in = commonHandle;
          }
        } else {
          in = getMockAdlFileSystem().open(path);
        }

        try {
          byte[] actualData = new byte[expectedData.length];
          in.readFully(offset, actualData);
          Assert.assertArrayEquals(
              "Path :" + path.toString() + " did not match.", expectedData,
              actualData);
        } finally {
          // A private stream is closed even if the read or assertion fails;
          // the shared stream stays open for the other tasks.
          if (!useSameStream) {
            in.close();
          }
        }
      } catch (IOException e) {
        LOG.error("Read of " + path + " at offset " + offset + " failed", e);
        return false;
      }
      return true;
    }
  }
}