blob: a8d5cef6b2f87733f3f6d25ce06a6cc41c285d14 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.util.ByteArrayManager.Counter;
import org.apache.hadoop.hdfs.util.ByteArrayManager.CounterMap;
import org.apache.hadoop.hdfs.util.ByteArrayManager.FixedLengthManager;
import org.apache.hadoop.hdfs.util.ByteArrayManager.ManagerMap;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Test {@link ByteArrayManager}.
*/
public class TestByteArrayManager {
static {
  // Enable every log level for the class under test while these tests run.
  final Logger managerLog = LoggerFactory.getLogger(ByteArrayManager.class);
  GenericTestUtils.setLogLevel(managerLog, Level.ALL);
}
/** Logger used by this test class and its helper threads. */
static final Logger LOG = LoggerFactory.getLogger(TestByteArrayManager.class);
/**
 * Orders futures by the integer each one resolves to. Both operands are
 * resolved with {@link Future#get()}, so a comparison blocks until both
 * futures have completed. Any exception raised while resolving a value is
 * rethrown as a {@link RuntimeException} carrying the original as its cause.
 */
private static final Comparator<Future<Integer>> CMP = new Comparator<Future<Integer>>() {
  @Override
  public int compare(Future<Integer> left, Future<Integer> right) {
    try {
      // Integer.compare cannot overflow, unlike subtracting the two values.
      return Integer.compare(left.get(), right.get());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
};
/**
 * Increments a {@link Counter} from a 32-thread pool a random number of
 * times (512 to 1023), checks that the returned values are exactly 1..n,
 * and finally checks that the counter restarts at 1 once the reset period
 * has elapsed.
 */
@Test
public void testCounter() throws Exception {
  final long resetPeriodMs = 200L;
  final Counter counter = new Counter(resetPeriodMs);

  final int numIncrements = 512 + ThreadLocalRandom.current().nextInt(512);
  final List<Future<Integer>> results = new ArrayList<>(numIncrements);
  final ExecutorService executor = Executors.newFixedThreadPool(32);

  // The task is stateless, so a single instance is submitted repeatedly.
  final Callable<Integer> incrementTask = new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
      return (int) counter.increment();
    }
  };

  try {
    // increment
    int submitted = 0;
    while (submitted < numIncrements) {
      results.add(executor.submit(incrementTask));
      submitted++;
    }

    // sorting resolves every future, so it also waits for completion
    Collections.sort(results, CMP);
  } finally {
    executor.shutdown();
  }

  // after sorting, the values must be the consecutive numbers 1..n
  Assert.assertEquals(numIncrements, results.size());
  int expected = 1;
  for (Future<Integer> result : results) {
    Assert.assertEquals(expected, result.get().intValue());
    expected++;
  }
  Assert.assertEquals(numIncrements, counter.getCount());

  // test auto-reset: wait past the reset period, then the count restarts
  Thread.sleep(resetPeriodMs + 100);
  Assert.assertEquals(1, counter.increment());
}
/**
 * Walks a {@link ByteArrayManager.Impl} (count threshold 4, count limit 8,
 * reset period 200 ms) through allocation and recycling of 1024-byte
 * arrays:
 * <ol>
 * <li>allocate up to the threshold and check that no manager exists yet;</li>
 * <li>recycle half of the arrays, expecting each release to return -1;</li>
 * <li>allocate one more and check that a manager now exists;</li>
 * <li>recycle everything and check the values returned by release;</li>
 * <li>allocate up to the limit, check that one further allocation blocks
 *     until an array is recycled, then recycle everything again.</li>
 * </ol>
 */
@Test
public void testAllocateRecycle() throws Exception {
  final int countThreshold = 4;
  final int countLimit = 8;
  final long countResetTimePeriodMs = 200L;
  // Upper bound on how long to wait for the blocked allocator to finish.
  final long unblockTimeoutMs = 10000L;
  final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
      new ByteArrayManager.Conf(
          countThreshold, countLimit, countResetTimePeriodMs));

