/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.samza.Partition;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.startpoint.StartpointOldest;
import org.apache.samza.startpoint.StartpointSpecific;
import org.apache.samza.startpoint.StartpointTimestamp;
import org.apache.samza.startpoint.StartpointUpcoming;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.kafka.KafkaSystemAdmin.KafkaStartpointToOffsetResolver;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import static org.apache.samza.system.kafka.KafkaSystemAdmin.*;
import static org.junit.Assert.*;

public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
private static final String SYSTEM = "kafka";
private static final String TOPIC = "input";
private static final String TEST_SYSTEM = "test-system";
private static final String TEST_STREAM = "test-stream";
private static final Integer TEST_PARTITION_ID = 0;
private static final TopicPartition TEST_TOPIC_PARTITION = new TopicPartition(TEST_STREAM, TEST_PARTITION_ID);
private static final Partition TEST_PARTITION = new Partition(TEST_PARTITION_ID);
private static final SystemStreamPartition TEST_SYSTEM_STREAM_PARTITION = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, TEST_PARTITION);
private static final String TEST_OFFSET = "10";
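/**
 * Verifies that creating a coordinator stream provisions the backing topic with the compaction
 * and retention settings that KafkaSystemAdmin applies to coordinator streams.
 */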
@Test
public void testCreateStreamShouldCreateCoordinatorStreamWithCorrectTopicProperties() throws Exception {
String coordinatorTopicName = String.format("topic-name-%s", RandomStringUtils.randomAlphabetic(5));
StreamSpec coordinatorStreamSpec = KafkaStreamSpec.createCoordinatorStreamSpec(coordinatorTopicName, SYSTEM());
boolean hasCreatedStream = systemAdmin().createStream(coordinatorStreamSpec);
assertTrue(hasCreatedStream);
Map<String, String> coordinatorTopicProperties = getTopicConfigFromKafkaBroker(coordinatorTopicName);
assertEquals("compact", coordinatorTopicProperties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("26214400", coordinatorTopicProperties.get(TopicConfig.SEGMENT_BYTES_CONFIG));
assertEquals("86400000", coordinatorTopicProperties.get(TopicConfig.DELETE_RETENTION_MS_CONFIG));
}
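/** Fetches the effective topic-level configuration for the given topic from the Kafka broker. */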
private static Map<String, String> getTopicConfigFromKafkaBroker(String topicName) throws Exception {
List<ConfigResource> configResourceList = ImmutableList.of(
new ConfigResource(ConfigResource.Type.TOPIC, topicName));
Map<ConfigResource, org.apache.kafka.clients.admin.Config> configResourceConfigMap =
adminClient().describeConfigs(configResourceList).all().get();
Map<String, String> kafkaTopicConfig = new HashMap<>();
configResourceConfigMap.values().forEach(configEntry -> {
configEntry.entries().forEach(config -> {
kafkaTopicConfig.put(config.name(), config.value());
});
});
return kafkaTopicConfig;
}
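/** getOffsetsAfter should return, for each SSP, the offset immediately after the given offset. */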
@Test
public void testGetOffsetsAfter() {
SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM, TOPIC, new Partition(0));
SystemStreamPartition ssp2 = new SystemStreamPartition(SYSTEM, TOPIC, new Partition(1));
Map<SystemStreamPartition, String> offsets = new HashMap<>();
offsets.put(ssp1, "1");
offsets.put(ssp2, "2");
offsets = systemAdmin().getOffsetsAfter(offsets);
Assert.assertEquals("2", offsets.get(ssp1));
Assert.assertEquals("3", offsets.get(ssp2));
}
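/**
 * toKafkaSpec should carry over the checkpoint stream's configured properties (cleanup policy,
 * replication factor, segment bytes, delete retention) into the resulting KafkaStreamSpec.
 */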
@Test
public void testToKafkaSpecForCheckpointStreamShouldReturnTheCorrectStreamSpecByPreservingTheConfig() {
String topicName = "testStream";
String streamId = "samza-internal-checkpoint-stream-id";
int partitionCount = 1;
Map<String, String> map = new HashMap<>();
map.put("cleanup.policy", "compact");
map.put("replication.factor", "3");
map.put("segment.bytes", "536870912");
map.put("delete.retention.ms", "86400000");
Config config = new MapConfig(map);
StreamSpec spec = new StreamSpec(streamId, topicName, SYSTEM, partitionCount, config);
KafkaSystemAdmin kafkaSystemAdmin = systemAdmin();
KafkaStreamSpec kafkaStreamSpec = kafkaSystemAdmin.toKafkaSpec(spec);
assertEquals(streamId, kafkaStreamSpec.getId());
assertEquals(topicName, kafkaStreamSpec.getPhysicalName());
assertEquals(partitionCount, kafkaStreamSpec.getPartitionCount());
assertEquals(3, kafkaStreamSpec.getReplicationFactor());
assertEquals("compact", kafkaStreamSpec.getConfig().get("cleanup.policy"));
assertEquals("536870912", kafkaStreamSpec.getConfig().get("segment.bytes"));
assertEquals("86400000", kafkaStreamSpec.getConfig().get("delete.retention.ms"));
}
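/**
 * Exercises toKafkaSpec for plain, coordinator, changelog, and intermediate stream specs,
 * verifying that each variant picks up its replication factor and topic properties from the
 * corresponding section of the job config.
 */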
@Test
public void testToKafkaSpec() {
String topicName = "testStream";
int defaultPartitionCount = 2;
int changeLogPartitionFactor = 5;
Map<String, String> map = new HashMap<>();
Config config = new MapConfig(map);
StreamSpec spec = new StreamSpec("id", topicName, SYSTEM, defaultPartitionCount, config);
KafkaSystemAdmin kafkaAdmin = systemAdmin();
KafkaStreamSpec kafkaSpec = kafkaAdmin.toKafkaSpec(spec);
Assert.assertEquals("id", kafkaSpec.getId());
Assert.assertEquals(topicName, kafkaSpec.getPhysicalName());
Assert.assertEquals(SYSTEM, kafkaSpec.getSystemName());
Assert.assertEquals(defaultPartitionCount, kafkaSpec.getPartitionCount());
// validate that conversion uses the coordinator stream configuration
map.put("job.coordinator.segment.bytes", "123");
map.put("job.coordinator.cleanup.policy", "superCompact");
int coordReplicationFactor = 4;
map.put(org.apache.samza.config.KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR(),
String.valueOf(coordReplicationFactor));
KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM, map));
spec = StreamSpec.createCoordinatorStreamSpec(topicName, SYSTEM);
kafkaSpec = admin.toKafkaSpec(spec);
Assert.assertEquals(coordReplicationFactor, kafkaSpec.getReplicationFactor());
Assert.assertEquals("123", kafkaSpec.getProperties().getProperty("segment.bytes"));
// cleanup policy is overridden in the KafkaAdmin
Assert.assertEquals("compact", kafkaSpec.getProperties().getProperty("cleanup.policy"));
// validate that conversion is using changeLog metadata
map = new HashMap<>();
map.put(JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM);
map.put(String.format("stores.%s.changelog", "fakeStore"), topicName);
int changeLogReplicationFactor = 3;
map.put(String.format("stores.%s.changelog.replication.factor", "fakeStore"),
String.valueOf(changeLogReplicationFactor));
admin = Mockito.spy(createSystemAdmin(SYSTEM, map));
spec = StreamSpec.createChangeLogStreamSpec(topicName, SYSTEM, changeLogPartitionFactor);
kafkaSpec = admin.toKafkaSpec(spec);
Assert.assertEquals(changeLogReplicationFactor, kafkaSpec.getReplicationFactor());
// same, but with missing topic info
try {
admin = Mockito.spy(createSystemAdmin(SYSTEM, map));
spec = StreamSpec.createChangeLogStreamSpec("anotherTopic", SYSTEM, changeLogPartitionFactor);
kafkaSpec = admin.toKafkaSpec(spec);
Assert.fail("toKafkaSpec should've failed for missing topic");
} catch (StreamValidationException e) {
// expected
}
// validate that conversion is using intermediate streams properties
String interStreamId = "isId";
Map<String, String> interStreamMap = new HashMap<>();
interStreamMap.put("app.mode", ApplicationConfig.ApplicationMode.BATCH.toString());
interStreamMap.put(String.format("streams.%s.samza.intermediate", interStreamId), "true");
interStreamMap.put(String.format("streams.%s.samza.system", interStreamId), "testSystem");
interStreamMap.put(String.format("streams.%s.p1", interStreamId), "v1");
interStreamMap.put(String.format("streams.%s.retention.ms", interStreamId), "123");
// legacy format
interStreamMap.put(String.format("systems.%s.streams.%s.p2", "testSystem", interStreamId), "v2");
admin = Mockito.spy(createSystemAdmin(SYSTEM, interStreamMap));
spec = new StreamSpec(interStreamId, topicName, SYSTEM, defaultPartitionCount, config);
kafkaSpec = admin.toKafkaSpec(spec);
Assert.assertEquals("v1", kafkaSpec.getProperties().getProperty("p1"));
Assert.assertEquals("v2", kafkaSpec.getProperties().getProperty("p2"));
Assert.assertEquals("123", kafkaSpec.getProperties().getProperty("retention.ms"));
Assert.assertEquals(defaultPartitionCount, kafkaSpec.getPartitionCount());
}
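/** createStream followed by validateStream should succeed for a coordinator stream spec. */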
@Test
public void testCreateCoordinatorStream() {
SystemAdmin admin = Mockito.spy(systemAdmin());
StreamSpec spec = StreamSpec.createCoordinatorStreamSpec("testCoordinatorStream", "testSystem");
admin.createStream(spec);
admin.validateStream(spec);
Mockito.verify(admin).createStream(Mockito.any());
}
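/**
 * Coordinator stream topic names may contain periods and underscores; the internal spec should
 * still be a KafkaStreamSpec carrying the configured replication factor and properties.
 */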
@Test
public void testCreateCoordinatorStreamWithSpecialCharsInTopicName() {
final String stream = "test.coordinator_test.Stream";
Map<String, String> map = new HashMap<>();
map.put("job.coordinator.segment.bytes", "123");
map.put("job.coordinator.cleanup.policy", "compact");
int coordReplicationFactor = 2;
map.put(org.apache.samza.config.KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR(),
String.valueOf(coordReplicationFactor));
KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM, map));
StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(stream, SYSTEM);
Mockito.doAnswer(invocationOnMock -> {
StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
assertTrue(internalSpec.isCoordinatorStream());
assertEquals(SYSTEM, internalSpec.getSystemName());
assertEquals(stream, internalSpec.getPhysicalName());
assertEquals(1, internalSpec.getPartitionCount());
Assert.assertEquals(coordReplicationFactor, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
Assert.assertEquals("123", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("segment.bytes"));
// cleanup policy is overridden in the KafkaAdmin
Assert.assertEquals("compact", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("cleanup.policy"));
return internalSpec;
}).when(admin).toKafkaSpec(Mockito.any());
admin.createStream(spec);
admin.validateStream(spec);
}
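/** Creates and validates a changelog stream with a plain topic name. */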
@Test
public void testCreateChangelogStreamHelp() {
testCreateChangelogStreamHelp("testChangeLogStream");
}
@Test
public void testCreateChangelogStreamWithSpecialCharsInTopicName() {
// unlike coordinator streams, the topic name cannot contain a period, since the
// stores.<store>.changelog config value may be given in <system>.<stream> form
testCreateChangelogStreamHelp("test-Change_Log-Stream");
}
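/**
 * Helper: creates a changelog stream for the given topic and verifies that the internal
 * KafkaStreamSpec carries the store's replication factor, partition count, and kafka.* property
 * overrides, plus the compact cleanup policy that KafkaSystemAdmin enforces for changelogs.
 */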
public void testCreateChangelogStreamHelp(final String topic) {
final int partitions = 12;
final int repFactor = 2;
Map<String, String> map = new HashMap<>();
map.put(JobConfig.JOB_DEFAULT_SYSTEM, SYSTEM);
map.put(String.format("stores.%s.changelog", "fakeStore"), topic);
map.put(String.format("stores.%s.changelog.replication.factor", "fakeStore"), String.valueOf(repFactor));
map.put(String.format("stores.%s.changelog.kafka.segment.bytes", "fakeStore"), "139");
KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(SYSTEM, map));
StreamSpec spec = StreamSpec.createChangeLogStreamSpec(topic, SYSTEM, partitions);
Mockito.doAnswer(invocationOnMock -> {
StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
assertTrue(internalSpec.isChangeLogStream());
assertEquals(SYSTEM, internalSpec.getSystemName());
assertEquals(topic, internalSpec.getPhysicalName());
assertEquals(repFactor, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
assertEquals(partitions, internalSpec.getPartitionCount());
assertEquals("139", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("segment.bytes"));
assertEquals("compact", ((KafkaStreamSpec) internalSpec).getProperties().getProperty("cleanup.policy"));
return internalSpec;
}).when(admin).toKafkaSpec(Mockito.any());
admin.createStream(spec);
admin.validateStream(spec);
}
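/** createStream should return true on first creation and false if the stream already exists. */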
@Test
public void testCreateStream() {
StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
KafkaSystemAdmin admin = systemAdmin();
assertTrue("createStream should return true if the stream does not exist and then is created.",
admin.createStream(spec));
admin.validateStream(spec);
assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec));
}
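/** validateStream should throw when the underlying topic does not exist. */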
@Test(expected = StreamValidationException.class)
public void testValidateStreamDoesNotExist() {
StreamSpec spec = new StreamSpec("testId", "testStreamNameExist", "testSystem", 8);
systemAdmin().validateStream(spec);
}
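/** validateStream should throw when the spec's partition count does not match the topic. */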
@Test(expected = StreamValidationException.class)
public void testValidateStreamWrongPartitionCount() {
StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8);
StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4);
assertTrue("createStream should return true if the stream does not exist and then is created.",
systemAdmin().createStream(spec1));
systemAdmin().validateStream(spec2);
}
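/** validateStream should throw when the spec's physical name does not match an existing topic. */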
@Test(expected = StreamValidationException.class)
public void testValidateStreamWrongName() {
StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8);
StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8);
assertTrue("createStream should return true if the stream does not exist and then is created.",
systemAdmin().createStream(spec1));
systemAdmin().validateStream(spec2);
}
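/** clearStream should delete the backing topic, so a subsequent describeTopics should fail. */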
@Test
public void testClearStream() {
StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
KafkaSystemAdmin admin = systemAdmin();
String topicName = spec.getPhysicalName();
assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
// clear the stream and validate that the operation succeeded
assertTrue(admin.clearStream(spec));
// validate that topic was removed
DescribeTopicsResult dtr = admin.adminClient.describeTopics(ImmutableSet.of(topicName));
try {
TopicDescription td = dtr.all().get().get(topicName);
Assert.fail("topic " + topicName + " should've been removed. td=" + td);
} catch (Exception e) {
if (!(e.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException)) {
Assert.fail("topic " + topicName + " should've been removed. Expected UnknownTopicOrPartitionException.");
}
}
}
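/**
 * assembleMetadata should group per-SSP oldest/newest/upcoming offsets into one
 * SystemStreamMetadata entry per stream, keyed by partition.
 */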
@Test
public void testShouldAssembleMetadata() {
Map<SystemStreamPartition, String> oldestOffsets = new ImmutableMap.Builder<SystemStreamPartition, String>()
.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)), "o1")
.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)), "o2")
.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)), "o3")
.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)), "o4")
.build();
Map<SystemStreamPartition, String> newestOffsets = new ImmutableMap.Builder<SystemStreamPartition, String>()
.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)), "n1")
.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)), "n2")
.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)), "n3")
.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)), "n4")
.build();
Map<SystemStreamPartition, String> upcomingOffsets = new ImmutableMap.Builder<SystemStreamPartition, String>()
.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)), "u1")
.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)), "u2")
.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)), "u3")
.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)), "u4")
.build();
Map<String, SystemStreamMetadata> metadata = assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets);
assertNotNull(metadata);
assertEquals(2, metadata.size());
assertTrue(metadata.containsKey("stream1"));
assertTrue(metadata.containsKey("stream2"));
SystemStreamMetadata stream1Metadata = metadata.get("stream1");
SystemStreamMetadata stream2Metadata = metadata.get("stream2");
assertNotNull(stream1Metadata);
assertNotNull(stream2Metadata);
assertEquals("stream1", stream1Metadata.getStreamName());
assertEquals("stream2", stream2Metadata.getStreamName());
SystemStreamMetadata.SystemStreamPartitionMetadata expectedSystemStream1Partition0Metadata =
new SystemStreamMetadata.SystemStreamPartitionMetadata("o1", "n1", "u1");
SystemStreamMetadata.SystemStreamPartitionMetadata expectedSystemStream1Partition1Metadata =
new SystemStreamMetadata.SystemStreamPartitionMetadata("o3", "n3", "u3");
SystemStreamMetadata.SystemStreamPartitionMetadata expectedSystemStream2Partition0Metadata =
new SystemStreamMetadata.SystemStreamPartitionMetadata("o2", "n2", "u2");
SystemStreamMetadata.SystemStreamPartitionMetadata expectedSystemStream2Partition1Metadata =
new SystemStreamMetadata.SystemStreamPartitionMetadata("o4", "n4", "u4");
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> stream1PartitionMetadata =
stream1Metadata.getSystemStreamPartitionMetadata();
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> stream2PartitionMetadata =
stream2Metadata.getSystemStreamPartitionMetadata();
assertEquals(expectedSystemStream1Partition0Metadata, stream1PartitionMetadata.get(new Partition(0)));
assertEquals(expectedSystemStream1Partition1Metadata, stream1PartitionMetadata.get(new Partition(1)));
assertEquals(expectedSystemStream2Partition0Metadata, stream2PartitionMetadata.get(new Partition(0)));
assertEquals(expectedSystemStream2Partition1Metadata, stream2PartitionMetadata.get(new Partition(1)));
}
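/** A StartpointSpecific should resolve directly to its own offset, without consulting the consumer. */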
@Test
public void testStartpointSpecificOffsetVisitorShouldResolveToCorrectOffset() {
final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
// Invoke the resolver with the startpoint.
String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
Assert.assertEquals(TEST_OFFSET, resolvedOffset);
}
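/** A StartpointTimestamp should resolve to the offset that offsetsForTimes returns for the timestamp. */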
@Test
public void testStartpointTimestampVisitorShouldResolveToCorrectOffset() {
// Define dummy variables for testing.
final Long testTimestamp = 10L;
final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimestamp);
final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = ImmutableMap.of(
TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimestamp));
// Mock the consumer interactions.
Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, testTimestamp))).thenReturn(offsetForTimesResult);
Mockito.when(consumer.position(TEST_TOPIC_PARTITION)).thenReturn(Long.valueOf(TEST_OFFSET));
String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
Assert.assertEquals(TEST_OFFSET, resolvedOffset);
}
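/**
 * When offsetsForTimes has no offset for the timestamp (a null entry), the resolver should fall
 * back to the end offset of the topic partition.
 */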
@Test
public void testStartpointTimestampVisitorShouldResolveToCorrectOffsetWhenTimestampDoesNotExist() {
final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L);
final Map<TopicPartition, OffsetAndTimestamp> offsetForTimesResult = new HashMap<>();
offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
// Mock the consumer interactions.
Mockito.when(consumer.offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L))).thenReturn(offsetForTimesResult);
Mockito.when(consumer.endOffsets(ImmutableSet.of(TEST_TOPIC_PARTITION))).thenReturn(ImmutableMap.of(TEST_TOPIC_PARTITION, 10L));
String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
Assert.assertEquals(TEST_OFFSET, resolvedOffset);
// Mock verifications.
Mockito.verify(consumer).offsetsForTimes(ImmutableMap.of(TEST_TOPIC_PARTITION, 0L));
}
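/** A StartpointOldest should resolve to the beginning offset of the topic partition. */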
@Test
public void testStartpointOldestVisitorShouldResolveToCorrectOffset() {
// Define dummy variables for testing.
final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
final StartpointOldest testStartpointOldest = new StartpointOldest();
// Mock the consumer interactions.
Mockito.when(consumer.beginningOffsets(ImmutableSet.of(TEST_TOPIC_PARTITION))).thenReturn(ImmutableMap.of(TEST_TOPIC_PARTITION, 10L));
// Invoke the resolver with the startpoint.
String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointOldest);
Assert.assertEquals(TEST_OFFSET, resolvedOffset);
}
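/** A StartpointUpcoming should resolve to the end offset of the topic partition. */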
@Test
public void testStartpointUpcomingVisitorShouldResolveToCorrectOffset() {
// Define dummy variables for testing.
final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
final KafkaStartpointToOffsetResolver kafkaStartpointToOffsetResolver = new KafkaStartpointToOffsetResolver(consumer);
final StartpointUpcoming testStartpointUpcoming = new StartpointUpcoming();
// Mock the consumer interactions.
Mockito.when(consumer.endOffsets(ImmutableSet.of(TEST_TOPIC_PARTITION))).thenReturn(ImmutableMap.of(TEST_TOPIC_PARTITION, 10L));
// Invoke the resolver with the startpoint.
String resolvedOffset = kafkaStartpointToOffsetResolver.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointUpcoming);
Assert.assertEquals(TEST_OFFSET, resolvedOffset);
}
}