// blob: 9f02cef12e8ae23de99a69bce462fe998ff26489 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.task
import java.time.Instant
import java.util.Random
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
import org.apache.gearpump.{MAX_TIME_MILLIS, Message}
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
/**
 * Unit tests for [[Subscription]], driven through a mocked [[ExpressTransport]].
 *
 * The fixture subscribes upstream task (0, 0) to downstream processor 1, which is described
 * with a parallelism of two, so the tests address downstream tasks (1, 0) and (1, 1).
 */
class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
  val appId = 0
  val executorId = 0
  val taskId = TaskId(0, 0)
  // A fresh random session id per spec instance; every Ack below must carry the same value.
  val session = new Random().nextInt()

  val downstreamProcessorId = 1
  val partitioner = Partitioner[HashPartitioner]

  // NOTE: the misspelled name is kept as-is because it is a public member of this spec.
  val parallism = 2
  val downstreamProcessor = ProcessorDescription(downstreamProcessorId, classOf[NextTask].getName,
    parallism)
  val subscriber = Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism,
    downstreamProcessor.life)

  /**
   * Builds a started [[Subscription]] backed by a fresh mock transport.
   *
   * As part of the setup it verifies that starting the subscription sent exactly one
   * InitialAckRequest addressed to both downstream tasks.
   *
   * @return the started subscription together with the mock transport it writes to
   */
  private def prepare: (Subscription, ExpressTransport) = {
    val transport = mock[ExpressTransport]
    val subscription = new Subscription(appId, executorId, taskId, subscriber, session, transport)
    subscription.start()

    val expectedAckRequest = InitialAckRequest(taskId, session)
    verify(transport, times(1)).transport(expectedAckRequest, TaskId(1, 0), TaskId(1, 1))

    (subscription, transport)
  }

  it should "not send any more message when its life ends" in {
    val (subscription, _) = prepare
    subscription.changeLife(LifeTime(0, 0))
    val count = subscription.sendMessage(Message("some"))
    assert(count == 0)
  }

  it should "send message and handle ack correctly" in {
    val (subscription, transport) = prepare

    // "1" is expected to be routed to downstream task (1, 1)
    val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
    subscription.sendMessage(msg1)
    verify(transport, times(1)).transport(msg1, TaskId(1, 1))
    assert(subscription.minClock == 70)

    // "0" is expected to be routed to downstream task (1, 0)
    val msg2 = new Message("0", timestamp = Instant.ofEpochMilli(50))
    subscription.sendMessage(msg2)
    verify(transport, times(1)).transport(msg2, TaskId(1, 0))

    // minClock has been set to the smaller of the two timestamps
    assert(subscription.minClock == 50)

    val initialMinClock = subscription.minClock

    // Acks the initial AckRequest(0) for both downstream tasks
    subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session))
    subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session))

    // Sends two messages for each of the 100 clock values in [100, 200)
    100 until 200 foreach { clock =>
      subscription.sendMessage(Message("1", clock))
      subscription.sendMessage(Message("2", clock))
    }

    // Ack not received yet, so minClock must not change
    assert(subscription.minClock == initialMinClock)

    subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session))
    subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session))

    // Ack received, so minClock has advanced
    assert(subscription.minClock > initialMinClock)

    // Expects one AckRequest to have been sent to each of the two downstream tasks
    val ackRequestForTask0 = AckRequest(taskId, 200, session)
    verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1, 0))

    val ackRequestForTask1 = AckRequest(taskId, 200, session)
    verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1))
  }

  it should "disallow more message sending if there is no ack back" in {
    val (subscription, _) = prepare

    // Sends (2 * MAX_PENDING_MESSAGE_COUNT + 1) messages without acknowledging any of them
    0 until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach { clock =>
      subscription.sendMessage(Message(randomMessage, clock))
    }

    assert(!subscription.allowSendingMoreMessages())
  }

  it should "report minClock as Long.MaxValue when there is no pending message" in {
    val (subscription, _) = prepare
    val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
    subscription.sendMessage(msg1)
    assert(subscription.minClock == 70)

    // Acknowledging the single message sent to task (1, 1) leaves nothing pending
    subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
    assert(subscription.minClock == MAX_TIME_MILLIS)
  }

  /** Returns the decimal string form of a freshly generated random Int. */
  private def randomMessage: String = new Random().nextInt.toString
}
object SubscriptionSpec {

  /**
   * A [[Task]] subclass that adds no members of its own; it simply forwards its constructor
   * arguments to [[Task]].
   *
   * @param taskContext context passed through to the [[Task]] constructor
   * @param conf user configuration passed through to the [[Task]] constructor
   */
  class NextTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf)
}