blob: 77f47f9f9045b97a623122e8d6db4209806540d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.kafka;
import java.util.*;
import java.util.HashMap;
import java.util.Map;
import kafka.api.TopicMetadata;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.util.ScalaJavaUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.*;
public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
@Test
public void testCreateCoordinatorStream() {
SystemAdmin admin = Mockito.spy(systemAdmin());
StreamSpec spec = StreamSpec.createCoordinatorStreamSpec("testCoordinatorStream", "testSystem");
admin.createStream(spec);
admin.validateStream(spec);
Mockito.verify(admin).createStream(Mockito.any());
}
@Test
public void testCreateCoordinatorStreamWithSpecialCharsInTopicName() {
final String STREAM = "test.coordinator_test.Stream";
Properties coordProps = new Properties();
Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM());
Mockito.doAnswer(invocationOnMock -> {
StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
assertTrue(internalSpec.isCoordinatorStream());
assertEquals(SYSTEM(), internalSpec.getSystemName());
assertEquals(STREAM, internalSpec.getPhysicalName());
assertEquals(1, internalSpec.getPartitionCount());
return internalSpec;
}).when(admin).toKafkaSpec(Mockito.any());
admin.createStream(spec);
admin.validateStream(spec);
}
@Test
public void testCreateChangelogStream() {
final String STREAM = "testChangeLogStream";
final int PARTITIONS = 12;
final int REP_FACTOR = 1;
Properties coordProps = new Properties();
Properties changeLogProps = new Properties();
changeLogProps.setProperty("cleanup.policy", "compact");
changeLogProps.setProperty("segment.bytes", "139");
Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
Mockito.doAnswer(invocationOnMock -> {
StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
assertTrue(internalSpec.isChangeLogStream());
assertEquals(SYSTEM(), internalSpec.getSystemName());
assertEquals(STREAM, internalSpec.getPhysicalName());
assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
assertEquals(PARTITIONS, internalSpec.getPartitionCount());
assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
return internalSpec;
}).when(admin).toKafkaSpec(Mockito.any());
admin.createStream(spec);
admin.validateStream(spec);
}
@Test
public void testCreateChangelogStreamWithSpecialCharsInTopicName() {
final String STREAM = "test.Change_Log.Stream";
final int PARTITIONS = 12;
final int REP_FACTOR = 1;
Properties coordProps = new Properties();
Properties changeLogProps = new Properties();
changeLogProps.setProperty("cleanup.policy", "compact");
changeLogProps.setProperty("segment.bytes", "139");
Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
Mockito.doAnswer(invocationOnMock -> {
StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
assertTrue(internalSpec.isChangeLogStream());
assertEquals(SYSTEM(), internalSpec.getSystemName());
assertEquals(STREAM, internalSpec.getPhysicalName());
assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
assertEquals(PARTITIONS, internalSpec.getPartitionCount());
assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
return internalSpec;
}).when(admin).toKafkaSpec(Mockito.any());
admin.createStream(spec);
admin.validateStream(spec);
}
@Test
public void testCreateStream() {
StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
systemAdmin().validateStream(spec);
assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec));
}
@Test(expected = StreamValidationException.class)
public void testValidateStreamDoesNotExist() {
StreamSpec spec = new StreamSpec("testId", "testStreamNameExist", "testSystem", 8);
systemAdmin().validateStream(spec);
}
@Test(expected = StreamValidationException.class)
public void testValidateStreamWrongPartitionCount() {
StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8);
StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4);
assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
systemAdmin().validateStream(spec2);
}
@Test(expected = StreamValidationException.class)
public void testValidateStreamWrongName() {
StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8);
StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8);
assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1));
systemAdmin().validateStream(spec2);
}
@Test
public void testClearStream() {
StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8);
assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec));
assertTrue(systemAdmin().clearStream(spec));
scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName());
scala.collection.immutable.Map<String, TopicMetadata> metadata = systemAdmin().getTopicMetadata(topic);
assertTrue(metadata.get(spec.getPhysicalName()).get().partitionsMetadata().isEmpty());
}
}