blob: 13f2f0ba8ac298ec1379938c8b06f84d855e8f7b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.slf4j.Logger;
import org.testng.annotations.Test;
import java.lang.reflect.Type;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import static org.mockito.Matchers.endsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
/**
* AbstractNotificationConsumer tests.
*/
public class AbstractNotificationConsumerTest {
private static final Gson GSON = new Gson();
@Test
public void testNext() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List<String> jsonList = new LinkedList<>();
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.next());
assertTrue(consumer.hasNext());
assertEquals(testMessage2, consumer.next());
assertTrue(consumer.hasNext());
assertEquals(testMessage3, consumer.next());
assertTrue(consumer.hasNext());
assertEquals(testMessage4, consumer.next());
assertFalse(consumer.hasNext());
}
@Test
public void testNextBackVersion() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List<String> jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2));
String json3 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.5.0"), testMessage3));
String json4 = GSON.toJson(testMessage4);
jsonList.add(json1);
jsonList.add(json2);
jsonList.add(json3);
jsonList.add(json4);
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue1", 99), consumer.next());
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue2", 98), consumer.next());
verify(logger).info(endsWith(json2));
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue3", 97), consumer.next());
verify(logger).info(endsWith(json3));
assertTrue(consumer.hasNext());
assertEquals(new TestMessage("sValue4", 96), consumer.next());
verify(logger).info(endsWith(json4));
assertFalse(consumer.hasNext());
}
@Test
public void testNextForwardVersion() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
List<String> jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2));
jsonList.add(json1);
jsonList.add(json2);
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.next());
assertTrue(consumer.hasNext());
try {
consumer.next();
fail("Expected VersionMismatchException!");
} catch (IncompatibleVersionException e) {
verify(logger).error(endsWith(json2));
}
assertFalse(consumer.hasNext());
}
@Test
public void testPeek() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List<String> jsonList = new LinkedList<>();
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.peek());
assertTrue(consumer.hasNext());
assertEquals(testMessage1, consumer.peek());
assertTrue(consumer.hasNext());
}
private static class TestMessage {
private String s;
private int i;
public TestMessage(String s, int i) {
this.s = s;
this.i = i;
}
public String getS() {
return s;
}
public void setS(String s) {
this.s = s;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestMessage that = (TestMessage) o;
return i == that.i &&
Objects.equals(s, that.s);
}
@Override
public int hashCode() {
return Objects.hash(s, i);
}
}
private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
private final List<String> messageList;
private int index = 0;
public TestNotificationConsumer(Type versionedMessageType, List<String> messages, Logger logger) {
super(new TestDeserializer<T>(versionedMessageType, logger));
this.messageList = messages;
}
@Override
protected String getNext() {
return messageList.get(index++);
}
@Override
protected String peekMessage() {
return messageList.get(index);
}
@Override
public boolean hasNext() {
return index < messageList.size();
}
@Override
public void commit() {
// do nothing.
}
@Override
public void close() {
//do nothing
}
}
private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
private TestDeserializer(Type versionedMessageType, Logger logger) {
super(versionedMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
}
}
}