blob: 7fab2cc1a3a6e29e44ae5c5d9aa00a58637781ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.experiments.storm.util
import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConverters._
import backtype.storm.generated.Grouping
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalacheck.Gen
import org.scalatest.mock.MockitoSugar
import org.scalatest.prop.PropertyChecks
import org.scalatest.{Matchers, PropSpec}
import org.apache.gearpump.{Message, Time}
import org.apache.gearpump.Time.MilliSeconds
import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
import org.apache.gearpump.streaming.MockUtil
class StormOutputCollectorSpec
extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
private val stormTaskId = 0
private val streamIdGen = Gen.alphaStr
private val valuesGen = Gen.listOf[String](Gen.alphaStr).map(_.asJava.asInstanceOf[JList[AnyRef]])
private val timestampGen = Gen.chooseNum[Long](0L, 1000L)
property("StormOutputCollector emits tuple values into a stream") {
forAll(timestampGen, streamIdGen, valuesGen) {
(timestamp: MilliSeconds, streamId: String, values: JList[AnyRef]) =>
val targets = mock[JMap[String, JMap[String, Grouping]]]
val taskToComponent = mock[JMap[Integer, String]]
val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
(Map[String, Array[Int]], JList[Integer])]
val targetPartitions = mock[Map[String, Array[Int]]]
val targetStormTaskIds = mock[JList[Integer]]
when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions,
targetStormTaskIds))
val taskContext = MockUtil.mockTaskContext
val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS)
when(targets.containsKey(streamId)).thenReturn(false)
stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST
verify(taskContext, times(0)).output(anyObject[Message])
when(targets.containsKey(streamId)).thenReturn(true)
stormOutputCollector.setTimestamp(timestamp)
stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds
verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
message: Message =>
val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions)
message.value == expected && message.timestamp.toEpochMilli == timestamp
}))
}
}
property("StormOutputCollector emit direct to a task") {
val idGen = Gen.chooseNum[Int](0, 1000)
val targetGen = Gen.alphaStr
forAll(idGen, targetGen, timestampGen, streamIdGen, valuesGen) {
(id: Int, target: String, timestamp: Long, streamId: String, values: JList[AnyRef]) =>
val targets = mock[JMap[String, JMap[String, Grouping]]]
val taskToComponent = mock[JMap[Integer, String]]
when(taskToComponent.get(id)).thenReturn(target)
val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
(Map[String, Array[Int]], JList[Integer])]
val targetPartitions = mock[Map[String, Array[Int]]]
val targetStormTaskIds = mock[JList[Integer]]
when(getTargetPartitionsFn(streamId, values)).thenReturn((targetPartitions,
targetStormTaskIds))
val taskContext = MockUtil.mockTaskContext
val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS)
when(targets.containsKey(streamId)).thenReturn(false)
verify(taskContext, times(0)).output(anyObject[Message])
when(targets.containsKey(streamId)).thenReturn(true)
stormOutputCollector.setTimestamp(timestamp)
stormOutputCollector.emitDirect(id, streamId, values)
val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index)
verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({
message: Message => {
val expected = new GearpumpTuple(values, stormTaskId, streamId,
Map(target -> partitions))
val result = message.value == expected && message.timestamp.toEpochMilli == timestamp
result
}
}))
}
}
}