blob: 197842a4487c9f251c0367c55e63f7f09be782ba [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.ONHEAP;
/**
*
*/
public class SegmentedRingByteBufferTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
@Test
public void testAligned() throws Exception {
doTestAligned(ONHEAP);
}
/**
* @throws Exception If failed.
*/
@Test
public void testAlignedDirect() throws Exception {
doTestAligned(DIRECT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNotAligned() throws Exception {
doTestNotAligned(ONHEAP);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNotAlignedDirect() throws Exception {
doTestNotAligned(DIRECT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoOverflowMultiThreaded() throws Exception {
doTestNoOverflowMultiThreaded(ONHEAP);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNoOverflowMultiThreadedDirect() throws Exception {
doTestNoOverflowMultiThreaded(DIRECT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testMultiThreaded() throws Exception {
doTestMultiThreaded(ONHEAP);
}
/**
* @throws Exception If failed.
*/
@Test
public void testMultiThreadedDirect() throws Exception {
doTestMultiThreaded(DIRECT);
}
/**
* @throws Exception If failed.
*/
@Test
public void testMultiThreaded2() throws Exception {
doTestMultiThreaded2(ONHEAP);
}
/**
* @throws Exception If failed.
*/
@Test
public void testMultiThreadedDirect2() throws Exception {
doTestMultiThreaded2(DIRECT);
}
/**
* @param mode Mode.
*/
private void doTestAligned(SegmentedRingByteBuffer.BufferMode mode) {
int cap = 128;
int size = 8;
SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
assertNull(buf.poll());
// Head and tail in initial state.
for (int j = 0; j < 2; j++) {
for (int i = 0; i < cap / size; i++) {
SegmentedRingByteBuffer.WriteSegment seg = buf.offer(size);
ByteBuffer bbuf = seg.buffer();
assertEquals(size * i, bbuf.position());
assertEquals(size * (i + 1), bbuf.limit());
bbuf.putLong(i + (j * 10));
seg.release();
}
assertNull(buf.offer(size));
List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();
ByteBuffer bbuf = segs.get(0).buffer();
assertEquals(cap, bbuf.remaining());
for (int i = 0; i < cap / size; i++)
assertEquals(i + (j * 10), bbuf.getLong());
segs.get(0).release();
assertEquals(0, bbuf.remaining());
assertNull(buf.poll());
}
// Move tail.
for (int i = 0; i < 2; i++) {
SegmentedRingByteBuffer.WriteSegment seg = buf.offer(size);
ByteBuffer bbuf = seg.buffer();
assertEquals(size * i, bbuf.position());
assertEquals(size * (i + 1), bbuf.limit());
bbuf.putLong(i);
seg.release();
}
// Move head to tail.
List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();
ByteBuffer bbuf = segs.get(0).buffer();
assertEquals(size * 2, bbuf.remaining());
for (int i = 0; i < 2; i++)
assertEquals(i, bbuf.getLong());
segs.get(0).release();
assertEquals(0, bbuf.remaining());
assertNull(buf.poll());
}
/**
* @param mode Mode.
*/
private void doTestNotAligned(SegmentedRingByteBuffer.BufferMode mode) {
int size = 8;
int cap = 32 - size / 2; // 3.5 long values.
SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
assertNull(buf.poll());
// Write 2 segments.
SegmentedRingByteBuffer.WriteSegment wseg;
List<SegmentedRingByteBuffer.ReadSegment> rsegs;
ByteBuffer bbuf;
wseg = buf.offer(size);
bbuf = wseg.buffer();
bbuf.putLong(1);
wseg.release();
wseg = buf.offer(size);
bbuf = wseg.buffer();
bbuf.putLong(2);
wseg.release();
// Read 2 segments.
rsegs = buf.poll();
bbuf = rsegs.get(0).buffer();
assertEquals(1, bbuf.getLong());
assertEquals(2, bbuf.getLong());
rsegs.get(0).release();
assertNull(buf.poll());
// Write 2 segments.
wseg = buf.offer(size);
bbuf = wseg.buffer();
bbuf.putLong(3);
wseg.release();
// This one will overflow buffer.
wseg = buf.offer(size);
bbuf = wseg.buffer();
bbuf.putLong(4);
wseg.release();
// Ring buffer should return two separate segments instead of one due to an overflow.
rsegs = buf.poll();
bbuf = rsegs.get(0).buffer();
// First segment.
assertEquals(3, bbuf.getLong());
assertEquals(4, bbuf.remaining());
int pos = bbuf.position();
byte[] tmp = new byte[8];
byte[] arr = new byte[bbuf.capacity()];
bbuf.position(0);
bbuf.limit(bbuf.capacity());
bbuf.get(arr);
System.arraycopy(arr, pos, tmp, 0, 4);
// One more segment available.
bbuf = rsegs.get(1).buffer();
assertEquals(4, bbuf.remaining());
bbuf.position(0);
bbuf.limit(bbuf.capacity());
bbuf.get(arr);
System.arraycopy(arr, 0, tmp, 4, 4);
ByteBuffer bb = ByteBuffer.wrap(tmp);
bb.order(ByteOrder.nativeOrder());
assertEquals(4, bb.getLong());
rsegs.get(1).release();
assertNull(buf.poll());
}
/**
* @param mode Mode.
*/
private void doTestNoOverflowMultiThreaded(
SegmentedRingByteBuffer.BufferMode mode
) throws org.apache.ignite.IgniteCheckedException, BrokenBarrierException, InterruptedException {
int producerCnt = 16;
final int cap = 256 * 1024;
final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicReference<Throwable> ex = new AtomicReference<>();
final CyclicBarrier startBarrier = new CyclicBarrier(producerCnt);
final CyclicBarrier restartBarrier = new CyclicBarrier(producerCnt + 1);
final AtomicLong totalWritten = new AtomicLong();
IgniteInternalFuture<Long> fut;
try {
fut = GridTestUtils.runMultiThreadedAsync(() -> {
try {
try {
startBarrier.await();
}
catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
fail();
}
while (!stop.get()) {
TestObject obj = new TestObject();
SegmentedRingByteBuffer.WriteSegment seg = buf.offer(obj.size());
ByteBuffer bbuf;
if (seg == null) {
try {
restartBarrier.await(getTestTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
break;
}
continue;
}
bbuf = seg.buffer();
assertEquals(obj.size(), bbuf.remaining());
bbuf.putLong(obj.id);
bbuf.putInt(obj.len);
bbuf.put(obj.arr);
assertEquals(0, bbuf.remaining());
seg.release();
long total = totalWritten.addAndGet(obj.size());
assertTrue(total <= cap);
}
}
catch (Throwable th) {
ex.compareAndSet(null, th);
}
}, producerCnt, "producer-thread");
long endTime = System.currentTimeMillis() + 60 * 1000L;
while (System.currentTimeMillis() < endTime && ex.get() == null) {
while (restartBarrier.getNumberWaiting() != producerCnt && ex.get() == null)
U.sleep(10);
if (ex.get() != null)
fail("Exception in producer thread, ex=" + ex.get());
List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();
if (segs != null) {
for (SegmentedRingByteBuffer.ReadSegment seg : segs)
seg.release();
}
totalWritten.set(0);
restartBarrier.await();
}
}
finally {
stop.set(true);
restartBarrier.reset();
}
fut.get();
if (ex.get() != null)
fail("Exception in producer thread, ex=" + ex.get());
}
/**
* @param mode Mode.
*/
private void doTestMultiThreaded(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException {
int producerCnt = 16;
final int cap = 256 * 1024;
final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
final AtomicBoolean stop = new AtomicBoolean(false);
final AtomicReference<Throwable> ex = new AtomicReference<>();
final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
IgniteInternalFuture<Long> fut;
try {
fut = GridTestUtils.runMultiThreadedAsync(() -> {
try {
try {
barrier.await();
}
catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
fail();
}
while (!stop.get()) {
TestObject obj = new TestObject();
SegmentedRingByteBuffer.WriteSegment seg;
ByteBuffer bbuf;
for (;;) {
if (stop.get())
return;
seg = buf.offer(obj.size());
if (seg != null)
break;
}
try {
bbuf = seg.buffer();
assertEquals(obj.size(), bbuf.remaining());
bbuf.putLong(obj.id);
bbuf.putInt(obj.len);
bbuf.put(obj.arr);
assertEquals(0, bbuf.remaining());
}
finally {
seg.release();
}
}
}
catch (Throwable th) {
ex.compareAndSet(null, th);
}
}, producerCnt, "producer-thread");
Random rnd = new Random();
long endTime = System.currentTimeMillis() + 60 * 1000L;
while (System.currentTimeMillis() < endTime && ex.get() == null) {
try {
U.sleep(rnd.nextInt(100) + 1);
}
catch (IgniteInterruptedCheckedException e) {
e.printStackTrace();
}
List<SegmentedRingByteBuffer.ReadSegment> segs;
if ((segs = buf.poll()) != null) {
for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
assertTrue(seg.buffer().hasRemaining());
seg.release();
}
}
}
}
finally {
stop.set(true);
}
fut.get();
if (ex.get() != null)
fail("Exception in producer thread, ex=" + ex.get());
}
/**
* @param mode Mode.
*/
private void doTestMultiThreaded2(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException {
int producerCnt = 16;
final int cap = 256 * 1024;
final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);
final AtomicReference<Throwable> ex = new AtomicReference<>();
final AtomicBoolean stop = new AtomicBoolean(false);
final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
final Set<TestObject> items = Collections.newSetFromMap(new ConcurrentHashMap<TestObject, Boolean>());
IgniteInternalFuture<Long> fut;
try {
fut = GridTestUtils.runMultiThreadedAsync(() -> {
try {
try {
barrier.await();
}
catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
fail();
}
while (!stop.get()) {
TestObject obj = new TestObject();
SegmentedRingByteBuffer.WriteSegment seg;
ByteBuffer bbuf;
for (;;) {
if (stop.get())
return;
seg = buf.offer(obj.size());
if (seg != null)
break;
}
try {
bbuf = seg.buffer();
assertEquals(obj.size(), bbuf.remaining());
bbuf.putLong(obj.id);
bbuf.putInt(obj.len);
bbuf.put(obj.arr);
assertEquals(0, bbuf.remaining());
assertTrue("Ooops! The same value is already exist in Set! ", items.add(obj));
}
finally {
seg.release();
}
}
}
catch (Throwable th) {
ex.compareAndSet(null, th);
}
}, producerCnt, "producer-thread");
Random rnd = new Random();
long endTime = System.currentTimeMillis() + 60 * 1000L;
while (System.currentTimeMillis() < endTime && ex.get() == null) {
try {
U.sleep(rnd.nextInt(100) + 1);
}
catch (IgniteInterruptedCheckedException e) {
e.printStackTrace();
}
List<SegmentedRingByteBuffer.ReadSegment> segs;
while ((segs = buf.poll()) != null) {
int size = 0;
for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
ByteBuffer bbuf = seg.buffer();
assertTrue(bbuf.hasRemaining());
size += bbuf.remaining();
}
byte[] arr = new byte[size];
int idx = 0;
for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
ByteBuffer bbuf = seg.buffer();
assertTrue(bbuf.hasRemaining());
int len = bbuf.remaining();
bbuf.get(arr, idx, len);
idx += len;
}
ByteBuffer bbuf = ByteBuffer.wrap(arr);
bbuf.order(ByteOrder.nativeOrder());
assertTrue(bbuf.hasRemaining());
while (bbuf.hasRemaining()) {
long id = bbuf.getLong();
int len = bbuf.getInt();
arr = new byte[len];
bbuf.get(arr);
TestObject obj = new TestObject(id, arr);
assertTrue(items.remove(obj));
}
for (SegmentedRingByteBuffer.ReadSegment seg : segs)
seg.release();
}
}
}
finally {
stop.set(true);
}
fut.get();
if (ex.get() != null)
fail("Exception in producer thread, ex=" + ex.get());
List<SegmentedRingByteBuffer.ReadSegment> segs;
while ((segs = buf.poll()) != null) {
int size = 0;
for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
ByteBuffer bbuf = seg.buffer();
assertTrue(bbuf.hasRemaining());
size += bbuf.remaining();
}
byte[] arr = new byte[size];
int idx = 0;
for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
ByteBuffer bbuf = seg.buffer();
assertTrue(bbuf.hasRemaining());
int len = bbuf.remaining();
bbuf.get(arr, idx, len);
idx += len;
}
ByteBuffer bbuf = ByteBuffer.wrap(arr);
bbuf.order(ByteOrder.nativeOrder());
assertTrue(bbuf.hasRemaining());
while (bbuf.hasRemaining()) {
long id = bbuf.getLong();
int len = bbuf.getInt();
arr = new byte[len];
bbuf.get(arr);
TestObject obj = new TestObject(id, arr);
assertTrue(items.remove(obj));
}
for (SegmentedRingByteBuffer.ReadSegment seg : segs)
seg.release();
}
assertNull(buf.poll());
assertEquals(0, items.size());
}
/**
*
*/
private static class TestObject {
/** Id. */
private long id;
/** Length. */
private int len;
/** Array. */
private byte[] arr;
/**
* Default constructor.
*/
public TestObject() {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
id = rnd.nextLong();
len = rnd.nextInt(32 * 1024);
arr = new byte[len];
rnd.nextBytes(arr);
}
/**
* @param id Id.
* @param arr Array.
*/
public TestObject(long id, byte[] arr) {
this.id = id;
this.arr = arr;
len = arr.length;
}
/**
*
*/
public int size() {
return 8 + 4 + arr.length;
}
/** {@inheritDoc} */
@Override public boolean equals(Object that) {
if (this == that)
return true;
if (that == null || getClass() != that.getClass())
return false;
TestObject obj = (TestObject)that;
if (id != obj.id)
return false;
if (len != obj.len)
return false;
return Arrays.equals(arr, obj.arr);
}
/** {@inheritDoc} */
@Override public int hashCode() {
int res = (int)(id ^ (id >>> 32));
res = 31 * res + len;
res = 31 * res + Arrays.hashCode(arr);
return res;
}
}
}