// blob: 0b013b69a71510475459c61b165e975a4f528d92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.name.Names;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.utils.CollectionUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public class KafkaDataSourceMetadataTest
{
// Single-topic start fixtures for topic "foo"; each map is partition -> offset.
private static final KafkaDataSourceMetadata START0 = startMetadata("foo", ImmutableMap.of());
private static final KafkaDataSourceMetadata START1 = startMetadata("foo", ImmutableMap.of(0, 2L, 1, 3L));
private static final KafkaDataSourceMetadata START2 = startMetadata("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
private static final KafkaDataSourceMetadata START3 = startMetadata("foo", ImmutableMap.of(0, 2L, 2, 5L));

// Multi-topic start fixtures: (topic pattern, concrete topics, partition -> offset).
// Every partition/offset entry is applied to each of the listed topics.
private static final KafkaDataSourceMetadata START4 = startMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo"),
    ImmutableMap.of()
);
private static final KafkaDataSourceMetadata START5 = startMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo"),
    ImmutableMap.of(0, 2L, 1, 3L)
);
private static final KafkaDataSourceMetadata START6 = startMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo", "foo2"),
    ImmutableMap.of(0, 2L, 1, 3L)
);
private static final KafkaDataSourceMetadata START7 = startMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo", "foo2"),
    ImmutableMap.of(0, 2L, 2, 5L)
);
private static final KafkaDataSourceMetadata START8 = startMetadataMultiTopic(
    "foo2.*",
    ImmutableList.of("foo2"),
    ImmutableMap.of(0, 2L, 2, 5L)
);
private static final KafkaDataSourceMetadata START9 = startMetadataMultiTopic(
    "foo2.*",
    ImmutableList.of("foo2", "foo22"),
    ImmutableMap.of(0, 2L, 2, 5L)
);

// Single-topic end fixtures for topic "foo"; each map is partition -> offset.
private static final KafkaDataSourceMetadata END0 = endMetadata("foo", ImmutableMap.of());
private static final KafkaDataSourceMetadata END1 = endMetadata("foo", ImmutableMap.of(0, 2L, 2, 5L));
private static final KafkaDataSourceMetadata END2 = endMetadata("foo", ImmutableMap.of(0, 2L, 1, 4L));
private static final KafkaDataSourceMetadata END3 = endMetadata("foo", ImmutableMap.of(0, 2L, 1, 3L));

// Multi-topic end fixtures: (topic pattern, concrete topics, partition -> offset).
// Every partition/offset entry is applied to each of the listed topics.
private static final KafkaDataSourceMetadata END4 = endMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo"),
    ImmutableMap.of()
);
private static final KafkaDataSourceMetadata END5 = endMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo"),
    ImmutableMap.of(0, 2L, 2, 5L)
);
private static final KafkaDataSourceMetadata END6 = endMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo"),
    ImmutableMap.of(0, 2L, 1, 4L)
);
private static final KafkaDataSourceMetadata END7 = endMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo", "foo2"),
    ImmutableMap.of(0, 2L, 2, 5L)
);
private static final KafkaDataSourceMetadata END8 = endMetadataMultiTopic(
    "foo.*",
    ImmutableList.of("foo", "foo2"),
    ImmutableMap.of(0, 2L, 1, 4L)
);
private static final KafkaDataSourceMetadata END9 = endMetadataMultiTopic(
    "foo2.*",
    ImmutableList.of("foo2"),
    ImmutableMap.of(0, 2L, 2, 5L)
);
private static final KafkaDataSourceMetadata END10 = endMetadataMultiTopic(
    "foo2.*",
    ImmutableList.of("foo2", "foo22"),
    ImmutableMap.of(0, 2L, 2, 5L)
);
/**
 * Asserts the result of {@code matches} for ordered pairs of the START fixtures and ordered pairs of
 * the END fixtures. Pairs are grouped by the receiver (left-hand) fixture.
 */
