blob: c45a1da9517ae78b51fed07f6bc07b92003b4428 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasException;
import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
import org.apache.atlas.model.notification.AtlasNotificationMessage;
import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.model.notification.MessageVersion;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.atlas.model.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED;
import static org.apache.atlas.model.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES;
/**
* Abstract notification interface implementation.
*/
public abstract class AbstractNotification implements NotificationInterface {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractNotification.class);

    /**
     * Guards {@link #msgIdPrefix} and {@link #msgIdSuffix} so that reading the prefix, incrementing the
     * suffix and rolling over to a new prefix happen as one atomic step.
     */
    private static final Object MSG_ID_LOCK = new Object();

    /**
     * Random prefix of generated message IDs; replaced with a fresh UUID on rollover. Guarded by MSG_ID_LOCK.
     */
    private static String msgIdPrefix = UUID.randomUUID().toString();

    /**
     * Counter appended to the prefix to build a message ID; reset to 0 on rollover. Guarded by MSG_ID_LOCK.
     */
    private static final AtomicInteger msgIdSuffix = new AtomicInteger(0);

    /**
     * The current expected version for notification messages.
     */
    public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");

    public static final int MAX_BYTES_PER_CHAR = 4; // each char can encode upto 4 bytes in UTF-8

    /**
     * IP address of the host in which this process has started
     */
    private static String localHostAddress = "";

    /**
     * User passed to every {@link AtlasNotificationMessage} built by
     * {@link #createNotificationMessages(Object, List)}. The field is static, so a value set through
     * {@link #setCurrentUser(String)} on one instance is seen by all instances.
     */
    private static String currentUser = "";

    // ----- Constructors ----------------------------------------------------

    /**
     * Creates the notification implementation. This base constructor does not read the given properties.
     *
     * @param applicationProperties the application configuration (unused here)
     *
     * @throws AtlasException declared for subclasses; never thrown by this constructor
     */
    public AbstractNotification(Configuration applicationProperties) throws AtlasException {
    }

    @VisibleForTesting
    protected AbstractNotification() {
    }

    /**
     * No-op in this base class; both arguments are ignored.
     */
    @Override
    public void init(String source, Object failedMessagesLogger) {
    }

    // ----- NotificationInterface -------------------------------------------

    /**
     * Converts each message to one or more JSON strings and hands them to
     * {@link #sendInternal(NotificationType, List)}.
     *
     * @param type     the message type
     * @param messages the messages to send; {@code null} is treated as an empty list
     *
     * @throws NotificationException if an error occurs while sending
     */
    @Override
    public <T> void send(NotificationType type, List<T> messages) throws NotificationException {
        List<String> strMessages = new ArrayList<>(messages == null ? 0 : messages.size());

        if (messages != null) {
            // for-each instead of get(index): stays linear for lists without random access
            for (T message : messages) {
                createNotificationMessages(message, strMessages);
            }
        }

        sendInternal(type, strMessages);
    }

    /**
     * Varargs variant of {@link #send(NotificationType, List)}.
     *
     * @param type     the message type
     * @param messages the messages to send; a {@code null} array is treated as no messages
     *
     * @throws NotificationException if an error occurs while sending
     */
    @Override
    public <T> void send(NotificationType type, T... messages) throws NotificationException {
        List<T> messageList = (messages != null) ? Arrays.asList(messages) : new ArrayList<T>();

        send(type, messageList);
    }

    /**
     * Sets the user that is attached to subsequently created notification messages.
     * The value is kept in a static field and therefore applies to every instance.
     *
     * @param user the user name
     */
    @Override
    public void setCurrentUser(String user) {
        currentUser = user;
    }

    // ----- AbstractNotification --------------------------------------------

    /**
     * Send the given messages.
     *
     * @param type the message type
     * @param messages the list of JSON messages to send
     *
     * @throws NotificationException if an error occurs while sending
     */
    public abstract void sendInternal(NotificationType type, List<String> messages) throws NotificationException;

    // ----- utility methods -------------------------------------------------

    /**
     * Wraps the given object in an {@link AtlasNotificationMessage} of version
     * {@link #CURRENT_MESSAGE_VERSION} and serializes it. Unlike
     * {@link #createNotificationMessages(Object, List)}, this neither attaches host/user information
     * nor compresses or splits large messages.
     *
     * @param message the message in object form
     *
     * @return the message as a JSON string
     */
    public static String getMessageJson(Object message) {
        AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message);

        return AtlasType.toV1Json(notificationMsg);
    }

    /**
     * Returns the cached local host address, resolving it on first use. When resolution fails a warning
     * is logged and an empty string is returned; the lookup is then retried on the next call because
     * the cache is still empty.
     */
    private static String getHostAddress() {
        if (StringUtils.isEmpty(localHostAddress)) {
            try {
                // NOTE(review): getLocalHost() is the static method inherited from InetAddress, so the
                // result is not guaranteed to be an IPv4 address despite being called via Inet4Address.
                localHostAddress = Inet4Address.getLocalHost().getHostAddress();
            } catch (UnknownHostException e) {
                LOG.warn("failed to get local host address", e);

                localHostAddress = "";
            }
        }

        return localHostAddress;
    }

    private static String getCurrentUser() {
        return currentUser;
    }

    /**
     * Serializes the given object into one or more notification message JSON strings and appends them
     * to {@code msgJsonList}.
     * <p>
     * The object is wrapped in an {@link AtlasNotificationMessage} carrying the local host address and
     * current user. If the UTF-8 size of the resulting JSON is within {@code MESSAGE_MAX_LENGTH_BYTES},
     * that JSON is appended as-is. Otherwise a message ID is allocated and:
     * <ul>
     *   <li>when {@code MESSAGE_COMPRESSION_ENABLED} is set, the bytes are gzip-compressed and
     *       base64-encoded; if the encoded bytes fit, a single {@link AtlasNotificationStringMessage}
     *       is appended;</li>
     *   <li>otherwise (or if the compressed form is still too large) the base64-encoded bytes are cut
     *       into chunks of at most {@code MESSAGE_MAX_LENGTH_BYTES} bytes, each appended as its own
     *       {@link AtlasNotificationStringMessage}.</li>
     * </ul>
     *
     * @param message     the message in object form
     * @param msgJsonList the list that receives the generated JSON string(s)
     */
    public static void createNotificationMessages(Object message, List<String> msgJsonList) {
        AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser());
        String                      msgJson         = AtlasType.toV1Json(notificationMsg);

        // Cheap upper bound first: only encode to UTF-8 when the worst-case size could exceed the limit.
        // The multiplication is widened to long so that a very long string cannot overflow int and
        // wrongly pass this check.
        if (((long) msgJson.length() * MAX_BYTES_PER_CHAR) <= MESSAGE_MAX_LENGTH_BYTES) {
            msgJsonList.add(msgJson);

            return;
        }

        // get utf-8 bytes for msgJson and check for length limit again
        byte[] msgBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson);

        if (msgBytes.length <= MESSAGE_MAX_LENGTH_BYTES) {
            msgJsonList.add(msgJson);

            return;
        }

        String          msgId           = getNextMessageId();
        CompressionKind compressionKind = CompressionKind.NONE;
        byte[]          encodedBytes;

        if (MESSAGE_COMPRESSION_ENABLED) {
            encodedBytes    = AtlasNotificationBaseMessage.gzipCompressAndEncodeBase64(msgBytes);
            compressionKind = CompressionKind.GZIP;

            LOG.info("Compressed large message: msgID={}, uncompressed={} bytes, compressed={} bytes", msgId, msgBytes.length, encodedBytes.length);

            if (encodedBytes.length <= MESSAGE_MAX_LENGTH_BYTES) { // no need to split
                AtlasNotificationStringMessage compressedMsg = new AtlasNotificationStringMessage(encodedBytes, msgId, compressionKind);

                // the JSON will not have multi-byte characters here, due to the base64 encoding above
                msgJsonList.add(AtlasType.toV1Json(compressedMsg));

                return;
            }
        } else {
            // compressed messages are already base64-encoded; uncompressed ones are encoded here
            encodedBytes = AtlasNotificationBaseMessage.encodeBase64(msgBytes);
        }

        addSplitMessages(encodedBytes, msgId, compressionKind, msgJsonList);
    }

    /**
     * Cuts the base64-encoded bytes into chunks of at most {@code MESSAGE_MAX_LENGTH_BYTES} bytes and
     * appends one serialized {@link AtlasNotificationStringMessage} per chunk to {@code msgJsonList}.
     */
    private static void addSplitMessages(byte[] encodedBytes, String msgId, CompressionKind compressionKind, List<String> msgJsonList) {
        int splitCount = encodedBytes.length / MESSAGE_MAX_LENGTH_BYTES;

        if ((encodedBytes.length % MESSAGE_MAX_LENGTH_BYTES) != 0) {
            splitCount++; // trailing partial chunk
        }

        for (int i = 0, offset = 0; i < splitCount; i++) {
            int length = Math.min(MESSAGE_MAX_LENGTH_BYTES, encodedBytes.length - offset);

            AtlasNotificationStringMessage splitMsg = new AtlasNotificationStringMessage(encodedBytes, offset, length, msgId, compressionKind, i, splitCount);

            msgJsonList.add(AtlasType.toV1Json(splitMsg));

            offset += length;
        }

        LOG.info("Split large message: msgID={}, splitCount={}, length={} bytes", msgId, splitCount, encodedBytes.length);
    }

    /**
     * Returns the next message ID, of the form {@code <uuid>_<counter>}. After the counter reaches
     * {@link Short#MAX_VALUE} a new UUID prefix is generated and the counter restarts at 0.
     * <p>
     * The whole step runs under MSG_ID_LOCK: without it a thread could combine the old prefix with the
     * freshly reset counter and hand out an ID that was already issued.
     */
    private static String getNextMessageId() {
        String prefix;
        int    suffix;

        synchronized (MSG_ID_LOCK) {
            prefix = msgIdPrefix;
            suffix = msgIdSuffix.getAndIncrement();

            if (suffix == Short.MAX_VALUE) { // get a new UUID after 32,767 IDs
                msgIdPrefix = UUID.randomUUID().toString();

                msgIdSuffix.set(0);
            }
        }

        return prefix + "_" + Integer.toString(suffix);
    }
}