blob: d1fce3efafb563e63a372578fe04f4a221ee6c0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system
import java.util
import java.util.Collections
import org.junit.Assert._
import org.junit.Test
import org.apache.samza.Partition
import org.apache.samza.config.TaskConfig
import org.apache.samza.serializers._
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.system.chooser.DefaultChooser
import org.apache.samza.system.chooser.MockMessageChooser
import org.apache.samza.util.BlockingEnvelopeMap
import org.mockito.Mockito
import scala.collection.JavaConverters._
class TestSystemConsumers {
def testPollIntervalMs {
val numEnvelopes = 1000
val system = "test-system"
val systemStreamPartition0 = new SystemStreamPartition(system, "some-stream", new Partition(0))
val systemStreamPartition1 = new SystemStreamPartition(system, "some-stream", new Partition(1))
val envelope = new IncomingMessageEnvelope(systemStreamPartition0, "1", "k", "v")
val consumer = new CustomPollResponseSystemConsumer(envelope)
var now = 0L
val systemAdmins = Mockito.mock(classOf[SystemAdmins])
Mockito.doReturn(Mockito.mock(classOf[SystemAdmin])).when(systemAdmins.getSystemAdmin(system))
val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), systemAdmins,
new SerdeManager, new SystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
TaskConfig.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
consumers.register(systemStreamPartition0, "0")
consumers.register(systemStreamPartition1, "1234")
consumers.start
// Tell the consumer to respond with 1000 messages for SSP0, and no
// messages for SSP1.
consumer.setResponseSizes(numEnvelopes)
// Choose to trigger a refresh with data.
assertNull(consumers.choose())
// 2: First on start, second on choose.
assertEquals(2, consumer.polls)
assertEquals(2, consumer.lastPoll.size)
assertTrue(consumer.lastPoll.contains(systemStreamPartition0))
assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
assertEquals(envelope, consumers.choose())
assertEquals(envelope, consumers.choose())
// We aren't polling because we're getting non-null envelopes.
assertEquals(2, consumer.polls)
// Advance the clock to trigger a new poll even though there are still
// messages.
now = TaskConfig.DEFAULT_POLL_INTERVAL_MS
assertEquals(envelope, consumers.choose())
// We polled even though there are still 997 messages in the unprocessed
// message buffer.
assertEquals(3, consumer.polls)
assertEquals(1, consumer.lastPoll.size)
// Only SSP1 was polled because we still have messages for SSP2.
assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
// Now drain all messages for SSP0. There should be exactly 997 messages,
// since we have chosen 3 already, and we started with 1000.
(0 until (numEnvelopes - 3)).foreach { i =>
assertEquals(envelope, consumers.choose())
}
// Nothing left. Should trigger a poll here.
assertNull(consumers.choose())
assertEquals(4, consumer.polls)
assertEquals(2, consumer.lastPoll.size)
// Now we ask for messages from both again.
assertTrue(consumer.lastPoll.contains(systemStreamPartition0))
assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
}
def testBasicSystemConsumersFunctionality {
val system = "test-system"
val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
val envelope = new IncomingMessageEnvelope(systemStreamPartition, "1", "k", "v")
val consumer = new CustomPollResponseSystemConsumer(envelope)
var now = 0
val systemAdmins = Mockito.mock(classOf[SystemAdmins])
Mockito.doReturn(Mockito.mock(classOf[SystemAdmin])).when(systemAdmins.getSystemAdmin(system))
val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer), systemAdmins,
new SerdeManager, new SystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
TaskConfig.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
consumers.register(systemStreamPartition, "0")
consumers.start
// Start should trigger a poll to the consumer.
assertEquals(1, consumer.polls)
// Tell the consumer to start returning messages when polled.
consumer.setResponseSizes(1)
// Choose to trigger a refresh with data.
assertNull(consumers.choose())
// Choose should have triggered a second poll, since no messages are available.
assertEquals(2, consumer.polls)
// Choose a few times. This time there is no data.
assertEquals(envelope, consumers.choose())
assertNull(consumers.choose())
assertNull(consumers.choose())
// Return more than one message this time.
consumer.setResponseSizes(2)
// Choose to trigger a refresh with data.
assertNull(consumers.choose())
// Increase clock interval.
now = TaskConfig.DEFAULT_POLL_INTERVAL_MS
// We get two messages now.
assertEquals(envelope, consumers.choose())
// Should not poll even though clock interval increases past interval threshold.
assertEquals(2, consumer.polls)
assertEquals(envelope, consumers.choose())
assertNull(consumers.choose())
}
@Test
def testSystemConumersShouldRegisterStartAndStopChooser {
val system = "test-system"
val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
var consumerStarted = 0
var consumerStopped = 0
var consumerRegistered = Map[SystemStreamPartition, String]()
var chooserStarted = 0
var chooserStopped = 0
var chooserRegistered = Map[SystemStreamPartition, String]()
val consumer = Map(system -> new SystemConsumer {
def start = consumerStarted += 1
def stop = consumerStopped += 1
def register(systemStreamPartition: SystemStreamPartition, offset: String) = consumerRegistered += systemStreamPartition -> offset
def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]().asJava
})
val systemAdmins = Mockito.mock(classOf[SystemAdmins])
Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
val consumers = new SystemConsumers(new MessageChooser {
def update(envelope: IncomingMessageEnvelope) = Unit
def choose = null
def start = chooserStarted += 1
def stop = chooserStopped += 1
def register(systemStreamPartition: SystemStreamPartition, offset: String) = chooserRegistered += systemStreamPartition -> offset
}, consumer, systemAdmins)
consumers.register(systemStreamPartition, "0")
consumers.start
consumers.stop
assertEquals(1, chooserStarted)
assertEquals(1, chooserStopped)
assertEquals(1, chooserRegistered.size)
assertEquals("0", chooserRegistered(systemStreamPartition))
assertEquals(1, consumerStarted)
assertEquals(1, consumerStopped)
assertEquals(1, consumerRegistered.size)
assertEquals("0", consumerRegistered(systemStreamPartition))
}
@Test
def testThrowSystemConsumersExceptionWhenTheSystemDoesNotHaveConsumer() {
val system = "test-system"
val system2 = "test-system2"
val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
val systemStreamPartition2 = new SystemStreamPartition(system2, "some-stream", new Partition(1))
var started = 0
var stopped = 0
var registered = Map[SystemStreamPartition, String]()
val consumer = Map(system -> new SystemConsumer {
def start {}
def stop {}
def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]().asJava
})
val consumers = new SystemConsumers(new MessageChooser {
def update(envelope: IncomingMessageEnvelope) = Unit
def choose = null
def start = started += 1
def stop = stopped += 1
def register(systemStreamPartition: SystemStreamPartition, offset: String) = registered += systemStreamPartition -> offset
}, consumer, null)
// it should throw a SystemConsumersException because system2 does not have a consumer
var caughtRightException = false
try {
consumers.register(systemStreamPartition2, "0")
} catch {
case e: SystemConsumersException => caughtRightException = true
case _: Throwable => caughtRightException = false
}
assertTrue("suppose to throw SystemConsumersException, but apparently it did not", caughtRightException)
}
@Test
def testDroppingMsgOrThrowExceptionWhenSerdeFails() {
val system = "test-system"
val systemStreamPartition = new SystemStreamPartition(system, "some-stream", new Partition(1))
val msgChooser = new DefaultChooser
val consumer = Map(system -> new SerializingConsumer)
val systemMessageSerdes = Map(system -> (new StringSerde("UTF-8")).asInstanceOf[Serde[Object]])
val serdeManager = new SerdeManager(systemMessageSerdes = systemMessageSerdes)
val systemAdmins = Mockito.mock(classOf[SystemAdmins])
Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
// throw exceptions when the deserialization has error
val consumers = new SystemConsumers(msgChooser, consumer, systemAdmins, serdeManager, dropDeserializationError = false)
consumers.register(systemStreamPartition, "0")
consumers.start
consumer(system).putStringMessage
consumer(system).putBytesMessage
var caughtRightException = false
try {
consumers.choose()
} catch {
case e: SystemConsumersException => caughtRightException = true
case _: Throwable => caughtRightException = false
}
assertTrue("suppose to throw SystemConsumersException", caughtRightException)
consumers.stop
// it should not throw exceptions when deserializaion fails if dropDeserializationError is set to true
val consumers2 = new SystemConsumers(msgChooser, consumer, systemAdmins, serdeManager, dropDeserializationError = true)
consumers2.register(systemStreamPartition, "0")
consumers2.start
consumer(system).putBytesMessage
consumer(system).putStringMessage
consumer(system).putBytesMessage
var notThrowException = true;
try {
consumers2.choose()
} catch {
case e: Throwable => notThrowException = false
}
assertTrue("it should not throw any exception", notThrowException)
var msgEnvelope = Some(consumers2.choose())
assertTrue("Consumer did not succeed in receiving the second message after Serde exception in choose", msgEnvelope.get != null)
consumers2.stop
// ensure that the system consumer will continue after poll() method ignored a Serde exception
consumer(system).putStringMessage
consumer(system).putBytesMessage
notThrowException = true;
try {
consumers2.start
} catch {
case e: Throwable => notThrowException = false
}
assertTrue("SystemConsumer start should not throw any Serde exception", notThrowException)
msgEnvelope = null
msgEnvelope = Some(consumers2.choose())
assertTrue("Consumer did not succeed in receiving the second message after Serde exception in poll", msgEnvelope.get != null)
consumers2.stop
}
@Test
def testSystemConsumersShouldNotPollEndOfStreamSSPs {
val system = "test-system"
val stream = "some-stream"
val systemStreamPartition1 = new SystemStreamPartition(system, stream, new Partition(1))
val systemStreamPartition2 = new SystemStreamPartition(system, stream, new Partition(2))
val normalEnvelope = new IncomingMessageEnvelope(systemStreamPartition1, "1", "k", "v")
val endOfStreamEnvelope = IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition2)
val consumer = new CustomPollResponseSystemConsumer(normalEnvelope)
val systemAdmins = Mockito.mock(classOf[SystemAdmins])
Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
systemAdmins, new SerdeManager, new SystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
TaskConfig.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
consumers.register(systemStreamPartition1, "0")
consumers.register(systemStreamPartition2, "0")
consumers.start
// Start should trigger a poll to the consumer.
assertEquals(1, consumer.polls)
assertEquals(2, consumer.lastPoll.size())
// Tell the consumer to start returning messages when polled.
val nextResponse = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]](
systemStreamPartition1 -> Collections.singletonList(normalEnvelope),
systemStreamPartition2 -> Collections.singletonList(endOfStreamEnvelope)
)
consumer.setNextResponse(nextResponse)
// Choose to trigger a refresh with data.
assertNull(consumers.choose())
// Choose should have triggered a second poll, since no messages are available.
assertEquals(2, consumer.polls)
assertEquals(2, consumer.lastPoll.size())
// Choose a few times and let chooser handle the end of stream message
assertNotNull(consumers.choose())
assertNotNull(consumers.choose())
consumers.tryUpdate(systemStreamPartition1)
// Now assuming that chooser has processed end of stream message,
// tryUpdate shouldn't add ssp back to emptySystemStreamPartitionsBySystem
consumers.tryUpdate(systemStreamPartition2)
assertNull(consumers.choose())
assertEquals(3, consumer.polls)
// SystemConsumers should poll only one partition: ssp1
assertEquals(1, consumer.lastPoll.size())
assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
}
@Test
def testSystemConsumersRegistration {
val system = "test-system"
val stream = "some-stream"
val systemStreamPartition1 = new SystemStreamPartition(system, stream, new Partition(1))
val systemStreamPartition2 = new SystemStreamPartition(system, stream, new Partition(2))
val consumer = Mockito.mock(classOf[SystemConsumer])
val systemAdmins = Mockito.mock(classOf[SystemAdmins])
Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
systemAdmins, new SerdeManager, new SystemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
TaskConfig.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
consumers.register(systemStreamPartition1, "0")
}
/**
* A simple MockSystemConsumer that keeps track of what was polled, and lets
* you define how many envelopes to return in the poll response. You can
* supply the envelope to use for poll responses through the constructor.
* You can also directly set the next response by calling setNextResponse
*/
private class CustomPollResponseSystemConsumer(envelope: IncomingMessageEnvelope) extends SystemConsumer {
var polls = 0
var pollResponse = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
var lastPoll: java.util.Set[SystemStreamPartition] = null
def start {}
def stop {}
def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = {
polls += 1
lastPoll = new util.HashSet[SystemStreamPartition](systemStreamPartitions)
pollResponse.asJava
}
def setResponseSizes(numEnvelopes: Int) {
val q = new java.util.ArrayList[IncomingMessageEnvelope]()
(0 until numEnvelopes).foreach { i => q.add(envelope) }
pollResponse = Map(envelope.getSystemStreamPartition -> q)
pollResponse = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
}
def setNextResponse(nextResponse: Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]) {
pollResponse = nextResponse
}
}
/**
* A simple consumer that provides two extra methods: one is to put bytes
* format message and the other to put string format message.
*/
private class SerializingConsumer extends BlockingEnvelopeMap {
val systemStreamPartition = new SystemStreamPartition("test-system", "some-stream", new Partition(1))
def putBytesMessage() {
put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "0", "test".getBytes()))
}
def putStringMessage() {
put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "0", "1", "test"))
}
def start() {}
def stop() {}
override def register(systemStreamPartition: SystemStreamPartition, offset: String): Unit = {
super[BlockingEnvelopeMap].register(systemStreamPartition, offset)
}
}
}
object TestSystemConsumers {
def getSystemConsumers(consumers: java.util.Map[String, SystemConsumer], systemAdmins: SystemAdmins = SystemAdmins.empty()) : SystemConsumers = {
new SystemConsumers(new DefaultChooser, consumers.asScala.toMap, systemAdmins)
}
}