| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.proton.engine.impl; |
| |
| import static org.apache.qpid.proton.engine.Transport.DEFAULT_MAX_FRAME_SIZE; |
| import static org.apache.qpid.proton.engine.impl.AmqpHeader.HEADER; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.argThat; |
| import static org.mockito.Mockito.inOrder; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.nio.ByteBuffer; |
| |
| import org.apache.qpid.proton.amqp.transport.Close; |
| import org.apache.qpid.proton.amqp.transport.FrameBody; |
| import org.apache.qpid.proton.amqp.transport.Open; |
| import org.apache.qpid.proton.codec.AMQPDefinedTypes; |
| import org.apache.qpid.proton.codec.DecoderImpl; |
| import org.apache.qpid.proton.codec.EncoderImpl; |
| import org.apache.qpid.proton.engine.Transport; |
| import org.apache.qpid.proton.engine.TransportException; |
| import org.apache.qpid.proton.framing.TransportFrame; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.ArgumentMatcher; |
| import org.mockito.InOrder; |
| |
| // TODO test a frame with a payload (potentially followed by another frame) |
| public class FrameParserTest |
| { |
| private FrameHandler _mockFrameHandler = mock(FrameHandler.class); |
| private DecoderImpl _decoder = new DecoderImpl(); |
| private EncoderImpl _encoder = new EncoderImpl(_decoder); |
| private final FrameParser _frameParser = new FrameParser(_mockFrameHandler, _decoder, DEFAULT_MAX_FRAME_SIZE, new TransportImpl()); |
| |
| private final AmqpFramer _amqpFramer = new AmqpFramer(); |
| |
| @Before |
| public void setUp() |
| { |
| AMQPDefinedTypes.registerAllTypes(_decoder, _encoder); |
| |
| when(_mockFrameHandler.isHandlingFrames()).thenReturn(true); |
| } |
| |
| @Test |
| public void testInputOfInvalidProtocolHeader_causesErrorAndRefusesFurtherInput() |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| buffer.put("hello".getBytes()); |
| _frameParser.process(); |
| assertEquals(_frameParser.capacity(), Transport.END_OF_STREAM); |
| } |
| |
| @Test |
| public void testInputOfValidProtocolHeader() |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| buffer.put(HEADER); |
| _frameParser.process(); |
| |
| assertNotNull(_frameParser.tail()); |
| } |
| |
| @Test |
| public void testInputOfValidProtocolHeaderInMultipleChunks() |
| { |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| buffer.put(HEADER, 0, 2); |
| _frameParser.process(); |
| } |
| |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| buffer.put(HEADER, 2, HEADER.length - 2); |
| _frameParser.process(); |
| } |
| |
| assertNotNull(_frameParser.tail()); |
| } |
| |
| @Test |
| public void testInputOfValidFrame_invokesFrameTransportCallback() |
| { |
| sendHeader(); |
| |
| // now send an open frame |
| ByteBuffer buffer = _frameParser.tail(); |
| |
| Open openFrame = generateOpenFrame(); |
| int channel = 0; |
| byte[] frame = _amqpFramer.generateFrame(channel, openFrame); |
| buffer.put(frame); |
| |
| _frameParser.process(); |
| verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame)); |
| } |
| |
| @Test |
| public void testInputOfFrameInMultipleChunks_invokesFrameTransportCallback() |
| { |
| sendHeader(); |
| |
| Open openFrame = generateOpenFrame(); |
| int channel = 0; |
| byte[] frame = _amqpFramer.generateFrame(channel, openFrame); |
| int lengthOfFirstChunk = 2; |
| int lengthOfSecondChunk = (frame.length - lengthOfFirstChunk)/2; |
| int lengthOfThirdChunk = frame.length - lengthOfFirstChunk - lengthOfSecondChunk; |
| |
| // send the first chunk |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| |
| buffer.put(frame, 0, lengthOfFirstChunk); |
| |
| _frameParser.process(); |
| |
| verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class)); |
| } |
| |
| // send the second chunk |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| |
| int secondChunkOffset = lengthOfFirstChunk; |
| buffer.put(frame, secondChunkOffset, lengthOfSecondChunk); |
| |
| _frameParser.process(); |
| verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class)); |
| } |
| |
| // send the third and final chunk |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| |
| int thirdChunkOffset = lengthOfFirstChunk + lengthOfSecondChunk; |
| buffer.put(frame, thirdChunkOffset, lengthOfThirdChunk); |
| |
| _frameParser.process(); |
| verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame)); |
| } |
| } |
| |
| @Test |
| public void testInputOfTwoFrames_invokesFrameTransportTwice() |
| { |
| sendHeader(); |
| |
| int channel = 0; |
| Open openFrame = generateOpenFrame(); |
| byte[] openFrameBytes = _amqpFramer.generateFrame(channel, openFrame); |
| |
| Close closeFrame = generateCloseFrame(); |
| byte[] closeFrameBytes = _amqpFramer.generateFrame(channel, closeFrame); |
| |
| _frameParser.tail() |
| .put(openFrameBytes) |
| .put(closeFrameBytes); |
| |
| _frameParser.process(); |
| |
| InOrder inOrder = inOrder(_mockFrameHandler); |
| inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame)); |
| inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, closeFrame)); |
| } |
| |
| @Test |
| public void testFrameTransportTemporarilyRefusesOpenFrame() |
| { |
| when(_mockFrameHandler.isHandlingFrames()).thenReturn(false); |
| |
| sendHeader(); |
| |
| // now send an open frame |
| int channel = 0; |
| Open openFrame = generateOpenFrame(); |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| |
| byte[] frame = _amqpFramer.generateFrame(channel, openFrame); |
| buffer.put(frame); |
| |
| _frameParser.process(); |
| } |
| |
| verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class)); |
| |
| when(_mockFrameHandler.isHandlingFrames()).thenReturn(true); |
| |
| // now ensure that the held frame gets sent on second input |
| Close closeFrame = generateCloseFrame(); |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| |
| byte[] frame = _amqpFramer.generateFrame(channel, closeFrame); |
| buffer.put(frame); |
| |
| _frameParser.process(); |
| } |
| |
| InOrder inOrder = inOrder(_mockFrameHandler); |
| inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame)); |
| inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, closeFrame)); |
| } |
| |
| @Test |
| public void testFrameTransportTemporarilyRefusesOpenAndCloseFrame() |
| { |
| when(_mockFrameHandler.isHandlingFrames()).thenReturn(false); |
| |
| sendHeader(); |
| |
| // now send an open frame |
| int channel = 0; |
| Open openFrame = generateOpenFrame(); |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| |
| byte[] frame = _amqpFramer.generateFrame(channel, openFrame); |
| buffer.put(frame); |
| |
| _frameParser.process(); |
| } |
| verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class)); |
| |
| // now send a close frame |
| Close closeFrame = generateCloseFrame(); |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| |
| byte[] frame = _amqpFramer.generateFrame(channel, closeFrame); |
| buffer.put(frame); |
| |
| _frameParser.process(); |
| } |
| verify(_mockFrameHandler, never()).handleFrame(any(TransportFrame.class)); |
| |
| when(_mockFrameHandler.isHandlingFrames()).thenReturn(true); |
| |
| _frameParser.flush(); |
| |
| InOrder inOrder = inOrder(_mockFrameHandler); |
| inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, openFrame)); |
| inOrder.verify(_mockFrameHandler).handleFrame(frameMatching(channel, closeFrame)); |
| } |
| |
| private void sendHeader() throws TransportException |
| { |
| ByteBuffer buffer = _frameParser.tail(); |
| buffer.put(HEADER); |
| _frameParser.process(); |
| } |
| |
| private Open generateOpenFrame() |
| { |
| Open open = new Open(); |
| open.setContainerId("containerid"); |
| return open; |
| } |
| |
| private Close generateCloseFrame() |
| { |
| Close close = new Close(); |
| return close; |
| } |
| |
| private TransportFrame frameMatching(int channel, FrameBody frameBody) |
| { |
| return argThat(new TransportFrameMatcher(channel, frameBody)); |
| } |
| |
| private class TransportFrameMatcher implements ArgumentMatcher<TransportFrame> |
| { |
| private final TransportFrame _expectedTransportFrame; |
| |
| TransportFrameMatcher(int expectedChannel, FrameBody expectedFrameBody) |
| { |
| _expectedTransportFrame = new TransportFrame(expectedChannel, expectedFrameBody, null); |
| } |
| |
| @Override |
| public boolean matches(TransportFrame transportFrame) |
| { |
| if(transportFrame == null) |
| { |
| return false; |
| } |
| |
| FrameBody actualFrame = transportFrame.getBody(); |
| |
| int _expectedChannel = _expectedTransportFrame.getChannel(); |
| FrameBody expectedFrame = _expectedTransportFrame.getBody(); |
| |
| return _expectedChannel == transportFrame.getChannel() |
| && expectedFrame.getClass().equals(actualFrame.getClass()); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "TransportFrameMatcher, Expected: " + _expectedTransportFrame; |
| } |
| } |
| } |