@Test
public void testMatches()
{
  // ---- START0 as receiver ----
  Assert.assertTrue(START0.matches(START0));
  Assert.assertTrue(START0.matches(START1));
  Assert.assertTrue(START0.matches(START2));
  Assert.assertTrue(START0.matches(START3));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START0.matches(START4));
  Assert.assertTrue(START0.matches(START5));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(START0.matches(START6));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(START0.matches(START7));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(START0.matches(START8));
  // when merging, we lose the sequence numbers for topics foo2 and foo22, so false
  Assert.assertFalse(START0.matches(START9));

  // ---- START1 as receiver ----
  Assert.assertTrue(START1.matches(START0));
  Assert.assertTrue(START1.matches(START1));
  // sequence numbers for topic foo partition 1 are inconsistent, so false
  Assert.assertFalse(START1.matches(START2));
  Assert.assertTrue(START1.matches(START3));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  // (this assertion had previously been swallowed into the end of the comment line above it)
  Assert.assertFalse(START1.matches(START4));
  Assert.assertTrue(START1.matches(START5));
  // when merging, we lose the sequence numbers for topic foo2, and sequence numbers for topic foo-1 are inconsistent, so false
  Assert.assertFalse(START1.matches(START6));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(START1.matches(START7));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(START1.matches(START8));
  // when merging, we lose the sequence numbers for topics foo2 and foo22, so false
  Assert.assertFalse(START1.matches(START9));

  // ---- START2 as receiver ----
  Assert.assertTrue(START2.matches(START0));
  Assert.assertFalse(START2.matches(START1));
  Assert.assertTrue(START2.matches(START2));
  Assert.assertTrue(START2.matches(START3));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START2.matches(START4));
  // when merging, sequence numbers for topic foo-1 are inconsistent, so false
  Assert.assertFalse(START2.matches(START5));
  // when merging, we lose the sequence numbers for topic foo2, and sequence numbers for topic foo-1 are inconsistent, so false
  Assert.assertFalse(START2.matches(START6));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(START2.matches(START7));

  // ---- START3 as receiver ----
  Assert.assertTrue(START3.matches(START0));
  Assert.assertTrue(START3.matches(START1));
  Assert.assertTrue(START3.matches(START2));
  Assert.assertTrue(START3.matches(START3));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START3.matches(START4));
  Assert.assertTrue(START3.matches(START5));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(START3.matches(START6));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(START3.matches(START7));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(START3.matches(START8));
  // when merging, we lose the sequence numbers for topics foo2 and foo22, so false
  Assert.assertFalse(START3.matches(START9));

  // ---- START4 as receiver ----
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START4.matches(START0));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START4.matches(START1));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START4.matches(START2));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START4.matches(START3));
  Assert.assertTrue(START4.matches(START4));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START4.matches(START5));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START4.matches(START6));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START4.matches(START7));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START4.matches(START8));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(START4.matches(START9));

  // ---- START5 as receiver ----
  Assert.assertTrue(START5.matches(START0));
  Assert.assertTrue(START5.matches(START1));
  // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
  Assert.assertFalse(START5.matches(START2));
  Assert.assertTrue(START5.matches(START3));
  Assert.assertTrue(START5.matches(START4));
  Assert.assertTrue(START5.matches(START5));
  Assert.assertTrue(START5.matches(START6));
  Assert.assertTrue(START5.matches(START7));
  Assert.assertTrue(START5.matches(START8));
  Assert.assertTrue(START5.matches(START9));

  // ---- START6 as receiver ----
  Assert.assertTrue(START6.matches(START0));
  Assert.assertTrue(START6.matches(START1));
  // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
  Assert.assertFalse(START6.matches(START2));
  Assert.assertTrue(START6.matches(START3));
  Assert.assertTrue(START6.matches(START4));
  Assert.assertTrue(START6.matches(START5));
  Assert.assertTrue(START6.matches(START6));
  Assert.assertTrue(START6.matches(START7));
  Assert.assertTrue(START6.matches(START8));
  Assert.assertTrue(START6.matches(START9));

  // ---- START7 as receiver ----
  Assert.assertTrue(START7.matches(START0));
  Assert.assertTrue(START7.matches(START1));
  Assert.assertTrue(START7.matches(START2));
  Assert.assertTrue(START7.matches(START3));
  Assert.assertTrue(START7.matches(START4));
  Assert.assertTrue(START7.matches(START5));
  Assert.assertTrue(START7.matches(START6));
  Assert.assertTrue(START7.matches(START7));
  Assert.assertTrue(START7.matches(START8));
  Assert.assertTrue(START7.matches(START9));

  // ---- START8 as receiver ----
  Assert.assertTrue(START8.matches(START0));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START8.matches(START1));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START8.matches(START2));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START8.matches(START3));
  Assert.assertTrue(START8.matches(START4));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START8.matches(START5));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START8.matches(START6));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START8.matches(START7));
  Assert.assertTrue(START8.matches(START8));
  Assert.assertTrue(START8.matches(START9));

  // ---- START9 as receiver ----
  Assert.assertTrue(START9.matches(START0));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START9.matches(START1));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START9.matches(START2));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START9.matches(START3));
  Assert.assertTrue(START9.matches(START4));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START9.matches(START5));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START9.matches(START6));
  // when merging, we lose the sequence numbers for topic foo, so false
  Assert.assertFalse(START9.matches(START7));
  Assert.assertTrue(START9.matches(START8));
  Assert.assertTrue(START9.matches(START9));

  // ---- END0 as receiver ----
  Assert.assertTrue(END0.matches(END0));
  Assert.assertTrue(END0.matches(END1));
  Assert.assertTrue(END0.matches(END2));
  Assert.assertTrue(END0.matches(END3));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END0.matches(END4));
  Assert.assertTrue(END0.matches(END5));
  Assert.assertTrue(END0.matches(END6));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END0.matches(END7));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END0.matches(END8));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END0.matches(END9));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END0.matches(END10));

  // ---- END1 as receiver ----
  Assert.assertTrue(END1.matches(END0));
  Assert.assertTrue(END1.matches(END1));
  Assert.assertTrue(END1.matches(END2));
  Assert.assertTrue(END1.matches(END3));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END1.matches(END4));
  Assert.assertTrue(END1.matches(END5));
  Assert.assertTrue(END1.matches(END6));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END1.matches(END7));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END1.matches(END8));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END1.matches(END9));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END1.matches(END10));

  // ---- END2 as receiver ----
  Assert.assertTrue(END2.matches(END0));
  Assert.assertTrue(END2.matches(END1));
  Assert.assertTrue(END2.matches(END2));
  // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
  Assert.assertFalse(END2.matches(END3));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END2.matches(END4));
  Assert.assertTrue(END2.matches(END5));
  Assert.assertTrue(END2.matches(END6));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END2.matches(END7));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END2.matches(END8));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END2.matches(END9));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END2.matches(END10));

  // ---- END3 as receiver ----
  Assert.assertTrue(END3.matches(END0));
  Assert.assertTrue(END3.matches(END1));
  // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
  Assert.assertFalse(END3.matches(END2));
  Assert.assertTrue(END3.matches(END3));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END3.matches(END4));
  Assert.assertTrue(END3.matches(END5));
  // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
  Assert.assertFalse(END3.matches(END6));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END3.matches(END7));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END3.matches(END8));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END3.matches(END9));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END3.matches(END10));

  // ---- END4 as receiver ----
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END4.matches(END0));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END4.matches(END1));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END4.matches(END2));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END4.matches(END3));
  Assert.assertTrue(END4.matches(END4));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END4.matches(END5));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END4.matches(END6));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END4.matches(END7));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END4.matches(END8));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END4.matches(END9));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertFalse(END4.matches(END10));

  // ---- END5 as receiver ----
  Assert.assertTrue(END5.matches(END0));
  Assert.assertTrue(END5.matches(END1));
  Assert.assertTrue(END5.matches(END2));
  Assert.assertTrue(END5.matches(END3));
  Assert.assertTrue(END5.matches(END4));
  Assert.assertTrue(END5.matches(END5));
  Assert.assertTrue(END5.matches(END6));
  Assert.assertTrue(END5.matches(END7));
  Assert.assertTrue(END5.matches(END8));
  Assert.assertTrue(END5.matches(END9));
  Assert.assertTrue(END5.matches(END10));

  // ---- END6 as receiver ----
  Assert.assertTrue(END6.matches(END0));
  Assert.assertTrue(END6.matches(END1));
  Assert.assertTrue(END6.matches(END2));
  // when merging, the sequence numbers for topic foo-1 are inconsistent, so false
  Assert.assertFalse(END6.matches(END3));
  Assert.assertTrue(END6.matches(END4));
  Assert.assertTrue(END6.matches(END5));
  Assert.assertTrue(END6.matches(END6));
  Assert.assertTrue(END6.matches(END7));
  Assert.assertTrue(END6.matches(END8));
  Assert.assertTrue(END6.matches(END9));
  Assert.assertTrue(END6.matches(END10));

  // ---- END7 as receiver ----
  Assert.assertTrue(END7.matches(END0));
  Assert.assertTrue(END7.matches(END1));
  Assert.assertTrue(END7.matches(END2));
  Assert.assertTrue(END7.matches(END3));
  Assert.assertTrue(END7.matches(END4));
  Assert.assertTrue(END7.matches(END5));
  Assert.assertTrue(END7.matches(END6));
  Assert.assertTrue(END7.matches(END7));
  Assert.assertTrue(END7.matches(END8));
  Assert.assertTrue(END7.matches(END9));
  Assert.assertTrue(END7.matches(END10));

  // ---- END8 as receiver ----
  Assert.assertTrue(END8.matches(END0));
  Assert.assertTrue(END8.matches(END1));
  Assert.assertTrue(END8.matches(END2));
  // when merging, the sequence numbers for topic foo-1, and foo-2 are inconsistent, so false
  Assert.assertFalse(END8.matches(END3));
  Assert.assertTrue(END8.matches(END4));
  Assert.assertTrue(END8.matches(END5));
  Assert.assertTrue(END8.matches(END6));
  Assert.assertTrue(END8.matches(END7));
  Assert.assertTrue(END8.matches(END8));
  Assert.assertTrue(END8.matches(END9));
  Assert.assertTrue(END8.matches(END10));

  // ---- END9 as receiver ----
  Assert.assertTrue(END9.matches(END0));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END9.matches(END1));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END9.matches(END2));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END9.matches(END3));
  Assert.assertTrue(END9.matches(END4));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END9.matches(END5));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END9.matches(END6));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END9.matches(END7));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END9.matches(END8));
  Assert.assertTrue(END9.matches(END9));
  Assert.assertTrue(END9.matches(END10));

  // ---- END10 as receiver ----
  Assert.assertTrue(END10.matches(END0));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END10.matches(END1));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END10.matches(END2));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END10.matches(END3));
  Assert.assertTrue(END10.matches(END4));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END10.matches(END5));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END10.matches(END6));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END10.matches(END7));
  // when merging, we lose the sequence numbers for topic foo2, so false
  Assert.assertFalse(END10.matches(END8));
  Assert.assertTrue(END10.matches(END9));
  Assert.assertTrue(END10.matches(END10));
}
/**
 * Every START fixture, single-topic and multi-topic alike, must report itself as a valid start.
 */
