blob: cb44fc68ad61d883b910b9ec3fae5629c138bd6c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.typesystem.IReferenceableInstance;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.json.InstanceSerialization;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
/**
* Abstract notification interface implementation.
*/
public abstract class AbstractNotification implements NotificationInterface {
/**
* The current expected version for notification messages.
*/
public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");
public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
private final boolean embedded;
private final boolean isHAEnabled;
/**
* Used for message serialization.
*/
public static final Gson GSON = new GsonBuilder().
registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializer()).
registerTypeAdapter(Referenceable.class, new ReferenceableSerializer()).
registerTypeAdapter(JSONArray.class, new JSONArraySerializer()).
create();
// ----- Constructors ----------------------------------------------------
public AbstractNotification(Configuration applicationProperties) throws AtlasException {
this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false);
this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties);
}
@VisibleForTesting
protected AbstractNotification() {
embedded = false;
isHAEnabled = false;
}
// ----- NotificationInterface -------------------------------------------
@Override
public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
String[] strMessages = new String[messages.size()];
for (int index = 0; index < messages.size(); index++) {
strMessages[index] = getMessageJson(messages.get(index));
}
sendInternal(type, strMessages);
}
@Override
public <T> void send(NotificationType type, T... messages) throws NotificationException {
send(type, Arrays.asList(messages));
}
// ----- AbstractNotification --------------------------------------------
/**
* Determine whether or not the notification service embedded in Atlas server.
*
* @return true if the the notification service embedded in Atlas server.
*/
protected final boolean isEmbedded() {
return embedded;
}
/**
* Determine whether or not the high availability feature is enabled.
*
* @return true if the high availability feature is enabled.
*/
protected final boolean isHAEnabled() {
return isHAEnabled;
}
/**
* Send the given messages.
*
* @param type the message type
* @param messages the array of messages to send
*
* @throws NotificationException if an error occurs while sending
*/
protected abstract void sendInternal(NotificationType type, String[] messages) throws NotificationException;
// ----- utility methods -------------------------------------------------
/**
* Get the notification message JSON from the given object.
*
* @param message the message in object form
*
* @return the message as a JSON string
*/
public static String getMessageJson(Object message) {
VersionedMessage<?> versionedMessage = new VersionedMessage<>(CURRENT_MESSAGE_VERSION, message);
return GSON.toJson(versionedMessage);
}
// ----- serializers -----------------------------------------------------
/**
* Serializer for Referenceable.
*/
public static final class ReferenceableSerializer implements JsonSerializer<IReferenceableInstance> {
@Override
public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) {
String instanceJson = InstanceSerialization.toJson(src, true);
return new JsonParser().parse(instanceJson).getAsJsonObject();
}
}
/**
* Serializer for JSONArray.
*/
public static final class JSONArraySerializer implements JsonSerializer<JSONArray> {
@Override
public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) {
return new JsonParser().parse(src.toString()).getAsJsonArray();
}
}
}