blob: 40ec38e89d1b4d39aa3d5f63b4f3bbed56d6f586 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.samza.system.SystemStream;
import org.junit.Test;
import java.util.Collections;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestStreamConfig {
private static final String SYSTEM = "system";
private static final String STREAM_ID = "streamId";
private static final SystemStream SYSTEM_STREAM = new SystemStream(SYSTEM, STREAM_ID);
private static final String OTHER_STREAM_ID = "otherStreamId";
private static final String PHYSICAL_STREAM = "physicalStream";
private static final SystemStream SYSTEM_STREAM_PHYSICAL = new SystemStream(SYSTEM, PHYSICAL_STREAM);
private static final String SAMZA_IGNORED_PROPERTY = "samza.ignored.property";
private static final String UNUSED_VALUE = "should_not_be_used";
@Test
public void testGetStreamMsgSerde() {
String value = "my.msg.serde";
doTestSamzaProperty(StreamConfig.MSG_SERDE, value,
(config, systemStream) -> assertEquals(Optional.of(value), config.getStreamMsgSerde(systemStream)));
doTestSamzaProperty(StreamConfig.MSG_SERDE, "",
(config, systemStream) -> assertEquals(Optional.empty(), config.getStreamMsgSerde(systemStream)));
doTestSamzaPropertyDoesNotExist(StreamConfig.MSG_SERDE,
(config, systemStream) -> assertEquals(Optional.empty(), config.getStreamMsgSerde(systemStream)));
doTestSamzaPropertyInvalidConfig(StreamConfig::getStreamMsgSerde);
}
@Test
public void testGetStreamKeySerde() {
String value = "my.key.serde";
doTestSamzaProperty(StreamConfig.KEY_SERDE, value,
(config, systemStream) -> assertEquals(Optional.of(value), config.getStreamKeySerde(systemStream)));
doTestSamzaProperty(StreamConfig.KEY_SERDE, "",
(config, systemStream) -> assertEquals(Optional.empty(), config.getStreamKeySerde(systemStream)));
doTestSamzaPropertyDoesNotExist(StreamConfig.KEY_SERDE,
(config, systemStream) -> assertEquals(Optional.empty(), config.getStreamKeySerde(systemStream)));
doTestSamzaPropertyInvalidConfig(StreamConfig::getStreamKeySerde);
}
@Test
public void testGetResetOffset() {
doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET, "true",
(config, systemStream) -> assertTrue(config.getResetOffset(systemStream)));
doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET, "false",
(config, systemStream) -> assertFalse(config.getResetOffset(systemStream)));
// if not true/false, then use false
doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET, "unknown_value",
(config, systemStream) -> assertFalse(config.getResetOffset(systemStream)));
doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_RESET_OFFSET,
(config, systemStream) -> assertFalse(config.getResetOffset(systemStream)));
doTestSamzaPropertyInvalidConfig(StreamConfig::getResetOffset);
}
@Test
public void testIsResetOffsetConfigured() {
doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET, "true",
(config, systemStream) -> assertTrue(config.isResetOffsetConfigured(systemStream)));
doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET, "false",
(config, systemStream) -> assertTrue(config.isResetOffsetConfigured(systemStream)));
// if not true/false, then use false
doTestSamzaProperty(StreamConfig.CONSUMER_RESET_OFFSET, "unknown_value",
(config, systemStream) -> assertTrue(config.isResetOffsetConfigured(systemStream)));
doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_RESET_OFFSET,
(config, systemStream) -> assertFalse(config.isResetOffsetConfigured(systemStream)));
doTestSamzaPropertyInvalidConfig(StreamConfig::isResetOffsetConfigured);
}
@Test
public void testGetDefaultStreamOffset() {
String value = "my_offset_default";
doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT, value,
(config, systemStream) -> assertEquals(Optional.of(value), config.getDefaultStreamOffset(systemStream)));
doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT, "",
(config, systemStream) -> assertEquals(Optional.of(""), config.getDefaultStreamOffset(systemStream)));
doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_OFFSET_DEFAULT,
(config, systemStream) -> assertEquals(Optional.empty(),
new StreamConfig(config).getDefaultStreamOffset(systemStream)));
doTestSamzaPropertyInvalidConfig(StreamConfig::getDefaultStreamOffset);
}
@Test
public void testIsDefaultStreamOffsetConfigured() {
String value = "my_offset_default";
doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT, value,
(config, systemStream) -> assertTrue(config.isDefaultStreamOffsetConfigured(systemStream)));
doTestSamzaProperty(StreamConfig.CONSUMER_OFFSET_DEFAULT, "",
(config, systemStream) -> assertTrue(config.isDefaultStreamOffsetConfigured(systemStream)));
doTestSamzaPropertyDoesNotExist(StreamConfig.CONSUMER_OFFSET_DEFAULT,
(config, systemStream) -> assertFalse(config.isDefaultStreamOffsetConfigured(systemStream)));
doTestSamzaPropertyInvalidConfig(StreamConfig::isDefaultStreamOffsetConfigured);
}
@Test
public void testGetBootstrapEnabled() {
doTestSamzaProperty(StreamConfig.BOOTSTRAP, "true",
(config, systemStream) -> assertTrue(config.getBootstrapEnabled(systemStream)));
doTestSamzaProperty(StreamConfig.BOOTSTRAP, "false",
(config, systemStream) -> assertFalse(config.getBootstrapEnabled(systemStream)));
// if not true/false, then use false
doTestSamzaProperty(StreamConfig.BOOTSTRAP, "unknown_value",
(config, systemStream) -> assertFalse(config.getBootstrapEnabled(systemStream)));
doTestSamzaPropertyDoesNotExist(StreamConfig.BOOTSTRAP,
(config, systemStream) -> assertFalse(config.getBootstrapEnabled(systemStream)));
doTestSamzaPropertyInvalidConfig(StreamConfig::getBootstrapEnabled);
}
@Test
public void testGetBroadcastEnabled() {
doTestSamzaProperty(StreamConfig.BROADCAST, "true",
(config, systemStream) -> assertTrue(config.getBroadcastEnabled(systemStream)));
doTestSamzaProperty(StreamConfig.BROADCAST, "false",
(config, systemStream) -> assertFalse(config.getBroadcastEnabled(systemStream)));
// if not true/false, then use false
doTestSamzaProperty(StreamConfig.BROADCAST, "unknown_value",
(config, systemStream) -> assertFalse(config.getBroadcastEnabled(systemStream)));
doTestSamzaPropertyDoesNotExist(StreamConfig.BROADCAST,
(config, systemStream) -> assertFalse(config.getBroadcastEnabled(systemStream)));
doTestSamzaPropertyInvalidConfig(StreamConfig::getBroadcastEnabled);
}
@Test
public void testGetPriority() {
doTestSamzaProperty(StreamConfig.PRIORITY, "0",
(config, systemStream) -> assertEquals(0, config.getPriority(systemStream)));
doTestSamzaProperty(StreamConfig.PRIORITY, "100",
(config, systemStream) -> assertEquals(100, config.getPriority(systemStream)));
doTestSamzaProperty(StreamConfig.PRIORITY, "-1",
(config, systemStream) -> assertEquals(-1, config.getPriority(systemStream)));
doTestSamzaPropertyDoesNotExist(StreamConfig.PRIORITY,
(config, systemStream) -> assertEquals(-1, config.getPriority(systemStream)));
doTestSamzaPropertyInvalidConfig(StreamConfig::getPriority);
}
@Test
public void testGetSerdeStreams() {
assertEquals(Collections.emptySet(),
new StreamConfig(new MapConfig()).getSerdeStreams(SYSTEM));
// not key/msg serde property for "streams."
StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + SAMZA_IGNORED_PROPERTY, UNUSED_VALUE)));
assertEquals(Collections.emptySet(), streamConfig.getSerdeStreams(SYSTEM));
// not matching system for "streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + StreamConfig.KEY_SERDE, UNUSED_VALUE,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), "otherSystem")));
assertEquals(Collections.emptySet(), streamConfig.getSerdeStreams(SYSTEM));
// not key/msg serde property for "systems.<system>.streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + SAMZA_IGNORED_PROPERTY, UNUSED_VALUE)));
assertEquals(Collections.emptySet(), streamConfig.getSerdeStreams(SYSTEM));
// not matching system for "systems.<system>.streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, "otherSystem", STREAM_ID) + StreamConfig.KEY_SERDE,
UNUSED_VALUE)));
assertEquals(Collections.emptySet(), streamConfig.getSerdeStreams(SYSTEM));
// not matching system for "streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + StreamConfig.KEY_SERDE, UNUSED_VALUE,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), "otherSystem")));
assertEquals(Collections.emptySet(), streamConfig.getSerdeStreams(SYSTEM));
String serdeValue = "my.serde.class";
// key serde for "streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM,
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + StreamConfig.KEY_SERDE, serdeValue)));
assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)), streamConfig.getSerdeStreams(SYSTEM));
// msg serde for "streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM,
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + StreamConfig.MSG_SERDE, serdeValue)));
assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)), streamConfig.getSerdeStreams(SYSTEM));
// serde for "streams." with physical stream name mapping
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM,
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM,
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + StreamConfig.KEY_SERDE, serdeValue)));
assertEquals(Collections.singleton(new SystemStream(SYSTEM, PHYSICAL_STREAM)), streamConfig.getSerdeStreams(SYSTEM));
// key serde for "systems.<system>.streams."
streamConfig = new StreamConfig(new MapConfig(
ImmutableMap.of(String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + StreamConfig.KEY_SERDE,
serdeValue)));
assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)), streamConfig.getSerdeStreams(SYSTEM));
// msg serde for "systems.<system>.streams."
streamConfig = new StreamConfig(new MapConfig(
ImmutableMap.of(String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + StreamConfig.MSG_SERDE,
serdeValue)));
assertEquals(Collections.singleton(new SystemStream(SYSTEM, STREAM_ID)), streamConfig.getSerdeStreams(SYSTEM));
// merge several different ways of providing serdes
String streamIdWithPhysicalName = "streamIdWithPhysicalName";
streamConfig = new StreamConfig(new MapConfig(new ImmutableMap.Builder<String, String>()
// need to map the stream ids to the system
.put(JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM)
// key and msg serde for "streams."
.put(String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + StreamConfig.KEY_SERDE, serdeValue)
.put(String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + StreamConfig.MSG_SERDE, serdeValue)
// key serde for "streams." with physical stream name mapping
.put(String.format(StreamConfig.STREAM_ID_PREFIX, streamIdWithPhysicalName) + StreamConfig.KEY_SERDE,
serdeValue)
.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, streamIdWithPhysicalName), PHYSICAL_STREAM)
// key serde for "systems.<system>.streams."
.put(String.format(StreamConfig.STREAM_PREFIX, SYSTEM, OTHER_STREAM_ID) + StreamConfig.KEY_SERDE,
serdeValue)
.build()));
assertEquals(Sets.newHashSet(new SystemStream(SYSTEM, STREAM_ID), new SystemStream(SYSTEM, PHYSICAL_STREAM),
new SystemStream(SYSTEM, OTHER_STREAM_ID)), streamConfig.getSerdeStreams(SYSTEM));
}
@Test
public void testGetStreamProperties() {
assertEquals(new MapConfig(), new StreamConfig(new MapConfig()).getStreamProperties(STREAM_ID));
String propertyName = "stream.property.name";
// BEGIN: tests in which properties cannot be found in the config
// not matching stream id for "streams."
StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, OTHER_STREAM_ID) + propertyName, UNUSED_VALUE)));
assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
// not matching stream id for "systems.<system>.streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, OTHER_STREAM_ID) + propertyName, UNUSED_VALUE,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, OTHER_STREAM_ID), SYSTEM)));
assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
// no system mapping when using "systems.<system>.streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, UNUSED_VALUE)));
assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
// ignore property with "samza" prefix for "streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + SAMZA_IGNORED_PROPERTY, UNUSED_VALUE)));
assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
// ignore property with "samza" prefix for "streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + SAMZA_IGNORED_PROPERTY, UNUSED_VALUE,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM)));
assertEquals(new MapConfig(), streamConfig.getStreamProperties(STREAM_ID));
// should not map physical name back to stream id if physical name is passed as stream id
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, UNUSED_VALUE,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM,
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM)));
assertEquals(new MapConfig(), streamConfig.getStreamProperties(PHYSICAL_STREAM));
// BEGIN: tests in which properties can be found in the config
String propertyValue = "value";
// "streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + propertyName, propertyValue)));
assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
streamConfig.getStreamProperties(STREAM_ID));
// "systems.<system>.streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, propertyValue,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM)));
assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
streamConfig.getStreamProperties(STREAM_ID));
// "systems.<system>.default.stream."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, SYSTEM) + propertyName, propertyValue,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM)));
assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
streamConfig.getStreamProperties(STREAM_ID));
// use physical name mapping for "systems.<system>.streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, PHYSICAL_STREAM) + propertyName, propertyValue,
// should not use stream id since there is physical stream
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, UNUSED_VALUE,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM,
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM)));
assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
streamConfig.getStreamProperties(STREAM_ID));
// "streams." should override "systems.<system>.streams."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + propertyName, propertyValue,
// should not use "systems.<system>.streams." since there is a "streams." config
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, UNUSED_VALUE,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM)));
assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
streamConfig.getStreamProperties(STREAM_ID));
// "systems.<system>.streams." should override "systems.<system>.default.stream."
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, propertyValue,
// should not use "systems.<system>.default.stream." since there is a "systems.<system>.streams."
String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, SYSTEM) + propertyName, UNUSED_VALUE,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM)));
assertEquals(new MapConfig(ImmutableMap.of(propertyName, propertyValue)),
streamConfig.getStreamProperties(STREAM_ID));
// merge multiple ways of specifying configs
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
// "streams."
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + "from.stream.id.property", "fromStreamIdValue",
// second "streams." property
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + "from.stream.id.other.property",
"fromStreamIdOtherValue",
// "systems.<system>.streams."
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + "from.system.stream.property",
"fromSystemStreamValue",
// need to map the stream id to a system
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM)));
assertEquals(new MapConfig(ImmutableMap.of(
"from.stream.id.property", "fromStreamIdValue",
"from.stream.id.other.property", "fromStreamIdOtherValue",
"from.system.stream.property", "fromSystemStreamValue")),
streamConfig.getStreamProperties(STREAM_ID));
}
@Test
public void testGetSystem() {
assertNull(new StreamConfig(new MapConfig()).getSystem(STREAM_ID));
// system is specified directly
StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM,
JobConfig.JOB_DEFAULT_SYSTEM, "otherSystem",
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, OTHER_STREAM_ID), "otherSystem")));
assertEquals(SYSTEM, streamConfig.getSystem(STREAM_ID));
// fall back to job default system
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, OTHER_STREAM_ID), "otherSystem")));
assertEquals(SYSTEM, streamConfig.getSystem(STREAM_ID));
}
@Test
public void testGetPhysicalName() {
assertEquals(STREAM_ID, new StreamConfig(new MapConfig()).getPhysicalName(STREAM_ID));
// ignore mapping for other stream ids
StreamConfig streamConfig = new StreamConfig(new MapConfig(
ImmutableMap.of(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, OTHER_STREAM_ID), PHYSICAL_STREAM)));
assertEquals(STREAM_ID, streamConfig.getPhysicalName(STREAM_ID));
streamConfig = new StreamConfig(new MapConfig(
ImmutableMap.of(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM)));
assertEquals(PHYSICAL_STREAM, streamConfig.getPhysicalName(STREAM_ID));
}
@Test
public void testGetIsIntermediateStream() {
assertFalse(new StreamConfig(new MapConfig()).getIsIntermediateStream(STREAM_ID));
// ignore mapping for other stream ids
StreamConfig streamConfig = new StreamConfig(new MapConfig(
ImmutableMap.of(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID, OTHER_STREAM_ID), "true")));
assertFalse(streamConfig.getIsIntermediateStream(STREAM_ID));
// do not use stream id property if physical name is passed as input
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID, STREAM_ID), "true",
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM)));
assertFalse(streamConfig.getIsIntermediateStream(PHYSICAL_STREAM));
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID, STREAM_ID), "true")));
assertTrue(streamConfig.getIsIntermediateStream(STREAM_ID));
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID, STREAM_ID), "false")));
assertFalse(streamConfig.getIsIntermediateStream(STREAM_ID));
}
@Test
public void testGetDeleteCommittedMessages() {
assertFalse(new StreamConfig(new MapConfig()).getDeleteCommittedMessages(STREAM_ID));
// ignore mapping for other stream ids
StreamConfig streamConfig = new StreamConfig(new MapConfig(
ImmutableMap.of(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID, OTHER_STREAM_ID),
"true")));
assertFalse(streamConfig.getDeleteCommittedMessages(STREAM_ID));
// do not use stream id property if physical name is passed as input
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID, STREAM_ID), "true",
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM)));
assertFalse(streamConfig.getDeleteCommittedMessages(PHYSICAL_STREAM));
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID, STREAM_ID), "true")));
assertTrue(streamConfig.getDeleteCommittedMessages(STREAM_ID));
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID, STREAM_ID), "false")));
assertFalse(streamConfig.getDeleteCommittedMessages(STREAM_ID));
}
@Test
public void testGetIsBounded() {
assertFalse(new StreamConfig(new MapConfig()).getIsBounded(STREAM_ID));
// ignore mapping for other stream ids
StreamConfig streamConfig = new StreamConfig(new MapConfig(
ImmutableMap.of(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID, OTHER_STREAM_ID),
"true")));
assertFalse(streamConfig.getIsBounded(STREAM_ID));
// do not use stream id property if physical name is passed as input
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID, STREAM_ID), "true",
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM)));
assertFalse(streamConfig.getIsBounded(PHYSICAL_STREAM));
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID, STREAM_ID), "true")));
assertTrue(streamConfig.getIsBounded(STREAM_ID));
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID, STREAM_ID), "false")));
assertFalse(streamConfig.getIsBounded(STREAM_ID));
}
@Test
public void testGetStreamIds() {
assertEquals(ImmutableList.of(), ImmutableList.copyOf(
new StreamConfig(new MapConfig()).getStreamIds()));
StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + ".property", "value")));
assertEquals(ImmutableSet.of(STREAM_ID), ImmutableSet.copyOf(streamConfig.getStreamIds()));
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + ".property.subProperty", "value")));
assertEquals(ImmutableSet.of(STREAM_ID), ImmutableSet.copyOf(streamConfig.getStreamIds()));
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + ".property0", "value",
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + ".property1", "value",
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + ".property.subProperty0", "value",
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + ".property.subProperty1", "value",
String.format(StreamConfig.STREAM_ID_PREFIX, OTHER_STREAM_ID) + ".property", "value")));
assertEquals(ImmutableSet.of(STREAM_ID, OTHER_STREAM_ID), ImmutableSet.copyOf(streamConfig.getStreamIds()));
}
private static void doTestSamzaProperty(String propertyName, String propertyValue, SamzaPropertyAssertion assertion) {
doTestSamzaPropertyAccess(propertyName, propertyValue, assertion);
doTestSamzaPropertyAccessWithPhysicalStream(propertyName, propertyValue, assertion);
doTestSamzaPropertyPriority(propertyName, propertyValue, assertion);
doTestSamzaPropertyMultipleStreams(propertyName, propertyValue, assertion);
}
/**
* Tests for Samza property access using different lookup methods, when using stream id in system stream.
* This includes tests in which there is no specified physical stream, so the stream id is used as the physical
* stream.
*/
private static void doTestSamzaPropertyAccess(String propertyName, String value, SamzaPropertyAssertion assertion) {
// streams.<streamId>.<property>
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + propertyName, value,
// all streams need to have a system
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM))),
SYSTEM_STREAM);
// systems.<system>.streams.<stream>.<property> where stream id has no specified physical stream
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, value,
// specify the system for the stream id
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM))),
SYSTEM_STREAM);
// systems.<system>.streams.<stream>.<property> where stream is the streamId, system is from job default system
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, value,
// use job default system to get the system
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM))),
SYSTEM_STREAM);
// systems.<system>.default.stream.<property> where stream id has no specified physical stream
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, SYSTEM) + propertyName, value,
// specify the system for the stream id
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM))),
SYSTEM_STREAM);
// systems.<system>.streams.<stream>.<property> where no system mapping (fall back to SystemStream)
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, value,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM);
// systems.<system>.default.stream.<property> where no system mapping (fall back to SystemStream)
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, SYSTEM) + propertyName, value,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM);
// systems.<system>.streams.<stream>.<property> where stream id has a physical name but property is from stream id
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, value,
// use job default system to get the system
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM);
// systems.<system>.default.stream.<property> where stream id has a physical name but property is from stream id
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, SYSTEM) + propertyName, value,
// use job default system to get the system
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM);
}
/**
* Tests for Samza property access using different lookup methods, when using physical stream in system stream.
*/
private static void doTestSamzaPropertyAccessWithPhysicalStream(String propertyName, String value,
SamzaPropertyAssertion assertion) {
// streams.<streamId>.<property>
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + propertyName, value,
// all streams need to have a system
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM_PHYSICAL);
// systems.<system>.streams.<stream>.<property> with a specific system for the stream id
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, PHYSICAL_STREAM) + propertyName, value,
// specify the system for the stream id
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM_PHYSICAL);
// systems.<system>.streams.<stream>.<property> with system coming from job default system
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, PHYSICAL_STREAM) + propertyName, value,
// use job default system to get the system
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM_PHYSICAL);
// systems.<system>.default.stream.<property>
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, SYSTEM) + propertyName, value,
// specify the system for the stream id
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM_PHYSICAL);
}
/**
* Tests that certain properties have priority over others. For conciseness, this will not explicitly test priorities
* for all different ways of specifying a property. This will compare two options at a time, and then we can infer
* priorities transitively.
*/
private static void doTestSamzaPropertyPriority(String propertyName, String value, SamzaPropertyAssertion assertion) {
// streams.<streamId>.<property> vs. systems.<system>.streams.<stream>.<property>
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + propertyName, value,
// all streams need to have a system
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
// this config should not be used
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, UNUSED_VALUE))),
SYSTEM_STREAM);
// systems.<system>.streams.<stream>.<property> vs. systems.<system>.default.stream.<property>
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, value,
// specify the system for the stream id
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM,
// this config should not be used
String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, SYSTEM) + propertyName, UNUSED_VALUE))),
SYSTEM_STREAM);
/*
* The next logical case to check would be systems.<system>.default.stream.<property> with a system mapping in the
* config vs. systems.<system>.streams.<stream>.<property> with a system mapping, but that is not possible, so move
* on to the next case.
*/
/*
* systems.<system>.streams.<stream>.<property> without a system mapping in the config vs.
* systems.<system>.default.stream.<property> without a system mapping in the config
*/
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, value,
// this config should not be used
String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, SYSTEM) + propertyName, UNUSED_VALUE))),
SYSTEM_STREAM);
}
/**
* Tests for Samza property access in which multiple streams are configured.
* Only testing cases in which streams may share some properties.
*/
private static void doTestSamzaPropertyMultipleStreams(String propertyName, String value, SamzaPropertyAssertion assertion) {
// systems.<system>.default.stream.<property> where stream id has no specified physical stream
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, SYSTEM) + propertyName, value,
// specify the systems for the streams
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, STREAM_ID), SYSTEM,
String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, OTHER_STREAM_ID), SYSTEM))),
SYSTEM_STREAM);
}
private static void doTestSamzaPropertyInvalidConfig(SamzaPropertyLookup lookup) {
// configure physical stream and have mapping from stream id to physical stream
StreamConfig streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM,
String.format(StreamConfig.STREAM_ID_PREFIX, PHYSICAL_STREAM) + ".property", "value",
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM)));
try {
lookup.doLookup(streamConfig, SYSTEM_STREAM_PHYSICAL);
fail("Expected an exception due to having too many mappings to a physical stream");
} catch (IllegalStateException e) {
// expected to reach here
}
// two separate stream ids map to same physical stream
streamConfig = new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM,
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, OTHER_STREAM_ID), PHYSICAL_STREAM,
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM)));
try {
lookup.doLookup(streamConfig, SYSTEM_STREAM_PHYSICAL);
fail("Expected an exception due to having too many mappings to a physical stream");
} catch (IllegalStateException e) {
// expected to reach here
}
}
private static void doTestSamzaPropertyDoesNotExist(String propertyName, SamzaPropertyAssertion assertion) {
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
// just put in some value which will be ignored
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + SAMZA_IGNORED_PROPERTY, UNUSED_VALUE))),
SYSTEM_STREAM);
/*
* Won't use streams.<streamId>.<property> if streamId is mapped to a physical stream
*/
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_ID_PREFIX, STREAM_ID) + propertyName, UNUSED_VALUE,
// all streams need to have a system
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM);
/*
* Won't use systems.<system>.streams.<stream>.<property> if stream is mapped to a physical stream and the property
* is only specified using the physical stream
*/
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, PHYSICAL_STREAM) + propertyName, UNUSED_VALUE,
// all streams need to have a system
JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM);
/*
* Won't use systems.<system>.streams.<stream>.<property> if stream is mapped to a physical stream and there is no
* system mapping for the stream id
*/
assertion.doAssertion(new StreamConfig(new MapConfig(ImmutableMap.of(
String.format(StreamConfig.STREAM_PREFIX, SYSTEM, STREAM_ID) + propertyName, UNUSED_VALUE,
// map the stream id to the physical stream
String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID, STREAM_ID), PHYSICAL_STREAM))),
SYSTEM_STREAM_PHYSICAL);
}
/**
* Used to execute assertions for tests which look up specific Samza properties.
*/
@FunctionalInterface
private interface SamzaPropertyAssertion {
void doAssertion(StreamConfig streamConfig, SystemStream systemStream);
}
/**
* Used to specify how to look up specific Samza properties.
*/
@FunctionalInterface
private interface SamzaPropertyLookup {
void doLookup(StreamConfig streamConfig, SystemStream systemStream);
}
}