@Test
public void testIsValidStart()
{
  final List<KafkaDataSourceMetadata> startFixtures = ImmutableList.of(
      START0,
      START1,
      START2,
      START3,
      START4,
      START5,
      START6,
      START7,
      START8,
      START9
  );
  for (KafkaDataSourceMetadata fixture : startFixtures) {
    Assert.assertTrue(fixture.isValidStart());
  }
}
/**
 * Asserts the result of {@code plus} for selected ordered pairs of START fixtures and of END fixtures.
 */
@Test
public void testPlus()
{
  // ---- start metadata: single-topic receiver, single-topic argument ----
  // For a partition present on both sides, the expected result carries the argument's offset.
  Assert.assertEquals(startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)), START1.plus(START3));
  Assert.assertEquals(startMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)), START0.plus(START2));
  Assert.assertEquals(startMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)), START1.plus(START2));
  Assert.assertEquals(startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)), START2.plus(START1));
  Assert.assertEquals(startMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)), START2.plus(START2));

  // ---- start metadata: single-topic receiver, multi-topic argument ----
  // START4 has an empty sequence map; adding it to START1 is expected to yield START4 itself,
  // i.e. the result carries none of START1's offsets.
  Assert.assertEquals(START4, START1.plus(START4));
  Assert.assertEquals(startMetadata(ImmutableMap.of(0, 2L, 1, 3L)), START1.plus(START5));
  Assert.assertEquals(startMetadata(ImmutableMap.of(0, 2L, 1, 3L)), START1.plus(START6));
  Assert.assertEquals(startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)), START1.plus(START7));
  Assert.assertEquals(startMetadata(ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)), START2.plus(START6));

  // ---- start metadata: empty multi-topic receiver (START4) ----
  // Adding single-topic metadata to the empty START4 is expected to yield the single-topic argument unchanged.
  Assert.assertEquals(START0, START4.plus(START0));
  Assert.assertEquals(START1, START4.plus(START1));
  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertEquals(START4, START4.plus(START5));

  // ---- start metadata: multi-topic receiver ----
  Assert.assertEquals(START5, START5.plus(START4));
  Assert.assertEquals(startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 1, 3L)), START5.plus(START6));
  Assert.assertEquals(startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)), START7.plus(START8));
  Assert.assertEquals(startMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), START7.plus(START9));
  Assert.assertEquals(startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)), START8.plus(START7));
  Assert.assertEquals(startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), START8.plus(START9));
  Assert.assertEquals(startMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), START9.plus(START8));

  // ---- end metadata: single-topic receiver ----
  Assert.assertEquals(endMetadata(ImmutableMap.of(0, 2L, 2, 5L)), END0.plus(END1));
  Assert.assertEquals(endMetadata(ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)), END1.plus(END2));
  // END4 has an empty sequence map; adding it to single-topic end metadata is expected to yield END4 itself.
  Assert.assertEquals(END4, END0.plus(END4));
  Assert.assertEquals(END4, END1.plus(END4));

  // ---- end metadata: empty multi-topic receiver (END4) ----
  // Adding single-topic metadata to the empty END4 is expected to yield the single-topic argument unchanged.
  Assert.assertEquals(END0, END4.plus(END0));
  Assert.assertEquals(END1, END4.plus(END1));

  // ---- end metadata: conflicting offsets for partition 1 ----
  // The expected result equals the argument in both directions.
  Assert.assertEquals(END3, END2.plus(END3));
  Assert.assertEquals(END2, END3.plus(END2));

  // when sequence map is empty, we don't know whether it's multi-topic or not, so assume single topic to preserve existing behavior
  Assert.assertEquals(END4, END4.plus(END5));

  // ---- end metadata: multi-topic receiver ----
  Assert.assertEquals(END5, END5.plus(END4));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)), END5.plus(END9));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), END5.plus(END10));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)), END5.plus(END6));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)), END5.plus(END7));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(0, 2L, 2, 5L)), END7.plus(END5));
  Assert.assertEquals(endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)), END9.plus(END5));
  Assert.assertEquals(endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), END9.plus(END10));
  Assert.assertEquals(endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2", "foo22"), ImmutableMap.of(0, 2L, 2, 5L)), END10.plus(END9));
}
/**
 * Asserts the result of {@code minus} for selected ordered pairs of START fixtures and of END fixtures.
 */
