blob: f8293b8cf9e6d9ce0cfbd78af9ea756dbf6413ea [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import java.io.File;
import java.util.Properties;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class KafkaStreamsTest {
// We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
// quick enough
@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
@Test
public void testStartAndClose() throws Exception {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
final int initCountDifference = newInitCount - oldInitCount;
assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0);
streams.close();
Assert.assertEquals("each reporter initialized should also be closed",
oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
}
@Test
public void testCloseIsIdempotent() throws Exception {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.close();
final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
streams.close();
Assert.assertEquals("subsequent close() calls should do nothing",
closeCount, MockMetricsReporter.CLOSE_COUNT.get());
}
@Test(expected = IllegalStateException.class)
public void testCannotStartOnceClosed() throws Exception {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.close();
try {
streams.start();
} catch (final IllegalStateException e) {
Assert.assertEquals("Cannot restart after closing.", e.getMessage());
throw e;
} finally {
streams.close();
}
}
@Test(expected = IllegalStateException.class)
public void testCannotStartTwice() throws Exception {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
try {
streams.start();
} catch (final IllegalStateException e) {
Assert.assertEquals("This process was already started.", e.getMessage());
throw e;
} finally {
streams.close();
}
}
@Test
public void testCleanup() throws Exception {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.cleanUp();
streams.start();
streams.close();
streams.cleanUp();
}
@Test
public void testCleanupIsolation() throws Exception {
final KStreamBuilder builder = new KStreamBuilder();
final String appId1 = "testIsolation-1";
final String appId2 = "testIsolation-2";
final String stateDir = TestUtils.tempDirectory("kafka-test").getPath();
final File stateDirApp1 = new File(stateDir + File.separator + appId1);
final File stateDirApp2 = new File(stateDir + File.separator + appId2);
final Properties props = new Properties();
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
assertFalse(stateDirApp1.exists());
assertFalse(stateDirApp2.exists());
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId1);
final KafkaStreams streams1 = new KafkaStreams(builder, props);
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId2);
final KafkaStreams streams2 = new KafkaStreams(builder, props);
assertTrue(stateDirApp1.exists());
assertTrue(stateDirApp2.exists());
streams1.cleanUp();
assertFalse(stateDirApp1.exists());
assertTrue(stateDirApp2.exists());
streams2.cleanUp();
assertFalse(stateDirApp1.exists());
assertFalse(stateDirApp2.exists());
}
@Test(expected = IllegalStateException.class)
public void testCannotCleanupWhileRunning() throws Exception {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
try {
streams.cleanUp();
} catch (final IllegalStateException e) {
Assert.assertEquals("Cannot clean up while running.", e.getMessage());
throw e;
} finally {
streams.close();
}
}
}