  final CounterMap counters = bam.getCounters();
  final ManagerMap managers = bam.getManagers();

  final int[] uncommonArrays = {0, 1, 2, 4, 8, 16, 32, 64};
  final int arrayLength = 1024;

  final Allocator allocator = new Allocator(bam);
  final Recycler recycler = new Recycler(bam);
  try {
    { // allocate within threshold
      for(int i = 0; i < countThreshold; i++) {
        allocator.submit(arrayLength);
      }
      waitForAll(allocator.futures);

      Assert.assertEquals(countThreshold,
          counters.get(arrayLength, false).getCount());
      Assert.assertNull(managers.get(arrayLength, false));
      // lengths that were never requested must have no counter or manager
      for(int n : uncommonArrays) {
        Assert.assertNull(counters.get(n, false));
        Assert.assertNull(managers.get(n, false));
      }
    }

    { // recycle half of the arrays
      for(int i = 0; i < countThreshold/2; i++) {
        recycler.submit(removeLast(allocator.futures).get());
      }

      for(Future<Integer> f : recycler.furtures) {
        Assert.assertEquals(-1, f.get().intValue());
      }
      recycler.furtures.clear();
    }

    { // allocate one more
      allocator.submit(arrayLength).get();

      Assert.assertEquals(countThreshold + 1, counters.get(arrayLength, false).getCount());
      Assert.assertNotNull(managers.get(arrayLength, false));
    }

    { // recycle the remaining arrays
      final int n = allocator.recycleAll(recycler);

      recycler.verify(n);
    }

    {
      // allocate until the maximum.
      for(int i = 0; i < countLimit; i++) {
        allocator.submit(arrayLength);
      }
      waitForAll(allocator.futures);

      // allocate one more should be blocked
      final AllocatorThread t = new AllocatorThread(arrayLength, bam);
      t.start();

      // check if the thread is waiting, timed wait or runnable.
      for(int i = 0; i < 5; i++) {
        Thread.sleep(100);
        final Thread.State threadState = t.getState();
        if (threadState != Thread.State.RUNNABLE
            && threadState != Thread.State.WAITING
            && threadState != Thread.State.TIMED_WAITING) {
          Assert.fail("threadState = " + threadState);
        }
      }

      // recycle an array
      recycler.submit(removeLast(allocator.futures).get());
      Assert.assertEquals(1, removeLast(recycler.furtures).get().intValue());

      // check if the thread is unblocked. join() returns as soon as the
      // thread finishes (instead of depending on a fixed sleep) and makes
      // the thread's write to t.array visible to this thread.
      t.join(unblockTimeoutMs);
      Assert.assertEquals(Thread.State.TERMINATED, t.getState());

      // recycle the remaining, the recycle should be full.
      Assert.assertEquals(countLimit-1, allocator.recycleAll(recycler));
      recycler.submit(t.array);
      recycler.verify(countLimit);

      // recycle one more; it should not increase the free queue size
      Assert.assertEquals(countLimit, bam.release(new byte[arrayLength]));
    }
  } finally {
    allocator.pool.shutdown();
    recycler.pool.shutdown();
  }
}
/**
 * Removes and returns the last future of the given list.
 *
 * @param furtures the list to remove from
 * @return the removed future, or null when the list is empty
 */
static <T> Future<T> removeLast(List<Future<T>> furtures) throws Exception {
  final int lastIndex = furtures.size() - 1;
  return remove(furtures, lastIndex);
}
/**
 * Removes and returns the future at position {@code i} of the given list.
 *
 * @param furtures the list to remove from
 * @param i index of the element to remove; ignored when the list is empty
 * @return the removed future, or null when the list is empty
 */