@Test
public void testMinus()
{
  // ---- start metadata: single-topic receiver ----
  Assert.assertEquals(startMetadata(ImmutableMap.of()), START0.minus(START2));
  Assert.assertEquals(START0, START0.minus(START4));
  Assert.assertEquals(startMetadata(ImmutableMap.of()), START1.minus(START2));
  Assert.assertEquals(startMetadata(ImmutableMap.of(1, 3L)), START1.minus(START3));
  Assert.assertEquals(START1, START1.minus(START4));
  Assert.assertEquals(startMetadata("foo", ImmutableMap.of()), START1.minus(START5));
  Assert.assertEquals(startMetadata("foo", ImmutableMap.of()), START1.minus(START6));
  Assert.assertEquals(startMetadata("foo", ImmutableMap.of(1, 3L)), START1.minus(START7));
  Assert.assertEquals(startMetadata(ImmutableMap.of(2, 5L)), START2.minus(START1));
  Assert.assertEquals(startMetadata(ImmutableMap.of()), START2.minus(START2));

  // ---- start metadata: multi-topic receiver ----
  Assert.assertEquals(START4, START4.minus(START0));
  Assert.assertEquals(START4, START4.minus(START1));
  Assert.assertEquals(startMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()), START5.minus(START1));

  // ---- end metadata: single-topic receiver ----
  Assert.assertEquals(endMetadata(ImmutableMap.of(1, 4L)), END2.minus(END1));
  Assert.assertEquals(endMetadata(ImmutableMap.of(2, 5L)), END1.minus(END2));
  Assert.assertEquals(END0, END0.minus(END4));
  Assert.assertEquals(END4, END4.minus(END0));
  Assert.assertEquals(END1, END1.minus(END4));
  Assert.assertEquals(END4, END4.minus(END1));

  // ---- end metadata: multi-topic receiver with pattern "foo.*" ----
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of()), END5.minus(END1));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)), END5.minus(END4));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(2, 5L)), END5.minus(END6));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(1, 4L)), END6.minus(END5));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(1, 4L)), END6.minus(END7));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo2"), ImmutableMap.of(0, 2L, 2, 5L)), END7.minus(END5));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo", "foo2"), ImmutableMap.of(2, 5L)), END7.minus(END8));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)), END7.minus(END9));
  Assert.assertEquals(endMetadataMultiTopic("foo.*", ImmutableList.of("foo"), ImmutableMap.of(0, 2L, 2, 5L)), END7.minus(END10));

  // ---- end metadata: multi-topic receiver with pattern "foo2.*" ----
  Assert.assertEquals(END9, END9.minus(END6));
  Assert.assertEquals(endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()), END9.minus(END7));
  Assert.assertEquals(endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of(2, 5L)), END9.minus(END8));
  Assert.assertEquals(endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()), END9.minus(END9));
  Assert.assertEquals(endMetadataMultiTopic("foo2.*", ImmutableList.of("foo2"), ImmutableMap.of()), END9.minus(END10));
  Assert.assertEquals(endMetadataMultiTopic("foo2.*", ImmutableList.of("foo22"), ImmutableMap.of(0, 2L, 2, 5L)), END10.minus(END9));
}
/**
 * Serializes end metadata (one empty, one with a single partition offset) to JSON and checks that
 * reading it back as {@link DataSourceMetadata} yields an equal object.
 */
