blob: 8113b41ca93edf8713fa30a841db56bd7bef4193 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
import java.util.HashSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
public class StreamsMetadataStateTest {
private StreamsMetadataState metadataState;
private HostInfo hostOne;
private HostInfo hostTwo;
private HostInfo hostThree;
private TopicPartition topic1P0;
private TopicPartition topic2P0;
private TopicPartition topic3P0;
private Map<HostInfo, Set<TopicPartition>> hostToActivePartitions;
private Map<HostInfo, Set<TopicPartition>> hostToStandbyPartitions;
private StreamsBuilder builder;
private TopicPartition topic1P1;
private TopicPartition topic2P1;
private TopicPartition topic4P0;
private Map<TopicPartition, PartitionInfo> partitionInfos;
private final String globalTable = "global-table";
private final LogContext logContext = new LogContext(String.format("test [%s] ", "StreamsMetadataStateTest"));
private StreamPartitioner<String, Object> partitioner;
private Set<String> storeNames;
@Before
public void before() {
builder = new StreamsBuilder();
final KStream<Object, Object> one = builder.stream("topic-one");
one.groupByKey().count(Materialized.as("table-one"));
final KStream<Object, Object> two = builder.stream("topic-two");
two.groupByKey().count(Materialized.as("table-two"));
builder.stream("topic-three")
.groupByKey()
.count(Materialized.as("table-three"));
one.merge(two).groupByKey().count(Materialized.as("merged-table"));
builder.stream("topic-four").mapValues(value -> value);
builder.globalTable("global-topic",
Consumed.with(null, null),
Materialized.as(globalTable));
TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("appId");
topic1P0 = new TopicPartition("topic-one", 0);
topic1P1 = new TopicPartition("topic-one", 1);
topic2P0 = new TopicPartition("topic-two", 0);
topic2P1 = new TopicPartition("topic-two", 1);
topic3P0 = new TopicPartition("topic-three", 0);
topic4P0 = new TopicPartition("topic-four", 0);
hostOne = new HostInfo("host-one", 8080);
hostTwo = new HostInfo("host-two", 9090);
hostThree = new HostInfo("host-three", 7070);
hostToActivePartitions = new HashMap<>();
hostToActivePartitions.put(hostOne, mkSet(topic1P0, topic2P1, topic4P0));
hostToActivePartitions.put(hostTwo, mkSet(topic2P0, topic1P1));
hostToActivePartitions.put(hostThree, Collections.singleton(topic3P0));
hostToStandbyPartitions = new HashMap<>();
hostToStandbyPartitions.put(hostThree, mkSet(topic1P0, topic2P1, topic4P0));
hostToStandbyPartitions.put(hostOne, mkSet(topic2P0, topic1P1));
hostToStandbyPartitions.put(hostTwo, Collections.singleton(topic3P0));
partitionInfos = new HashMap<>();
partitionInfos.put(new TopicPartition("topic-one", 0), new PartitionInfo("topic-one", 0, null, null, null));
partitionInfos.put(new TopicPartition("topic-one", 1), new PartitionInfo("topic-one", 1, null, null, null));
partitionInfos.put(new TopicPartition("topic-two", 0), new PartitionInfo("topic-two", 0, null, null, null));
partitionInfos.put(new TopicPartition("topic-two", 1), new PartitionInfo("topic-two", 1, null, null, null));
partitionInfos.put(new TopicPartition("topic-three", 0), new PartitionInfo("topic-three", 0, null, null, null));
partitionInfos.put(new TopicPartition("topic-four", 0), new PartitionInfo("topic-four", 0, null, null, null));
final TopologyMetadata topologyMetadata = new TopologyMetadata(TopologyWrapper.getInternalTopologyBuilder(builder.build()), new DummyStreamsConfig());
topologyMetadata.buildAndRewriteTopology();
metadataState = new StreamsMetadataState(topologyMetadata, hostOne, logContext);
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, partitionInfos);
partitioner = (topic, key, value, numPartitions) -> 1;
storeNames = mkSet("table-one", "table-two", "merged-table", globalTable);
}
static class MultiValuedPartitioner implements StreamPartitioner<String, Object> {
@Override
@Deprecated
public Integer partition(final String topic, final String key, final Object value, final int numPartitions) {
return null;
}
@Override
public Optional<Set<Integer>> partitions(final String topic, final String key, final Object value, final int numPartitions) {
final Set<Integer> partitions = new HashSet<>();
partitions.add(0);
partitions.add(1);
return Optional.of(partitions);
}
}
@Test
public void shouldNotThrowExceptionWhenOnChangeNotCalled() {
final Collection<StreamsMetadata> metadata = new StreamsMetadataState(
new TopologyMetadata(TopologyWrapper.getInternalTopologyBuilder(builder.build()), new DummyStreamsConfig()),
hostOne,
logContext
).getAllMetadataForStore("store");
assertEquals(0, metadata.size());
}
@Test
public void shouldGetAllStreamInstances() {
final StreamsMetadata one = new StreamsMetadataImpl(hostOne,
mkSet(globalTable, "table-one", "table-two", "merged-table"),
mkSet(topic1P0, topic2P1, topic4P0),
mkSet("table-one", "table-two", "merged-table"),
mkSet(topic2P0, topic1P1));
final StreamsMetadata two = new StreamsMetadataImpl(hostTwo,
mkSet(globalTable, "table-two", "table-one", "merged-table"),
mkSet(topic2P0, topic1P1),
mkSet("table-three"),
mkSet(topic3P0));
final StreamsMetadata three = new StreamsMetadataImpl(hostThree,
mkSet(globalTable, "table-three"),
Collections.singleton(topic3P0),
mkSet("table-one", "table-two", "merged-table"),
mkSet(topic1P0, topic2P1, topic4P0));
final Collection<StreamsMetadata> actual = metadataState.getAllMetadata();
assertEquals(3, actual.size());
assertTrue("expected " + actual + " to contain " + one, actual.contains(one));
assertTrue("expected " + actual + " to contain " + two, actual.contains(two));
assertTrue("expected " + actual + " to contain " + three, actual.contains(three));
}
@Test
public void shouldGetAllStreamsInstancesWithNoStores() {
builder.stream("topic-five").filter((key, value) -> true).to("some-other-topic");
final TopicPartition tp5 = new TopicPartition("topic-five", 1);
final HostInfo hostFour = new HostInfo("host-four", 8080);
hostToActivePartitions.put(hostFour, mkSet(tp5));
metadataState.onChange(hostToActivePartitions, Collections.emptyMap(),
Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null)));
final StreamsMetadata expected = new StreamsMetadataImpl(hostFour, Collections.singleton(globalTable),
Collections.singleton(tp5), Collections.emptySet(), Collections.emptySet());
final Collection<StreamsMetadata> actual = metadataState.getAllMetadata();
assertTrue("expected " + actual + " to contain " + expected, actual.contains(expected));
}
@Test
public void shouldGetInstancesForStoreName() {
final StreamsMetadata one = new StreamsMetadataImpl(hostOne,
mkSet(globalTable, "table-one", "table-two", "merged-table"),
mkSet(topic1P0, topic2P1, topic4P0),
mkSet("table-one", "table-two", "merged-table"),
mkSet(topic2P0, topic1P1));
final StreamsMetadata two = new StreamsMetadataImpl(hostTwo,
mkSet(globalTable, "table-two", "table-one", "merged-table"),
mkSet(topic2P0, topic1P1),
mkSet("table-three"),
mkSet(topic3P0));
final Collection<StreamsMetadata> actual = metadataState.getAllMetadataForStore("table-one");
final Map<HostInfo, StreamsMetadata> actualAsMap = actual.stream()
.collect(Collectors.toMap(StreamsMetadata::hostInfo, Function.identity()));
assertEquals(3, actual.size());
assertTrue("expected " + actual + " to contain " + one, actual.contains(one));
assertTrue("expected " + actual + " to contain " + two, actual.contains(two));
assertTrue("expected " + hostThree + " to contain as standby",
actualAsMap.get(hostThree).standbyStateStoreNames().contains("table-one"));
}
@Test
public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() {
assertThrows(NullPointerException.class, () -> metadataState.getAllMetadataForStore(null));
}
@Test
public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() {
final Collection<StreamsMetadata> actual = metadataState.getAllMetadataForStore("not-a-store");
assertTrue(actual.isEmpty());
}
@Test
public void shouldGetInstanceWithKey() {
final TopicPartition tp4 = new TopicPartition("topic-three", 1);
hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)));
final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, mkSet(hostTwo), 0);
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
"the-key",
Serdes.String().serializer());
assertEquals(expected, actual);
}
@Test
public void shouldGetInstanceWithKeyAndCustomPartitioner() {
final TopicPartition tp4 = new TopicPartition("topic-three", 1);
hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)));
final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, Collections.emptySet(), 1);
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
"the-key",
partitioner);
assertEquals(expected, actual);
assertEquals(1, actual.partition());
}
@Test
public void shouldFailWhenIqQueriedWithCustomPartitionerReturningMultiplePartitions() {
final TopicPartition tp4 = new TopicPartition("topic-three", 1);
hostToActivePartitions.put(hostTwo, mkSet(topic2P0, tp4));
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)));
assertThrows(IllegalArgumentException.class, () -> metadataState.getKeyQueryMetadataForKey("table-three",
"the-key",
new MultiValuedPartitioner()));
}
@Test
public void shouldReturnNotAvailableWhenClusterIsEmpty() {
metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
final KeyQueryMetadata result = metadataState.getKeyQueryMetadataForKey("table-one", "a", Serdes.String().serializer());
assertEquals(KeyQueryMetadata.NOT_AVAILABLE, result);
}
@Test
public void shouldGetInstanceWithKeyWithMergedStreams() {
final TopicPartition topic2P2 = new TopicPartition("topic-two", 2);
hostToActivePartitions.put(hostTwo, mkSet(topic2P0, topic1P1, topic2P2));
hostToStandbyPartitions.put(hostOne, mkSet(topic2P0, topic1P1, topic2P2));
metadataState.onChange(hostToActivePartitions, hostToStandbyPartitions,
Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null)));
final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2);
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("merged-table", "the-key",
(topic, key, value, numPartitions) -> 2);
assertEquals(expected, actual);
}
@Test
public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() {
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("not-a-store",
"key",
Serdes.String().serializer());
assertNull(actual);
}
@Test
public void shouldThrowWhenKeyIsNull() {
assertThrows(NullPointerException.class, () -> metadataState.getKeyQueryMetadataForKey("table-three", null, Serdes.String().serializer()));
}
@Test
public void shouldThrowWhenSerializerIsNull() {
assertThrows(NullPointerException.class, () -> metadataState.getKeyQueryMetadataForKey("table-three", "key", (Serializer<Object>) null));
}
@Test
public void shouldThrowIfStoreNameIsNull() {
assertThrows(NullPointerException.class, () -> metadataState.getKeyQueryMetadataForKey(null, "key", Serdes.String().serializer()));
}
@SuppressWarnings("unchecked")
@Test
public void shouldThrowIfStreamPartitionerIsNull() {
assertThrows(NullPointerException.class, () -> metadataState.getKeyQueryMetadataForKey(null, "key", (StreamPartitioner) null));
}
@Test
public void shouldHaveGlobalStoreInAllMetadata() {
final Collection<StreamsMetadata> metadata = metadataState.getAllMetadataForStore(globalTable);
assertEquals(3, metadata.size());
for (final StreamsMetadata streamsMetadata : metadata) {
assertTrue(streamsMetadata.stateStoreNames().contains(globalTable));
}
}
@Test
public void shouldGetLocalMetadataWithRightActiveStandbyInfo() {
assertEquals(hostOne, metadataState.getLocalMetadata().hostInfo());
assertEquals(hostToActivePartitions.get(hostOne), metadataState.getLocalMetadata().topicPartitions());
assertEquals(hostToStandbyPartitions.get(hostOne), metadataState.getLocalMetadata().standbyTopicPartitions());
assertEquals(storeNames, metadataState.getLocalMetadata().stateStoreNames());
assertEquals(storeNames.stream().filter(s -> !s.equals(globalTable)).collect(Collectors.toSet()),
metadataState.getLocalMetadata().standbyStateStoreNames());
}
@Test
public void shouldGetQueryMetadataForGlobalStoreWithKey() {
final KeyQueryMetadata metadata = metadataState.getKeyQueryMetadataForKey(globalTable, "key", Serdes.String().serializer());
assertEquals(hostOne, metadata.activeHost());
assertTrue(metadata.standbyHosts().isEmpty());
}
@Test
public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() {
final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
new TopologyMetadata(TopologyWrapper.getInternalTopologyBuilder(builder.build()), new DummyStreamsConfig()),
StreamsMetadataState.UNKNOWN_HOST,
logContext
);
streamsMetadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, partitionInfos);
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, "key", Serdes.String().serializer()));
}
@Test
public void shouldGetQueryMetadataForGlobalStoreWithKeyAndPartitioner() {
final KeyQueryMetadata metadata = metadataState.getKeyQueryMetadataForKey(globalTable, "key", partitioner);
assertEquals(hostOne, metadata.activeHost());
assertTrue(metadata.standbyHosts().isEmpty());
}
@Test
public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() {
final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
new TopologyMetadata(TopologyWrapper.getInternalTopologyBuilder(builder.build()), new DummyStreamsConfig()),
StreamsMetadataState.UNKNOWN_HOST,
logContext
);
streamsMetadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, partitionInfos);
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, "key", partitioner));
}
@Test
public void shouldReturnAllMetadataThatRemainsValidAfterChange() {
final Collection<StreamsMetadata> allMetadata = metadataState.getAllMetadata();
final Collection<StreamsMetadata> copy = new ArrayList<>(allMetadata);
assertFalse("invalid test", allMetadata.isEmpty());
metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), partitionInfos);
assertEquals("encapsulation broken", allMetadata, copy);
}
@Test
public void shouldNotReturnMutableReferenceToInternalAllMetadataCollection() {
final Collection<StreamsMetadata> allMetadata = metadataState.getAllMetadata();
assertFalse("invalid test", allMetadata.isEmpty());
try {
// Either this should not affect internal state of 'metadataState'
allMetadata.clear();
} catch (final UnsupportedOperationException e) {
// Or should fail.
}
assertFalse("encapsulation broken", metadataState.getAllMetadata().isEmpty());
}
}