static <T> Future<T> remove(List<Future<T>> furtures, int i) throws Exception {
  if (furtures.isEmpty()) {
    return null;
  }
  return furtures.remove(i);
}
/**
 * Blocks until every future in the list has completed, in list order.
 * The results are discarded; an exception thrown by {@link Future#get()}
 * propagates to the caller and stops the wait.
 *
 * @param furtures the futures to wait for
 */
static <T> void waitForAll(List<Future<T>> furtures) throws Exception {
  for (Future<T> pending : furtures) {
    // Only completion matters here, not the value.
    pending.get();
  }
}
/**
 * Thread that requests a single array of a given length from a
 * {@link ByteArrayManager} and stores the result in {@link #array}.
 * If the allocation is interrupted, {@link #array} stays null.
 */
static class AllocatorThread extends Thread {
  private final ByteArrayManager bam;
  private final int arrayLength;
  // volatile: written by this thread, read by the test thread afterwards.
  private volatile byte[] array;

  /**
   * @param arrayLength length of the array to request
   * @param bam the manager to allocate from
   */
  AllocatorThread(int arrayLength, ByteArrayManager bam) {
    this.bam = bam;
    this.arrayLength = arrayLength;
  }

  @Override
  public void run() {
    try {
      array = bam.newByteArray(arrayLength);
    } catch (InterruptedException e) {
      e.printStackTrace();
      // Restore the interrupt status instead of silently dropping it.
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Submits array allocations to an 8-thread pool and keeps the resulting
 * futures in {@link #futures} in submission order.
 */
static class Allocator {
  private final ByteArrayManager bam;
  final ExecutorService pool = Executors.newFixedThreadPool(8);
  final List<Future<byte[]>> futures = new LinkedList<Future<byte[]>>();

  Allocator(ByteArrayManager bam) {
    this.bam = bam;
  }

  /**
   * Schedules the allocation of one array and records its future.
   *
   * @param arrayLength the length to request; the task asserts that the
   *                    returned array has exactly this length
   * @return the future of the allocated array
   */
  Future<byte[]> submit(final int arrayLength) {
    final Callable<byte[]> task = new Callable<byte[]>() {
      @Override
      public byte[] call() throws Exception {
        final byte[] allocated = bam.newByteArray(arrayLength);
        Assert.assertEquals(arrayLength, allocated.length);
        return allocated;
      }
    };
    final Future<byte[]> pending = pool.submit(task);
    futures.add(pending);
    return pending;
  }

  /**
   * Hands every recorded array to the given recycler, waiting for each
   * allocation to finish first, and then forgets the recorded futures.
   *
   * @param recycler receives the allocated arrays
   * @return the number of arrays handed over
   */
  int recycleAll(Recycler recycler) throws Exception {
    final int handedOver = futures.size();
    for (Future<byte[]> pending : futures) {
      recycler.submit(pending.get());
    }
    futures.clear();
    return handedOver;
  }
}
/**
 * Submits array releases to an 8-thread pool and keeps the futures of the
 * values returned by {@code release} in {@link #furtures}.
 */
static class Recycler {
  private final ByteArrayManager bam;
  final ExecutorService pool = Executors.newFixedThreadPool(8);
  final List<Future<Integer>> furtures = new LinkedList<Future<Integer>>();

  Recycler(ByteArrayManager bam) {
    this.bam = bam;
  }

  /**
   * Schedules the release of one array and records its future.
   *
   * @param array the array to give back to the manager
   * @return the future of the value returned by {@code release}
   */
  Future<Integer> submit(final byte[] array) {
    final Callable<Integer> task = new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        return bam.release(array);
      }
    };
    final Future<Integer> pending = pool.submit(task);
    furtures.add(pending);
    return pending;
  }

  /**
   * Asserts that exactly {@code expectedSize} releases were recorded and
   * that, once sorted, their results are the consecutive values
   * 1..expectedSize; the recorded futures are cleared afterwards.
   *
   * @param expectedSize the expected number of recorded releases
   */
  void verify(final int expectedSize) throws Exception {
    Assert.assertEquals(expectedSize, furtures.size());
    // Sorting resolves each future, so it also waits for the releases.
    Collections.sort(furtures, CMP);
    int expected = 1;
    for (Future<Integer> released : furtures) {
      Assert.assertEquals(expected, released.get().intValue());
      expected++;
    }
    furtures.clear();
  }
}
/**
 * Stress test: starts {@link Runner#NUM_RUNNERS} runner threads that
 * randomly allocate and recycle arrays through a shared 128-thread pool,
 * plus one "randomRecycler" thread that keeps recycling arrays of randomly
 * chosen runners until every runner thread has finished and every runner's
 * list of outstanding arrays is empty. Afterwards it checks that no
 * exception or assertion error was recorded and that, for each array
 * length, a manager exists exactly when the counter exceeded the
 * threshold.
 */
@Test
public void testByteArrayManager() throws Exception {
  final int countThreshold = 32;
  final int countLimit = 64;
  final long countResetTimePeriodMs = 10000L;

  final ByteArrayManager.Impl bam = new ByteArrayManager.Impl(
      new ByteArrayManager.Conf(
          countThreshold, countLimit, countResetTimePeriodMs));

  final CounterMap counters = bam.getCounters();
  final ManagerMap managers = bam.getManagers();

  final ExecutorService pool = Executors.newFixedThreadPool(128);
  try {
    final Runner[] runners = new Runner[Runner.NUM_RUNNERS];
    final Thread[] threads = new Thread[runners.length];

    final int num = 1 << 10;
    for(int i = 0; i < runners.length; i++) {
      // the runner index doubles as its allocation probability numerator
      runners[i] = new Runner(i, countThreshold, countLimit, pool, i, bam);
      threads[i] = runners[i].start(num);
    }

    // Only the randomRecycler thread adds to this list; it is read after
    // join(), so no extra synchronization is needed.
    final List<Exception> exceptions = new ArrayList<Exception>();
    final Thread randomRecycler = new Thread() {
      @Override
      public void run() {
        LOG.info("randomRecycler start");
        for(int i = 0; shouldRun(); i++) {
          final int j = ThreadLocalRandom.current().nextInt(runners.length);
          try {
            runners[j].recycle();
          } catch (Exception e) {
            e.printStackTrace();
            exceptions.add(new Exception(this + " has an exception", e));
          }

          // pause briefly once every 256 iterations
          if ((i & 0xFF) == 0) {
            LOG.info("randomRecycler sleep, i=" + i);
            sleepMs(100);
          }
        }
        LOG.info("randomRecycler done");
      }

      /** Keep going while any runner is alive or still holds arrays. */
      boolean shouldRun() {
        for(int i = 0; i < runners.length; i++) {
          if (threads[i].isAlive()) {
            return true;
          }
          if (!runners[i].isEmpty()) {
            return true;
          }
        }
        return false;
      }
    };
    randomRecycler.start();

    randomRecycler.join();
    Assert.assertTrue(exceptions.isEmpty());

    Assert.assertNull(counters.get(0, false));
    // Runner 0 is skipped: with p == 0 it never submits an allocation.
    for(int i = 1; i < runners.length; i++) {
      if (!runners[i].assertionErrors.isEmpty()) {
        for(AssertionError e : runners[i].assertionErrors) {
          LOG.error("AssertionError " + i, e);
        }
        Assert.fail(runners[i].assertionErrors.size() + " AssertionError(s)");
      }

      final int arrayLength = Runner.index2arrayLength(i);
      final boolean exceedCountThreshold = counters.get(arrayLength, false).getCount() > countThreshold;
      final FixedLengthManager m = managers.get(arrayLength, false);
      if (exceedCountThreshold) {
        Assert.assertNotNull(m);
      } else {
        Assert.assertNull(m);
      }
    }
  } finally {
    // Release the worker threads even when an assertion above fails.
    pool.shutdown();
  }
}
/**
 * Sleeps for the given number of milliseconds. If the sleep is
 * interrupted, the stack trace is printed, the thread's interrupt status
 * is restored, and the calling test is failed.
 *
 * @param ms how long to sleep, in milliseconds
 */
static void sleepMs(long ms) {
  try {
    Thread.sleep(ms);
  } catch (InterruptedException e) {
    e.printStackTrace();
    // Keep the interrupt visible to callers further up the stack.
    Thread.currentThread().interrupt();
    Assert.fail("Sleep is interrupted: " + e);
  }
}
/**
 * Worker used by the stress test. Each runner owns one array-length class
 * and, on every step, either submits an allocation to the shared pool or
 * takes its oldest outstanding allocation and submits its release.
 * Assertion failures raised inside pool tasks are collected in
 * {@link #assertionErrors} rather than thrown.
 */
static class Runner implements Runnable {
  static final int NUM_RUNNERS = 5;

  /**
   * Maps a runner index to the array length it works with:
   * {@code MIN_ARRAY_LENGTH << (index - 1)}.
   */
  static int index2arrayLength(int index) {
    return ByteArrayManager.MIN_ARRAY_LENGTH << (index - 1);
  }

  private final ByteArrayManager bam;
  final int maxArrayLength;
  final int countThreshold;
  final int maxArrays;
  final ExecutorService pool;
  /** Outstanding allocations; every access is synchronized on the list. */
  final List<Future<byte[]>> arrays = new ArrayList<Future<byte[]>>();

  /** Allocations submitted minus releases submitted by this runner. */
  final AtomicInteger count = new AtomicInteger();
  /** An allocation is chosen with probability p / NUM_RUNNERS per step. */
  final int p;
  private int n;

  /**
   * Filled from several pool threads at once, hence the synchronized
   * wrapper around the backing list.
   */
  final List<AssertionError> assertionErrors =
      Collections.synchronizedList(new ArrayList<AssertionError>());

  /**
   * @param index selects the array length via {@link #index2arrayLength}
   * @param countThreshold stored as given; not used by this class itself
   * @param maxArrays stored as given; not used by this class itself
   * @param pool executes the allocation and release tasks
   * @param p allocation probability numerator, out of NUM_RUNNERS
   * @param bam the manager under test
   */
  Runner(int index, int countThreshold, int maxArrays,
      ExecutorService pool, int p, ByteArrayManager bam) {
    this.maxArrayLength = index2arrayLength(index);
    this.countThreshold = countThreshold;
    this.maxArrays = maxArrays;
    this.pool = pool;
    this.p = p;
    this.bam = bam;
  }

  /** @return true when no allocation is outstanding. */
  boolean isEmpty() {
    synchronized (arrays) {
      return arrays.isEmpty();
    }
  }

  /**
   * Submits the allocation of an array with a random length in
   * (lower, maxArrayLength] and records its future. The task expects the
   * manager to hand back an array of exactly maxArrayLength bytes; a
   * mismatch is recorded in {@link #assertionErrors}.
   */
  Future<byte[]> submitAllocate() {
    count.incrementAndGet();

    final Future<byte[]> f = pool.submit(new Callable<byte[]>() {
      @Override
      public byte[] call() throws Exception {
        final int lower = maxArrayLength == ByteArrayManager.MIN_ARRAY_LENGTH?
            0: maxArrayLength >> 1;
        final int arrayLength = ThreadLocalRandom.current().nextInt(
            maxArrayLength - lower) + lower + 1;
        final byte[] array = bam.newByteArray(arrayLength);
        try {
          Assert.assertEquals("arrayLength=" + arrayLength + ", lower=" + lower,
              maxArrayLength, array.length);
        } catch(AssertionError e) {
          // Collected here and reported later by the test thread.
          assertionErrors.add(e);
        }
        return array;
      }
    });
    synchronized (arrays) {
      arrays.add(f);
    }
    return f;
  }

  /** Removes the oldest outstanding allocation, or returns null if none. */
  Future<byte[]> removeFirst() throws Exception {
    synchronized (arrays) {
      return remove(arrays, 0);
    }
  }

  /**
   * Releases the oldest outstanding allocation, if there is one. When the
   * allocation does not finish within 10 ms, a freshly created array of
   * maxArrayLength bytes is released in its place.
   */
  void recycle() throws Exception {
    final Future<byte[]> f = removeFirst();
    if (f != null) {
      printf("randomRecycler: ");
      try {
        recycle(f.get(10, TimeUnit.MILLISECONDS));
      } catch(TimeoutException e) {
        recycle(new byte[maxArrayLength]);
        printf("timeout, new byte[%d]\n", maxArrayLength);
      }
    }
  }

  /** Releases the array on the calling thread; returns the manager's result. */
  int recycle(final byte[] array) {
    return bam.release(array);
  }

  /** Submits the release of the given array to the pool. */
  Future<Integer> submitRecycle(final byte[] array) {
    count.decrementAndGet();

    final Future<Integer> f = pool.submit(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
        return recycle(array);
      }
    });
    return f;
  }

