| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.net; |
| |
| import static org.assertj.core.api.Assertions.assertThat; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.isA; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.EOFException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.SocketChannel; |
| |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| import org.apache.geode.distributed.internal.DMStats; |
| |
| public class NioPlainEngineTest { |
| |
| private DMStats mockStats; |
| private NioPlainEngine nioEngine; |
| private BufferPool bufferPool; |
| |
| @Before |
| public void setUp() throws Exception { |
| mockStats = mock(DMStats.class); |
| bufferPool = new BufferPool(mockStats); |
| nioEngine = new NioPlainEngine(bufferPool); |
| } |
| |
| @Test |
| public void unwrap() { |
| ByteBuffer buffer = ByteBuffer.allocate(100); |
| buffer.position(0).limit(buffer.capacity()); |
| nioEngine.unwrap(buffer); |
| assertThat(buffer.position()).isEqualTo(buffer.limit()); |
| } |
| |
| @Test |
| public void ensureWrappedCapacity() { |
| ByteBuffer wrappedBuffer = bufferPool.acquireReceiveBuffer(100); |
| verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), any(Boolean.class)); |
| wrappedBuffer.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); |
| nioEngine.lastReadPosition = 10; |
| int requestedCapacity = 210; |
| ByteBuffer result = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer, |
| BufferPool.BufferType.TRACKED_RECEIVER); |
| verify(mockStats, times(2)).incReceiverBufferSize(any(Integer.class), any(Boolean.class)); |
| assertThat(result.capacity()).isGreaterThanOrEqualTo(requestedCapacity); |
| assertThat(result).isNotSameAs(wrappedBuffer); |
| // make sure that data was transferred to the new buffer |
| for (int i = 0; i < 10; i++) { |
| assertThat(result.get(i)).isEqualTo(wrappedBuffer.get(i)); |
| } |
| } |
| |
| @Test |
| public void ensureWrappedCapacityWithEnoughExistingCapacityAndConsumedDataPresent() { |
| int requestedCapacity = 210; |
| final int consumedDataPresentInBuffer = 100; |
| final int unconsumedDataPresentInBuffer = 10; |
| // the buffer will have enough capacity but will need to be compacted |
| ByteBuffer wrappedBuffer = |
| ByteBuffer.allocate(requestedCapacity + unconsumedDataPresentInBuffer); |
| wrappedBuffer.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); |
| nioEngine.lastProcessedPosition = consumedDataPresentInBuffer; |
| // previous read left 10 bytes |
| nioEngine.lastReadPosition = consumedDataPresentInBuffer + unconsumedDataPresentInBuffer; |
| ByteBuffer result = |
| wrappedBuffer = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer, |
| BufferPool.BufferType.UNTRACKED); |
| assertThat(result.capacity()).isEqualTo(requestedCapacity + unconsumedDataPresentInBuffer); |
| assertThat(result).isSameAs(wrappedBuffer); |
| // make sure that data was transferred to the new buffer |
| for (int i = 0; i < 10; i++) { |
| assertThat(result.get(i)).isEqualTo(wrappedBuffer.get(i)); |
| } |
| assertThat(nioEngine.lastProcessedPosition).isEqualTo(0); |
| assertThat(nioEngine.lastReadPosition).isEqualTo(10); |
| } |
| |
| @Test |
| public void readAtLeast() throws Exception { |
| final int amountToRead = 150; |
| final int individualRead = 60; |
| final int preexistingBytes = 10; |
| ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000); |
| SocketChannel mockChannel = mock(SocketChannel.class); |
| |
| // simulate some socket reads |
| when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { |
| @Override |
| public Integer answer(InvocationOnMock invocation) throws Throwable { |
| ByteBuffer buffer = invocation.getArgument(0); |
| buffer.position(buffer.position() + individualRead); |
| return individualRead; |
| } |
| }); |
| |
| nioEngine.lastReadPosition = 10; |
| |
| ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); |
| verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); |
| assertThat(data.position()).isEqualTo(0); |
| assertThat(data.limit()).isEqualTo(amountToRead); |
| assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes); |
| assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead); |
| |
| data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); |
| verify(mockChannel, times(5)).read(any(ByteBuffer.class)); |
| // at end of last readAtLeast data |
| assertThat(data.position()).isEqualTo(amountToRead); |
| // we read amountToRead bytes |
| assertThat(data.limit()).isEqualTo(amountToRead * 2); |
| // we did 2 more reads from the network |
| assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes); |
| // the next read will start at the end of consumed data |
| assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2); |
| |
| } |
| |
| @Test(expected = EOFException.class) |
| public void readAtLeastThrowsEOFException() throws Exception { |
| final int amountToRead = 150; |
| ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000); |
| SocketChannel mockChannel = mock(SocketChannel.class); |
| |
| // simulate some socket reads |
| when(mockChannel.read(any(ByteBuffer.class))).thenReturn(-1); |
| |
| nioEngine.lastReadPosition = 10; |
| |
| nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); |
| } |
| |
| } |