/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.message

import java.nio._
import junit.framework.Assert._
import org.junit.Test
import kafka.utils.TestUtils
import kafka.common.InvalidMessageSizeException

class ByteBufferMessageSetTest extends BaseMessageSetTestCases {

  override def createMessageSet(messages: Seq[Message]): ByteBufferMessageSet = 
    new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
  
  @Test
  def testSmallFetchSize() {
    // create a ByteBufferMessageSet that doesn't contain a full message
    // iterating it should get an InvalidMessageSizeException
    val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
    val buffer = messages.serialized.slice
    buffer.limit(10)
    val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
    try {
      for (message <- messageSetWithNoFullMessage)
        fail("shouldn't see any message")
    }
    catch {
      case e: InvalidMessageSizeException => //this is expected
      case e2 => fail("shouldn't see any other exceptions")
    }
  }

  @Test
  def testValidBytes() {
    {
      val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
      val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
      buffer.put(messages.serialized)
      buffer.putShort(4)
      val messagesPlus = new ByteBufferMessageSet(buffer)
      assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
    }

    // test valid bytes on empty ByteBufferMessageSet
    {
      assertEquals("Valid bytes on an empty ByteBufferMessageSet should return 0", 0,
        MessageSet.Empty.asInstanceOf[ByteBufferMessageSet].validBytes)
    }
  }

  @Test
  def testEquals() {
    var messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
    var moreMessages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))

    assertTrue(messages.equals(moreMessages))

    messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
    moreMessages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))

    assertTrue(messages.equals(moreMessages))
  }
  

  @Test
  def testIterator() {
    val messageList = List(
        new Message("msg1".getBytes),
        new Message("msg2".getBytes),
        new Message("msg3".getBytes)
      )

    // test for uncompressed regular messages
    {
      val messageSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
      TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
      //make sure ByteBufferMessageSet is re-iterable.
      TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
      //make sure the last offset after iteration is correct
      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)

      //make sure shallow iterator is the same as deep iterator
      TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
                                     TestUtils.getMessageIterator(messageSet.iterator))
    }

    // test for compressed regular messages
    {
      val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
      TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
      //make sure ByteBufferMessageSet is re-iterable.
      TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
      //make sure the last offset after iteration is correct
      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)

      verifyShallowIterator(messageSet)
    }

    // test for mixed empty and non-empty messagesets uncompressed
    {
      val emptyMessageList : List[Message] = Nil
      val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
      val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
      val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
      buffer.put(emptyMessageSet.serialized)
      buffer.put(regularMessgeSet.serialized)
      buffer.rewind
      val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
      TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
      //make sure ByteBufferMessageSet is re-iterable.
      TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
      //make sure the last offset after iteration is correct
      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)

      //make sure shallow iterator is the same as deep iterator
      TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
                                     TestUtils.getMessageIterator(mixedMessageSet.iterator))
    }

    // test for mixed empty and non-empty messagesets compressed
    {
      val emptyMessageList : List[Message] = Nil
      val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
      val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
      val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
      buffer.put(emptyMessageSet.serialized)
      buffer.put(regularMessgeSet.serialized)
      buffer.rewind
      val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
      TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
      //make sure ByteBufferMessageSet is re-iterable.
      TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
      //make sure the last offset after iteration is correct
      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)

      verifyShallowIterator(mixedMessageSet)
    }
  }

  def verifyShallowIterator(messageSet: ByteBufferMessageSet) {
      //make sure the offsets returned by a shallow iterator is a subset of that of a deep iterator
      val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet
      val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet
      assertTrue(shallowOffsets.subsetOf(deepOffsets))
  }
}
