| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.atlas.notification; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.gson.Gson; |
| import com.google.gson.GsonBuilder; |
| import com.google.gson.JsonElement; |
| import com.google.gson.JsonParser; |
| import com.google.gson.JsonSerializationContext; |
| import com.google.gson.JsonSerializer; |
| import org.apache.atlas.AtlasException; |
| import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; |
| import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; |
| import org.apache.atlas.model.instance.AtlasObjectId; |
| import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind; |
| import org.apache.atlas.typesystem.IReferenceableInstance; |
| import org.apache.atlas.typesystem.IStruct; |
| import org.apache.atlas.typesystem.Referenceable; |
| import org.apache.atlas.typesystem.json.InstanceSerialization; |
| import org.apache.commons.configuration.Configuration; |
| import org.apache.commons.lang.StringUtils; |
| import org.codehaus.jackson.map.DeserializationConfig; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.codehaus.jettison.json.JSONArray; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Type; |
| import java.net.Inet4Address; |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED; |
| import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES; |
| |
| /** |
| * Abstract notification interface implementation. |
| */ |
| public abstract class AbstractNotification implements NotificationInterface { |
| private static final Logger LOG = LoggerFactory.getLogger(AbstractNotification.class); |
| |
| private static String msgIdPrefix = UUID.randomUUID().toString(); |
| private static AtomicInteger msgIdSuffix = new AtomicInteger(0); |
| private static final ObjectMapper mapper = new ObjectMapper().configure(DeserializationConfig.Feature.USE_BIG_DECIMAL_FOR_FLOATS, true); |
| |
| /** |
| * The current expected version for notification messages. |
| */ |
| public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0"); |
| |
| public static final int MAX_BYTES_PER_CHAR = 4; // each char can encode upto 4 bytes in UTF-8 |
| |
| /** |
| * IP address of the host in which this process has started |
| */ |
| private static String localHostAddress = ""; |
| |
| /** |
| * |
| */ |
| private static String currentUser = ""; |
| |
| |
| /** |
| * Used for message serialization. |
| */ |
| public static final Gson GSON = new GsonBuilder(). |
| registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializer()). |
| registerTypeAdapter(Referenceable.class, new ReferenceableSerializer()). |
| registerTypeAdapter(IStruct.class, new StructSerializer()). |
| registerTypeAdapter(JSONArray.class, new JSONArraySerializer()). |
| registerTypeAdapter(AtlasEntityWithExtInfo.class, new AtlasEntityWithExtInfoSerializer()). |
| registerTypeAdapter(AtlasEntitiesWithExtInfo.class, new AtlasEntitiesWithExtInfoSerializer()). |
| registerTypeAdapter(AtlasObjectId.class, new AtlasObjectIdSerializer()). |
| create(); |
| |
| // ----- Constructors ---------------------------------------------------- |
| |
| public AbstractNotification(Configuration applicationProperties) throws AtlasException { |
| } |
| |
| @VisibleForTesting |
| protected AbstractNotification() { |
| } |
| |
| // ----- NotificationInterface ------------------------------------------- |
| |
| @Override |
| public <T> void send(NotificationType type, List<T> messages) throws NotificationException { |
| List<String> strMessages = new ArrayList<>(messages.size()); |
| |
| for (int index = 0; index < messages.size(); index++) { |
| createNotificationMessages(messages.get(index), strMessages); |
| } |
| |
| sendInternal(type, strMessages); |
| } |
| |
| @Override |
| public <T> void send(NotificationType type, T... messages) throws NotificationException { |
| send(type, Arrays.asList(messages)); |
| } |
| |
| @Override |
| public void setCurrentUser(String user) { |
| currentUser = user; |
| } |
| |
| // ----- AbstractNotification -------------------------------------------- |
| /** |
| * Send the given messages. |
| * |
| * @param type the message type |
| * @param messages the array of messages to send |
| * |
| * @throws NotificationException if an error occurs while sending |
| */ |
| protected abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException; |
| |
| |
| // ----- utility methods ------------------------------------------------- |
| |
| public static String getMessageJson(Object message) { |
| AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message); |
| |
| return GSON.toJson(notificationMsg); |
| } |
| |
| private static String getHostAddress() { |
| if (StringUtils.isEmpty(localHostAddress)) { |
| try { |
| localHostAddress = Inet4Address.getLocalHost().getHostAddress(); |
| } catch (UnknownHostException e) { |
| LOG.warn("failed to get local host address", e); |
| |
| localHostAddress = ""; |
| } |
| } |
| |
| return localHostAddress; |
| } |
| |
| private static String getCurrentUser() { |
| return currentUser; |
| } |
| |
| /** |
| * Get the notification message JSON from the given object. |
| * |
| * @param message the message in object form |
| * |
| * @return the message as a JSON string |
| */ |
| public static void createNotificationMessages(Object message, List<String> msgJsonList) { |
| AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser()); |
| String msgJson = GSON.toJson(notificationMsg); |
| |
| boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES; |
| |
| if (msgLengthExceedsLimit) { // get utf-8 bytes for msgJson and check for length limit again |
| byte[] msgBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson); |
| |
| msgLengthExceedsLimit = msgBytes.length > MESSAGE_MAX_LENGTH_BYTES; |
| |
| if (msgLengthExceedsLimit) { |
| String msgId = getNextMessageId(); |
| CompressionKind compressionKind = CompressionKind.NONE; |
| |
| if (MESSAGE_COMPRESSION_ENABLED) { |
| byte[] encodedBytes = AtlasNotificationBaseMessage.gzipCompressAndEncodeBase64(msgBytes); |
| |
| compressionKind = CompressionKind.GZIP; |
| |
| LOG.info("Compressed large message: msgID={}, uncompressed={} bytes, compressed={} bytes", msgId, msgBytes.length, encodedBytes.length); |
| |
| msgLengthExceedsLimit = encodedBytes.length > MESSAGE_MAX_LENGTH_BYTES; |
| |
| if (!msgLengthExceedsLimit) { // no need to split |
| AtlasNotificationStringMessage compressedMsg = new AtlasNotificationStringMessage(encodedBytes, msgId, compressionKind); |
| |
| msgJson = GSON.toJson(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above |
| msgBytes = null; // not used after this point |
| } else { // encodedBytes will be split |
| msgJson = null; // not used after this point |
| msgBytes = encodedBytes; |
| } |
| } |
| |
| if (msgLengthExceedsLimit) { |
| // compressed messages are already base64-encoded |
| byte[] encodedBytes = MESSAGE_COMPRESSION_ENABLED ? msgBytes : AtlasNotificationBaseMessage.encodeBase64(msgBytes); |
| int splitCount = encodedBytes.length / MESSAGE_MAX_LENGTH_BYTES; |
| |
| if ((encodedBytes.length % MESSAGE_MAX_LENGTH_BYTES) != 0) { |
| splitCount++; |
| } |
| |
| for (int i = 0, offset = 0; i < splitCount; i++) { |
| int length = MESSAGE_MAX_LENGTH_BYTES; |
| |
| if ((offset + length) > encodedBytes.length) { |
| length = encodedBytes.length - offset; |
| } |
| |
| AtlasNotificationStringMessage splitMsg = new AtlasNotificationStringMessage(encodedBytes, offset, length, msgId, compressionKind, i, splitCount); |
| |
| String splitMsgJson = GSON.toJson(splitMsg); |
| |
| msgJsonList.add(splitMsgJson); |
| |
| offset += length; |
| } |
| |
| LOG.info("Split large message: msgID={}, splitCount={}, length={} bytes", msgId, splitCount, encodedBytes.length); |
| } |
| } |
| } |
| |
| if (!msgLengthExceedsLimit) { |
| msgJsonList.add(msgJson); |
| } |
| } |
| |
| |
| // ----- serializers ----------------------------------------------------- |
| |
| /** |
| * Serializer for Referenceable. |
| */ |
| public static final class ReferenceableSerializer implements JsonSerializer<IReferenceableInstance> { |
| @Override |
| public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) { |
| String instanceJson = InstanceSerialization.toJson(src, true); |
| return new JsonParser().parse(instanceJson).getAsJsonObject(); |
| } |
| } |
| |
| /** |
| * Serializer for IStruct. |
| */ |
| public static final class StructSerializer implements JsonSerializer<IStruct> { |
| @Override |
| public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) { |
| String instanceJson = InstanceSerialization.toJson(src, true); |
| return new JsonParser().parse(instanceJson).getAsJsonObject(); |
| } |
| } |
| |
| /** |
| * Serializer for JSONArray. |
| */ |
| public static final class JSONArraySerializer implements JsonSerializer<JSONArray> { |
| @Override |
| public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) { |
| return new JsonParser().parse(src.toString()).getAsJsonArray(); |
| } |
| } |
| |
| /** |
| * Serializer for AtlasEntityWithExtInfo. |
| */ |
| public static final class AtlasEntityWithExtInfoSerializer implements JsonSerializer<AtlasEntityWithExtInfo> { |
| @Override |
| public JsonElement serialize(AtlasEntityWithExtInfo src, Type typeOfSrc, JsonSerializationContext context) { |
| try { |
| String instanceJson = mapper.writeValueAsString(src); |
| return new JsonParser().parse(instanceJson).getAsJsonObject(); |
| } catch (IOException excp) { |
| LOG.warn("failed to serialize entity {}", src, excp); |
| } |
| |
| return null; |
| } |
| } |
| |
| /** |
| * Serializer for AtlasEntitiesWithExtInfo. |
| */ |
| public static final class AtlasEntitiesWithExtInfoSerializer implements JsonSerializer<AtlasEntitiesWithExtInfo> { |
| @Override |
| public JsonElement serialize(AtlasEntitiesWithExtInfo src, Type typeOfSrc, JsonSerializationContext context) { |
| try { |
| String instanceJson = mapper.writeValueAsString(src); |
| return new JsonParser().parse(instanceJson).getAsJsonObject(); |
| } catch (IOException excp) { |
| LOG.warn("failed to serialize entity {}", src, excp); |
| } |
| |
| return null; |
| } |
| } |
| |
| /** |
| * Serializer for AtlasObjectId. |
| */ |
| public static final class AtlasObjectIdSerializer implements JsonSerializer<AtlasObjectId> { |
| @Override |
| public JsonElement serialize(AtlasObjectId src, Type typeOfSrc, JsonSerializationContext context) { |
| try { |
| String instanceJson = mapper.writeValueAsString(src); |
| return new JsonParser().parse(instanceJson).getAsJsonObject(); |
| } catch (IOException excp) { |
| LOG.warn("failed to serialize objectId {}", src, excp); |
| } |
| |
| return null; |
| } |
| } |
| |
| private static String getNextMessageId() { |
| String nextMsgIdPrefix = msgIdPrefix; |
| int nextMsgIdSuffix = msgIdSuffix.getAndIncrement(); |
| |
| if (nextMsgIdSuffix == Short.MAX_VALUE) { // get a new UUID after 32,767 IDs |
| msgIdPrefix = UUID.randomUUID().toString(); |
| msgIdSuffix = new AtomicInteger(0); |
| } |
| |
| return nextMsgIdPrefix + "_" + Integer.toString(nextMsgIdSuffix); |
| } |
| } |