/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.slf4j.Logger;
import org.testng.annotations.Test;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import static org.mockito.Matchers.endsWith;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.kafka.common.TopicPartition;
/**
* AbstractNotificationConsumer tests.
*/
public class AbstractNotificationConsumerTest {
private static final Gson GSON = new Gson();
@Test
public void testReceive() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List jsonList = new LinkedList<>();
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3)));
jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4)));
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
assertFalse(messageList.isEmpty());
assertEquals(testMessage1, messageList.get(0).getMessage());
assertEquals(testMessage2, messageList.get(1).getMessage());
assertEquals(testMessage3, messageList.get(2).getMessage());
assertEquals(testMessage4, messageList.get(3).getMessage());
}
@Test
public void testNextBackVersion() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
TestMessage testMessage3 = new TestMessage("sValue3", 97);
TestMessage testMessage4 = new TestMessage("sValue4", 96);
List jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2));
String json3 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.5.0"), testMessage3));
String json4 = GSON.toJson(testMessage4);
jsonList.add(json1);
jsonList.add(json2);
jsonList.add(json3);
jsonList.add(json4);
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
assertEquals(new TestMessage("sValue1", 99), messageList.get(0).getMessage());
assertEquals(new TestMessage("sValue2", 98), messageList.get(1).getMessage());
assertEquals(new TestMessage("sValue3", 97), messageList.get(2).getMessage());
assertEquals(new TestMessage("sValue4", 96), messageList.get(3).getMessage());
}
@Test
public void testNextForwardVersion() throws Exception {
Logger logger = mock(Logger.class);
TestMessage testMessage1 = new TestMessage("sValue1", 99);
TestMessage testMessage2 = new TestMessage("sValue2", 98);
List jsonList = new LinkedList<>();
String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1));
String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2));
jsonList.add(json1);
jsonList.add(json2);
Type versionedMessageType = new TypeToken<VersionedMessage<TestMessage>>(){}.getType();
NotificationConsumer<TestMessage> consumer =
new TestNotificationConsumer<>(versionedMessageType, jsonList, logger);
try {
List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L);
messageList.get(1).getMessage();
fail("Expected VersionMismatchException!");
} catch (IncompatibleVersionException e) {
}
}
private static class TestMessage {
private String s;
private int i;
public TestMessage(String s, int i) {
this.s = s;
this.i = i;
}
public String getS() {
return s;
}
public void setS(String s) {
this.s = s;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TestMessage that = (TestMessage) o;
return i == that.i &&
Objects.equals(s, that.s);
}
@Override
public int hashCode() {
return Objects.hash(s, i);
}
}
private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> {
private final List<T> messageList;
private int index = 0;
public TestNotificationConsumer(Type versionedMessageType, List<T> messages, Logger logger) {
super(new TestDeserializer<T>(versionedMessageType, logger));
this.messageList = messages;
}
@Override
public void commit(TopicPartition partition, long offset) {
// do nothing.
}
@Override
public void close() {
//do nothing
}
@Override
public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
for(Object json : messageList) {
tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));
}
return tempMessageList;
}
}
private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T> {
private TestDeserializer(Type versionedMessageType, Logger logger) {
super(versionedMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger);
}
}
}