| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.atlas.notification; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.gson.Gson; |
| import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind; |
| import org.apache.commons.lang3.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.lang.reflect.ParameterizedType; |
| import java.lang.reflect.Type; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS; |
| import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS; |
| |
| /** |
| * Deserializer that works with notification messages. The version of each deserialized message is checked against an |
| * expected version. |
| */ |
| public abstract class AtlasNotificationMessageDeserializer<T> implements MessageDeserializer<T> { |
| private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationMessageDeserializer.class); |
| |
| |
| public static final String VERSION_MISMATCH_MSG = |
| "Notification message version mismatch. Expected %s but recieved %s. Message %s"; |
| |
| private final Type notificationMessageType; |
| private final Type messageType; |
| private final MessageVersion expectedVersion; |
| private final Logger notificationLogger; |
| private final Gson gson; |
| |
| |
| private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<>(); |
| private final long splitMessageBufferPurgeIntervalMs; |
| private final long splitMessageSegmentsWaitTimeMs; |
| private long splitMessagesLastPurgeTime = System.currentTimeMillis(); |
| private final AtomicLong messageCountTotal = new AtomicLong(0); |
| private final AtomicLong messageCountSinceLastInterval = new AtomicLong(0); |
| // ----- Constructors ---------------------------------------------------- |
| |
| /** |
| * Create a notification message deserializer. |
| * |
| * @param notificationMessageType the type of the notification message |
| * @param expectedVersion the expected message version |
| * @param gson JSON serialization/deserialization |
| * @param notificationLogger logger for message version mismatch |
| */ |
| public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion, |
| Gson gson, Logger notificationLogger) { |
| this(notificationMessageType, expectedVersion, gson, notificationLogger, |
| NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000, |
| NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000); |
| } |
| |
| public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion, |
| Gson gson, Logger notificationLogger, |
| long splitMessageSegmentsWaitTimeMs, |
| long splitMessageBufferPurgeIntervalMs) { |
| this.notificationMessageType = notificationMessageType; |
| this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0]; |
| this.expectedVersion = expectedVersion; |
| this.gson = gson; |
| this.notificationLogger = notificationLogger; |
| this.splitMessageSegmentsWaitTimeMs = splitMessageSegmentsWaitTimeMs; |
| this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs; |
| } |
| |
| // ----- MessageDeserializer --------------------------------------------- |
| |
| @Override |
| public T deserialize(String messageJson) { |
| final T ret; |
| |
| messageCountTotal.incrementAndGet(); |
| messageCountSinceLastInterval.incrementAndGet(); |
| |
| AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class); |
| |
| if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage |
| ret = gson.fromJson(messageJson, messageType); |
| } else { |
| String msgJson = messageJson; |
| |
| if (msg.getMsgSplitCount() > 1) { // multi-part message |
| AtlasNotificationStringMessage splitMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class); |
| |
| checkVersion(splitMsg, msgJson); |
| |
| String msgId = splitMsg.getMsgId(); |
| |
| if (StringUtils.isEmpty(msgId)) { |
| LOG.error("Received multi-part message with no message ID. Ignoring message"); |
| |
| msg = null; |
| } else { |
| final int splitIdx = splitMsg.getMsgSplitIdx(); |
| final int splitCount = splitMsg.getMsgSplitCount(); |
| |
| final SplitMessageAggregator splitMsgs; |
| |
| if (splitIdx == 0) { |
| splitMsgs = new SplitMessageAggregator(splitMsg); |
| |
| splitMsgBuffer.put(splitMsgs.getMsgId(), splitMsgs); |
| } else { |
| splitMsgs = splitMsgBuffer.get(msgId); |
| } |
| |
| if (splitMsgs == null) { |
| LOG.error("Received msgID={}: {} of {}, but first message didn't arrive. Ignoring message", msgId, splitIdx + 1, splitCount); |
| |
| msg = null; |
| } else if (splitMsgs.getTotalSplitCount() <= splitIdx) { |
| LOG.error("Received msgID={}: {} of {} - out of bounds. Ignoring message", msgId, splitIdx + 1, splitCount); |
| |
| msg = null; |
| } else { |
| LOG.info("Received msgID={}: {} of {}", msgId, splitIdx + 1, splitCount); |
| |
| boolean isReady = splitMsgs.add(splitMsg); |
| |
| if (isReady) { // last message |
| splitMsgBuffer.remove(msgId); |
| |
| boolean isValidMessage = true; |
| |
| StringBuilder sb = new StringBuilder(); |
| |
| for (int i = 0; i < splitMsgs.getTotalSplitCount(); i++) { |
| splitMsg = splitMsgs.get(i); |
| |
| if (splitMsg == null) { |
| LOG.warn("MsgID={}: message {} of {} is missing. Ignoring message", msgId, i + 1, splitCount); |
| |
| isValidMessage = false; |
| |
| break; |
| } |
| |
| sb.append(splitMsg.getMessage()); |
| } |
| |
| if (isValidMessage) { |
| msgJson = sb.toString(); |
| |
| if (CompressionKind.GZIP.equals(splitMsg.getMsgCompressionKind())) { |
| byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson); |
| byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes); |
| |
| msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes); |
| |
| LOG.info("Received msgID={}: splitCount={}, compressed={} bytes, uncompressed={} bytes", msgId, splitCount, encodedBytes.length, bytes.length); |
| } else { |
| byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(msgJson); |
| byte[] bytes = AtlasNotificationBaseMessage.decodeBase64(encodedBytes); |
| |
| msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes); |
| |
| LOG.info("Received msgID={}: splitCount={}, length={} bytes", msgId, splitCount, bytes.length); |
| } |
| |
| msg = gson.fromJson(msgJson, AtlasNotificationBaseMessage.class); |
| } else { |
| msg = null; |
| } |
| } else { // more messages to arrive |
| msg = null; |
| } |
| } |
| } |
| } |
| |
| if (msg != null) { |
| if (CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) { |
| AtlasNotificationStringMessage compressedMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class); |
| |
| byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage()); |
| byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes); |
| |
| msgJson = AtlasNotificationBaseMessage.getStringUtf8(bytes); |
| |
| LOG.info("Received msgID={}: compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length); |
| } |
| |
| AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType); |
| |
| checkVersion(atlasNotificationMessage, msgJson); |
| |
| ret = atlasNotificationMessage.getMessage(); |
| } else { |
| ret = null; |
| } |
| } |
| |
| |
| long now = System.currentTimeMillis(); |
| long timeSinceLastPurge = now - splitMessagesLastPurgeTime; |
| |
| if(timeSinceLastPurge >= splitMessageBufferPurgeIntervalMs) { |
| purgeStaleMessages(splitMsgBuffer, now, splitMessageSegmentsWaitTimeMs); |
| |
| LOG.info("Notification processing stats: total={}, sinceLastStatsReport={}", messageCountTotal.get(), messageCountSinceLastInterval.getAndSet(0)); |
| |
| splitMessagesLastPurgeTime = now; |
| } |
| |
| return ret; |
| } |
| |
| @VisibleForTesting |
| static void purgeStaleMessages(Map<String, SplitMessageAggregator> splitMsgBuffer, long now, long maxWaitTime) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("==> purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")"); |
| } |
| |
| List<SplitMessageAggregator> evictionList = null; |
| |
| for (SplitMessageAggregator aggregrator : splitMsgBuffer.values()) { |
| long waitTime = now - aggregrator.getFirstSplitTimestamp(); |
| |
| if (waitTime < maxWaitTime) { |
| continue; |
| } |
| |
| if(evictionList == null) { |
| evictionList = new ArrayList<>(); |
| } |
| |
| evictionList.add(aggregrator); |
| } |
| |
| if(evictionList != null) { |
| for (SplitMessageAggregator aggregrator : evictionList) { |
| LOG.error("evicting notification msgID={}, totalSplitCount={}, receivedSplitCount={}", aggregrator.getMsgId(), aggregrator.getTotalSplitCount(), aggregrator.getReceivedSplitCount()); |
| splitMsgBuffer.remove(aggregrator.getMsgId()); |
| } |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("<== purgeStaleMessages(bufferedMessageCount=" + splitMsgBuffer.size() + ")"); |
| } |
| } |
| |
| // ----- helper methods -------------------------------------------------- |
| |
| /** |
| * Check the message version against the expected version. |
| * |
| * @param notificationMessage the notification message |
| * @param messageJson the notification message json |
| * |
| * @throws IncompatibleVersionException if the message version is incompatable with the expected version |
| */ |
| protected void checkVersion(AtlasNotificationBaseMessage notificationMessage, String messageJson) { |
| int comp = notificationMessage.compareVersion(expectedVersion); |
| |
| // message has newer version |
| if (comp > 0) { |
| String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson); |
| |
| notificationLogger.error(msg); |
| |
| throw new IncompatibleVersionException(msg); |
| } |
| |
| // message has older version |
| if (comp < 0) { |
| notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, notificationMessage.getVersion(), messageJson)); |
| } |
| } |
| } |