blob: 5914b789a14962dce4e285c11d95b884c37e6c7e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.streams.querymanager.kafka;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rya.streams.api.queries.QueryChangeLog;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.querymanager.QueryChangeLogSource;
import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
/**
* Integration tests the methods of {@link KafkaQueryChangeLogSource}.
*/
public class KafkaQueryChangeLogSourceIT {
@Rule
public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false);
@Before
public void clearTopics() throws InterruptedException {
kafka.deleteAllTopics();
}
@Test
public void discoverExistingLogs() throws Exception {
// Create a valid Query Change Log topic.
final String ryaInstance = UUID.randomUUID().toString();
final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
kafka.createTopic(topic);
// Create the source.
final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
kafka.getKafkaHostname(),
Integer.parseInt( kafka.getKafkaPort() ),
Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS));
// Register a listener that counts down a latch if it sees the new topic.
final CountDownLatch created = new CountDownLatch(1);
source.subscribe(new SourceListener() {
@Override
public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
assertEquals(ryaInstance, ryaInstanceName);
created.countDown();
}
@Override
public void notifyDelete(final String ryaInstanceName) { }
});
try {
// Start the source.
source.startAndWait();
// If the latch isn't counted down, then fail the test.
assertTrue( created.await(5, TimeUnit.SECONDS) );
} finally {
source.stopAndWait();
}
}
@Test
public void discoverNewLogs() throws Exception {
// Create the source.
final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
kafka.getKafkaHostname(),
Integer.parseInt( kafka.getKafkaPort() ),
Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS));
// Register a listener that counts down a latch if it sees the new topic.
final String ryaInstance = UUID.randomUUID().toString();
final CountDownLatch created = new CountDownLatch(1);
source.subscribe(new SourceListener() {
@Override
public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
assertEquals(ryaInstance, ryaInstanceName);
created.countDown();
}
@Override
public void notifyDelete(final String ryaInstanceName) { }
});
try {
// Start the source.
source.startAndWait();
// Wait twice the polling duration to ensure it iterates at least once.
Thread.sleep(200);
// Create a valid Query Change Log topic.
final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
kafka.createTopic(topic);
// If the latch isn't counted down, then fail the test.
assertTrue( created.await(5, TimeUnit.SECONDS) );
} finally {
source.stopAndWait();
}
}
@Test
public void discoverLogDeletions() throws Exception {
// Create a valid Query Change Log topic.
final String ryaInstance = UUID.randomUUID().toString();
final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
kafka.createTopic(topic);
// Create the source.
final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
kafka.getKafkaHostname(),
Integer.parseInt( kafka.getKafkaPort() ),
Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS));
// Register a listener that uses latches to indicate when the topic is created and deleted.
final CountDownLatch created = new CountDownLatch(1);
final CountDownLatch deleted = new CountDownLatch(1);
source.subscribe(new SourceListener() {
@Override
public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
assertEquals(ryaInstance, ryaInstanceName);
created.countDown();
}
@Override
public void notifyDelete(final String ryaInstanceName) {
assertEquals(ryaInstance, ryaInstanceName);
deleted.countDown();
}
});
try {
// Start the source
source.startAndWait();
// Wait for it to indicate the topic was created.
assertTrue( created.await(5, TimeUnit.SECONDS) );
// Delete the topic.
kafka.deleteTopic(topic);
// If the latch isn't counted down, then fail the test.
assertTrue( deleted.await(5, TimeUnit.SECONDS) );
} finally {
source.stopAndWait();
}
}
@Test
public void newListenerReceivesAllKnownLogs() throws Exception {
// Create a valid Query Change Log topic.
final String ryaInstance = UUID.randomUUID().toString();
final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
kafka.createTopic(topic);
// Create the source.
final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
kafka.getKafkaHostname(),
Integer.parseInt( kafka.getKafkaPort() ),
Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS));
// Register a listener that counts down a latch if it sees the new topic.
final CountDownLatch created = new CountDownLatch(1);
source.subscribe(new SourceListener() {
@Override
public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
assertEquals(ryaInstance, ryaInstanceName);
created.countDown();
}
@Override
public void notifyDelete(final String ryaInstanceName) { }
});
try {
// Start the source
source.startAndWait();
// Wait for that first listener to indicate the topic was created. This means that one has been cached.
assertTrue( created.await(5, TimeUnit.SECONDS) );
// Register a second listener that counts down when that same topic is encountered. This means the
// newly subscribed listener was notified with the already known change log.
final CountDownLatch newListenerCreated = new CountDownLatch(1);
source.subscribe(new SourceListener() {
@Override
public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
assertEquals(ryaInstance, ryaInstanceName);
newListenerCreated.countDown();
}
@Override
public void notifyDelete(final String ryaInstanceName) { }
});
assertTrue( newListenerCreated.await(5, TimeUnit.SECONDS) );
} finally {
source.stopAndWait();
}
}
@Test
public void unsubscribedDoesNotReceiveNotifications() throws Exception {
// Create the source.
final QueryChangeLogSource source = new KafkaQueryChangeLogSource(
kafka.getKafkaHostname(),
Integer.parseInt( kafka.getKafkaPort() ),
Scheduler.newFixedRateSchedule(0, 100, TimeUnit.MILLISECONDS));
try {
// Start the source.
source.startAndWait();
// Create a listener that flips a boolean to true when it is notified.
final AtomicBoolean notified = new AtomicBoolean(false);
final SourceListener listener = new SourceListener() {
@Override
public void notifyCreate(final String ryaInstanceName, final QueryChangeLog log) {
notified.set(true);
}
@Override
public void notifyDelete(final String ryaInstanceName) {
notified.set(true);
}
};
// Register and then unregister it.
source.subscribe(listener);
source.unsubscribe(listener);
// Create a topic.
final String ryaInstance = UUID.randomUUID().toString();
final String topic = KafkaTopics.queryChangeLogTopic(ryaInstance);
kafka.createTopic(topic);
//Wait longer than the polling time for the listener to be notified.
Thread.sleep(300);
// Show the boolean was never flipped to true.
assertFalse(notified.get());
} finally {
source.stopAndWait();
}
}
}