@Test
public void testKafkaDataSourceMetadataSerdeRoundTrip() throws JsonProcessingException
{
  final ObjectMapper mapper = createObjectMapper();
  final List<KafkaDataSourceMetadata> samples = ImmutableList.of(
      endMetadata(ImmutableMap.of()),
      endMetadata(ImmutableMap.of(1, 3L))
  );

  for (KafkaDataSourceMetadata sample : samples) {
    final String json = mapper.writeValueAsString(sample);
    final DataSourceMetadata restored = mapper.readValue(json, DataSourceMetadata.class);
    Assert.assertEquals(sample, restored);
  }
}
/**
 * Deserializes hand-written JSON payloads (carrying both the "stream"/"partitionSequenceNumberMap" and
 * the "topic"/"partitionOffsetMap" fields) and checks the result equals the expected end metadata.
 */
@Test
public void testKafkaDataSourceMetadataSerde() throws JsonProcessingException
{
  ObjectMapper jsonMapper = createObjectMapper();

  KafkaDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of(1, 3L));
  String kdmStr1 = "{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":3},\"partitionOffsetMap\":{\"1\":3},\"exclusivePartitions\":[]}}\n";
  DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class);
  // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure messages read correctly.
  Assert.assertEquals(expectedKdm1, dsMeta1);

  KafkaDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of(1, 3L, 2, 1900L));
  // NOTE(review): "topic" is "food" here while "stream" is "foo"; presumably "stream" takes precedence
  // when both are present -- confirm whether this mismatch is intentional or a typo.
  String kdmStr2 = "{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":3, \"2\":1900},\"partitionOffsetMap\":{\"1\":3, \"2\":1900},\"exclusivePartitions\":[]}}\n";
  DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class);
  Assert.assertEquals(expectedKdm2, dsMeta2);
}
/**
 * Builds single-topic start metadata for topic "foo".
 *
 * @param offsets partition number -> offset
 */
