blob: 3b0e36b9bbb1b4166c15336ee22247fc0a605fbf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.proton.engine.impl;
import static org.apache.qpid.proton.engine.Transport.DEFAULT_MAX_FRAME_SIZE;
import static org.apache.qpid.proton.engine.impl.AmqpHeader.HEADER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import org.apache.qpid.proton.amqp.transport.Close;
import org.apache.qpid.proton.amqp.transport.FrameBody;
import org.apache.qpid.proton.amqp.transport.Open;
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.framing.TransportFrame;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
// TODO test a frame with a payload (potentially followed by another frame)
public class FrameParserTest
{
private FrameHandler _mockFrameHandler = mock(FrameHandler.class);
private DecoderImpl _decoder = new DecoderImpl();
private EncoderImpl _encoder = new EncoderImpl(_decoder);
private final FrameParser _frameParser = new FrameParser(_mockFrameHandler, _decoder, DEFAULT_MAX_FRAME_SIZE, new TransportImpl());
private final AmqpFramer _amqpFramer = new AmqpFramer();
@Before
public void setUp()
{
AMQPDefinedTypes.registerAllTypes(_decoder, _encoder);
when(_mockFrameHandler.isHandlingFrames()).thenReturn(true);
}
@Test
public void testInputOfInvalidProtocolHeader_causesErrorAndRefusesFurtherInput()
{
ByteBuffer buffer = _frameParser.tail();
buffer.put("hello".getBytes());
_frameParser.process();
assertEquals(_frameParser.capacity(), Transport.END_OF_STREAM);
}
@Test
public void testInputOfValidProtocolHeader()
{
ByteBuffer buffer = _frameParser.tail();
buffer.put(HEADER);
_frameParser.process();
assertNotNull(_frameParser.tail());
}
@Test
public void testInputOfValidProtocolHeaderInMultipleChunks()
{
{
ByteBuffer buffer = _frameParser.tail();
buffer.put(HEADER, 0, 2);
_frameParser.process();
}
{
ByteBuffer buffer = _frameParser.tail();
buffer.put(HEADER, 2, HEADER.length - 2);
_frameParser.process();
}
assertNotNull(_frameParser.tail());
}
@Test
public void testInputOfValidFrame_invokesFrameTransportCallback()
{
sendHeader();
// now send an open frame
ByteBuffer buffer = _frameParser.tail();
Open openFrame = generateOpenFrame();
int channel = 0;
byte[] frame = _amqpFramer.generateFrame(channel, openFrame);
buffer.put(frame);
_frameParser.process();
verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame));
}
@Test
public void testInputOfFrameInMultipleChunks_invokesFrameTransportCallback()
{
sendHeader();
Open openFrame = generateOpenFrame();
int channel = 0;
byte[] frame = _amqpFramer.generateFrame(channel, openFrame);
int lengthOfFirstChunk = 2;
int lengthOfSecondChunk = (frame.length - lengthOfFirstChunk)/2;
int lengthOfThirdChunk = frame.length - lengthOfFirstChunk - lengthOfSecondChunk;
// send the first chunk
{
ByteBuffer buffer = _frameParser.tail();
buffer.put(frame, 0, lengthOfFirstChunk);
_frameParser.process();
verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class));
}
// send the second chunk
{
ByteBuffer buffer = _frameParser.tail();
int secondChunkOffset = lengthOfFirstChunk;
buffer.put(frame, secondChunkOffset, lengthOfSecondChunk);
_frameParser.process();
verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class));
}
// send the third and final chunk
{
ByteBuffer buffer = _frameParser.tail();
int thirdChunkOffset = lengthOfFirstChunk + lengthOfSecondChunk;
buffer.put(frame, thirdChunkOffset, lengthOfThirdChunk);
_frameParser.process();
verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame));
}
}
@Test
public void testInputOfTwoFrames_invokesFrameTransportTwice()
{
sendHeader();
int channel = 0;
Open openFrame = generateOpenFrame();
byte[] openFrameBytes = _amqpFramer.generateFrame(channel, openFrame);
Close closeFrame = generateCloseFrame();
byte[] closeFrameBytes = _amqpFramer.generateFrame(channel, closeFrame);
_frameParser.tail()
.put(openFrameBytes)
.put(closeFrameBytes);
_frameParser.process();
InOrder inOrder = inOrder(_mockFrameHandler);
inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame));
inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, closeFrame));
}
@Test
public void testFrameTransportTemporarilyRefusesOpenFrame()
{
when(_mockFrameHandler.isHandlingFrames()).thenReturn(false);
sendHeader();
// now send an open frame
int channel = 0;
Open openFrame = generateOpenFrame();
{
ByteBuffer buffer = _frameParser.tail();
byte[] frame = _amqpFramer.generateFrame(channel, openFrame);
buffer.put(frame);
_frameParser.process();
}
verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class));
when(_mockFrameHandler.isHandlingFrames()).thenReturn(true);
// now ensure that the held frame gets sent on second input
Close closeFrame = generateCloseFrame();
{
ByteBuffer buffer = _frameParser.tail();
byte[] frame = _amqpFramer.generateFrame(channel, closeFrame);
buffer.put(frame);
_frameParser.process();
}
InOrder inOrder = inOrder(_mockFrameHandler);
inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame));
inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, closeFrame));
}
@Test
public void testFrameTransportTemporarilyRefusesOpenAndCloseFrame()
{
when(_mockFrameHandler.isHandlingFrames()).thenReturn(false);
sendHeader();
// now send an open frame
int channel = 0;
Open openFrame = generateOpenFrame();
{
ByteBuffer buffer = _frameParser.tail();
byte[] frame = _amqpFramer.generateFrame(channel, openFrame);
buffer.put(frame);
_frameParser.process();
}
verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class));
// now send a close frame
Close closeFrame = generateCloseFrame();
{
ByteBuffer buffer = _frameParser.tail();
byte[] frame = _amqpFramer.generateFrame(channel, closeFrame);
buffer.put(frame);
_frameParser.process();
}
verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class));
when(_mockFrameHandler.isHandlingFrames()).thenReturn(true);
_frameParser.flush();
InOrder inOrder = inOrder(_mockFrameHandler);
inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame));
inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, closeFrame));
}
private void sendHeader() throws TransportException
{
ByteBuffer buffer = _frameParser.tail();
buffer.put(HEADER);
_frameParser.process();
}
private Open generateOpenFrame()
{
Open open = new Open();
open.setContainerId("containerid");
return open;
}
private Close generateCloseFrame()
{
Close close = new Close();
return close;
}
private TransportFrame frameMatching(int channel, FrameBody frameBody)
{
return argThat(new TransportFrameMatcher(channel, frameBody));
}
private class TransportFrameMatcher implements ArgumentMatcher<TransportFrame>
{
private final TransportFrame _expectedTransportFrame;
TransportFrameMatcher(int expectedChannel, FrameBody expectedFrameBody)
{
_expectedTransportFrame = new TransportFrame(expectedChannel, expectedFrameBody, null);
}
@Override
public boolean matches(TransportFrame transportFrame)
{
if(transportFrame == null)
{
return false;
}
FrameBody actualFrame = transportFrame.getBody();
int _expectedChannel = _expectedTransportFrame.getChannel();
FrameBody expectedFrame = _expectedTransportFrame.getBody();
return _expectedChannel == transportFrame.getChannel()
&& expectedFrame.getClass().equals(actualFrame.getClass());
}
@Override
public String toString()
{
return "TransportFrameMatcher, Expected: " + _expectedTransportFrame;
}
}
}