  @Override
  public void run() {
    for(int i = 0; i < n; i++) {
      final boolean isAllocate = ThreadLocalRandom.current()
          .nextInt(NUM_RUNNERS) < p;
      if (isAllocate) {
        submitAllocate();
      } else {
        try {
          final Future<byte[]> f = removeFirst();
          if (f != null) {
            submitRecycle(f.get());
          }
        } catch (Exception e) {
          e.printStackTrace();
          Assert.fail(this + " has " + e);
        }
      }

      // pause briefly once every 256 iterations
      if ((i & 0xFF) == 0) {
        sleepMs(100);
      }
    }
  }

  /**
   * Runs this runner for {@code n} steps on a new thread.
   *
   * @param n the number of steps to perform
   * @return the started thread
   */
  Thread start(int n) {
    this.n = n;
    final Thread t = new Thread(this);
    t.start();
    return t;
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + ": max=" + maxArrayLength
        + ", count=" + count;
  }
}
/**
 * Manager used as a benchmark baseline: every request creates a fresh
 * array, but at most {@code maxCount} arrays may be outstanding at a time.
 * Callers beyond that limit wait until an array is released.
 */
static class NewByteArrayWithLimit extends ByteArrayManager {
  private final int maxCount;
  private int count = 0;

  NewByteArrayWithLimit(int maxCount) {
    this.maxCount = maxCount;
  }