private static KafkaDataSourceMetadata startMetadata(Map<Integer, Long> offsets)
{
  return startMetadata("foo", offsets);
}

/**
 * Builds single-topic start metadata with no exclusive partitions.
 *
 * @param topic   used both as the stream name and as the topic of each partition key
 * @param offsets partition number -> offset
 */
private static KafkaDataSourceMetadata startMetadata(String topic, Map<Integer, Long> offsets)
{
  // Re-key the plain partition numbers as non-multi-topic KafkaTopicPartition instances.
  final Map<KafkaTopicPartition, Long> offsetsByPartition =
      CollectionUtils.mapKeys(offsets, partition -> new KafkaTopicPartition(false, topic, partition));
  return new KafkaDataSourceMetadata(
      new SeekableStreamStartSequenceNumbers<>(topic, offsetsByPartition, ImmutableSet.of())
  );
}
/**
 * Builds multi-topic start metadata with no exclusive partitions.
 *
 * @param topicPattern regex used as the stream name; every entry of {@code topics} must match it
 * @param topics       non-empty list of concrete topic names
 * @param offsets      partition number -> offset, applied to each topic in {@code topics}
 */
private static KafkaDataSourceMetadata startMetadataMultiTopic(
    String topicPattern,
    List<String> topics,
    Map<Integer, Long> offsets
)
{
  // Sanity-check the fixture definition itself before building anything.
  Assert.assertFalse(topics.isEmpty());
  final Pattern compiledPattern = Pattern.compile(topicPattern);
  for (String candidate : topics) {
    Assert.assertTrue(compiledPattern.matcher(candidate).matches());
  }

  // One multi-topic partition key per (offset entry, topic) combination.
  final Map<KafkaTopicPartition, Long> offsetsByTopicPartition = new HashMap<>();
  offsets.forEach(
      (partition, offset) -> topics.forEach(
          topicName -> offsetsByTopicPartition.put(new KafkaTopicPartition(true, topicName, partition), offset)
      )
  );
  return new KafkaDataSourceMetadata(
      new SeekableStreamStartSequenceNumbers<>(topicPattern, offsetsByTopicPartition, ImmutableSet.of())
  );
}
/**
 * Builds single-topic end metadata for topic "foo".
 *
 * @param offsets partition number -> offset
 */
