blob: f520c6d0e84547abd35afa838ca352d595873e3d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.util
import java.util
import java.util.Collections
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde
import org.apache.samza.coordinator.stream.messages.SetConfig
import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemStream}
import org.junit.{Assert, Test}
import org.mockito.Matchers.any
import org.mockito.Mockito
import org.apache.samza.config.{JobConfig, MapConfig}
import org.apache.samza.metadatastore.MetadataStore
class TestCoordinatorStreamUtil {
@Test
def testCreateCoordinatorStream() {
val systemStream = Mockito.spy(new SystemStream("testSystem", "testStream"))
val systemAdmin = Mockito.mock(classOf[SystemAdmin])
CoordinatorStreamUtil.createCoordinatorStream(systemStream, systemAdmin)
Mockito.verify(systemStream).getStream
Mockito.verify(systemAdmin).createStream(any(classOf[StreamSpec]))
}
@Test
def testBuildCoordinatorStreamConfig() {
val addConfig = new util.HashMap[String, String]
addConfig.put("job.name", "test-job-name")
addConfig.put("job.id", "i001")
addConfig.put("job.coordinator.system", "samzatest")
addConfig.put("systems.samzatest.test","test")
addConfig.put("test.only","nothing")
val config = new MapConfig(addConfig)
val configMap = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)
Assert.assertEquals(configMap.get("systems.samzatest.test"), "test")
Assert.assertEquals(configMap.get("test.only"), null)
}
@Test
def testReadConfigFromCoordinatorStream() {
val keyForNonBlankVal = "app.id"
val nonBlankVal = "1"
val keyForEmptyVal = "task.opt"
val emptyVal = ""
val keyForNullVal = "zk.server"
val nullVal = null
val valueSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE)
val configMap = new util.HashMap[String, Array[Byte]]() {
put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, keyForNonBlankVal),
valueSerde.toBytes(nonBlankVal))
put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, keyForEmptyVal),
valueSerde.toBytes(emptyVal))
put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, keyForNullVal),
valueSerde.toBytes(nullVal))
}
val coordinatorStreamStore = Mockito.mock(classOf[CoordinatorStreamStore])
Mockito.when(coordinatorStreamStore.all()).thenReturn(configMap)
val configFromCoordinatorStream = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
Assert.assertEquals(configFromCoordinatorStream.get(keyForNonBlankVal), nonBlankVal)
Assert.assertEquals(configFromCoordinatorStream.get(keyForEmptyVal), emptyVal)
Assert.assertFalse(configFromCoordinatorStream.containsKey(keyForNullVal))
}
@Test
def testWriteConfigToCoordinatorStream() {
val addConfig = new util.HashMap[String, String]
addConfig.put("job.name", "test-job-name")
addConfig.put("job.id", "i001")
addConfig.put("job.coordinator.system", "samzatest")
addConfig.put("systems.samzatest.test","test")
addConfig.put("test.only","nothing")
addConfig.put("systems.samzatest.samza.factory", "org.apache.samza.system.MockSystemFactory")
val config = new MapConfig(addConfig)
val configMap = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)
CoordinatorStreamUtil.writeConfigToCoordinatorStream(configMap)
}
@Test
def testReadLaunchConfigFromCoordinatorStream() {
// Empty config when auto sizing is disabled.
Assert.assertEquals(new MapConfig(), CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(new MapConfig(), null))
val valueSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE)
val config = new MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_ENABLED, "true"))
val expected = new MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT, "20"))
val mockMetadataStore = Mockito.mock(classOf[MetadataStore])
val configMap = new util.HashMap[String, Array[Byte]]() {
put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE,
JobConfig.JOB_ID),
valueSerde.toBytes("321"))
put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE,
JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT),
valueSerde.toBytes("20"))
}
Mockito.when(mockMetadataStore.all()).thenReturn(configMap)
// Verify the launch config is expected
Assert.assertEquals(expected, CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(config, mockMetadataStore))
}
}