  @Override
  public synchronized byte[] newByteArray(int size) throws InterruptedException {
    // Re-check after every wake-up; another waiter may have won the slot.
    while (count >= maxCount) {
      wait();
    }
    count++;
    return new byte[size];
  }

  @Override
  public synchronized int release(byte[] array) {
    // Waiters can only exist when the limit had been reached.
    final boolean wasAtLimit = (count == maxCount);
    count--;
    if (wasAtLimit) {
      notifyAll();
    }
    return 0;
  }
}
/**
 * Command-line benchmark. For each of three manager implementations it
 * runs five trials of {@link #performanceTest} with random per-allocation
 * sleep times below 100 ms, prints each trial's elapsed time and the
 * average in seconds, and prints the percentage difference of the average
 * against every implementation measured before it.
 */
public static void main(String[] args) throws Exception {
  final Logger managerLog = LoggerFactory.getLogger(ByteArrayManager.class);
  GenericTestUtils.setLogLevel(managerLog, Level.OFF);

  final int arrayLength = 64 * 1024; //64k
  final int nThreads = 512;
  final int nAllocations = 1 << 15;
  final int maxArrays = 1 << 10;
  final int nTrials = 5;

  final String settings = "arrayLength=" + arrayLength
      + ", nThreads=" + nThreads
      + ", nAllocations=" + nAllocations
      + ", maxArrays=" + maxArrays;
  System.out.println(settings);

  final ByteArrayManager[] impls = {
      new ByteArrayManager.NewByteArrayWithoutLimit(),
      new NewByteArrayWithLimit(maxArrays),
      new ByteArrayManager.Impl(new ByteArrayManager.Conf(
          HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT,
          maxArrays,
          HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT))
  };
  final double[] avg = new double[impls.length];

  int implIndex = 0;
  for (ByteArrayManager impl : impls) {
    printf("%26s:", impl.getClass().getSimpleName());

    double totalElapsed = 0;
    for (int trial = 0; trial < nTrials; trial++) {
      // fresh random sleep times for every trial
      final int[] sleepTime = new int[nAllocations];
      for (int k = 0; k < sleepTime.length; k++) {
        sleepTime[k] = ThreadLocalRandom.current().nextInt(100);
      }

      final long elapsed = performanceTest(arrayLength, maxArrays, nThreads,
          sleepTime, impl);
      totalElapsed += elapsed;
      printf("%5d, ", elapsed);
    }

    avg[implIndex] = totalElapsed / nTrials;
    printf("avg=%6.3fs", avg[implIndex] / 1000);

    // compare against every implementation measured earlier
    for (int earlier = 0; earlier < implIndex; earlier++) {
      printf(" (%6.2f%%)", percentageDiff(avg[earlier], avg[implIndex]));
    }
    printf("\n");
    implIndex++;
  }
}
/**
 * Computes the relative change from {@code original} to {@code newValue},
 * expressed in percent.
 *
 * @param original the baseline value
 * @param newValue the value compared against the baseline
 * @return {@code (newValue - original) / original * 100}
 */