private static KafkaDataSourceMetadata endMetadata(Map<Integer, Long> offsets)
{
  return endMetadata("foo", offsets);
}

/**
 * Builds single-topic end metadata.
 *
 * @param topic   used both as the stream name and as the topic of each partition key
 * @param offsets partition number -> offset
 */
private static KafkaDataSourceMetadata endMetadata(String topic, Map<Integer, Long> offsets)
{
  // Key the partitions by the requested topic rather than a hard-coded "foo", consistent with
  // startMetadata(String, Map). Existing callers all pass "foo", so their results are unchanged.
  Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys(
      offsets,
      k -> new KafkaTopicPartition(
          false,
          topic,
          k
      )
  );
  return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, newOffsets));
}
/**
 * Builds multi-topic end metadata.
 *
 * @param topicPattern regex used as the stream name; every entry of {@code topics} must match it
 * @param topics       non-empty list of concrete topic names
 * @param offsets      partition number -> offset, applied to each topic in {@code topics}
 */
private static KafkaDataSourceMetadata endMetadataMultiTopic(
    String topicPattern,
    List<String> topics,
    Map<Integer, Long> offsets
)
{
  // Sanity-check the fixture definition itself before building anything.
  Assert.assertFalse(topics.isEmpty());
  final Pattern compiledPattern = Pattern.compile(topicPattern);
  for (String candidate : topics) {
    Assert.assertTrue(compiledPattern.matcher(candidate).matches());
  }

  // One multi-topic partition key per (offset entry, topic) combination.
  final Map<KafkaTopicPartition, Long> offsetsByTopicPartition = new HashMap<>();
  offsets.forEach(
      (partition, offset) -> topics.forEach(
          topicName -> offsetsByTopicPartition.put(new KafkaTopicPartition(true, topicName, partition), offset)
      )
  );
  return new KafkaDataSourceMetadata(
      new SeekableStreamEndSequenceNumbers<>(topicPattern, offsetsByTopicPartition)
  );
}
/**
 * Creates an {@link ObjectMapper} from a core injector (with placeholder service name and ports bound)
 * and registers the Jackson modules provided by {@link KafkaIndexTaskModule} on it.
 */
private static ObjectMapper createObjectMapper()
{
  final DruidModule kafkaModule = new KafkaIndexTaskModule();
  final Injector coreInjector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
      .addModule(
          b -> {
            // Constants bound here are test placeholders for the named service properties.
            b.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
            b.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
            b.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
          }
      )
      .build();

  final ObjectMapper mapper = coreInjector.getInstance(ObjectMapper.class);
  kafkaModule.getJacksonModules().forEach(jacksonModule -> mapper.registerModule(jacksonModule));
  return mapper;
}
}