static double percentageDiff(double original, double newValue) {
  final double delta = newValue - original;
  return delta / original * 100;
}
/**
 * Writes a formatted message to standard output and flushes it right away.
 *
 * @param format a format string as accepted by {@code System.out.printf}
 * @param args the arguments referenced by the format string
 */
static void printf(String format, Object... args) {
  // printf returns the stream itself, so the flush can be chained.
  System.out.printf(format, args).flush();
}
/**
 * Measures how long it takes a pool of {@code nThreads} threads to run one
 * task per entry of {@code sleepTimeMSs}, where each task allocates an
 * array from {@code impl}, sleeps for that entry's duration and releases
 * the array again.
 *
 * @param arrayLength length of every array requested
 * @param maxArrays not used by this method
 * @param nThreads size of the thread pool executing the tasks
 * @param sleepTimeMSs per-task sleep time in milliseconds; its length is
 *                     the number of tasks
 * @param impl the manager implementation being measured
 * @return the elapsed time between the two {@code Time.monotonicNow()}
 *         readings taken before submitting and after all tasks finished
 * @throws Exception if waiting for any task fails
 */
static long performanceTest(final int arrayLength, final int maxArrays,
    final int nThreads, final int[] sleepTimeMSs, final ByteArrayManager impl)
    throws Exception {
  final ExecutorService pool = Executors.newFixedThreadPool(nThreads);
  try {
    final List<Future<Void>> futures = new ArrayList<Future<Void>>(sleepTimeMSs.length);
    final long startTime = Time.monotonicNow();

    for(int i = 0; i < sleepTimeMSs.length; i++) {
      final long sleepTime = sleepTimeMSs[i];
      futures.add(pool.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          final byte[] array = impl.newByteArray(arrayLength);
          try {
            sleepMs(sleepTime);
          } finally {
            // Always hand the array back so other tasks are not starved.
            impl.release(array);
          }
          return null;
        }
      }));
    }
    for(Future<Void> f : futures) {
      f.get();
    }

    final long endTime = Time.monotonicNow();
    return endTime - startTime;
  } finally {
    // Shut the pool down even when a task failed.
    pool.shutdown();
  }
}
}